redmine

cluster support continued

... ... @@ -47,12 +47,12 @@ pthread_t pthread_cluster;
nodeinfo_t nodeinfo[MAXNODES] = {{0}};
nodeinfo_t *nodeinfo_my = NULL;
uint8_t node_id_my = NODEID_NOID;
uint8_t node_ids[MAXNODES] = {0};
unsigned int cluster_timeout = 0;
uint8_t node_count = 0;
uint8_t node_online = 0;
nodeinfo_t *nodeinfo_my = NULL;
uint8_t node_id_my = NODEID_NOID;
uint8_t node_ids[MAX(MAXNODES, NODEID_NOID)+1] = {0};
unsigned int cluster_timeout = 0;
uint8_t node_count = 0;
uint8_t node_online = 0;
cluster_recvproc_funct_t recvproc_funct[COUNT_CLUSTERCMDID] = {NULL};
... ... @@ -386,6 +386,18 @@ int node_status_change(uint8_t node_id, uint8_t node_status) {
int cluster_send(clustercmd_t *clustercmd_p) {
clustercmd_p->h.src_node_id = node_id_my;
nodeinfo_t *nodeinfo_p;
nodeinfo_p = &nodeinfo[clustercmd_p->h.dst_node_id];
switch(nodeinfo_p->status) {
case NODESTATUS_DOESNTEXIST:
case NODESTATUS_OFFLINE:
printf_d("Debug: cluster_send(): There's no online node with id %u. Skipping sending.\n", clustercmd_p->h.dst_node_id);
return EADDRNOTAVAIL;
default:
break;
}
// CODE HERE
... ... @@ -473,11 +485,20 @@ static inline int cluster_recv_proc_set(clustercmd_id_t cmd_id, cluster_recvproc
*
*/
static inline int cluster_read(int sock, void *buf, size_t size) {
static inline int cluster_read(int sock, void *buf, size_t size, cluster_read_flags_t flags) {
static struct in_addr last_addr = {0};
struct sockaddr_in sa_in;
size_t sa_in_len = sizeof(sa_in);
int readret = recvfrom(sock, buf, size, MSG_WAITALL, (struct sockaddr *)&sa_in, (socklen_t * restrict)&sa_in_len);
if(flags & CLREAD_CONTINUE) {
if(memcmp(&last_addr, &sa_in.sin_addr, sizeof(last_addr))) {
printf_d("Debug: Get message from wrong source (%s != %s). Skipping it :(.\n", inet_ntoa(sa_in.sin_addr), inet_ntoa(last_addr));
size = 0;
return 0;
}
}
memcpy(&last_addr, &sa_in.sin_addr, sizeof(last_addr));
#ifdef PARANOID
if(!readret) {
... ... @@ -492,7 +513,7 @@ static inline int cluster_read(int sock, void *buf, size_t size) {
return errno != -1 ? errno : -2;
}
printf_dd("Debug2: cluster_read(): Got message from %s.\n", inet_ntoa(sa_in.sin_addr));
printf_dd("Debug2: cluster_read(): Got message from %s (len: %i, expected: %i).\n", inet_ntoa(sa_in.sin_addr), readret, size);
if(readret < size) {
// Too short message
... ... @@ -525,11 +546,18 @@ static inline int clustercmd_reject(clustercmd_t *clustercmd_p, uint8_t reason)
return cluster_send(clustercmd_rej_p);
}
#define CLUSTER_RECV_RETURNMESSAGE(clustercmd_p) {\
last_serial = (clustercmd_p)->h.serial;\
last_src_node_id = (clustercmd_p)->h.src_node_id;\
if(clustercmd_pp != NULL)\
*clustercmd_pp = (clustercmd_p);\
return 1;\
}
/**
* @brief Receives message from another nodes of the cluster.
* @brief Receives message from another nodes of the cluster. (not thread-safe)
*
* @param[i/o] clustercmd_pp Pointer to command structure pointer. It will be re-allocated every time when size is not enough.
* @param[i/o] size_p Pointer to size of allocated memory for command structure (see related to clustercmd_pp). The value of size will be updated on re-allocs.
* @param[out] clustercmd_pp Pointer to command structure pointer. It will be re-allocated every time when size is not enough. Allocated space will be reused on next calling.
* @param[i/o] timeout_p Pointer to timeout (in milliseconds). Timeout is assumed zero if the pointer is NULL. After waiting the event timeout value will be decreased on elapsed time.
*
* @retval 1 If there's new message.
... ... @@ -538,48 +566,21 @@ static inline int clustercmd_reject(clustercmd_t *clustercmd_p, uint8_t reason)
*
*/
static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned int *timeout_p) {
static int cluster_recv(clustercmd_t **clustercmd_pp, unsigned int *timeout_p) {
static clustercmd_t *clustercmd_p=NULL;
static size_t size=0;
static uint8_t last_src_node_id = NODEID_NOID;
static uint32_t last_serial = 0;
int timeout;
size_t size;
#ifdef PARANOID
// Checking arguments
if((clustercmd_pp == NULL) || (size_p == NULL)) {
printf_e("Error: cluster_recv() clustercmd_p or size_p is equals to NULL.\n");
errno = EINVAL;
return -1;
}
if((*clustercmd_pp != NULL) && (*size_p == 0)) {
printf_e("Error: cluster_recv(): *clustercmd_pp != NULL && *size_p == 0.\n");
errno = EINVAL;
return -1;
}
if((*clustercmd_pp == NULL) && (*size_p != 0)) {
printf_e("Error: cluster_recv(): *clustercmd_pp == NULL && *size_p != 0.\n");
errno = EINVAL;
return -1;
}
#endif
// Getting the timeout
timeout = (timeout_p == NULL ? 0 : *timeout_p);
// Getting size
if(*size_p) {
size = *size_p;
} else {
if(!size) {
size = BUFSIZ;
*clustercmd_pp = (clustercmd_t *)xmalloc(size);
clustercmd_p = (clustercmd_t *)xmalloc(size);
}
// Getting pointer to space to place clustercmd
clustercmd_t *clustercmd_p = *clustercmd_pp;
// Checking if there message is waiting in the window
if(last_src_node_id != NODEID_NOID) {
nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];
... ... @@ -587,13 +588,8 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = (clustercmdqueuedpacket_t *)
g_hash_table_lookup(nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER(last_serial+1));
if(clustercmdqueuedpacket_p != NULL) {
/*
if()
free()
size_p
*/
}
if(clustercmdqueuedpacket_p != NULL)
CLUSTER_RECV_RETURNMESSAGE(&clustercmdqueuedpacket_p->cmd);
}
// Checking if there any event on read socket
... ... @@ -630,7 +626,7 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
//clustercmd_t *clustercmd_p = (clustercmd_t *)mmap(NULL, sizeof(clustercmdhdr_t), PROT_NONE,
// MAP_PRIVATE, sock, 0);
int ret;
if((ret=cluster_read(sock, (void *)clustercmd_p, sizeof(clustercmdhdr_t)))) {
if((ret=cluster_read(sock, (void *)clustercmd_p, sizeof(clustercmdhdr_t), CLREAD_NONE))) {
if(ret == -1) return 0; // Invalid message? Skipping.
printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n",
... ... @@ -645,7 +641,7 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
printf_d("Debug: cluster_recv(): hdr-CRC32 mismatch: %p != %p.\n",
(void*)(long)clustercmd_p->h.crc32.hdr, (void*)(long)crc32.hdr);
if((ret=clustercmd_reject(clustercmd_p, REJ_CRC32MISMATCH))) {
if((ret=clustercmd_reject(clustercmd_p, REJ_CRC32MISMATCH)) != EADDRNOTAVAIL) {
printf_e("Error: cluster_recv(): Got error while clustercmd_reject(): %s (errno: %i).\n",
strerror(ret), ret);
errno = ret;
... ... @@ -726,11 +722,8 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
}
// Is this the end of packet (packet without data)
if(clustercmd_p->h.data_len == 0) {
last_serial = clustercmd_p->h.serial;
last_src_node_id = clustercmd_p->h.src_node_id;
return 1;
}
if(clustercmd_p->h.data_len == 0)
CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
// Too big data?
if(clustercmd_p->h.data_len > CLUSTER_PACKET_MAXSIZE) {
... ... @@ -747,14 +740,12 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
// Need more space for this packet?
if(CLUSTERCMD_SIZE(*clustercmd_p) > size) {
size = CLUSTERCMD_SIZE(*clustercmd_p);
*clustercmd_pp = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
clustercmd_p = *clustercmd_pp;
*size_p = size;
size = CLUSTERCMD_SIZE(*clustercmd_p);
clustercmd_p = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
}
// Reading the data
if((ret=cluster_read(sock, (void *)clustercmd_p->data_p, clustercmd_p->h.data_len))) {
if((ret=cluster_read(sock, (void *)clustercmd_p->data_p, clustercmd_p->h.data_len, CLREAD_CONTINUE))) {
if(ret == -1) return 0;
printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n",
... ... @@ -769,7 +760,7 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
printf_d("Debug: cluster_recv(): dat-CRC32 mismatch: %p != %p.\n",
(void*)(long)clustercmd_p->h.crc32.dat, (void*)(long)crc32.dat);
if((ret=clustercmd_reject(clustercmd_p, REJ_CRC32MISMATCH))) {
if((ret=clustercmd_reject(clustercmd_p, REJ_CRC32MISMATCH)) != EADDRNOTAVAIL) {
printf_e("Error: cluster_recv(): Got error while clustercmd_reject(): %s (errno: %i).\n",
strerror(ret), ret);
errno = ret;
... ... @@ -777,9 +768,7 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
}
}
last_serial = clustercmd_p->h.serial;
last_src_node_id = clustercmd_p->h.src_node_id;
return 1;
CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
}
... ... @@ -793,12 +782,11 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, size_t *size_p, unsigned i
*
*/
static int cluster_recv_proc(unsigned int _timeout) {
static clustercmd_t *clustercmd_p=NULL;
int cluster_recv_proc(unsigned int _timeout) {
clustercmd_t *clustercmd_p;
int ret;
size_t size=0;
unsigned int timeout = _timeout;
while((ret=cluster_recv(&clustercmd_p, &size, &timeout))) {
while((ret=cluster_recv(&clustercmd_p, &timeout))) {
// Exit if error
if(ret == -1) {
printf_e("Error: cluster_recv_proc(): Got error while cluster_recv(): %s (%i).\n",
... ... @@ -967,6 +955,7 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
options_p = _options_p;
indexes_p = _indexes_p;
cluster_timeout = options_p->cluster_timeout * 1000;
node_status_change(NODEID_NOID, NODESTATUS_ONLINE);
// Initializing network routines
sock = socket(AF_INET, SOCK_DGRAM, 0);
... ... @@ -1109,6 +1098,7 @@ int cluster_deinit() {
cluster_io_deinit();
node_status_change(NODEID_NOID, NODESTATUS_DOESNTEXIST);
#ifdef VERYPARANOID
int i=0;
#endif
... ...
... ... @@ -72,6 +72,13 @@ enum crc32_calc {
};
typedef enum crc32_calc crc32_calc_t;
enum cluster_read_flags {
CLREAD_NONE = 0x00,
CLREAD_CONTINUE = 0x01,
CLREAD_ALL = 0xff
};
typedef enum cluster_read_flags cluster_read_flags_t;
enum nodestatus {
NODESTATUS_DOESNTEXIST = 0,
NODESTATUS_OFFLINE,
... ...