redmine

Added untested "socket_send_cb()"

... ... @@ -104,6 +104,8 @@ filesz:1M\n\
#define SYSLOG_BUFSIZ (1<<16)
#define CLSYNCSOCK_WINDOW (1<<8)
#define DEFAULT_SYNCHANDLER_ARGS_SIMPLE "sync \%label\% \%EVENT-MASK\% \%INCLUDE-LIST\%"
#define DEFAULT_SYNCHANDLER_ARGS_DIRECT "\%INCLUDE-LIST\% \%destination-dir\%/"
#define DEFAULT_SYNCHANDLER_ARGS_SHELL_NR "synclist \%label\% \%INCLUDE-LIST-PATH\%"
... ...
... ... @@ -37,9 +37,9 @@
static pthread_t pthread_control;
static inline int control_error(clsyncsock_t *clsyncsock_p, const char *const funct, const char *const args) {
static inline int control_error(clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p, const char *const funct, const char *const args) {
debug(3, "%s(%s): %u: %s", funct, args, errno, strerror(errno));
return socket_send(clsyncsock_p, SOCKCMD_REPLY_ECUSTOM, funct, args, errno, strerror(errno));
return socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_ECUSTOM, funct, args, errno, strerror(errno));
}
... ... @@ -49,8 +49,8 @@ int control_dump(ctx_t *ctx_p, clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p)
debug(3, "%s", dat->dir_path);
return (sync_dump(ctx_p, dat->dir_path)) ?
control_error(clsyncsock_p, "sync_dump", dat->dir_path) :
socket_send(clsyncsock_p, SOCKCMD_REPLY_DUMP);
control_error(clsyncsock_p, sockcmd_p, "sync_dump", dat->dir_path) :
socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_DUMP);
}
int control_procclsyncsock(socket_sockthreaddata_t *arg, sockcmd_t *sockcmd_p) {
... ... @@ -63,16 +63,16 @@ int control_procclsyncsock(socket_sockthreaddata_t *arg, sockcmd_t *sockcmd_p) {
rc = control_dump(ctx_p, clsyncsock_p, sockcmd_p);
break;
case SOCKCMD_REQUEST_INFO:
rc = socket_send(clsyncsock_p, SOCKCMD_REPLY_INFO, ctx_p->config_block, ctx_p->label, ctx_p->flags, ctx_p->flags_set);
rc = socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_INFO, ctx_p->config_block, ctx_p->label, ctx_p->flags, ctx_p->flags_set);
break;
case SOCKCMD_REQUEST_SET: {
sockcmd_dat_set_t *dat = sockcmd_p->data;
rc = ctx_set(ctx_p, dat->key, dat->value);
if (rc) {
control_error(clsyncsock_p, "ctx_set", dat->key);
control_error(clsyncsock_p, sockcmd_p, "ctx_set", dat->key);
break;
}
rc = socket_send(clsyncsock_p, SOCKCMD_REPLY_SET);
rc = socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_SET);
break;
}
case SOCKCMD_REQUEST_DIE:
... ...
... ... @@ -267,16 +267,15 @@ clsyncsock_t *socket_connect_unix(const char *const socket_path) {
}
#endif
int socket_send(clsyncsock_t *clsyncsock, sockcmd_id_t cmd_id, ...) {
va_list ap;
int _socket_send(clsyncsock_t *clsyncsock, uint64_t *cmd_num_p, sockcmd_id_t cmd_id, va_list ap)
{
int ret;
va_start(ap, cmd_id);
char prebuf0[SOCKET_BUFSIZ], prebuf1[SOCKET_BUFSIZ], sendbuf[SOCKET_BUFSIZ];
ret = 0;
switch(clsyncsock->prot) {
switch (clsyncsock->prot) {
case 0:
switch(clsyncsock->subprot) {
case SUBPROT0_TEXT: {
... ... @@ -293,8 +292,14 @@ int socket_send(clsyncsock_t *clsyncsock, sockcmd_id_t cmd_id, ...) {
va_copy(ap_copy, ap);
vsprintf(prebuf1, textmessage_descr[cmd_id], ap);
size_t sendlen = sprintf(sendbuf, "%03u %s :%s\n", cmd_id, prebuf0, prebuf1);
size_t sendlen = sprintf(
sendbuf,
"%lu %03u %s :%s\n",
(*cmd_num_p)++,
cmd_id, prebuf0, prebuf1
);
debug(5, "send(): \"%s\"", sendbuf);
send(clsyncsock->sock, sendbuf, sendlen, 0);
break;
}
... ... @@ -313,10 +318,69 @@ int socket_send(clsyncsock_t *clsyncsock, sockcmd_id_t cmd_id, ...) {
}
l_socket_send_end:
return ret;
}
int socket_reply(clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p, sockcmd_id_t cmd_id, ...)
{
va_list ap;
int ret;
uint64_t cmd_num = sockcmd_p->cmd_num;
va_start(ap, cmd_id);
ret = _socket_send(clsyncsock_p, &cmd_num, cmd_id, ap);
va_end(ap);
return ret;
}
int socket_send(clsyncsock_t *clsyncsock_p, sockcmd_id_t cmd_id, ...)
{
va_list ap;
int ret;
va_start(ap, cmd_id);
ret = _socket_send(clsyncsock_p, &clsyncsock_p->cmd_num, cmd_id, ap);
va_end(ap);
return ret;
}
int socket_send_cb(clsyncsock_t *clsyncsock_p, sockcmd_id_t cmd_id, clsyncsock_cb_funct_t cb, void *cb_arg, ...)
{
if (clsyncsock_p->cbqueue_len >= CLSYNCSOCK_WINDOW) {
errno = EOVERFLOW;
error("Callback queue overflowed. Closing the socket.");
socket_close(clsyncsock_p);
return errno;
}
{
va_list ap;
int ret;
uint64_t cmd_num = clsyncsock_p->cmd_num;
va_start(ap, cb_arg);
ret = _socket_send(clsyncsock_p, &clsyncsock_p->cmd_num, cmd_id, ap);
va_end(ap);
if (!ret) {
clsynccbqueue_t *cbq = &clsyncsock_p->cbqueue[clsyncsock_p->cbqueue_len];
int id;
cbq->cmd_num = cmd_num;
cbq->callback_funct = cb;
cbq->callback_arg = cb_arg;
id = cmd_num % (2*CLSYNCSOCK_WINDOW);
while (clsyncsock_p->cbqueue_cache[id] != NULL) id++;
clsyncsock_p->cbqueue_cache[id] = cbq;
clsyncsock_p->cbqueue_len++;
}
return ret;
}
}
static inline int socket_overflow_fix(char *buf, char **data_start_p, char **data_end_p) {
debug(3, "buf==%p; data_start==%p; data_end==%p", buf, *data_start_p, *data_end_p);
if(buf == *data_start_p)
... ... @@ -464,7 +528,7 @@ int socket_recv(clsyncsock_t *clsyncsock, sockcmd_t *sockcmd_p) {
switch(clsyncsock->subprot) {
case SUBPROT0_TEXT:
if((end=strchr(ptr, '\n'))!=NULL) {
if(sscanf(start, "%03u", (unsigned int *)&sockcmd_p->cmd_id) != 1)
if(sscanf(start, "%lu %03u", &sockcmd_p->cmd_num, (unsigned int *)&sockcmd_p->cmd_id) != 1)
return ENOMSG;
char *str_args = &start[3+1];
... ... @@ -511,18 +575,17 @@ l_socket_recv_end:
recv_stps[clsyncsock_sock] = start;
recv_ptrs[clsyncsock_sock] = ptr;
debug(3, "sockcmd_p->cmd_id == %i; buf==%p; ptr==%p; end==%p, filled=%p, buf_end==%p",
sockcmd_p->cmd_id, buf, ptr, end, &buf[filled_length_new], &buf[SOCKET_BUFSIZ]);
debug(3, "sockcmd_p->cmd_num == %lu; sockcmd_p->cmd_id == %i; buf==%p; ptr==%p; end==%p, filled=%p, buf_end==%p",
sockcmd_p->cmd_num, sockcmd_p->cmd_id, buf, ptr, end, &buf[filled_length_new], &buf[SOCKET_BUFSIZ]);
sockcmd_p->cmd_num++;
return 0;
}
int socket_sendinvalid(clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p) {
if(sockcmd_p->cmd_id >= 1000)
return socket_send(clsyncsock_p, SOCKCMD_REPLY_INVALIDCMDID, sockcmd_p->cmd_num);
return socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_INVALIDCMDID, sockcmd_p->cmd_num);
else
return socket_send(clsyncsock_p, SOCKCMD_REPLY_UNKNOWNCMD, sockcmd_p->cmd_id, sockcmd_p->cmd_num);
return socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_UNKNOWNCMD, sockcmd_p->cmd_id, sockcmd_p->cmd_num);
}
int socket_procclsyncsock(socket_sockthreaddata_t *arg) {
... ... @@ -534,8 +597,6 @@ int socket_procclsyncsock(socket_sockthreaddata_t *arg) {
clsyncsock_procfunct_t procfunct = arg->procfunct;
//sockprocflags_t flags = arg->flags;
sockcmd_p->cmd_num = -1;
enum auth_flags {
AUTHFLAG_ENTERED_LOGIN = 0x01,
};
... ... @@ -547,51 +608,74 @@ int socket_procclsyncsock(socket_sockthreaddata_t *arg) {
arg->state = (arg->authtype == SOCKAUTH_NULL) ? CLSTATE_MAIN : CLSTATE_AUTH;
socket_send(clsyncsock_p, SOCKCMD_REQUEST_NEGOTIATION, clsyncsock_p->prot, clsyncsock_p->subprot);
while((arg->running && *arg->running) && (arg->state==CLSTATE_AUTH || arg->state==CLSTATE_MAIN)) {
while ((arg->running && *arg->running) && (arg->state==CLSTATE_AUTH || arg->state==CLSTATE_MAIN)) {
debug(3, "Iteration.");
// Receiving message
int ret;
if((ret = socket_recv(clsyncsock_p, sockcmd_p))) {
if ((ret = socket_recv(clsyncsock_p, sockcmd_p))) {
error("Got error while receiving a message from clsyncsock with sock %u: %s (errno: %u)",
arg->clsyncsock_p->sock);
break;
}
// Checking for a callback for this answer
{
uint64_t cmd_num = sockcmd_p->cmd_num;
int i;
i = cmd_num % (2*CLSYNCSOCK_WINDOW);
while (clsyncsock_p->cbqueue_cache[i] != NULL) {
if (clsyncsock_p->cbqueue_cache[i]->cmd_num == cmd_num) { // Found!
clsynccbqueue_t *cbq;
// Calling the callback function
cbq->callback_funct(arg, sockcmd_p, cbq->callback_arg);
// Removing from queue
cbq = clsyncsock_p->cbqueue_cache[i];
memcpy(cbq, &clsyncsock_p->cbqueue[--clsyncsock_p->cbqueue_len], sizeof(*cbq));
clsyncsock_p->cbqueue_cache[i] = NULL;
}
i++;
}
}
// Processing the message
if(procfunct(arg, sockcmd_p))
switch(sockcmd_p->cmd_id) {
if (procfunct(arg, sockcmd_p))
switch (sockcmd_p->cmd_id) {
case SOCKCMD_REPLY_NEGOTIATION:
case SOCKCMD_REQUEST_NEGOTIATION: {
sockcmd_dat_negotiation_t *data = (sockcmd_dat_negotiation_t *)sockcmd_p->data;
switch(data->prot) {
switch (data->prot) {
case 0:
switch(data->subprot) {
switch (data->subprot) {
case SUBPROT0_TEXT:
case SUBPROT0_BINARY:
clsyncsock_p->subprot = data->subprot;
if(sockcmd_p->cmd_id == SOCKCMD_REQUEST_NEGOTIATION)
socket_send(clsyncsock_p, SOCKCMD_REPLY_NEGOTIATION, data->prot, data->subprot);
if (sockcmd_p->cmd_id == SOCKCMD_REQUEST_NEGOTIATION)
socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_NEGOTIATION, data->prot, data->subprot);
else {
socket_send(clsyncsock_p, SOCKCMD_REPLY_ACK, sockcmd_p->cmd_id, sockcmd_p->cmd_num);
socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_ACK, sockcmd_p->cmd_id, sockcmd_p->cmd_num);
debug(1, "Negotiated proto: %u %u", data->prot, data->subprot);
}
break;
default:
socket_send(clsyncsock_p, SOCKCMD_REPLY_EINVAL, sockcmd_p->cmd_id, sockcmd_p->cmd_num, "Incorrect subprotocol id");
socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_EINVAL, sockcmd_p->cmd_id, sockcmd_p->cmd_num, "Incorrect subprotocol id");
}
break;
default:
socket_send(clsyncsock_p, SOCKCMD_REPLY_EINVAL, sockcmd_p->cmd_id, sockcmd_p->cmd_num, "Incorrect protocol id");
socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_EINVAL, sockcmd_p->cmd_id, sockcmd_p->cmd_num, "Incorrect protocol id");
}
break;
}
case SOCKCMD_REQUEST_VERSION: {
socket_send(clsyncsock_p, SOCKCMD_REPLY_VERSION, VERSION_MAJ, VERSION_MIN, REVISION);
socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_VERSION, VERSION_MAJ, VERSION_MIN, REVISION);
break;
}
case SOCKCMD_REQUEST_QUIT: {
socket_send(clsyncsock_p, SOCKCMD_REPLY_BYE);
socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_BYE);
arg->state = CLSTATE_DYING;
break;
}
... ... @@ -600,19 +684,19 @@ int socket_procclsyncsock(socket_sockthreaddata_t *arg) {
break;
}
if(sockcmd_p->data != NULL) {
if (sockcmd_p->data != NULL) {
free(sockcmd_p->data);
sockcmd_p->data = NULL;
}
// Check if the socket is still alive
if(socket_check(clsyncsock_p)) {
if (socket_check(clsyncsock_p)) {
debug(1, "clsyncsock socket error: %s", strerror(errno));
break;
}
// Sending prompt
switch(arg->state) {
switch (arg->state) {
case CLSTATE_AUTH:
if(!(auth_flags&AUTHFLAG_ENTERED_LOGIN))
socket_send(clsyncsock_p, SOCKCMD_REQUEST_LOGIN);
... ...
... ... @@ -49,10 +49,28 @@
# define SOCKET_MAX SOCKET_MAX_CLSYNC
#endif
struct socket_sockthreaddata;
struct sockcmd;
typedef int (*clsyncsock_cb_funct_t)(struct socket_sockthreaddata *thread, struct sockcmd *sockcmd_p, void *arg);
struct clsynccbqueue {
uint64_t cmd_num;
clsyncsock_cb_funct_t callback_funct;
void *callback_arg;
};
typedef struct clsynccbqueue clsynccbqueue_t;
struct clsyncsock {
int sock;
uint16_t prot;
uint16_t subprot;
uint64_t cmd_num;
size_t cbqueue_len;
clsynccbqueue_t cbqueue[CLSYNCSOCK_WINDOW+1];
clsynccbqueue_t *cbqueue_cache[4*CLSYNCSOCK_WINDOW+1]; // It's a hacky hash-table of size "CLSYNCSOCK_WINDOW*2"
};
typedef struct clsyncsock clsyncsock_t;
... ... @@ -201,7 +219,9 @@ struct socket_sockthreaddata {
};
typedef struct socket_sockthreaddata socket_sockthreaddata_t;
extern int socket_reply(clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p, sockcmd_id_t cmd_id, ...);
extern int socket_send(clsyncsock_t *clsyncsock, sockcmd_id_t cmd_id, ...);
extern int socket_send_cb(clsyncsock_t *clsyncsock_p, sockcmd_id_t cmd_id, clsyncsock_cb_funct_t cb, void *cb_arg, ...);
extern int socket_sendinvalid(clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p);
extern int socket_recv(clsyncsock_t *clsyncsock, sockcmd_t *sockcmd);
extern int socket_check_bysock(int sock);
... ...