Blame view

cluster.c 38.3 KB
redmine authored
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
    clsync - file tree sync utility based on fanotify and inotify
    
    Copyright (C) 2013  Dmitry Yu Okunev <xai@mephi.ru> 0x8E30679C
    
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

redmine authored
20 21 22 23 24 25
/*
    Hello, dear developer.

    Cluster technologies are almost always very difficult. So I'll try to
    fill this code with comments. Enjoy ;)

redmine authored
26 27 28
    Also you can ask me directly by e-mail or IRC, if something seems too
    hard.

redmine authored
29 30 31
                                                           -- 0x8E30679C
 */

redmine authored
32
#ifdef CLUSTER_SUPPORT
redmine authored
33

redmine authored
34 35
#include "common.h"
#include "cluster.h"
redmine authored
36
#include "sync.h"
redmine authored
37 38
#include "output.h"
#include "malloc.h"
redmine authored
39

redmine authored
40 41
// Global variables. They will be initialized in cluster_init()

redmine authored
42
// Socket descriptor; cluster_recv() waits on it with select() and reads from it.
int sock			= -1;
redmine authored
43

redmine authored
44 45 46 47 48 49
// NOTE(review): options_p, indexes_p and pthread_cluster are not referenced in
// the visible part of this file; presumably they are set up in cluster_init().
options_t  *options_p		= NULL;
indexes_t  *indexes_p		= NULL;
pthread_t   pthread_cluster;

// Per-node state, indexed by node id.
nodeinfo_t nodeinfo[MAXNODES]   = {{0}};

redmine authored
50 51 52 53 54 55
// State of this node; cluster_recvproc_ack() looks up sent packets in its
// serial2queuedpacket_ht.
nodeinfo_t *nodeinfo_my				= NULL;
// Id of this node; written into h.src_node_id by cluster_send().
uint8_t	node_id_my				= NODEID_NOID;
// Dense list of ids of existing nodes; nodeinfo[id].num is the position of "id" here.
uint8_t node_ids[MAX(MAXNODES, NODEID_NOID)+1]	= {0};
// NOTE(review): only referenced from commented-out code in the visible part of the file.
unsigned int cluster_timeout			= 0;
// Number of existing nodes (maintained by node_status_change()).
uint8_t node_count				= 0;
// Number of existing nodes that are not offline (maintained by node_status_change()).
uint8_t node_online				= 0;
redmine authored
56

redmine authored
57 58
// Message handlers indexed by command id; see cluster_recv_proc_set().
cluster_recvproc_funct_t recvproc_funct[COUNT_CLUSTERCMDID] = {NULL};

redmine authored
59 60
// window_i: misordered incoming packets queued by cluster_recv().
// window_o: packets released by cluster_recvproc_ack() once all ACKs arrived.
window_t window_i = {0};
window_t window_o = {0};
redmine authored
61

redmine authored
62
// CRC32 lookup table; filled by clustercmd_crc32_calc_init().
uint32_t clustercmd_crc32_table[1<<8];
redmine authored
63 64

/**
redmine authored
65
 * @brief 			Adds command (message) to window_p->buffer
redmine authored
66 67 68 69 70 71 72 73
 * 
 * @param[in]	clustercmd_p	Pointer to cluster cmd to put into window
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while deleting the message. The error-code is placed into returned value.
 * 
 */

