From e429b6c08d52c1c9bd6e706e1d965b51e038a3da Mon Sep 17 00:00:00 2001 From: Dmitry Yu Okunev Date: Mon, 5 May 2014 15:31:06 +0400 Subject: [PATCH] Added dumping support --- clsync.h | 5 +++++ common.h | 1 + configuration.h | 2 ++ control.c | 195 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------ ctx.h | 1 + indexes.h | 5 +++++ socket.c | 19 ++++++++++++++++++- socket.h | 25 ++++++++++++++++++++++--- sync.c | 41 +++++++++++++++++++++++------------------ sync.h | 1 + 10 files changed, 267 insertions(+), 28 deletions(-) diff --git a/clsync.h b/clsync.h index 95de249..d21a990 100644 --- a/clsync.h +++ b/clsync.h @@ -17,6 +17,9 @@ along with this program. If not, see . */ +#ifndef __CLSYNC_CLSYNC_H +#define __CLSYNC_CLSYNC_H + #include #include #include @@ -89,3 +92,5 @@ extern int clsyncapi_getapiversion(); */ extern pid_t clsyncapi_fork(struct ctx *ctx_p); +#endif + diff --git a/common.h b/common.h index 86e3ae0..aea3dbd 100644 --- a/common.h +++ b/common.h @@ -182,6 +182,7 @@ typedef struct eventinfo eventinfo_t; typedef int (*thread_callbackfunct_t)(ctx_t *ctx_p, char **argv); struct threadinfo { int thread_num; + uint32_t iteration; thread_callbackfunct_t callback; char **argv; pthread_t pthread; diff --git a/configuration.h b/configuration.h index 58abcec..4421916 100644 --- a/configuration.h +++ b/configuration.h @@ -71,3 +71,5 @@ #define CONFIG_PATHS { ".clsync.conf", "/etc/clsync/clsync.conf", NULL } /* "~/.clsync.conf" and "/etc/clsync/clsync.conf" */ #define API_PREFIX "clsyncapi_" + +#define DUMP_DIRMODE 0700 diff --git a/control.c b/control.c index 0f97512..e0a6985 100644 --- a/control.c +++ b/control.c @@ -20,7 +20,14 @@ #include "common.h" #include // for "struct sockaddr_un" +#include // mkdir() +#include // mkdir() +#include // mkdirat() +#include // g_hash_table_foreach() + +#include "indexes.h" +#include "ctx.h" #include "error.h" #include "sync.h" #include "control.h" @@ -28,19 +35,195 @@ static pthread_t pthread_control; + +static inline int control_error(clsyncsock_t *clsyncsock_p, const char *const funct, const char *const args) { + return socket_send(clsyncsock_p, SOCKCMD_REPLY_ECUSTOM, funct, args, errno, strerror(errno)); +} + +enum dump_dirfd_obj { + DUMP_DIRFD_ROOT = 0, + DUMP_DIRFD_QUEUE, + DUMP_DIRFD_THREAD, + + DUMP_DIRFD_MAX +}; + +enum dump_ltype { + DUMP_LTYPE_INCLUDE, + DUMP_LTYPE_EXCLUDE, + DUMP_LTYPE_EVINFO, +}; + +struct control_dump_arg { + clsyncsock_t *clsyncsock_p; + ctx_t *ctx_p; + int dirfd[DUMP_DIRFD_MAX]; + int fd_out; + int data; +}; + +void control_dump_liststep(gpointer fpath_gp, gpointer evinfo_gp, gpointer arg_gp) { + char *fpath = (char *)fpath_gp; + eventinfo_t *evinfo = (eventinfo_t *)evinfo_gp; + struct control_dump_arg *arg = arg_gp; + char act, num; + + switch (arg->data) { + case DUMP_LTYPE_INCLUDE: + act = '+'; + num = '1'; + break; + case DUMP_LTYPE_EXCLUDE: + act = '-'; + num = '1'; + break; + case DUMP_LTYPE_EVINFO: + act = '+'; + num = evinfo->flags&EVIF_RECURSIVELY ? '*' : + (evinfo->flags&EVIF_CONTENTRECURSIVELY ? '/' : '1'); + break; + default: + act = '?'; + num = '?'; + } + + dprintf(arg->fd_out, "%c%c %s\n", act, num, fpath); + + return; +} + +int control_dump_thread(threadinfo_t *threadinfo_p, void *_arg) { + struct control_dump_arg *arg = _arg; + char buf[BUFSIZ]; + + snprintf(buf, BUFSIZ, "%u-%u-%lx", threadinfo_p->iteration, threadinfo_p->thread_num, (long)threadinfo_p->pthread); + + arg->fd_out = openat(arg->dirfd[DUMP_DIRFD_THREAD], buf, O_WRONLY); + if (arg->fd_out == -1) + return errno; + + { + char **argv; + + dprintf(arg->fd_out, + "thread:\n\titeration == %u;\n\tnum == %u;\n\tpthread == %lx;\n\tstarttime == %lu\n\texpiretime == %lu\n\tchild_pid == %u\n\ttry_n == %u\nCommand:", + threadinfo_p->iteration, + threadinfo_p->thread_num, + (long)threadinfo_p->pthread, + threadinfo_p->starttime, + threadinfo_p->expiretime, + threadinfo_p->child_pid, + threadinfo_p->try_n + ); + + argv = threadinfo_p->argv; + while (argv != NULL) + dprintf(arg->fd_out, " \"%s\"", *(argv++)); + + dprintf(arg->fd_out, "\n"); + } + + arg->data = DUMP_LTYPE_EVINFO; + g_hash_table_foreach(threadinfo_p->fpath2ei_ht, control_dump_liststep, arg); + + close(arg->fd_out); + + return 0; +} + +int control_mkdir_open(clsyncsock_t *clsyncsock_p, const char *const dir_path) { + int dirfd; + + if (mkdir(dir_path, DUMP_DIRMODE)) { + control_error(clsyncsock_p, "mkdir", dir_path); + return -1; + } + + dirfd = open(dir_path, O_RDWR); + if (dirfd == -1) { + control_error(clsyncsock_p, "open", dir_path); + return -1; + } + + return dirfd; +} + +int control_dump(ctx_t *ctx_p, clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p) { + indexes_t *indexes_p = ctx_p->indexes_p; + sockcmd_dat_dump_t *dat = sockcmd_p->data; + int rootfd; + struct control_dump_arg arg; + enum dump_dirfd_obj dirfd_obj; + + static const char *const subdirs[] = { + [DUMP_DIRFD_QUEUE] = "queue", + [DUMP_DIRFD_THREAD] = "threads" + }; + + rootfd = control_mkdir_open(clsyncsock_p, dat->dir_path); + if (rootfd == -1) + goto l_control_dump_end; + + arg.dirfd[DUMP_DIRFD_ROOT] = rootfd; + + dirfd_obj = DUMP_DIRFD_ROOT+1; + while (dirfd_obj < DUMP_DIRFD_MAX) { + const char *const subdir = subdirs[dirfd_obj]; + + arg.dirfd[dirfd_obj] = control_mkdir_open(clsyncsock_p, subdir); + if (arg.dirfd[dirfd_obj] == -1) + goto l_control_dump_end; + + dirfd_obj++; + } + + arg.clsyncsock_p = clsyncsock_p; + arg.ctx_p = ctx_p; + + int queue_id = 0; + while (queue_id < QUEUE_MAX) { + char buf[BUFSIZ]; + snprintf(buf, BUFSIZ, "%u", queue_id); + + arg.fd_out = openat(arg.dirfd[DUMP_DIRFD_QUEUE], buf, O_WRONLY); + + arg.data = DUMP_LTYPE_EVINFO; + g_hash_table_foreach(indexes_p->fpath2ei_coll_ht[queue_id], control_dump_liststep, &arg); + arg.data = DUMP_LTYPE_EXCLUDE; + g_hash_table_foreach(indexes_p->exc_fpath_coll_ht[queue_id], control_dump_liststep, &arg); + + close(arg.fd_out); + queue_id++; + } + + threads_foreach(control_dump_thread, STATE_RUNNING, &arg); + +l_control_dump_end: + dirfd_obj = DUMP_DIRFD_ROOT; + while (dirfd_obj < DUMP_DIRFD_MAX) { + if (arg.dirfd[dirfd_obj] != -1) + close(arg.dirfd[dirfd_obj]); + dirfd_obj++; + } + + return errno ? errno : socket_send(clsyncsock_p, SOCKCMD_REPLY_DUMP); +} + + int control_procclsyncsock(socket_sockthreaddata_t *arg, sockcmd_t *sockcmd_p) { - clsyncsock_t *clsyncsock_p = arg->clsyncsock_p; - ctx_t *ctx_p = (ctx_t *)arg->arg; + clsyncsock_t *clsyncsock_p = arg->clsyncsock_p; + ctx_t *ctx_p = (ctx_t *)arg->arg; switch(sockcmd_p->cmd_id) { - case SOCKCMD_REQUEST_INFO: { + case SOCKCMD_REQUEST_DUMP: + control_dump(ctx_p, clsyncsock_p, sockcmd_p); + break; + case SOCKCMD_REQUEST_INFO: socket_send(clsyncsock_p, SOCKCMD_REPLY_INFO, ctx_p->config_block, ctx_p->label, ctx_p->flags, ctx_p->flags_set); break; - } - case SOCKCMD_REQUEST_DIE: { + case SOCKCMD_REQUEST_DIE: sync_term(SIGTERM); break; - } default: return EINVAL; } diff --git a/ctx.h b/ctx.h index ec42191..339eed6 100644 --- a/ctx.h +++ b/ctx.h @@ -232,6 +232,7 @@ struct ctx { sigset_t *sigset; char isignoredexitcode[(1<<8)]; #endif + void *indexes_p; }; typedef struct ctx ctx_t; diff --git a/indexes.h b/indexes.h index 6110b0a..046bfc2 100644 --- a/indexes.h +++ b/indexes.h @@ -17,6 +17,9 @@ along with this program. If not, see . */ +#ifndef __CLSYNC_INDEXES_H +#define __CLSYNC_INDEXES_H + #include struct indexes { @@ -30,3 +33,5 @@ struct indexes { }; typedef struct indexes indexes_t; +#endif + diff --git a/socket.c b/socket.c index d4c3085..0c7903f 100644 --- a/socket.c +++ b/socket.c @@ -58,6 +58,7 @@ static char *recv_ptrs[SOCKET_MAX]; const char *const textmessage_args[SOCKCMD_MAXID] = { [SOCKCMD_REQUEST_NEGOTIATION] = "%u", + [SOCKCMD_REQUEST_DUMP] = "%s", [SOCKCMD_REPLY_NEGOTIATION] = "%u", [SOCKCMD_REPLY_ACK] = "%03u %lu", [SOCKCMD_REPLY_EINVAL] = "%03u %lu", @@ -65,6 +66,9 @@ const char *const textmessage_args[SOCKCMD_MAXID] = { [SOCKCMD_REPLY_INFO] = "%s\003/ %s\003/ %x %x", [SOCKCMD_REPLY_UNKNOWNCMD] = "%03u %lu", [SOCKCMD_REPLY_INVALIDCMDID] = "%lu", + [SOCKCMD_REPLY_EEXIST] = "%s\003/", + [SOCKCMD_REPLY_EPERM] = "%s\003/", + [SOCKCMD_REPLY_ECUSTOM] = "%s\003/ %s\003/ %u %s\003/", }; const char *const textmessage_descr[SOCKCMD_MAXID] = { @@ -78,8 +82,12 @@ const char *const textmessage_descr[SOCKCMD_MAXID] = { [SOCKCMD_REPLY_BYE] = "Bye.", [SOCKCMD_REPLY_VERSION] = "clsync v%u.%u%s", [SOCKCMD_REPLY_INFO] = "config_block == \"%s\"; label == \"%s\"; flags == %x; flags_set == %x.", + [SOCKCMD_REPLY_DUMP] = "Ready", [SOCKCMD_REPLY_UNKNOWNCMD] = "Unknown command.", [SOCKCMD_REPLY_INVALIDCMDID] = "Invalid command id. Required: 0 <= cmd_id < 1000.", + [SOCKCMD_REPLY_EEXIST] = "File exists: \"%s\".", + [SOCKCMD_REPLY_EPERM] = "Permission denied: \"%s\".", + [SOCKCMD_REPLY_ECUSTOM] = "%s(%s): Error #%u: \"%s\".", }; int socket_check_bysock(int sock) { @@ -346,6 +354,9 @@ static inline int parse_text_data(sockcmd_t *sockcmd_p, char *args, size_t args_ case SOCKCMD_REPLY_NEGOTIATION: PARSE_TEXT_DATA_SSCANF(sockcmd_dat_negotiation_t, &d->prot, &d->subprot); break; + case SOCKCMD_REQUEST_DUMP: + PARSE_TEXT_DATA_SSCANF(sockcmd_dat_dump_t, &d->dir_path); + break; case SOCKCMD_REPLY_ACK: PARSE_TEXT_DATA_SSCANF(sockcmd_dat_ack_t, &d->cmd_id, &d->cmd_num); break; @@ -368,10 +379,16 @@ static inline int parse_text_data(sockcmd_t *sockcmd_p, char *args, size_t args_ case SOCKCMD_REPLY_INVALIDCMDID: PARSE_TEXT_DATA_SSCANF(sockcmd_dat_invalidcmd_t, &d->cmd_num); break; + case SOCKCMD_REPLY_EEXIST: + PARSE_TEXT_DATA_SSCANF(sockcmd_dat_eexist_t, &d->file_path); + break; + case SOCKCMD_REPLY_EPERM: + PARSE_TEXT_DATA_SSCANF(sockcmd_dat_eperm_t, &d->descr); + break; default: sockcmd_p->data = xmalloc(args_len+1); memcpy(sockcmd_p->data, args, args_len); - sockcmd_p->data[args_len] = 0; + ((char *)sockcmd_p->data)[args_len] = 0; break; } diff --git a/socket.h b/socket.h index e9205e4..75fc3f7 100644 --- a/socket.h +++ b/socket.h @@ -78,18 +78,22 @@ enum sockcmd_id { SOCKCMD_REPLY_UNKNOWNCMD = 160, SOCKCMD_REPLY_INVALIDCMDID = 161, SOCKCMD_REPLY_EINVAL = 162, + SOCKCMD_REPLY_EEXIST = 163, + SOCKCMD_REPLY_EPERM = 164, + SOCKCMD_REPLY_ECUSTOM = 199, SOCKCMD_REQUEST_LOGIN = 200, SOCKCMD_REQUEST_VERSION = 201, SOCKCMD_REQUEST_INFO = 202, + SOCKCMD_REQUEST_DUMP = 203, SOCKCMD_REQUEST_DIE = 210, SOCKCMD_REQUEST_QUIT = 250, SOCKCMD_REPLY_LOGIN = 300, SOCKCMD_REPLY_VERSION = 301, SOCKCMD_REPLY_INFO = 302, + SOCKCMD_REPLY_DUMP = 303, SOCKCMD_REPLY_DIE = 310, - SOCKCMD_REPLY_UNEXPECTEDEND = 300, - SOCKCMD_REPLY_QUIT = 301, SOCKCMD_REPLY_BYE = 350, + SOCKCMD_REPLY_UNEXPECTEDEND = 351, SOCKCMD_MAXID }; typedef enum sockcmd_id sockcmd_id_t; @@ -132,11 +136,26 @@ struct sockcmd_dat_info { typedef struct sockcmd_dat_info sockcmd_dat_info_t; #endif +struct sockcmd_dat_dump { + char dir_path[PATH_MAX]; +}; +typedef struct sockcmd_dat_dump sockcmd_dat_dump_t; + +struct sockcmd_dat_eexist { + char file_path[PATH_MAX]; +}; +typedef struct sockcmd_dat_eexist sockcmd_dat_eexist_t; + +struct sockcmd_dat_eperm { + char descr[BUFSIZ]; +}; +typedef struct sockcmd_dat_eperm sockcmd_dat_eperm_t; + struct sockcmd { uint64_t cmd_num; uint16_t cmd_id; size_t data_len; - char *data; + void *data; }; typedef struct sockcmd sockcmd_t; diff --git a/sync.c b/sync.c index 22d01cd..ae7ae05 100644 --- a/sync.c +++ b/sync.c @@ -803,11 +803,12 @@ static inline int so_call_sync(ctx_t *ctx_p, indexes_t *indexes_p, int n, api_ev threadinfo_p->try_n = 0; threadinfo_p->callback = NULL; threadinfo_p->argv = NULL; - threadinfo_p->ctx_p = ctx_p; + threadinfo_p->ctx_p = ctx_p; threadinfo_p->starttime = time(NULL); threadinfo_p->fpath2ei_ht = g_hash_table_dup(indexes_p->fpath2ei_ht, g_str_hash, g_str_equal, free, free, (gpointer(*)(gpointer))strdup, eidup); threadinfo_p->n = n; threadinfo_p->ei = ei; + threadinfo_p->iteration = ctx_p->iteration_num; if (ctx_p->synctimeout) threadinfo_p->expiretime = threadinfo_p->starttime + ctx_p->synctimeout; @@ -937,9 +938,10 @@ static inline int so_call_rsync(ctx_t *ctx_p, indexes_t *indexes_p, const char * threadinfo_p->try_n = 0; threadinfo_p->callback = NULL; threadinfo_p->argv = xmalloc(sizeof(char *) * 3); - threadinfo_p->ctx_p = ctx_p; + threadinfo_p->ctx_p = ctx_p; threadinfo_p->starttime = time(NULL); threadinfo_p->fpath2ei_ht = g_hash_table_dup(indexes_p->fpath2ei_ht, g_str_hash, g_str_equal, free, free, (gpointer(*)(gpointer))strdup, eidup); + threadinfo_p->iteration = ctx_p->iteration_num; threadinfo_p->argv[0] = strdup(inclistfile); threadinfo_p->argv[1] = strdup(exclistfile); @@ -1184,9 +1186,10 @@ static inline int sync_exec_thread(ctx_t *ctx_p, indexes_t *indexes_p, thread_ca threadinfo_p->try_n = 0; threadinfo_p->callback = callback; threadinfo_p->argv = argv; - threadinfo_p->ctx_p = ctx_p; + threadinfo_p->ctx_p = ctx_p; threadinfo_p->starttime = time(NULL); threadinfo_p->fpath2ei_ht = g_hash_table_dup(indexes_p->fpath2ei_ht, g_str_hash, g_str_equal, free, free, (gpointer(*)(gpointer))strdup, eidup); + threadinfo_p->iteration = ctx_p->iteration_num; if (ctx_p->synctimeout) threadinfo_p->expiretime = threadinfo_p->starttime + ctx_p->synctimeout; @@ -2678,14 +2681,14 @@ int sync_idle_dosync_collectedevents(ctx_t *ctx_p, indexes_t *indexes_p) { g_hash_table_remove_all(indexes_p->fpath2ei_ht); } - if (!ctx_p->flags[THREADING]) { - if(ctx_p->iteration_num < ~0) // ~0 is the max value for unsigned variables - ctx_p->iteration_num++; + if(ctx_p->iteration_num < ~0) // ~0 is the max value for unsigned variables + ctx_p->iteration_num++; + + if (!ctx_p->flags[THREADING]) setenv_iteration(ctx_p->iteration_num); - debug(3, "next iteration: %u/%u", - ctx_p->iteration_num, ctx_p->flags[MAXITERATIONS]); - } + debug(3, "next iteration: %u/%u", + ctx_p->iteration_num, ctx_p->flags[MAXITERATIONS]); return 0; } @@ -3306,9 +3309,9 @@ int sync_run(ctx_t *ctx_p) { ret = pthread_sigmask(SIG_BLOCK, &sigset_sighandler, NULL); if (ret) return ret; - sighandler_arg.ctx_p = ctx_p; -// sighandler_arg.indexes_p = &indexes; - sighandler_arg.pthread_parent = pthread_self(); + sighandler_arg.ctx_p = ctx_p; +// sighandler_arg.indexes_p = &indexes; + sighandler_arg.pthread_parent = pthread_self(); sighandler_arg.exitcode_p = &ret; sighandler_arg.sigset_p = &sigset_sighandler; ret = pthread_create(&pthread_sighandler, NULL, (void *(*)(void *))sync_sighandler, &sighandler_arg); @@ -3325,12 +3328,14 @@ int sync_run(ctx_t *ctx_p) { // Creating hash tables - indexes_t indexes = {NULL}; - indexes.wd2fpath_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0); - indexes.fpath2wd_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0); - indexes.fpath2ei_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); - indexes.exc_fpath_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0); - indexes.out_lines_aggr_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0); + indexes_t indexes = {NULL}; + ctx_p->indexes_p = &indexes; + + indexes.wd2fpath_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0); + indexes.fpath2wd_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0); + indexes.fpath2ei_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); + indexes.exc_fpath_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0); + indexes.out_lines_aggr_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0); i=0; while (i