Blame view

cluster.c 48 KB
redmine authored
1
/*
redmine authored
2
    clsync - file tree sync utility based on inotify
redmine authored
3
    
redmine authored
4
    Copyright (C) 2013  Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
redmine authored
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
    
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

redmine authored
20 21 22
/*
    Hello, dear developer.

redmine authored
23
    You can ask me directly by e-mail or IRC, if something seems too
redmine authored
24 25
    hard.

redmine authored
26 27 28
                                                           -- 0x8E30679C
 */

redmine authored
29

redmine authored
30
#ifdef CLUSTER_SUPPORT
redmine authored
31

redmine authored
32
#include "common.h"
redmine authored
33
#include "indexes.h"
redmine authored
34
#include "error.h"
redmine authored
35
#include "cluster.h"
redmine authored
36
#include "sync.h"
redmine authored
37
#include "calc.h"
redmine authored
38
#include "malloc.h"
redmine authored
39

redmine authored
40 41
// Global variables. They will be initialized in cluster_init()

redmine authored
42
#define CLUSTER_RECV_PROC_ERRLIMIT (1<<8)
redmine authored
43 44
#define NODES_ALLOC (MAX(MAXNODES, NODEID_NOID)+1)

redmine authored
45

redmine authored
46 47 48 49 50
int sock_i			= -1;
struct sockaddr_in sa_i		= {0};

int sock_o			= -1;
struct sockaddr_in sa_o		= {0};
redmine authored
51

redmine authored
52
ctx_t  *ctx_p		= NULL;
redmine authored
53
indexes_t  *indexes_p		= NULL;
redmine authored
54
pthread_t   pthread_cluster	= 0;
redmine authored
55

redmine authored
56
nodeinfo_t nodeinfo[NODES_ALLOC]= {{0}};
redmine authored
57

redmine authored
58 59
nodeinfo_t *nodeinfo_my				= NULL;
uint8_t	node_id_my				= NODEID_NOID;
redmine authored
60
uint8_t node_ids[NODES_ALLOC]			= {0};
redmine authored
61 62 63
unsigned int cluster_timeout			= 0;
uint8_t node_count				= 0;
uint8_t node_online				= 0;
redmine authored
64

redmine authored
65 66
cluster_recvproc_funct_t recvproc_funct[COUNT_CLUSTERCMDID] = {NULL};

redmine authored
67 68
window_t window_i = {0};
window_t window_o = {0};
redmine authored
69 70

/**
redmine authored
71
 * @brief 			Adds command (message) to window_p->buffer
redmine authored
72 73 74 75 76 77 78 79
 * 
 * @param[in]	clustercmd_p	Pointer to cluster cmd to put into window
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while deleting the message. The error-code is placed into returned value.
 * 
 */