redmine authored
74 75 76 77 78 79 80
static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *clustercmd_p, GHashTable *serial2queuedpacket_ht) {
#ifdef PARANOID
	if(clustercmd_p->h.src_node_id >= MAXNODES) {
		printf_e("Error: clustercmd_window_add(): Invalid src_node_id: %i.\n", clustercmd_p->h.src_node_id);
		return EINVAL;
	}
#endif
redmine authored
81

redmine authored
82 83 84
	// Checking if there enough window_p->cells allocated
	if(window_p->packets_len >= window_p->size) {
		window_p->size 		+= ALLOC_PORTION;
redmine authored
85 86 87 88

#		define CXREALLOC(a, size) \
			(typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
	
redmine authored
89 90
		window_p->packets_id     = CXREALLOC(window_p->packets_id,	 window_p->size);
		window_p->occupied_sides = CXREALLOC(window_p->occupied_sides, window_p->size);
redmine authored
91 92 93 94
#		undef CXREALLOC
	}

	// Calculating required memory space in buffer for the message
redmine authored
95
	size_t clustercmd_size = CLUSTERCMD_SIZE(*clustercmd_p);
redmine authored
96
	size_t required_space  = sizeof(clustercmdqueuedpackethdr_t) + clustercmd_size;
redmine authored
97

redmine authored
98
	// Searching occupied boundaries in the window_p->buffer
redmine authored
99
	size_t occupied_left = SIZE_MAX, occupied_right=0;
redmine authored
100 101
	int i;
	i = 0;
redmine authored
102
	while(i < window_p->packets_len) {
redmine authored
103
		unsigned int window_id;
redmine authored
104
		window_id  = window_p->packets_id[i];
redmine authored
105

redmine authored
106 107
		occupied_left  = MIN(occupied_left,  window_p->occupied_sides[window_id].left);
		occupied_right = MAX(occupied_right, window_p->occupied_sides[window_id].right);
redmine authored
108 109 110
	}

	printf_ddd("Debug3: clustercmd_window_add(): w.size == %u, b_left == %u; b_right == %u; w.buf_size == %u; r_space == %u\n",
redmine authored
111
		window_p->size, occupied_left, occupied_right, window_p->buf_size, required_space);
redmine authored
112 113 114

	// Trying to find a space in the buffer to place message
	size_t buf_coordinate = SIZE_MAX;
redmine authored
115
	if(window_p->packets_len) {
redmine authored
116 117 118
		// Free space from left  (start of buffer)
		size_t free_left  = occupied_left;
		// Free space from right (end of buffer)
redmine authored
119
		size_t free_right = window_p->buf_size - occupied_right;
redmine authored
120 121 122 123 124

		if(free_left  > required_space)
			buf_coordinate = free_left - required_space;
		else
		if(free_right > required_space)
redmine authored
125 126 127
			buf_coordinate   = occupied_right;
		else
		{
redmine authored
128 129 130 131
			// Not enough space in the window_p->buffer;
			window_p->buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
			window_p->buf	    = xrealloc(window_p->buf, window_p->buf_size);
			buf_coordinate      = occupied_right;
redmine authored
132 133
		}
		printf_ddd("Debug3: clustercmd_window_add(): f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
redmine authored
134
			free_left, free_right, buf_coordinate, window_p->buf_size);
redmine authored
135 136
	}

redmine authored
137
	unsigned int window_id;
redmine authored
138

redmine authored
139
	// packet id in window
redmine authored
140
	window_id = window_p->packets_len;
redmine authored
141

redmine authored
142
	// reserving the space in buffer
redmine authored
143 144
	window_p->occupied_sides[window_id].left  = buf_coordinate;
	window_p->occupied_sides[window_id].right = buf_coordinate + required_space;
redmine authored
145

redmine authored
146
	// placing information into buffer
redmine authored
147
	clustercmdqueuedpacket_t *queuedpacket_p;
redmine authored
148

redmine authored
149
	queuedpacket_p = (clustercmdqueuedpacket_t *)&window_p->buf[buf_coordinate];
redmine authored
150

redmine authored
151 152
	memset(&queuedpacket_p->h,	0,		sizeof(queuedpacket_p->h));
	memcpy(&queuedpacket_p->cmd, 	clustercmd_p, 	clustercmd_size);
redmine authored
153

redmine authored
154
	queuedpacket_p->h.window_id  = window_id;
redmine authored
155 156

	// remembering new packet
redmine authored
157
	g_hash_table_insert(serial2queuedpacket_ht, GINT_TO_POINTER(clustercmd_p->h.serial), queuedpacket_p);
redmine authored
158
	window_p->packets_id[window_p->packets_len++] = window_id;
redmine authored
159 160 161 162 163 164

	return 0;
}


/**
redmine authored
165
 * @brief 			Removes command (message) from window_p->buffer
redmine authored
166
 * 
redmine authored
167
 * @param[in]	queuedpacket_p	Pointer to queuedpacket structure of the command (message)
redmine authored
168 169 170 171 172 173
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while deleting the message. The error-code is placed into returned value.
 * 
 */

redmine authored
174
static inline int clustercmd_window_del(window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p, GHashTable *serial2queuedpacket_ht) {
redmine authored
175
#ifdef PARANOID
redmine authored
176 177
	if(!window_p->size) {
		printf_e("Error: clustercmd_window_del(): window not allocated.\n");
redmine authored
178 179
		return EINVAL;
	}
redmine authored
180 181
	if(!window_p->packets_len) {
		printf_e("Error: clustercmd_window_del(): there already no packets in the window.\n");
redmine authored
182 183 184 185
		return EINVAL;
	}
#endif

redmine authored
186 187
	unsigned int window_id_del  =  queuedpacket_p->h.window_id;
	unsigned int window_id_last = --window_p->packets_len;
redmine authored
188

redmine authored
189
	// Forgeting the packet
redmine authored
190

redmine authored
191
	// 	Moving the last packet into place of deleting packet, to free the tail in "window_p->packets_id" and "window_p->occupied_sides"
redmine authored
192 193 194
	if(window_id_del != window_id_last) {
		printf_ddd("Debug3: clustercmd_window_del(): %i -> %i\n", window_id_last, window_id_del);

redmine authored
195
		window_p->packets_id[window_id_del] = window_p->packets_id[window_id_last];
redmine authored
196

redmine authored
197
		memcpy(&window_p->occupied_sides[window_id_del], &window_p->occupied_sides[window_id_last], sizeof(window_p->occupied_sides[window_id_del]));
redmine authored
198 199
	}

redmine authored
200
	// 	Removing from hash table
redmine authored
201
	g_hash_table_remove(serial2queuedpacket_ht, GINT_TO_POINTER(queuedpacket_p->cmd.h.serial));
redmine authored
202 203 204 205

	return 0;
}

redmine authored
206 207

/**
redmine authored
208
 * @brief 			Initializes table for CRC32 calculations
redmine authored
209 210 211 212 213 214 215
 * 
 * @param[in]	clustercmd_p	Pointer to clustercmd
 * 
 * @retval	uint32_t	CRC32 value of clustecmd
 * 
 */

redmine authored
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
/*
   Name  : CRC-32
   Poly  : 0x04C11DB7    x^32 + x^26 + x^23 + x^22 + x^16 + x^12 + x^11 
                        + x^10 + x^8 + x^7 + x^5 + x^4 + x^2 + x + 1
   Init  : 0xFFFFFFFF
   Revert: true
   XorOut: 0xFFFFFFFF
   Check : 0xCBF43926 ("123456789")
 */
int clustercmd_crc32_calc_init() {
	int i;
	uint32_t crc32;

	i=0;
	while(i < (1<<8)) {
		int j;
		crc32 = i;

		j = 0;
		while(j < 8) {
			crc32  =  (crc32 & 1) ? (crc32 >> 1) ^ 0xEDB88320 : crc32 >> 1;
			j++;
		}
	
		clustercmd_crc32_table[i] = crc32;
		i++;
	};

redmine authored
244 245 246
	return 0;
}

redmine authored
247 248 249 250 251
/**
 * @brief 			Calculates CRC32 for clustercmd
 * 
 * @param[in]	clustercmd_p	Pointer to clustercmd
 * 
redmine authored
252 253
 * @retval	zero		On successful calculation
 * @retval	non-zero	On error. Error-code is placed into returned value.
redmine authored
254 255 256 257 258 259 260 261 262 263 264 265
 * 
 */

/*
   Name  : CRC-32
   Poly  : 0x04C11DB7    x^32 + x^26 + x^23 + x^22 + x^16 + x^12 + x^11 
                        + x^10 + x^8 + x^7 + x^5 + x^4 + x^2 + x + 1
   Init  : 0xFFFFFFFF
   Revert: true
   XorOut: 0xFFFFFFFF
   Check : 0xCBF43926 ("123456789")
 */
redmine authored
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
int clustercmd_crc32_calc(clustercmd_t *clustercmd_p, clustercmdcrc32_t *clustercmdcrc32_p, crc32_calc_t flags) {

	if(flags & CRC32_CALC_HEADER) {
		uint32_t crc32;
		clustercmdcrc32_t crc32_save;

		// Preparing
		memcpy(&crc32_save, 	&clustercmd_p->h.crc32, sizeof(clustercmdcrc32_t));
		memset(&clustercmd_p->h.crc32, 		0, 	sizeof(clustercmdcrc32_t));
		crc32 = 0xFFFFFFFF;

		uint32_t size = sizeof(clustercmdhdr_t);
		char    *ptr  = (char *)&clustercmd_p->h;

		// Calculating
		crc32 = 0;
		while(size--) 
			crc32 = clustercmd_crc32_table[(crc32 ^ *(ptr++)) & 0xFF] ^ (crc32 >> 8);

		// Ending
		memcpy(&clustercmd_p->h.crc32, &crc32_save, sizeof(clustercmdcrc32_t));
		clustercmdcrc32_p->hdr = crc32 ^ 0xFFFFFFFF;
	}
redmine authored
289

redmine authored
290 291
	if(flags & CRC32_CALC_DATA) {
		uint32_t crc32;
redmine authored
292

redmine authored
293 294
		uint32_t size = clustercmd_p->h.data_len;
		char    *ptr  = clustercmd_p->data_p;
redmine authored
295

redmine authored
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
#ifdef PARANOID
		if(size & 0x3) {
			printf_e("Error: clustercmd_crc32_calc(): clustercmd_p->h.data_len&0x3 != 0: %u\n",
				clustercmd_p->h.data_len);
			return EINVAL;
		}
#endif

		// Calculating
		crc32 = 0;
		while(size--) 
			crc32 = clustercmd_crc32_table[(crc32 ^ *(ptr++)) & 0xFF] ^ (crc32 >> 8);

		// Ending
		clustercmdcrc32_p->dat = crc32 ^ 0xFFFFFFFF;
	}
redmine authored
312

redmine authored
313
	return 0;
redmine authored
314
}
redmine authored
315 316

/**
redmine authored
317
 * @brief 			Changes information about node's status in nodeinfo[] and updates connected information.
redmine authored
318
 * 
redmine authored
319 320
 * @param[in]	node_id		node_id of the node.
 * @param[in]	node_status	New node status.
redmine authored
321 322 323
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while changing the status. The error-code is placed into returned value.
redmine authored
324 325 326
 * 
 */

redmine authored
327
int node_status_change(uint8_t node_id, uint8_t node_status) {
redmine authored
328
	uint8_t node_status_old = nodeinfo[node_id].status;
redmine authored
329 330 331 332 333 334 335
	nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];

	if((node_status == NODESTATUS_DOESNTEXIST) && (node_status_old != NODESTATUS_DOESNTEXIST)) {
		node_count--;

		node_ids[nodeinfo_p->num] = node_ids[node_count];
		g_hash_table_destroy(nodeinfo_p->modtime_ht);
redmine authored
336 337
		g_hash_table_destroy(nodeinfo_p->serial2queuedpacket_ht);

redmine authored
338
#ifdef VERYPARANOID
redmine authored
339
		memset(nodeinfo_p, 0, sizeof(*nodeinfo_p));
redmine authored
340
#endif
redmine authored
341 342 343
		return 0;
	}

redmine authored
344 345
	if(node_status == node_status_old)
		return 0;
redmine authored
346

redmine authored
347

redmine authored
348 349
	switch(node_status_old) {
		case NODESTATUS_DOESNTEXIST:
redmine authored
350 351
			nodeinfo_p->id  = node_id;
			nodeinfo_p->num = node_count;
redmine authored
352 353 354
			nodeinfo_p->modtime_ht 		   = g_hash_table_new_full(g_str_hash,	  g_str_equal,	  free, 0);
			nodeinfo_p->serial2queuedpacket_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0,    0);

redmine authored
355 356
			node_ids[node_count] = node_id;

redmine authored
357
			node_count++;
redmine authored
358 359
#ifdef PARANOID
			if(node_status == NODESTATUS_OFFLINE)
redmine authored
360
				break; // In case of NODESTATUS_DOESNTEXIST -> NODESTATUS_OFFLINE, node_online should be increased
redmine authored
361
#endif
redmine authored
362 363 364 365 366 367 368 369
		case NODESTATUS_OFFLINE:
			node_online++;
			break;
		default:
			if(node_status == NODESTATUS_OFFLINE)
				node_online--;
			break;
	}
