redmine

"--rsyncinclimit" added

... ... @@ -22,7 +22,8 @@ log*
from/*
to/*
.clsync-list.*
debian/clsync/DEBIAN
debian/clsync
debian/clsync.debhelper.log
debian/clsync.substvars
example/rules
... ...
... ... @@ -78,6 +78,7 @@ enum flags_enum {
VERBOSE = 'v',
OUTLISTSDIR = 'd',
RSYNC = 'R',
RSYNCINCLIMIT = 'I',
DONTUNLINK = 'U',
#ifdef FANOTIFY_SUPPORT
FANOTIFY = 'f',
... ... @@ -90,11 +91,11 @@ enum flags_enum {
typedef enum flags_enum flags_t;
enum queue_enum {
QUEUE_AUTO = 0,
QUEUE_NORMAL,
QUEUE_BIGFILE,
QUEUE_INSTANT,
QUEUE_MAX
QUEUE_MAX,
QUEUE_AUTO
};
typedef enum queue_enum queue_id_t;
... ... @@ -131,6 +132,7 @@ struct options {
size_t bfilethreshold;
unsigned int commondelay;
queueinfo_t _queues[QUEUE_MAX]; // TODO: remove this from here
unsigned int rsyncinclimit;
};
typedef struct options options_t;
... ... @@ -199,9 +201,12 @@ typedef enum initsync_enum initsync_t;
struct dosync_arg {
int evcount;
char outf_path[PATH_MAX+1];
FILE *outf;
options_t *options_p;
indexes_t *indexes_p;
void *data;
int linescount;
char buf[BUFSIZ+1];
};
... ...
... ... @@ -16,6 +16,7 @@
#define DEFAULT_BFILETHRESHOLD (128 * 1024 * 1024)
#define DEFAULT_BFILECOLLECTDELAY 1800
#define DEFAULT_LABEL "nolabel"
#define DEFAULT_RSYNC_INCLUDELINESLIMIT 100000
#define FANOTIFY_FLAGS (FAN_CLOEXEC|FAN_UNLIMITED_QUEUE|FAN_UNLIMITED_MARKS)
#define FANOTIFY_EVFLAGS (O_LARGEFILE|O_RDONLY|O_CLOEXEC)
... ...
... ... @@ -11,7 +11,11 @@ ARG1="$4"
function rsynclist() {
LISTFILE="$1"
rsync -avH --delete-before --include-from="$LISTFILE" --exclude='*' "$FROM"/ "$TO"/
sort < "$LISTFILE" | uniq > "$LISTFILE"-uniq
rsync -avH --delete-before --include-from="${LISTFILE}-uniq" --exclude='*' "$FROM"/ "$TO"/
rm -f -- "${LISTFILE}-uniq"
}
case "$ACTION" in
... ...
... ... @@ -34,6 +34,7 @@ static struct option long_options[] =
{"commondelay", required_argument, NULL, COMMONDELAY},
{"outlistsdir", required_argument, NULL, OUTLISTSDIR},
{"rsync", no_argument, NULL, RSYNC},
{"rsyncinclimit", required_argument, NULL, RSYNCINCLIMIT},
{"dontunlinklists", no_argument, NULL, DONTUNLINK},
{"bigfilethreshold", required_argument, NULL, BFILETHRESHOLD},
{"bigfilecollectdelay", required_argument, NULL, BFILEDELAY},
... ... @@ -70,9 +71,9 @@ int parse_arguments(int argc, char *argv[], struct options *options) {
int option_index = 0;
while(1) {
#ifdef FANOTIFY_SUPPORT
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDhaVRUf", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDhaVRUI:f", long_options, &option_index);
#else
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDhaVRU", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDhaVRUI:", long_options, &option_index);
#endif
if (c == -1) break;
... ... @@ -106,8 +107,12 @@ int parse_arguments(int argc, char *argv[], struct options *options) {
case 'i':
options->notifyengine = NE_INOTIFY;
break;
case 'I':
options->rsyncinclimit = (unsigned int)atol(optarg);
break;
case 'V':
version();
break;
default:
options->flags[c]++;
break;
... ... @@ -266,7 +271,7 @@ int main_rehash(options_t *options_p) {
int main(int argc, char *argv[]) {
struct options options;
int ret = 0;
int ret = 0, nret;
memset(&options, 0, sizeof(options));
options.notifyengine = DEFAULT_NOTIFYENGINE;
options.commondelay = DEFAULT_COMMONDELAY;
... ... @@ -275,6 +280,7 @@ int main(int argc, char *argv[]) {
options._queues[QUEUE_INSTANT].collectdelay= COLLECTDELAY_INSTANT;
options.bfilethreshold = DEFAULT_BFILETHRESHOLD;
options.label = DEFAULT_LABEL;
options.rsyncinclimit = DEFAULT_RSYNC_INCLUDELINESLIMIT;
parse_arguments(argc, argv, &options);
out_init(options.flags);
... ... @@ -291,15 +297,20 @@ int main(int argc, char *argv[]) {
}
#endif
main_rehash(&options);
nret=main_rehash(&options);
if(nret)
ret = nret;
if(access(options.actfpath, X_OK) == -1) {
printf_e("Error: \"%s\" is not executable: %s (errno: %i).\n", options.actfpath, strerror(errno), errno);
ret = errno;
}
if(options.flags[BACKGROUND])
ret = becomedaemon();
if(options.flags[BACKGROUND]) {
nret = becomedaemon();
if(nret)
ret = nret;
}
options.watchdirlen = strlen(options.watchdir);
if(ret == 0)
... ...
... ... @@ -125,6 +125,23 @@ Is not set by default.
.RE
.PP
.B -R, --rsyncinclimit
.I rsync-includes-line-limit
.RS 8
Sets soft limit for lines count in files by path
.I rsynclistpath
\*S. Unfortunatelly, rsync works very slow with huge "--include-from" files.
So,
.B clsync
splits that list with approximately
.I rsync-includes-line-limit
lines per list if it's to big, and executes by one rsync instance per list
part.
Default value is "100000".
.RE
.PP
.B -U, --dontunlinklists
.RS 8
Forces
... ...
... ... @@ -752,8 +752,9 @@ int sync_idle_dosync_collectedevents_cleanup(options_t *options_p, char **argv)
return unlink(argv[3]);
}
int sync_idle_dosync_collectedevents_aggrqueue(queue_id_t queue_id, options_t *options_p, indexes_t *indexes_p, time_t tm, struct dosync_arg *dosync_arg) {
int sync_idle_dosync_collectedevents_aggrqueue(queue_id_t queue_id, options_t *options_p, indexes_t *indexes_p, struct dosync_arg *dosync_arg) {
// char *buf, *fpath;
time_t tm = time(NULL);
queueinfo_t *queueinfo = &options_p->_queues[queue_id];
... ... @@ -779,13 +780,65 @@ int sync_idle_dosync_collectedevents_aggrqueue(queue_id_t queue_id, options_t *o
return 0;
}
int sync_idle_dosync_collectedevents_listcreate(struct dosync_arg *dosync_arg_p) {
printf_ddd("Debug3: Creating list file\n");
struct stat64 stat64;
char *fpath = dosync_arg_p->outf_path;
options_t *options_p = dosync_arg_p->options_p;
pid_t pid = getpid();
time_t tm = time(NULL);
int counter = 0;
do {
snprintf(fpath, PATH_MAX, "%s/.clsync-list.%u.%lu.%u", options_p->listoutdir, pid, (unsigned long)tm, rand()); // To be uniquea
lstat64(fpath, &stat64);
if(counter++ > COUNTER_LIMIT) {
printf_e("Error: Cannot file unused filename for list-file. The last try was \"%s\".\n", fpath);
return ELOOP;
}
} while(errno != ENOENT); // TODO: find another way to check if the object exists
errno=0;
dosync_arg_p->outf = fopen(fpath, "w");
if(dosync_arg_p->outf == NULL) {
printf_e("Error: Cannot open \"%s\" as file for writing: %s (errno: %i).\n", fpath, strerror(errno), errno);
return errno;
}
setbuffer(dosync_arg_p->outf, dosync_arg_p->buf, BUFSIZ);
printf_ddd("Debug3: Created list-file \"%s\"\n", fpath);
dosync_arg_p->linescount = 0;
return 0;
}
int sync_idle_dosync_collectedevents_listcommit(struct dosync_arg *dosync_arg_p) {
printf_ddd("Debug3: Commiting the file\n");
options_t *options_p = dosync_arg_p->options_p;
fclose(dosync_arg_p->outf);
dosync_arg_p->outf = NULL;
if(dosync_arg_p->evcount > 0) {
if(options_p->flags[PTHREAD])
return sync_exec_thread(options_p, sync_idle_dosync_collectedevents_cleanup,
options_p->actfpath, options_p->flags[RSYNC]?"rsynclist":"synclist", options_p->label, dosync_arg_p->outf_path, NULL);
else
return sync_exec (options_p, sync_idle_dosync_collectedevents_cleanup,
options_p->actfpath, options_p->flags[RSYNC]?"rsynclist":"synclist", options_p->label, dosync_arg_p->outf_path, NULL);
}
return 0;
}
gboolean sync_idle_dosync_collectedevents_listpush(gpointer fpath_gp, gpointer evinfo_gp, gpointer arg_gp) {
struct dosync_arg *dosync_arg_p = (struct dosync_arg *)arg_gp;
char *fpath = (char *)fpath_gp;
eventinfo_t *evinfo = (eventinfo_t *)evinfo_gp;
// int *evcount_p =&((struct dosync_arg *)arg_gp)->evcount;
FILE *outf = ((struct dosync_arg *)arg_gp)->outf;
options_t *options_p = ((struct dosync_arg *)arg_gp)->options_p;
// indexes_t *indexes_p = ((struct dosync_arg *)arg_gp)->indexes_p;
// int *evcount_p =&dosync_arg_p->evcount;
FILE *outf = dosync_arg_p->outf;
options_t *options_p = dosync_arg_p->options_p;
int *linescount_p =&dosync_arg_p->linescount;
// indexes_t *indexes_p = dosync_arg_p->indexes_p;
printf_ddd("Debug3: sync_idle_dosync_collectedevents_listpush(): \"%s\" with int-flags %p\n", fpath, (void *)(unsigned long)evinfo->flags);
if(!options_p->flags[RSYNC]) {
... ... @@ -796,6 +849,21 @@ gboolean sync_idle_dosync_collectedevents_listpush(gpointer fpath_gp, gpointer e
}
// RSYNC case
// TODO:
// - Deduplicate output records
if(*linescount_p >= options_p->rsyncinclimit) {
int ret;
if((ret=sync_idle_dosync_collectedevents_listcommit(dosync_arg_p))) {
printf_e("Error: sync_idle_dosync_collectedevents_listpush(): Cannot commit list-file \"%s\": %s (errno: %i)\n", dosync_arg_p->outf_path, strerror(ret), ret);
exit(ret); // TODO: replace with kill(0, ...);
}
if((ret=sync_idle_dosync_collectedevents_listcreate(dosync_arg_p))) {
printf_e("Error: sync_idle_dosync_collectedevents_listpush(): Cannot create new list-file: %s (errno: %i)\n", strerror(ret), ret);
exit(ret); // TODO: replace with kill(0, ...);
}
outf = dosync_arg_p->outf;
}
size_t fpathlen = strlen(fpath);
char *fpath_rel_p = xmalloc(fpathlen+1);
... ... @@ -808,6 +876,7 @@ gboolean sync_idle_dosync_collectedevents_listpush(gpointer fpath_gp, gpointer e
if(evinfo->flags & EVIF_RECURSIVELY) {
printf_ddd("Debug3: sync_idle_dosync_collectedevents_listpush(): Recursively \"%s\": Adding to rsynclist: \"%s/***\".\n", fpath, fpath_rel);
fprintf(outf, "%s/***\n", fpath_rel);
(*linescount_p)++;
}
... ... @@ -816,6 +885,7 @@ gboolean sync_idle_dosync_collectedevents_listpush(gpointer fpath_gp, gpointer e
break;
printf_ddd("Debug3: sync_idle_dosync_collectedevents_listpush(): Non-recursively \"%s\": Adding to rsynclist: \"%s\".\n", fpath, fpath_rel);
fprintf(outf, "%s\n", fpath_rel);
(*linescount_p)++;
end = strrchr(fpath_rel, '/');
if(end == NULL)
break;
... ... @@ -829,14 +899,14 @@ gboolean sync_idle_dosync_collectedevents_listpush(gpointer fpath_gp, gpointer e
return TRUE;
}
int sync_idle_dosync_collectedevents(options_t *options_p, indexes_t *indexes_p) {
char *buf, *fpath;
struct dosync_arg dosync_arg;
dosync_arg.evcount = 0;
dosync_arg.options_p = options_p;
dosync_arg.indexes_p = indexes_p;
dosync_arg.outf = NULL;
time_t tm = time(NULL);
if(options_p->listoutdir != NULL)
g_hash_table_remove_all(indexes_p->fpath2ei_ht); // Just in case.
... ... @@ -846,7 +916,7 @@ int sync_idle_dosync_collectedevents(options_t *options_p, indexes_t *indexes_p)
int ret;
queue_id_t *queue_id_p = (queue_id_t *)&dosync_arg.data;
*queue_id_p = queue_id;
ret = sync_idle_dosync_collectedevents_aggrqueue(queue_id, options_p, indexes_p, tm, &dosync_arg);
ret = sync_idle_dosync_collectedevents_aggrqueue(queue_id, options_p, indexes_p, &dosync_arg);
if(ret) {
printf_e("Error: Got error while processing queue #%i\n: %s (errno: %i).\n", queue_id, strerror(ret), ret);
g_hash_table_remove_all(indexes_p->fpath2ei_ht);
... ... @@ -862,44 +932,20 @@ int sync_idle_dosync_collectedevents(options_t *options_p, indexes_t *indexes_p)
}
if(options_p->listoutdir != NULL) {
buf = alloca(BUFSIZ+1);
fpath = alloca(PATH_MAX+1);
int counter = 0;
struct stat64 stat64;
pid_t pid = getpid();
do {
snprintf(fpath, PATH_MAX, "%s/.clsync-list.%u.%lu.%u", options_p->listoutdir, pid, (unsigned long)tm, rand()); // To be uniquea
lstat64(fpath, &stat64);
if(counter++ > COUNTER_LIMIT) {
printf_e("Error: Cannot file unused filename for list-file. The last try was \"%s\".\n", fpath);
return ELOOP;
}
} while(errno != ENOENT); // TODO: find another way to check if the object exists
errno=0;
dosync_arg.outf = fopen(fpath, "w");
int ret;
if(dosync_arg.outf == NULL) {
printf_e("Error: Cannot open \"%s\" as file for writing: %s (errno: %i).\n", fpath, strerror(errno), errno);
return errno;
if((ret=sync_idle_dosync_collectedevents_listcreate(&dosync_arg))) {
printf_e("Error: Cannot create list-file: %s (errno: %i)\n", strerror(ret), ret);
return ret;
}
setbuffer(dosync_arg.outf, buf, BUFSIZ);
printf_ddd("Debug3: Created list-file \"%s\"\n", fpath);
g_hash_table_foreach_remove(indexes_p->fpath2ei_ht, sync_idle_dosync_collectedevents_listpush, &dosync_arg);
// g_hash_table_remove_all(indexes_p->fpath2ei_ht);
fclose(dosync_arg.outf);
if(dosync_arg.evcount > 0) {
if(options_p->flags[PTHREAD])
return sync_exec_thread(options_p, sync_idle_dosync_collectedevents_cleanup,
options_p->actfpath, options_p->flags[RSYNC]?"rsynclist":"synclist", options_p->label, fpath, NULL);
else
return sync_exec (options_p, sync_idle_dosync_collectedevents_cleanup,
options_p->actfpath, options_p->flags[RSYNC]?"rsynclist":"synclist", options_p->label, fpath, NULL);
if((ret=sync_idle_dosync_collectedevents_listcommit(&dosync_arg))) {
printf_e("Error: Cannot submit to sync the list \"%s\": %s (errno: %i)\n", dosync_arg.outf_path, strerror(ret), ret);
return ret;
}
}
return 0;
... ... @@ -912,6 +958,8 @@ int sync_idle(int notify_d, options_t *options_p, indexes_t *indexes_p) {
if(ret) return ret;
printf_ddd("Debug3: sync_idle(): calling sync_idle_dosync_collectedevents()\n");
// TODO: make a separate thread on sync_idle_dosync_collectedevents();
sync_idle_dosync_collectedevents(options_p, indexes_p);
return 0;
}
... ... @@ -1156,7 +1204,7 @@ int sync_inotify_loop(int inotify_d, options_t *options_p, indexes_t *indexes_p)
}
int count=sync_inotify_handle(inotify_d, options_p, indexes_p);
if(count<=0) {
if(count <= 0) {
printf_e("Error: Cannot handle with inotify events: %s (errno: %i).\n", strerror(errno), errno);
return errno;
}
... ...