redmine

cluster support continued

... ... @@ -71,7 +71,13 @@ uint32_t clustercmd_crc32_table[1<<8];
*
*/
static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *clustercmd_p) {
static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *clustercmd_p, GHashTable *serial2queuedpacket_ht) {
#ifdef PARANOID
if(clustercmd_p->h.src_node_id >= MAXNODES) {
printf_e("Error: clustercmd_window_add(): Invalid src_node_id: %i.\n", clustercmd_p->h.src_node_id);
return EINVAL;
}
#endif
// Checking if there enough window_p->cells allocated
if(window_p->packets_len >= window_p->size) {
... ... @@ -148,7 +154,7 @@ static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *cluste
queuedpacket_p->h.window_id = window_id;
// remembering new packet
g_hash_table_insert(window_p->serial2queuedpacket_ht, GINT_TO_POINTER(clustercmd_p->h.serial), queuedpacket_p);
g_hash_table_insert(serial2queuedpacket_ht, GINT_TO_POINTER(clustercmd_p->h.serial), queuedpacket_p);
window_p->packets_id[window_p->packets_len++] = window_id;
return 0;
... ... @@ -165,7 +171,7 @@ static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *cluste
*
*/
static inline int clustercmd_window_del(window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p) {
static inline int clustercmd_window_del(window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p, GHashTable *serial2queuedpacket_ht) {
#ifdef PARANOID
if(!window_p->size) {
printf_e("Error: clustercmd_window_del(): window not allocated.\n");
... ... @@ -192,7 +198,7 @@ static inline int clustercmd_window_del(window_t *window_p, clustercmdqueuedpack
}
// Removing from hash table
g_hash_table_remove(window_p->serial2queuedpacket_ht, GINT_TO_POINTER(queuedpacket_p->cmd.h.serial));
g_hash_table_remove(serial2queuedpacket_ht, GINT_TO_POINTER(queuedpacket_p->cmd.h.serial));
return 0;
}
... ... @@ -327,6 +333,8 @@ int node_status_change(uint8_t node_id, uint8_t node_status) {
node_ids[nodeinfo_p->num] = node_ids[node_count];
g_hash_table_destroy(nodeinfo_p->modtime_ht);
g_hash_table_destroy(nodeinfo_p->serial2queuedpacket_ht);
#ifdef VERYPARANOID
memset(nodeinfo_p, 0, sizeof(*nodeinfo_p));
#endif
... ... @@ -341,12 +349,15 @@ int node_status_change(uint8_t node_id, uint8_t node_status) {
case NODESTATUS_DOESNTEXIST:
nodeinfo_p->id = node_id;
nodeinfo_p->num = node_count;
nodeinfo_p->modtime_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
nodeinfo_p->serial2queuedpacket_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
node_ids[node_count] = node_id;
node_count++;
#ifdef PARANOID
if(node_status == NODESTATUS_OFFLINE)
break;
break; // In case of NODESTATUS_DOESNTEXIST -> NODESTATUS_OFFLINE, node_online should be increased
#endif
case NODESTATUS_OFFLINE:
node_online++;
... ... @@ -521,7 +532,9 @@ static inline int clustercmd_reject(clustercmd_t *clustercmd_p, uint8_t reason)
*
*/
int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *timeout_p) {
static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *timeout_p) {
static uint8_t last_src_node_id = NODEID_NOID;
static uint32_t last_serial = 0;
int timeout;
size_t size;
... ... @@ -561,6 +574,22 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
// Getting pointer to space to place clustercmd
clustercmd_t *clustercmd_p = *clustercmd_pp;
// Checking if there message is waiting in the window
if(last_src_node_id != NODEID_NOID) {
nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];
clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = (clustercmdqueuedpacket_t *)
g_hash_table_lookup(nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER(last_serial+1));
if(clustercmdqueuedpacket_p != NULL) {
/*
if()
free()
size_p
*/
}
}
// Checking if there any event on read socket
// select()
struct timeval tv;
... ... @@ -629,6 +658,10 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
printf_e("Warning: cluster_recv(): Got non getmyid packet from NOID node. Ignoring the packet.\n");
return 0;
}
if(clustercmd_p->h.serial != 0) {
printf_e("Warning: cluster_recv(): Got packet with non-zero serial from NOID node. Ignoring the packet.\n");
return 0;
}
} else
// Wrong src_node_id?
if(src_node_id >= MAXNODES) {
... ... @@ -682,12 +715,14 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
// Is this misordered packet?
if(clustercmd_p->h.serial != nodeinfo_p->last_serial + 1) {
clustercmd_window_add(&window_i, clustercmd_p);
clustercmd_window_add(&window_i, clustercmd_p, nodeinfo_p->serial2queuedpacket_ht);
return 0;
}
// Is this the end of packet (packet without data)
if(clustercmd_p->h.data_len == 0) {
last_serial = clustercmd_p->h.serial;
last_src_node_id = clustercmd_p->h.src_node_id;
return 1;
}
... ... @@ -736,6 +771,8 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
}
}
last_serial = clustercmd_p->h.serial;
last_src_node_id = clustercmd_p->h.src_node_id;
return 1;
}
... ... @@ -792,7 +829,7 @@ static int cluster_recvproc_ack(clustercmd_t *clustercmd_p) {
uint32_t cmd_serial_ack = clustercmd_p->data_ack.serial;
clustercmdqueuedpacket_t *queuedpacket_p =
(clustercmdqueuedpacket_t *)g_hash_table_lookup(window_o.serial2queuedpacket_ht, GINT_TO_POINTER(cmd_serial_ack));
(clustercmdqueuedpacket_t *)g_hash_table_lookup(nodeinfo_my->serial2queuedpacket_ht, GINT_TO_POINTER(cmd_serial_ack));
if(queuedpacket_p == NULL)
return 0;
... ... @@ -804,7 +841,7 @@ static int cluster_recvproc_ack(clustercmd_t *clustercmd_p) {
queuedpacket_p->h.o.ack_from[node_id_from]++;
if(queuedpacket_p->h.o.ack_count == node_count-1)
clustercmd_window_del(&window_o, queuedpacket_p);
clustercmd_window_del(&window_o, queuedpacket_p, nodeinfo_my->serial2queuedpacket_ht);
}
return 0;
... ... @@ -823,9 +860,6 @@ static int cluster_recvproc_ack(clustercmd_t *clustercmd_p) {
*/
int cluster_io_init() {
window_i.serial2queuedpacket_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
window_o.serial2queuedpacket_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
cluster_recv_proc_set(CLUSTERCMDID_ACK, cluster_recvproc_ack);
return 0;
}
... ... @@ -841,9 +875,6 @@ int cluster_io_init() {
int cluster_io_deinit() {
if(window_i.serial2queuedpacket_ht != NULL)
g_hash_table_destroy(window_i.serial2queuedpacket_ht);
if(window_i.buf_size) {
#ifdef PARANOID
if(window_i.buf == NULL) {
... ... @@ -853,9 +884,6 @@ int cluster_io_deinit() {
free(window_i.buf);
}
if(window_o.serial2queuedpacket_ht != NULL)
g_hash_table_destroy(window_o.serial2queuedpacket_ht);
if(window_o.buf_size) {
#ifdef PARANOID
if(window_o.buf == NULL) {
... ... @@ -1035,7 +1063,6 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
// Initializing global variables, pt. 2
nodeinfo_my = &nodeinfo[node_id_my];
nodeinfo_my->modtime_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
// Running thread, that will process background communicating routines with another nodes.
// The process is based on function cluster_loop() [let's use shorthand "cluster_loop()-thread"]
... ...
... ... @@ -98,6 +98,7 @@ struct nodeinfo {
nodestatus_t status;
uint32_t updatets;
GHashTable *modtime_ht;
GHashTable *serial2queuedpacket_ht;
packets_stats_t packets_in;
packets_stats_t packets_out;
uint32_t last_serial;
... ... @@ -214,7 +215,6 @@ struct window {
unsigned int packets_len; // Count of packets (are waiting for ACK-s)
unsigned int *packets_id; // Array of cells' id-s with packets
window_occupied_sides_t *occupied_sides; // Array of structures with coordinates in buffer of occupied space by cell ida (aka window_id)
GHashTable *serial2queuedpacket_ht;// Hash-table: clustercmd.h.serial -> clustercmdqueuedpacket
size_t buf_size; // Allocated space of the buffer
char *buf; // Pointer to the buffer
};
... ...
... ... @@ -81,7 +81,7 @@ ip a s eth0 | awk '{if($1=="inet") {gsub("/.*", "", $2); print $2}}'
.RE
.PP
.B \-c, \-\-cluster\-ip
.B \-m, \-\-cluster\-ip
.I multicast\-ip
.RS 8
Not implemented yet!
... ... @@ -98,7 +98,7 @@ Default value is "227.108.115.121". [(128+"c")."l"."s"."y"]
.RE
.PP
.B \-c, \-\-cluster\-port
.B \-P, \-\-cluster\-port
.I multicast\-port
.RS 8
Not implemented yet!
... ... @@ -116,7 +116,7 @@ Default value is "40079". [("n" << 8) + "c"]
.RE
.PP
.B \-c, \-\-cluster\-timeout
.B \-W, \-\-cluster\-timeout
.I cluster\-timeout
.RS 8
Not implemented yet!
... ... @@ -128,7 +128,7 @@ Default value is "1000". [1 second]
.RE
.PP
.B \-c, \-\-cluster\-node\-name
.B \-n, \-\-cluster\-node\-name
.I cluster\-node\-name
.RS 8
Not implemented yet!
... ...