redmine authored
370

redmine authored
371
	nodeinfo[node_id].status = node_status;
redmine authored
372 373 374 375

	return 0;
}

redmine authored
376 377

/**
 * @brief 			Sends message to another nodes of the cluster.
 * 
 * The actual sending is not implemented yet (see "CODE HERE"): at the moment the
 * function only fills h.src_node_id and checks the destination.
 * 
 * @param[in]	clustercmd_p	Command structure pointer.
 *
 * @retval	zero 		Successfully send.
 * @retval	EADDRNOTAVAIL	There's no online node with id h.dst_node_id.
 * @retval	non-zero 	Got error, while sending.
 * 
 */

int cluster_send(clustercmd_t *clustercmd_p) {
	clustercmd_p->h.src_node_id = node_id_my;

	uint8_t dst_node_id = clustercmd_p->h.dst_node_id;

	// nodeinfo[] has only MAXNODES cells; an id beyond them cannot belong to an online node.
	// NOTE(review): cluster_recv() treats dst_node_id == NODEID_NOID as a broadcast.
	// Broadcast sending presumably needs its own branch here -- confirm.
	if(dst_node_id >= MAXNODES) {
		printf_d("Debug: cluster_send(): There's no online node with id %u. Skipping sending.\n", dst_node_id);
		return EADDRNOTAVAIL;
	}

	switch(nodeinfo[dst_node_id].status) {
		case NODESTATUS_DOESNTEXIST:
		case NODESTATUS_OFFLINE:
			printf_d("Debug: cluster_send(): There's no online node with id %u. Skipping sending.\n", dst_node_id);
			return EADDRNOTAVAIL;
		default:
			break;
	}

	// CODE HERE

	// h.crc32 is a structure, so only its "hdr" part is printed for "%u"
	printf_ddd("Debug3: cluster_send(): Sending: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, crc32: %u, data_len: %u}\n",
		clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
		(unsigned int)clustercmd_p->h.crc32.hdr, clustercmd_p->h.data_len);

	return 0;
}


/**
 * @brief 			(syncop) Sends message to another nodes of the cluster and waits for ACK-answers. (with skipping all other packets)
 * 
 * @param[in]	clustercmd_p	Command structure pointer.
 *
 * @retval	zero 		Successfully send.
 * @retval	non-zero 	Got error, while sending.
 * 
 * /

int cluster_send_ack(clustercmd_t *clustercmd_p) {
	uint32_t cmd_serial = clustercmd_p->serial;

	// Sending the message
	int ret = cluster_send(clustercmd_p);
	if(ret) {
		printf_e("Error: cluster_send_ack(): Got error from cluster_send(): %s (errno %i).\n", strerror(ret), ret);
		return ret;
	}

	// Waiting for ACK-messages from all registered nodes
	{
		clustercmd_t *clustercmd_p=NULL;
		size_t size=0;
		unsigned int timeout = cluster_timeout;
		while((ret=cluster_recv(&clustercmd_p, &size, &timeout)) && (timeout>0)) {
			// 	Skipping not ACK-messages.
			CLUSTER_LOOP_EXPECTCMD(clustercmd_p, CLUSTERCMDID_ACK, ret);

			// 	Is this an acknowledge packet for us? Skipping if not.
			clustercmd_ack_t *data_ack_p = &clustercmd_p->data_ack;
			if(clustercmd_p->h.dst_node_id != node_id_my)
				continue;

			// 	Is this acknowledge packet about the commend we sent? Skipping if not.
			if(data_ack_p->serial != cmd_serial)
				continue;

			
		}
		free(clustercmd_p);
	}

	return 0;
}
*/

/**
 * @brief 			Sets message processing functions for cluster_recv_proc() function for specified command type
 * 
 * @param[in]	cmd_id		The command type
 * @param[in]	procfunct	The processing function for messages with specified cmd_id
 * 
 * @retval	zero		Successful
 * @retval	EINVAL		cmd_id is out of recvproc_funct[] bounds
 * 
 */

static inline int cluster_recv_proc_set(clustercmd_id_t cmd_id, cluster_recvproc_funct_t procfunct) {
	// recvproc_funct[] has exactly COUNT_CLUSTERCMDID cells
	if((unsigned int)cmd_id >= COUNT_CLUSTERCMDID) {
		printf_e("Error: cluster_recv_proc_set(): Invalid cmd_id: %u.\n", (unsigned int)cmd_id);
		return EINVAL;
	}

	recvproc_funct[cmd_id] = procfunct;

	return 0;
}


redmine authored
476
/**
redmine authored
477
 * @brief 			Safe wrapper for recvfrom() function
redmine authored
478 479 480 481 482 483 484 485 486 487
 * 
 * @param[in]	sock		The socket descriptor
 * @param[in]	buf		Pointer to buffer
 * @param[in]	size		Amount of bytes to read
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
 * 
 */