redmine authored
80 81
static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *clustercmd_p, GHashTable *serial2queuedpacket_ht) {
#ifdef PARANOID
redmine authored
82
	if (clustercmd_p->h.src_node_id >= MAXNODES) {
redmine authored
83
		error("Invalid src_node_id: %i.", clustercmd_p->h.src_node_id);
redmine authored
84 85 86
		return EINVAL;
	}
#endif
redmine authored
87

redmine authored
88
	// Checking if there enough window_p->cells allocated
redmine authored
89
	if (window_p->packets_len >= window_p->size) {
redmine authored
90
		window_p->size 		+= ALLOC_PORTION;
redmine authored
91 92 93 94

#		define CXREALLOC(a, size) \
			(typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
	
redmine authored
95 96
		window_p->packets_id     = CXREALLOC(window_p->packets_id,	window_p->size);
		window_p->occupied_sides = CXREALLOC(window_p->occupied_sides,	window_p->size);
redmine authored
97 98 99 100
#		undef CXREALLOC
	}

	// Calculating required memory space in buffer for the message
redmine authored
101
	size_t clustercmd_size = CLUSTERCMD_SIZE(clustercmd_p);
redmine authored
102
	size_t required_space  = sizeof(clustercmdqueuedpackethdr_t) + clustercmd_size;
redmine authored
103

redmine authored
104
	// Searching occupied boundaries in the window_p->buffer
redmine authored
105
	size_t occupied_left = SIZE_MAX, occupied_right=0;
redmine authored
106 107
	int i;
	i = 0;
redmine authored
108
	while (i < window_p->packets_len) {
redmine authored
109
		unsigned int window_id;
redmine authored
110
		window_id  = window_p->packets_id[ i++ ];
redmine authored
111

redmine authored
112 113
		occupied_left  = MIN(occupied_left,  window_p->occupied_sides[window_id].left);
		occupied_right = MAX(occupied_right, window_p->occupied_sides[window_id].right);
redmine authored
114 115
	}

redmine authored
116
	debug(3, "w.size == %u, b_left == %u; b_right == %u; w.buf_size == %u; r_space == %u",
redmine authored
117
		window_p->size, occupied_left, occupied_right, window_p->buf_size, required_space);
redmine authored
118 119 120

	// Trying to find a space in the buffer to place message
	size_t buf_coordinate = SIZE_MAX;
redmine authored
121
	if (window_p->packets_len) {
redmine authored
122 123 124
		// Free space from left  (start of buffer)
		size_t free_left  = occupied_left;
		// Free space from right (end of buffer)
redmine authored
125
		size_t free_right = window_p->buf_size - occupied_right;
redmine authored
126

redmine authored
127
		if (free_left  > required_space)
redmine authored
128 129
			buf_coordinate = free_left - required_space;
		else
redmine authored
130
		if (free_right > required_space)
redmine authored
131 132 133
			buf_coordinate   = occupied_right;
		else
		{
redmine authored
134 135 136 137
			// Not enough space in the window_p->buffer;
			window_p->buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
			window_p->buf	    = xrealloc(window_p->buf, window_p->buf_size);
			buf_coordinate      = occupied_right;
redmine authored
138
		}
redmine authored
139
		debug(3, "f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
redmine authored
140
			free_left, free_right, buf_coordinate, window_p->buf_size);
redmine authored
141 142
	} else {
		buf_coordinate = 0;
redmine authored
143
		if (window_p->buf_size <= required_space) {
redmine authored
144 145 146
			window_p->buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
			window_p->buf	    = xrealloc(window_p->buf, window_p->buf_size);
		}
redmine authored
147 148
	}

redmine authored
149
	unsigned int window_id;
redmine authored
150

redmine authored
151
	// packet id in window
redmine authored
152
	window_id = window_p->packets_len;
redmine authored
153

redmine authored
154
	// reserving the space in buffer
redmine authored
155 156
	window_p->occupied_sides[window_id].left  = buf_coordinate;
	window_p->occupied_sides[window_id].right = buf_coordinate + required_space;
redmine authored
157

redmine authored
158
	// placing information into buffer
redmine authored
159
	clustercmdqueuedpacket_t *queuedpacket_p;
redmine authored
160

redmine authored
161
	debug(3, "b_coord == %u", buf_coordinate);
redmine authored
162
	queuedpacket_p = (clustercmdqueuedpacket_t *)&window_p->buf[buf_coordinate];
redmine authored
163

redmine authored
164 165
	memset(&queuedpacket_p->h,	0,		sizeof(queuedpacket_p->h));
	memcpy(&queuedpacket_p->cmd, 	clustercmd_p, 	clustercmd_size);
redmine authored
166

redmine authored
167
	queuedpacket_p->h.window_id  = window_id;
redmine authored
168 169

	// remembering new packet
redmine authored
170
	g_hash_table_insert(serial2queuedpacket_ht, GINT_TO_POINTER(clustercmd_p->h.serial), queuedpacket_p);
redmine authored
171
	window_p->packets_id[window_p->packets_len++] = window_id;
redmine authored
172 173 174 175 176 177

	return 0;
}


/**
redmine authored
178
 * @brief 			Removes command (message) from window_p->buffer
redmine authored
179
 * 
redmine authored
180
 * @param[in]	queuedpacket_p	Pointer to queuedpacket structure of the command (message)
redmine authored
181 182 183 184 185 186
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while deleting the message. The error-code is placed into returned value.
 * 
 */

redmine authored
187
static inline int clustercmd_window_del(window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p, GHashTable *serial2queuedpacket_ht) {
redmine authored
188
#ifdef PARANOID
redmine authored
189
	if(!window_p->size) {
redmine authored
190
		error("window not allocated.");
redmine authored
191 192
		return EINVAL;
	}
redmine authored
193
	if(!window_p->packets_len) {
redmine authored
194
		error("there already no packets in the window.");
redmine authored
195 196 197 198
		return EINVAL;
	}
#endif

redmine authored
199 200
	unsigned int window_id_del  =  queuedpacket_p->h.window_id;
	unsigned int window_id_last = --window_p->packets_len;
redmine authored
201

redmine authored
202
	// Forgeting the packet
redmine authored
203

redmine authored
204
	// 	Moving the last packet into place of deleting packet, to free the tail in "window_p->packets_id" and "window_p->occupied_sides"
redmine authored
205
	if(window_id_del != window_id_last) {
redmine authored
206
		debug(3, "%i -> %i", window_id_last, window_id_del);
redmine authored
207

redmine authored
208
		window_p->packets_id[window_id_del] = window_p->packets_id[window_id_last];
redmine authored
209

redmine authored
210
		memcpy(&window_p->occupied_sides[window_id_del], &window_p->occupied_sides[window_id_last], sizeof(window_p->occupied_sides[window_id_del]));
redmine authored
211 212
	}

redmine authored
213
	// 	Removing from hash table
redmine authored
214
	g_hash_table_remove(serial2queuedpacket_ht, GINT_TO_POINTER(queuedpacket_p->cmd.h.serial));
redmine authored
215 216 217 218

	return 0;
}

redmine authored
219

redmine authored
220
/**
redmine authored
221
 * @brief 			Calculates Adler32 for clustercmd
redmine authored
222 223
 * 
 * @param[in]	clustercmd_p	Pointer to clustercmd
redmine authored
224
 * @param[out]	clustercmdadler32_p Pointer to structure to return value(s)
redmine authored
225
 * 
redmine authored
226 227
 * @retval	zero		On successful calculation
 * @retval	non-zero	On error. Error-code is placed into returned value.
redmine authored
228 229 230
 * 
 */

redmine authored
231
int clustercmd_adler32_calc(clustercmd_t *clustercmd_p, clustercmdadler32_t *clustercmdadler32_p, adler32_calc_t flags) {
redmine authored
232
	debug(15, "(%p, %p, 0x%x)", clustercmd_p, clustercmdadler32_p, flags);
redmine authored
233

redmine authored
234
	if (flags & ADLER32_CALC_DATA) {
redmine authored
235
		uint32_t adler32;
redmine authored
236

redmine authored
237 238
		uint32_t size = clustercmd_p->h.data_len;
		char    *ptr  = clustercmd_p->data.p;
redmine authored
239 240

		// Calculating
redmine authored
241 242
		adler32 = adler32_calc((unsigned char *)ptr, CLUSTER_PAD(size));
		debug(20, "dat: 0x%x", adler32);
redmine authored
243 244

		// Ending
redmine authored
245
		clustercmdadler32_p->dat = adler32 ^ 0xFFFFFFFF;
redmine authored
246
	}
redmine authored
247

redmine authored
248
	if (flags & ADLER32_CALC_HEADER) {
redmine authored
249
		uint32_t adler32;
redmine authored
250
		clustercmdadler32_t adler32_save;
redmine authored
251

redmine authored
252 253 254 255
		// Preparing
		memcpy(&adler32_save.hdr,		&clustercmd_p->h.adler32.hdr,	sizeof(clustercmd_p->h.adler32.hdr));
		memset(&clustercmd_p->h.adler32.hdr, 	0,				sizeof(clustercmd_p->h.adler32.hdr));
		adler32 = 0xFFFFFFFF;
redmine authored
256

redmine authored
257 258
		uint32_t size = sizeof(clustercmdhdr_t);
		char    *ptr  = (char *)&clustercmd_p->h;
redmine authored
259 260

		// Calculating
Andrew Savchenko authored
261
		adler32 = adler32_calc((unsigned char *)ptr, size);
redmine authored
262
		debug(20, "hdr: 0x%x", adler32);
redmine authored
263 264

		// Ending
redmine authored
265 266
		memcpy(&clustercmd_p->h.adler32.hdr, &adler32_save.hdr, sizeof(clustercmd_p->h.adler32.hdr));
		clustercmdadler32_p->hdr = adler32 ^ 0xFFFFFFFF;
redmine authored
267
	}
redmine authored
268

redmine authored
269
	return 0;
redmine authored
270
}
redmine authored
271

redmine authored
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331

/**
 * @brief 			Sends message to another nodes of the cluster.
 * 
 * @param[in]	clustercmd_p	Command structure pointer.
 *
 * @retval	zero 		Successfully send.
 * @retval	non-zero 	Got error, while sending.
 * 
 */

int _cluster_send(clustercmd_t *clustercmd_p) {
	debug(10, "{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, data_len: %u}",
		clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
		clustercmd_p->h.data_len);

	clustercmd_p->h.src_node_id = node_id_my;
	SAFE (clustercmd_adler32_calc(clustercmd_p, &clustercmd_p->h.adler32, ADLER32_CALC_ALL), return _SAFE_rc );

	debug(3, "Sending: "
		"{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, adler32.hdr: %p, adler32.dat: %p, data_len: %u}",
		clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
		(void *)(long)clustercmd_p->h.adler32.hdr, (void *)(long)clustercmd_p->h.adler32.dat,
		clustercmd_p->h.data_len);

	nodeinfo_t *nodeinfo_p;
	nodeinfo_p = &nodeinfo[clustercmd_p->h.dst_node_id];

	// Checking if the node online
	switch (nodeinfo_p->status) {
		case NODESTATUS_DOESNTEXIST:
		case NODESTATUS_OFFLINE:
			debug(1, "There's no online node with id %u. Skipping sending.", clustercmd_p->h.dst_node_id);
			return EADDRNOTAVAIL;
		default:
			break;
	}

	// Putting the message into an output window
	if (nodeinfo_my != NULL)
		clustercmd_window_add(&window_o, clustercmd_p, nodeinfo_my->serial2queuedpacket_ht);

	// Sending the message
	debug(10, "sendto(%p, %p, %i, 0, %p, %i)", sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED(clustercmd_p), &sa_o, sizeof(sa_o));
	debug(50, "clustercmd_p->data.p[0] == 0x%x", clustercmd_p->data.p[0]);
	critical_on (sendto(sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED(clustercmd_p), 0, &sa_o, sizeof(sa_o)) == -1);

	// Finishing
	return 0;
}

static inline int cluster_send(clustercmd_t *clustercmd_p) {
	int rc;

	rc = _cluster_send(clustercmd_p);
	debug(10, "__________________sent___________________ %i", rc);

	return rc;
}

redmine authored
332
/**
redmine authored
333
 * @brief 			Changes information about node's status in nodeinfo[] and updates connected information.
redmine authored
334
 * 
redmine authored
335 336
 * @param[in]	node_id		node_id of the node.
 * @param[in]	node_status	New node status.
redmine authored
337 338 339
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while changing the status. The error-code is placed into returned value.
redmine authored
340 341 342
 * 
 */

redmine authored
343 344 345
int node_update_status(uint8_t node_id, uint8_t node_status) {
	debug(3, "%i, %i", node_id, node_status);

redmine authored
346
	uint8_t node_status_old = nodeinfo[node_id].status;
redmine authored
347 348
	nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];

redmine authored
349
	if ((node_status == NODESTATUS_DOESNTEXIST) && (node_status_old != NODESTATUS_DOESNTEXIST)) {
redmine authored
350 351 352 353
		node_count--;

		node_ids[nodeinfo_p->num] = node_ids[node_count];
		g_hash_table_destroy(nodeinfo_p->modtime_ht);
redmine authored
354 355
		g_hash_table_destroy(nodeinfo_p->serial2queuedpacket_ht);

redmine authored
356
#ifdef VERYPARANOID
redmine authored
357
		memset(nodeinfo_p, 0, sizeof(*nodeinfo_p));
redmine authored
358
#endif
redmine authored
359 360 361
		return 0;
	}

redmine authored
362
	if (node_status == node_status_old)
redmine authored
363
		return 0;
redmine authored
364

redmine authored
365
	switch (node_status_old) {
redmine authored
366
		case NODESTATUS_DOESNTEXIST:
redmine authored
367 368 369
			nodeinfo_p->id			   = node_id;
			nodeinfo_p->num			   = node_count;
			nodeinfo_p->last_serial		   = -1;
redmine authored
370 371 372
			nodeinfo_p->modtime_ht 		   = g_hash_table_new_full(g_str_hash,	  g_str_equal,	  free, 0);
			nodeinfo_p->serial2queuedpacket_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0,    0);

redmine authored
373 374
			node_ids[node_count] = node_id;

redmine authored
375
			node_count++;
redmine authored
376
#ifdef PARANOID
redmine authored
377
			if (node_status == NODESTATUS_OFFLINE)
redmine authored
378
				break; // In case of NODESTATUS_DOESNTEXIST -> NODESTATUS_OFFLINE, node_online should be increased
redmine authored
379
#endif
redmine authored
380
		case NODESTATUS_OFFLINE:
redmine authored
381
			nodeinfo_p->last_serial		   = -1;
redmine authored
382 383 384
			node_online++;
			break;
		default:
redmine authored
385
			if (node_status == NODESTATUS_OFFLINE)
redmine authored
386 387 388
				node_online--;
			break;
	}
redmine authored
389

redmine authored
390
	nodeinfo[node_id].status = node_status;
redmine authored
391 392 393 394

	return 0;
}

redmine authored
395 396

/**
redmine authored
397
 * @brief 			Changes information about node's name and updates connected information.
redmine authored
398
 * 
redmine authored
399 400 401 402 403
 * @param[in]	node_id		node_id of the node.
 * @param[in]	node_name	New node name
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while changing the status. The error-code is placed into returned value.
redmine authored
404 405 406
 * 
 */

