cluster.h
7.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
/*
clsync - file tree sync utility based on inotify
Copyright (C) 2013 Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef CLUSTER_SUPPORT
// Macros for reading messages
#define CLUSTER_RESTDATALEN(clustercmd_p, data_type) \
((clustercmd_p)->h.data_len - sizeof(data_type) + sizeof(char *))
#define CLUSTER_LOOP_EXPECTCMD(clustercmd_p, clustercmd_id, ret) {\
/* Exit if error */ \
if(ret == -1) { \
error("CLUSTER_LOOP_EXPECTCMD()"); \
return errno; \
}\
\
/* Is that the command we are expecting? Skipping if not. */\
if(clustercmd_p->h.cmd_id != clustercmd_id)\
continue;\
}
// Macros for writing messages
// calculated required memory for clustercmd packet
#define CLUSTER_REQMEM(data_type, restdata_len) \
(sizeof(clustercmdhdr_t) + sizeof(data_type) + (restdata_len) + 2)
// calculated required memory for clustercmd packet with padding
#define CLUSTER_REQMEM_PADDED(data_type, restdata_len) \
CLUSTER_PAD(CLUSTER_REQMEM(data_type, restdata_len))
// allocated memory for clustercmd packet with padding
#define CLUSTER_ALLOC(data_type, restdata_len, alloc_funct)\
(clustercmd_t *)memset((alloc_funct)(CLUSTER_REQMEM_PADDED(data_type, restdata_len)), 0, CLUSTER_REQMEM_PADDED(data_type, restdata_len))
// allocated memory for clustercmd packet with padding with alloca()
#define CLUSTER_ALLOCA(data_type, restdata_len)\
CLUSTER_ALLOC(data_type, restdata_len, alloca)
// allocated memory for clustercmd packet with padding with xmalloc()
#define CLUSTER_MALLOC(data_type, restdata_len)\
CLUSTER_ALLOC(data_type, restdata_len, xmalloc)
// Common macros
#define CLUSTER_PAD(size) ((((size) + 3) >> 2) << 2)
#define CLUSTERCMD_SIZE(clustercmd_p) (sizeof(clustercmdhdr_t) + (*(clustercmd_p)).h.data_len)
#define CLUSTERCMD_SIZE_PADDED(clustercmd_p) (sizeof(clustercmdhdr_t) + CLUSTER_PAD((*(clustercmd_p)).h.data_len))
// Types
enum adler32_calc {
ADLER32_CALC_NONE = 0x00,
ADLER32_CALC_HEADER = 0x01,
ADLER32_CALC_DATA = 0x02,
ADLER32_CALC_ALL = 0x03,
};
typedef enum adler32_calc adler32_calc_t;
enum cluster_read_flags {
CLREAD_NONE = 0x00,
CLREAD_ALL = 0xff
};
typedef enum cluster_read_flags cluster_read_flags_t;
enum nodestatus {
NODESTATUS_DOESNTEXIST = 0,
NODESTATUS_OFFLINE,
NODESTATUS_SEEMSONLINE,
NODESTATUS_ONLINE,
NODESTATUS_BANNED
};
typedef enum nodestatus nodestatus_t;
enum nodeid {
NODEID_NOID = MAXNODES
};
typedef enum nodeid nodeid_t;
struct packets_stats {
uint64_t tot;
uint64_t rej;
};
typedef struct packets_stats packets_stats_t;
struct nodeinfo {
uint8_t id;
uint8_t num;
nodestatus_t status;
uint32_t updatets;
GHashTable *modtime_ht;
GHashTable *serial2queuedpacket_ht;
packets_stats_t packets_in;
packets_stats_t packets_out;
uint32_t last_serial;
char *node_name;
};
typedef struct nodeinfo nodeinfo_t;
enum clustercmd_id {
CLUSTERCMDID_PING = 0,
CLUSTERCMDID_ACK = 1,
CLUSTERCMDID_REG = 2,
CLUSTERCMDID_HELLO = 3,
CLUSTERCMDID_WELCOME = 4,
CLUSTERCMDID_DIE = 5,
CLUSTERCMDID_HT_EXCH = 6,
COUNT_CLUSTERCMDID
};
typedef enum clustercmd_id clustercmd_id_t;
struct clustercmd_hello {
char node_name[0];
};
typedef struct clustercmd_hello clustercmd_hello_t;
#define welcome_to_node_name_len(cmd_p) ((cmd_p)->h.data_len-(((clustercmd_welcome_t *)&(cmd_p)->data)->from_node_name_len)-sizeof(clustercmd_welcome_t))
#define welcome_to_node_name(cmddata_p) (&cmddata_p->from_node_name[cmddata_p->from_node_name_len])
struct clustercmd_welcome {
size_t from_node_name_len;
char from_node_name[0];
// to_node_name == my_node_name+my_node_name_len
};
typedef struct clustercmd_welcome clustercmd_welcome_t;
struct clustercmd_reg {
char node_name[0];
};
typedef struct clustercmd_reg clustercmd_reg_t;
struct clustercmd_ack {
uint32_t serial;
};
typedef struct clustercmd_ack clustercmd_ack_t;
enum reject_reason {
REJ_UNKNOWN = 0,
REJ_ADLER32MISMATCH,
};
typedef enum reject_reason reject_reason_t;
struct clustercmd_rej {
uint32_t serial;
uint8_t reason;
};
typedef struct clustercmd_rej clustercmd_rej_t;
struct clustercmd_ht_exch {
time_t ctime;
size_t path_length;
char path[0];
};
typedef struct clustercmd_ht_exch clustercmd_ht_exch_t;
struct clustercmdadler32 {
uint32_t hdr; // 32
uint32_t dat; // 64
};
typedef struct clustercmdadler32 clustercmdadler32_t;
struct clustercmdhdr { // bits
uint8_t dst_node_id; // 8
uint8_t src_node_id; // 16
uint8_t flags; // 24 (for future compatibility)
uint8_t cmd_id; // 32
clustercmdadler32_t adler32; // 96
uint32_t data_len; // 128
uint32_t ts; // 160
uint32_t serial; // 192
};
typedef struct clustercmdhdr clustercmdhdr_t;
typedef char clustercmd_die_t;
struct clustercmd {
clustercmdhdr_t h;
union data {
char p[0];
clustercmd_welcome_t welcome;
clustercmd_reg_t reg;
clustercmd_ack_t ack;
clustercmd_rej_t rej;
clustercmd_hello_t hello;
clustercmd_ht_exch_t ht_exch;
clustercmd_die_t die;
} data;
};
typedef struct clustercmd clustercmd_t;
struct clustercmdqueuedpackethdri {
char dummy; // anti-warning
};
typedef struct clustercmdqueuedpackethdri clustercmdqueuedpackethdri_t;
struct clustercmdqueuedpackethdro {
char ack_from[MAXNODES];
uint8_t ack_count;
};
typedef struct clustercmdqueuedpackethdro clustercmdqueuedpackethdro_t;
struct clustercmdqueuedpackethdr {
unsigned int window_id;
union w {
clustercmdqueuedpackethdri_t i;
clustercmdqueuedpackethdro_t o;
} w;
};
typedef struct clustercmdqueuedpackethdr clustercmdqueuedpackethdr_t;
struct clustercmdqueuedpacket {
clustercmdqueuedpackethdr_t h;
clustercmd_t cmd;
};
typedef struct clustercmdqueuedpacket clustercmdqueuedpacket_t;
struct window_occupied_sides {
size_t left;
size_t right;
};
typedef struct window_occupied_sides window_occupied_sides_t;
struct window {
unsigned int size; // Allocated cells
unsigned int packets_len; // Count of packets (are waiting for ACK-s)
unsigned int *packets_id; // Array of cells' id-s with packets
window_occupied_sides_t *occupied_sides; // Array of structures with coordinates in buffer of occupied space by cell ida (aka window_id)
size_t buf_size; // Allocated space of the buffer
char *buf; // Pointer to the buffer
};
typedef struct window window_t;
typedef int (*cluster_recvproc_funct_t)(clustercmd_t *clustercmd_p);
// Externs
extern int cluster_init(struct ctx *ctx_p, struct indexes *indexes_p);
extern int cluster_deinit();
extern int cluster_lock(const char *fpath);
extern int cluster_lock_byindexes();
extern int cluster_unlock_all();
extern int cluster_capture(const char *fpath);
extern int cluster_modtime_update(const char *dirpath, short int dirlevel, mode_t st_mode);
extern int cluster_initialsync();
#endif