redmine authored
488 489
static inline int cluster_read(int sock, void *buf, size_t size, cluster_read_flags_t flags) {
	static struct in_addr last_addr = {0};
redmine authored
490 491 492 493
	struct sockaddr_in sa_in;
	size_t sa_in_len = sizeof(sa_in);

	int readret = recvfrom(sock, buf, size, MSG_WAITALL, (struct sockaddr *)&sa_in, (socklen_t * restrict)&sa_in_len);
redmine authored
494 495 496 497 498 499 500 501
	if(flags & CLREAD_CONTINUE) {
		if(memcmp(&last_addr, &sa_in.sin_addr, sizeof(last_addr))) {
			printf_d("Debug: Get message from wrong source (%s != %s). Skipping it :(.\n", inet_ntoa(sa_in.sin_addr), inet_ntoa(last_addr));
			size = 0;
			return 0;
		}
	}
	memcpy(&last_addr, &sa_in.sin_addr, sizeof(last_addr));
redmine authored
502

redmine authored
503 504
#ifdef PARANOID
	if(!readret) {
redmine authored
505
		printf_e("Error: cluster_read(): recvfrom() returned 0. This shouldn't happend. Exit.");
redmine authored
506 507 508 509
		return EINVAL;
	}
#endif
	if(readret < 0) {
redmine authored
510
		printf_e("Error: cluster_read(): recvfrom() returned %i. "
redmine authored
511 512 513 514 515
			"Seems, that something wrong with network socket: %s (errno %i).\n", 
			readret, strerror(errno), errno);
		return errno != -1 ? errno : -2;
	}

redmine authored
516
	printf_dd("Debug2: cluster_read(): Got message from %s (len: %i, expected: %i).\n", inet_ntoa(sa_in.sin_addr), readret, size);
redmine authored
517

redmine authored
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
	if(readret < size) {
		// Too short message
		printf_e("Warning: cluster_read(): Got too short message from node. Ignoring it.\n");
		return -1;
	}

	return 0;
}


/**
 * @brief 			Sends packet-reject notification
 * 
 * The notification is addressed to the source node of the rejected command and
 * carries the serial of that command together with the reason.
 * 
 * @param[in]	clustercmd_p	The command that is being rejected
 * @param[in]	reason		Why the command is rejected
 * 
 * @retval	zero		Successful
 * @retval	non-zero	The value returned by cluster_send()
 * 
 */

static inline int clustercmd_reject(clustercmd_t *clustercmd_p, uint8_t reason) {
	// NOTE(review): h.cmd_id is not set here; presumably CLUSTER_ALLOCA() or the
	// sending code takes care of it -- confirm.
	clustercmd_t *reject_p = CLUSTER_ALLOCA(clustercmd_rej_t, 0);

	reject_p->h.dst_node_id   = clustercmd_p->h.src_node_id;
	reject_p->data_rej.serial = clustercmd_p->h.serial;
	reject_p->data_rej.reason = reason;

	int send_ret = cluster_send(reject_p);
	return send_ret;
}

redmine authored
549 550 551 552 553 554 555 556

#define CLUSTER_RECV_RETURNMESSAGE(clustercmd_p) {\
		last_serial      = (clustercmd_p)->h.serial;\
		last_src_node_id = (clustercmd_p)->h.src_node_id;\
		if(clustercmd_pp != NULL)\
			*clustercmd_pp = (clustercmd_p);\
		return 1;\
}
redmine authored
557
/**
redmine authored
558
 * @brief 			Receives message from another nodes of the cluster. (not thread-safe)
redmine authored
559
 * 
redmine authored
560
 * @param[out]	clustercmd_pp	Pointer to command structure pointer. It will be re-allocated every time when size is not enough. Allocated space will be reused on next calling.
redmine authored
561
 * @param[i/o]	timeout_p	Pointer to timeout (in milliseconds). Timeout is assumed zero if the pointer is NULL. After waiting the event timeout value will be decreased on elapsed time.
redmine authored
562 563 564 565 566 567 568
 * 
 * @retval	1		If there's new message.
 * @retval	0		If there's no new messages.
 * @retval	-1		If got error while receiving. The error-code is placed into "errno".
 * 
 */

redmine authored
569 570 571
static int cluster_recv(clustercmd_t **clustercmd_pp, unsigned int *timeout_p) {
	static clustercmd_t *clustercmd_p=NULL;
	static size_t size=0;
redmine authored
572 573
	static uint8_t  last_src_node_id = NODEID_NOID;
	static uint32_t last_serial	 = 0;
redmine authored
574 575 576 577 578
	int timeout;

	// Getting the timeout
	timeout = (timeout_p == NULL ? 0 : *timeout_p);

redmine authored
579
	if(!size) {
redmine authored
580
		size = BUFSIZ;
redmine authored
581
		clustercmd_p = (clustercmd_t *)xmalloc(size);
redmine authored
582 583
	}

redmine authored
584 585 586 587 588 589 590
	// Checking if there message is waiting in the window
	if(last_src_node_id != NODEID_NOID) {
		nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];

		clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = (clustercmdqueuedpacket_t *)
			g_hash_table_lookup(nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER(last_serial+1));

redmine authored
591 592
		if(clustercmdqueuedpacket_p != NULL)
			CLUSTER_RECV_RETURNMESSAGE(&clustercmdqueuedpacket_p->cmd);
redmine authored
593 594
	}

redmine authored
595 596 597
	// Checking if there any event on read socket
	//	select()
	struct timeval tv;
redmine authored
598

redmine authored
599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615
	fd_set rfds;
	FD_ZERO(&rfds);
	FD_SET(sock, &rfds);

	tv.tv_sec  = timeout / 1000;
	tv.tv_usec = timeout % 1000;

	int selret = select(sock+1, &rfds, NULL, NULL, &tv);

	// 	Remembering the rest part of timeout
	if(timeout_p != NULL)
		*timeout_p = tv.tv_sec * 1000 + tv.tv_usec / 1000;

	//	processing select()'s retuned value
	if(selret <  0) {
		printf_e("Error: cluster_recv(): got error while select(): %s (errno: %i).\n", 
			strerror(errno), errno);
redmine authored
616
		return 0;
redmine authored
617
	}
redmine authored
618 619
	if(selret == 0) {
		printf_ddd("Debug: cluster_recv(): no new messages.\n");
redmine authored
620
		return 0;
redmine authored
621
	}
redmine authored
622
	printf_ddd("Debug: cluster_recv(): got new message(s).\n");
redmine authored
623

redmine authored
624 625 626 627 628
	// Reading new message's header
	clustercmdcrc32_t crc32;
	//clustercmd_t *clustercmd_p = (clustercmd_t *)mmap(NULL, sizeof(clustercmdhdr_t), PROT_NONE, 
	//	MAP_PRIVATE, sock, 0);
	int ret;
redmine authored
629
	if((ret=cluster_read(sock, (void *)clustercmd_p, sizeof(clustercmdhdr_t), CLREAD_NONE))) {
redmine authored
630
		if(ret == -1) return 0; // Invalid message? Skipping.
redmine authored
631

redmine authored
632 633 634 635 636
		printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n",
			strerror(errno), errno);
		errno = ret;
		return -1;
	}
redmine authored
637

