redmine

cluster support continued

... ... @@ -55,10 +55,24 @@ uint8_t node_online = 0;
/**
* @brief Calculates CRC32 for clustercmd
*
* @param[in] clustercmd_p Pointer to clustercmd
*
* @retval uint32_t CRC32 value of clustecmd
*
*/
int clustercmd_crc32_calc(clustercmd_t *clustercmd_p) {
return 0;
}
/**
* @brief Changes information about node's status in nodeinfo[] and updates connected information.
*
* @param[out] node_id node_id of the node.
* @param[out] node_status New node status.
* @param[in] node_id node_id of the node.
* @param[in] node_status New node status.
*
* @retval zero Successful
* @retval non-zero If got error while changing the status. The error-code is placed into returned value.
... ... @@ -146,13 +160,16 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
// CODE HERE
if(clustercmd_p->node_id >= MAXNODES) {
printf_e("Warning: cluster_recv(): Invalid node_id: %i >= "XTOSTR(MAXNODES)"\n", clustercmd_p->node_id);
if(clustercmd_p->src_node_id >= MAXNODES) {
printf_e("Warning: cluster_recv(): Invalid src_node_id: %i >= "XTOSTR(MAXNODES)"\n", clustercmd_p->src_node_id);
}
if(clustercmd_p->dst_node_id >= MAXNODES) {
printf_e("Warning: cluster_recv(): Invalid dst_node_id: %i >= "XTOSTR(MAXNODES)"\n", clustercmd_p->dst_node_id);
}
printf_ddd("Debug3: cluster_recv(): Received: {crc32: %u, node_id: %u, cmd_id: %u, data_len: %u, data_ptr: %p}, timeout: %u -> %u\n",
clustercmd_p->crc32, clustercmd_p->node_id, clustercmd_p->cmd_id, clustercmd_p->data_len, clustercmd_p->data_p, *timeout_p, timeout);
printf_ddd("Debug3: cluster_recv(): Received: {dst_node_id: %u, src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}, timeout: %u -> %u\n",
clustercmd_p->dst_node_id, clustercmd_p->src_node_id, clustercmd_p->cmd_id, clustercmd_p->crc32, clustercmd_p->data_len, *timeout_p, timeout);
// Setting the timeout
if(timeout_p != NULL)
... ... @@ -161,14 +178,23 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
// Setting the size
*size_p = size;
uint32_t crc32 = clustercmd_crc32_calc(clustercmd_p);
if(crc32 != clustercmd_p->crc32) {
// CLUSTER_ALLOCA(clustercmd_p, )
printf_d("Debug: cluster_recv(): CRC32 mismatch: clustercmd_p->crc32 != clustercmd_crc32_calc(clustercmd_p): %p != %p.\n", (void*)(long)clustercmd_p->crc32, (void*)(long)crc32);
// cluster_send
}
// Paranoid routines
// The message from us? Something wrong if it is.
#ifdef PARANOID
if(clustercmd_p->node_id == node_id_my) {
if(clustercmd_p->src_node_id == node_id_my) {
#ifdef VERYPARANOID
printf_e("Error: cluster_recv(): clustercmd_p->node_id != node_id_my (%i != %i). Exit.\n", clustercmd_p->node_id, node_id_my);
printf_e("Error: cluster_recv(): clustercmd_p->src_node_id == node_id_my (%i != %i). Exit.\n", clustercmd_p->src_node_id, node_id_my);
return EINVAL;
#else
printf_e("Warning: cluster_recv(): clustercmd_p->node_id != node_id_my (%i != %i). Ignoring the command.\n", clustercmd_p->node_id, node_id_my);
printf_e("Warning: cluster_recv(): clustercmd_p->src_node_id == node_id_my (%i != %i). Ignoring the command.\n", clustercmd_p->src_node_id, node_id_my);
clustercmd_p = NULL;
return 0;
#endif
... ... @@ -189,26 +215,12 @@ int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *tim
*/
int cluster_send(clustercmd_t *clustercmd_p) {
// Paranoid routines
#ifdef PARANOID
if(clustercmd_p->cmd_id != CLUSTERCMDID_PING) {
if(clustercmd_p->node_id != node_id_my) {
#ifdef VERYPARANOID
printf_e("Error: cluster_send(): clustercmd_p->node_id != node_id_my (%i != %i). Exit.\n", clustercmd_p->node_id, node_id_my);
return EINVAL;
#else
printf_e("Warning: cluster_send(): clustercmd_p->node_id != node_id_my (%i != %i). Correcting.\n", clustercmd_p->node_id, node_id_my);
clustercmd_p->node_id = node_id_my;
#endif
}
}
#endif
clustercmd_p->src_node_id = node_id_my;
// CODE HERE
printf_ddd("Debug3: cluster_send(): Sending: {crc32: %u, node_id: %u, cmd_id: %u, data_len: %u, data_ptr: %p}\n",
clustercmd_p->crc32, clustercmd_p->node_id, clustercmd_p->cmd_id, clustercmd_p->data_len, clustercmd_p->data_p);
printf_ddd("Debug3: cluster_send(): Sending: {dst_node_id: %u, src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}\n",
clustercmd_p->dst_node_id, clustercmd_p->src_node_id, clustercmd_p->cmd_id, clustercmd_p->crc32, clustercmd_p->data_len);
return 0;
}
... ... @@ -240,16 +252,16 @@ int cluster_send_ack(clustercmd_t *clustercmd_p) {
size_t size=0;
unsigned int timeout = cluster_timeout;
while((ret=cluster_recv(&clustercmd_p, &size, &timeout)) && (timeout>0)) {
// Skipping not ACK-message.
// Skipping not ACK-messages.
CLUSTER_LOOP_EXPECTCMD(clustercmd_p, CLUSTERCMDID_ACK, ret);
// Is this an acknowledge packet for us? Skipping if not.
clustercmd_ack_t *ackdata = (clustercmd_ack_t *)clustercmd_p->data_p;
if(ackdata->node_id != node_id_my)
clustercmd_ack_t *data_ack_p = &clustercmd_p->data_ack;
if(clustercmd_p->dst_node_id != node_id_my)
continue;
// Is this acknowledge packet about the commend we sent? Skipping if not.
if(ackdata->serial != cmd_serial)
if(data_ack_p->serial != cmd_serial)
continue;
... ... @@ -309,10 +321,10 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
// Trying to preserve my node_id after restart. :)
// Asking another nodes about my previous node_id
{
CLUSTER_ALLOCA(clustercmd_p, void, data_p, options_p->cluster_nodename_len);
memcpy(clustercmd_p->data_p, options_p->cluster_nodename, clustercmd_p->data_len+1);
clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_getmyid_t, options_p->cluster_nodename_len);
memcpy(clustercmd_p->data_getmyid.node_name, options_p->cluster_nodename, clustercmd_p->data_len+1);
clustercmd_p->cmd_id = CLUSTERCMDID_GETMYID;
clustercmd_p->cmd_id = CLUSTERCMDID_GETMYID;
cluster_send(clustercmd_p);
}
... ... @@ -327,8 +339,8 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
CLUSTER_LOOP_EXPECTCMD(clustercmd_p, CLUSTERCMDID_SETID, ret);
// Is this the most recent information? Skipping if not.
clustercmd_setiddata_t *setiddata = (clustercmd_setiddata_t *)clustercmd_p->data_p;
if(!(setiddata->updatets > updatets))
clustercmd_setiddata_t *data_setid_p = &clustercmd_p->data_setid;
if(!(data_setid_p->updatets > updatets))
continue;
// Is the node name length in message equals to our node name length? Skipping if not.
... ... @@ -338,15 +350,15 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
continue;
// Is the node name equals to ours? Skipping if not.
if(memcmp(setiddata->node_name, options_p->cluster_nodename, recv_nodename_len))
if(memcmp(data_setid_p->node_name, options_p->cluster_nodename, recv_nodename_len))
continue;
// Remembering the node that answered us
node_changestatus(clustercmd_p->node_id, NODESTATUS_SEEMSONLINE);
node_changestatus(clustercmd_p->src_node_id, NODESTATUS_SEEMSONLINE);
// Seems, that somebody knows our node id, remembering it.
node_id_my = setiddata->node_id;
updatets = setiddata->updatets;
node_id_my = clustercmd_p->dst_node_id;
updatets = data_setid_p->updatets;
}
free(clustercmd_p);
}
... ... @@ -375,10 +387,10 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
// Registering in the cluster
node_changestatus(node_id_my, NODESTATUS_SEEMSONLINE);
{
clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_register_t, options_p->cluster_nodename_len);
clustercmd_register_t *data_register_p = &clustercmd_p->data_register;
CLUSTER_ALLOCA(clustercmd_p, clustercmd_register_t, registerdata_p, options_p->cluster_nodename_len);
memcpy(registerdata_p->node_name, options_p->cluster_nodename, options_p->cluster_nodename_len+1);
memcpy(data_register_p->node_name, options_p->cluster_nodename, options_p->cluster_nodename_len+1);
clustercmd_p->cmd_id = CLUSTERCMDID_REGISTER;
cluster_send_ack(clustercmd_p);
... ...
... ... @@ -41,17 +41,14 @@
#define CLUSTER_REQMEM(data_type, restdata_len) \
(sizeof(clustercmd_t)-1 + sizeof(data_type)-1 + (restdata_len) + 2)
#define CLUSTER_ALLOC(clustercmd_p, data_type, t_data_p, restdata_len, alloc_funct)\
clustercmd_t *clustercmd_p = (clustercmd_t *)(alloc_funct)(CLUSTER_REQMEM(data_type, restdata_len));\
PARANOIDV(memset(clustercmd_p, 0, CLUSTER_REQMEM(data_type, restdata_len)));\
data_type *t_data_p = (data_type *)clustercmd_p->data_p;\
(void)t_data_p; /* anti-warning */
#define CLUSTER_ALLOC(data_type, restdata_len, alloc_funct)\
(clustercmd_t *)PARANOIDV(memset)((alloc_funct)(CLUSTER_REQMEM(data_type, restdata_len))PARANOIDV(, 0, CLUSTER_REQMEM(data_type, restdata_len)))
#define CLUSTER_ALLOCA(clustercmd_p, data_type, data_p, restdata_len)\
CLUSTER_ALLOC(clustercmd_p, data_type, data_p, restdata_len, alloca)
#define CLUSTER_ALLOCA(data_type, restdata_len)\
CLUSTER_ALLOC(data_type, restdata_len, alloca)
#define CLUSTER_MALLOC(clustercmd_p, data_type, data_p, restdata_len)\
CLUSTER_ALLOC(clustercmd_p, data_type, data_p, restdata_len, xmalloc)
#define CLUSTER_MALLOC(data_type, restdata_len)\
CLUSTER_ALLOC(data_type, restdata_len, xmalloc)
// Types
... ... @@ -84,19 +81,12 @@ enum clustercmd_id {
};
typedef enum clustercmd_id clustercmd_id_t;
struct clustercmd {
uint32_t crc32;
uint8_t node_id;
uint8_t cmd_id;
uint32_t data_len;
uint32_t ts;
uint32_t serial;
char data_p[1];
struct clustercmd_getmyid {
char node_name[1];
};
typedef struct clustercmd clustercmd_t;
typedef struct clustercmd_getmyid clustercmd_getmyid_t;
struct clustercmd_setiddata {
uint8_t node_id;
uint32_t updatets;
char node_name[1];
};
... ... @@ -108,11 +98,28 @@ struct clustercmd_register {
typedef struct clustercmd_register clustercmd_register_t;
struct clustercmd_ack {
uint8_t node_id;
uint32_t serial;
};
typedef struct clustercmd_ack clustercmd_ack_t;
struct clustercmd {
uint8_t dst_node_id;
uint8_t src_node_id;
uint8_t cmd_id;
uint32_t crc32;
uint32_t data_len;
uint32_t ts;
uint32_t serial;
union {
char data_p[1];
clustercmd_setiddata_t data_setid;
clustercmd_register_t data_register;
clustercmd_ack_t data_ack;
clustercmd_getmyid_t data_getmyid;
};
};
typedef struct clustercmd clustercmd_t;
// Externs
extern int cluster_init(options_t *options_p, indexes_t *indexes_p);
... ...