redmine

cluster support continued

... ... @@ -5,27 +5,47 @@ broadcast: dst_node_id == NOID
session example of two nodes (A and B):
A appears
A appears (getting node_id):
A -> getmyid (serial: 0; src: NOID ; dst: NOID; name: A)
A -> register (serial: 1; src: 0 ; dst: NOID; name: A)
A -> getmyid (serial: 0; src: NOID ; dst: NOID; name: A) | cluster_init()
A -> register (serial: 1; src: 0 ; dst: NOID; name: A) |
Trying to sync with somebody:
A -> updtree (serial: 2; src: 0 ; dst: NOID; [A modtree])
B appears (getting node_id):
B -> getmyid (serial: 0; src: NOID ; dst: NOID; name: B) | cluster_init()
A -> setid (serial: 3; src: 0 ; dst: NOID; name: A; updatets: 100)
B -> register (serial: 1; src: 1 : dst: NOID; name: B) |
A -> ack (serial: 4; src: 0 ; dst: 1; ack_serial: 1)
Initial syncing A <-> B:
B -> updtree (serial: 2; src: 1 ; dst: NOID; [B modtree])
A -> updtree (serial: 5; src: 0 ; dst: NOID; [A modtree])
A -> lock (serial: 6: src: 0 ; dst: NOID; [paths list])
B -> ack (serial: 2; src: 1 ; dst: 0; ack_serial: 6)
B -> lock (serial: 3: src: 1 ; dst: NOID; [paths list])
A -> ack (serial: 7; src: 0 ; dst: 1; ack_serial: 3)
A -> unlockall (serial: 8: src: 0 ; dst: NOID)
B -> ack (serial: 4; src: 1 ; dst: 0; ack_serial: 8)
B -> unlockall (serial: 5: src: 1 ; dst: NOID)
A -> ack (serial: 9; src: 0 ; dst: 1; ack_serial: 5)
B appers
Just syncing A -> B:
B -> getmyid (serial: 0; src: NOID ; dst: NOID; name: B)
A -> setid (serial: 2; src: 0 ; dst: NOID; name: A; updatets: 100)
B -> register (serial: 1; src: 1 : dst: NOID; name: B)
A -> ack (serial: 3; src: 0 ; dst: 1; ack_serial: 1)
A -> lock (serial: 4: src: 0 ; dst: NOID; [paths list])
B -> ack (serial: 2; src: 1 ; dst: 0; ack_serial: 4)
A -> unlock_all (serial: 5: src: 0 ; dst: NOID)
B -> ack (serial: 3; src: 1 ; dst: 0; ack_serial: 5)
A -> lock (serial: 10: src: 0 ; dst: NOID; [paths list])
B -> ack (serial: 6; src: 1 ; dst: 0; ack_serial: 10)
A -> unlockall (serial: 11: src: 0 ; dst: NOID)
B -> ack (serial: 7; src: 1 ; dst: 0; ack_serial: 11)
A disappers (shutdown)
A appears
A appears (registering with old node_id)
A -> getmyid (serial: 0; src: NOID ; dst: NOID; name: A)
B -> setid (serial: 4; src: 1 ; dst: 0; name: A; updatets: 200)
B -> setid (serial: 4; src: 1 ; dst: 0; name: A; updatets: 200)
A -> register (serial: 1; src: 0 ; dst: NOID; name: A)
B -> ack (serial: 5; src: 1 ; dst: 0; ack_serial: 1)
... ...
This diff is collapsed. Click to expand it.
... ... @@ -38,23 +38,40 @@
// Macros for writing messages
// calculated required memory for clustercmd packet
#define CLUSTER_REQMEM(data_type, restdata_len) \
(sizeof(clustercmdhdr_t) + sizeof(data_type) + (restdata_len) + 2)
// calculated required memory for clustercmd packet with padding
#define CLUSTER_PREQMEM(data_type, restdata_len) \
((CLUSTER_REQMEM(data_type, restdata_len)+3)%4)
// allocated memory for clustercmd packet with padding
#define CLUSTER_ALLOC(data_type, restdata_len, alloc_funct)\
(clustercmd_t *)PARANOIDV(memset)((alloc_funct)(CLUSTER_REQMEM(data_type, restdata_len))PARANOIDV(, 0, CLUSTER_REQMEM(data_type, restdata_len)))
(clustercmd_t *)PARANOIDV(memset)((alloc_funct)(CLUSTER_PREQMEM(data_type, restdata_len))PARANOIDV(, 0, CLUSTER_PREQMEM(data_type, restdata_len)))
// allocated memory for clustercmd packet with padding with alloca()
#define CLUSTER_ALLOCA(data_type, restdata_len)\
CLUSTER_ALLOC(data_type, restdata_len, alloca)
// allocated memory for clustercmd packet with padding with xmalloc()
#define CLUSTER_MALLOC(data_type, restdata_len)\
CLUSTER_ALLOC(data_type, restdata_len, xmalloc)
// Common macros
#define CLUSTERCMD_SIZE(clustercmd) (sizeof(clustercmdhdr_t) + (clustercmd).h.data_len)
// Types
enum crc32_calc {
CRC32_CALC_NONE = 0x00,
CRC32_CALC_HEADER = 0x01,
CRC32_CALC_DATA = 0x02,
CRC32_CALC_ALL = 0x03,
};
typedef enum crc32_calc crc32_calc_t;
enum nodestatus {
NODESTATUS_DOESNTEXIST = 0,
NODESTATUS_OFFLINE,
... ... @@ -118,19 +135,33 @@ struct clustercmd_ack {
};
typedef struct clustercmd_ack clustercmd_ack_t;
struct clustercmd_ackrej {
enum reject_reason {
REJ_UNKNOWN = 0,
REJ_CRC32MISMATCH,
};
typedef enum reject_reason reject_reason_t;
struct clustercmd_rej {
uint32_t serial;
uint8_t reason;
};
typedef struct clustercmd_rej clustercmd_rej_t;
struct clustercmdcrc32 {
uint32_t hdr;
uint32_t dat;
};
typedef struct clustercmd_ackrej clustercmd_ackrej_t;
typedef struct clustercmdcrc32 clustercmdcrc32_t;
struct clustercmdhdr {
uint8_t dst_node_id;
uint8_t src_node_id;
uint8_t cmd_id;
uint32_t crc32;
uint32_t data_len;
uint32_t ts;
uint32_t serial;
struct clustercmdhdr { // bits
uint8_t dst_node_id; // 8
uint8_t src_node_id; // 16
uint8_t flags; // 24 (for future compatibility)
uint8_t cmd_id; // 32
clustercmdcrc32_t crc32; // 64
uint32_t data_len; // 96
uint32_t ts; // 128
uint32_t serial; // 160
};
typedef struct clustercmdhdr clustercmdhdr_t;
... ... @@ -141,24 +172,36 @@ struct clustercmd {
clustercmd_setiddata_t data_setid;
clustercmd_register_t data_register;
clustercmd_ack_t data_ack;
clustercmd_ackrej_t data_ackrej;
clustercmd_rej_t data_rej;
clustercmd_getmyid_t data_getmyid;
};
};
typedef struct clustercmd clustercmd_t;
struct clustercmdwaitackhdr {
unsigned int window_id;
struct clustercmdqueuedpackethdri {
};
typedef struct clustercmdqueuedpackethdri clustercmdqueuedpackethdri_t;
struct clustercmdqueuedpackethdro {
char ack_from[MAXNODES];
uint8_t ack_count;
};
typedef struct clustercmdwaitackhdr clustercmdwaitackhdr_t;
typedef struct clustercmdqueuedpackethdro clustercmdqueuedpackethdro_t;
struct clustercmdqueuedpackethdr {
unsigned int window_id;
union {
clustercmdqueuedpackethdri_t i;
clustercmdqueuedpackethdro_t o;
};
};
typedef struct clustercmdqueuedpackethdr clustercmdqueuedpackethdr_t;
struct clustercmdwaitack {
clustercmdwaitackhdr_t h;
clustercmd_t cmd;
struct clustercmdqueuedpacket {
clustercmdqueuedpackethdr_t h;
clustercmd_t cmd;
};
typedef struct clustercmdwaitack clustercmdwaitack_t;
typedef struct clustercmdqueuedpacket clustercmdqueuedpacket_t;
struct window_occupied_sides {
size_t left;
... ... @@ -171,7 +214,7 @@ struct window {
unsigned int packets_len; // Count of packets (are waiting for ACK-s)
unsigned int *packets_id; // Array of cells' id-s with packets
window_occupied_sides_t *occupied_sides; // Array of structures with coordinates in buffer of occupied space by cell ida (aka window_id)
GHashTable *serial2waitack_ht; // Hash-table: clustercmd.h.serial -> clustercmdwaitack
GHashTable *serial2queuedpacket_ht;// Hash-table: clustercmd.h.serial -> clustercmdqueuedpacket
size_t buf_size; // Allocated space of the buffer
char *buf; // Pointer to the buffer
};
... ...
... ... @@ -46,6 +46,7 @@
#define KILL_TIMEOUT 60
#define ALLOC_PORTION (1<<10) /* 1 KiX */
#define CLUSTER_WINDOW_BUFSIZE_PORTION (1<<20) /* 1 MiB */
#define ALLOC_PORTION (1<<10) /* 1 KiX */
#define CLUSTER_WINDOW_BUFSIZE_PORTION (1<<20) /* 1 MiB */
#define CLUSTER_PACKET_MAXSIZE (1<<24) /* 16 MiB */
... ...