redmine authored
638 639 640 641 642 643
	// Checking CRC32 of packet headers.
	clustercmd_crc32_calc(clustercmd_p, &crc32, CRC32_CALC_HEADER);
	if(crc32.hdr != clustercmd_p->h.crc32.hdr) {
		printf_d("Debug: cluster_recv(): hdr-CRC32 mismatch: %p != %p.\n", 
			(void*)(long)clustercmd_p->h.crc32.hdr, (void*)(long)crc32.hdr);

redmine authored
644
		if((ret=clustercmd_reject(clustercmd_p, REJ_CRC32MISMATCH)) != EADDRNOTAVAIL) {
redmine authored
645 646 647 648 649 650
			printf_e("Error: cluster_recv(): Got error while clustercmd_reject(): %s (errno: %i).\n", 
				strerror(ret), ret);
			errno = ret;
			return -1;
		}
	}
redmine authored
651

redmine authored
652 653 654
	// Checking src_node_id and dst_node_id
	uint8_t src_node_id = clustercmd_p->h.src_node_id;
	uint8_t dst_node_id = clustercmd_p->h.dst_node_id;
redmine authored
655

redmine authored
656 657 658 659 660 661 662
	// 	Packet from registering node?
	if(src_node_id == NODEID_NOID) {
		// 	Wrong command from registering node?
		if(clustercmd_p->h.cmd_id != CLUSTERCMDID_GETMYID) {
			printf_e("Warning: cluster_recv(): Got non getmyid packet from NOID node. Ignoring the packet.\n");
			return 0;
		}
redmine authored
663 664 665 666
		if(clustercmd_p->h.serial != 0) {
			printf_e("Warning: cluster_recv(): Got packet with non-zero serial from NOID node. Ignoring the packet.\n");
			return 0;
		}
redmine authored
667 668 669 670 671 672 673
	} else
	// 	Wrong src_node_id?
	if(src_node_id >= MAXNODES) {
		printf_e("Warning: cluster_recv(): Invalid h.src_node_id: %i >= "XTOSTR(MAXNODES)"\n",
			src_node_id);
		return 0;
	}
redmine authored
674

redmine authored
675 676 677 678 679 680 681 682 683
	// 	Is this broadcast message?
	if(dst_node_id == NODEID_NOID) {
		// CODE HERE
	} else
	//	Wrong dst_node_id?
	if(dst_node_id >= MAXNODES) {
		printf_e("Warning: cluster_recv(): Invalid h.dst_node_id: %i >= "XTOSTR(MAXNODES)"\n", 
			dst_node_id);
		return 0;
redmine authored
684 685
	}

redmine authored
686 687 688 689 690 691
	// Seems, that headers are correct. Continuing.
	printf_ddd("Debug3: cluster_recv(): Received: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u,"
		" crc32: %u, data_len: %u}, timeout: %u -> %u\n",
		dst_node_id, src_node_id, clustercmd_p->h.cmd_id, 
		clustercmd_p->h.crc32, clustercmd_p->h.data_len, *timeout_p, timeout);

redmine authored
692
	// Paranoid routines
redmine authored
693
	//	The message from us? Something wrong if it is.
redmine authored
694
#ifdef PARANOID
redmine authored
695
	if((clustercmd_p->h.src_node_id == node_id_my) && (node_id_my != NODEID_NOID)) {
redmine authored
696
#ifdef VERYPARANOID
redmine authored
697 698
		printf_e("Error: cluster_recv(): clustercmd_p->h.src_node_id == node_id_my (%i != %i)."
			" Exit.\n", clustercmd_p->h.src_node_id, node_id_my);
redmine authored
699 700
		return EINVAL;
#else
redmine authored
701 702
		printf_e("Warning: cluster_recv(): clustercmd_p->h.src_node_id == node_id_my (%i != %i)."
			" Ignoring the command.\n", clustercmd_p->h.src_node_id, node_id_my);
redmine authored
703 704 705 706 707
		clustercmd_p = NULL;
		return 0;
#endif
	}
#endif
redmine authored
708 709 710 711 712 713 714 715 716 717 718 719

	nodeinfo_t *nodeinfo_p = &nodeinfo[src_node_id];

	// Not actual packet?
	if(clustercmd_p->h.serial <= nodeinfo_p->last_serial) {
		printf_d("Debug: cluster_recv(): Ignoring packet from %i due to serial: %i <= %i\n", 
			src_node_id, clustercmd_p->h.serial, nodeinfo_p->last_serial);
		return 0;
	}

	// Is this misordered packet?
	if(clustercmd_p->h.serial != nodeinfo_p->last_serial + 1) {
redmine authored
720
		clustercmd_window_add(&window_i, clustercmd_p, nodeinfo_p->serial2queuedpacket_ht);
redmine authored
721 722 723 724
		return 0;
	}

	// Is this the end of packet (packet without data)
redmine authored
725 726
	if(clustercmd_p->h.data_len == 0)
		CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
redmine authored
727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742

	// Too big data?
	if(clustercmd_p->h.data_len > CLUSTER_PACKET_MAXSIZE) {
		printf_e("Warning: cluster_recv(): Got too big message from node %i. Ignoring it.\n",
			src_node_id);
		return 0;
	}

	// Incorrect size of data?
	if(clustercmd_p->h.data_len & 0x3) {
		printf_e("Warning: cluster_recv(): Received packet of size not a multiple of 4. Ignoring it.\n");
		return 0;
	}

	// Need more space for this packet?
	if(CLUSTERCMD_SIZE(*clustercmd_p) > size) {
redmine authored
743 744
		size   = CLUSTERCMD_SIZE(*clustercmd_p);
		clustercmd_p = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
redmine authored
745 746 747
	}

	// Reading the data
redmine authored
748
	if((ret=cluster_read(sock, (void *)clustercmd_p->data_p, clustercmd_p->h.data_len, CLREAD_CONTINUE))) {
redmine authored
749 750 751 752 753 754 755 756 757 758 759 760 761 762
		if(ret == -1) return 0;

		printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n", 
			strerror(errno), errno);
		errno = ret;
		return -1;
	}

	// Checking CRC32 of packet data.
	clustercmd_crc32_calc(clustercmd_p, &crc32, CRC32_CALC_DATA);
	if(crc32.dat != clustercmd_p->h.crc32.dat) {
		printf_d("Debug: cluster_recv(): dat-CRC32 mismatch: %p != %p.\n", 
			(void*)(long)clustercmd_p->h.crc32.dat, (void*)(long)crc32.dat);

redmine authored
763
		if((ret=clustercmd_reject(clustercmd_p, REJ_CRC32MISMATCH)) != EADDRNOTAVAIL) {
redmine authored
764 765 766 767 768 769 770
			printf_e("Error: cluster_recv(): Got error while clustercmd_reject(): %s (errno: %i).\n", 
				strerror(ret), ret);
			errno = ret;
			return -1;
		}
	}

redmine authored
771
	CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
redmine authored
772
}
redmine authored
773

redmine authored
774 775

/**
redmine authored
776 777 778 779 780 781 782 783 784
 * @brief 			(hsyncop) Reads messages for time "_timeout" and proceeding them to recvproc_funct[] functions
 * 
 * @param[in]	_timeout	How long to wait messages (totally)
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while reading or processing messages. The error-code is placed into returned value.
 * 
 */

