redmine

cluster support continued

... ... @@ -71,18 +71,23 @@ window_t window = {0};
static inline int clustercmd_window_add(clustercmd_t *clustercmd_p) {
/*
struct window_occupied_sides {
size_t left;
size_t right;
};
typedef struct window_occupied_sides window_occupied_sides_t;
struct window {
unsigned int size;
unsigned int packets;
unsigned int last;
unsigned int idstack_len;
unsigned int *idstack;
char *idstacked;
unsigned int *busy_id;
size_t *busy_sides[WINDOW_SIDES];
GHashTable *serial2waitack_ht;
size_t buf_size;
char *buf;
unsigned int size; // Allocated cells
unsigned int packets; // Packets are waiting for ACK-s
unsigned int idstack_len; // Count of stacked cells for future use
unsigned int *idstack; // Stack of cells for future use
char *idstacked; // Assoc-array of cells' id-s (aka window_id) to determine is the cell stacked
unsigned int occupied_len; // Count of cells with packets and stacked cells for future use
unsigned int *occupied_id; // Array of cells' id-s with packets and stacked cells for future use
window_occupied_sides_t *occupied_sides; // Array of structures with coordinates in buffer of occupied space by cell ida (aka window_id)
GHashTable *serial2waitack_ht; // Hash-table: clustercmd.h.serial -> clustercmdwaitack
size_t buf_size; // Allocated space of the buffer
char *buf; // Pointer to the buffer
};
struct clustercmdwaitack {
unsigned int window_id;
... ... @@ -99,10 +104,10 @@ static inline int clustercmd_window_add(clustercmd_t *clustercmd_p) {
# define CXREALLOC(a, size) \
(typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
window.idstack = CXREALLOC(window.idstack, window.size);
window.idstacked = CXREALLOC(window.idstacked, window.size);
window.busy_id = CXREALLOC(window.busy_id, window.size);
window.busy_sides = CXREALLOC(window.busy_sides, window.size);
window.idstack = CXREALLOC(window.idstack, window.size);
window.idstacked = CXREALLOC(window.idstacked, window.size);
window.occupied_id = CXREALLOC(window.occupied_id, window.size);
window.occupied_sides = CXREALLOC(window.occupied_sides, window.size);
# undef CXREALLOC
}
... ... @@ -110,47 +115,52 @@ static inline int clustercmd_window_add(clustercmd_t *clustercmd_p) {
size_t required_space = sizeof(clustercmdwaitackhdr_t) + sizeof(clustercmdhdr_t) + clustercmd_p->h.data_len;
// Searching business boundaries in the window buffer
size_t busy_left = SIZE_MAX, busy_right=0;
size_t occupied_left = SIZE_MAX, occupied_right=0;
int i;
i = 0;
while(i < window.packets) {
unsigned int window_id;
window_id = window.busy_id[i];
window_id = window.occupied_id[i];
busy_left = MIN(busy_left, window.busy_sides[window_id].left);
busy_right = MAX(busy_right, window.busy_sides[window_id].right);
occupied_left = MIN(occupied_left, window.occupied_sides[window_id].left);
occupied_right = MAX(occupied_right, window.occupied_sides[window_id].right);
}
printf_ddd("Debug3: clustercmd_window_add(): w.size == %u, b_left == %u; b_right == %u; w.buf_size == %u; r_space == %u\n",
window.size, busy_left, busy_right, window.buf_size, required_space);
window.size, occupied_left, occupied_right, window.buf_size, required_space);
// Trying to find a space in the buffer to place message
size_t buf_coordinate = SIZE_MAX;
if(window.packets) {
size_t free_left = busy_left;
size_t free_right = window.buf_size - busy_right;
// Free space from left (start of buffer)
size_t free_left = occupied_left;
// Free space from right (end of buffer)
size_t free_right = window.buf_size - occupied_right;
if(free_left > required_space)
buf_coordinate = free_left - required_space;
else
if(free_right > required_space)
buf_coordinate = busy_right;
buf_coordinate = occupied_right;
else { // Not enough space in the window buffer;
window.buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
window.buf = xrealloc(window.buf, window.buf_size);
buf_coordinate = busy_right;
buf_coordinate = occupied_right;
}
printf_ddd("Debug3: clustercmd_window_add(): f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
free_left, free_right, buf_coordinate, window.buf_size);
}
if(window.idstack_len) {
// Using stacked window_id
unsigned int window_id;
(void)window_id; // Anti-warning
window.busy_sides[window_id].left = buf_coordinate;
window.busy_sides[window_id].right = buf_coordinate + required_space;
window.occupied_sides[window_id].left = buf_coordinate;
window.occupied_sides[window_id].right = buf_coordinate + required_space;
} else {
// !!!
// If there's no stacked window_id-s, creating new
}
window.packets++;
... ... @@ -193,7 +203,7 @@ static inline int clustercmd_window_del(clustercmdwaitack_t *waitack_p) {
unsigned int idstack_len;
unsigned int *idstack;
char *idstacked;
size_t *busy[WINDOW_SIDES];
size_t *occupied[WINDOW_SIDES];
GHashTable *serial2waitack_ht;
size_t buf_size;
char *buf;
... ... @@ -211,10 +221,10 @@ static inline int clustercmd_window_del(clustercmdwaitack_t *waitack_p) {
g_hash_table_remove(window.serial2waitack_ht, GINT_TO_POINTER(waitack_p->cmd.h.serial));
window.packets--;
if(window_id == (window.used-1)) {
window.used--;
while(window.idstacked[window.used-1]) {
window.idstacked[--window.used] = 0;
if(window_id == (window.occupied_len-1)) {
window.occupied_len--;
while(window.idstacked[window.occupied_len-1]) {
window.idstacked[--window.occupied_len] = 0;
window.idstack_len--;
}
return 0;
... ... @@ -714,7 +724,7 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
// If there's no free id-s, then exit :(
if(node_id_my == NODEID_NOID) {
printf_e("Error: Cannot find free node ID. Seems, that all %i ID-s are already busy.\n");
printf_e("Error: Cannot find free node ID. Seems, that all %i ID-s are already occupied.\n");
return ENOMEM;
}
... ...
... ... @@ -141,24 +141,24 @@ struct clustercmdwaitack {
};
typedef struct clustercmdwaitack clustercmdwaitack_t;
struct window_busy_sides {
struct window_occupied_sides {
size_t left;
size_t right;
};
typedef struct window_busy_sides window_busy_sides_t;
typedef struct window_occupied_sides window_occupied_sides_t;
struct window {
unsigned int size;
unsigned int packets;
unsigned int used;
unsigned int idstack_len;
unsigned int *idstack;
char *idstacked;
unsigned int *busy_id;
window_busy_sides_t *busy_sides;
GHashTable *serial2waitack_ht;
size_t buf_size;
char *buf;
unsigned int size; // Allocated cells
unsigned int packets; // Packets are waiting for ACK-s
unsigned int idstack_len; // Count of stacked cells for future use
unsigned int *idstack; // Stack of cells for future use
char *idstacked; // Assoc-array of cells' id-s (aka window_id) to determine is the cell stacked
unsigned int occupied_len; // Count of cells with packets and stacked cells for future use
unsigned int *occupied_id; // Array of cells' id-s with packets and stacked cells for future use
window_occupied_sides_t *occupied_sides; // Array of structures with coordinates in buffer of occupied space by cell ida (aka window_id)
GHashTable *serial2waitack_ht; // Hash-table: clustercmd.h.serial -> clustercmdwaitack
size_t buf_size; // Allocated space of the buffer
char *buf; // Pointer to the buffer
};
typedef struct window window_t;
... ...