Blame view

socket.c 21 KB
redmine authored
1
/*
redmine authored
2
    clsync - file tree sync utility based on inotify
redmine authored
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
    
    Copyright (C) 2013  Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
    
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

Andrew Savchenko authored
20 21 22
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
redmine authored
23
#include <string.h>
Andrew Savchenko authored
24
#include <sys/errno.h>
Andrew Savchenko authored
25
#include <sys/param.h>
Andrew Savchenko authored
26
#include <sys/socket.h>
redmine authored
27
#include <sys/un.h>	// for "struct sockaddr_un"
Andrew Savchenko authored
28
#include <unistd.h>
redmine authored
29

redmine authored
30

Andrew Savchenko authored
31
#include "configuration.h"
redmine authored
32
#include "error.h"
redmine authored
33
#include "malloc.h"
Andrew Savchenko authored
34
#include "program.h"
redmine authored
35 36
#include "socket.h"

redmine authored
37
// Serializes the slot bookkeeping done by socket_thread_new() and socket_thread_delete().
pthread_mutex_t socket_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
redmine authored
38

redmine authored
39 40 41
// Highest slot index socket_gc() starts its downward scan from (-1: nothing to scan yet).
int clsyncsockthreads_last	= -1;
// Incremented by socket_thread_new() and decremented by socket_thread_delete().
int clsyncsockthreads_count	=  0;
// Index of the slot the next socket_thread_new() call hands out.
int clsyncsockthreads_num	=  0;
redmine authored
42

redmine authored
43
// Per-slot "in use" flags. The extra element at index SOCKET_MAX is never set,
// so the "find next free slot" scan in socket_thread_new() always terminates.
char clsyncsockthread_busy[SOCKET_MAX+1] = {0};
redmine authored
44

redmine authored
45
// Per-slot connection thread descriptors; slots are handed out by socket_thread_new().
socket_sockthreaddata_t sockthreaddata[SOCKET_MAX+1] = {{0}};
redmine authored
46 47

int socket_gc() {
redmine authored
48
	int i=clsyncsockthreads_last+1;
redmine authored
49 50
	while(i) {
		i--;
redmine authored
51
		switch(sockthreaddata[i].state) {
redmine authored
52
			case CLSTATE_DIED:
redmine authored
53
				debug(3, "Forgeting clsyncsock #%u", i);
redmine authored
54 55
				pthread_join(sockthreaddata[i].thread, NULL);
				sockthreaddata[i].state = CLSTATE_NONE;
redmine authored
56 57 58 59 60 61 62 63
				break;
			default:
				break;
		}
	}

	return 0;
}
redmine authored
64 65 66 67

// Receive-buffer cursors remembered between socket_recv() calls, indexed by the
// socket descriptor number. NULL means "beginning of the buffer".
// recv_stps[fd]: where the not yet parsed data starts.
static char *recv_stps[SOCKET_MAX];
// recv_ptrs[fd]: where the next received bytes are to be stored.
static char *recv_ptrs[SOCKET_MAX];

redmine authored
68 69
const char *const textmessage_args[SOCKCMD_MAXID] = {
	[SOCKCMD_REQUEST_NEGOTIATION] 	= "%u",
redmine authored
70
	[SOCKCMD_REQUEST_DUMP]	 	= "%s",
redmine authored
71
	[SOCKCMD_REQUEST_SET]	 	= "%s\003/ %s\003/",
redmine authored
72
	[SOCKCMD_REPLY_NEGOTIATION] 	= "%u",
redmine authored
73 74
	[SOCKCMD_REPLY_ACK]		= "%u %lu",
	[SOCKCMD_REPLY_EINVAL]		= "%u %lu",
redmine authored
75 76
	[SOCKCMD_REPLY_VERSION]		= "%u %u %s",
	[SOCKCMD_REPLY_INFO]		= "%s\003/ %s\003/ %x %x",
redmine authored
77
	[SOCKCMD_REPLY_UNKNOWNCMD]	= "%u %lu",
redmine authored
78
	[SOCKCMD_REPLY_INVALIDCMDID]	= "%lu",
redmine authored
79 80 81
	[SOCKCMD_REPLY_EEXIST]		= "%s\003/",
	[SOCKCMD_REPLY_EPERM]		= "%s\003/",
	[SOCKCMD_REPLY_ECUSTOM]		= "%s\003/ %s\003/ %u %s\003/",
redmine authored
82 83
};

redmine authored
84 85 86
/*
 * printf() formats of the human-readable part of a text message (the text
 * after " :"), indexed by command id. They are fed with the same argument
 * list as the corresponding textmessage_args[] entry.
 */