redmine authored
407 408
int node_update_name(uint8_t node_id, const char *node_name) {
	debug(3, "%i, \"%s\"", node_id, node_name);
redmine authored
409

redmine authored
410 411 412 413 414 415 416 417 418 419 420 421
	nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
	void *ret = g_hash_table_lookup(indexes_p->nodenames_ht, node_name);

	if (ret != NULL) {
		int rc;
		uint8_t node_old_id = GPOINTER_TO_INT(ret);
		nodeinfo_t *nodeinfo_old_p = &nodeinfo[node_old_id];
		debug(5, "nodename \"%s\" had been used by node_id == %i", node_name, node_old_id);
		if (node_old_id == node_id) {
			debug(10, "node_old_id [%i] == node_id [%i]", node_old_id, node_id);
			return 0;
		}
redmine authored
422

redmine authored
423 424
		{
			debug(15, "Sending a DIE command to node_id (%i)", node_old_id);
redmine authored
425

redmine authored
426 427 428 429 430 431 432 433 434 435
			clustercmd_t *clustercmd_p  = CLUSTER_ALLOCA(clustercmd_die_t, 0);
			clustercmd_p->h.cmd_id      = CLUSTERCMDID_DIE;
			clustercmd_p->h.dst_node_id = node_old_id;
			if ((rc = cluster_send(clustercmd_p)))
				return rc;
		}

		nodeinfo_old_p->node_name = NULL;
		debug(3, "changing status of node_id == %i to NODESTATUS_OFFLINE", node_old_id);
		node_update_status(node_old_id, NODESTATUS_OFFLINE);
redmine authored
436
	}
redmine authored
437

redmine authored
438 439 440
	char *node_name_dup = strdup(node_name);
	nodeinfo_p->node_name = node_name_dup;
	g_hash_table_replace(indexes_p->nodenames_ht, node_name_dup, GINT_TO_POINTER(node_id));
redmine authored
441 442 443 444 445

	return 0;
}

/**
redmine authored
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
 * @brief 			Sets message processing functions for cluster_recv_proc() function for specified command type
 * 
 * @param[in]	cmd_id		The command type
 * @param[in]	procfunct	The processing function for messages with specified cmd_id
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while setting processing function. The error-code is placed into returned value.
 * 
 */

static inline int cluster_recv_proc_set(clustercmd_id_t cmd_id, cluster_recvproc_funct_t procfunct) {
	recvproc_funct[cmd_id] = procfunct;

	return 0;
}


redmine authored
463
/**
redmine authored
464
 * @brief 			Safe wrapper for recvfrom() function
redmine authored
465 466
 * 
 * @param[in]	sock		The socket descriptor
redmine authored
467 468
 * @param[out]	_buf_p		Pointer to pointer to buffer 
 * @param[in]	flags		Flags...
redmine authored
469 470 471 472 473 474
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
 * 
 */

redmine authored
475 476 477
static inline int cluster_read(int sock, clustercmd_t **cmd_pp, cluster_read_flags_t flags) {
	static char buf[CLUSTER_PACKET_MAXSIZE];
	static clustercmd_t *cmd_p = (void *)buf;
redmine authored
478 479
	struct sockaddr_in sa_in;
	size_t sa_in_len = sizeof(sa_in);
redmine authored
480

redmine authored
481
	*cmd_pp = cmd_p;
redmine authored
482

redmine authored
483
	debug(20, "%i, %p, 0x%x", sock, buf, flags);
redmine authored
484

redmine authored
485 486
	int readret = recvfrom(sock, buf, CLUSTER_PACKET_MAXSIZE, MSG_WAITALL, (struct sockaddr *)&sa_in, (socklen_t * restrict)&sa_in_len);
	debug(30, "recvfrom(%i, %p, 0x%x, %p, %p) -> %i", sock, buf, MSG_WAITALL, &sa_in, &sa_in_len, readret);
redmine authored
487
#ifdef PARANOID
redmine authored
488
	if (!readret) {
redmine authored
489
		error("recvfrom() returned 0. This shouldn't happend. Exit.");
redmine authored
490 491 492
		return EINVAL;
	}
#endif
redmine authored
493
	if (readret < 0) {
redmine authored
494 495 496
		error("recvfrom() returned %i. "
			"Seems, that something wrong with network socket.", 
			readret);
redmine authored
497 498 499
		return errno != -1 ? errno : -2;
	}

redmine authored
500
	debug(2, "Got message from %s (len: %i).", inet_ntoa(sa_in.sin_addr), readret);
redmine authored
501

redmine authored
502
	if (readret < sizeof(clustercmdhdr_t)) {
redmine authored
503
		// Too short message
redmine authored
504
		error("Warning: cluster_read(): Got too short message from node (no header [or too short]). Ignoring it.");
redmine authored
505 506 507
		return -1;
	}

redmine authored
508 509 510 511 512 513 514
	if (readret < CLUSTERCMD_SIZE(cmd_p)) {
		// Too short message
		error("Warning: cluster_read(): Got too short message from node (no data [or too short]). Ignoring it.");
		return -1;
	}

	// Incorrect size?
redmine authored
515
	if (readret & 0x3) {
redmine authored
516
		error("Warning: cluster_recv(): Received packet of size %i (not a multiple of 4). Ignoring it.", readret);
redmine authored
517 518 519
		return 0;
	}

redmine authored
520 521 522 523 524 525 526
	return 0;
}


/**
 * @brief 			Sends packet-reject notification
 * 
redmine authored
527 528
 * @param[in]	clustercmd_p	Pointer to clustercmd that will be rejected
 * @param[in]	reason		Reason why the clustercmd is denied
redmine authored
529 530 531 532 533 534 535 536 537
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
 * 
 */

static inline int clustercmd_reject(clustercmd_t *clustercmd_p, uint8_t reason) {
	clustercmd_t *clustercmd_rej_p 		= CLUSTER_ALLOCA(clustercmd_rej_t, 0);
	clustercmd_rej_p->h.dst_node_id 	= clustercmd_p->h.src_node_id;
redmine authored
538 539
	clustercmd_rej_p->data.rej.serial 	= clustercmd_p->h.serial;
	clustercmd_rej_p->data.rej.reason 	= reason;
redmine authored
540 541 542 543

	return cluster_send(clustercmd_rej_p);
}

redmine authored
544 545 546 547 548 549 550 551

#define CLUSTER_RECV_RETURNMESSAGE(clustercmd_p) {\
		last_serial      = (clustercmd_p)->h.serial;\
		last_src_node_id = (clustercmd_p)->h.src_node_id;\
		if(clustercmd_pp != NULL)\
			*clustercmd_pp = (clustercmd_p);\
		return 1;\
}
redmine authored
552
/**
redmine authored
553
 * @brief 			Receives message from another nodes of the cluster. (not thread-safe)
redmine authored
554
 * 
redmine authored
555
 * @param[out]	clustercmd_pp	Pointer to command structure pointer. It will be re-allocated every time when size is not enough. Allocated space will be reused on next calling.
redmine authored
556
 * @param[i]	timeout		Timeout (in milliseconds).
redmine authored
557
 * 
redmine authored
558 559
 * @retval	 1		If there's new message.
 * @retval	 0		If there's no new messages.
redmine authored
560 561 562 563
 * @retval	-1		If got error while receiving. The error-code is placed into "errno".
 * 
 */

redmine authored
564 565 566 567
static int _cluster_recv(clustercmd_t **clustercmd_pp, struct timeval *timeout) {
	clustercmd_t   *clustercmd_p;
	static uint8_t  last_src_node_id  = NODEID_NOID;
	static uint32_t last_serial	  = 0;
redmine authored
568

redmine authored
569
	// Checking if there message is waiting in the window
redmine authored
570
	if (last_src_node_id != NODEID_NOID) {
redmine authored
571 572
		nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];

redmine authored
573 574 575
		if (nodeinfo_p->serial2queuedpacket_ht != NULL) {
			clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = (clustercmdqueuedpacket_t *)
				g_hash_table_lookup(nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER(last_serial+1));
redmine authored
576

redmine authored
577 578 579
			if (clustercmdqueuedpacket_p != NULL)
				CLUSTER_RECV_RETURNMESSAGE(&clustercmdqueuedpacket_p->cmd);
		}
redmine authored
580 581
	}

redmine authored
582 583
	// Checking if there any event on read socket
	//	select()
redmine authored
584

redmine authored
585 586
	fd_set rfds;
	FD_ZERO(&rfds);
redmine authored
587
	FD_SET(sock_i, &rfds);
redmine authored
588 589
	debug(3, "select() with timeout {%u, %u}", timeout->tv_sec, timeout->tv_usec);
	int selret = select(sock_i+1, &rfds, NULL, NULL, timeout);
redmine authored
590 591

	//	processing select()'s retuned value
redmine authored
592
	if (selret <  0) {
redmine authored
593
		error("got error while select().");
redmine authored
594
		return -1;
redmine authored
595
	}
redmine authored
596
	if (selret == 0) {
redmine authored
597
		debug(3, "no new messages.");
redmine authored
598
		return 0;
redmine authored
599
	}
redmine authored
600
	debug(3, "got new message(s).");
redmine authored
601

redmine authored
602
	debug(10, "Reading new message's header");
redmine authored
603
	clustercmdadler32_t adler32;
redmine authored
604 605 606
	//clustercmd_t *clustercmd_p = (clustercmd_t *)mmap(NULL, sizeof(clustercmdhdr_t), PROT_NONE, 
	//	MAP_PRIVATE, sock, 0);
	int ret;
redmine authored
607
	if ((ret=cluster_read(sock_i, clustercmd_pp, CLREAD_NONE))) {
redmine authored
608
		if(ret == -1) return 0; // Invalid message? Skipping.
redmine authored
609

redmine authored
610
		error("Got error from cluster_read().");
redmine authored
611 612 613
		errno = ret;
		return -1;
	}
redmine authored
614
	clustercmd_p = *clustercmd_pp;
