redmine

cluster support continued

... ... @@ -39,7 +39,11 @@
// Global variables. They will be initialized in cluster_init()
int sock = -1;
int sock_i = -1;
struct sockaddr_in sa_i = {0};
int sock_o = -1;
struct sockaddr_in sa_o = {0};
options_t *options_p = NULL;
indexes_t *indexes_p = NULL;
... ... @@ -92,7 +96,7 @@ static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *cluste
}
// Calculating required memory space in buffer for the message
size_t clustercmd_size = CLUSTERCMD_SIZE(*clustercmd_p);
size_t clustercmd_size = CLUSTERCMD_SIZE(clustercmd_p);
size_t required_space = sizeof(clustercmdqueuedpackethdr_t) + clustercmd_size;
// Searching occupied boundaries in the window_p->buffer
... ... @@ -390,6 +394,7 @@ int cluster_send(clustercmd_t *clustercmd_p) {
nodeinfo_p = &nodeinfo[clustercmd_p->h.dst_node_id];
// Checking if the node online
switch(nodeinfo_p->status) {
case NODESTATUS_DOESNTEXIST:
case NODESTATUS_OFFLINE:
... ... @@ -399,61 +404,19 @@ int cluster_send(clustercmd_t *clustercmd_p) {
break;
}
// CODE HERE
printf_ddd("Debug3: cluster_send(): Sending: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}\n",
clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id, clustercmd_p->h.crc32, clustercmd_p->h.data_len);
return 0;
}
/**
* @brief (syncop) Sends message to another nodes of the cluster and waits for ACK-answers. (with skipping all other packets)
*
* @param[in] clustercmd_p Command structure pointer.
*
* @retval zero Successfully send.
* @retval non-zero Got error, while sending.
*
* /
int cluster_send_ack(clustercmd_t *clustercmd_p) {
uint32_t cmd_serial = clustercmd_p->serial;
// Putting the message into output window
clustercmd_window_add(&window_o, clustercmd_p, nodeinfo_my->serial2queuedpacket_ht);
// Sending the message
int ret = cluster_send(clustercmd_p);
if(ret) {
printf_e("Error: cluster_send_ack(): Got error from cluster_send(): %s (errno %i).\n", strerror(ret), ret);
return ret;
}
sendto(sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED(clustercmd_p), 0, &sa_o, sizeof(sa_o));
// Waiting for ACK-messages from all registered nodes
{
clustercmd_t *clustercmd_p=NULL;
size_t size=0;
unsigned int timeout = cluster_timeout;
while((ret=cluster_recv(&clustercmd_p, &size, &timeout)) && (timeout>0)) {
// Skipping not ACK-messages.
CLUSTER_LOOP_EXPECTCMD(clustercmd_p, CLUSTERCMDID_ACK, ret);
// Is this an acknowledge packet for us? Skipping if not.
clustercmd_ack_t *data_ack_p = &clustercmd_p->data_ack;
if(clustercmd_p->h.dst_node_id != node_id_my)
continue;
// Is this acknowledge packet about the commend we sent? Skipping if not.
if(data_ack_p->serial != cmd_serial)
continue;
}
free(clustercmd_p);
}
// Finishing
printf_ddd("Debug3: cluster_send(): Sending: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}\n",
clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id, clustercmd_p->h.crc32, clustercmd_p->h.data_len);
return 0;
}
*/
/**
* @brief Sets message processing functions for cluster_recv_proc() function for specified command type
... ... @@ -528,9 +491,8 @@ static inline int cluster_read(int sock, void *buf, size_t size, cluster_read_fl
/**
* @brief Sends packet-reject notification
*
* @param[in] sock The socket descriptor
* @param[in] buf Pointer to buffer
* @param[in] size Amount of bytes to read
* @param[in] clustercmd_p Pointer to clustercmd that will be rejected
* @param[in] reason Reason why the clustercmd is denied
*
* @retval zero Successful
* @retval non-zero If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
... ... @@ -598,12 +560,12 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, unsigned int *timeout_p) {
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(sock, &rfds);
FD_SET(sock_i, &rfds);
tv.tv_sec = timeout / 1000;
tv.tv_usec = timeout % 1000;
int selret = select(sock+1, &rfds, NULL, NULL, &tv);
int selret = select(sock_i+1, &rfds, NULL, NULL, &tv);
// Remembering the rest part of timeout
if(timeout_p != NULL)
... ... @@ -626,7 +588,7 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, unsigned int *timeout_p) {
//clustercmd_t *clustercmd_p = (clustercmd_t *)mmap(NULL, sizeof(clustercmdhdr_t), PROT_NONE,
// MAP_PRIVATE, sock, 0);
int ret;
if((ret=cluster_read(sock, (void *)clustercmd_p, sizeof(clustercmdhdr_t), CLREAD_NONE))) {
if((ret=cluster_read(sock_i, (void *)clustercmd_p, sizeof(clustercmdhdr_t), CLREAD_NONE))) {
if(ret == -1) return 0; // Invalid message? Skipping.
printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n",
... ... @@ -739,13 +701,13 @@ static int cluster_recv(clustercmd_t **clustercmd_pp, unsigned int *timeout_p) {
}
// Need more space for this packet?
if(CLUSTERCMD_SIZE(*clustercmd_p) > size) {
size = CLUSTERCMD_SIZE(*clustercmd_p);
if(CLUSTERCMD_SIZE(clustercmd_p) > size) {
size = CLUSTERCMD_SIZE(clustercmd_p);
clustercmd_p = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
}
// Reading the data
if((ret=cluster_read(sock, (void *)clustercmd_p->data_p, clustercmd_p->h.data_len, CLREAD_CONTINUE))) {
if((ret=cluster_read(sock_i, (void *)clustercmd_p->data_p, clustercmd_p->h.data_len, CLREAD_CONTINUE))) {
if(ret == -1) return 0;
printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n",
... ... @@ -958,39 +920,89 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
node_status_change(NODEID_NOID, NODESTATUS_ONLINE);
// Initializing network routines
sock = socket(AF_INET, SOCK_DGRAM, 0);
// Input socket
// Creating socket
sock_i = socket(AF_INET, SOCK_DGRAM, 0);
if(sock_i < 0) {
printf_e("cluster_init(): Cannot create socket for input traffic: %s (errno: %i)\n", strerror(errno), errno);
return errno;
}
// Enable SO_REUSEADDR to allow multiple instances of this application to receive copies
// of the multicast datagrams.
int reuse = 1;
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,(char *)&reuse, sizeof(reuse)) < 0) {
if(setsockopt(sock_i, SOL_SOCKET, SO_REUSEADDR,(char *)&reuse, sizeof(reuse)) < 0) {
printf_e("Error: cluster_init(): Got error while setsockopt(): %s (errno: %i)\n",
strerror(errno), errno);
return errno;
}
struct sockaddr_in sa = {0};
(void)sa; // Anti-warning
// Binding
sa.sin_family = AF_INET;
sa.sin_port = htons(options_p->cluster_mcastipport);
sa.sin_addr.s_addr = INADDR_ANY;
sa_i.sin_family = AF_INET;
sa_i.sin_port = htons(options_p->cluster_mcastipport);
sa_i.sin_addr.s_addr = INADDR_ANY;
if(bind(sock, (struct sockaddr*)&sa, sizeof(sa))) {
if(bind(sock_i, (struct sockaddr*)&sa_i, sizeof(sa_i))) {
printf_e("Error: cluster_init(): Got error while bind(): %s (errno: %i)\n",
strerror(errno), errno);
return errno;
}
// Joining to multicast group
struct ip_mreq group;
group.imr_interface.s_addr = inet_addr(options_p->cluster_iface);
group.imr_multiaddr.s_addr = inet_addr(options_p->cluster_mcastipaddr);
if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
if(setsockopt(sock_i, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(char *)&group, sizeof(group)) < 0) {
printf_e("Error: cluster_init(): Cannot setsockopt() to enter to membership %s -> %s\n",
options_p->cluster_iface, options_p->cluster_mcastipaddr);
return errno;
}
// Output socket
// Creating socket
sock_o = socket(AF_INET, SOCK_DGRAM, 0);
if(sock_o < 0) {
printf_e("cluster_init(): Cannot create socket for output traffic: %s (errno: %i)\n", strerror(errno), errno);
return errno;
}
// Initializing the group sockaddr structure
sa_o.sin_family = AF_INET;
sa_o.sin_port = htons(options_p->cluster_mcastipport);
sa_o.sin_addr.s_addr = inet_addr(options_p->cluster_mcastipaddr);
// Disable looping back output datagrams
{
char loopch = 0;
if(setsockopt(sock_o, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch))<0) {
printf_e("Error: Cannot disable loopback for output socket: %s (errno: %i).\n", strerror(errno), errno);
return errno;
}
}
// Setting local interface for output traffic
{
struct in_addr addr_o;
addr_o.s_addr = inet_addr(options_p->cluster_iface);
if(setsockopt(sock_o, IPPROTO_IP, IP_MULTICAST_IF, &addr_o, sizeof(addr_o)) < 0) {
printf_e("Error: Cannot set local interface for outbound traffic: %s (errno: %i)\n", strerror(errno), errno);
return errno;
}
}
// Initializing another routines
clustercmd_crc32_calc_init();
... ... @@ -1112,13 +1124,17 @@ int cluster_deinit() {
node_status_change(0, NODESTATUS_DOESNTEXIST);
}
close(sock);
close(sock_i);
close(sock_o);
#ifdef VERYPARANOID
memset(node_info, 0, sizeof(node_info));
node_count = 0;
node_online = 0;
node_id_my = NODEID_NOID;
memset(&sa_i, 0, sizeof(sa_i));
memset(&sa_o, 0, sizeof(sa_o));
#endif
return ret;
... ...
... ... @@ -43,12 +43,12 @@
(sizeof(clustercmdhdr_t) + sizeof(data_type) + (restdata_len) + 2)
// calculated required memory for clustercmd packet with padding
#define CLUSTER_PREQMEM(data_type, restdata_len) \
((CLUSTER_REQMEM(data_type, restdata_len)+3)%4)
#define CLUSTER_REQMEM_PADDED(data_type, restdata_len) \
CLUSTER_PAD(CLUSTER_REQMEM(data_type, restdata_len))
// allocated memory for clustercmd packet with padding
#define CLUSTER_ALLOC(data_type, restdata_len, alloc_funct)\
(clustercmd_t *)PARANOIDV(memset)((alloc_funct)(CLUSTER_PREQMEM(data_type, restdata_len))PARANOIDV(, 0, CLUSTER_PREQMEM(data_type, restdata_len)))
(clustercmd_t *)PARANOIDV(memset)((alloc_funct)(CLUSTER_REQMEM_PADDED(data_type, restdata_len))PARANOIDV(, 0, CLUSTER_REQMEM_PADDED(data_type, restdata_len)))
// allocated memory for clustercmd packet with padding with alloca()
#define CLUSTER_ALLOCA(data_type, restdata_len)\
... ... @@ -60,7 +60,10 @@
// Common macros
#define CLUSTERCMD_SIZE(clustercmd) (sizeof(clustercmdhdr_t) + (clustercmd).h.data_len)
#define CLUSTER_PAD(size) (((size) + 3) % 4)
#define CLUSTERCMD_SIZE(clustercmd_p) (sizeof(clustercmdhdr_t) + (*(clustercmd_p)).h.data_len)
#define CLUSTERCMD_SIZE_PADDED(clustercmd_p) (sizeof(clustercmdhdr_t) + CLUSTER_PAD((*(clustercmd_p)).h.data_len))
// Types
... ...