const char *const textmessage_descr[SOCKCMD_MAXID] = {
	// Requests
	[SOCKCMD_REQUEST_NEGOTIATION] = "Protocol version is %u.",

	// Replies
	[SOCKCMD_REPLY_NEGOTIATION] = "Protocol version is %u.",
	[SOCKCMD_REPLY_ACK] = "Acknowledged command: id == %u; num == %lu.",
	[SOCKCMD_REPLY_EINVAL] = "Rejected command: id == %u; num == %lu. Invalid arguments: %s.",
	[SOCKCMD_REPLY_LOGIN] = "Enter your login and password, please.",
	[SOCKCMD_REPLY_UNEXPECTEDEND] = "Need to go, sorry. :)",
	[SOCKCMD_REPLY_DIE] = "Okay :(",
	[SOCKCMD_REPLY_BYE] = "Bye.",
	[SOCKCMD_REPLY_VERSION] = "clsync v%u.%u%s",
	[SOCKCMD_REPLY_INFO] = "config_block == \"%s\"; label == \"%s\"; flags == %x; flags_set == %x.",
	[SOCKCMD_REPLY_SET] = "Set",
	[SOCKCMD_REPLY_DUMP] = "Ready",
	[SOCKCMD_REPLY_UNKNOWNCMD] = "Unknown command.",
	[SOCKCMD_REPLY_INVALIDCMDID] = "Invalid command id. Required: 0 <= cmd_id < 1000.",
	[SOCKCMD_REPLY_EEXIST] = "File exists: \"%s\".",
	[SOCKCMD_REPLY_EPERM] = "Permission denied: \"%s\".",
	[SOCKCMD_REPLY_ECUSTOM] = "%s(%s): Error #%u: \"%s\".",
};

/*
 * Asks the kernel for the pending error (SO_ERROR) of the socket "sock".
 *
 * Returns 0 if there is no pending error. Otherwise returns the error code:
 * either errno of the failed getsockopt() call, or the pending socket error
 * (which is also stored into errno).
 */
int socket_check_bysock(int sock) {
	int pending_error;
	socklen_t optlen = sizeof(pending_error);

	if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &pending_error, &optlen) != 0)
		return errno;

	if (pending_error == 0)
		return 0;

	errno = pending_error;
	return pending_error;
}
redmine authored
119

redmine authored
120 121
/*
 * Convenience wrapper: runs socket_check_bysock() on the descriptor stored
 * in "clsyncsock_p" and returns its result.
 */
static inline int socket_check(clsyncsock_t *clsyncsock_p) {
	const int fd = clsyncsock_p->sock;

	return socket_check_bysock(fd);
}

redmine authored
124 125
clsyncsock_t *socket_new(int clsyncsock_sock) {
	clsyncsock_t *clsyncsock_p = xmalloc(sizeof(*clsyncsock_p));
redmine authored
126
	
redmine authored
127
	debug(2, "sock == %i.", clsyncsock_sock);
redmine authored
128

redmine authored
129
	clsyncsock_p->sock    = clsyncsock_sock;
redmine authored
130
	
redmine authored
131 132
	clsyncsock_p->prot    = SOCKET_DEFAULT_PROT;
	clsyncsock_p->subprot = SOCKET_DEFAULT_SUBPROT;
redmine authored
133

redmine authored
134
	return clsyncsock_p;
redmine authored
135 136
}

redmine authored
137 138
/*
 * Forgets the receive-buffer state kept for the socket of "clsyncsock_p" and
 * frees the descriptor itself. The socket is NOT closed here (see
 * socket_close()).
 *
 * Always returns 0. "clsyncsock_p" must not be used afterwards.
 */
int socket_cleanup(clsyncsock_t *clsyncsock_p) {
	int clsyncsock_sock = clsyncsock_p->sock;

	debug(2, "sock == %i.", clsyncsock_sock);

	// The descriptor number is used as an array index: never touch memory
	// outside of the tables for a descriptor they cannot hold.
	if (clsyncsock_sock >= 0 && clsyncsock_sock < SOCKET_MAX) {
		recv_ptrs[clsyncsock_sock] = NULL;
		recv_stps[clsyncsock_sock] = NULL;
	}

	free(clsyncsock_p);
	return 0;
}
redmine authored
149

redmine authored
150 151 152 153 154 155
/*
 * Closes the socket of "clsyncsock_p" and releases the descriptor via
 * socket_cleanup(), whose result is returned.
 */
int socket_close(clsyncsock_t *clsyncsock_p) {
	int fd = clsyncsock_p->sock;

	close(fd);

	return socket_cleanup(clsyncsock_p);
}

redmine authored
156
/*
 * Releases the slot of a connection thread: closes its socket, updates the
 * slot counters, marks the slot CLSTATE_DIED and, if set, calls the
 * "freefunct_arg" hook on the thread's argument.
 *
 * Everything is done while holding socket_thread_mutex. Always returns 0.
 */
int socket_thread_delete(socket_sockthreaddata_t *threaddata_p) {
	pthread_mutex_lock(&socket_thread_mutex);

	const int slot = threaddata_p->id;

	socket_close(threaddata_p->clsyncsock_p);

	clsyncsockthreads_count -= 1;
	clsyncsockthread_busy[slot] = 0;

	if (slot == clsyncsockthreads_last)
		clsyncsockthreads_last = slot - 1;

	threaddata_p->state = CLSTATE_DIED;

	if (threaddata_p->freefunct_arg)
		threaddata_p->freefunct_arg(threaddata_p->arg);

	pthread_mutex_unlock(&socket_thread_mutex);
	return 0;
}

redmine authored
181
clsyncsock_t *socket_accept(int sock) {
redmine authored
182 183 184 185
	// Cleaning up after died connections (getting free space for new connection)
	socket_gc();

	// Getting new connection
redmine authored
186 187
	int clsyncsock_sock = accept(sock, NULL, NULL);
	if(clsyncsock_sock == -1) {
redmine authored
188
		error("socket_accept(%i): Cannot accept()", sock);
redmine authored
189 190 191
		return NULL;
	}

redmine authored
192
	return socket_new(clsyncsock_sock);
redmine authored
193 194
}