redmine authored
615

redmine authored
616
	debug(3, "Received: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u,"
redmine authored
617
		" serial: %u, adler32: {0x%x, 0x%x}, data_len: %u}",
redmine authored
618 619 620
		clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, 
		clustercmd_p->h.cmd_id, clustercmd_p->h.serial,
		clustercmd_p->h.adler32.hdr, clustercmd_p->h.adler32.dat,
redmine authored
621
		clustercmd_p->h.data_len);
redmine authored
622

redmine authored
623 624
	// Checking adler32 of packet headers.
	clustercmd_adler32_calc(clustercmd_p, &adler32, ADLER32_CALC_HEADER);
redmine authored
625
	if (adler32.hdr != clustercmd_p->h.adler32.hdr) {
redmine authored
626
		debug(1, "hdr-adler32 mismatch: %p != %p.", 
redmine authored
627
			(void*)(long)clustercmd_p->h.adler32.hdr, (void*)(long)adler32.hdr);
redmine authored
628

redmine authored
629
		if((ret=clustercmd_reject(clustercmd_p, REJ_ADLER32MISMATCH)) != EADDRNOTAVAIL) {
redmine authored
630
			error("Got error while clustercmd_reject().");
redmine authored
631 632 633 634
			errno = ret;
			return -1;
		}
	}
redmine authored
635

redmine authored
636 637 638
	// Checking src_node_id and dst_node_id
	uint8_t src_node_id = clustercmd_p->h.src_node_id;
	uint8_t dst_node_id = clustercmd_p->h.dst_node_id;
redmine authored
639

redmine authored
640
	// 	Packet from registering node?
redmine authored
641
	if (src_node_id == NODEID_NOID) {
redmine authored
642
		// 	Wrong command from registering node?
redmine authored
643 644
		if (clustercmd_p->h.cmd_id != CLUSTERCMDID_HELLO) {
			error("Warning: cluster_recv(): Got non hello packet from NOID node. Ignoring the packet.");
redmine authored
645 646
			return 0;
		}
redmine authored
647
		if (clustercmd_p->h.serial != 0) {
redmine authored
648
			error("Warning: cluster_recv(): Got packet with non-zero serial from NOID node. Ignoring the packet.");
redmine authored
649 650
			return 0;
		}
redmine authored
651 652
	} else
	// 	Wrong src_node_id?
redmine authored
653
	if (src_node_id >= MAXNODES) {
redmine authored
654
		error("Warning: cluster_recv(): Invalid h.src_node_id: %i >= "XTOSTR(MAXNODES)"",
redmine authored
655
			src_node_id);
redmine authored
656
		return -1;
redmine authored
657
	}
redmine authored
658

redmine authored
659
	// 	Is this broadcast message?
redmine authored
660
	if (dst_node_id == NODEID_NOID) {
redmine authored
661 662 663
		// CODE HERE
	} else
	//	Wrong dst_node_id?
redmine authored
664
	if (dst_node_id >= MAXNODES) {
redmine authored
665
		error("Warning: cluster_recv(): Invalid h.dst_node_id: %i >= "XTOSTR(MAXNODES)"", 
redmine authored
666
			dst_node_id);
redmine authored
667
		return -1;
redmine authored
668 669
	}

redmine authored
670 671
	// Seems, that headers are correct. Continuing.

redmine authored
672
	// Paranoid routines
redmine authored
673
	//	The message from us? Something wrong if it is.
redmine authored
674 675
	if ((clustercmd_p->h.src_node_id == node_id_my) && (node_id_my != NODEID_NOID))
		critical("node_id collision");
redmine authored
676 677 678 679

	nodeinfo_t *nodeinfo_p = &nodeinfo[src_node_id];

	// Not actual packet?
redmine authored
680 681 682 683 684
	long serial_diff = clustercmd_p->h.serial - nodeinfo_p->last_serial;
	debug(10, "serial_diff == %i", serial_diff);
	if (serial_diff <= 0 || serial_diff > CLUSTER_WINDOW_PCKTLIMIT) {
		debug(1, "Ignoring packet (serial %i) from %i due to serial_diff: 0 <= || > %i", 
			clustercmd_p->h.serial, src_node_id, serial_diff, CLUSTER_WINDOW_PCKTLIMIT);
redmine authored
685
		return -1;
redmine authored
686 687 688
	}

	// Is this misordered packet?
redmine authored
689
	if (clustercmd_p->h.serial != nodeinfo_p->last_serial + 1) {
redmine authored
690
		clustercmd_window_add(&window_i, clustercmd_p, nodeinfo_p->serial2queuedpacket_ht);
redmine authored
691
		return -1;
redmine authored
692 693 694
	}

	// Is this the end of packet (packet without data)
redmine authored
695
	if (clustercmd_p->h.data_len == 0)
redmine authored
696
		CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
redmine authored
697 698

	// Too big data?
redmine authored
699
	if (clustercmd_p->h.data_len > CLUSTER_PACKET_MAXSIZE) {
redmine authored
700
		error("Warning: cluster_recv(): Got too big message from node %i. Ignoring it.",
redmine authored
701
			src_node_id);
redmine authored
702
		return -1;
redmine authored
703
	}
redmine authored
704
/*
redmine authored
705
	// Need more space for this packet?
redmine authored
706
	if (CLUSTERCMD_SIZE(clustercmd_p) > size) {
redmine authored
707
		size   = CLUSTERCMD_SIZE(clustercmd_p);
redmine authored
708
		clustercmd_p = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
redmine authored
709 710
	}

redmine authored
711
	debug(10, "Reading the data");
redmine authored
712
	if ((ret=cluster_read(sock_i, (void *)clustercmd_p->data.p, CLUSTER_PAD(clustercmd_p->h.data_len), CLREAD_CONTINUE))) {
redmine authored
713
		if (ret == -1) return 0;
redmine authored
714

redmine authored
715
		error("Got error from cluster_read().");
redmine authored
716 717 718
		errno = ret;
		return -1;
	}
redmine authored
719
*/
redmine authored
720 721
	// Checking adler32 of packet data.
	clustercmd_adler32_calc(clustercmd_p, &adler32, ADLER32_CALC_DATA);
redmine authored
722
	if (adler32.dat != clustercmd_p->h.adler32.dat) {
redmine authored
723
		debug(1, "dat-adler32 mismatch: %p != %p.", 
redmine authored
724
			(void*)(long)clustercmd_p->h.adler32.dat, (void*)(long)adler32.dat);
redmine authored
725

redmine authored
726
		if ((ret=clustercmd_reject(clustercmd_p, REJ_ADLER32MISMATCH)) != EADDRNOTAVAIL) {
redmine authored
727
			error("Got error while clustercmd_reject().");
redmine authored
728 729 730 731 732
			errno = ret;
			return -1;
		}
	}

redmine authored
733
	CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
redmine authored
734
}
redmine authored
735

redmine authored
736
static inline int cluster_recv(clustercmd_t **clustercmd_pp, struct timeval *timeout) {
redmine authored
737 738
	int rc;

redmine authored
739
	rc = _cluster_recv(clustercmd_pp, timeout);
redmine authored
740 741 742 743 744
	debug(10, "__________________recv___________________ %i", rc);

	return rc;
}

redmine authored
745 746

/**
redmine authored
747 748 749 750 751 752 753 754 755
 * @brief 			(hsyncop) Reads messages for time "_timeout" and proceeding them to recvproc_funct[] functions
 * 
 * @param[in]	_timeout	How long to wait messages (totally)
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while reading or processing messages. The error-code is placed into returned value.
 * 
 */

redmine authored
756 757 758 759
int cluster_recv_proc(unsigned int timeout_rel_int) {
	struct timeval timeout_rel, timeout_abs, tv_abs;
	int error_count, iteration;
	debug(3, "cluster_recv_proc(%i)", timeout_rel_int);
redmine authored
760
	clustercmd_t *clustercmd_p = NULL;
redmine authored
761
	int ret;
redmine authored
762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789

	timeout_rel.tv_sec  = timeout_rel_int / 1000;
	timeout_rel.tv_usec = timeout_rel_int % 1000;
	gettimeofday(&tv_abs, NULL);
	timeradd(&tv_abs, &timeout_rel, &timeout_abs);

	error_count = 0;
	iteration   = 0;
	while (1) {
		if (iteration++) {
			gettimeofday(&tv_abs, NULL);
			if (timercmp(&timeout_abs, &tv_abs, >))
				timersub(&timeout_abs, &tv_abs, &timeout_rel);
			else
				memset(&timeout_rel, 0, sizeof(timeout_rel));

			debug(5, "timeout_abs == {%u, %u}; tv_abs == {%u, %u}; timeout_rel == {%u, %u}",
					timeout_abs.tv_sec, timeout_abs.tv_usec,
					     tv_abs.tv_sec,      tv_abs.tv_usec,
					timeout_rel.tv_sec, timeout_rel.tv_usec
				);
		}

		ret = cluster_recv(&clustercmd_p, &timeout_rel);

		if (!ret)
			break;

redmine authored
790
		// Exit if error
redmine authored
791 792 793 794 795
		if (ret == -1) {
			warning("Got error while cluster_recv(). error_count == %i", error_count);
			error_count++;

			critical_on (error_count >= CLUSTER_RECV_PROC_ERRLIMIT);
redmine authored
796 797 798
		}

		// If we have appropriate callback function, then call it! :)
redmine authored
799 800 801
		if (recvproc_funct[clustercmd_p->h.cmd_id] != NULL) {
			debug(2, "Calling function by pointer %p", recvproc_funct[clustercmd_p->h.cmd_id]);
			if ((ret = recvproc_funct[clustercmd_p->h.cmd_id](clustercmd_p))) {
redmine authored
802 803
				error("Got error from recvproc_funct[%i]: %s (%i)", 
					clustercmd_p->h.cmd_id);
redmine authored
804 805
				return ret;
			}
redmine authored
806 807 808 809 810
			continue;
		}

		// We didn't found an appropriate callback function
		debug(2, "There's no appropriate callback function for cmd_id == %i", clustercmd_p->h.cmd_id);
redmine authored
811 812 813 814 815 816 817
	}

	return 0;
}


