redmine

continued

... ... @@ -48,7 +48,7 @@ onoldsystem: updaterevision
$(CC) $(OLDSYSTEMCFLAGS) $(CFLAGS) $(INC) $(LDFLAGS) *.c -o $(binary)
updaterevision:
(echo -n '#define REVISION "'; [ -d .git ] && (echo -n '.'; git log 2>/dev/null | grep -c ^commit | tr -d "\n") || echo -n '-release'; echo '"') > revision.h
(echo -n '#define REVISION "'; [ -d .git ] && (echo -n '.'$$(( $$(git log 2>/dev/null | grep -c ^commit | tr -d "\n") - 137 )) ) || echo -n '-release'; echo '"') > revision.h
touch main.c
clean:
... ...
... ... @@ -7,3 +7,4 @@
There's a memleak if not pthread_*join*() is done.
5. Deduplicate code from functions, that calls sync_exec() and sync_exec_thread()
6. Fix variables' names
... ...
... ... @@ -19,24 +19,81 @@
#include "common.h"
#include "cluster.h"
#include "sync.h"
#include "output.h"
#include "malloc.h"
options_t *options_p=NULL;
indexes_t *indexes_p=NULL;
pthread_t pthread_cluster;
int cluster_init(options_t *options_p) {
return 0;
extern int cluster_loop();
int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
int ret;
if(options_p != NULL) {
printf_e("Error: cluster_init(): cluster subsystem is already initialized.\n");
return EALREADY;
}
options_p = _options_p;
indexes_p = _indexes_p;
ret = pthread_create(&pthread_cluster, NULL, (void *(*)(void *))cluster_loop, NULL);
return ret;
}
inline int cluster_signal(int signal) {
return pthread_kill(pthread_cluster, signal);
}
int cluster_deinit() {
cluster_signal(SIGTERM);
return pthread_join(pthread_cluster, NULL);
}
int cluster_deinit(options_t *options_p) {
int cluster_lock(const char *fpath) {
return 0;
}
int cluster_lock_byindexes() {
return 0;
}
int cluster_send(options_t *options_p, indexes_t *indexes_p) {
int cluster_unlock_all() {
return 0;
}
int cluster_recv(options_t *options_p, indexes_t *indexes_p) {
#define CLUSTER_LOOP_CHECK(a) {\
int ret = a;\
if(ret) {\
sync_term(ret);\
return ret;\
}\
}
int cluster_loop() {
sigset_t sigset_cluster;
sigemptyset(&sigset_cluster);
sigaddset(&sigset_cluster, SIGINT);
CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_BLOCK, &sigset_cluster, NULL));
sigemptyset(&sigset_cluster);
sigaddset(&sigset_cluster, SIGTERM);
CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_UNBLOCK, &sigset_cluster, NULL));
while(1) {
if(sigpending(&sigset_cluster))
if(sigismember(&sigset_cluster, SIGTERM))
break;
// LISTENING
}
return 0;
}
... ...
... ... @@ -17,9 +17,12 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
extern int cluster_init(options_t *options_p);
extern int cluster_deinit(options_t *options_p);
extern int cluster_init(options_t *options_p, indexes_t *indexes_p);
extern int cluster_deinit();
extern int cluster_lock(const char *fpath);
extern int cluster_lock_byindexes();
extern int cluster_unlock_all();
extern int cluster_send(options_t *options_p, indexes_t *indexes_p);
extern int cluster_recv(options_t *options_p, indexes_t *indexes_p);
... ...
... ... @@ -85,6 +85,7 @@ enum flags_enum {
PTHREAD = 'p',
CLUSTERIFACE = 'c',
CLUSTERMCASTIPADDR = 'm',
CLUSTERTIMEOUT = 'W',
HELP = 'h',
DELAY = 't',
BFILEDELAY = 'T',
... ... @@ -171,6 +172,7 @@ struct options {
unsigned int rsyncinclimit;
time_t synctime;
unsigned int synctimeout;
unsigned int cluster_timeout;
sigset_t *sigset;
};
typedef struct options options_t;
... ...
... ... @@ -18,6 +18,7 @@
#define DEFAULT_LABEL "nolabel"
#define DEFAULT_RSYNC_INCLUDELINESLIMIT 20000
#define DEFAULT_SYNCTIMEOUT (3600 * 24)
#define DEFAULT_CLUSTERTIMEOUT 1000
#define FANOTIFY_FLAGS (FAN_CLOEXEC|FAN_UNLIMITED_QUEUE|FAN_UNLIMITED_MARKS)
#define FANOTIFY_EVFLAGS (O_LARGEFILE|O_RDONLY|O_CLOEXEC)
... ... @@ -35,3 +36,4 @@
#define KILL_TIMEOUT 60
... ...
... ... @@ -33,6 +33,7 @@ static struct option long_options[] =
{"pthread", no_argument, NULL, PTHREAD},
{"cluster-iface", required_argument, NULL, CLUSTERIFACE}, // Not implemented, yet
{"cluster-ip", required_argument, NULL, CLUSTERMCASTIPADDR}, // Not implemented, yet
{"cluster-timeout", required_argument, NULL, CLUSTERTIMEOUT}, // Not implemented, yet
{"collectdelay", required_argument, NULL, DELAY},
{"syncdelay", required_argument, NULL, SYNCDELAY},
{"outlistsdir", required_argument, NULL, OUTLISTSDIR},
... ... @@ -77,9 +78,9 @@ int parse_arguments(int argc, char *argv[], struct options *options_p) {
int option_index = 0;
while(1) {
#ifdef FANOTIFY_SUPPORT
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:f", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:f", long_options, &option_index);
#else
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:", long_options, &option_index);
#endif
if (c == -1) break;
... ... @@ -94,6 +95,9 @@ int parse_arguments(int argc, char *argv[], struct options *options_p) {
case 'm':
options_p->cluster_mcastipaddr = optarg;
break;
case 'W':
options_p->cluster_timeout = (unsigned int)atol(optarg);
break;
case 'd':
options_p->listoutdir = optarg;
break;
... ... @@ -313,6 +317,7 @@ int main(int argc, char *argv[]) {
options.label = DEFAULT_LABEL;
options.rsyncinclimit = DEFAULT_RSYNC_INCLUDELINESLIMIT;
options.synctimeout = DEFAULT_SYNCTIMEOUT;
options.cluster_timeout = DEFAULT_CLUSTERTIMEOUT;
parse_arguments(argc, argv, &options);
out_init(options.flags);
... ... @@ -326,15 +331,6 @@ int main(int argc, char *argv[]) {
ret = EINVAL;
}
if(options.cluster_iface == NULL) {
int _ret;
_ret = cluster_init();
if(_ret) {
printf_e("Error: Cannot initialize cluster subsystem.\n");
ret = _ret;
}
}
{
size_t size = options.watchdirlen + 2;
char *newwatchdir = xmalloc(size);
... ... @@ -419,15 +415,6 @@ int main(int argc, char *argv[]) {
if(ret == 0)
ret = sync_run(&options);
if(options.cluster_iface == NULL) {
int _ret;
_ret = cluster_deinit();
if(_ret) {
printf_e("Error: Cannot deinitialize cluster subsystem.\n");
ret = _ret;
}
}
main_cleanup(&options);
if(options.watchdirsize)
... ...
... ... @@ -113,6 +113,16 @@ Default value is "227.108.115.121". [(128+"c")."l"."s"."y"]
.RE
.PP
.B \-c, \-\-cluster\-timeout
.I cluster\-timeout
.RS 8
Sets timeout (in milliseconds) of waiting answer from another nodes of the cluster.
If there's no answer from some node, it will be excluded.
Default value is "1000". [1 second]
.RE
.PP
.B \-t, \-\-collectdelay
.I ordinary\-delay
.RS 8
... ...
... ... @@ -22,6 +22,7 @@
#include "output.h"
#include "fileutils.h"
#include "malloc.h"
#include "cluster.h"
#include "sync.h"
pthread_t pthread_sighandler;
... ... @@ -980,13 +981,17 @@ static inline int sync_dosync_exec(options_t *options_p, const char *evmask_str,
static int sync_dosync(const char *fpath, uint32_t evmask, options_t *options_p, indexes_t *indexes_p) {
int ret;
ret = cluster_lock(fpath);
if(ret) return ret;
char *evmask_str = xmalloc(1<<8);
sprintf(evmask_str, "%u", evmask);
ret = sync_dosync_exec(options_p, evmask_str, fpath);
free(evmask_str);
ret = cluster_unlock_all();
return ret;
}
... ... @@ -1300,7 +1305,6 @@ gboolean sync_idle_dosync_collectedevents_listpush(gpointer fpath_gp, gpointer e
if(!options_p->flags[RSYNC]) {
// non-RSYNC case
fprintf(outf, "sync %s %i %s\n", options_p->label, evinfo->evmask, fpath);
return TRUE;
}
... ... @@ -1475,10 +1479,15 @@ int sync_idle(int notify_d, options_t *options_p, indexes_t *indexes_p) {
printf_ddd("Debug3: sync_idle(): calling sync_idle_dosync_collectedevents()\n");
// TODO: make a separate thread on sync_idle_dosync_collectedevents();
ret = cluster_lock_byindexes();
if(ret) return ret;
ret = sync_idle_dosync_collectedevents(options_p, indexes_p);
if(ret) return ret;
ret = cluster_unlock_all();
if(ret) return ret;
return 0;
}
... ... @@ -1841,57 +1850,6 @@ int sync_notify_loop(int notify_d, options_t *options_p, indexes_t *indexes_p) {
return -1;
}
/*
void sync_sig_alarm(int signal) {
printf_e("Error: Alarm received. Syncing process timed out. Exit.\n");
exitcode = ETIME;
pthread_kill(pthread_sighandler, SIGTERM);
return;
}
void sync_sig_pthread_gc(int signal) {
printf_ddd("Debug3: Got signal for thread_gc(). Thread %p.\n", pthread_self());
int ret = thread_gc(_options_p);
exitcode = ret;
if(ret) pthread_kill(pthread_sighandler, SIGTERM);
#if 0
if(state_p)
*state_p = STATE_PTHREAD_GC;
#endif
return;
}
void sync_sig_initsync(int signal) {
printf_ddd("Debug3: Got signal for sync_initialsync()\n");
exitcode = ret;
if(ret) pthread_kill(pthread_sighandler, SIGTERM);
return;
}
void sync_sig_rehash(int signal) {
if(state_p)
*state_p = STATE_REHASH;
return;
}
void sync_sig_term(int signal) {
printf_dd("Debug2: sync_sig_term(%i). Thread %p.\n", signal, pthread_self());
if(state_p)
*state_p = STATE_TERM;
return;
}
*/
void sync_sig_int(int signal) {
printf_dd("Debug2: sync_sig_int(%i): Thread %p\n", signal, pthread_self());
return;
... ... @@ -1963,6 +1921,7 @@ l_sync_parent_interrupt_end:
}
int *sync_sighandler_exitcode_p = NULL;
int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
int signal, ret;
// options_t *options_p = sighandler_arg_p->options_p;
... ... @@ -1971,6 +1930,8 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
sigset_t *sigset_p = sighandler_arg_p->sigset_p;
int *exitcode_p = sighandler_arg_p->exitcode_p;
sync_sighandler_exitcode_p = exitcode_p;
while(1) {
printf_ddd("Debug3: sync_sighandler(): waiting for signal\n");
ret = sigwait(sigset_p, &signal);
... ... @@ -2028,23 +1989,16 @@ int sync_sighandler(sighandler_arg_t *sighandler_arg_p) {
return 0;
}
int sync_term(int exitcode) {
*sync_sighandler_exitcode_p = exitcode;
return pthread_kill(pthread_sighandler, SIGTERM);
}
int sync_run(options_t *options_p) {
int ret, i;
sighandler_arg_t sighandler_arg = {0};
// Creating signal handler thread
#if 0
signal(SIGALRM, SIG_IGN);
signal(SIGHUP, SIG_IGN);
signal(SIGTERM, SIG_IGN);
signal(SIGINT, SIG_IGN);
signal(SIGUSR_PTHREAD_GC, SIG_IGN);
signal(SIGUSR_INITSYNC, SIG_IGN);
#endif
sigset_t sigset_sighandler;
sigemptyset(&sigset_sighandler);
sigaddset(&sigset_sighandler, SIGALRM);
... ... @@ -2074,16 +2028,6 @@ int sync_run(options_t *options_p) {
signal(SIGUSR_BLOPINT, sync_sig_int);
/*
signal(SIGALRM, sync_sig_alarm);
signal(SIGHUP, sync_sig_rehash);
signal(SIGTERM, sync_sig_term);
signal(SIGINT, sync_sig_term);
signal(SIGUSR_PTHREAD_GC, sync_sig_pthread_gc);
signal(SIGUSR_INITSYNC, sync_sig_initsync);
*/
// Creating hash tables
indexes_t indexes = {NULL};
... ... @@ -2099,6 +2043,16 @@ int sync_run(options_t *options_p) {
i++;
}
// Initializing cluster subsystem
if(options_p->cluster_iface == NULL) {
ret = cluster_init(options_p, &indexes);
if(ret) {
printf_e("Error: Cannot initialize cluster subsystem: %s (errno %i).\n", strerror(ret), ret);
return ret;
}
}
// Initializing rand-generator if it's required
if(options_p->listoutdir)
... ... @@ -2113,12 +2067,6 @@ int sync_run(options_t *options_p) {
ret = sync_mark_walk(notify_d, options_p, options_p->watchdir, &indexes);
if(ret) return ret;
/*
// Full syncing the tree after marking it
ret = sync_initialsync(options_p->watchdir, options_p, &indexes, INITSYNC_FIRST);
if(ret) return ret;
*/
// "Infinite" loop of processling the events
ret = sync_notify_loop(notify_d, options_p, &indexes);
if(ret) return ret;
... ... @@ -2145,6 +2093,15 @@ int sync_run(options_t *options_p) {
i++;
}
if(options_p->cluster_iface == NULL) {
int _ret;
_ret = cluster_deinit();
if(_ret) {
printf_e("Error: Cannot deinitialize cluster subsystem: %s (errno: %i).\n", strerror(_ret), _ret);
ret = _ret;
}
}
#ifdef VERYPARANOID
// One second for another threads
sleep(1);
... ...
... ... @@ -17,6 +17,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
extern int sync_run(struct options *options);
extern int sync_term(int exitcode);
... ...