redmine authored
195
clsyncsock_t *socket_listen_unix(const char *const socket_path) {
redmine authored
196
	// creating a simple unix socket
redmine authored
197 198
	int s;
	s = socket(AF_UNIX, SOCK_STREAM, 0);
redmine authored
199 200

	// checking the path
redmine authored
201
	// already exists? - unlink
redmine authored
202 203
	if (!access(socket_path, F_OK))
		if (unlink(socket_path)) {
redmine authored
204
			error("Cannot unlink() \"%s\".", 
redmine authored
205
				socket_path);
redmine authored
206 207 208
			close(s);
			return NULL;
		}
redmine authored
209 210

	// binding
redmine authored
211
	{
redmine authored
212 213 214 215
		struct sockaddr_un addr;
		memset(&addr, 0, sizeof(addr));
		addr.sun_family = AF_UNIX;
		strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path)-1);
redmine authored
216
		if (bind(s, (struct sockaddr *)&addr, sizeof(addr))) {
redmine authored
217
			error("Cannot bind() on address \"%s\".",
redmine authored
218
				socket_path);
redmine authored
219 220
			close(s);
			return NULL;
redmine authored
221 222 223 224
		}
	}

	// starting to listening
redmine authored
225
	if (listen(s, SOCKET_BACKLOG)) {
redmine authored
226
		error("Cannot listen() on address \"%s\".",
redmine authored
227
			socket_path);
redmine authored
228 229
		close(s);
		return NULL;
redmine authored
230 231
	}

redmine authored
232
	return socket_new(s);
redmine authored
233 234
}

redmine authored
235
#ifdef SOCKET_PROVIDER_LIBCLSYNC
redmine authored
236
clsyncsock_t *socket_connect_unix(const char *const socket_path) {
redmine authored
237 238 239 240 241 242 243 244
	// creating a simple unix socket
	int s;

	s = socket(AF_UNIX, SOCK_STREAM, 0);
	if (s == -1)
		return NULL;

	// checking the path
redmine authored
245
	if (access(socket_path, F_OK)) {
redmine authored
246
		error("Cannot access() to \"%s\".", 
redmine authored
247
			socket_path);
redmine authored
248 249 250 251 252 253 254 255 256 257
		close(s);
		return NULL;
	}

	// connecting
	{
		struct sockaddr_un addr;
		memset(&addr, 0, sizeof(addr));
		addr.sun_family = AF_UNIX;
		strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path)-1);
redmine authored
258
		if (connect(s, (struct sockaddr *)&addr, sizeof(addr))) {
redmine authored
259
			error("Cannot connect() to address \"%s\".",
redmine authored
260
				socket_path);
redmine authored
261 262 263 264 265 266
			close(s);
			return NULL;
		}
	}

	return socket_new(s);
redmine authored
267 268 269
}
#endif

redmine authored
270 271
/*
 * Formats and sends one message with command id "cmd_id".
 *
 * clsyncsock  socket descriptor; only protocol 0 / SUBPROT0_TEXT is supported
 * cmd_num_p   message number to send; incremented once the message is built
 * cmd_id      index into textmessage_args[] / textmessage_descr[]
 * ap          arguments for both format strings (each gets its own copy of
 *             the list, "ap" itself is left untouched for the caller's va_end())
 *
 * Returns 0 on success, EINVAL on an unknown protocol/subprotocol/command id,
 * EMSGSIZE if the message does not fit into SOCKET_BUFSIZ, or errno of a
 * failed send().
 */
int _socket_send(clsyncsock_t *clsyncsock, uint64_t *cmd_num_p, sockcmd_id_t cmd_id, va_list ap)
{
	char prebuf0[SOCKET_BUFSIZ], prebuf1[SOCKET_BUFSIZ], sendbuf[SOCKET_BUFSIZ];
	const char *args_fmt, *descr_fmt;
	va_list ap_copy;
	int len;

	// "cmd_id" indexes two tables of SOCKCMD_MAXID entries
	if ((unsigned int)cmd_id >= (unsigned int)SOCKCMD_MAXID) {
		error("Invalid command id %u.", (unsigned int)cmd_id);
		return EINVAL;
	}

	switch (clsyncsock->prot) {
		case 0:
			switch (clsyncsock->subprot) {
				case SUBPROT0_TEXT:
					break;
/*				case SUBPROT0_BINARY:
					break;*/
				default:
					error("Unknown subprotocol with id %u.", clsyncsock->subprot);
					return EINVAL;
			}
			break;
		default:
			error("Unknown protocol with id %u.", clsyncsock->prot);
			return EINVAL;
	}

	args_fmt  = textmessage_args[cmd_id];
	descr_fmt = textmessage_descr[cmd_id];

	debug(3, "%p %p", (void *)prebuf0, (const void *)args_fmt);

	// Machine-readable arguments (empty if the command has no format)
	*prebuf0 = 0;
	if (args_fmt != NULL) {
		va_copy(ap_copy, ap);
		len = vsnprintf(prebuf0, sizeof(prebuf0), args_fmt, ap_copy);
		va_end(ap_copy);
		if (len < 0 || (size_t)len >= sizeof(prebuf0))
			return EMSGSIZE;
	}

	// Human-readable description (not every command has one)
	*prebuf1 = 0;
	if (descr_fmt != NULL) {
		va_copy(ap_copy, ap);
		len = vsnprintf(prebuf1, sizeof(prebuf1), descr_fmt, ap_copy);
		va_end(ap_copy);
		if (len < 0 || (size_t)len >= sizeof(prebuf1))
			return EMSGSIZE;
	}

	// The cast keeps the argument in sync with "%lu" whatever uint64_t is.
	len = snprintf(
			sendbuf, sizeof(sendbuf),
			"%lu %u %s :%s\n",
			(unsigned long)*cmd_num_p,
			(unsigned int)cmd_id, prebuf0, prebuf1
		);
	if (len < 0 || (size_t)len >= sizeof(sendbuf))
		return EMSGSIZE;	// a truncated message would lose its '\n' terminator

	(*cmd_num_p)++;

	debug(5, "send(): \"%s\"", sendbuf);
	if (send(clsyncsock->sock, sendbuf, (size_t)len, 0) == -1)
		return errno;

	return 0;
}

