redmine

cluster: window optimized

... ... @@ -70,57 +70,30 @@ window_t window = {0};
*/
static inline int clustercmd_window_add(clustercmd_t *clustercmd_p) {
/*
struct window_occupied_sides {
size_t left;
size_t right;
};
typedef struct window_occupied_sides window_occupied_sides_t;
struct window {
unsigned int size; // Allocated cells
unsigned int packets; // Packets are waiting for ACK-s
unsigned int idstack_len; // Count of stacked cells for future use
unsigned int *idstack; // Stack of cells for future use
char *idstacked; // Assoc-array of cells' id-s (aka window_id) to determine is the cell stacked
unsigned int occupied_len; // Count of cells with packets and stacked cells for future use
unsigned int *occupied_id; // Array of cells' id-s with packets and stacked cells for future use
window_occupied_sides_t *occupied_sides; // Array of structures with coordinates in buffer of occupied space by cell ida (aka window_id)
GHashTable *serial2waitack_ht; // Hash-table: clustercmd.h.serial -> clustercmdwaitack
size_t buf_size; // Allocated space of the buffer
char *buf; // Pointer to the buffer
};
struct clustercmdwaitack {
unsigned int window_id;
char ack_from[MAXNODES];
uint8_t ack_count;
clustercmd_t cmd;
};
*/
// Checking if there enough window cells allocated
if(window.packets >= window.size) {
if(window.packets_len >= window.size) {
window.size += ALLOC_PORTION;
# define CXREALLOC(a, size) \
(typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
window.idstack = CXREALLOC(window.idstack, window.size);
window.idstacked = CXREALLOC(window.idstacked, window.size);
window.occupied_id = CXREALLOC(window.occupied_id, window.size);
window.packets_id = CXREALLOC(window.packets_id, window.size);
window.occupied_sides = CXREALLOC(window.occupied_sides, window.size);
# undef CXREALLOC
}
// Calculating required memory space in buffer for the message
size_t required_space = sizeof(clustercmdwaitackhdr_t) + sizeof(clustercmdhdr_t) + clustercmd_p->h.data_len;
size_t clustercmd_size = sizeof(clustercmdhdr_t) + clustercmd_p->h.data_len;
size_t required_space = sizeof(clustercmdwaitackhdr_t) + clustercmd_size;
// Searching business boundaries in the window buffer
// Searching occupied boundaries in the window buffer
size_t occupied_left = SIZE_MAX, occupied_right=0;
int i;
i = 0;
while(i < window.packets) {
while(i < window.packets_len) {
unsigned int window_id;
window_id = window.occupied_id[i];
window_id = window.packets_id[i];
occupied_left = MIN(occupied_left, window.occupied_sides[window_id].left);
occupied_right = MAX(occupied_right, window.occupied_sides[window_id].right);
... ... @@ -131,7 +104,7 @@ static inline int clustercmd_window_add(clustercmd_t *clustercmd_p) {
// Trying to find a space in the buffer to place message
size_t buf_coordinate = SIZE_MAX;
if(window.packets) {
if(window.packets_len) {
// Free space from left (start of buffer)
size_t free_left = occupied_left;
// Free space from right (end of buffer)
... ... @@ -141,33 +114,40 @@ static inline int clustercmd_window_add(clustercmd_t *clustercmd_p) {
buf_coordinate = free_left - required_space;
else
if(free_right > required_space)
buf_coordinate = occupied_right;
else { // Not enough space in the window buffer;
buf_coordinate = occupied_right;
else
{
// Not enough space in the window buffer;
window.buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
window.buf = xrealloc(window.buf, window.buf_size);
buf_coordinate = occupied_right;
buf_coordinate = occupied_right;
}
printf_ddd("Debug3: clustercmd_window_add(): f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
free_left, free_right, buf_coordinate, window.buf_size);
}
if(window.idstack_len) {
// Using stacked window_id
unsigned int window_id;
(void)window_id; // Anti-warning
window.occupied_sides[window_id].left = buf_coordinate;
window.occupied_sides[window_id].right = buf_coordinate + required_space;
} else {
// If there's no stacked window_id-s, creating new
unsigned int window_id;
}
// packet id in window
window_id = window.packets_len;
window.packets++;
// reserving the space in buffer
window.occupied_sides[window_id].left = buf_coordinate;
window.occupied_sides[window_id].right = buf_coordinate + required_space;
// placing information into buffer
clustercmdwaitack_t *waitack_p;
waitack_p = (clustercmdwaitack_t *)&window.buf[buf_coordinate];
memset(&waitack_p->h, 0, sizeof(waitack_p->h));
memcpy(&waitack_p->cmd, clustercmd_p, clustercmd_size);
g_hash_table_remove(window.serial2waitack_ht, GINT_TO_POINTER(clustercmd_p->h.serial));
waitack_p->h.window_id = window_id;
// remembering new packet
g_hash_table_insert(window.serial2waitack_ht, GINT_TO_POINTER(clustercmd_p->h.serial), waitack_p);
window.packets_id[window.packets_len++] = window_id;
return 0;
}
... ... @@ -189,48 +169,28 @@ static inline int clustercmd_window_del(clustercmdwaitack_t *waitack_p) {
printf_e("Error: clustercmd_window_del(): window is not allocated.");
return EINVAL;
}
if(!window.packets) {
if(!window.packets_len) {
printf_e("Error: clustercmd_window_del(): there already no packets in the window.");
return EINVAL;
}
#endif
/*
struct window {
unsigned int size;
unsigned int packets;
unsigned int used;
unsigned int idstack_len;
unsigned int *idstack;
char *idstacked;
size_t *occupied[WINDOW_SIDES];
GHashTable *serial2waitack_ht;
size_t buf_size;
char *buf;
};
struct clustercmdwaitack {
unsigned int window_id;
char ack_from[MAXNODES];
uint8_t ack_count;
clustercmd_t cmd;
};
*/
unsigned int window_id = waitack_p->h.window_id;
unsigned int window_id_del = waitack_p->h.window_id;
unsigned int window_id_last = --window.packets_len;
g_hash_table_remove(window.serial2waitack_ht, GINT_TO_POINTER(waitack_p->cmd.h.serial));
window.packets--;
// Forgeting the packet
if(window_id == (window.occupied_len-1)) {
window.occupied_len--;
while(window.idstacked[window.occupied_len-1]) {
window.idstacked[--window.occupied_len] = 0;
window.idstack_len--;
}
return 0;
// Moving the last packet into place of deleting packet, to free the tail in "window.packets_id" and "window.occupied_sides"
if(window_id_del != window_id_last) {
printf_ddd("Debug3: clustercmd_window_del(): %i -> %i\n", window_id_last, window_id_del);
window.packets_id[window_id_del] = window.packets_id[window_id_last];
memcpy(&window.occupied_sides[window_id_del], &window.occupied_sides[window_id_last], sizeof(window.occupied_sides[window_id_del]));
}
window.idstack[window.idstack_len++] = window_id;
// Removing from hash table
g_hash_table_remove(window.serial2waitack_ht, GINT_TO_POINTER(waitack_p->cmd.h.serial));
return 0;
}
... ...
... ... @@ -149,12 +149,8 @@ typedef struct window_occupied_sides window_occupied_sides_t;
struct window {
unsigned int size; // Allocated cells
unsigned int packets; // Packets are waiting for ACK-s
unsigned int idstack_len; // Count of stacked cells for future use
unsigned int *idstack; // Stack of cells for future use
char *idstacked; // Assoc-array of cells' id-s (aka window_id) to determine is the cell stacked
unsigned int occupied_len; // Count of cells with packets and stacked cells for future use
unsigned int *occupied_id; // Array of cells' id-s with packets and stacked cells for future use
unsigned int packets_len; // Count of packets (are waiting for ACK-s)
unsigned int *packets_id; // Array of cells' id-s with packets
window_occupied_sides_t *occupied_sides; // Array of structures with coordinates in buffer of occupied space by cell ida (aka window_id)
GHashTable *serial2waitack_ht; // Hash-table: clustercmd.h.serial -> clustercmdwaitack
size_t buf_size; // Allocated space of the buffer
... ...