redmine

Added untested option "--custom-signals"

... ... @@ -13,6 +13,8 @@
// clsync should be used, if there's more than 5-10 nodes. So the limit in 255 is quite enough. :)
#define MAXNODES ((1<<8)-1)
#define MAXSIGNALNUM (1<<9)
// max user/group lengths
#define USER_LEN (1<<8)
#define GROUP_LEN USER_LEN
... ...
... ... @@ -102,6 +102,8 @@ enum flags_enum {
SYNCHANDLERARGS0 = 21|OPTION_LONGOPTONLY,
SYNCHANDLERARGS1 = 22|OPTION_LONGOPTONLY,
CUSTOMSIGNALS = 23|OPTION_LONGOPTONLY,
};
typedef enum flags_enum flags_t;
... ... @@ -233,6 +235,7 @@ struct ctx {
#ifndef LIBCLSYNC
char *config_path;
char *config_block;
char *customsignal[MAXSIGNALNUM+1];
char *label;
char *watchdir;
char *pidfile;
... ...
... ... @@ -54,6 +54,7 @@ static const struct option long_options[] =
{"config-file", required_argument, NULL, CONFIGFILE},
{"config-block", required_argument, NULL, CONFIGBLOCK},
{"config-block-inherits",required_argument, NULL, CONFIGBLOCKINHERITS},
{"custom-signals", required_argument, NULL, CUSTOMSIGNALS},
{"pid-file", required_argument, NULL, PIDFILE},
{"uid", required_argument, NULL, UID},
{"gid", required_argument, NULL, GID},
... ... @@ -447,6 +448,80 @@ static int synchandler_arg1(char *arg, size_t arg_len, void *_ctx_p) {
return synchandler_arg(arg, arg_len, _ctx_p, SHARGS_INITIAL);
}
int parse_customsignals(ctx_t *ctx_p, char *arg) {
char *ptr = arg, *start = arg;
unsigned int signal;
do {
switch (*ptr) {
case 0:
case ',':
case ':':
signal = (unsigned int)atoi(start);
if (signal == 0) {
// flushing the setting
int i = 0;
while (i < 256) {
if (ctx_p->customsignal[i]) {
free(ctx_p->customsignal[i]);
ctx_p->customsignal[i] = NULL;
}
i++;
}
#ifdef _DEBUG
fprintf(stderr, "Force-Debug: parse_parameter(): Reset custom signals.\n");
#endif
} else {
if (*ptr != ':') {
char ch = *ptr;
*ptr = 0;
errno = EINVAL;
error("Expected \":\" in \"%s\"", start);
*ptr = ch;
return errno;
}
{
char ch, *end;
ptr++;
end = ptr;
while (*end && *end != ',') end++;
if (end == ptr) {
errno = EINVAL;
error("Empty config block name on signal \"%u\"", signal);
return errno;
}
if (signal > MAXSIGNALNUM) {
errno = EINVAL;
error("Too high value of the signal: \"%u\" > "XTOSTR(MAXSIGNALNUM)"", signal);
return errno;
}
ch = *end; *end = 0;
ctx_p->customsignal[signal] = strdup(ptr);
*end = ch;
#ifdef _DEBUG
fprintf(stderr, "Force-Debug: parse_parameter(): Adding custom signal %u.\n", signal);
#endif
}
}
start = ptr+1;
break;
case '0' ... '9':
break;
default:
errno = EINVAL;
error("Expected a digit, comma (or colon) but got \"%c\"", *ptr);
return errno;
}
} while (*(ptr++));
return 0;
}
int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsource_t paramsource) {
#ifdef _DEBUG
fprintf(stderr, "Force-Debug: parse_parameter(): %i: %i = \"%s\"\n", paramsource, param_id, arg);
... ... @@ -500,6 +575,15 @@ int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsource_t pa
break;
case CONFIGBLOCKINHERITS:
break;
case CUSTOMSIGNALS:
if (paramsource == PS_CONTROL) {
warning("Cannot change \"custom-signal\" in run-time. Ignoring.");
return 0;
}
if (parse_customsignals(ctx_p, arg))
return errno;
break;
case GID:
ctx_p->gid = (unsigned int)atol(arg);
ctx_p->flags[param_id]++;
... ... @@ -552,7 +636,7 @@ int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsource_t pa
}
#ifdef CLUSTER_SUPPORT
case CLUSTERIFACE:
ctx_p->cluster_iface = arg;
ctx_p->cluster_iface = arg;
break;
case CLUSTERMCASTIPADDR:
ctx_p->cluster_mcastipaddr = arg;
... ... @@ -561,10 +645,10 @@ int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsource_t pa
ctx_p->cluster_mcastipport = (uint16_t)atoi(arg);
break;
case CLUSTERTIMEOUT:
ctx_p->cluster_timeout = (unsigned int)atol(arg);
ctx_p->cluster_timeout = (unsigned int)atol(arg);
break;
case CLUSTERNODENAME:
ctx_p->cluster_nodename = arg;
ctx_p->cluster_nodename = arg;
break;
case CLUSTERHDLMIN:
ctx_p->cluster_hash_dl_min = (uint16_t)atoi(arg);
... ... @@ -580,7 +664,7 @@ int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsource_t pa
ctx_p->listoutdir = arg;
break;
case LABEL:
ctx_p->label = arg;
ctx_p->label = arg;
break;
case STANDBYFILE:
if(strlen(arg)) {
... ... @@ -677,10 +761,10 @@ int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsource_t pa
case ',':
// *ptr=0;
exitcode = (unsigned char)atoi(start);
if(exitcode == 0) {
if (exitcode == 0) {
// flushing the setting
int i = 0;
while(i < 256)
while (i < 256)
ctx_p->isignoredexitcode[i++] = 0;
#ifdef _DEBUG
fprintf(stderr, "Force-Debug: parse_parameter(): Reset ignored exitcodes.\n");
... ... @@ -693,6 +777,12 @@ int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsource_t pa
}
start = ptr+1;
break;
case '0' ... '9':
break;
default:
errno = EINVAL;
error("Expected a digit or comma but got \"%c\"", *ptr);
return errno;
}
} while(*(ptr++));
break;
... ... @@ -991,6 +1081,16 @@ int configs_parse(ctx_t *ctx_p) {
return 0;
}
int config_block_parse(ctx_t *ctx_p, const char *const config_block_name)
{
debug(1, "(ctx_p, \"%s\")", config_block_name);
free(ctx_p->config_block);
ctx_p->config_block = strdup(config_block_name);
return configs_parse(ctx_p);
}
int ctx_check(ctx_t *ctx_p) {
int ret = 0;
#ifdef CLUSTER_SUPPORT
... ... @@ -1486,6 +1586,8 @@ int ctx_set(ctx_t *ctx_p, const char *const parameter_name, const char *const pa
}
ret = ctx_check(ctx_p);
if (ret)
critical("Cannot continue with this setup");
return ret;
}
... ...
... ... @@ -20,6 +20,7 @@
extern int main_rehash(ctx_t *ctx_p);
extern int main_status_update(ctx_t *ctx_p);
extern int ctx_set(ctx_t *ctx_p, const char *const parameter_name, const char *const parameter_value);
extern int config_block_parse(ctx_t *ctx_p, const char *const config_block_name);
extern char *parameter_expand(
ctx_t *ctx_p,
char *arg,
... ...
... ... @@ -207,6 +207,31 @@ Default value is "default".
.PP
.RE
.B \-\-custom\-signals
.I custom\-signals
.RS
Set a list of signals and corresponding config block names. The config block
will be use on catching the corresponding signal.
Format is
.RS
.I signal:config\-block\-name[,signal:config\-block\-name[,...]]
.RE
For example:
.RS
\-\-custom\-signals=29:debug,28:normal
.RE
In this line signals "28" and "29" will be added to the sighandler.
And clsync will use options from config block "debug" on signal 29 and
"normal" on signal 28.
To reset all custom signals use the 0-th signal (e.g. "\-\-custom\-signals=0").
Default value is "".
.PP
.RE
.B \-z, \-\-pid\-file
.I path\-to\-pidfile
.RS 8
... ...
... ... @@ -3062,29 +3062,37 @@ void hook_preexit(ctx_t *ctx_p) {
int sync_loop(ctx_t *ctx_p, indexes_t *indexes_p) {
int ret;
threadsinfo_t *threadsinfo_p = thread_info();
state_p = &ctx_p->state;
ctx_p->state = ctx_p->flags[SKIPINITSYNC] ? STATE_RUNNING : STATE_INITSYNC;
while (ctx_p->state != STATE_EXIT) {
int events;
threadsinfo_t *threadsinfo_p = thread_info();
debug(4, "pthread_mutex_lock()");
pthread_mutex_lock(&threadsinfo_p->mutex[PTHREAD_MUTEX_STATE]);
debug(3, "current state is %i (iteration: %u/%u)",
ctx_p->state, ctx_p->iteration_num, ctx_p->flags[MAXITERATIONS]);
debug(3, "current state is %i (iteration: %u/%u); threadsinfo_p->used == %u",
ctx_p->state, ctx_p->iteration_num, ctx_p->flags[MAXITERATIONS], threadsinfo_p->used);
while ((ctx_p->flags[THREADING] == PM_OFF) && threadsinfo_p->used) {
debug(1, "We are in non-threading mode but have %u syncer threads. Waiting for them end.", threadsinfo_p->used);
pthread_cond_wait(&threadsinfo_p->cond[PTHREAD_MUTEX_STATE], &threadsinfo_p->mutex[PTHREAD_MUTEX_STATE]);
pthread_mutex_unlock(&threadsinfo_p->mutex[PTHREAD_MUTEX_STATE]);
}
events = 0;
switch (ctx_p->state) {
case STATE_THREAD_GC:
main_status_update(ctx_p);
if(thread_gc(ctx_p)) {
if (thread_gc(ctx_p)) {
ctx_p->state = STATE_EXIT;
break;
}
ctx_p->state = STATE_RUNNING;
SYNC_LOOP_CONTINUE_UNLOCK;
case STATE_INITSYNC:
if(!ctx_p->flags[THREADING]) {
if (!ctx_p->flags[THREADING]) {
ctx_p->iteration_num = 0;
setenv_iteration(ctx_p->iteration_num);
}
... ... @@ -3105,7 +3113,7 @@ int sync_loop(ctx_t *ctx_p, indexes_t *indexes_p) {
continue;
case STATE_PREEXIT:
case STATE_RUNNING:
if((!ctx_p->flags[THREADING]) && ctx_p->flags[MAXITERATIONS]) {
if ((!ctx_p->flags[THREADING]) && ctx_p->flags[MAXITERATIONS]) {
if (ctx_p->flags[MAXITERATIONS] == ctx_p->iteration_num-1)
ctx_p->state = STATE_PREEXIT;
else
... ... @@ -3474,7 +3482,7 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
sync_sighandler_exitcode_p = exitcode_p;
while(1) {
while((*state_p != STATE_TERM) && (*state_p != STATE_EXIT)) {
debug(3, "waiting for signal");
ret = sigwait(sigset_p, &signal);
... ... @@ -3498,11 +3506,16 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
debug(3, "got signal %i. *state_p == %i.", signal, *state_p);
if(ret) {
if (ret) {
// TODO: handle an error here
}
switch(signal) {
if (ctx_p->customsignal[signal] != NULL) {
config_block_parse(ctx_p, ctx_p->customsignal[signal]);
continue;
}
switch (signal) {
case SIGALRM:
*exitcode_p = ETIME;
case SIGQUIT:
... ... @@ -3517,21 +3530,21 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
// bugfix of https://github.com/xaionaro/clsync/issues/44
while (ctx_p->children) { // Killing children if non-pthread mode or/and (mode=="so" or mode=="rsyncso")
pid_t child_pid = ctx_p->child_pid[--ctx_p->children];
if(waitpid(child_pid, NULL, WNOHANG)>=0) {
if (waitpid(child_pid, NULL, WNOHANG)>=0) {
debug(3, "Sending signal %u to child process with pid %u.",
signal, child_pid);
kill(child_pid, signal);
sleep(1); // TODO: replace this sleep() with something to do not sleep if process already died
} else
continue;
if(waitpid(child_pid, NULL, WNOHANG)>=0) {
if (waitpid(child_pid, NULL, WNOHANG)>=0) {
debug(3, "Sending signal SIGQUIT to child process with pid %u.",
child_pid);
kill(child_pid, SIGQUIT);
sleep(1); // TODO: replace this sleep() with something to do not sleep if process already died
} else
continue;
if(waitpid(child_pid, NULL, WNOHANG)>=0) {
if (waitpid(child_pid, NULL, WNOHANG)>=0) {
debug(3, "Sending signal SIGKILL to child process with pid %u.",
child_pid);
kill(child_pid, SIGKILL);
... ... @@ -3556,9 +3569,6 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
break;
}
if((*state_p == STATE_TERM) || (*state_p == STATE_EXIT)) {
break;
}
}
debug(3, "signal handler closed.");
... ... @@ -3571,62 +3581,75 @@ int sync_term(int exitcode) {
}
int sync_run(ctx_t *ctx_p) {
int ret, i;
int ret;
sighandler_arg_t sighandler_arg = {0};
indexes_t indexes = {NULL};
// Creating signal handler thread
sigset_t sigset_sighandler;
sigemptyset(&sigset_sighandler);
sigaddset(&sigset_sighandler, SIGALRM);
sigaddset(&sigset_sighandler, SIGHUP);
sigaddset(&sigset_sighandler, SIGQUIT);
sigaddset(&sigset_sighandler, SIGTERM);
sigaddset(&sigset_sighandler, SIGINT);
sigaddset(&sigset_sighandler, SIGUSR_THREAD_GC);
sigaddset(&sigset_sighandler, SIGUSR_INITSYNC);
sigaddset(&sigset_sighandler, SIGUSR_DUMP);
ret = pthread_sigmask(SIG_BLOCK, &sigset_sighandler, NULL);
if (ret) return ret;
sighandler_arg.ctx_p = ctx_p;
// sighandler_arg.indexes_p = &indexes;
sighandler_arg.pthread_parent = pthread_self();
sighandler_arg.exitcode_p = &ret;
sighandler_arg.sigset_p = &sigset_sighandler;
ret = pthread_create(&pthread_sighandler, NULL, (void *(*)(void *))sync_sighandler, &sighandler_arg);
if (ret) return ret;
sigset_t sigset_parent;
sigemptyset(&sigset_parent);
sigaddset(&sigset_parent, SIGUSR_BLOPINT);
ret = pthread_sigmask(SIG_UNBLOCK, &sigset_parent, NULL);
if (ret) return ret;
signal(SIGUSR_BLOPINT, sync_sig_int);
{
int i;
sigset_t sigset_sighandler;
sigemptyset(&sigset_sighandler);
sigaddset(&sigset_sighandler, SIGALRM);
sigaddset(&sigset_sighandler, SIGHUP);
sigaddset(&sigset_sighandler, SIGQUIT);
sigaddset(&sigset_sighandler, SIGTERM);
sigaddset(&sigset_sighandler, SIGINT);
sigaddset(&sigset_sighandler, SIGUSR_THREAD_GC);
sigaddset(&sigset_sighandler, SIGUSR_INITSYNC);
sigaddset(&sigset_sighandler, SIGUSR_DUMP);
i = 0;
while (i < MAXSIGNALNUM) {
if (ctx_p->customsignal[i] != NULL)
sigaddset(&sigset_sighandler, i);
i++;
}
// Creating hash tables
ret = pthread_sigmask(SIG_BLOCK, &sigset_sighandler, NULL);
if (ret) return ret;
indexes_t indexes = {NULL};
ctx_p->indexes_p = &indexes;
sighandler_arg.ctx_p = ctx_p;
sighandler_arg.pthread_parent = pthread_self();
sighandler_arg.exitcode_p = &ret;
sighandler_arg.sigset_p = &sigset_sighandler;
ret = pthread_create(&pthread_sighandler, NULL, (void *(*)(void *))sync_sighandler, &sighandler_arg);
if (ret) return ret;
indexes.wd2fpath_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
indexes.fpath2wd_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.fpath2ei_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
indexes.exc_fpath_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.out_lines_aggr_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
i=0;
while (i<QUEUE_MAX) {
switch (i) {
case QUEUE_LOCKWAIT:
indexes.fpath2ei_coll_ht[i] = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
break;
default:
indexes.fpath2ei_coll_ht[i] = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
indexes.exc_fpath_coll_ht[i] = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
sigset_t sigset_parent;
sigemptyset(&sigset_parent);
sigaddset(&sigset_parent, SIGUSR_BLOPINT);
ret = pthread_sigmask(SIG_UNBLOCK, &sigset_parent, NULL);
if (ret) return ret;
signal(SIGUSR_BLOPINT, sync_sig_int);
}
// Creating hash tables
{
int i;
ctx_p->indexes_p = &indexes;
indexes.wd2fpath_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
indexes.fpath2wd_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.fpath2ei_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
indexes.exc_fpath_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.out_lines_aggr_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
i=0;
while (i<QUEUE_MAX) {
switch (i) {
case QUEUE_LOCKWAIT:
indexes.fpath2ei_coll_ht[i] = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
break;
default:
indexes.fpath2ei_coll_ht[i] = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
indexes.exc_fpath_coll_ht[i] = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
}
i++;
}
i++;
}
// Loading dynamical libraries
... ... @@ -3844,23 +3867,27 @@ int sync_run(ctx_t *ctx_p) {
rsync_escape_cleanup();
// Removing hash-tables
debug(3, "Closing hash tables");
g_hash_table_destroy(indexes.wd2fpath_ht);
g_hash_table_destroy(indexes.fpath2wd_ht);
g_hash_table_destroy(indexes.fpath2ei_ht);
g_hash_table_destroy(indexes.exc_fpath_ht);
g_hash_table_destroy(indexes.out_lines_aggr_ht);
i=0;
while(i<QUEUE_MAX) {
switch (i) {
case QUEUE_LOCKWAIT:
g_hash_table_destroy(indexes.fpath2ei_coll_ht[i]);
break;
default:
g_hash_table_destroy(indexes.fpath2ei_coll_ht[i]);
g_hash_table_destroy(indexes.exc_fpath_coll_ht[i]);
{
int i;
debug(3, "Closing hash tables");
g_hash_table_destroy(indexes.wd2fpath_ht);
g_hash_table_destroy(indexes.fpath2wd_ht);
g_hash_table_destroy(indexes.fpath2ei_ht);
g_hash_table_destroy(indexes.exc_fpath_ht);
g_hash_table_destroy(indexes.out_lines_aggr_ht);
i = 0;
while (i<QUEUE_MAX) {
switch (i) {
case QUEUE_LOCKWAIT:
g_hash_table_destroy(indexes.fpath2ei_coll_ht[i]);
break;
default:
g_hash_table_destroy(indexes.fpath2ei_coll_ht[i]);
g_hash_table_destroy(indexes.exc_fpath_coll_ht[i]);
}
i++;
}
i++;
}
// Deinitializing cluster subsystem
... ...