/*
 * Sends a message that answers "sockcmd_p": it carries the same message
 * number as the received command. The variadic arguments feed the format
 * strings of "cmd_id". Returns the result of _socket_send().
 */
int socket_reply(clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p, sockcmd_id_t cmd_id, ...)
{
	// Working on a copy: _socket_send() increments the number it is given
	uint64_t reply_num = sockcmd_p->cmd_num;
	va_list args;
	int rc;

	va_start(args, cmd_id);
	rc = _socket_send(clsyncsock_p, &reply_num, cmd_id, args);
	va_end(args);

	return rc;
}

redmine authored
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
/*
 * Sends a new message numbered with the socket's own counter
 * (clsyncsock_p->cmd_num, advanced by _socket_send()). The variadic arguments
 * feed the format strings of "cmd_id". Returns the result of _socket_send().
 */
int socket_send(clsyncsock_t *clsyncsock_p, sockcmd_id_t cmd_id, ...)
{
	va_list args;
	int rc;

	va_start(args, cmd_id);
	rc = _socket_send(clsyncsock_p, &clsyncsock_p->cmd_num, cmd_id, args);
	va_end(args);

	return rc;
}

/*
 * Like socket_send(), but also registers "cb" (with "cb_arg") to be called
 * when a message with the same number is received on this socket.
 *
 * If CLSYNCSOCK_WINDOW callbacks are already pending the socket is closed
 * (and "clsyncsock_p" freed!) and EOVERFLOW is returned.
 * Otherwise returns the result of _socket_send(); the callback is registered
 * only if the message was sent.
 */
int socket_send_cb(clsyncsock_t *clsyncsock_p, sockcmd_id_t cmd_id, clsyncsock_cb_funct_t cb, void *cb_arg, ...)
{
	const size_t cache_slots = 2*CLSYNCSOCK_WINDOW;
	clsynccbqueue_t *cbq;
	uint64_t cmd_num;
	size_t id, probes;
	va_list ap;
	int ret;

	if (clsyncsock_p->cbqueue_len >= CLSYNCSOCK_WINDOW) {
		errno = EOVERFLOW;
		error("Callback queue overflowed. Closing the socket.");
		socket_close(clsyncsock_p);
		// error()/close() may have changed errno, so it is set again
		return errno = EOVERFLOW;
	}

	cmd_num = clsyncsock_p->cmd_num;	// the number the message is about to get

	va_start(ap, cb_arg);
	ret = _socket_send(clsyncsock_p, &clsyncsock_p->cmd_num, cmd_id, ap);
	va_end(ap);

	if (ret)
		return ret;

	// Looking for a free cache cell, starting from the "hash" position and
	// wrapping around instead of running past the end of the table.
	// At most CLSYNCSOCK_WINDOW of the 2*CLSYNCSOCK_WINDOW cells are taken.
	id = cmd_num % cache_slots;
	for (probes = 0; probes < cache_slots && clsyncsock_p->cbqueue_cache[id] != NULL; probes++)
		id = (id + 1) % cache_slots;

	if (clsyncsock_p->cbqueue_cache[id] != NULL)
		return errno = EOVERFLOW;	// not supposed to happen, see above

	cbq = &clsyncsock_p->cbqueue[clsyncsock_p->cbqueue_len];

	cbq->cmd_num		= cmd_num;
	cbq->callback_funct	= cb;
	cbq->callback_arg	= cb_arg;

	clsyncsock_p->cbqueue_cache[id] = cbq;
	clsyncsock_p->cbqueue_len++;

	return ret;
}

redmine authored
384
/*
 * Makes room at the end of the buffer "buf" by moving the pending data
 * [*data_start_p, *data_end_p) to the beginning of the buffer.
 *
 * Both pointers are updated to the new location of the data.
 * Returns the number of bytes gained, i.e. 0 if the data already starts at
 * "buf" and nothing can be done.
 */
