redmine

continued cluster support; added option "--ignoreexitcode"

... ... @@ -17,38 +17,84 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
Hello, dear developer.
Cluster technologies are almost always very difficult. So I'll try to
fill this code with comments. Enjoy ;)
-- 0x8E30679C
*/
#include "common.h"
#include "cluster.h"
#include "sync.h"
#include "output.h"
#include "malloc.h"
// Global variables. They will be initialized in cluster_init()
options_t *options_p=NULL;
indexes_t *indexes_p=NULL;
pthread_t pthread_cluster;
extern int cluster_loop();
/**
* @brief Initializes cluster subsystem
*
* @param[in] _options_p Pointer to "options" variable, defined in main()
* @param[in] _indexes_p Pointer to "indexes" variable, defined in sync_run()
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
int ret;
// Preventing double initializing
if(options_p != NULL) {
printf_e("Error: cluster_init(): cluster subsystem is already initialized.\n");
return EALREADY;
}
// Initializing global variables
options_p = _options_p;
indexes_p = _indexes_p;
// Running thread, that will process incoming traffic from another nodes of the cluster.
// The process is based on function cluster_loop()
ret = pthread_create(&pthread_cluster, NULL, (void *(*)(void *))cluster_loop, NULL);
return ret;
}
/**
* @brief (syncop) Sends signal to thread that is processing incoming traffic from another nodes of the cluster [thread: cluster_loop()]
*
* @param[in] signal Signal number
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
inline int cluster_signal(int signal) {
return pthread_kill(pthread_cluster, signal);
}
/**
* @brief Antagonist of cluster_init() function. Kills cluster_loop()-thread.
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_deinit() {
cluster_signal(SIGTERM);
... ... @@ -56,18 +102,47 @@ int cluster_deinit() {
}
/**
* @brief (syncop) Forces anothes nodes to ignore events about the file or directory
*
* @param[in] fpath Path to the file or directory
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_lock(const char *fpath) {
return 0;
}
/**
* @brief (syncop) Forces anothes nodes to ignore events about all files and directories listed in queues of "indexes_p"
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_lock_byindexes() {
return 0;
}
/**
* @brief (syncop) Returns events-handling on another nodes about all files and directories, locked by cluster_lock() and cluster_lock_byindexes() from this node
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_unlock_all() {
return 0;
}
#define CLUSTER_LOOP_CHECK(a) {\
int ret = a;\
if(ret) {\
... ... @@ -76,17 +151,33 @@ int cluster_unlock_all() {
}\
}
/**
* @brief Processes incoming traffic from another nodes in loop. cluster_init() function create a thread for this function.
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_loop() {
// Ignoring SIGINT signal
sigset_t sigset_cluster;
sigemptyset(&sigset_cluster);
sigaddset(&sigset_cluster, SIGINT);
CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_BLOCK, &sigset_cluster, NULL));
// Don't ignoring SIGTERM signal
sigemptyset(&sigset_cluster);
sigaddset(&sigset_cluster, SIGTERM);
CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_UNBLOCK, &sigset_cluster, NULL));
// Starting the loop
while(1) {
// Breaking the loop, if there's SIGTERM signal for this thread
if(sigpending(&sigset_cluster))
if(sigismember(&sigset_cluster, SIGTERM))
break;
... ... @@ -95,6 +186,54 @@ int cluster_loop() {
}
return 0;
#ifdef DOXYGEN
sync_term(0);
#endif
}
/**
* @brief Updating information about modification time of a directory.
*
* @param[in] dirpath Path to the directory
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_modtime_update(const char *dirpath) {
return 0;
}
/**
* @brief (syncop) Syncing file tree with another nodes with using of directories' modification time as a recent-detector.
*
* @param[in] dirpath Path to the directory
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_initialsync() {
return 0;
}
/**
* @brief (syncop) "Captures" right to update the file or directory to another nodes. It just removes events about the file of directory from another nodes
*
* @param[in] dirpath Path to the directory
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_capture(const char *path) {
return 0;
}
... ...
... ... @@ -23,6 +23,9 @@ extern int cluster_deinit();
extern int cluster_lock(const char *fpath);
extern int cluster_lock_byindexes();
extern int cluster_unlock_all();
extern int cluster_capture(const char *fpath);
extern int cluster_modtime_update(const char *dirpath);
extern int cluster_initialsync();
... ...
... ... @@ -86,6 +86,7 @@ enum flags_enum {
CLUSTERIFACE = 'c',
CLUSTERMCASTIPADDR = 'm',
CLUSTERTIMEOUT = 'W',
CLUSTERNODENAME = 'n',
HELP = 'h',
DELAY = 't',
BFILEDELAY = 'T',
... ... @@ -97,6 +98,7 @@ enum flags_enum {
RSYNC = 'R',
RSYNCINCLIMIT = 'L',
RSYNC_PREFERINCLUDE= 'I',
IGNOREEXITCODE = 'x',
DONTUNLINK = 'U',
INITFULL = 'F',
SYNCTIMEOUT = 'k',
... ... @@ -156,6 +158,7 @@ struct options {
char *destdirwslash;
char *cluster_iface;
char *cluster_mcastipaddr;
char *cluster_nodename;
size_t watchdirlen;
size_t destdirlen;
size_t watchdirsize;
... ... @@ -174,6 +177,7 @@ struct options {
unsigned int synctimeout;
unsigned int cluster_timeout;
sigset_t *sigset;
char isignoredexitcode[(1<<8)];
};
typedef struct options options_t;
... ...
... ... @@ -34,12 +34,14 @@ static struct option long_options[] =
{"cluster-iface", required_argument, NULL, CLUSTERIFACE}, // Not implemented, yet
{"cluster-ip", required_argument, NULL, CLUSTERMCASTIPADDR}, // Not implemented, yet
{"cluster-timeout", required_argument, NULL, CLUSTERTIMEOUT}, // Not implemented, yet
{"cluster-node-name", required_argument, NULL, CLUSTERNODENAME}, // Not implemented, yet
{"collectdelay", required_argument, NULL, DELAY},
{"syncdelay", required_argument, NULL, SYNCDELAY},
{"outlistsdir", required_argument, NULL, OUTLISTSDIR},
{"rsync", no_argument, NULL, RSYNC},
{"rsyncinclimit", required_argument, NULL, RSYNCINCLIMIT},
{"rsyncpreferinclude", no_argument, NULL, RSYNC_PREFERINCLUDE},
{"ignoreexitcode", required_argument, NULL, IGNOREEXITCODE},
{"dontunlinklists", no_argument, NULL, DONTUNLINK},
{"fullinitialsync", no_argument, NULL, INITFULL},
{"bigfilethreshold", required_argument, NULL, BFILETHRESHOLD},
... ... @@ -78,9 +80,9 @@ int parse_arguments(int argc, char *argv[], struct options *options_p) {
int option_index = 0;
while(1) {
#ifdef FANOTIFY_SUPPORT
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:f", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:n:x:f", long_options, &option_index);
#else
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:n:x:", long_options, &option_index);
#endif
if (c == -1) break;
... ... @@ -98,6 +100,9 @@ int parse_arguments(int argc, char *argv[], struct options *options_p) {
case 'W':
options_p->cluster_timeout = (unsigned int)atol(optarg);
break;
case 'n':
options_p->cluster_nodename = optarg;
break;
case 'd':
options_p->listoutdir = optarg;
break;
... ... @@ -129,6 +134,9 @@ int parse_arguments(int argc, char *argv[], struct options *options_p) {
case 'k':
options_p->synctimeout = (unsigned int)atol(optarg);
break;
case 'x':
options_p->isignoredexitcode[(unsigned char)atoi(optarg)] = 1;
break;
case 'V':
version();
break;
... ... @@ -326,11 +334,25 @@ int main(int argc, char *argv[]) {
ret = EINVAL;
}
if((options.cluster_iface == NULL) && (options.cluster_mcastipaddr != NULL)) {
printf_e("Error: Option \"--cluster-ip\" cannot be used without \"--cluster-iface\".\n");
if((options.flags[RSYNC]>1) && (options.cluster_iface != NULL)) {
printf_e("Error: Option \"-RR\" cannot be used in conjunction with \"--cluster-iface\".\n");
ret = EINVAL;
}
if((options.cluster_iface == NULL) && ((options.cluster_mcastipaddr != NULL) || (options.cluster_nodename != NULL))) {
printf_e("Error: Options \"--cluster-ip\" and/or \"--cluster-node-name\" cannot be used without \"--cluster-iface\".\n");
ret = EINVAL;
}
if(options.cluster_iface != NULL) {
if(options.cluster_nodename == NULL)
options.cluster_nodename = getenv("HOSTNAME");
if(options.cluster_nodename == NULL) {
printf_e("Error: Option \"--cluster-iface\" is set, but \"--cluster-node-name\" is not set and environment variable \"HOSTNAME\" doesn't exist.\n");
ret = EINVAL;
}
}
{
size_t size = options.watchdirlen + 2;
char *newwatchdir = xmalloc(size);
... ...
... ... @@ -82,6 +82,8 @@ with option
.B \-c, \-\-cluster\-iface
.I interface\-ip
.RS 8
Not implemented yet!
Enables inter-node notifing subsystem to prevent sync looping between nodes.
This's very useful features that provides ability of birectional sync of the
same directory between two or more nodes.
... ... @@ -101,6 +103,8 @@ ip a s eth0 | awk '{if($1=="inet") {gsub("/.*", "", $2); print $2}}'
.B \-c, \-\-cluster\-ip
.I multicast\-ip
.RS 8
Not implemented yet!
Sets IP-address for multicast group.
This option can be used only in conjuction with
... ... @@ -116,13 +120,29 @@ Default value is "227.108.115.121". [(128+"c")."l"."s"."y"]
.B \-c, \-\-cluster\-timeout
.I cluster\-timeout
.RS 8
Sets timeout (in milliseconds) of waiting answer from another nodes of the cluster.
If there's no answer from some node, it will be excluded.
Not implemented yet!
Sets timeout (in milliseconds) of waiting answer from another nodes of the
cluster. If there's no answer from some node, it will be excluded.
Default value is "1000". [1 second]
.RE
.PP
.B \-c, \-\-cluster\-node\-name
.I cluster\-node\-name
.RS 8
Not implemented yet!
Sets the name of current node in the cluster. It will be used in action
scripts of another nodes (see
.B ACTION SCRIPT
).
Default value is $HOSTNAME.
.RE
.PP
.B \-t, \-\-collectdelay
.I ordinary\-delay
.RS 8
... ... @@ -179,12 +199,12 @@ option.
Recommended option.
You can use "-RR" to run "rsync" directly, bypassing intermediate script
You can use "\-RR" to run "rsync" directly, bypassing intermediate script
(see
.B ACTION SCRIPT
case
.B e
)
). But you cannot use "\-\-cluster\-*" options in "\-RR" mode.
Is not set by default.
.RE
... ... @@ -226,6 +246,23 @@ Is not set by default.
.RE
.PP
.B \-x, \-\-ignoreexitcode
.I exitcode
.RS 8
Forces
.B clsync
to do not process exitcode
.I exitcode
of
.I action-script
as an error. You can set multiple ignores by passing this option multiple
times.
Recommended values for rsync case are "23" and "24".
["-x 23 -x 24"]
.RE
.PP
.B \-U, \-\-dontunlinklists
.RS 8
Do not delete list\-files after
... ... @@ -368,7 +405,7 @@ case
.RS
.I action\-script
initialsync
.I label dirpath
.I label dirpath [nodes]
In this case,
.I action\-script
... ... @@ -382,7 +419,7 @@ case
.RS
.I action\-script
sync
.I label evmask path
.I label evmask path [nodes]
In this case,
.I action\-script
... ... @@ -399,7 +436,7 @@ case
.RS
.I action\-script
synclist
.I label listpath
.I label listpath [nodes]
In this case,
.I action\-script
... ... @@ -417,7 +454,7 @@ case
.RS
.I action\-script
rsynclist
.I label rsync\-listpath [rsync\-exclude\-listpath]
.I label rsync\-listpath [nodes] [rsync\-exclude\-listpath]
In this case,
.I action\-script
... ... @@ -469,9 +506,15 @@ will be ignored in this case.
As can be noticed, in the first four cases clsync's
.I label
is passed (see
(see
.I \-\-label
).
) and
.I nodes
are passed (or can be passed).
.I nodes
is comma-separated list of cluster nodes names where to sync to (see
.I --cluster-node-name
)
The listfile by path
.I listpath
... ...
... ... @@ -28,6 +28,9 @@
pthread_t pthread_sighandler;
static inline int _exitcode_process(options_t *options_p, int exitcode) {
if(options_p->isignoredexitcode[(unsigned char)exitcode])
return 0;
if(exitcode && !((options_p->flags[RSYNC]>=2) && (exitcode == 24))) {
printf_e("Error: Got non-zero exitcode %i from __sync_exec().\n", exitcode);
return exitcode;
... ... @@ -648,6 +651,9 @@ static int sync_queuesync(const char *fpath, eventinfo_t *evinfo, options_t *opt
eventinfo_t *evinfo_dup = (eventinfo_t *)xmalloc(sizeof(*evinfo_dup));
memcpy(evinfo_dup, evinfo, sizeof(*evinfo_dup));
if(options_p->cluster_iface)
cluster_capture(fpath);
return indexes_queueevent(indexes_p, strdup(fpath), evinfo_dup, queue_id);
}
... ... @@ -759,6 +765,11 @@ int sync_initialsync_rsync_walk(options_t *options_p, const char *dirpath, index
int sync_initialsync(const char *path, options_t *options_p, indexes_t *indexes_p, initsync_t initsync) {
printf_ddd("Debug3: sync_initialsync(\"%s\", options_p, indexes_p, %i)\n", path, initsync);
if(initsync == INITSYNC_FULL) {
if(options_p->cluster_iface)
cluster_initialsync();
}
if(!options_p->flags[RSYNC]) {
// non-RSYNC case:
printf_ddd("Debug3: sync_initialsync(): syncing \"%s\"\n", path);
... ... @@ -816,6 +827,9 @@ int sync_notify_mark(int notify_d, options_t *options_p, const char *accpath, co
return wd;
}
if(options_p->cluster_iface)
cluster_modtime_update(path);
switch(options_p->notifyengine) {
#ifdef FANOTIFY_SUPPORT
case NE_FANOTIFY: {
... ... @@ -1650,12 +1664,17 @@ int sync_inotify_handle(int inotify_d, options_t *options_p, indexes_t *indexes_
char *end = &buf[r];
while(ptr < end) {
struct inotify_event *event = (struct inotify_event *)ptr;
// Removing stale wd-s
if(event->mask & IN_IGNORED) {
printf_dd("Debug2: Cleaning up info about watch descriptor %i.\n", event->wd);
indexes_remove_bywd(indexes_p, event->wd);
SYNC_INOTIFY_HANDLE_CONTINUE;
}
// Getting path
char *fpath = indexes_wd2fpath(indexes_p, event->wd);
if(fpath == NULL) {
... ... @@ -1664,12 +1683,16 @@ int sync_inotify_handle(int inotify_d, options_t *options_p, indexes_t *indexes_
}
printf_dd("Debug2: Event %p on \"%s\" (wd: %i; fpath: \"%s\").\n", (void *)(long)event->mask, event->len>0?event->name:"", event->wd, fpath);
// Getting full path
char *fpathfull = xmalloc(strlen(fpath) + event->len + 2);
if(event->len>0)
sprintf(fpathfull, "%s/%s", fpath, event->name);
else
sprintf(fpathfull, "%s", fpath);
// Getting infomation about file/dir/etc
struct stat64 lstat;
mode_t st_mode;
size_t st_size;
... ... @@ -1685,6 +1708,8 @@ int sync_inotify_handle(int inotify_d, options_t *options_p, indexes_t *indexes_
st_size = lstat.st_size;
}
// Checking by filter rules
ruleaction_t ruleaction = rules_check(fpathfull, st_mode, options_p->rules);
if(ruleaction == RULE_REJECT) {
... ... @@ -1692,8 +1717,12 @@ int sync_inotify_handle(int inotify_d, options_t *options_p, indexes_t *indexes_
SYNC_INOTIFY_HANDLE_CONTINUE;
}
// Handling different cases
if(event->mask & IN_ISDIR) {
if(event->mask & (IN_CREATE|IN_MOVED_TO)) { // Appeared
// If new dir is created
int ret;
ret = sync_mark_walk(inotify_d, options_p, fpathfull, indexes_p);
... ... @@ -1717,6 +1746,8 @@ int sync_inotify_handle(int inotify_d, options_t *options_p, indexes_t *indexes_
}
}
// Locally queueing the event
int isnew = 0;
eventinfo_t *evinfo = indexes_fpath2ei(indexes_p, fpathfull);
if(evinfo == NULL) {
... ... @@ -1737,6 +1768,8 @@ int sync_inotify_handle(int inotify_d, options_t *options_p, indexes_t *indexes_
SYNC_INOTIFY_HANDLE_CONTINUE;
}
// Globally queueing captured events
struct dosync_arg dosync_arg;
dosync_arg.options_p = options_p;
dosync_arg.indexes_p = indexes_p;
... ...