/**
redmine authored
818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834
 * @brief 			recvproc-function for DIE-messages
 * 
 * @param[in] 	clustercmd_p 	Pointer to clustercmd
 *
 * @retval	zero 		Successfully initialized. (never happens)
 * @retval	non-zero 	Got error, while initializing.
 * 
 */

static int cluster_recvproc_die(clustercmd_t *clustercmd_p) {
	critical("Got DIE message from node_id == %i,", clustercmd_p->h.src_node_id);

	return 0;
}


/**
redmine authored
835 836 837 838 839 840 841 842 843 844 845
 * @brief 			recvproc-function for ACK-messages
 * 
 * @param[in] 	clustercmd_p 	Pointer to clustercmd
 *
 * @retval	zero 		Successfully initialized.
 * @retval	non-zero 	Got error, while initializing.
 * 
 */

static int cluster_recvproc_ack(clustercmd_t *clustercmd_p) {

redmine authored
846
	uint32_t cmd_serial_ack = clustercmd_p->data.ack.serial;
redmine authored
847

redmine authored
848
	clustercmdqueuedpacket_t *queuedpacket_p = 
redmine authored
849
		(clustercmdqueuedpacket_t *)g_hash_table_lookup(nodeinfo_my->serial2queuedpacket_ht, GINT_TO_POINTER(cmd_serial_ack));
redmine authored
850

redmine authored
851
	if (queuedpacket_p == NULL)
redmine authored
852 853 854 855
		return 0;

	uint8_t node_id_from = clustercmd_p->h.src_node_id;

redmine authored
856
	if (! queuedpacket_p->h.w.o.ack_from[node_id_from]) {
redmine authored
857 858
		queuedpacket_p->h.w.o.ack_count++;
		queuedpacket_p->h.w.o.ack_from[node_id_from]++;
redmine authored
859

redmine authored
860
		if (queuedpacket_p->h.w.o.ack_count == node_count-1)
redmine authored
861
			clustercmd_window_del(&window_o, queuedpacket_p, nodeinfo_my->serial2queuedpacket_ht);
redmine authored
862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878
	}

	return 0;
}


/**
 * @brief 			Sets message processing functions for cluster_recv_proc() function for specified command type
 * 
 * @param[in]	cmd_id		The command type
 * @param[in]	procfunct	The processing function for messages with specified cmd_id
 * 
 * @retval	zero		Successful
 * @retval	non-zero	If got error while setting processing function. The error-code is placed into returned value.
 * 
 */

redmine authored
879
int cluster_io_init() {
redmine authored
880
	cluster_recv_proc_set(CLUSTERCMDID_ACK, cluster_recvproc_ack);
redmine authored
881
	cluster_recv_proc_set(CLUSTERCMDID_DIE, cluster_recvproc_die);
redmine authored
882 883 884 885 886 887 888 889 890 891 892 893
	return 0;
}


/**
 * @brief 			Antagonist of cluster_recv_proc_init() function. Freeing everything what was allocated in cluster_recv_proc_init()
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
894 895 896 897 898
int cluster_io_deinit() {

	if(window_i.buf_size) {
#ifdef PARANOID
		if(window_i.buf == NULL) {
redmine authored
899
			error("window_i.buf_size != 0, but window_i.buf == NULL.");
redmine authored
900 901 902 903
		} else
#endif
		free(window_i.buf);
	}
redmine authored
904

redmine authored
905
	if(window_o.buf_size) {
redmine authored
906
#ifdef PARANOID
redmine authored
907
		if(window_o.buf == NULL) {
redmine authored
908
			error("window_o.buf_size != 0, but window_o.buf == NULL.");
redmine authored
909 910
		} else
#endif
redmine authored
911
		free(window_o.buf);
redmine authored
912 913 914 915 916
	}

	return 0;
}

redmine authored
917

redmine authored
918
/**
redmine authored
919
 * @brief 			recvproc-function for welcome-messages
redmine authored
920 921 922 923 924 925 926 927
 * 
 * @param[in] 	clustercmd_p 	Pointer to clustercmd
 *
 * @retval	zero 		Successfully initialized.
 * @retval	non-zero 	Got error, while initializing.
 * 
 */

redmine authored
928 929 930
static int cluster_recvproc_welcome(clustercmd_t *clustercmd_p) {
	debug(20, "%p", clustercmd_p);
//	static time_t updatets = 0;
redmine authored
931

redmine authored
932 933
	clustercmd_welcome_t *data_welcome_p = &clustercmd_p->data.welcome;
/*
redmine authored
934
	// 	Is this the most recent information? Skipping if not.
redmine authored
935
	if (!(data_welcome_p->updatets > updatets))
redmine authored
936
		return 0;
redmine authored
937
*/
redmine authored
938 939
	// 	Is the node name length in message equals to our node name length? Skipping if not.
	uint32_t recv_nodename_len;
redmine authored
940 941 942
	recv_nodename_len = welcome_to_node_name_len(clustercmd_p);
	if (recv_nodename_len != ctx_p->cluster_nodename_len) {
		debug(9, "recv_nodename_len [%i] != ctx_p->cluster_nodename_len [%i]", recv_nodename_len, ctx_p->cluster_nodename_len);
redmine authored
943
		return 0;
redmine authored
944
	}
redmine authored
945 946

	// 	Is the node name equals to ours? Skipping if not.
redmine authored
947 948
	if (memcmp(welcome_to_node_name(data_welcome_p), ctx_p->cluster_nodename, recv_nodename_len)) {
		debug(9, "to_node_name != ctx_p->cluster_nodename");
redmine authored
949
		return 0;
redmine authored
950
	}
redmine authored
951 952

	//	Remembering the node that answered us
redmine authored
953 954 955 956 957 958 959 960
	node_update_status(clustercmd_p->h.src_node_id, NODESTATUS_SEEMSONLINE);
	{
		char *node_name;

		node_name = alloca(data_welcome_p->from_node_name_len);
		memcpy(node_name, data_welcome_p->from_node_name, data_welcome_p->from_node_name_len);
		node_update_name(clustercmd_p->h.src_node_id, node_name);
	}
redmine authored
961 962 963

	// 	Seems, that somebody knows our node id, remembering it.
	node_id_my  = clustercmd_p->h.dst_node_id;
redmine authored
964
//	updatets    = data_welcome_p->updatets;
redmine authored
965 966 967

	return 0;
}
redmine authored
968 969


redmine authored
970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
/**
 * @brief 			recvproc-function for hello-messages
 * 
 * @param[in] 	clustercmd_p 	Pointer to clustercmd
 *
 * @retval	zero 		Successfully initialized.
 * @retval	non-zero 	Got error, while initializing.
 * 
 */

static int cluster_recvproc_hello(clustercmd_t *clustercmd_p) {
	clustercmd_hello_t *data_hello_p = &clustercmd_p->data.hello;

	// Sending information to the new node
	{
		int ret;
		uint8_t node_id;

		debug(15, "Preparing a welcome message for nodename \"%s\" from nodename == \"%s\"", data_hello_p->node_name, ctx_p->cluster_nodename);
		size_t data_len = sizeof(clustercmd_welcome_t) + clustercmd_p->h.data_len+ctx_p->cluster_nodename_len;

		clustercmd_t *answer_p = CLUSTER_ALLOCA(void, data_len);
		clustercmd_welcome_t *answer_data_p = &answer_p->data.welcome;

		answer_data_p->from_node_name_len = ctx_p->cluster_nodename_len;
		memcpy(answer_data_p->from_node_name, ctx_p->cluster_nodename, ctx_p->cluster_nodename_len);
		memcpy(welcome_to_node_name(answer_data_p), data_hello_p->node_name, clustercmd_p->h.data_len);

		{
			void *ptr;
			char *node_name = alloca(clustercmd_p->h.data_len+1);
			memcpy(node_name, data_hello_p->node_name, clustercmd_p->h.data_len+1);
			ptr = g_hash_table_lookup(indexes_p->nodenames_ht, node_name);
			node_id = (ptr == NULL ? NODEID_NOID : GPOINTER_TO_INT(ptr));
			debug(3, "\"%s\" -> %i", node_name, node_id);
		}

		answer_p->h.data_len    = data_len;

		answer_p->h.cmd_id      = CLUSTERCMDID_WELCOME;
		answer_p->h.dst_node_id = node_id; // broadcast

		if ((ret = cluster_send(answer_p)))
			return ret;
	}

	return 0;
}


/**
 * @brief 			recvproc-function for register-messages
 * 
 * @param[in] 	clustercmd_p 	Pointer to clustercmd
 *
 * @retval	zero 		Successfully initialized.
 * @retval	non-zero 	Got error, while initializing.
 * 
 */

