redmine

Added dumping support

... ... @@ -17,6 +17,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __CLSYNC_CLSYNC_H
#define __CLSYNC_CLSYNC_H
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
... ... @@ -89,3 +92,5 @@ extern int clsyncapi_getapiversion();
*/
extern pid_t clsyncapi_fork(struct ctx *ctx_p);
#endif
... ...
... ... @@ -182,6 +182,7 @@ typedef struct eventinfo eventinfo_t;
typedef int (*thread_callbackfunct_t)(ctx_t *ctx_p, char **argv);
struct threadinfo {
int thread_num;
uint32_t iteration;
thread_callbackfunct_t callback;
char **argv;
pthread_t pthread;
... ...
... ... @@ -71,3 +71,5 @@
#define CONFIG_PATHS { ".clsync.conf", "/etc/clsync/clsync.conf", NULL } /* "~/.clsync.conf" and "/etc/clsync/clsync.conf" */
#define API_PREFIX "clsyncapi_"
#define DUMP_DIRMODE 0700
... ...
... ... @@ -20,7 +20,14 @@
#include "common.h"
#include <sys/un.h> // for "struct sockaddr_un"
#include <sys/stat.h> // mkdir()
#include <sys/types.h> // mkdir()
#include <fcntl.h> // mkdirat()
#include <glib.h> // g_hash_table_foreach()
#include "indexes.h"
#include "ctx.h"
#include "error.h"
#include "sync.h"
#include "control.h"
... ... @@ -28,19 +35,195 @@
static pthread_t pthread_control;
static inline int control_error(clsyncsock_t *clsyncsock_p, const char *const funct, const char *const args) {
return socket_send(clsyncsock_p, SOCKCMD_REPLY_ECUSTOM, funct, args, errno, strerror(errno));
}
enum dump_dirfd_obj {
DUMP_DIRFD_ROOT = 0,
DUMP_DIRFD_QUEUE,
DUMP_DIRFD_THREAD,
DUMP_DIRFD_MAX
};
enum dump_ltype {
DUMP_LTYPE_INCLUDE,
DUMP_LTYPE_EXCLUDE,
DUMP_LTYPE_EVINFO,
};
struct control_dump_arg {
clsyncsock_t *clsyncsock_p;
ctx_t *ctx_p;
int dirfd[DUMP_DIRFD_MAX];
int fd_out;
int data;
};
void control_dump_liststep(gpointer fpath_gp, gpointer evinfo_gp, gpointer arg_gp) {
char *fpath = (char *)fpath_gp;
eventinfo_t *evinfo = (eventinfo_t *)evinfo_gp;
struct control_dump_arg *arg = arg_gp;
char act, num;
switch (arg->data) {
case DUMP_LTYPE_INCLUDE:
act = '+';
num = '1';
break;
case DUMP_LTYPE_EXCLUDE:
act = '-';
num = '1';
break;
case DUMP_LTYPE_EVINFO:
act = '+';
num = evinfo->flags&EVIF_RECURSIVELY ? '*' :
(evinfo->flags&EVIF_CONTENTRECURSIVELY ? '/' : '1');
break;
default:
act = '?';
num = '?';
}
dprintf(arg->fd_out, "%c%c %s\n", act, num, fpath);
return;
}
int control_dump_thread(threadinfo_t *threadinfo_p, void *_arg) {
struct control_dump_arg *arg = _arg;
char buf[BUFSIZ];
snprintf(buf, BUFSIZ, "%u-%u-%lx", threadinfo_p->iteration, threadinfo_p->thread_num, (long)threadinfo_p->pthread);
arg->fd_out = openat(arg->dirfd[DUMP_DIRFD_THREAD], buf, O_WRONLY);
if (arg->fd_out == -1)
return errno;
{
char **argv;
dprintf(arg->fd_out,
"thread:\n\titeration == %u;\n\tnum == %u;\n\tpthread == %lx;\n\tstarttime == %lu\n\texpiretime == %lu\n\tchild_pid == %u\n\ttry_n == %u\nCommand:",
threadinfo_p->iteration,
threadinfo_p->thread_num,
(long)threadinfo_p->pthread,
threadinfo_p->starttime,
threadinfo_p->expiretime,
threadinfo_p->child_pid,
threadinfo_p->try_n
);
argv = threadinfo_p->argv;
while (argv != NULL)
dprintf(arg->fd_out, " \"%s\"", *(argv++));
dprintf(arg->fd_out, "\n");
}
arg->data = DUMP_LTYPE_EVINFO;
g_hash_table_foreach(threadinfo_p->fpath2ei_ht, control_dump_liststep, arg);
close(arg->fd_out);
return 0;
}
int control_mkdir_open(clsyncsock_t *clsyncsock_p, const char *const dir_path) {
int dirfd;
if (mkdir(dir_path, DUMP_DIRMODE)) {
control_error(clsyncsock_p, "mkdir", dir_path);
return -1;
}
dirfd = open(dir_path, O_RDWR);
if (dirfd == -1) {
control_error(clsyncsock_p, "open", dir_path);
return -1;
}
return dirfd;
}
int control_dump(ctx_t *ctx_p, clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p) {
indexes_t *indexes_p = ctx_p->indexes_p;
sockcmd_dat_dump_t *dat = sockcmd_p->data;
int rootfd;
struct control_dump_arg arg;
enum dump_dirfd_obj dirfd_obj;
static const char *const subdirs[] = {
[DUMP_DIRFD_QUEUE] = "queue",
[DUMP_DIRFD_THREAD] = "threads"
};
rootfd = control_mkdir_open(clsyncsock_p, dat->dir_path);
if (rootfd == -1)
goto l_control_dump_end;
arg.dirfd[DUMP_DIRFD_ROOT] = rootfd;
dirfd_obj = DUMP_DIRFD_ROOT+1;
while (dirfd_obj < DUMP_DIRFD_MAX) {
const char *const subdir = subdirs[dirfd_obj];
arg.dirfd[dirfd_obj] = control_mkdir_open(clsyncsock_p, subdir);
if (arg.dirfd[dirfd_obj] == -1)
goto l_control_dump_end;
dirfd_obj++;
}
arg.clsyncsock_p = clsyncsock_p;
arg.ctx_p = ctx_p;
int queue_id = 0;
while (queue_id < QUEUE_MAX) {
char buf[BUFSIZ];
snprintf(buf, BUFSIZ, "%u", queue_id);
arg.fd_out = openat(arg.dirfd[DUMP_DIRFD_QUEUE], buf, O_WRONLY);
arg.data = DUMP_LTYPE_EVINFO;
g_hash_table_foreach(indexes_p->fpath2ei_coll_ht[queue_id], control_dump_liststep, &arg);
arg.data = DUMP_LTYPE_EXCLUDE;
g_hash_table_foreach(indexes_p->exc_fpath_coll_ht[queue_id], control_dump_liststep, &arg);
close(arg.fd_out);
queue_id++;
}
threads_foreach(control_dump_thread, STATE_RUNNING, &arg);
l_control_dump_end:
dirfd_obj = DUMP_DIRFD_ROOT;
while (dirfd_obj < DUMP_DIRFD_MAX) {
if (arg.dirfd[dirfd_obj] != -1)
close(arg.dirfd[dirfd_obj]);
dirfd_obj++;
}
return errno ? errno : socket_send(clsyncsock_p, SOCKCMD_REPLY_DUMP);
}
int control_procclsyncsock(socket_sockthreaddata_t *arg, sockcmd_t *sockcmd_p) {
clsyncsock_t *clsyncsock_p = arg->clsyncsock_p;
ctx_t *ctx_p = (ctx_t *)arg->arg;
clsyncsock_t *clsyncsock_p = arg->clsyncsock_p;
ctx_t *ctx_p = (ctx_t *)arg->arg;
switch(sockcmd_p->cmd_id) {
case SOCKCMD_REQUEST_INFO: {
case SOCKCMD_REQUEST_DUMP:
control_dump(ctx_p, clsyncsock_p, sockcmd_p);
break;
case SOCKCMD_REQUEST_INFO:
socket_send(clsyncsock_p, SOCKCMD_REPLY_INFO, ctx_p->config_block, ctx_p->label, ctx_p->flags, ctx_p->flags_set);
break;
}
case SOCKCMD_REQUEST_DIE: {
case SOCKCMD_REQUEST_DIE:
sync_term(SIGTERM);
break;
}
default:
return EINVAL;
}
... ...
... ... @@ -232,6 +232,7 @@ struct ctx {
sigset_t *sigset;
char isignoredexitcode[(1<<8)];
#endif
void *indexes_p;
};
typedef struct ctx ctx_t;
... ...
... ... @@ -17,6 +17,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __CLSYNC_INDEXES_H
#define __CLSYNC_INDEXES_H
#include <glib.h>
struct indexes {
... ... @@ -30,3 +33,5 @@ struct indexes {
};
typedef struct indexes indexes_t;
#endif
... ...
... ... @@ -58,6 +58,7 @@ static char *recv_ptrs[SOCKET_MAX];
const char *const textmessage_args[SOCKCMD_MAXID] = {
[SOCKCMD_REQUEST_NEGOTIATION] = "%u",
[SOCKCMD_REQUEST_DUMP] = "%s",
[SOCKCMD_REPLY_NEGOTIATION] = "%u",
[SOCKCMD_REPLY_ACK] = "%03u %lu",
[SOCKCMD_REPLY_EINVAL] = "%03u %lu",
... ... @@ -65,6 +66,9 @@ const char *const textmessage_args[SOCKCMD_MAXID] = {
[SOCKCMD_REPLY_INFO] = "%s\003/ %s\003/ %x %x",
[SOCKCMD_REPLY_UNKNOWNCMD] = "%03u %lu",
[SOCKCMD_REPLY_INVALIDCMDID] = "%lu",
[SOCKCMD_REPLY_EEXIST] = "%s\003/",
[SOCKCMD_REPLY_EPERM] = "%s\003/",
[SOCKCMD_REPLY_ECUSTOM] = "%s\003/ %s\003/ %u %s\003/",
};
const char *const textmessage_descr[SOCKCMD_MAXID] = {
... ... @@ -78,8 +82,12 @@ const char *const textmessage_descr[SOCKCMD_MAXID] = {
[SOCKCMD_REPLY_BYE] = "Bye.",
[SOCKCMD_REPLY_VERSION] = "clsync v%u.%u%s",
[SOCKCMD_REPLY_INFO] = "config_block == \"%s\"; label == \"%s\"; flags == %x; flags_set == %x.",
[SOCKCMD_REPLY_DUMP] = "Ready",
[SOCKCMD_REPLY_UNKNOWNCMD] = "Unknown command.",
[SOCKCMD_REPLY_INVALIDCMDID] = "Invalid command id. Required: 0 <= cmd_id < 1000.",
[SOCKCMD_REPLY_EEXIST] = "File exists: \"%s\".",
[SOCKCMD_REPLY_EPERM] = "Permission denied: \"%s\".",
[SOCKCMD_REPLY_ECUSTOM] = "%s(%s): Error #%u: \"%s\".",
};
int socket_check_bysock(int sock) {
... ... @@ -346,6 +354,9 @@ static inline int parse_text_data(sockcmd_t *sockcmd_p, char *args, size_t args_
case SOCKCMD_REPLY_NEGOTIATION:
PARSE_TEXT_DATA_SSCANF(sockcmd_dat_negotiation_t, &d->prot, &d->subprot);
break;
case SOCKCMD_REQUEST_DUMP:
PARSE_TEXT_DATA_SSCANF(sockcmd_dat_dump_t, &d->dir_path);
break;
case SOCKCMD_REPLY_ACK:
PARSE_TEXT_DATA_SSCANF(sockcmd_dat_ack_t, &d->cmd_id, &d->cmd_num);
break;
... ... @@ -368,10 +379,16 @@ static inline int parse_text_data(sockcmd_t *sockcmd_p, char *args, size_t args_
case SOCKCMD_REPLY_INVALIDCMDID:
PARSE_TEXT_DATA_SSCANF(sockcmd_dat_invalidcmd_t, &d->cmd_num);
break;
case SOCKCMD_REPLY_EEXIST:
PARSE_TEXT_DATA_SSCANF(sockcmd_dat_eexist_t, &d->file_path);
break;
case SOCKCMD_REPLY_EPERM:
PARSE_TEXT_DATA_SSCANF(sockcmd_dat_eperm_t, &d->descr);
break;
default:
sockcmd_p->data = xmalloc(args_len+1);
memcpy(sockcmd_p->data, args, args_len);
sockcmd_p->data[args_len] = 0;
((char *)sockcmd_p->data)[args_len] = 0;
break;
}
... ...
... ... @@ -78,18 +78,22 @@ enum sockcmd_id {
SOCKCMD_REPLY_UNKNOWNCMD = 160,
SOCKCMD_REPLY_INVALIDCMDID = 161,
SOCKCMD_REPLY_EINVAL = 162,
SOCKCMD_REPLY_EEXIST = 163,
SOCKCMD_REPLY_EPERM = 164,
SOCKCMD_REPLY_ECUSTOM = 199,
SOCKCMD_REQUEST_LOGIN = 200,
SOCKCMD_REQUEST_VERSION = 201,
SOCKCMD_REQUEST_INFO = 202,
SOCKCMD_REQUEST_DUMP = 203,
SOCKCMD_REQUEST_DIE = 210,
SOCKCMD_REQUEST_QUIT = 250,
SOCKCMD_REPLY_LOGIN = 300,
SOCKCMD_REPLY_VERSION = 301,
SOCKCMD_REPLY_INFO = 302,
SOCKCMD_REPLY_DUMP = 303,
SOCKCMD_REPLY_DIE = 310,
SOCKCMD_REPLY_UNEXPECTEDEND = 300,
SOCKCMD_REPLY_QUIT = 301,
SOCKCMD_REPLY_BYE = 350,
SOCKCMD_REPLY_UNEXPECTEDEND = 351,
SOCKCMD_MAXID
};
typedef enum sockcmd_id sockcmd_id_t;
... ... @@ -132,11 +136,26 @@ struct sockcmd_dat_info {
typedef struct sockcmd_dat_info sockcmd_dat_info_t;
#endif
struct sockcmd_dat_dump {
char dir_path[PATH_MAX];
};
typedef struct sockcmd_dat_dump sockcmd_dat_dump_t;
struct sockcmd_dat_eexist {
char file_path[PATH_MAX];
};
typedef struct sockcmd_dat_eexist sockcmd_dat_eexist_t;
struct sockcmd_dat_eperm {
char descr[BUFSIZ];
};
typedef struct sockcmd_dat_eperm sockcmd_dat_eperm_t;
struct sockcmd {
uint64_t cmd_num;
uint16_t cmd_id;
size_t data_len;
char *data;
void *data;
};
typedef struct sockcmd sockcmd_t;
... ...
... ... @@ -803,11 +803,12 @@ static inline int so_call_sync(ctx_t *ctx_p, indexes_t *indexes_p, int n, api_ev
threadinfo_p->try_n = 0;
threadinfo_p->callback = NULL;
threadinfo_p->argv = NULL;
threadinfo_p->ctx_p = ctx_p;
threadinfo_p->ctx_p = ctx_p;
threadinfo_p->starttime = time(NULL);
threadinfo_p->fpath2ei_ht = g_hash_table_dup(indexes_p->fpath2ei_ht, g_str_hash, g_str_equal, free, free, (gpointer(*)(gpointer))strdup, eidup);
threadinfo_p->n = n;
threadinfo_p->ei = ei;
threadinfo_p->iteration = ctx_p->iteration_num;
if (ctx_p->synctimeout)
threadinfo_p->expiretime = threadinfo_p->starttime + ctx_p->synctimeout;
... ... @@ -937,9 +938,10 @@ static inline int so_call_rsync(ctx_t *ctx_p, indexes_t *indexes_p, const char *
threadinfo_p->try_n = 0;
threadinfo_p->callback = NULL;
threadinfo_p->argv = xmalloc(sizeof(char *) * 3);
threadinfo_p->ctx_p = ctx_p;
threadinfo_p->ctx_p = ctx_p;
threadinfo_p->starttime = time(NULL);
threadinfo_p->fpath2ei_ht = g_hash_table_dup(indexes_p->fpath2ei_ht, g_str_hash, g_str_equal, free, free, (gpointer(*)(gpointer))strdup, eidup);
threadinfo_p->iteration = ctx_p->iteration_num;
threadinfo_p->argv[0] = strdup(inclistfile);
threadinfo_p->argv[1] = strdup(exclistfile);
... ... @@ -1184,9 +1186,10 @@ static inline int sync_exec_thread(ctx_t *ctx_p, indexes_t *indexes_p, thread_ca
threadinfo_p->try_n = 0;
threadinfo_p->callback = callback;
threadinfo_p->argv = argv;
threadinfo_p->ctx_p = ctx_p;
threadinfo_p->ctx_p = ctx_p;
threadinfo_p->starttime = time(NULL);
threadinfo_p->fpath2ei_ht = g_hash_table_dup(indexes_p->fpath2ei_ht, g_str_hash, g_str_equal, free, free, (gpointer(*)(gpointer))strdup, eidup);
threadinfo_p->iteration = ctx_p->iteration_num;
if (ctx_p->synctimeout)
threadinfo_p->expiretime = threadinfo_p->starttime + ctx_p->synctimeout;
... ... @@ -2678,14 +2681,14 @@ int sync_idle_dosync_collectedevents(ctx_t *ctx_p, indexes_t *indexes_p) {
g_hash_table_remove_all(indexes_p->fpath2ei_ht);
}
if (!ctx_p->flags[THREADING]) {
if(ctx_p->iteration_num < ~0) // ~0 is the max value for unsigned variables
ctx_p->iteration_num++;
if(ctx_p->iteration_num < ~0) // ~0 is the max value for unsigned variables
ctx_p->iteration_num++;
if (!ctx_p->flags[THREADING])
setenv_iteration(ctx_p->iteration_num);
debug(3, "next iteration: %u/%u",
ctx_p->iteration_num, ctx_p->flags[MAXITERATIONS]);
}
debug(3, "next iteration: %u/%u",
ctx_p->iteration_num, ctx_p->flags[MAXITERATIONS]);
return 0;
}
... ... @@ -3306,9 +3309,9 @@ int sync_run(ctx_t *ctx_p) {
ret = pthread_sigmask(SIG_BLOCK, &sigset_sighandler, NULL);
if (ret) return ret;
sighandler_arg.ctx_p = ctx_p;
// sighandler_arg.indexes_p = &indexes;
sighandler_arg.pthread_parent = pthread_self();
sighandler_arg.ctx_p = ctx_p;
// sighandler_arg.indexes_p = &indexes;
sighandler_arg.pthread_parent = pthread_self();
sighandler_arg.exitcode_p = &ret;
sighandler_arg.sigset_p = &sigset_sighandler;
ret = pthread_create(&pthread_sighandler, NULL, (void *(*)(void *))sync_sighandler, &sighandler_arg);
... ... @@ -3325,12 +3328,14 @@ int sync_run(ctx_t *ctx_p) {
// Creating hash tables
indexes_t indexes = {NULL};
indexes.wd2fpath_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
indexes.fpath2wd_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.fpath2ei_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
indexes.exc_fpath_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.out_lines_aggr_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes_t indexes = {NULL};
ctx_p->indexes_p = &indexes;
indexes.wd2fpath_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
indexes.fpath2wd_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.fpath2ei_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
indexes.exc_fpath_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
indexes.out_lines_aggr_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
i=0;
while (i<QUEUE_MAX) {
switch (i) {
... ...
... ... @@ -19,4 +19,5 @@
extern int sync_run(struct ctx *ctx);
extern int sync_term(int exitcode);
extern int threads_foreach(int (*funct)(threadinfo_t *, void *), state_t state, void *arg);
... ...