redmine authored
785 786
 int cluster_recv_proc(unsigned int _timeout) {
	clustercmd_t *clustercmd_p;
redmine authored
787 788
	int ret;
	unsigned int timeout = _timeout;
redmine authored
789
	while((ret=cluster_recv(&clustercmd_p, &timeout))) {
redmine authored
790 791
		// Exit if error
		if(ret == -1) {
redmine authored
792 793
			printf_e("Error: cluster_recv_proc(): Got error while cluster_recv(): %s (%i).\n", 
				strerror(errno), errno);
redmine authored
794 795 796 797 798 799
			return errno;
		}

		// If we have appropriate callback function, then call it! :)
		if(recvproc_funct[clustercmd_p->h.cmd_id])
			if((ret=recvproc_funct[clustercmd_p->h.cmd_id](clustercmd_p))) {
redmine authored
800 801
				printf_e("Error: cluster_recv_proc(): Got error from recvproc_funct[%i]: %s (%i)\n", 
					clustercmd_p->h.cmd_id, strerror(ret), ret);
redmine authored
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824
				return ret;
			}
	}

	return 0;
}


/**
 * @brief 			recvproc-function for ACK-messages
 * 
 * Counts the first ACK from every node for the acknowledged command. The command
 * is removed from window_o when the counter reaches node_count-1.
 * 
 * @param[in] 	clustercmd_p 	Pointer to clustercmd (the ACK-message)
 *
 * @retval	zero 		Always
 * 
 */

static int cluster_recvproc_ack(clustercmd_t *clustercmd_p) {
	uint32_t acked_serial = clustercmd_p->data_ack.serial;

	// Do we still remember the acknowledged command?
	clustercmdqueuedpacket_t *queuedpacket_p = (clustercmdqueuedpacket_t *)
		g_hash_table_lookup(nodeinfo_my->serial2queuedpacket_ht, GINT_TO_POINTER(acked_serial));
	if(queuedpacket_p == NULL)
		return 0;

	// Only the first ACK from a node counts
	uint8_t acker_id = clustercmd_p->h.src_node_id;
	if(queuedpacket_p->h.o.ack_from[acker_id])
		return 0;

	queuedpacket_p->h.o.ack_count++;
	queuedpacket_p->h.o.ack_from[acker_id]++;

	// Did everybody answer?
	if(queuedpacket_p->h.o.ack_count != node_count-1)
		return 0;

	clustercmd_window_del(&window_o, queuedpacket_p, nodeinfo_my->serial2queuedpacket_ht);
	return 0;
}


/**
 * @brief 			Sets message processing functions for cluster_recv_proc() function for specified command type
 * 
 * @param[in]	cmd_id		The command type
 * @param[in]	procfunct	The processing function for messages with specified cmd_id
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while setting processing function. The error-code is placed into returned value.
 * 
 */

redmine authored
856
int cluster_io_init() {
redmine authored
857 858 859 860 861 862 863 864 865 866 867 868 869
	cluster_recv_proc_set(CLUSTERCMDID_ACK, cluster_recvproc_ack);
	return 0;
}


/**
 * @brief 			Antagonist of cluster_recv_proc_init() function. Freeing everything what was allocated in cluster_recv_proc_init()
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
870 871 872 873 874 875 876 877 878 879
int cluster_io_deinit() {

	if(window_i.buf_size) {
#ifdef PARANOID
		if(window_i.buf == NULL) {
			printf_e("Error: cluster_recv_proc_deinit(): window_i.buf_size != 0, but window_i.buf == NULL.\n");
		} else
#endif
		free(window_i.buf);
	}
redmine authored
880

redmine authored
881
	if(window_o.buf_size) {
redmine authored
882
#ifdef PARANOID
redmine authored
883 884
		if(window_o.buf == NULL) {
			printf_e("Error: cluster_recv_proc_deinit(): window_o.buf_size != 0, but window_o.buf == NULL.\n");
redmine authored
885 886
		} else
#endif
redmine authored
887
		free(window_o.buf);
redmine authored
888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930
	}

	return 0;
}

/**
 * @brief 			recvproc-function for setid-messages
 * 
 * @param[in] 	clustercmd_p 	Pointer to the received clustercmd
 *
 * @retval	zero 		Always (messages that are outdated or addressed to another node name are just skipped)
 * 
 */

static int cluster_recvproc_setid(clustercmd_t *clustercmd_p) {
	// "updatets" of the most recent setid-message that was accepted
	static time_t newest_updatets = 0;

	clustercmd_setiddata_t *setid_p = &clustercmd_p->data_setid;

	// 	Skipping the message if it doesn't carry more recent information
	int is_newer = (setid_p->updatets > newest_updatets);
	if(!is_newer)
		return 0;

	// 	Skipping the message if the node name in it (length and content) is not ours
	uint32_t msg_nodename_len = CLUSTER_RESTDATALEN(clustercmd_p, clustercmd_setiddata_t);
	int is_ours = (msg_nodename_len == options_p->cluster_nodename_len) &&
			!memcmp(setid_p->node_name, options_p->cluster_nodename, msg_nodename_len);
	if(!is_ours)
		return 0;

	//	Remembering the node that answered us
	node_status_change(clustercmd_p->h.src_node_id, NODESTATUS_SEEMSONLINE);

	// 	Seems, that somebody knows our node id, remembering it (and the timestamp of this information)
	node_id_my      = clustercmd_p->h.dst_node_id;
	newest_updatets = setid_p->updatets;

	return 0;
}
redmine authored
931 932


redmine authored
933
// Forward declaration: cluster_loop() is defined later in this file and is passed to pthread_create() by cluster_init()
extern int cluster_loop();
redmine authored
934
/**
redmine authored
935
 * @brief 			Initializes cluster subsystem.
redmine authored
936
 * 
redmine authored
937 938
 * @param[in] 	_options_p 	Pointer to "options" variable, defined in main().
 * @param[in] 	_indexes_p	Pointer to "indexes" variable, defined in sync_run().
redmine authored
939
 *
redmine authored
940 941
 * @retval	zero 		Successfully initialized.
 * @retval	non-zero 	Got error, while initializing.
redmine authored
942 943
 * 
 */
redmine authored
944 945 946 947

int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
	int ret;

redmine authored
948
	// Preventing double initializing
redmine authored
949 950 951 952 953
	if(options_p != NULL) {
		printf_e("Error: cluster_init(): cluster subsystem is already initialized.\n");
		return EALREADY;
	}

redmine authored
954 955 956 957
	// Initializing global variables, pt. 1
	options_p	= _options_p;
	indexes_p	= _indexes_p;
	cluster_timeout	= options_p->cluster_timeout * 1000;
redmine authored
958
	node_status_change(NODEID_NOID, NODESTATUS_ONLINE);
redmine authored
959

redmine authored
960 961 962 963 964
	// Initializing network routines
	sock = socket(AF_INET, SOCK_DGRAM, 0);

	int reuse = 1;
	if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,(char *)&reuse, sizeof(reuse)) < 0) {
redmine authored
965 966
		printf_e("Error: cluster_init(): Got error while setsockopt(): %s (errno: %i)\n", 
			strerror(errno), errno);
redmine authored
967 968 969 970
		return errno;
	}

	struct sockaddr_in sa = {0};
redmine authored
971
	(void)sa;	// Anti-warning
redmine authored
972 973 974 975

	sa.sin_family		= AF_INET;
	sa.sin_port 		= htons(options_p->cluster_mcastipport);
	sa.sin_addr.s_addr	= INADDR_ANY;
redmine authored
976