static int cluster_recvproc_register(clustercmd_t *clustercmd_p) {
	clustercmd_reg_t *data_register_p = &clustercmd_p->data.reg;

	char *node_name = alloca(clustercmd_p->h.data_len+1);
	memcpy(node_name, data_register_p->node_name, clustercmd_p->h.data_len+1);
	node_update_name(clustercmd_p->h.src_node_id, node_name);

	return 0;
}

redmine authored
1040
extern int cluster_loop();
redmine authored
1041
/**
redmine authored
1042
 * @brief 			Initializes cluster subsystem.
redmine authored
1043
 * 
redmine authored
1044
 * @param[in] 	_ctx_p 	Pointer to "glob" variable, defined in main().
redmine authored
1045
 * @param[in] 	_indexes_p	Pointer to "indexes" variable, defined in sync_run().
redmine authored
1046
 *
redmine authored
1047 1048
 * @retval	zero 		Successfully initialized.
 * @retval	non-zero 	Got error, while initializing.
redmine authored
1049 1050
 * 
 */
redmine authored
1051

redmine authored
1052
int cluster_init(ctx_t *_ctx_p, indexes_t *_indexes_p) {
redmine authored
1053 1054
	int ret;

redmine authored
1055
	// Preventing double initializing
redmine authored
1056
	if (ctx_p != NULL) {
redmine authored
1057
		error("cluster subsystem is already initialized.");
redmine authored
1058 1059 1060
		return EALREADY;
	}

redmine authored
1061
	// Initializing global variables, pt. 1
redmine authored
1062
	ctx_p		= _ctx_p;
redmine authored
1063
	indexes_p	= _indexes_p;
redmine authored
1064
	cluster_timeout	= ctx_p->cluster_timeout;
redmine authored
1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076

	indexes_p->nodenames_ht = g_hash_table_new_full(g_str_hash,	  g_str_equal,	  free, 0);

	node_update_status(NODEID_NOID, NODESTATUS_ONLINE);

	{
		int i=0;
		while (i < MAXNODES) {
			nodeinfo[i].last_serial = -1;
			i++;
		}
	}
redmine authored
1077

redmine authored
1078
	// Initializing network routines
redmine authored
1079 1080 1081 1082 1083 1084 1085

	// 	Input socket

	//		Creating socket

	sock_i = socket(AF_INET, SOCK_DGRAM, 0);
	if(sock_i < 0) {
redmine authored
1086
		error("Cannot create socket for input traffic");
redmine authored
1087 1088 1089 1090 1091
		return errno;
	}

	// 		Enable SO_REUSEADDR to allow multiple instances of this application to receive copies
	// 		of the multicast datagrams.
redmine authored
1092 1093

	int reuse = 1;
redmine authored
1094
	if(setsockopt(sock_i, SOL_SOCKET, SO_REUSEADDR,(char *)&reuse, sizeof(reuse)) < 0) {
redmine authored
1095
		error("Got error while setsockopt()");
redmine authored
1096 1097 1098
		return errno;
	}

redmine authored
1099
	//		Binding
redmine authored
1100

redmine authored
1101
	sa_i.sin_family		= AF_INET;
redmine authored
1102
	sa_i.sin_port 		= htons(ctx_p->cluster_mcastipport);
redmine authored
1103
	sa_i.sin_addr.s_addr	= INADDR_ANY;
redmine authored
1104

redmine authored
1105
	if(bind(sock_i, (struct sockaddr*)&sa_i, sizeof(sa_i))) {
redmine authored
1106
		error("Got error while bind()");
redmine authored
1107 1108 1109
		return errno;
	}

redmine authored
1110 1111
	//		Joining to multicast group

redmine authored
1112
	struct ip_mreq group;
redmine authored
1113 1114
	group.imr_interface.s_addr = inet_addr(ctx_p->cluster_iface);
	group.imr_multiaddr.s_addr = inet_addr(ctx_p->cluster_mcastipaddr);
redmine authored
1115

redmine authored
1116
	if(setsockopt(sock_i, IPPROTO_IP, IP_ADD_MEMBERSHIP, 
redmine authored
1117
				(char *)&group, sizeof(group)) < 0) {
redmine authored
1118
		error("Cannot setsockopt() to enter to membership %s -> %s",
redmine authored
1119
			ctx_p->cluster_iface, ctx_p->cluster_mcastipaddr);
redmine authored
1120 1121
		return errno;
	}
redmine authored
1122

redmine authored
1123 1124 1125 1126 1127 1128
	//	Output socket

	//		Creating socket

	sock_o = socket(AF_INET, SOCK_DGRAM, 0);
	if(sock_o < 0) {
redmine authored
1129
		error("Cannot create socket for output traffic");
redmine authored
1130 1131 1132 1133 1134 1135
		return errno;
	}
	
	//		Initializing the group sockaddr structure

	sa_o.sin_family		= AF_INET;
redmine authored
1136 1137
	sa_o.sin_port 		= htons(ctx_p->cluster_mcastipport);
	sa_o.sin_addr.s_addr	= inet_addr(ctx_p->cluster_mcastipaddr);
redmine authored
1138 1139 1140 1141 1142 1143

	//		Disable looping back output datagrams

	{
		char loopch = 0;
		if(setsockopt(sock_o, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch))<0) {
redmine authored
1144
			error("Cannot disable loopback for output socket.");
redmine authored
1145 1146 1147 1148 1149 1150 1151 1152
			return errno;
		}
	}

	//		Setting local interface for output traffic

	{
		struct in_addr addr_o;
redmine authored
1153
		addr_o.s_addr = inet_addr(ctx_p->cluster_iface);
redmine authored
1154
		if(setsockopt(sock_o, IPPROTO_IP, IP_MULTICAST_IF, &addr_o, sizeof(addr_o)) < 0) {
redmine authored
1155
			error("Cannot set local interface for outbound traffic");
redmine authored
1156 1157 1158 1159
			return errno;
		}
	}

redmine authored
1160

redmine authored
1161
	// Initializing another routines
redmine authored
1162
	cluster_io_init();
redmine authored
1163

redmine authored
1164 1165 1166
	// Getting my ID in the cluster

	//	Trying to preserve my node_id after restart. :)
redmine authored
1167
	//	Asking another nodes about my previous node_id
redmine authored
1168
	{
redmine authored
1169 1170
		debug(15, "Preparing a message with my nodename == \"%s\" (%i)", ctx_p->cluster_nodename, ctx_p->cluster_nodename_len);

redmine authored
1171
		clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_hello_t, ctx_p->cluster_nodename_len);
redmine authored
1172

redmine authored
1173
		clustercmd_p->h.data_len = ctx_p->cluster_nodename_len;
redmine authored
1174
		memcpy(clustercmd_p->data.hello.node_name, ctx_p->cluster_nodename, clustercmd_p->h.data_len+1);
redmine authored
1175

redmine authored
1176
		clustercmd_p->h.cmd_id      = CLUSTERCMDID_HELLO;
redmine authored
1177
		clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
redmine authored
1178
		if ((ret = cluster_send(clustercmd_p)))
redmine authored
1179
			return ret;
redmine authored
1180
	}
redmine authored
1181

redmine authored
1182
	//	Processing answers
redmine authored
1183
	cluster_recv_proc_set(CLUSTERCMDID_WELCOME, cluster_recvproc_welcome);
redmine authored
1184

redmine authored
1185
	if ((ret=cluster_recv_proc(cluster_timeout)))
redmine authored
1186
		return ret;
redmine authored
1187

redmine authored
1188 1189 1190 1191
	//	Ignore next welcome messages
	cluster_recv_proc_set(CLUSTERCMDID_WELCOME, NULL);


redmine authored
1192
	debug(3, "After communicating with others, my node_id is %i.", node_id_my);
redmine authored
1193

redmine authored
1194
	//	Getting free node_id if nobody said us the certain value (see above).
redmine authored
1195
	if (node_id_my == NODEID_NOID) {
redmine authored
1196
		int i=0;
redmine authored
1197 1198
		while (i<MAXNODES) {
			if (nodeinfo[i].status == NODESTATUS_DOESNTEXIST) {
redmine authored
1199 1200 1201 1202 1203
				node_id_my = i;
				break;
			}
			i++;
		}
redmine authored
1204
		debug(3, "I was have to set my node_id to %i.", node_id_my);
redmine authored
1205 1206 1207
	}

	//	If there's no free id-s, then exit :(
redmine authored
1208
	if (node_id_my == NODEID_NOID) {
redmine authored
1209
		error("Cannot find free node ID. Seems, that all %i ID-s are already occupied.");
redmine authored
1210 1211 1212
		return ENOMEM;
	}

redmine authored
1213 1214 1215
	// Initializing global variables, pt. 2
	nodeinfo_my = &nodeinfo[node_id_my];

redmine authored
1216
	// Registering in the cluster
redmine authored
1217 1218

	// 	Sending registration information
redmine authored
1219
	node_update_status(node_id_my, NODESTATUS_SEEMSONLINE);
redmine authored
1220
	{
redmine authored
1221
		clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_reg_t, ctx_p->cluster_nodename_len);
redmine authored
1222
		clustercmd_reg_t *data_reg_p = &clustercmd_p->data.reg;
redmine authored
1223

redmine authored
1224
		memcpy(data_reg_p->node_name, ctx_p->cluster_nodename, ctx_p->cluster_nodename_len+1);