static inline int socket_overflow_fix(char *buf, char **data_start_p, char **data_end_p) {
	debug(3, "buf==%p; data_start==%p; data_end==%p", buf, *data_start_p, *data_end_p);
	if(buf == *data_start_p)
		return 0;

	size_t ptr_diff    = *data_start_p - buf;
	size_t data_length = *data_end_p   - *data_start_p;

	// The regions may overlap, so memmove(). With no pending data this just
	// resets both pointers to the beginning of the buffer.
	if (data_length)
		memmove(buf, *data_start_p, data_length);

	*data_start_p =  buf;
	*data_end_p   = &buf[data_length];

	return (int)ptr_diff;
}

redmine authored
406 407 408
/*
 * Helper of parse_text_data(): allocates a zeroed "dat_t" as sockcmd_p->data
 * and fills it with sscanf() using the textmessage_args[] format of the
 * command. The variadic part lists the fields as "&d->field".
 *
 * Uses "sockcmd_p", "args" and "min_args" of the enclosing function and makes
 * it return EINVAL if fewer than "min_args" fields were parsed.
 * The structure is zeroed because a format may fill fewer fields than were
 * passed (and the allocated data stays attached on the EINVAL path).
 */
#define PARSE_TEXT_DATA_SSCANF(dat_t, ...) {\
	dat_t *d = xmalloc(sizeof(dat_t));\
	memset(d, 0, sizeof(dat_t));\
	sockcmd_p->data = d;\
	if (sscanf(args, textmessage_args[sockcmd_p->cmd_id], __VA_ARGS__) < min_args)\
		return EINVAL;\
}

/*
 * Parses the argument part of a received text message into sockcmd_p->data.
 *
 * sockcmd_p  command being filled; sockcmd_p->cmd_id selects the format
 * args       the arguments; may be modified (long arguments of some commands
 *            are cut)
 * args_len   length of "args"
 *
 * For the commands listed below sockcmd_p->data becomes a freshly allocated
 * command-specific structure, for any other command a NUL-terminated copy of
 * "args". Nothing is allocated if "args_len" is 0.
 *
 * Returns 0 on success, or EINVAL if not all expected fields were found.
 */
static inline int parse_text_data(sockcmd_t *sockcmd_p, char *args, size_t args_len) {
	debug(6, "(%p, %p, %u)", (void *)sockcmd_p, (void *)args, (unsigned int)args_len);

	if (!args_len)
		return 0;

	// Counting the conversions of the format to know how many fields sscanf()
	// is expected to fill. The command id comes from the peer, so it is
	// checked before being used as an index.
	int min_args = 0;
	const char *ptr = NULL;

	if ((unsigned int)sockcmd_p->cmd_id < (unsigned int)SOCKCMD_MAXID)
		ptr = (const char *)textmessage_args[sockcmd_p->cmd_id];

	if (ptr != NULL) {
		while(*ptr) {
			if(*ptr == '%') {
				if(ptr[1] == '%')
					ptr++;
				else
					min_args++;
			}
			ptr++;
		}
	}

	switch (sockcmd_p->cmd_id) {
		case SOCKCMD_REQUEST_NEGOTIATION:
		case SOCKCMD_REPLY_NEGOTIATION:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_negotiation_t, &d->prot, &d->subprot);
			break;
		case SOCKCMD_REQUEST_DUMP:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_dump_t, &d->dir_path);
			break;
		case SOCKCMD_REQUEST_SET:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_set_t, &d->key, &d->value);
			break;
		case SOCKCMD_REPLY_ACK:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_ack_t, &d->cmd_id, &d->cmd_num);
			break;
		case SOCKCMD_REPLY_EINVAL:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_einval_t, &d->cmd_id, &d->cmd_num);
			break;
		case SOCKCMD_REPLY_VERSION:
			// Cutting too long arguments. The limit is 1<<8 bytes (not
			// sizeof(1<<8)): the terminator must land inside of "args".
			if (args_len > (1<<8))
				args[args_len=1<<8] = 0;
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_version_t, &d->major, &d->minor, &d->revision);
			break;
		case SOCKCMD_REPLY_INFO:
			if (args_len > (1<<8))
				args[args_len=1<<8] = 0;
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_info_t, &d->config_block, &d->label, &d->flags, &d->flags_set);
			break;
		case SOCKCMD_REPLY_UNKNOWNCMD:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_unknowncmd_t, &d->cmd_id, &d->cmd_num);
			break;
		case SOCKCMD_REPLY_INVALIDCMDID:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_invalidcmd_t, &d->cmd_num);
			break;
		case SOCKCMD_REPLY_EEXIST:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_eexist_t, &d->file_path);
			break;
		case SOCKCMD_REPLY_EPERM:
			PARSE_TEXT_DATA_SSCANF(sockcmd_dat_eperm_t,  &d->descr);
			break;
		default:
			// Unknown layout: keeping the raw text
			sockcmd_p->data = xmalloc(args_len+1);
			memcpy(sockcmd_p->data, args, args_len);
			((char *)sockcmd_p->data)[args_len] = 0;
			break;
	}

	return 0;
}

redmine authored
483
/*
 * Receives one message from the socket and parses it into "sockcmd_p"
 * (cmd_num, cmd_id and, if the message has arguments, data).
 *
 * Received bytes are accumulated in a static per-descriptor buffer; whatever
 * follows the returned message stays buffered for the next call (the cursors
 * are kept in recv_stps[]/recv_ptrs[]). A message that is already completely
 * buffered is returned without reading from the socket.
 *
 * Returns 0 on success, otherwise an error code:
 *   EBADF        the descriptor number does not fit into the buffer tables
 *   ECONNRESET   the peer closed the connection
 *   ENOMSG       the line does not start with "<cmd_num> <cmd_id>"
 *   ENOPROTOOPT  unsupported protocol/subprotocol
 *   or errno of a failed recv().
 */
