diff --git a/Makefile.am b/Makefile.am index 8b1036a..6c87fc4 100644 --- a/Makefile.am +++ b/Makefile.am @@ -16,6 +16,9 @@ clsync_SOURCES = calc.c cluster.c error.c fileutils.c glibex.c \ clsync_CFLAGS = $(AM_CFLAGS) clsync_LDFLAGS = $(AM_LDFLAGS) +if HAVE_PTHREAD_TIMEDJOIN_NP +clsync_CFLAGS += -DTHREADING_SUPPORT +endif if HAVE_KQUEUE clsync_CFLAGS += -DKQUEUE_SUPPORT clsync_SOURCES += mon_kqueue.c mon_kqueue.h diff --git a/calc.c b/calc.c index 93d1317..9d54c1d 100644 --- a/calc.c +++ b/calc.c @@ -41,8 +41,10 @@ uint32_t adler32_calc(const unsigned char *const data, uint32_t len) { // where data is the location of the data in physical // memory and len is the length of the data in bytes +/* if (len&3) warning("len [%i] & 3 == %i != 0. Wrong length (not a multiple of 4).", len, len&3); +*/ debug(70, "%p, %i", data, len); diff --git a/configuration.h b/configuration.h index 751b4a8..2bb82c6 100644 --- a/configuration.h +++ b/configuration.h @@ -46,7 +46,7 @@ # endif # endif # ifndef DEFAULT_NOTIFYENGINE -# ifdef __FreeBSD__ +# if __FreeBSD__ | __FreeBSD_kernel__ # ifdef KQUEUE_SUPPORT # define DEFAULT_NOTIFYENGINE NE_KQUEUE # endif diff --git a/configure.ac b/configure.ac index 781b7ad..19b77fd 100644 --- a/configure.ac +++ b/configure.ac @@ -210,6 +210,9 @@ AC_CHECK_FUNC([getmntent], [HAVE_GETMNTENT=1]) dnl searching for pivot_root AC_CHECK_FUNC([pivot_root], [HAVE_PIVOTROOT=1]) +dnl searching for pthread_timedjoin_np +AC_CHECK_FUNC([pthread_timedjoin_np], [HAVE_PTHREAD_TIMEDJOIN_NP=1]) + dnl libcgroup check AC_ARG_WITH(libcgroup, AS_HELP_STRING(--with-libcgroup, @@ -469,6 +472,7 @@ AM_CONDITIONAL([HAVE_UNSHARE], [test "x$HAVE_UNSHARE" != "x"]) AM_CONDITIONAL([HAVE_SECCOMP], [test "x$HAVE_SECCOMP" != "x"]) AM_CONDITIONAL([HAVE_TRE], [test "x$HAVE_TRE" != "x"]) AM_CONDITIONAL([HAVE_LIBCGROUP], [test "x$HAVE_LIBCGROUP" != "x"]) +AM_CONDITIONAL([HAVE_PTHREAD_TIMEDJOIN_NP], [test "x$HAVE_PTHREAD_TIMEDJOIN_NP" != "x"]) AS_IF([test "$HAVE_KQUEUE" = '' -a "$HAVE_INOTIFY" = '' -a "$HAVE_FANOTIFY" = '' -a "$HAVE_BSM" = '' -a "$HAVE_GIO" = ''], [AC_MSG_FAILURE([At least one monitoring engine must be enabled! diff --git a/examples/clsync-start-cluster.sh b/examples/clsync-start-cluster.sh index e5200e4..ff50738 100755 --- a/examples/clsync-start-cluster.sh +++ b/examples/clsync-start-cluster.sh @@ -25,5 +25,14 @@ cat > rules < rules < rules <retries = (unsigned int)xstrtol(arg, &ret); break; +#ifdef THREADING_SUPPORT case THREADING: { char *value, *arg_orig = arg; @@ -1320,6 +1328,7 @@ static int parse_parameter(ctx_t *ctx_p, uint16_t param_id, char *arg, paramsour break; } +#endif case OUTPUT_METHOD: { char *value, *arg_orig = arg; @@ -1891,18 +1900,18 @@ int ctx_check(ctx_t *ctx_p) { # endif #endif -#ifdef VERYPARANOID - if ((ctx_p->retries != 1) && ctx_p->flags[THREADING]) { - ret = errno = EINVAL; - error("\"--retries\" values should be equal to \"1\" for this \"--threading\" value."); - } -#endif - if (ctx_p->flags[STANDBYFILE] && (ctx_p->flags[MODE] == MODE_SIMPLE)) { ret = errno = EINVAL; error("Sorry but option \"--standby-file\" cannot be used in mode \"simple\", yet."); } +#ifdef THREADING_SUPPORT +# ifdef VERYPARANOID + if ((ctx_p->retries != 1) && ctx_p->flags[THREADING]) { + ret = errno = EINVAL; + error("\"--retries\" values should be equal to \"1\" for this \"--threading\" value."); + } +# endif if (ctx_p->flags[THREADING] && ctx_p->flags[ONLYINITSYNC]) { ret = errno = EINVAL; error("Conflicting options: This value of \"--threading\" cannot be used in conjunction with \"--only-initialsync\"."); @@ -1924,6 +1933,7 @@ int ctx_check(ctx_t *ctx_p) { ret = errno = EINVAL; error("Conflicting options: This value of \"--threading\" cannot be used in conjunction with \"--splitting=thread\"."); } +#endif if (ctx_p->flags[SKIPINITSYNC] && ctx_p->flags[EXITONNOEVENTS]) { ret = errno = EINVAL; error("Conflicting options: \"--skip-initialsync\" and \"--exit-on-no-events\" cannot be used together."); diff --git a/port-hacks.h b/port-hacks.h index 1e742ee..1c57f89 100644 --- a/port-hacks.h +++ b/port-hacks.h @@ -17,7 +17,6 @@ along with this program. If not, see . */ - #ifndef __PORT_HACKS_H #define __PORT_HACKS_H @@ -28,17 +27,14 @@ #include #include -#ifndef __FreeBSD__ - typedef struct stat64 stat64_t; -#endif - -#ifdef __FreeBSD__ +#if __FreeBSD__ || __FreeBSD_kernel__ +# include # define O_PATH 0 - typedef struct stat stat64_t; # include +# ifdef THREADING_SUPPORT static inline int pthread_tryjoin_np(pthread_t thread, void **retval) { struct timespec abstime; int rc; @@ -55,11 +51,19 @@ return rc; } +# endif +# ifndef __USE_LARGEFILE64 + typedef struct stat stat64_t; static inline int lstat64(const char *pathname, struct stat *buf) { return lstat(pathname, buf); } +# else + typedef struct stat64 stat64_t; +# endif +#else + typedef struct stat64 stat64_t; #endif #ifdef CLSYNC_ITSELF diff --git a/socket.h b/socket.h index fce1993..8dec4c6 100644 --- a/socket.h +++ b/socket.h @@ -27,6 +27,7 @@ #include #include +#include "port-hacks.h" #include "clsync.h" #include "ctx.h" diff --git a/sync.c b/sync.c index aa8547d..393166f 100644 --- a/sync.c +++ b/sync.c @@ -85,7 +85,9 @@ static inline void finish_iteration(ctx_t *ctx_p) { if (ctx_p->iteration_num < ~0) // ~0 is the max value for unsigned variables ctx_p->iteration_num++; +#ifdef THREADING_SUPPORT if (!ctx_p->flags[THREADING]) +#endif setenv_iteration(ctx_p->iteration_num); debug(3, "next iteration: %u/%u", @@ -198,6 +200,7 @@ threadsinfo_t *thread_info() { // TODO: optimize this return &threadsinfo; } +#ifdef THREADING_SUPPORT #define thread_info_lock() _thread_info_lock(__FUNCTION__) static inline threadsinfo_t *_thread_info_lock(const char *const function_name) { threadsinfo_t *threadsinfo_p = thread_info(); @@ -498,6 +501,7 @@ int thread_cleanup(ctx_t *ctx_p) { debug(3, "done."); return thread_info_unlock(0); } +#endif volatile state_t *state_p = NULL; volatile int exitcode = 0; @@ -549,6 +553,7 @@ int exec_argv(char **argv, int *child_pid) { return exitcode; } +#ifdef THREADING_SUPPORT static inline int thread_exit(threadinfo_t *threadinfo_p, int exitcode ) { int err=0; threadinfo_p->exitcode = exitcode; @@ -580,6 +585,7 @@ static inline int thread_exit(threadinfo_t *threadinfo_p, int exitcode ) { debug(3, "thread %p is sending signal to sighandler to call GC", threadinfo_p->pthread); return pthread_kill(pthread_sighandler, SIGUSR_THREAD_GC); } +#endif static inline void so_call_sync_finished(int n, api_eventinfo_t *ei) { int i = 0; @@ -602,6 +608,7 @@ static inline void so_call_sync_finished(int n, api_eventinfo_t *ei) { return; } +#ifdef THREADING_SUPPORT int so_call_sync_thread(threadinfo_t *threadinfo_p) { debug(3, "thread_num == %i; threadinfo_p == %p; i_p->pthread %p; thread %p", threadinfo_p->thread_num, threadinfo_p, threadinfo_p->pthread, pthread_self()); @@ -642,11 +649,14 @@ int so_call_sync_thread(threadinfo_t *threadinfo_p) { return rc; } +#endif static inline int so_call_sync(ctx_t *ctx_p, indexes_t *indexes_p, int n, api_eventinfo_t *ei) { debug(2, "n == %i", n); +#ifdef THREADING_SUPPORT if (!SHOULD_THREAD(ctx_p)) { +#endif int rc=0, ret=0, err=0; int try_n=0, try_again; state_t status = STATE_UNKNOWN; @@ -691,6 +701,7 @@ static inline int so_call_sync(ctx_t *ctx_p, indexes_t *indexes_p, int n, api_ev so_call_sync_finished(n, ei); return ret; +#ifdef THREADING_SUPPORT } threadinfo_t *threadinfo_p = thread_new(); @@ -716,7 +727,7 @@ static inline int so_call_sync(ctx_t *ctx_p, indexes_t *indexes_p, int n, api_ev } debug(3, "thread %p", threadinfo_p->pthread); return 0; - +#endif } static inline int so_call_rsync_finished(ctx_t *ctx_p, const char *inclistfile, const char *exclistfile) { @@ -747,6 +758,7 @@ static inline int so_call_rsync_finished(ctx_t *ctx_p, const char *inclistfile, return ret0 == 0 ? ret1 : ret0; } +#ifdef THREADING_SUPPORT int so_call_rsync_thread(threadinfo_t *threadinfo_p) { debug(3, "thread_num == %i; threadinfo_p == %p; i_p->pthread %p; thread %p", threadinfo_p->thread_num, threadinfo_p, threadinfo_p->pthread, pthread_self()); @@ -791,11 +803,14 @@ int so_call_rsync_thread(threadinfo_t *threadinfo_p) { return rc; } +#endif static inline int so_call_rsync(ctx_t *ctx_p, indexes_t *indexes_p, const char *inclistfile, const char *exclistfile) { debug(2, "inclistfile == \"%s\"; exclistfile == \"%s\"", inclistfile, exclistfile); +#ifdef THREADING_SUPPORT if (!SHOULD_THREAD(ctx_p)) { +#endif debug(3, "ctx_p->handler_funct.rsync == %p", ctx_p->handler_funct.rsync); // indexes_p->nonthreaded_syncing_fpath2ei_ht = g_hash_table_dup(indexes_p->fpath2ei_ht, g_str_hash, g_str_equal, free, free, (gpointer(*)(gpointer))strdup, eidup); @@ -842,6 +857,7 @@ static inline int so_call_rsync(ctx_t *ctx_p, indexes_t *indexes_p, const char * if ((ret_cleanup=so_call_rsync_finished(ctx_p, inclistfile, exclistfile))) return rc ? rc : ret_cleanup; return rc; +#ifdef THREADING_SUPPORT } threadinfo_t *threadinfo_p = thread_new(); @@ -868,13 +884,17 @@ static inline int so_call_rsync(ctx_t *ctx_p, indexes_t *indexes_p, const char * } debug(3, "thread %p", threadinfo_p->pthread); return 0; - +#endif } // === SYNC_EXEC() === { //#define SYNC_EXEC(...) (SHOULD_THREAD(ctx_p) ? sync_exec_thread : sync_exec )(__VA_ARGS__) +#ifdef THREADING_SUPPORT #define SYNC_EXEC_ARGV(...) (SHOULD_THREAD(ctx_p) ? sync_exec_argv_thread : sync_exec_argv)(__VA_ARGS__) +#else +#define SYNC_EXEC_ARGV(...) sync_exec_argv(__VA_ARGS__) +#endif #define debug_argv_dump(level, argv)\ if (unlikely(ctx_p->flags[DEBUG] >= level))\ @@ -1083,6 +1103,7 @@ static inline int sync_exec(ctx_t *ctx_p, indexes_t *indexes_p, thread_callbackf } */ +#ifdef THREADING_SUPPORT int __sync_exec_thread(threadinfo_t *threadinfo_p) { char **argv = threadinfo_p->argv; ctx_t *ctx_p = threadinfo_p->ctx_p; @@ -1166,6 +1187,7 @@ static inline int sync_exec_thread(ctx_t *ctx_p, indexes_t *indexes_p, thread_ca return sync_exec_argv_thread(ctx_p, indexes_p, callback, callback_arg_p, argv); } */ +#endif // } === SYNC_EXEC() === @@ -1627,7 +1649,9 @@ int sync_initialsync(const char *path, ctx_t *ctx_p, indexes_t *indexes_p, inits NULL, argv); +#ifdef THREADING_SUPPORT if (!SHOULD_THREAD(ctx_p)) // If it's a thread then it will free the argv in GC. If not a thread then we have to free right here. +#endif argv_free(argv); return sync_initialsync_finish(ctx_p, initsync, ret); @@ -1914,7 +1938,9 @@ static inline int sync_dosync_exec(ctx_t *ctx_p, indexes_t *indexes_p, const cha NULL, NULL, argv); +#ifdef THREADING_SUPPORT if (!SHOULD_THREAD(ctx_p)) // If it's a thread then it will free the argv in GC. If not a thread then we have to free right here. +#endif argv_free(argv); return rc; @@ -2254,9 +2280,13 @@ int _sync_islocked(threadinfo_t *threadinfo_p, void *_fpath) { } static inline int sync_islocked(const char *const fpath) { +#ifdef THREADING_SUPPORT int rc = threads_foreach(_sync_islocked, STATE_RUNNING, (void *)fpath); debug(3, "<%s>: %u", fpath, rc); return rc; +#else + return 0; +#endif } void _sync_idle_dosync_collectedevents(gpointer fpath_gp, gpointer evinfo_gp, gpointer arg_gp) { @@ -2740,7 +2770,9 @@ int sync_idle_dosync_collectedevents_commitpart(struct dosync_arg *dosync_arg_p) callback_arg_p, argv); +#ifdef THREADING_SUPPORT if (!SHOULD_THREAD(ctx_p)) // If it's a thread then it will free the argv in GC. If not a thread then we have to free right here. +#endif argv_free(argv); return rc; } @@ -3009,11 +3041,14 @@ int apievinfo2rsynclist(indexes_t *indexes_p, FILE *listfile, int n, api_eventin } int sync_idle(ctx_t *ctx_p, indexes_t *indexes_p) { + int ret; // Collecting garbage - int ret=thread_gc(ctx_p); +#ifdef THREADING_SUPPORT + ret=thread_gc(ctx_p); if(ret) return ret; +#endif // Checking if we can sync @@ -3094,6 +3129,7 @@ int notify_wait(ctx_t *ctx_p, indexes_t *indexes_p) { delay = MAX(delay, synctime_delay); delay = delay > 0 ? delay : 0; +#ifdef THREADING_SUPPORT if (ctx_p->flags[THREADING]) { time_t _thread_nextexpiretime = thread_nextexpiretime(); debug(3, "thread_nextexpiretime == %i", _thread_nextexpiretime); @@ -3105,6 +3141,7 @@ int notify_wait(ctx_t *ctx_p, indexes_t *indexes_p) { delay = MIN(delay, thread_expiredelay); } } +#endif if ((!delay) || (ctx_p->state != STATE_RUNNING)) return 0; @@ -3210,10 +3247,12 @@ int sync_loop(ctx_t *ctx_p, indexes_t *indexes_p) { switch (ctx_p->state) { case STATE_THREAD_GC: main_status_update(ctx_p); +#ifdef THREADING_SUPPORT if (thread_gc(ctx_p)) { ctx_p->state = STATE_EXIT; break; } +#endif ctx_p->state = STATE_RUNNING; SYNC_LOOP_CONTINUE_UNLOCK; case STATE_INITSYNC: @@ -3416,14 +3455,22 @@ int sync_switch_state(ctx_t *ctx_p, pthread_t pthread_parent, int newstate) { debug(4, "pthread_mutex_unlock( pthread_mutex_select )"); pthread_mutex_unlock(pthread_mutex_select); +#ifdef THREADING_SUPPORT return thread_info_unlock(0); +#else + return 0; +#endif l_sync_parent_interrupt_end: *state_p = newstate; pthread_kill(pthread_parent, SIGUSR_BLOPINT); +#ifdef THREADING_SUPPORT return thread_info_unlock(0); +#else + return 0; +#endif } /* === DUMP === */ @@ -3595,7 +3642,9 @@ int sync_dump(ctx_t *ctx_p, const char *const dir_path) { queue_id++; } +#ifdef THREADING_SUPPORT threads_foreach(sync_dump_thread, STATE_RUNNING, &arg); +#endif l_sync_dump_end: dirfd_obj = DUMP_DIRFD_ROOT; @@ -3990,7 +4039,9 @@ int sync_run(ctx_t *ctx_p) { // Killing children +#ifdef THREADING_SUPPORT thread_cleanup(ctx_p); +#endif debug(2, "Deinitializing the FS monitor subsystem"); switch (ctx_p->flags[MONITOR]) {