redmine authored
1225

redmine authored
1226
		clustercmd_p->h.data_len    = ctx_p->cluster_nodename_len+1;
redmine authored
1227
		clustercmd_p->h.cmd_id      = CLUSTERCMDID_REG;
redmine authored
1228 1229 1230
		clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
		if((ret=cluster_send(clustercmd_p)))
			return ret;
redmine authored
1231
	}
redmine authored
1232

redmine authored
1233 1234 1235 1236
	//	Setting process functions
	cluster_recv_proc_set(CLUSTERCMDID_HELLO,	cluster_recvproc_hello);
	cluster_recv_proc_set(CLUSTERCMDID_REG,		cluster_recvproc_register);

redmine authored
1237
	// 	Getting answers
redmine authored
1238
	if ((ret=cluster_recv_proc(cluster_timeout)))
redmine authored
1239 1240
		return ret;

redmine authored
1241
	node_update_status(node_id_my, NODESTATUS_ONLINE);
redmine authored
1242 1243 1244

	// Running thread, that will process background communicating routines with another nodes.
	// The process is based on function cluster_loop() [let's use shorthand "cluster_loop()-thread"]
redmine authored
1245 1246 1247 1248 1249
	ret = pthread_create(&pthread_cluster, NULL, (void *(*)(void *))cluster_loop, NULL);

	return ret;
}

redmine authored
1250
/**
redmine authored
1251
 * @brief 			(syncop) Sends signal to cluster_loop()-thread
redmine authored
1252 1253 1254
 * 
 * @param[in] 	signal 		Signal number
 *
redmine authored
1255 1256
 * @retval	zero 		Successfully send the signal
 * @retval	non-zero 	Got error, while sending the signal
redmine authored
1257 1258 1259
 * 
 */

redmine authored
1260
static inline int cluster_signal(int signal) {
redmine authored
1261
	if (pthread_cluster)
redmine authored
1262 1263 1264
		return pthread_kill(pthread_cluster, signal);

	return 0;
redmine authored
1265 1266
}

redmine authored
1267

redmine authored
1268
extern int cluster_modtime_exchange_cleanup();
redmine authored
1269
/**
redmine authored
1270
 * @brief 			Antagonist of cluster_init() function. Kills cluster_loop()-thread and cleaning up
redmine authored
1271 1272 1273 1274 1275 1276
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1277
int cluster_deinit() {
redmine authored
1278 1279
	int ret = 0;

redmine authored
1280 1281
	cluster_signal(SIGTERM);

redmine authored
1282 1283
	ret = pthread_join(pthread_cluster, NULL);

redmine authored
1284
	cluster_io_deinit();
redmine authored
1285

redmine authored
1286
	node_update_status(NODEID_NOID, NODESTATUS_DOESNTEXIST);
redmine authored
1287 1288 1289 1290 1291
#ifdef VERYPARANOID
	int i=0;
#endif
	while(node_count) {
#ifdef VERYPARANOID
Andrew Savchenko authored
1292
		if(i++ > MAXNODES) {
redmine authored
1293
			error("cluster_deinit() looped. Forcing break.");
redmine authored
1294 1295 1296
			break;
		}
#endif
redmine authored
1297
		node_update_status(0, NODESTATUS_DOESNTEXIST);
redmine authored
1298
	}
redmine authored
1299

redmine authored
1300 1301
	close(sock_i);
	close(sock_o);
redmine authored
1302

redmine authored
1303
#ifdef VERYPARANOID
Andrew Savchenko authored
1304 1305
	memset(nodeinfo, 0, sizeof(nodeinfo_t) * NODES_ALLOC);
	nodeinfo_my = NULL;
redmine authored
1306 1307 1308
	node_count  = 0;
	node_online = 0;
	node_id_my  = NODEID_NOID;
redmine authored
1309 1310 1311

	memset(&sa_i,	0, sizeof(sa_i));
	memset(&sa_o,	0, sizeof(sa_o));
redmine authored
1312
#endif
redmine authored
1313

redmine authored
1314 1315
	cluster_modtime_exchange_cleanup();

redmine authored
1316 1317
	g_hash_table_destroy(indexes_p->nodenames_ht);

redmine authored
1318
	return ret;
redmine authored
1319 1320
}

redmine authored
1321

redmine authored
1322 1323 1324 1325 1326 1327 1328 1329 1330 1331
/**
 * @brief 			(syncop) Forces anothes nodes to ignore events about the file or directory
 * 
 * @param[in] 	fpath 		Path to the file or directory
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1332
int cluster_lock(const char *fpath) {
redmine authored
1333 1334 1335
	return 0;
}

redmine authored
1336 1337 1338 1339 1340 1341 1342 1343 1344

/**
 * @brief 			(syncop) Forces anothes nodes to ignore events about all files and directories listed in queues of "indexes_p"
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1345 1346 1347
int cluster_lock_byindexes() {
	return 0;
}
redmine authored
1348

redmine authored
1349 1350 1351 1352 1353 1354 1355 1356 1357

/**
 * @brief 			(syncop) Returns events-handling on another nodes about all files and directories, locked by cluster_lock() and cluster_lock_byindexes() from this node
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1358
int cluster_unlock_all() {
redmine authored
1359 1360 1361
	return 0;
}

redmine authored
1362

redmine authored
1363 1364 1365 1366 1367 1368 1369 1370
#define CLUSTER_LOOP_CHECK(a) {\
	int ret = a;\
	if(ret) {\
		sync_term(ret);\
		return ret;\
	}\
}

redmine authored
1371
/**
redmine authored
1372
 * @brief 			Processes background communicating routines with another nodes. cluster_init() function create a thread for this function.
redmine authored
1373 1374 1375 1376 1377 1378
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1379
int cluster_loop() {
redmine authored
1380
	int ret = 0;
redmine authored
1381
	sigset_t sigset_cluster;
redmine authored
1382 1383 1384

	// Ignoring SIGINT signal

redmine authored
1385 1386 1387 1388
	sigemptyset(&sigset_cluster);
	sigaddset(&sigset_cluster, SIGINT);
	CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_BLOCK, &sigset_cluster, NULL));

redmine authored
1389 1390
	// Don't ignoring SIGTERM signal

redmine authored
1391 1392 1393 1394
	sigemptyset(&sigset_cluster);
	sigaddset(&sigset_cluster, SIGTERM);
	CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_UNBLOCK, &sigset_cluster, NULL));

redmine authored
1395 1396
	// Starting the loop

redmine authored
1397
	debug(3, "cluster_loop() started.");
redmine authored
1398

redmine authored
1399
	while (1) {
redmine authored
1400
		int _ret;
redmine authored
1401 1402 1403 1404
		// Waiting for event
		fd_set rfds;
		FD_ZERO(&rfds);
		FD_SET(sock_i, &rfds);
redmine authored
1405
		debug(3, "select()");
redmine authored
1406
		_ret = select(sock_i+1, &rfds, NULL, NULL, NULL);
redmine authored
1407 1408

		// Exit if error
redmine authored
1409
		if ((_ret == -1) && (errno != EINTR)) {
redmine authored
1410
			ret = errno;
redmine authored
1411
			sync_term(ret);
redmine authored
1412
			break;
redmine authored
1413 1414
		}

redmine authored
1415
		// Breaking the loop, if there's SIGTERM signal for this thread
redmine authored
1416
		debug(3, "sigpending()");
redmine authored
1417
		if (sigpending(&sigset_cluster))
redmine authored
1418 1419 1420
			if(sigismember(&sigset_cluster, SIGTERM))
				break;

redmine authored
1421
		// Processing new messages
redmine authored
1422
		debug(3, "cluster_recv_proc()");
redmine authored
1423
		if ((ret=cluster_recv_proc(0))) {
redmine authored
1424
			sync_term(ret);
redmine authored
1425
			break;
redmine authored
1426
		}
redmine authored
1427 1428
	}

redmine authored
1429
	debug(3, "cluster_loop() finished with exitcode %i.", ret);
redmine authored
1430
	return ret;
redmine authored
1431 1432 1433 1434 1435 1436 1437 1438 1439
#ifdef DOXYGEN
	sync_term(0);
#endif
}


/**
 * @brief 			Updating information about modification time of a directory.
 * 
redmine authored
1440 1441 1442
 * @param[in]	path		Canonized path to updated file/dir
 * @param[in]	dirlevel	Directory level provided by fts (man 3 fts)
 * @param[in]	st_mode		st_mode value to detect is it directory or not (S_IFDIR or not)
redmine authored
1443 1444 1445 1446 1447 1448
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

redmine authored
1449
int cluster_modtime_update(const char *path, short int dirlevel, mode_t st_mode) {
redmine authored
1450 1451 1452
	// "modtime" is incorrent name-part of function. Actually it updates "change time" (man 2 lstat64).
	int ret;

redmine authored
1453
	// Getting relative directory level (depth)
redmine authored
1454
	short int dirlevel_rel = dirlevel - ctx_p->watchdir_dirlevel;
redmine authored
1455

redmine authored
1456
	if ((st_mode & S_IFMT) == S_IFDIR)
redmine authored
1457 1458 1459
		dirlevel_rel++;

	// Don't remembering information about directories with level beyond the limits
redmine authored
1460
	if ((dirlevel_rel > ctx_p->cluster_scan_dl_max) || (dirlevel_rel < ctx_p->cluster_hash_dl_min))
redmine authored
1461 1462 1463 1464
		return 0;


	// Getting directory/file-'s information (including "change time" aka "st_ctime")
redmine authored
1465
	stat64_t stat64;
redmine authored
1466
	ret=lstat64(path, &stat64);
redmine authored
1467
	if (ret) {
redmine authored
1468
		error("Cannot lstat64()", path);
redmine authored
1469 1470 1471
		return errno;
	}

redmine authored
1472 1473
	// Getting absolute directory path
	const char *dirpath;
redmine authored
1474
	if ((st_mode & S_IFMT) == S_IFDIR) {
redmine authored
1475 1476 1477 1478 1479 1480 1481 1482
		dirpath = path;
	} else {
		char *path_dup = strdup(path);
		dirpath = (const char *)dirname(path_dup);
		free(path_dup);
	}

	// Getting relative directory path
redmine authored
1483
	//	Initializing
redmine authored
1484 1485 1486 1487
	size_t  dirpath_len   = strlen(dirpath);
	char   *dirpath_rel_p = xmalloc(dirpath_len+1);
	char   *dirpath_rel   = dirpath_rel_p;

redmine authored
1488 1489
	const char *dirpath_rel_full     = &dirpath[ctx_p->watchdirlen];
	size_t      dirpath_rel_full_len = dirpath_len - ctx_p->watchdirlen;
redmine authored
1490 1491 1492 1493

	// 	Getting coodinate of the end (directory path is already canonized, so we can simply count number of slashes to get directory level)
	int     slashcount=0;
	size_t  dirpath_rel_end=0;
redmine authored
1494 1495
	while (dirpath_rel_full[dirpath_rel_end] && (dirpath_rel_end < dirpath_rel_full_len)) {
		if (dirpath_rel_full[dirpath_rel_end] == '/') {
redmine authored
1496
			slashcount++;
redmine authored
1497
			if (slashcount >= ctx_p->cluster_hash_dl_max)
redmine authored
1498 1499 1500 1501
				break;
		}
		dirpath_rel_end++;
	}
redmine authored
1502

redmine authored
1503 1504 1505 1506 1507 1508 1509 1510
	//	Copy the required part of path to dirpath_rel
	memcpy(dirpath_rel, dirpath_rel_full, dirpath_rel_end);

	
	// Updating "st_ctime" information. We should check current value for this directory and update it only if it less or not set.
	//	Checking current value
	char toupdate = 0;
	gpointer ctime_gp = g_hash_table_lookup(nodeinfo_my->modtime_ht, dirpath_rel);
redmine authored
1511
	if (ctime_gp == NULL)
redmine authored
1512
		toupdate++;
redmine authored
1513
	else if (GPOINTER_TO_INT(ctime_gp) < stat64.st_ctime)
redmine authored
1514 1515 1516
		toupdate++;

	//	g_hash_table_replace() will replace existent information about the directory or create it if it doesn't exist.
redmine authored
1517
	if (toupdate)
redmine authored
1518
		g_hash_table_replace(nodeinfo_my->modtime_ht, strdup(dirpath_rel), GINT_TO_POINTER(stat64.st_ctime));
redmine authored
1519 1520

	// Why I'm using "st_ctime" instead of "st_mtime"? Because "st_ctime" also updates on updating inode information.
redmine authored
1521 1522
	
	return 0;
redmine authored
1523 1524 1525
}


redmine authored
1526
/**
redmine authored
1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
 * @brief 			Puts entry to list to be send to other nodes. To be called from cluster_modtime_exchange()
 *
 * @param[in] 	pushrentry_arg_p Pointer to pushentry_arg structure
 * 
 */