int socket_recv(clsyncsock_t *clsyncsock, sockcmd_t *sockcmd_p) {
	static char bufs[SOCKET_MAX][SOCKET_BUFSIZ];
	char *buf, *ptr, *start, *scan, *end, *filled;
	int clsyncsock_sock;
	ssize_t rest_length, recv_length;	// signed: recv() returns -1 on error
	errno = 0;

	clsyncsock_sock = clsyncsock->sock;

	// The descriptor number is used as an index of the tables
	if (clsyncsock_sock < 0 || clsyncsock_sock >= SOCKET_MAX)
		return errno = EBADF;

	if (clsyncsock->prot != 0)
		return errno = ENOPROTOOPT;

	buf = bufs[clsyncsock_sock];

	start =  recv_stps[clsyncsock_sock];
	start = (start==NULL ? buf : start);

	ptr   =  recv_ptrs[clsyncsock_sock];
	ptr   = (ptr==NULL   ? buf : ptr);

	// [start, scan) is known to contain no '\n'
	scan  = start;

	debug(3, "buf==%p; start==%p; ptr==%p", buf, start, ptr);

	while (1) {
		// Checking if binary (the message starts with a raw negotiation command id)
		if ((size_t)(ptr - start) >= sizeof(uint16_t)) {
			uint16_t cmd_id_binary;
			memcpy(&cmd_id_binary, start, sizeof(cmd_id_binary));

			clsyncsock->subprot = (
							cmd_id_binary == SOCKCMD_REQUEST_NEGOTIATION ||
							cmd_id_binary == SOCKCMD_REPLY_NEGOTIATION
						)
						? SUBPROT0_BINARY : SUBPROT0_TEXT;

			if (clsyncsock->subprot != SUBPROT0_TEXT)
				return errno = ENOPROTOOPT;
		}

		// Is there a complete line already?
		end = (ptr > scan) ? memchr(scan, '\n', (size_t)(ptr - scan)) : NULL;
		if (end != NULL)
			break;
		scan = ptr;

		// No. Receiving more (16 bytes of the buffer are kept in reserve)
		rest_length = (ssize_t)SOCKET_BUFSIZ - (ptr - buf) - 16;

		if (rest_length <= 0) {
			if(!socket_overflow_fix(buf, &start, &ptr)) {
				debug(1, "Got too big message. Ignoring.");
				ptr = buf;
			}
			scan = ptr;
			continue;
		}

		recv_length = recv(clsyncsock_sock, ptr, (size_t)rest_length, 0);

		if (recv_length == 0)
			return ECONNRESET;

		if (recv_length < 0)
			return errno;

		ptr += recv_length;

		debug(5, "recv_length == %u; filled_length_new == %u", (unsigned int)recv_length, (unsigned int)(ptr - buf));
	}

	// Got the line [start, end]. Terminating it for the parsers; the '\n' is
	// not needed anymore as the line is consumed below.
	*end = 0;

	{
		unsigned long cmd_num;
		unsigned int  cmd_id;

		if (sscanf(start, "%lu %u", &cmd_num, &cmd_id) != 2) {
			error("It's expected to parse \"%%lu %%u\" from \"%s\"", start);
			return errno = ENOMSG;
		}

		sockcmd_p->cmd_num = cmd_num;
		sockcmd_p->cmd_id  = cmd_id;
	}

	{
		char *str_args = start;

		// Skipping the first number
		while (*str_args >= '0' && *str_args <= '9') str_args++;
		// Skipping the space
		str_args++;
		// Skipping the second number
		while (*str_args >= '0' && *str_args <= '9') str_args++;
		// Skipping the space
		str_args++;

		// Parsing the arguments (a parse error is not fatal for the connection)
		if (end > str_args)
			parse_text_data(sockcmd_p, str_args, end-str_args);
	}

	//       ----------------------------------
	//       buf   start   end    ptr
	// cut:  ---------------
	//                    start    ptr
	//                     new     new

	filled = ptr;
	start  = &end[1];

	// No data buffered. Reset "start" and "ptr".

	if (start == ptr) {
		start = buf;
		ptr   = buf;
	}

	// Remembering the values

	recv_stps[clsyncsock_sock] = start;
	recv_ptrs[clsyncsock_sock] = ptr;

	debug(3, "sockcmd_p->cmd_num == %lu; sockcmd_p->cmd_id == %i; buf==%p; ptr==%p; end==%p, filled=%p, buf_end==%p",
		(unsigned long)sockcmd_p->cmd_num, (int)sockcmd_p->cmd_id, buf, ptr, end, filled, &buf[SOCKET_BUFSIZ]);

	return 0;
}

redmine authored
607
/*
 * Answers a command nobody handled: "unknown command" if its id is below
 * 1000, "invalid command id" otherwise. Returns the result of socket_reply().
 */
int socket_sendinvalid(clsyncsock_t *clsyncsock_p, sockcmd_t *sockcmd_p) {
	if (sockcmd_p->cmd_id < 1000)
		return socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_UNKNOWNCMD,   sockcmd_p->cmd_id, sockcmd_p->cmd_num);

	return socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_INVALIDCMDID, sockcmd_p->cmd_num);
}

