redmine

continued cluster support

... ... @@ -35,20 +35,104 @@
// Global variables. They will be initialized in cluster_init()
options_t *options_p=NULL;
indexes_t *indexes_p=NULL;
pthread_t pthread_cluster;
int sock = -1;
options_t *options_p = NULL;
indexes_t *indexes_p = NULL;
pthread_t pthread_cluster;
nodeinfo_t nodeinfo[MAXNODES] = {{0}};
nodeinfo_t *nodeinfo_my = NULL;
uint8_t node_id_my = NODEID_NOID;
unsigned int cluster_timeout = 0;
/**
* @brief Sends message to another nodes of the cluster.
*
* @param[in] clustercmd_p Command structure pointer.
*
* @retval zero Successfully send.
* @retval non-zero Got error, while sending.
*
*/
int cluster_send(clustercmd_t *clustercmd_p) {
// Paranoid routines
#ifdef PARANOID
if(clustercmd_p->cmd_id != CLUSTERCMDID_PING) {
if(clustercmd_p->node_id != node_id_my) {
#ifdef VERYPARANOID
printf_e("Error: cluster_send(): clustercmd_p->node_id != node_id_my (%i != %i). Exit.\n", clustercmd_p->node_id, node_id_my);
return EINVAL;
#else
printf_e("Warning: cluster_send(): clustercmd_p->node_id != node_id_my (%i != %i). Correcting.\n", clustercmd_p->node_id, node_id_my);
clustercmd_p->node_id = node_id_my;
#endif
}
}
#endif
// CODE HERE
return 0;
}
/**
* @brief Receives message from another nodes of the cluster.
*
* @param[out] clustercmd_p Command structure pointer.
* @param[i/o] timeout Pointer to timeout (in milliseconds). Timeout is assumed zero if the pointer is NULL. After waiting the event timeout value will be decreased on wasted time.
*
* @retval 1 If there's new message.
* @retval 0 If there's no new messages.
* @retval -1 If got error while receiving. The error-code is placed into "errno".
*
*/
int cluster_recv(clustercmd_t *clustercmd_p, unsigned int *timeout_p) {
int timeout;
// Getting the timeout
timeout = (timeout_p == NULL ? 0 : *timeout_p);
// CODE HERE
// Setting the timeout
if(timeout_p != NULL)
*timeout_p = timeout;
// Paranoid routines
#ifdef PARANOID
if(clustercmd_p->node_id == node_id_my) {
#ifdef VERYPARANOID
printf_e("Error: cluster_recv(): clustercmd_p->node_id != node_id_my (%i != %i). Exit.\n", clustercmd_p->node_id, node_id_my);
return EINVAL;
#else
printf_e("Warning: cluster_recv(): clustercmd_p->node_id != node_id_my (%i != %i). Ignoring the command.\n", clustercmd_p->node_id, node_id_my);
clustercmd_p = NULL;
return 0;
#endif
}
#endif
return 0;
}
extern int cluster_loop();
/**
* @brief Initializes cluster subsystem
* @brief Initializes cluster subsystem.
*
* @param[in] _options_p Pointer to "options" variable, defined in main()
* @param[in] _indexes_p Pointer to "indexes" variable, defined in sync_run()
* @param[in] _options_p Pointer to "options" variable, defined in main().
* @param[in] _indexes_p Pointer to "indexes" variable, defined in sync_run().
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
* @retval zero Successfully initialized.
* @retval non-zero Got error, while initializing.
*
*/
... ... @@ -61,19 +145,107 @@ int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
return EALREADY;
}
// Initializing global variables
options_p = _options_p;
indexes_p = _indexes_p;
// Initializing network routines
sock = socket(AF_INET, SOCK_DGRAM, 0);
int reuse = 1;
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,(char *)&reuse, sizeof(reuse)) < 0) {
printf_e("Error: cluster_init(): Got error while setsockopt(): %s (errno: %i)\n", strerror(errno), errno);
return errno;
}
struct sockaddr_in sa = {0};
sa.sin_family = AF_INET;
sa.sin_port = htons(options_p->cluster_mcastipport);
sa.sin_addr.s_addr = INADDR_ANY;
// Running thread, that will process incoming traffic from another nodes of the cluster.
// The process is based on function cluster_loop()
// Initializing global variables, pt. 1
options_p = _options_p;
indexes_p = _indexes_p;
cluster_timeout = options_p->cluster_timeout * 1000;
// Getting my ID in the cluster
clustercmd_t clustercmd;
// Trying to preserve my node_id after restart. :)
clustercmd.cmd_id = CLUSTERCMDID_GETMYID;
clustercmd.data_len = options_p->cluster_nodename_len;
clustercmd.data_p = options_p->cluster_nodename;
cluster_send(&clustercmd);
int updatets = 0;
unsigned int timeout = cluster_timeout;
while((ret=cluster_recv(&clustercmd, &timeout)) && (timeout>0)) {
// Exit if error
if(ret == -1) {
printf_e("Error: cluster_init(): Got error while cluster_recv(): %s (%i).\n", strerror(errno), errno);
return errno;
}
// Somebody tryes to give us the cue about our node_id? Skipping if not.
if(clustercmd.cmd_id != CLUSTERCMDID_SETID)
continue;
// Is this the most recent information? Skipping if not.
clustercmd_setiddata_t *setiddata = clustercmd.data_p;
if(!(setiddata->updatets > updatets))
continue;
// Is the node name length in message equals to our node name length? Skipping if not.
uint32_t recv_nodename_len;
recv_nodename_len = clustercmd.data_len - sizeof(*setiddata) + sizeof(char *);
if(recv_nodename_len != options_p->cluster_nodename_len)
continue;
// Is the node name equals to ours? Skipping if not.
if(memcmp(setiddata->node_name, options_p->cluster_nodename, recv_nodename_len))
continue;
// Seems, that somebody knows our node id, remembering it.
node_id_my = setiddata->node_id;
updatets = setiddata->updatets;
}
// Getting free node_id if nobody said us the certain value (see above).
if(node_id_my == NODEID_NOID) {
int i=0;
while(i<MAXNODES) {
if(nodeinfo[i].status == NODESTATUS_DOESNTEXIST) {
node_id_my = i;
break;
}
i++;
}
}
// If there's no free id-s, then exit :(
if(node_id_my == NODEID_NOID) {
printf_e("Error: Cannot find free node ID. Seems, that all %i ID-s is already busy.\n");
return ENOMEM;
}
clustercmd.cmd_id = CLUSTERCMDID_REGISTER;
clustercmd.data_len = 1;
clustercmd.data_p = &node_id_my;
cluster_send(&clustercmd);
// Initializing global variables, pt. 2
nodeinfo_my = &nodeinfo[node_id_my];
nodeinfo_my->modtime_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
// Running thread, that will process background communicating routines with another nodes.
// The process is based on function cluster_loop() [let's use shorthand "cluster_loop()-thread"]
ret = pthread_create(&pthread_cluster, NULL, (void *(*)(void *))cluster_loop, NULL);
return ret;
}
/**
* @brief (syncop) Sends signal to thread that is processing incoming traffic from another nodes of the cluster [thread: cluster_loop()]
* @brief (syncop) Sends signal to cluster_loop()-thread
*
* @param[in] signal Signal number
*
... ... @@ -88,7 +260,7 @@ static inline int cluster_signal(int signal) {
/**
* @brief Antagonist of cluster_init() function. Kills cluster_loop()-thread.
* @brief Antagonist of cluster_init() function. Kills cluster_loop()-thread and cleaning up
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
... ... @@ -96,9 +268,15 @@ static inline int cluster_signal(int signal) {
*/
int cluster_deinit() {
int ret = 0;
cluster_signal(SIGTERM);
return pthread_join(pthread_cluster, NULL);
ret = pthread_join(pthread_cluster, NULL);
g_hash_table_destroy(nodeinfo_my->modtime_ht);
return ret;
}
... ... @@ -152,7 +330,7 @@ int cluster_unlock_all() {
}
/**
* @brief Processes incoming traffic from another nodes in loop. cluster_init() function create a thread for this function.
* @brief Processes background communicating routines with another nodes. cluster_init() function create a thread for this function.
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
... ... @@ -203,12 +381,41 @@ int cluster_loop() {
*/
int cluster_modtime_update(const char *dirpath) {
// "modtime" is incorrent name-part of function. Actually it updates "change time" (man 2 lstat64).
int ret;
// Getting directory information (including "change time" aka "st_ctime")
struct stat64 stat64;
ret=lstat64(dirpath, &stat64);
if(ret) {
printf_e("Error: cluster_modtime_update() cannot lstat64() on \"%s\": %s (errno: %i)\n", dirpath, strerror(errno), errno);
return errno;
}
// Updating "st_ctime" information. g_hash_table_replace() will replace existent information about the directory or create it if it doesn't exist.
g_hash_table_replace(nodeinfo_my->modtime_ht, strdup(dirpath), GINT_TO_POINTER(stat64.st_ctime));
// Why I'm using "st_ctime" instead of "st_mtime"? Because "st_ctime" also updates on updating inode information.
return 0;
}
/**
* @brief (syncop) Exchanging with "modtime_ht"-s to be able to compare them.
*
* @retval zero Successfully initialized
* @retval non-zero Got error, while initializing
*
*/
int cluster_modtime_exchange() {
return 0;
}
/**
* @brief (syncop) Syncing file tree with another nodes with using of directories' modification time as a recent-detector.
*
* @param[in] dirpath Path to the directory
... ... @@ -219,6 +426,8 @@ int cluster_modtime_update(const char *dirpath) {
*/
int cluster_initialsync() {
cluster_modtime_exchange();
return 0;
}
... ...
... ... @@ -17,6 +17,50 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
enum nodestatus {
NODESTATUS_DOESNTEXIST = 0,
NODESTATUS_OFFLINE,
NODESTATUS_ONLINE
};
typedef enum nodestatus nodestatus_t;
enum nodeid {
NODEID_NOID = MAXNODES
};
typedef enum nodeid nodeid_t;
struct nodeinfo {
nodestatus_t status;
uint32_t updatets;
GHashTable *modtime_ht;
};
typedef struct nodeinfo nodeinfo_t;
enum clustercmd_id {
CLUSTERCMDID_PING = 0,
CLUSTERCMDID_REGISTER = 1,
CLUSTERCMDID_GETMYID = 2,
CLUSTERCMDID_SETID = 3,
};
typedef enum clustercmd_id clustercmd_id_t;
struct clustercmd {
uint32_t crc32;
uint8_t node_id;
uint8_t cmd_id;
uint32_t data_len;
void *data_p;
};
typedef struct clustercmd clustercmd_t;
struct clustercmd_setiddata {
uint8_t node_id;
uint32_t updatets;
char *node_name;
};
typedef struct clustercmd_setiddata clustercmd_setiddata_t;
extern int cluster_init(options_t *options_p, indexes_t *indexes_p);
extern int cluster_deinit();
... ... @@ -28,4 +72,3 @@ extern int cluster_capture(const char *fpath);
extern int cluster_modtime_update(const char *dirpath);
extern int cluster_initialsync();
... ...
... ... @@ -52,6 +52,11 @@
#include <dirent.h>
#include <glib.h>
#include <sys/utsname.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "config.h"
... ... @@ -86,6 +91,7 @@ enum flags_enum {
PTHREAD = 'p',
CLUSTERIFACE = 'c',
CLUSTERMCASTIPADDR = 'm',
CLUSTERMCASTIPPORT = 'P',
CLUSTERTIMEOUT = 'W',
CLUSTERNODENAME = 'n',
HELP = 'h',
... ... @@ -160,6 +166,8 @@ struct options {
char *cluster_iface;
char *cluster_mcastipaddr;
char *cluster_nodename;
uint32_t cluster_nodename_len;
uint16_t cluster_mcastipport;
size_t watchdirlen;
size_t destdirlen;
size_t watchdirsize;
... ...
... ... @@ -6,9 +6,14 @@
#define RULE_DEFAULT RULE_ACCEPT
// don't do to much rules, it will degrade performance
#define MAXRULES (1<<8)
// there's no need in more than 256 arguments while running action-script, IMHO :)
#define MAXARGUMENTS (1<<8)
#define MAXTHREADS (1<<5)
// clsync should be used, if there's more than 5-10 nodes. So the limit in 255 is quite enough. :)
#define MAXNODES ((1<<8)-1)
#define DEFAULT_NOTIFYENGINE NE_INOTIFY
#define DEFAULT_COLLECTDELAY 30
... ... @@ -19,6 +24,8 @@
#define DEFAULT_RSYNC_INCLUDELINESLIMIT 20000
#define DEFAULT_SYNCTIMEOUT (3600 * 24)
#define DEFAULT_CLUSTERTIMEOUT 1000
#define DEFAULT_CLUSTERIPADDR "227.108.115.121"
#define DEFAULT_CLUSTERIPPORT 40079
#define FANOTIFY_FLAGS (FAN_CLOEXEC|FAN_UNLIMITED_QUEUE|FAN_UNLIMITED_MARKS)
#define FANOTIFY_EVFLAGS (O_LARGEFILE|O_RDONLY|O_CLOEXEC)
... ...
... ... @@ -33,6 +33,7 @@ static struct option long_options[] =
{"pthread", no_argument, NULL, PTHREAD},
{"cluster-iface", required_argument, NULL, CLUSTERIFACE}, // Not implemented, yet
{"cluster-ip", required_argument, NULL, CLUSTERMCASTIPADDR}, // Not implemented, yet
{"cluster-port", required_argument, NULL, CLUSTERMCASTIPPORT}, // Not implemented, yet
{"cluster-timeout", required_argument, NULL, CLUSTERTIMEOUT}, // Not implemented, yet
{"cluster-node-name", required_argument, NULL, CLUSTERNODENAME}, // Not implemented, yet
{"collectdelay", required_argument, NULL, DELAY},
... ... @@ -313,9 +314,9 @@ int main_rehash(options_t *options_p) {
int main(int argc, char *argv[]) {
struct options options;
memset(&options, 0, sizeof(options));
int ret = 0, nret;
struct stat64 stat64={0};
memset(&options, 0, sizeof(options));
options.notifyengine = DEFAULT_NOTIFYENGINE;
options.syncdelay = DEFAULT_SYNCDELAY;
options._queues[QUEUE_NORMAL].collectdelay = DEFAULT_COLLECTDELAY;
... ... @@ -325,7 +326,6 @@ int main(int argc, char *argv[]) {
options.label = DEFAULT_LABEL;
options.rsyncinclimit = DEFAULT_RSYNC_INCLUDELINESLIMIT;
options.synctimeout = DEFAULT_SYNCTIMEOUT;
options.cluster_timeout = DEFAULT_CLUSTERTIMEOUT;
parse_arguments(argc, argv, &options);
out_init(options.flags);
... ... @@ -339,11 +339,16 @@ int main(int argc, char *argv[]) {
ret = EINVAL;
}
if((options.cluster_iface == NULL) && ((options.cluster_mcastipaddr != NULL) || (options.cluster_nodename != NULL))) {
printf_e("Error: Options \"--cluster-ip\" and/or \"--cluster-node-name\" cannot be used without \"--cluster-iface\".\n");
if((options.cluster_iface == NULL) && ((options.cluster_mcastipaddr != NULL) || (options.cluster_nodename != NULL) || (options.cluster_timeout) || (options.cluster_mcastipport))) {
printf_e("Error: Options \"--cluster-ip\", \"--cluster-node-name\", \"--cluster_timeout\" and/or \"cluster_ipport\" cannot be used without \"--cluster-iface\".\n");
ret = EINVAL;
}
if(!options.cluster_timeout)
options.cluster_timeout = DEFAULT_CLUSTERTIMEOUT;
if(!options.cluster_mcastipport)
options.cluster_mcastipport = DEFAULT_CLUSTERIPPORT;
if(options.cluster_iface != NULL) {
#ifndef _DEBUG
printf_e("Error: Cluster subsystem is not implemented, yet. Sorry.\n");
... ... @@ -358,6 +363,8 @@ int main(int argc, char *argv[]) {
if(options.cluster_nodename == NULL) {
printf_e("Error: Option \"--cluster-iface\" is set, but \"--cluster-node-name\" is not set and cannot get the nodename with uname().\n");
ret = EINVAL;
} else {
options.cluster_nodename_len = strlen(options.cluster_nodename);
}
}
... ...
... ... @@ -117,6 +117,24 @@ Default value is "227.108.115.121". [(128+"c")."l"."s"."y"]
.RE
.PP
.B \-c, \-\-cluster\-port
.I multicast\-port
.RS 8
Not implemented yet!
Sets UDP-port number for multicast messages.
This option can be used only in conjuction with
.B \-\-cluster\-interface
\*S.
.I multicast\-port
should be greater than 0 and less than 65535.
Default value is "40079". [("n" << 8) + "c"]
.RE
.PP
.B \-c, \-\-cluster\-timeout
.I cluster\-timeout
.RS 8
... ...
... ... @@ -827,8 +827,11 @@ int sync_notify_mark(int notify_d, options_t *options_p, const char *accpath, co
return wd;
}
if(options_p->cluster_iface)
cluster_modtime_update(path);
if(options_p->cluster_iface) {
int ret=cluster_modtime_update(path);
if(ret) printf_e("Error: sync_notify_mark() cannot cluster_modtime_update(): %s (errno %i)\n", strerror(ret), ret);
return ret;
}
switch(options_p->notifyengine) {
#ifdef FANOTIFY_SUPPORT
... ...