redmine authored
977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992
	if(bind(sock, (struct sockaddr*)&sa, sizeof(sa))) {
		printf_e("Error: cluster_init(): Got error while bind(): %s (errno: %i)\n", 
			strerror(errno), errno);
		return errno;
	}

	struct ip_mreq group;
	group.imr_interface.s_addr = inet_addr(options_p->cluster_iface);
	group.imr_multiaddr.s_addr = inet_addr(options_p->cluster_mcastipaddr);

	if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, 
				(char *)&group, sizeof(group)) < 0) {
		printf_e("Error: cluster_init(): Cannot setsockopt() to enter to membership %s -> %s\n",
			options_p->cluster_iface, options_p->cluster_mcastipaddr);
		return errno;
	}
redmine authored
993 994


redmine authored
995 996
	// Initializing another routines
	clustercmd_crc32_calc_init();
redmine authored
997
	cluster_io_init();
redmine authored
998

redmine authored
999 1000 1001
	// Getting my ID in the cluster

	//	Trying to preserve my node_id after restart. :)
redmine authored
1002
	//	Asking another nodes about my previous node_id
redmine authored
1003
	{
redmine authored
1004
		clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_getmyid_t, options_p->cluster_nodename_len);
redmine authored
1005
		memcpy(clustercmd_p->data_getmyid.node_name, options_p->cluster_nodename, clustercmd_p->h.data_len+1);
redmine authored
1006

redmine authored
1007
		clustercmd_p->h.cmd_id = CLUSTERCMDID_GETMYID;
redmine authored
1008 1009
		cluster_send(clustercmd_p);
	}
redmine authored
1010

redmine authored
1011
	//	Processing answers
redmine authored
1012
	cluster_recv_proc_set(CLUSTERCMDID_SETID, cluster_recvproc_setid);
redmine authored
1013

redmine authored
1014 1015
	if((ret=cluster_recv_proc(cluster_timeout)))
		return ret;
redmine authored
1016

redmine authored
1017 1018
	printf_ddd("Debug3: cluster_init(): After communicating with others, my node_id is %i.\n", node_id_my);

redmine authored
1019 1020 1021 1022 1023 1024 1025 1026 1027 1028
	//	Getting free node_id if nobody said us the certain value (see above).
	if(node_id_my == NODEID_NOID) {
		int i=0;
		while(i<MAXNODES) {
			if(nodeinfo[i].status == NODESTATUS_DOESNTEXIST) {
				node_id_my = i;
				break;
			}
			i++;
		}
redmine authored
1029
		printf_ddd("Debug3: cluster_init(): I was have to set my node_id to %i.\n", node_id_my);
redmine authored
1030 1031 1032 1033
	}

	//	If there's no free id-s, then exit :(
	if(node_id_my == NODEID_NOID) {
redmine authored
1034
		printf_e("Error: Cannot find free node ID. Seems, that all %i ID-s are already occupied.\n");
redmine authored
1035 1036 1037
		return ENOMEM;
	}

redmine authored
1038
	// Registering in the cluster
redmine authored
1039 1040 1041

	// 	Sending registration information
	node_status_change(node_id_my, NODESTATUS_SEEMSONLINE);
redmine authored
1042
	{
redmine authored
1043 1044
		clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_register_t, options_p->cluster_nodename_len);
		clustercmd_register_t *data_register_p = &clustercmd_p->data_register;
redmine authored
1045

redmine authored
1046
		memcpy(data_register_p->node_name, options_p->cluster_nodename, options_p->cluster_nodename_len+1);
redmine authored
1047

redmine authored
1048 1049
		clustercmd_p->h.cmd_id = CLUSTERCMDID_REGISTER;
		cluster_send(clustercmd_p);
redmine authored
1050
	}
redmine authored
1051 1052 1053 1054 1055 1056

	// 	Getting answers
	if((ret=cluster_recv_proc(cluster_timeout)))
		return ret;

	node_status_change(node_id_my, NODESTATUS_ONLINE);
redmine authored
1057 1058 1059 1060 1061 1062 1063

	// Initializing global variables, pt. 2
	nodeinfo_my = &nodeinfo[node_id_my];


	// Running thread, that will process background communicating routines with another nodes.
	// The process is based on function cluster_loop() [let's use shorthand "cluster_loop()-thread"]
redmine authored
1064 1065 1066 1067 1068
	ret = pthread_create(&pthread_cluster, NULL, (void *(*)(void *))cluster_loop, NULL);

	return ret;
}

redmine authored
1069
/**
redmine authored
1070
 * @brief 			(syncop) Sends signal to cluster_loop()-thread
redmine authored
1071 1072 1073 1074 1075 1076 1077 1078
 * 
 * @param[in] 	signal 		Signal number
 *
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1079
static inline int cluster_signal(int signal) {
redmine authored
1080 1081 1082
	return pthread_kill(pthread_cluster, signal);
}

redmine authored
1083 1084

/**
redmine authored
1085
 * @brief 			Antagonist of cluster_init() function. Kills cluster_loop()-thread and cleaning up
redmine authored
1086 1087 1088 1089 1090 1091
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1092
int cluster_deinit() {
redmine authored
1093 1094
	int ret = 0;

redmine authored
1095 1096
	cluster_signal(SIGTERM);

redmine authored
1097 1098
	ret = pthread_join(pthread_cluster, NULL);

redmine authored
1099
	cluster_io_deinit();
redmine authored
1100

redmine authored
1101
	node_status_change(NODEID_NOID, NODESTATUS_DOESNTEXIST);
redmine authored
1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113
#ifdef VERYPARANOID
	int i=0;
#endif
	while(node_count) {
#ifdef VERYPARANOID
		if(i++ > NODES_MAX) {
			printf_e("Error: cluster_deinit() looped. Forcing break.");
			break;
		}
#endif
		node_status_change(0, NODESTATUS_DOESNTEXIST);
	}
redmine authored
1114

redmine authored
1115 1116
	close(sock);

redmine authored
1117 1118 1119 1120 1121 1122
#ifdef VERYPARANOID
	memset(node_info, 0, sizeof(node_info));
	node_count  = 0;
	node_online = 0;
	node_id_my  = NODEID_NOID;
#endif
redmine authored
1123

redmine authored
1124
	return ret;
redmine authored
1125 1126
}

redmine authored
1127

redmine authored
1128 1129 1130 1131 1132 1133 1134 1135 1136 1137
/**
 * @brief 			(syncop) Forces another nodes to ignore events about the file or directory
 * 
 * @param[in] 	fpath 		Path to the file or directory
 * 
 * @retval	zero 		Always (the function is not implemented yet and does nothing)
 * 
 */

int cluster_lock(const char *fpath) {
	(void)fpath;	// Not used yet

	return 0;
}

redmine authored
1142 1143 1144 1145 1146 1147 1148 1149 1150

/**
 * @brief 			(syncop) Forces another nodes to ignore events about all files and directories listed in queues of "indexes_p"
 * 
 * @retval	zero 		Always (the function is not implemented yet and does nothing)
 * 
 */

int cluster_lock_byindexes() {
	return 0;
}
redmine authored
1154

redmine authored
1155 1156 1157 1158 1159 1160 1161 1162 1163

/**
 * @brief 			(syncop) Returns events-handling on another nodes about all files and directories, locked by cluster_lock() and cluster_lock_byindexes() from this node
 * 
 * @retval	zero 		Always (the function is not implemented yet and does nothing)
 * 
 */