redmine authored
614
/*
 * Body of a connection thread: negotiates the protocol and then serves the
 * connection described by "arg" until "*arg->running" becomes zero, the
 * state leaves CLSTATE_AUTH/CLSTATE_MAIN, or the socket fails.
 *
 * For every received message:
 *   1. the callback registered for its number by socket_send_cb() (if any)
 *      is called and unregistered;
 *   2. arg->procfunct is called; if it returns non-zero (or is not set) the
 *      built-in handling of negotiation/version/quit commands is used and
 *      anything else is answered with socket_sendinvalid();
 *   3. the message data is freed.
 *
 * The slot is released with socket_thread_delete() before returning.
 * Always returns 0.
 */
int socket_procclsyncsock(socket_sockthreaddata_t *arg) {
	char		_sockcmd_buf[SOCKET_BUFSIZ]={0};

	sockcmd_t		*sockcmd_p    = (sockcmd_t *)_sockcmd_buf;

	clsyncsock_t		*clsyncsock_p = arg->clsyncsock_p;
	clsyncsock_procfunct_t   procfunct    = arg->procfunct;
	//sockprocflags_t		 flags        = arg->flags;

	enum auth_flags {
		AUTHFLAG_ENTERED_LOGIN = 0x01,
	};
	typedef enum auth_flags auth_flags_t;
	auth_flags_t	 auth_flags = 0;

	debug(3, "Started new thread for new connection.");

	arg->state = (arg->authtype == SOCKAUTH_NULL) ? CLSTATE_MAIN : CLSTATE_AUTH;
	socket_send(clsyncsock_p, SOCKCMD_REQUEST_NEGOTIATION, clsyncsock_p->prot, clsyncsock_p->subprot);

	while ((arg->running && *arg->running) && (arg->state==CLSTATE_AUTH || arg->state==CLSTATE_MAIN)) {
		debug(3, "Iteration.");

		// Receiving message
		int ret;
		if ((ret = socket_recv(clsyncsock_p, sockcmd_p))) {
			debug(2, "Got error while receiving a message from clsyncsock with sock %u. Ending the thread.", 
				arg->clsyncsock_p->sock);
			break;
		}

		// Checking for a callback for this answer
		if (clsyncsock_p->cbqueue_len > 0) {
			const size_t cache_slots = 2*CLSYNCSOCK_WINDOW;
			uint64_t cmd_num = sockcmd_p->cmd_num;
			size_t i = cmd_num % cache_slots;
			size_t probes;

			// Every cell is probed once, wrapping around the table. Empty
			// cells are skipped (not treated as the end of the search):
			// removals leave holes in front of still registered entries.
			for (probes = 0; probes < cache_slots; probes++, i = (i + 1) % cache_slots) {
				clsynccbqueue_t *cbq = clsyncsock_p->cbqueue_cache[i];
				clsynccbqueue_t *last;

				if (cbq == NULL || cbq->cmd_num != cmd_num)
					continue;

				// Found! Calling the callback function
				if (cbq->callback_funct != NULL)
					cbq->callback_funct(arg, sockcmd_p, cbq->callback_arg);

				// Removing from queue: the last entry is moved into the
				// freed place, so its cache cell has to follow it.
				clsyncsock_p->cbqueue_cache[i] = NULL;
				last = &clsyncsock_p->cbqueue[--clsyncsock_p->cbqueue_len];
				if (last != cbq) {
					size_t j;

					memcpy(cbq, last, sizeof(*cbq));
					for (j = 0; j < cache_slots; j++)
						if (clsyncsock_p->cbqueue_cache[j] == last) {
							clsyncsock_p->cbqueue_cache[j] = cbq;
							break;
						}
				}
				break;
			}
		}

		// Processing the message
		if (procfunct == NULL || procfunct(arg, sockcmd_p))
			switch (sockcmd_p->cmd_id) {
				case SOCKCMD_REPLY_NEGOTIATION:
				case SOCKCMD_REQUEST_NEGOTIATION: {
					sockcmd_dat_negotiation_t *data = (sockcmd_dat_negotiation_t *)sockcmd_p->data;

					// A negotiation message without arguments carries no data at all
					if (data == NULL) {
						socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_EINVAL, sockcmd_p->cmd_id, sockcmd_p->cmd_num, "No protocol id is passed");
						break;
					}

					switch (data->prot) {
						case 0:
							switch (data->subprot) {
								case SUBPROT0_TEXT:
								case SUBPROT0_BINARY:
									clsyncsock_p->subprot = data->subprot;
									if (sockcmd_p->cmd_id == SOCKCMD_REQUEST_NEGOTIATION)
										socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_NEGOTIATION, data->prot, data->subprot);
									else {
										socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_ACK,    sockcmd_p->cmd_id, sockcmd_p->cmd_num);
										debug(1, "Negotiated proto: %u %u", data->prot, data->subprot);
									}
									break;
								default:
									socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_EINVAL, sockcmd_p->cmd_id, sockcmd_p->cmd_num, "Incorrect subprotocol id");
							}
							break;
						default:
							socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_EINVAL, sockcmd_p->cmd_id, sockcmd_p->cmd_num, "Incorrect protocol id");
					}
					break;
				}
				case SOCKCMD_REQUEST_VERSION: {
					socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_VERSION, VERSION_MAJ, VERSION_MIN, REVISION);
					break;
				}
				case SOCKCMD_REQUEST_QUIT: {
					socket_reply(clsyncsock_p, sockcmd_p, SOCKCMD_REPLY_BYE);
					arg->state = CLSTATE_DYING;
					break;
				}
				default:
					socket_sendinvalid(clsyncsock_p, sockcmd_p);
					break;
			}

		if (sockcmd_p->data != NULL) {
			free(sockcmd_p->data);
			sockcmd_p->data = NULL;
		}

		// Check if the socket is still alive
		if (socket_check(clsyncsock_p)) {
			debug(1, "clsyncsock socket error: %s", strerror(errno));
			break;
		}

		// Sending prompt
		switch (arg->state) {
			case CLSTATE_AUTH:
				if (!(auth_flags&AUTHFLAG_ENTERED_LOGIN))
					socket_send(clsyncsock_p, SOCKCMD_REQUEST_LOGIN);
				break;
			default:
				break;
		}
	}

	debug(3, "Ending a connection thread.");

	socket_thread_delete(arg);

	return 0;
}

