redmine

cluster subsystem continued

... ... @@ -23,6 +23,9 @@
Cluster technologies are almost always very difficult. So I'll try to
fill this code with comments. Enjoy ;)
Also you can ask me directly by e-mail or IRC, if something seems too
hard.
-- 0x8E30679C
*/
... ... @@ -374,27 +377,57 @@ int cluster_loop() {
/**
* @brief Updating information about modification time of a directory.
*
* @param[in] dirpath Path to the directory
* @param[in] path Canonized path to updated file/dir
* @param[in] dirlevel Directory level provided by fts (man 3 fts)
* @param[in] st_mode st_mode value to detect is it directory or not (S_IFDIR or not)
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_modtime_update(const char *dirpath) {
int cluster_modtime_update(const char *path, short int dirlevel, mode_t st_mode) {
// "modtime" is incorrent name-part of function. Actually it updates "change time" (man 2 lstat64).
int ret;
// Getting directory information (including "change time" aka "st_ctime")
// Getting directory level (depth)
short int dirlevel_rel = dirlevel - options_p->watchdir_dirlevel;
if((st_mode & S_IFMT) == S_IFDIR)
dirlevel_rel++;
// Don't remembering information about directories with level beyond the limits
if((dirlevel_rel > options_p->cluster_scan_dl_max) || (dirlevel_rel < options_p->cluster_hash_dl_min))
return 0;
// Getting directory/file-'s information (including "change time" aka "st_ctime")
struct stat64 stat64;
ret=lstat64(dirpath, &stat64);
ret=lstat64(path, &stat64);
if(ret) {
printf_e("Error: cluster_modtime_update() cannot lstat64() on \"%s\": %s (errno: %i)\n", dirpath, strerror(errno), errno);
printf_e("Error: cluster_modtime_update() cannot lstat64() on \"%s\": %s (errno: %i)\n", path, strerror(errno), errno);
return errno;
}
// Getting absolute directory path
const char *dirpath;
if((st_mode & S_IFMT) == S_IFDIR) {
dirpath = path;
} else {
char *path_dup = strdup(path);
dirpath = (const char *)dirname(path_dup);
free(path_dup);
}
// Getting relative directory path
size_t dirpath_len = strlen(dirpath);
char *dirpath_rel_p = xmalloc(dirpath_len+1);
char *dirpath_rel = dirpath_rel_p;
memcpy(dirpath_rel, &dirpath[options_p->watchdirlen], dirpath_len+1 - options_p->watchdirlen);
// Updating "st_ctime" information. g_hash_table_replace() will replace existent information about the directory or create it if it doesn't exist.
g_hash_table_replace(nodeinfo_my->modtime_ht, strdup(dirpath), GINT_TO_POINTER(stat64.st_ctime));
g_hash_table_replace(nodeinfo_my->modtime_ht, strdup(dirpath_rel), GINT_TO_POINTER(stat64.st_ctime));
// Why I'm using "st_ctime" instead of "st_mtime"? Because "st_ctime" also updates on updating inode information.
... ...
... ... @@ -69,6 +69,6 @@ extern int cluster_lock_byindexes();
extern int cluster_unlock_all();
extern int cluster_capture(const char *fpath);
extern int cluster_modtime_update(const char *dirpath);
extern int cluster_modtime_update(const char *dirpath, short int dirlevel, mode_t st_mode);
extern int cluster_initialsync();
... ...
... ... @@ -56,7 +56,7 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <libgen.h>
#include "config.h"
... ... @@ -94,6 +94,9 @@ enum flags_enum {
CLUSTERMCASTIPPORT = 'P',
CLUSTERTIMEOUT = 'W',
CLUSTERNODENAME = 'n',
CLUSTERHDLMIN = 'o',
CLUSTERHDLMAX = 'O',
CLUSTERSDLMAX = 's',
HELP = 'h',
DELAY = 't',
BFILEDELAY = 'T',
... ... @@ -168,12 +171,16 @@ struct options {
char *cluster_nodename;
uint32_t cluster_nodename_len;
uint16_t cluster_mcastipport;
uint16_t cluster_hash_dl_min;
uint16_t cluster_hash_dl_max;
uint16_t cluster_scan_dl_max;
size_t watchdirlen;
size_t destdirlen;
size_t watchdirsize;
size_t destdirsize;
size_t watchdirwslashsize;
size_t destdirwslashsize;
short int watchdir_dirlevel;
char *actfpath;
char *rulfpath;
char *listoutdir;
... ...
... ... @@ -26,6 +26,9 @@
#define DEFAULT_CLUSTERTIMEOUT 1000
#define DEFAULT_CLUSTERIPADDR "227.108.115.121"
#define DEFAULT_CLUSTERIPPORT 40079
#define DEFAULT_CLUSTERHDLMIN 1
#define DEFAULT_CLUSTERHDLMAX 16
#define DEFAULT_CLUSTERSDLMAX 32
#define FANOTIFY_FLAGS (FAN_CLOEXEC|FAN_UNLIMITED_QUEUE|FAN_UNLIMITED_MARKS)
#define FANOTIFY_EVFLAGS (O_LARGEFILE|O_RDONLY|O_CLOEXEC)
... ...
... ... @@ -67,7 +67,7 @@ char *fd2fpath_malloc(int fd) {
*
*/
int fileutils_copy(char *path_from, char *path_to) {
int fileutils_copy(const char *path_from, const char *path_to) {
char buf[BUFSIZ];
FILE *from, *to;
... ... @@ -112,3 +112,44 @@ int fileutils_copy(char *path_from, char *path_to) {
return 0;
}
/**
* @brief Calculates directory level of a canonized path (actually it just counts "/"-s)
*
* @param[in] path Canonized path (with realpath())
*
* @retval zero or prositive Direcory level
* @retval negative Got error, while calculation. Error-code is placed to errno.
*
*/
short int fileutils_calcdirlevel(const char *path) {
short int dirlevel = 0;
const char *ptr = path;
if(path == NULL) {
printf_e("Error: fileutils_calcdirlevel(): path is NULL.\n");
errno=EINVAL;
return -1;
}
if(*path == 0) {
printf_e("Error: fileutils_calcdirlevel(): path has zero length.\n");
errno=EINVAL;
return -2;
}
if(*path != '/') {
printf_e("Error: fileutils_calcdirlevel(): path \"%s\" is not canonized.\n", path);
errno=EINVAL;
return -3;
}
while(*(ptr++))
if(*ptr == '/')
dirlevel++;
return dirlevel;
}
... ...
... ... @@ -19,5 +19,6 @@
extern char *fd2fpath_malloc(int fd);
extern int fileutils_copy(char *path_from, char *path_to);
extern int fileutils_copy(const char *path_from, const char *path_to);
extern short int fileutils_calcdirlevel(const char *path);
... ...
... ... @@ -22,6 +22,7 @@
#include "sync.h"
#include "malloc.h"
#include "cluster.h"
#include "fileutils.h"
#define VERSION_MAJ 0
#define VERSION_MIN 0
... ... @@ -36,6 +37,9 @@ static struct option long_options[] =
{"cluster-port", required_argument, NULL, CLUSTERMCASTIPPORT}, // Not implemented, yet
{"cluster-timeout", required_argument, NULL, CLUSTERTIMEOUT}, // Not implemented, yet
{"cluster-node-name", required_argument, NULL, CLUSTERNODENAME}, // Not implemented, yet
{"cluster-hash-dl-min", required_argument, NULL, CLUSTERHDLMIN},
{"cluster-hash-dl-max", required_argument, NULL, CLUSTERHDLMAX},
{"cluster-scan-dl-max", required_argument, NULL, CLUSTERSDLMAX},
{"collectdelay", required_argument, NULL, DELAY},
{"syncdelay", required_argument, NULL, SYNCDELAY},
{"outlistsdir", required_argument, NULL, OUTLISTSDIR},
... ... @@ -81,9 +85,9 @@ int parse_arguments(int argc, char *argv[], struct options *options_p) {
int option_index = 0;
while(1) {
#ifdef FANOTIFY_SUPPORT
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:n:x:f", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:n:x:o:O:s:f", long_options, &option_index);
#else
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:n:x:", long_options, &option_index);
c = getopt_long(argc, argv, "bT:B:d:t:l:pw:qvDFhaVRUL:Ik:m:c:W:n:x:o:O:s:", long_options, &option_index);
#endif
if (c == -1) break;
... ... @@ -141,6 +145,15 @@ int parse_arguments(int argc, char *argv[], struct options *options_p) {
case 'V':
version();
break;
case CLUSTERHDLMIN:
options_p->cluster_hash_dl_min = (uint16_t)atoi(optarg);
break;
case CLUSTERHDLMAX:
options_p->cluster_hash_dl_max = (uint16_t)atoi(optarg);
break;
case CLUSTERSDLMAX:
options_p->cluster_scan_dl_max = (uint16_t)atoi(optarg);
break;
default:
options_p->flags[c]++;
break;
... ... @@ -316,7 +329,6 @@ int main(int argc, char *argv[]) {
struct options options;
memset(&options, 0, sizeof(options));
int ret = 0, nret;
struct stat64 stat64={0};
options.notifyengine = DEFAULT_NOTIFYENGINE;
options.syncdelay = DEFAULT_SYNCDELAY;
options._queues[QUEUE_NORMAL].collectdelay = DEFAULT_COLLECTDELAY;
... ... @@ -326,6 +338,9 @@ int main(int argc, char *argv[]) {
options.label = DEFAULT_LABEL;
options.rsyncinclimit = DEFAULT_RSYNC_INCLUDELINESLIMIT;
options.synctimeout = DEFAULT_SYNCTIMEOUT;
options.cluster_hash_dl_min = DEFAULT_CLUSTERHDLMIN;
options.cluster_hash_dl_max = DEFAULT_CLUSTERHDLMAX;
options.cluster_scan_dl_max = DEFAULT_CLUSTERSDLMAX;
parse_arguments(argc, argv, &options);
out_init(options.flags);
... ... @@ -344,6 +359,16 @@ int main(int argc, char *argv[]) {
ret = EINVAL;
}
if(options.cluster_hash_dl_min > options.cluster_hash_dl_max) {
printf_e("Error: \"--cluster-hash-dl-min\" cannot be greater than \"--cluster-hash-dl-max\".\n");
ret = EINVAL;
}
if(options.cluster_hash_dl_max > options.cluster_scan_dl_max) {
printf_e("Error: \"--cluster-hash-dl-max\" cannot be greater than \"--cluster-scan-dl-max\".\n");
ret = EINVAL;
}
if(!options.cluster_timeout)
options.cluster_timeout = DEFAULT_CLUSTERTIMEOUT;
if(!options.cluster_mcastipport)
... ... @@ -369,42 +394,64 @@ int main(int argc, char *argv[]) {
}
{
size_t size = options.watchdirlen + 2;
char *newwatchdir = xmalloc(size);
memcpy( newwatchdir, options.watchdir, options.watchdirlen);
if(options.watchdir[options.watchdirlen - 1] == '/') {
options.watchdirsize = size;
options.watchdirwslash = options.watchdir;
options.watchdir = newwatchdir;
options.watchdirlen--;
newwatchdir[options.watchdirlen] = 0x00;
} else {
char *rwatchdir = realpath(options.watchdir, NULL);
if(rwatchdir == NULL) {
printf_e("Error: main(): Got error while realpath() on \"%s\": %s (errno: %i) [#0].\n", options.watchdir, strerror(errno), errno);
ret = errno;
}
if(!ret) {
options.watchdir = rwatchdir;
options.watchdirlen = strlen(options.watchdir);
options.watchdirsize = options.watchdirlen;
if(options.watchdirlen == 1) {
printf_e("Error: watchdir is supposed to be not \"/\".\n");
ret = EINVAL;
}
}
if(!ret) {
size_t size = options.watchdirlen + 2;
char *newwatchdir = xmalloc(size);
memcpy( newwatchdir, options.watchdir, options.watchdirlen);
options.watchdirwslash = newwatchdir;
options.watchdirwslashsize = size;
memcpy(&options.watchdirwslash[options.watchdirlen], "/", 2);
options.watchdir_dirlevel = fileutils_calcdirlevel(options.watchdirwslash);
}
}
if(options.destdir != NULL) {
size_t size = options.destdirlen + 2;
char *newdestdir = xmalloc(size);
memcpy( newdestdir, options.destdir, options.destdirlen);
if(options.destdir[options.destdirlen - 1] == '/') {
options.destdirsize = size;
options.destdirwslash = options.destdir;
options.destdir = newdestdir;
options.destdirlen--;
newdestdir[options.destdirlen] = 0x00;
} else {
char *rdestdir = realpath(options.destdir, NULL);
if(rdestdir == NULL) {
printf_e("Error: main(): Got error while realpath() on \"%s\": %s (errno: %i) [#1].\n", options.destdir, strerror(errno), errno);
ret = errno;
}
if(!ret) {
options.destdir = rdestdir;
options.destdirlen = strlen(options.destdir);
options.destdirsize = options.destdirlen;
if(options.destdirlen == 1) {
printf_e("Error: destdir is supposed to be not \"/\".\n");
ret = EINVAL;
}
}
if(!ret) {
size_t size = options.destdirlen + 2;
char *newdestdir = xmalloc(size);
memcpy( newdestdir, options.destdir, options.destdirlen);
options.destdirwslash = newdestdir;
options.destdirwslashsize = size;
memcpy(&options.destdirwslash[options.destdirlen], "/", 2);
}
}
printf_ddd("Debug3: %s [%s] (%p) -> %s [%s]\n", options.watchdir, options.watchdirwslash, options.watchdirwslash, options.destdir?options.destdir:"", options.destdirwslash?options.destdirwslash:"");
printf_d("Debug: %s [%s] (%p) -> %s [%s]\n", options.watchdir, options.watchdirwslash, options.watchdirwslash, options.destdir?options.destdir:"", options.destdirwslash?options.destdirwslash:"");
if(options.flags[RSYNC] && (options.listoutdir == NULL)) {
printf_e("Error: Option \"--rsync\" cannot be used without \"--outlistsdir\".\n");
... ... @@ -427,6 +474,8 @@ int main(int argc, char *argv[]) {
ret = errno;
}
#ifdef VERYPARANOID
struct stat64 stat64={0};
lstat64(options.watchdir, &stat64);
if((stat64.st_mode & S_IFMT) == S_IFLNK) {
// TODO: Fix the problem with symlinks as watch dir.
... ... @@ -438,6 +487,7 @@ int main(int argc, char *argv[]) {
printf_e("Error: Watch dir cannot be symlink, but \"%s\" is a symlink.\n", options.watchdir);
ret = EINVAL;
}
#endif
nret=main_rehash(&options);
if(nret)
... ...
... ... @@ -827,12 +827,6 @@ int sync_notify_mark(int notify_d, options_t *options_p, const char *accpath, co
return wd;
}
if(options_p->cluster_iface) {
int ret=cluster_modtime_update(path);
if(ret) printf_e("Error: sync_notify_mark() cannot cluster_modtime_update(): %s (errno %i)\n", strerror(ret), ret);
return ret;
}
switch(options_p->notifyengine) {
#ifdef FANOTIFY_SUPPORT
case NE_FANOTIFY: {
... ... @@ -875,6 +869,15 @@ int sync_notify_mark(int notify_d, options_t *options_p, const char *accpath, co
return wd;
}
static inline int sync_mark_walk_cluster_modtime_update(options_t *options_p, const char *path, short int dirlevel, mode_t st_mode) {
if(options_p->cluster_iface) {
int ret=cluster_modtime_update(path, dirlevel, st_mode);
if(ret) printf_e("Error: sync_mark_walk() cannot cluster_modtime_update(): %s (errno %i)\n", strerror(ret), ret);
return ret;
}
return 0;
}
int sync_mark_walk(int notify_d, options_t *options_p, const char *dirpath, indexes_t *indexes_p) {
const char *rootpaths[] = {dirpath, NULL};
FTS *tree;
... ... @@ -891,19 +894,26 @@ int sync_mark_walk(int notify_d, options_t *options_p, const char *dirpath, inde
FTSENT *node;
while((node = fts_read(tree))) {
int ret;
printf_dd("Debug3: walking: \"%s\" (depth %u): fts_info == %i\n", node->fts_path, node->fts_level, node->fts_info);
switch(node->fts_info) {
// Duplicates:
case FTS_DP:
continue;
case FTS_DEFAULT:
case FTS_SL:
case FTS_SLNONE:
case FTS_F:
case FTS_NSOK:
if((ret=sync_mark_walk_cluster_modtime_update(options_p, node->fts_path, node->fts_level, S_IFREG)))
return ret;
continue;
// To mark:
case FTS_D:
case FTS_DOT:
if((ret=sync_mark_walk_cluster_modtime_update(options_p, node->fts_path, node->fts_level, S_IFDIR)))
return ret;
break;
// Error cases:
case FTS_ERR:
... ...