redmine

Few bsm support fixes

... ... @@ -26,6 +26,14 @@
# include <sys/capability.h> // __u32
#endif
#define MAX_BLOCKTHREADS (1<<4)
#define register_blockthread(thread) {\
critical_on (ctx_p->blockthread_count >= MAX_BLOCKTHREADS);\
ctx_p->blockthread[ctx_p->blockthread_count++] = pthread_self();\
debug(3, "register_blockthread(): ctx_p->blockthread_count -> %i", ctx_p->blockthread_count);\
}
#define OPTION_FLAGS (1<<10)
#define OPTION_LONGOPTONLY (1<<9)
#define OPTION_CONFIGONLY (1<<8)
... ... @@ -362,6 +370,8 @@ struct ctx {
unsigned int synctimeout;
sigset_t *sigset;
char isignoredexitcode[(1<<8)];
pthread_t blockthread[MAX_BLOCKTHREADS];
size_t blockthread_count;
char *chroot_dir;
... ...
... ... @@ -41,6 +41,7 @@ struct mondata {
FILE *pipe;
int config_fd;
size_t event_count;
size_t event_count_wasinqueue;
size_t event_alloc;
struct bsm_event *event;
};
... ... @@ -71,6 +72,7 @@ struct recognize_event_return {
pthread_t prefetcher_thread;
pthread_mutex_t bsm_mutex_prefetcher = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t bsm_cond_gotevent = PTHREAD_COND_INITIALIZER;
pthread_cond_t bsm_cond_queueend = PTHREAD_COND_INITIALIZER;
int bsm_queue_len;
... ... @@ -344,6 +346,7 @@ int bsm_init(ctx_t *ctx_p) {
case NE_BSM_PREFETCH:
pthread_mutex_init(&bsm_mutex_prefetcher, NULL);
pthread_cond_init (&bsm_cond_gotevent, NULL);
pthread_cond_init (&bsm_cond_queueend, NULL);
bsm_wait = bsm_wait_prefetched;
bsm_handle = bsm_handle_prefetched;
... ... @@ -357,10 +360,14 @@ int bsm_init(ctx_t *ctx_p) {
}
int select_rfd(int fd, struct timeval *timeout_p) {
int rc;
debug(9, "%i, {%li, %li}", fd, timeout_p == NULL ? -1 : timeout_p->tv_sec, timeout_p == NULL ? 0 : timeout_p->tv_usec);
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
return select(fd+1, &rfds, NULL, NULL, timeout_p);
rc = select(fd+1, &rfds, NULL, NULL, timeout_p);
debug(9, "rc -> %i", rc);
return rc;
}
int bsm_fetch(ctx_t *ctx_p, indexes_t *indexes_p, struct bsm_event *event_p, int pipe_fd, struct timeval *timeout_p, struct timeval *timeout_abs_p) {
... ... @@ -370,7 +377,6 @@ int bsm_fetch(ctx_t *ctx_p, indexes_t *indexes_p, struct bsm_event *event_p, int
tokenstr_t tok;
int recalc_timeout;
static int dont_wait = 0;
static int event_count_wasinqueue = 0;
struct timeval tv_abs;
struct timeval timeout_zero = {0};
mondata_t *mondata = ctx_p->fsmondata;
... ... @@ -399,11 +405,18 @@ int bsm_fetch(ctx_t *ctx_p, indexes_t *indexes_p, struct bsm_event *event_p, int
rc = select_rfd(pipe_fd, &timeout_zero);
if (rc == 0) {
dont_wait = 0;
event_count_wasinqueue = 1;
mondata->event_count_wasinqueue = 0;
switch (ctx_p->flags[MONITOR]) {
case NE_BSM_PREFETCH:
pthread_cond_broadcast(&bsm_cond_queueend);
break;
default:
break;
}
} else
if (rc > 0) {
event_count_wasinqueue++;
if (event_count_wasinqueue+1 >= bsm_queue_len)
mondata->event_count_wasinqueue++;
if (mondata->event_count_wasinqueue+1 >= bsm_queue_len)
critical_or_warning(ctx_p->flags[EXITONSYNCSKIP], "The was too many events in BSM queue (reached kernel BSM queue limit: %u).", bsm_queue_len);
}
}
... ... @@ -425,14 +438,18 @@ int bsm_fetch(ctx_t *ctx_p, indexes_t *indexes_p, struct bsm_event *event_p, int
recalc_timeout);
rc = select_rfd(pipe_fd, timeout_p);
if (rc > 0)
mondata->event_count_wasinqueue++;
if (recalc_timeout == 1)
recalc_timeout++;
}
critical_on (rc == -1);
if (rc == 0)
critical_on ((rc == -1) && (errno != EINTR));
if (rc == 0 || rc == -1) {
debug(3, "rc == %i; errno == %i; return 0", rc, errno);
return 0;
}
dont_wait = 1;
... ... @@ -540,7 +557,7 @@ int bsm_handle_allevents(struct ctx *ctx_p, struct indexes *indexes_p, bsm_handl
#ifdef PARANOID
if (!*event_p->path && !*event_p->path_to) {
warning("no events are parsed (event_p == %p).", event_p);
warning("no events are parsed (event_p == %p; mondata->event_count == %i).", event_p, mondata->event_count);
continue;
}
#endif
... ... @@ -591,9 +608,11 @@ int bsm_handle_allevents(struct ctx *ctx_p, struct indexes *indexes_p, bsm_handl
}
switch (how) {
case BSM_HANDLE_CALLWAIT:
debug(15, "BSM_HANDLE_CALLWAIT");
left_count = bsm_wait(ctx_p, indexes_p, &tv);
break;
case BSM_HANDLE_ITERATE:
debug(15, "BSM_HANDLE_ITERATE");
event_num++;
left_count = mondata->event_count - event_num;
break;
... ... @@ -625,22 +644,34 @@ int bsm_handle_allevents(struct ctx *ctx_p, struct indexes *indexes_p, bsm_handl
// Moving events from local queue to global ones
sync_prequeue_unload(ctx_p, indexes_p);
debug(4, "Result count: %i -> %i", count, mondata->event_count);
debug(4, "Result processed count: %i (left, mondata->event_count == %i)", count, mondata->event_count);
if (count == -1)
return -1;
return count;
}
void bsm_prefetcher_sig_int(int signal) {
debug(2, "signal -> %i. Sending pthread_cond_broadcast() to bsm_cond_gotevent and bsm_cond_queueend.", signal);
pthread_cond_broadcast(&bsm_cond_gotevent);
pthread_cond_broadcast(&bsm_cond_queueend);
return;
}
static int bsm_prefetcher_running=2;
int bsm_prefetcher(struct ctx *ctx_p) {
mondata_t *mondata = ctx_p->fsmondata;
indexes_t *indexes_p = ctx_p->indexes_p;
struct bsm_event event, *event_p;
register_blockthread();
signal(SIGUSR_BLOPINT, bsm_prefetcher_sig_int);
int pipe_fd = fileno(mondata->pipe);
mondata->event = xcalloc(sizeof(*mondata->event), ALLOC_PORTION);
while (42) {
bsm_prefetcher_running = 1;
while (bsm_prefetcher_running) {
if (bsm_fetch(ctx_p, indexes_p, &event, pipe_fd, NULL, NULL) > 0) {
// Pushing the event
debug(5, "We have an event. Pushing.");
... ... @@ -648,7 +679,7 @@ int bsm_prefetcher(struct ctx *ctx_p) {
critical_on (mondata->event_count >= BSM_QUEUE_LENGTH_MAX);
#endif
if (mondata->event_count >= mondata->event_alloc) {
debug(2, "Increasing queue length: %u -> %u (limit is "TOSTR(BSM_QUEUE_LENGTH_MAX)")", mondata->event_alloc, mondata->event_alloc+ALLOC_PORTION);
debug(2, "Increasing queue length: %u -> %u (limit is "XTOSTR(BSM_QUEUE_LENGTH_MAX)")", mondata->event_alloc, mondata->event_alloc+ALLOC_PORTION);
mondata->event_alloc += ALLOC_PORTION;
mondata->event = xrealloc(mondata->event, mondata->event_alloc*sizeof(*mondata->event));
memset(&mondata->event[mondata->event_count], 0, sizeof(*mondata->event)*(mondata->event_alloc - mondata->event_count));
... ... @@ -665,8 +696,7 @@ int bsm_prefetcher(struct ctx *ctx_p) {
}
}
critical ("This code shouldn't be reached");
return -1;
return 0;
}
int bsm_wait_prefetched(struct ctx *ctx_p, struct indexes *indexes_p, struct timeval *timeout_p) {
debug(3, "(ctx_p, indexes_p, %p {%u, %u})", timeout_p, timeout_p == NULL?-1:timeout_p->tv_sec, timeout_p == NULL?0:timeout_p->tv_usec);
... ... @@ -696,8 +726,11 @@ int bsm_wait_prefetched(struct ctx *ctx_p, struct indexes *indexes_p, struct tim
}
if (timeout_p->tv_sec == 0 && timeout_p->tv_sec == 0) {
debug(2, "Zero timeout. Waiting for the current queue to be processed.")
pthread_cond_wait(&bsm_cond_queueend, &bsm_mutex_prefetcher);
pthread_mutex_unlock(&bsm_mutex_prefetcher);
return 0;
debug(3, "return mondata->event_count == %i", mondata->event_count);
return mondata->event_count;
}
//l_pthread_cond_timedwait_restart:
... ... @@ -720,10 +753,11 @@ int bsm_wait_prefetched(struct ctx *ctx_p, struct indexes *indexes_p, struct tim
}
pthread_mutex_unlock(&bsm_mutex_prefetcher);
#ifdef PARANOID
/*#ifdef PARANOID
critical_on (!mondata->event_count);
#endif
debug(2, "Got an event. mondata->event_count == %i", mondata->event_count);
#endif*/
debug(2, "%s. mondata->event_count == %i", mondata->event_count?"Got an event":"Got signal SIGUSR_BLOPINT", mondata->event_count);
return mondata->event_count;
}
int bsm_handle_prefetched(struct ctx *ctx_p, struct indexes *indexes_p) {
... ... @@ -783,9 +817,16 @@ int bsm_add_watch_dir(struct ctx *ctx_p, struct indexes *indexes_p, const char *
return id++;
}
int bsm_deinit(ctx_t *ctx_p) {
void *ret;
int rc = 0;
mondata_t *mondata = ctx_p->fsmondata;
bsm_prefetcher_running = 0;
pthread_kill(prefetcher_thread, SIGUSR_BLOPINT);
pthread_cond_destroy (&bsm_cond_gotevent);
pthread_mutex_destroy(&bsm_mutex_prefetcher);
pthread_join(prefetcher_thread, &ret);
rc |= fclose(mondata->pipe);
rc |= bsm_config_revert(mondata);
... ... @@ -795,11 +836,6 @@ int bsm_deinit(ctx_t *ctx_p) {
rc |= auditd_restart();
if (pthread_kill(prefetcher_thread, 9)) {
pthread_cond_destroy (&bsm_cond_gotevent);
pthread_mutex_destroy(&bsm_mutex_prefetcher);
}
return rc;
}
... ...
... ... @@ -843,6 +843,7 @@ int privileged_handler(ctx_t *ctx_p)
critical_on(!parent_isalive());
# endif
} else {
register_blockthread();
pthread_setname_np(pthread_self(), "clsync-helper");
}
cap_drop(ctx_p, ctx_p->caps);
... ...
... ... @@ -3211,12 +3211,22 @@ void sync_sig_int(int signal) {
return;
}
int sync_tryforcecycle(pthread_t pthread_parent) {
debug(3, "sending signal to interrupt blocking operations like select()-s and so on");
pthread_kill(pthread_parent, SIGUSR_BLOPINT);
#ifdef PARANOID
int i=0;
if (++i > KILL_TIMEOUT) {
int _sync_tryforcecycle_i;
#endif
int sync_tryforcecycle(ctx_t *ctx_p, pthread_t pthread_parent) {
debug(3, "sending signal to interrupt blocking operations like select()-s and so on (ctx_p->blockthread_count == %i)", ctx_p->blockthread_count);
//pthread_kill(pthread_parent, SIGUSR_BLOPINT);
int i, count;
count = ctx_p->blockthread_count;
i = 0;
while (i < count) {
debug(2, "Sending SIGUSR_BLOPINT to thread %p", ctx_p->blockthread[i]);
pthread_kill(ctx_p->blockthread[i], SIGUSR_BLOPINT);
i++;
}
#ifdef PARANOID
if (++_sync_tryforcecycle_i > KILL_TIMEOUT) {
error("Seems we got a deadlock.");
return EDEADLK;
}
... ... @@ -3231,20 +3241,20 @@ int sync_tryforcecycle(pthread_t pthread_parent) {
if (pthread_cond_timedwait(pthread_cond_state, pthread_mutex_state, &time_timeout) != ETIMEDOUT)
return 0;
#else
debug(9, "sleep("TOSTR(SLEEP_SECONDS)")");
debug(9, "sleep("XTOSTR(SLEEP_SECONDS)")");
sleep(SLEEP_SECONDS); // TODO: replace this with pthread_cond_timedwait()
#endif
return EINPROGRESS;
}
int sync_switch_state(pthread_t pthread_parent, int newstate) {
int sync_switch_state(ctx_t *ctx_p, pthread_t pthread_parent, int newstate) {
if (state_p == NULL) {
debug(3, "sync_switch_state(%p, %i), but state_p == NULL", pthread_parent, newstate);
debug(3, "sync_switch_state(ctx_p, %p, %i), but state_p == NULL", pthread_parent, newstate);
return 0;
}
debug(3, "sync_switch_state(%p, %i)", pthread_parent, newstate);
debug(3, "sync_switch_state(ctx_p, %p, %i)", pthread_parent, newstate);
// Getting mutexes
threadsinfo_t *threadsinfo_p = thread_info();
... ... @@ -3261,17 +3271,23 @@ int sync_switch_state(pthread_t pthread_parent, int newstate) {
pthread_cond_t *pthread_cond_state = &threadsinfo_p->cond [PTHREAD_MUTEX_STATE];
// Locking all necessary mutexes
#ifdef PARANOID
_sync_tryforcecycle_i = 0;
#endif
debug(4, "while(pthread_mutex_trylock( pthread_mutex_state ))");
while (pthread_mutex_trylock(pthread_mutex_state) == EBUSY) {
int rc = sync_tryforcecycle(pthread_parent);
int rc = sync_tryforcecycle(ctx_p, pthread_parent);
if (rc && rc != EINPROGRESS)
return rc;
if (!rc)
break;
}
#ifdef PARANOID
_sync_tryforcecycle_i = 0;
#endif
debug(4, "while(pthread_mutex_trylock( pthread_mutex_select ))");
while (pthread_mutex_trylock(pthread_mutex_select) == EBUSY) {
int rc = sync_tryforcecycle(pthread_parent);
int rc = sync_tryforcecycle(ctx_p, pthread_parent);
if (rc && rc != EINPROGRESS)
return rc;
if (!rc)
... ... @@ -3553,13 +3569,13 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
*exitcode_p = ETIME;
case SIGQUIT:
if (ctx_p->flags[PREEXITHOOK])
sync_switch_state(pthread_parent, STATE_PREEXIT);
sync_switch_state(ctx_p, pthread_parent, STATE_PREEXIT);
else
sync_switch_state(pthread_parent, STATE_TERM);
sync_switch_state(ctx_p, pthread_parent, STATE_TERM);
break;
case SIGTERM:
case SIGINT:
sync_switch_state(pthread_parent, STATE_TERM);
sync_switch_state(ctx_p, pthread_parent, STATE_TERM);
// bugfix of https://github.com/xaionaro/clsync/issues/44
while (ctx_p->children) { // Killing children if non-pthread mode or/and (mode=="so" or mode=="rsyncso")
pid_t child_pid = ctx_p->child_pid[--ctx_p->children];
... ... @@ -3577,23 +3593,23 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
}
break;
case SIGHUP:
sync_switch_state(pthread_parent, STATE_REHASH);
sync_switch_state(ctx_p, pthread_parent, STATE_REHASH);
break;
case SIGCHLD:
sync_sigchld();
break;
case SIGUSR_THREAD_GC:
sync_switch_state(pthread_parent, STATE_THREAD_GC);
sync_switch_state(ctx_p, pthread_parent, STATE_THREAD_GC);
break;
case SIGUSR_INITSYNC:
sync_switch_state(pthread_parent, STATE_INITSYNC);
sync_switch_state(ctx_p, pthread_parent, STATE_INITSYNC);
break;
case SIGUSR_DUMP:
sync_dump(ctx_p, ctx_p->dump_path);
break;
default:
error("Unknown signal: %i. Exit.", signal);
sync_switch_state(pthread_parent, STATE_TERM);
sync_switch_state(ctx_p, pthread_parent, STATE_TERM);
break;
}
... ... @@ -3618,6 +3634,8 @@ int sync_run(ctx_t *ctx_p) {
{
int i;
register_blockthread();
sigset_t sigset_sighandler;
sigemptyset(&sigset_sighandler);
sigaddset(&sigset_sighandler, SIGALRM);
... ...