redmine

cluster support continued

... ... @@ -2,22 +2,27 @@
I want to say thanks to the following people:
1. Artyom A Anikeev <anikeev@ut.mephi.ru> 0xB5385841
1. Andrew A Savchenko <bircoph@ut.mephi.ru> 0x76B176E4
For preparing deb packages.
For adapting clsync for Gentoo and for good programming advice :)
2. Andrew A Savchenko <bircoph@ut.mephi.ru> 0x76B176E4
2. Barak A Pearlmutter (https://github.com/barak)
For fixing my Makefile and good programming advice :)
For adapting clsync to autotools
3. oldlaptop (https://github.com/oldlaptop)
3. Artyom A Anikeev <anikeev@ut.mephi.ru> 0xB5385841
For preparing first deb packages.
4. oldlaptop (https://github.com/oldlaptop)
For fixing spelling and grammar.
4. Alexander M Gladtsin <amgladtsin@mephi.ru>
5. Alexander M Gladtsin <amgladtsin@mephi.ru>
For testing
... ...
Unfortunately, on L2 all messages are sent to all members of the cluster.
unicast: dst_node_id == node_id
broadcast: dst_node_id == NOID
session example of two nodes (A and B):
A appears
A -> getmyid (serial: 0; src: NOID ; dst: NOID; name: A)
A -> register (serial: 1; src: 0 ; dst: NOID; name: A)
B appears
B -> getmyid (serial: 0; src: NOID ; dst: NOID; name: B)
A -> setid (serial: 2; src: 0 ; dst: NOID; name: A; updatets: 100)
B -> register (serial: 1; src: 1 ; dst: NOID; name: B)
A -> ack (serial: 3; src: 0 ; dst: 1; ack_serial: 1)
A -> lock (serial: 4; src: 0 ; dst: NOID; [paths list])
B -> ack (serial: 2; src: 1 ; dst: 0; ack_serial: 4)
A -> unlock_all (serial: 5; src: 0 ; dst: NOID)
B -> ack (serial: 3; src: 1 ; dst: 0; ack_serial: 5)
A disappears (shutdown)
A appears
A -> getmyid (serial: 0; src: NOID ; dst: NOID; name: A)
B -> setid (serial: 4; src: 1 ; dst: 0; name: A; updatets: 200)
A -> register (serial: 1; src: 0 ; dst: NOID; name: A)
B -> ack (serial: 5; src: 1 ; dst: 0; ack_serial: 1)
[...]
... ...
... ... @@ -332,6 +332,75 @@ int node_status_change(uint8_t node_id, uint8_t node_status) {
/**
* @brief Sends a message to the other nodes of the cluster.
*
* Stamps the message header with this node's ID (node_id_my) as the source
* and logs the header fields through printf_ddd(). The actual transmission
* is not written yet (see the "CODE HERE" placeholder), so as it stands the
* function always returns 0.
*
* @param[in,out] clustercmd_p Command structure pointer; its h.src_node_id field is overwritten.
*
* @retval zero Successfully sent (the only value the current stub returns).
* @retval non-zero Got an error while sending (not produced by the current stub).
*
*/
int cluster_send(clustercmd_t *clustercmd_p) {
// The source is always this node, regardless of what the caller left in the header.
clustercmd_p->h.src_node_id = node_id_my;
// CODE HERE
// NOTE(review): nothing in this function sets h.crc32, so the value logged below
// is whatever the caller put there -- confirm once the send path is implemented.
printf_ddd("Debug3: cluster_send(): Sending: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}\n",
clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id, clustercmd_p->h.crc32, clustercmd_p->h.data_len);
return 0;
}
/**
* @brief (syncop) Sends message to another nodes of the cluster and waits for ACK-answers. (with skipping all other packets)
*
* @param[in] clustercmd_p Command structure pointer.
*
* @retval zero Successfully send.
* @retval non-zero Got error, while sending.
*
* /
int cluster_send_ack(clustercmd_t *clustercmd_p) {
uint32_t cmd_serial = clustercmd_p->serial;
// Sending the message
int ret = cluster_send(clustercmd_p);
if(ret) {
printf_e("Error: cluster_send_ack(): Got error from cluster_send(): %s (errno %i).\n", strerror(ret), ret);
return ret;
}
// Waiting for ACK-messages from all registered nodes
{
clustercmd_t *clustercmd_p=NULL;
size_t size=0;
unsigned int timeout = cluster_timeout;
while((ret=cluster_recv(&clustercmd_p, &size, &timeout)) && (timeout>0)) {
// Skipping not ACK-messages.
CLUSTER_LOOP_EXPECTCMD(clustercmd_p, CLUSTERCMDID_ACK, ret);
// Is this an acknowledge packet for us? Skipping if not.
clustercmd_ack_t *data_ack_p = &clustercmd_p->data_ack;
if(clustercmd_p->h.dst_node_id != node_id_my)
continue;
// Is this acknowledge packet about the command we sent? Skipping if not.
if(data_ack_p->serial != cmd_serial)
continue;
}
free(clustercmd_p);
}
return 0;
}
*/
/**
* @brief Sets message processing functions for cluster_recv_proc() function for specified command type
*
* @param[in] cmd_id The command type
... ... @@ -396,17 +465,28 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
*clustercmd_pp = (clustercmd_t *)xmalloc(size);
}
// CODE HERE
// Getting clustercmd_p
clustercmd_t *clustercmd_p = *clustercmd_pp;
if(clustercmd_p->h.src_node_id == NODEID_NOID) {
// Packet from registering node
// CODE HERE
return 0;
} else
if(clustercmd_p->h.src_node_id >= MAXNODES) {
printf_e("Warning: cluster_recv(): Invalid h.src_node_id: %i >= "XTOSTR(MAXNODES)"\n", clustercmd_p->h.src_node_id);
return 0;
}
if(clustercmd_p->h.dst_node_id == NODEID_NOID) {
// Broadcast packet
} else
if(clustercmd_p->h.dst_node_id >= MAXNODES) {
printf_e("Warning: cluster_recv(): Invalid h.dst_node_id: %i >= "XTOSTR(MAXNODES)"\n", clustercmd_p->h.dst_node_id);
return 0;
}
// CODE HERE
printf_ddd("Debug3: cluster_recv(): Received: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}, timeout: %u -> %u\n",
... ... @@ -421,10 +501,15 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
uint32_t crc32 = clustercmd_crc32_calc(clustercmd_p);
if(crc32 != clustercmd_p->h.crc32) {
// CLUSTER_ALLOCA(clustercmd_p, )
// CRC32 mismatch. Sending REJECT-packet.
clustercmd_t *clustercmd_rej_p = CLUSTER_ALLOCA(clustercmd_ackrej_t, 0);
clustercmd_rej_p->h.dst_node_id = clustercmd_p->h.src_node_id;
clustercmd_rej_p->data_ackrej.serial = clustercmd_p->h.serial;
printf_d("Debug: cluster_recv(): CRC32 mismatch: clustercmd_p->crc32 != clustercmd_crc32_calc(clustercmd_p): %p != %p.\n", (void*)(long)clustercmd_p->h.crc32, (void*)(long)crc32);
// cluster_send
cluster_send(clustercmd_rej_p);
}
// Paranoid routines
... ... @@ -506,7 +591,7 @@ static int cluster_recvproc_ack(clustercmd_t *clustercmd_p) {
waitack_p->h.ack_count++;
waitack_p->h.ack_from[node_id_from]++;
if(waitack_p->h.ack_count == node_count)
if(waitack_p->h.ack_count == node_count-1)
clustercmd_window_del(waitack_p);
}
... ... @@ -558,76 +643,6 @@ int cluster_recv_proc_deinit() {
return 0;
}
/**
* @brief Sends a message to the other nodes of the cluster.
*
* Stamps the message header with this node's ID (node_id_my) as the source
* and logs the header fields through printf_ddd(). The actual transmission
* is not written yet (see the "CODE HERE" placeholder), so as it stands the
* function always returns 0.
*
* @param[in,out] clustercmd_p Command structure pointer; its h.src_node_id field is overwritten.
*
* @retval zero Successfully sent (the only value the current stub returns).
* @retval non-zero Got an error while sending (not produced by the current stub).
*
*/
int cluster_send(clustercmd_t *clustercmd_p) {
// The source is always this node, regardless of what the caller left in the header.
clustercmd_p->h.src_node_id = node_id_my;
// CODE HERE
// NOTE(review): nothing in this function sets h.crc32, so the value logged below
// is whatever the caller put there -- confirm once the send path is implemented.
printf_ddd("Debug3: cluster_send(): Sending: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}\n",
clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id, clustercmd_p->h.crc32, clustercmd_p->h.data_len);
return 0;
}
/**
* @brief (syncop) Sends message to another nodes of the cluster and waits for ACK-answers. (with skipping all other packets)
*
* @param[in] clustercmd_p Command structure pointer.
*
* @retval zero Successfully send.
* @retval non-zero Got error, while sending.
*
* /
int cluster_send_ack(clustercmd_t *clustercmd_p) {
uint32_t cmd_serial = clustercmd_p->serial;
// Sending the message
int ret = cluster_send(clustercmd_p);
if(ret) {
printf_e("Error: cluster_send_ack(): Got error from cluster_send(): %s (errno %i).\n", strerror(ret), ret);
return ret;
}
// Waiting for ACK-messages from all registered nodes
{
clustercmd_t *clustercmd_p=NULL;
size_t size=0;
unsigned int timeout = cluster_timeout;
while((ret=cluster_recv(&clustercmd_p, &size, &timeout)) && (timeout>0)) {
// Skipping not ACK-messages.
CLUSTER_LOOP_EXPECTCMD(clustercmd_p, CLUSTERCMDID_ACK, ret);
// Is this an acknowledge packet for us? Skipping if not.
clustercmd_ack_t *data_ack_p = &clustercmd_p->data_ack;
if(clustercmd_p->h.dst_node_id != node_id_my)
continue;
// Is this acknowledge packet about the command we sent? Skipping if not.
if(data_ack_p->serial != cmd_serial)
continue;
}
free(clustercmd_p);
}
return 0;
}
*/
/**
* @brief recvproc-function for setid-messages
*
... ... @@ -689,12 +704,18 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
return EALREADY;
}
// Initializing global variables, pt. 1
options_p = _options_p;
indexes_p = _indexes_p;
cluster_timeout = options_p->cluster_timeout * 1000;
// Initializing network routines
sock = socket(AF_INET, SOCK_DGRAM, 0);
int reuse = 1;
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,(char *)&reuse, sizeof(reuse)) < 0) {
printf_e("Error: cluster_init(): Got error while setsockopt(): %s (errno: %i)\n", strerror(errno), errno);
printf_e("Error: cluster_init(): Got error while setsockopt(): %s (errno: %i)\n",
strerror(errno), errno);
return errno;
}
... ... @@ -705,11 +726,23 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
sa.sin_port = htons(options_p->cluster_mcastipport);
sa.sin_addr.s_addr = INADDR_ANY;
// Initializing global variables, pt. 1
options_p = _options_p;
indexes_p = _indexes_p;
if(bind(sock, (struct sockaddr*)&sa, sizeof(sa))) {
printf_e("Error: cluster_init(): Got error while bind(): %s (errno: %i)\n",
strerror(errno), errno);
return errno;
}
struct ip_mreq group;
group.imr_interface.s_addr = inet_addr(options_p->cluster_iface);
group.imr_multiaddr.s_addr = inet_addr(options_p->cluster_mcastipaddr);
if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(char *)&group, sizeof(group)) < 0) {
printf_e("Error: cluster_init(): Cannot setsockopt() to enter to membership %s -> %s\n",
options_p->cluster_iface, options_p->cluster_mcastipaddr);
return errno;
}
cluster_timeout = options_p->cluster_timeout * 1000;
// Initializing another routines
clustercmd_crc32_calc_init();
... ... @@ -831,6 +864,8 @@ int cluster_deinit() {
node_status_change(0, NODESTATUS_DOESNTEXIST);
}
close(sock);
#ifdef VERYPARANOID
memset(node_info, 0, sizeof(node_info));
node_count = 0;
... ...
... ... @@ -59,7 +59,8 @@ enum nodestatus {
NODESTATUS_DOESNTEXIST = 0,
NODESTATUS_OFFLINE,
NODESTATUS_SEEMSONLINE,
NODESTATUS_ONLINE
NODESTATUS_ONLINE,
NODESTATUS_BANNED
};
typedef enum nodestatus nodestatus_t;
... ... @@ -68,12 +69,21 @@ enum nodeid {
};
typedef enum nodeid nodeid_t;
/*
 * Pair of 64-bit packet counters.
 * NOTE(review): `tot` / `rej` presumably mean "total" and "rejected" packets;
 * confirm against the code that increments them.
 */
typedef struct packets_stats {
	uint64_t tot;	/* presumably: all packets counted */
	uint64_t rej;	/* presumably: packets that were rejected */
} packets_stats_t;
struct nodeinfo {
uint8_t id;
uint8_t num;
nodestatus_t status;
uint32_t updatets;
GHashTable *modtime_ht;
uint8_t id;
uint8_t num;
nodestatus_t status;
uint32_t updatets;
GHashTable *modtime_ht;
packets_stats_t packets_in;
packets_stats_t packets_out;
uint32_t last_serial;
};
typedef struct nodeinfo nodeinfo_t;
... ... @@ -88,7 +98,7 @@ enum clustercmd_id {
typedef enum clustercmd_id clustercmd_id_t;
struct clustercmd_getmyid {
char node_name[1];
char node_name[1];
};
typedef struct clustercmd_getmyid clustercmd_getmyid_t;
... ... @@ -99,7 +109,7 @@ struct clustercmd_setiddata {
typedef struct clustercmd_setiddata clustercmd_setiddata_t;
struct clustercmd_register {
char node_name[1];
char node_name[1];
};
typedef struct clustercmd_register clustercmd_register_t;
... ... @@ -108,6 +118,11 @@ struct clustercmd_ack {
};
typedef struct clustercmd_ack clustercmd_ack_t;
/*
 * Payload carrying a single packet serial number.
 * In cluster_recv() the CRC32-mismatch path fills `serial` with the h.serial
 * of the packet being rejected.
 */
typedef struct clustercmd_ackrej {
	uint32_t serial;	/* serial of the packet this reply refers to */
} clustercmd_ackrej_t;
struct clustercmdhdr {
uint8_t dst_node_id;
uint8_t src_node_id;
... ... @@ -126,6 +141,7 @@ struct clustercmd {
clustercmd_setiddata_t data_setid;
clustercmd_register_t data_register;
clustercmd_ack_t data_ack;
clustercmd_ackrej_t data_ackrej;
clustercmd_getmyid_t data_getmyid;
};
};
... ...
... ... @@ -2116,6 +2116,7 @@ int sync_run(options_t *options_p) {
ret = cluster_init(options_p, &indexes);
if(ret) {
printf_e("Error: Cannot initialize cluster subsystem: %s (errno %i).\n", strerror(ret), ret);
cluster_deinit();
return ret;
}
}
... ...