redmine authored
739
socket_sockthreaddata_t *socket_thread_new() {
redmine authored
740
	pthread_mutex_lock(&socket_thread_mutex);
redmine authored
741
	socket_sockthreaddata_t *threaddata_p = &sockthreaddata[clsyncsockthreads_num];
redmine authored
742

redmine authored
743
	if (clsyncsockthreads_num >= SOCKET_MAX) {
redmine authored
744
		error("Warning: socket_thread_new(): Too many connection threads.");
redmine authored
745 746 747 748 749
		errno = EUSERS;
		pthread_mutex_unlock(&socket_thread_mutex);
		return NULL;
	}

redmine authored
750
	threaddata_p->id = clsyncsockthreads_num;
redmine authored
751

redmine authored
752
	clsyncsockthread_busy[clsyncsockthreads_num]=1;
redmine authored
753
	// TODO: SECURITY: Possible DoS-attack on huge "SOCKET_MAX" value. Fix it.
redmine authored
754
	while (clsyncsockthread_busy[++clsyncsockthreads_num]);
redmine authored
755 756 757 758

#ifdef PARANOID
	// Processing the events: checking if previous check were been made right

redmine authored
759
	if (threaddata_p->state != CLSTATE_NONE) {
redmine authored
760
		// This's not supposed to be
redmine authored
761
		error("Internal-Error: socket_newconnarg(): connproc_arg->state != CLSTATE_NONE");
redmine authored
762
		pthread_mutex_unlock(&socket_thread_mutex);
redmine authored
763
		errno = EILSEQ;
redmine authored
764 765 766 767 768 769
		return NULL;
	}
#endif

	// Processing the events: creating a thread for new connection

redmine authored
770
	debug(3, "clsyncsockthreads_count == %u;\tclsyncsockthreads_last == %u;\tclsyncsockthreads_num == %u", 
redmine authored
771
		clsyncsockthreads_count, clsyncsockthreads_last, clsyncsockthreads_num);
redmine authored
772

redmine authored
773
	clsyncsockthreads_last = MAX(clsyncsockthreads_last, clsyncsockthreads_num);
redmine authored
774

redmine authored
775
	clsyncsockthreads_count++;
redmine authored
776 777 778 779
	pthread_mutex_unlock(&socket_thread_mutex);
	return threaddata_p;
}

redmine authored
780
/*
 * Reserves a connection thread slot (socket_thread_new()) and binds the
 * socket descriptor "clsyncsock_p" to it.
 *
 * Returns the slot, or NULL if no slot could be reserved.
 */
socket_sockthreaddata_t *socket_thread_attach(clsyncsock_t *clsyncsock_p) {
	socket_sockthreaddata_t *slot = socket_thread_new();

	if (slot != NULL)
		slot->clsyncsock_p = clsyncsock_p;

	return slot;
}

redmine authored
792 793
// Thread entry point with the signature pthread_create() requires.
static void *socket_thread_entry(void *threaddata_p) {
	socket_procclsyncsock(threaddata_p);
	return NULL;
}

/*
 * Starts the connection thread of the slot "threaddata_p"; the thread runs
 * socket_procclsyncsock() on it.
 *
 * Returns 0 on success or the error number reported by pthread_create()
 * (also stored into errno).
 */
int socket_thread_start(socket_sockthreaddata_t *threaddata_p) {
	// pthread_create() returns the error number and does not set errno
	int rc = pthread_create(&threaddata_p->thread, NULL, socket_thread_entry, threaddata_p);

	if (rc) {
		errno = rc;
		error("Cannot create a thread for connection");
		return errno = rc;
	}

	return 0;
}

redmine authored
801
/*
 * Initializes the socket subsystem. There is nothing to set up at the
 * moment, so it only reports success (0).
 */
int socket_init() {
	const int status = 0;

	return status;
}

redmine authored
805
/*
 * Shuts the socket subsystem down. There is nothing to release at the
 * moment, so it only reports success (0).
 */
int socket_deinit() {
	const int status = 0;

	return status;
}

redmine authored
809