void cluster_modtime_exchange_pushentry(gpointer dir_gp, gpointer modtype_gp, void *pushentry_arg_gp) {
	struct pushdoubleentry_arg *pushentry_arg_p = (struct pushdoubleentry_arg *)pushentry_arg_gp;
	char  *dir   = (char *)dir_gp;
	time_t ctime = (time_t)GPOINTER_TO_INT(modtype_gp);
redmine authored
1537
	size_t size  = strlen(dir)+1;				// TODO: strlen should be already prepared
redmine authored
1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593
								// but not re-calculated here

	if(pushentry_arg_p->allocated <= pushentry_arg_p->total) {
		pushentry_arg_p->allocated += ALLOC_PORTION;
		pushentry_arg_p->entry      = (struct doubleentry *)
			xrealloc(
				(char *)pushentry_arg_p->entry, 
				pushentry_arg_p->allocated * sizeof(*pushentry_arg_p->entry)
			);
	}

	pushentry_arg_p->entry[pushentry_arg_p->total].dat0  = dir;
	pushentry_arg_p->entry[pushentry_arg_p->total].size0 = size;
	pushentry_arg_p->entry[pushentry_arg_p->total].dat1  = (void *)ctime;	// Will be problems if sizeof(time_t) > sizeof(void *)
	pushentry_arg_p->entry[pushentry_arg_p->total].size1 = sizeof(ctime);

	pushentry_arg_p->size += size;
	pushentry_arg_p->total++;

	return;
}


static struct pushdoubleentry_arg cluster_modtime_exchange_pushentry_arg = {0};
/**
 * @brief 			Clean up after the last run of cluster_modtime_exchange.
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

int cluster_modtime_exchange_cleanup() {
	struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;

	int i=0;
	while(i < pushentry_arg_p->allocated) {
		if(pushentry_arg_p->entry[i].alloc0)
			free(pushentry_arg_p->entry[i].dat0);
		if(pushentry_arg_p->entry[i].alloc1)
			free(pushentry_arg_p->entry[i].dat1);
		i++;
	}

	free(pushentry_arg_p->entry);

#ifdef VERYPARANOID
	memset(pushentry_arg_p, 0, sizeof(*pushentry_arg_p));
#endif

	return 0;
}


/**
 * @brief 			Exchanging with "modtime_ht"-s to be able to compare them.
redmine authored
1594 1595 1596 1597 1598 1599 1600
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

int cluster_modtime_exchange() {
redmine authored
1601 1602
	struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;

redmine authored
1603
	// Getting hash table entries
redmine authored
1604 1605 1606 1607 1608 1609 1610 1611
	pushentry_arg_p->size=0;
	pushentry_arg_p->total=0;
	g_hash_table_foreach(nodeinfo_my->modtime_ht, cluster_modtime_exchange_pushentry, (void *)pushentry_arg_p);

	if(!pushentry_arg_p->total) {
		// !!!
	}

redmine authored
1612
	// Calculating required RAM to compile clustercmd
redmine authored
1613 1614 1615 1616 1617 1618 1619 1620
	size_t toalloc = 0;
	int i = 0;
	while(i < pushentry_arg_p->total) {
		toalloc += 4;					// for size header
		toalloc += pushentry_arg_p->entry[i].size0;	// for path
		toalloc += pushentry_arg_p->entry[i].size1;	// for ctime
	}

redmine authored
1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656
	// Allocating space for the clustercmd
	clustercmd_t *clustercmd_p = (clustercmd_t *)xmalloc(sizeof(clustercmdhdr_t) + toalloc);
	memset(clustercmd_p, 0, sizeof(clustercmdhdr_t));

	// Setting up clustercmd
	clustercmd_p->h.dst_node_id	= NODEID_NOID;
	clustercmd_p->h.cmd_id		= CLUSTERCMDID_HT_EXCH;
	clustercmd_p->h.data_len	= toalloc;

	// Filing clustercmd with hash-table entriyes
	i = 0;
	clustercmd_ht_exch_t *clustercmd_ht_exch_p = &clustercmd_p->data.ht_exch;
	while(i < pushentry_arg_p->total) {
		// Setting the data

		clustercmd_ht_exch_p->ctime       = (time_t)pushentry_arg_p->entry[i].dat1;
		clustercmd_ht_exch_p->path_length = (time_t)pushentry_arg_p->entry[i].size0;

		memcpy(
			clustercmd_ht_exch_p->path,
			pushentry_arg_p->entry[i].dat0,
			clustercmd_ht_exch_p->path_length
		);

		// Pointing to space for next entry:
		size_t offset = sizeof(clustercmd_ht_exch_t)-1+pushentry_arg_p->entry[i].size0;

		clustercmd_ht_exch_p = (clustercmd_ht_exch_t *)
			(&((char *) clustercmd_ht_exch_p)[offset] );
	}

	// Sending
	cluster_send(clustercmd_p);

	// Cleanup
	free(clustercmd_p);
redmine authored
1657 1658 1659 1660 1661 1662

	return 0;
}


/**
redmine authored
1663 1664 1665 1666 1667 1668 1669 1670 1671 1672
 * @brief 			(syncop) Syncing file tree with another nodes with using of directories' modification time as a recent-detector.
 * 
 * @param[in] 	dirpath		Path to the directory
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

int cluster_initialsync() {
redmine authored
1673 1674
	cluster_modtime_exchange();

redmine authored
1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692
	return 0;
}


/**
 * @brief 			(syncop) "Captures" right to update the file or directory to another nodes. It just removes events about the file of directory from another nodes
 * 
 * @param[in] 	dirpath		Path to the directory
 * 
 * @retval	zero 		Successfully initialized
 * @retval	non-zero 	Got error, while initializing
 * 
 */

int cluster_capture(const char *path) {
	return 0;
}

redmine authored
1693 1694
#endif