redmine

cluster.c: fixed segfault in clustercmd_window_add(); continued ht-exchange

... ... @@ -142,6 +142,12 @@ static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *cluste
}
printf_ddd("Debug3: clustercmd_window_add(): f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
free_left, free_right, buf_coordinate, window_p->buf_size);
} else {
buf_coordinate = 0;
if(window_p->buf_size <= required_space) {
window_p->buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
window_p->buf = xrealloc(window_p->buf, window_p->buf_size);
}
}
unsigned int window_id;
... ... @@ -156,6 +162,7 @@ static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *cluste
// placing information into buffer
clustercmdqueuedpacket_t *queuedpacket_p;
printf_ddd("Debug3: clustercmd_window_add(): b_coord == %u\n", buf_coordinate);
queuedpacket_p = (clustercmdqueuedpacket_t *)&window_p->buf[buf_coordinate];
memset(&queuedpacket_p->h, 0, sizeof(queuedpacket_p->h));
... ... @@ -744,7 +751,7 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, unsigned int *timeout_p) {
*/
int cluster_recv_proc(unsigned int _timeout) {
printf_ddd("Debu3: cluster_recv_proc(%i)\n", _timeout);
printf_ddd("Debug3: cluster_recv_proc(%i)\n", _timeout);
clustercmd_t *clustercmd_p;
int ret;
unsigned int timeout = _timeout;
... ... @@ -1060,6 +1067,7 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
memcpy(data_reg_p->node_name, options_p->cluster_nodename, options_p->cluster_nodename_len+1);
clustercmd_p->h.data_len = options_p->cluster_nodename_len+1;
clustercmd_p->h.cmd_id = CLUSTERCMDID_REG;
clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
if((ret=cluster_send(clustercmd_p)))
... ... @@ -1229,12 +1237,27 @@ int cluster_loop() {
// Starting the loop
while(1) {
// Waiting for event
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(sock_i, &rfds);
printf_ddd("Debug3: cluster_loop(): select()\n");
ret = select(sock_i+1, &rfds, NULL, NULL, NULL);
// Exit if error
if((ret == -1) && (errno != EINTR)) {
sync_term(ret);
return ret;
}
// Breaking the loop, if there's SIGTERM signal for this thread
printf_ddd("Debug3: cluster_loop(): sigpending()\n");
if(sigpending(&sigset_cluster))
if(sigismember(&sigset_cluster, SIGTERM))
break;
// Processing new messages
printf_ddd("Debug3: cluster_loop(): cluster_recv_proc()\n");
if((ret=cluster_recv_proc(0))) {
sync_term(ret);
return ret;
... ... @@ -1348,7 +1371,7 @@ void cluster_modtime_exchange_pushentry(gpointer dir_gp, gpointer modtype_gp, vo
struct pushdoubleentry_arg *pushentry_arg_p = (struct pushdoubleentry_arg *)pushentry_arg_gp;
char *dir = (char *)dir_gp;
time_t ctime = (time_t)GPOINTER_TO_INT(modtype_gp);
size_t size = strlen(dir); // TODO: strlen should be already prepared
size_t size = strlen(dir)+1; // TODO: strlen should be already prepared
// but not re-calculated here
if(pushentry_arg_p->allocated <= pushentry_arg_p->total) {
... ... @@ -1414,15 +1437,16 @@ int cluster_modtime_exchange_cleanup() {
int cluster_modtime_exchange() {
struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;
// Getting hash table entries
pushentry_arg_p->size=0;
pushentry_arg_p->total=0;
g_hash_table_foreach(nodeinfo_my->modtime_ht, cluster_modtime_exchange_pushentry, (void *)pushentry_arg_p);
if(!pushentry_arg_p->total) {
// !!!
}
// Calculating required RAM to compile clustercmd
size_t toalloc = 0;
int i = 0;
while(i < pushentry_arg_p->total) {
... ... @@ -1431,7 +1455,42 @@ int cluster_modtime_exchange() {
toalloc += pushentry_arg_p->entry[i].size1; // for ctime
}
// Allocating space for the clustercmd
clustercmd_t *clustercmd_p = (clustercmd_t *)xmalloc(sizeof(clustercmdhdr_t) + toalloc);
memset(clustercmd_p, 0, sizeof(clustercmdhdr_t));
// Setting up clustercmd
clustercmd_p->h.dst_node_id = NODEID_NOID;
clustercmd_p->h.cmd_id = CLUSTERCMDID_HT_EXCH;
clustercmd_p->h.data_len = toalloc;
// Filing clustercmd with hash-table entriyes
i = 0;
clustercmd_ht_exch_t *clustercmd_ht_exch_p = &clustercmd_p->data.ht_exch;
while(i < pushentry_arg_p->total) {
// Setting the data
clustercmd_ht_exch_p->ctime = (time_t)pushentry_arg_p->entry[i].dat1;
clustercmd_ht_exch_p->path_length = (time_t)pushentry_arg_p->entry[i].size0;
memcpy(
clustercmd_ht_exch_p->path,
pushentry_arg_p->entry[i].dat0,
clustercmd_ht_exch_p->path_length
);
// Pointing to space for next entry:
size_t offset = sizeof(clustercmd_ht_exch_t)-1+pushentry_arg_p->entry[i].size0;
clustercmd_ht_exch_p = (clustercmd_ht_exch_t *)
(&((char *) clustercmd_ht_exch_p)[offset] );
}
// Sending
cluster_send(clustercmd_p);
// Cleanup
free(clustercmd_p);
return 0;
}
... ...
... ... @@ -121,6 +121,7 @@ enum clustercmd_id {
CLUSTERCMDID_REG = 2,
CLUSTERCMDID_GETMYID = 3,
CLUSTERCMDID_SETID = 4,
CLUSTERCMDID_HT_EXCH = 5,
COUNT_CLUSTERCMDID
};
typedef enum clustercmd_id clustercmd_id_t;
... ... @@ -158,6 +159,13 @@ struct clustercmd_rej {
};
typedef struct clustercmd_rej clustercmd_rej_t;
struct clustercmd_ht_exch {
time_t ctime;
size_t path_length;
char path[1];
};
typedef struct clustercmd_ht_exch clustercmd_ht_exch_t;
struct clustercmdadler32 {
uint32_t hdr;
uint32_t dat;
... ... @@ -185,6 +193,7 @@ struct clustercmd {
clustercmd_ack_t ack;
clustercmd_rej_t rej;
clustercmd_getmyid_t getmyid;
clustercmd_ht_exch_t ht_exch;
} data;
};
typedef struct clustercmd clustercmd_t;
... ...