int cluster_unlock_all() {
	return 0;
}

redmine authored
1168

redmine authored
1169 1170 1171 1172 1173 1174 1175 1176
// Evaluates "a" once; if the result is non-zero, passes it to sync_term() and returns it from the calling function.
// Wrapped into "do { } while(0)" to behave as a single statement (for example, inside of an unbraced if/else).
#define CLUSTER_LOOP_CHECK(a) do {\
	int cluster_loop_check_ret = (a);\
	if(cluster_loop_check_ret) {\
		sync_term(cluster_loop_check_ret);\
		return cluster_loop_check_ret;\
	}\
} while(0)

redmine authored
1177
/**
redmine authored
1178
 * @brief 			Processes background communicating routines with another nodes. cluster_init() function create a thread for this function.
redmine authored
1179 1180 1181 1182 1183 1184
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1185
int cluster_loop() {
redmine authored
1186
	sigset_t sigset_cluster;
redmine authored
1187 1188 1189

	// Ignoring SIGINT signal

redmine authored
1190 1191 1192 1193
	sigemptyset(&sigset_cluster);
	sigaddset(&sigset_cluster, SIGINT);
	CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_BLOCK, &sigset_cluster, NULL));

redmine authored
1194 1195
	// Don't ignoring SIGTERM signal

redmine authored
1196 1197 1198 1199
	sigemptyset(&sigset_cluster);
	sigaddset(&sigset_cluster, SIGTERM);
	CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_UNBLOCK, &sigset_cluster, NULL));

redmine authored
1200 1201
	// Starting the loop

redmine authored
1202
	while(1) {
redmine authored
1203
		// Breaking the loop, if there's SIGTERM signal for this thread
redmine authored
1204 1205 1206 1207 1208 1209 1210
		if(sigpending(&sigset_cluster))
			if(sigismember(&sigset_cluster, SIGTERM))
				break;

		// LISTENING
	}

redmine authored
1211
	return 0;
redmine authored
1212 1213 1214 1215 1216 1217 1218 1219 1220
#ifdef DOXYGEN
	sync_term(0);
#endif
}


/**
 * @brief 			Updating information about modification time of a directory.
 * 
redmine authored
1221 1222 1223
 * @param[in]	path		Canonized path to updated file/dir
 * @param[in]	dirlevel	Directory level provided by fts (man 3 fts)
 * @param[in]	st_mode		st_mode value to detect is it directory or not (S_IFDIR or not)
redmine authored
1224 1225 1226 1227 1228 1229
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1230
int cluster_modtime_update(const char *path, short int dirlevel, mode_t st_mode) {
redmine authored
1231 1232 1233
	// "modtime" is incorrent name-part of function. Actually it updates "change time" (man 2 lstat64).
	int ret;

redmine authored
1234
	// Getting relative directory level (depth)
redmine authored
1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245
	short int dirlevel_rel = dirlevel - options_p->watchdir_dirlevel;

	if((st_mode & S_IFMT) == S_IFDIR)
		dirlevel_rel++;

	// Don't remembering information about directories with level beyond the limits
	if((dirlevel_rel > options_p->cluster_scan_dl_max) || (dirlevel_rel < options_p->cluster_hash_dl_min))
		return 0;


	// Getting directory/file-'s information (including "change time" aka "st_ctime")
redmine authored
1246
	struct stat64 stat64;
redmine authored
1247
	ret=lstat64(path, &stat64);
redmine authored
1248
	if(ret) {
redmine authored
1249
		printf_e("Error: cluster_modtime_update() cannot lstat64() on \"%s\": %s (errno: %i)\n", path, strerror(errno), errno);
redmine authored
1250 1251 1252
		return errno;
	}

redmine authored
1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263
	// Getting absolute directory path
	const char *dirpath;
	if((st_mode & S_IFMT) == S_IFDIR) {
		dirpath = path;
	} else {
		char *path_dup = strdup(path);
		dirpath = (const char *)dirname(path_dup);
		free(path_dup);
	}

	// Getting relative directory path
redmine authored
1264
	//	Initializing
redmine authored
1265 1266 1267 1268
	size_t  dirpath_len   = strlen(dirpath);
	char   *dirpath_rel_p = xmalloc(dirpath_len+1);
	char   *dirpath_rel   = dirpath_rel_p;

redmine authored
1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282
	const char *dirpath_rel_full     = &dirpath[options_p->watchdirlen];
	size_t      dirpath_rel_full_len = dirpath_len - options_p->watchdirlen;

	// 	Getting coodinate of the end (directory path is already canonized, so we can simply count number of slashes to get directory level)
	int     slashcount=0;
	size_t  dirpath_rel_end=0;
	while(dirpath_rel_full[dirpath_rel_end] && (dirpath_rel_end < dirpath_rel_full_len)) {
		if(dirpath_rel_full[dirpath_rel_end] == '/') {
			slashcount++;
			if(slashcount >= options_p->cluster_hash_dl_max)
				break;
		}
		dirpath_rel_end++;
	}
redmine authored
1283

redmine authored
1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299
	//	Copy the required part of path to dirpath_rel
	memcpy(dirpath_rel, dirpath_rel_full, dirpath_rel_end);

	
	// Updating "st_ctime" information. We should check current value for this directory and update it only if it less or not set.
	//	Checking current value
	char toupdate = 0;
	gpointer ctime_gp = g_hash_table_lookup(nodeinfo_my->modtime_ht, dirpath_rel);
	if(ctime_gp == NULL)
		toupdate++;
	else if(GPOINTER_TO_INT(ctime_gp) < stat64.st_ctime)
		toupdate++;

	//	g_hash_table_replace() will replace existent information about the directory or create it if it doesn't exist.
	if(toupdate)
		g_hash_table_replace(nodeinfo_my->modtime_ht, strdup(dirpath_rel), GINT_TO_POINTER(stat64.st_ctime));
redmine authored
1300 1301

	// Why I'm using "st_ctime" instead of "st_mtime"? Because "st_ctime" also updates on updating inode information.
redmine authored
1302 1303
	
	return 0;
redmine authored
1304 1305 1306
}


redmine authored
1307
/**
 * @brief 			(syncop) Exchanging with "modtime_ht"-s to be able to compare them.
 * 
 * @retval	zero 		Always (the function is not implemented yet and does nothing)
 * 
 */

int cluster_modtime_exchange() {
	// Nothing is exchanged yet, so there's nothing that can fail
	int ret = 0;

	return ret;
}


/**
 * @brief 			(syncop) Syncing file tree with another nodes with using of directories' modification time as a recent-detector.
 * 
 * @retval	zero 		Successful
 * @retval	non-zero 	Got error from cluster_modtime_exchange(). The error code is the returned value.
 * 
 */

int cluster_initialsync() {
	// For now the initial sync consists only of the "modtime_ht" exchange; its result is passed to the caller
	return cluster_modtime_exchange();
}


/**
 * @brief 			(syncop) "Captures" right to update the file or directory to another nodes. It just removes events about the file or directory from another nodes
 * 
 * @param[in] 	path		Path to the file or directory
 * 
 * @retval	zero 		Always (the function is not implemented yet and does nothing)
 * 
 */

int cluster_capture(const char *path) {
	(void)path;	// Not used yet

	return 0;
}

redmine authored
1352 1353
#endif