2 * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
3 * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
5 * This file is part of LVM2.
7 * This copyrighted material is made available to anyone wishing to use,
8 * modify, copy, or redistribute it subject to the terms and conditions
9 * of the GNU General Public License v.2.
11 * You should have received a copy of the GNU General Public License
12 * along with this program; if not, write to the Free Software Foundation,
13 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * CLVMD: Cluster LVM daemon
20 #include "clvmd-common.h"
24 #include "clvmd-comms.h"
27 #include "lvm-functions.h"
28 #include "lvm-version.h"
29 #include "refresh_clvmd.h"
31 #ifdef HAVE_COROSYNC_CONFDB_H
32 #include <corosync/confdb.h>
36 #include <netinet/in.h>
41 #include <sys/utsname.h>
/* True when the csid c equals the csid of this node
   (compares max_csid_len bytes against our_csid) */
52 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
54 /* Head of the fd list. Also contains
55 the cluster_socket details */
56 static struct local_client local_client_head;
58 static unsigned short global_xid = 0; /* Last transaction ID issued */
/* Operations of the cluster interface in use; main() assigns the result of
   init_cman_cluster() or one of its sibling init functions */
60 struct cluster_ops *clops = NULL;
/* Our own csid (filled in through clops->get_our_csid()) and the size limits
   of the cluster interface selected in main() */
62 static char our_csid[MAX_CSID_LEN];
63 static unsigned max_csid_len;
64 static unsigned max_cluster_message;
65 static unsigned max_cluster_member_name_len;
67 /* Structure of items on the LVM thread list */
68 struct lvm_thread_cmd {
71 struct local_client *client;
72 struct clvm_header *msg;
73 char csid[MAX_CSID_LEN];
74 int remote; /* Flag */
79 struct lvm_startup_params {
/* LVM thread (created in main()) with its command list, mutex and condition
   variable; lvm_start_mutex is locked by main() before the thread is created */
85 static pthread_t lvm_thread;
86 static pthread_mutex_t lvm_thread_mutex;
87 static pthread_cond_t lvm_thread_cond;
88 static pthread_mutex_t lvm_start_mutex;
89 static struct dm_list lvm_cmd_head;
/* Flags of a signal-safe type; presumably set from the signal handlers
   declared below - TODO confirm */
90 static volatile sig_atomic_t quit = 0;
91 static volatile sig_atomic_t reread_config = 0;
/* Pipe used to report start-up status: child_init_signal() writes to
   child_pipe[1], be_daemon() hands child_pipe[0] to wait_for_child() */
92 static int child_pipe[2];
94 /* Reasons the daemon failed initialisation */
96 #define DFAIL_LOCAL_SOCK 2
97 #define DFAIL_CLUSTER_IF 3
98 #define DFAIL_MALLOC 4
99 #define DFAIL_TIMEOUT 5
/* Cluster interface selection; main() starts from IF_AUTO */
102 typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC, IF_SINGLENODE} if_type_t;
/* Thread entry-point function type; main() casts lvm_thread_fn to it for
   pthread_create() */
104 typedef void *(lvm_pthread_fn_t)(void*);
106 /* Forward declarations of the file-local (static) functions used before their definitions further down */
107 static void sigusr2_handler(int sig);
108 static void sighup_handler(int sig);
109 static void sigterm_handler(int sig);
110 static void send_local_reply(struct local_client *client, int status,
112 static void free_reply(struct local_client *client);
113 static void send_version_message(void);
114 static void *pre_and_post_thread(void *arg);
115 static int send_message(void *buf, int msglen, const char *csid, int fd,
116 const char *errtext);
117 static int read_from_local_sock(struct local_client *thisfd);
118 static int process_local_command(struct clvm_header *msg, int msglen,
119 struct local_client *client,
121 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
123 static int process_reply(const struct clvm_header *msg, int msglen,
125 static int open_local_sock(void);
126 static void close_local_sock(int local_socket);
127 static int check_local_clvmd(void);
128 static struct local_client *find_client(int clientid);
129 static void main_loop(int local_sock, int cmd_timeout);
130 static void be_daemon(int start_timeout);
131 static int check_all_clvmds_running(struct local_client *client);
132 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
133 int len, const char *csid,
134 struct local_client **new_client);
135 static void lvm_thread_fn(void *) __attribute__ ((noreturn));
136 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
137 int msglen, const char *csid);
138 static int distribute_command(struct local_client *thisfd);
139 static void hton_clvm(struct clvm_header *hdr);
140 static void ntoh_clvm(struct clvm_header *hdr);
141 static void add_reply_to_list(struct local_client *client, int status,
142 const char *csid, const char *buf, int len);
143 static if_type_t parse_cluster_interface(char *ifname);
144 static if_type_t get_cluster_type(void);
/* Print the command-line help text to the stream file.
   prog: program name supplied by the caller (main() passes argv[0]).
   file: destination stream; main() passes stdout or stderr. */
146 static void usage(const char *prog, FILE *file)
148 fprintf(file, "Usage:\n"
150 " -V Show version of clvmd\n"
151 " -h Show this help information\n"
152 " -d Set debug level\n"
153 " If starting clvmd then don't fork, run in the foreground\n"
154 " -R Tell all running clvmds in the cluster to reload their device cache\n"
155 " -S Restart clvmd, preserving exclusive locks\n"
156 " -C Sets debug level (from -d) on all clvmd instances clusterwide\n"
157 " -t<secs> Command timeout (default 60 seconds)\n"
158 " -T<secs> Startup timeout (default none)\n"
159 " -I<cmgr> Cluster manager (default: auto)\n"
160 " Available cluster managers: "
173 #ifdef USE_SINGLENODE
179 /* Called to signal the parent how well we got on during initialisation */
/* Writes the int status to child_pipe[1] and then closes that end of the
   pipe; a failure of either call is reported with log_sys_error(). */
180 static void child_init_signal(int status)
183 /* FIXME Use a proper wrapper around write */
184 if (write(child_pipe[1], &status, sizeof(status)) < 0)
185 log_sys_error("write", "child_pipe");
186 if (close(child_pipe[1]))
187 log_sys_error("close", "child_pipe");
/* Report status to the parent through child_init_signal(). The function is
   declared noreturn, so it is expected to terminate the process afterwards. */
191 static __attribute__((noreturn)) void child_init_signal_and_exit(int status)
193 child_init_signal(status);
/* NOTE(review): callers pass the address of the fd member of a client
   structure; presumably this closes the descriptor and invalidates the stored
   value - TODO confirm against the function body. */
197 static void safe_close(int *fd)
/* printf-style debug logger driven by the global debug setting.
   DEBUG_STDERR: the message goes to stderr, prefixed with CLVMD[<thread id in
   hex>] and 15 characters of the ctime() string starting at offset 4 (month,
   day and time of day).
   DEBUG_SYSLOG: the message is passed to vsyslog() at LOG_DEBUG; syslog_init
   presumably guards the one-time openlog() call - TODO confirm. */
206 void debuglog(const char *fmt, ...)
210 static int syslog_init = 0;
212 if (debug == DEBUG_STDERR) {
215 fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
216 vfprintf(stderr, fmt, ap);
219 if (debug == DEBUG_SYSLOG) {
221 openlog("clvmd", LOG_PID, LOG_DAEMON);
226 vsyslog(LOG_DEBUG, fmt, ap);
/* Build a printable description of the command code cmdl in the form
   NAME (0xNN). The text is formatted into a static buffer, so each call
   overwrites the result of the previous one. */
231 static const char *decode_cmd(unsigned char cmdl)
233 static char buf[128];
240 case CLVMD_CMD_LOCK_VG:
243 case CLVMD_CMD_LOCK_LV:
246 case CLVMD_CMD_REFRESH:
249 case CLVMD_CMD_SET_DEBUG:
250 command = "SET_DEBUG";
252 case CLVMD_CMD_GET_CLUSTERNAME:
253 command = "GET_CLUSTERNAME";
255 case CLVMD_CMD_VG_BACKUP:
256 command = "VG_BACKUP";
258 case CLVMD_CMD_REPLY:
261 case CLVMD_CMD_VERSION:
264 case CLVMD_CMD_GOAWAY:
270 case CLVMD_CMD_UNLOCK:
273 case CLVMD_CMD_LOCK_QUERY:
274 command = "LOCK_QUERY";
276 case CLVMD_CMD_RESTART:
/* Symbolic name followed by the numeric code in hex */
284 sprintf(buf, "%s (0x%x)", command, cmdl);
/* Delete the lockfile CLVMD_PIDFILE; main() registers this with atexit().
   A failing unlink() is reported with log_sys_error(). */
289 static void remove_lockfile(void)
291 if (unlink(CLVMD_PIDFILE))
292 log_sys_error("unlink", CLVMD_PIDFILE);
296 * clvmd require dm-ioctl capability for operation
/* Logs an error when either the real or the effective user id is non-zero,
   i.e. when the daemon is not started as root. */
298 static void check_permissions(void)
300 if (getuid() || geteuid()) {
301 log_error("Cannot run as a non-root user.");
304 * Fail cleanly here if not run as root, instead of failing
305 * later when attempting a root-only operation
306 * Preferred exit code from an initscript for this.
/* Daemon entry point.
   Visible flow: parse the command line with getopt(); optionally hand a debug
   level to an already running clvmd through debug_clvmd(); set LANG=C; detach
   with be_daemon() unless debug == DEBUG_STDERR; create the run directory and
   the lockfile CLVMD_PIDFILE (removed at exit by remove_lockfile()); open the
   local socket; set up signal handling; bring up one cluster interface; start
   the LVM thread; report SUCCESS to the parent and run main_loop(). */
312 int main(int argc, char *argv[])
315 struct local_client *newfd;
316 struct utsname nodeinfo;
317 struct lvm_startup_params lvm_params;
319 int cmd_timeout = DEFAULT_CMD_TIMEOUT;
320 int start_timeout = 0;
321 if_type_t cluster_iface = IF_AUTO;
325 int clusterwide_opt = 0;
328 /* Deal with command-line arguments */
331 while ((opt = getopt(argc, argv, "?vVhd::t:RST:CI:E:")) != EOF) {
334 usage(argv[0], stdout);
338 usage(argv[0], stderr);
/* The helper result is mapped to a process exit status: 0 when the helper
   returns 1, otherwise 1 */
343 return refresh_clvmd(1)==1?0:1;
347 return restart_clvmd(clusterwide_opt)==1?0:1;
356 debug = atoi(optarg);
358 debug = DEBUG_STDERR;
362 cmd_timeout = atoi(optarg);
364 fprintf(stderr, "command timeout is invalid\n");
365 usage(argv[0], stderr);
370 cluster_iface = parse_cluster_interface(optarg);
373 start_timeout = atoi(optarg);
374 if (start_timeout <= 0) {
375 fprintf(stderr, "startup timeout is invalid\n");
376 usage(argv[0], stderr);
382 printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
383 printf("Protocol version: %d.%d.%d\n",
384 CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
385 CLVMD_PATCH_VERSION);
394 /* Setting debug options on an existing clvmd */
395 if (debug_opt && !check_local_clvmd()) {
397 /* Sending to stderr makes no sense for a detached daemon */
398 if (debug == DEBUG_STDERR)
399 debug = DEBUG_SYSLOG;
400 return debug_clvmd(debug, clusterwide_opt)==1?0:1;
404 * Switch to C locale to avoid reading large locale-archive file
405 * used by some glibc (on some distributions it takes over 100MB).
406 * Daemon currently needs to use mlockall().
408 if (setenv("LANG", "C", 1))
409 perror("Cannot set LANG to C");
411 /* Fork into the background (unless requested not to) */
412 if (debug != DEBUG_STDERR) {
413 be_daemon(start_timeout);
/* Create the run directory under a 0077 umask */
416 dm_prepare_selinux_context(DEFAULT_RUN_DIR, S_IFDIR);
417 old_mask = umask(0077);
418 if (dm_create_dir(DEFAULT_RUN_DIR) == 0) {
419 DEBUGLOG("clvmd: unable to create %s directory\n",
427 (void) dm_prepare_selinux_context(CLVMD_PIDFILE, S_IFREG);
428 if (dm_create_lockfile(CLVMD_PIDFILE) == 0) {
429 DEBUGLOG("clvmd: unable to create lockfile\n");
432 (void) dm_prepare_selinux_context(NULL, 0);
434 atexit(remove_lockfile);
436 DEBUGLOG("CLVMD started\n");
438 /* Open the Unix socket we listen for commands on.
439 We do this before opening the cluster socket so that
440 potential clients will block rather than error if we are running
441 but the cluster is not ready yet */
442 local_sock = open_local_sock();
443 if (local_sock < 0) {
444 child_init_signal_and_exit(DFAIL_LOCAL_SOCK);
448 /* Set up signal handlers, USR1 is for cluster change notifications (in cman)
449 USR2 causes child threads to exit.
450 HUP causes gulm version to re-read nodes list from CCS.
451 PIPE should be ignored */
452 signal(SIGUSR2, sigusr2_handler);
453 signal(SIGHUP, sighup_handler);
454 signal(SIGPIPE, SIG_IGN);
456 /* Block SIGUSR2/SIGINT/SIGTERM in process */
458 sigaddset(&ss, SIGUSR2);
459 sigaddset(&ss, SIGINT);
460 sigaddset(&ss, SIGTERM);
461 sigprocmask(SIG_BLOCK, &ss, NULL);
463 /* Initialise the LVM thread variables */
464 dm_list_init(&lvm_cmd_head);
465 pthread_mutex_init(&lvm_thread_mutex, NULL);
466 pthread_cond_init(&lvm_thread_cond, NULL);
467 pthread_mutex_init(&lvm_start_mutex, NULL);
470 /* Start the cluster interface */
471 if (cluster_iface == IF_AUTO)
472 cluster_iface = get_cluster_type();
/* Each attempt below runs when the interface is IF_AUTO or the matching type
   and, on success, records the size limits of that interface. The single-node
   interface is tried only when it was selected explicitly. */
475 if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
476 max_csid_len = CMAN_MAX_CSID_LEN;
477 max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
478 max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
479 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
484 if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
485 max_csid_len = GULM_MAX_CSID_LEN;
486 max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
487 max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
489 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
494 if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
495 max_csid_len = COROSYNC_CSID_LEN;
496 max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
497 max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
498 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
503 if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
504 max_csid_len = OPENAIS_CSID_LEN;
505 max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
506 max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
507 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
510 #ifdef USE_SINGLENODE
512 if (cluster_iface == IF_SINGLENODE && (clops = init_singlenode_cluster())) {
513 max_csid_len = SINGLENODE_CSID_LEN;
514 max_cluster_message = SINGLENODE_MAX_CLUSTER_MESSAGE;
515 max_cluster_member_name_len = MAX_CLUSTER_MEMBER_NAME_LEN;
516 syslog(LOG_NOTICE, "Cluster LVM daemon started - running in single-node mode");
521 DEBUGLOG("Can't initialise cluster interface\n");
522 log_error("Can't initialise cluster interface\n");
523 child_init_signal_and_exit(DFAIL_CLUSTER_IF);
526 DEBUGLOG("Cluster ready, doing some more initialisation\n");
530 clops->get_our_csid(our_csid);
532 /* Initialise the FD list head */
533 local_client_head.fd = clops->get_main_cluster_fd();
534 local_client_head.type = CLUSTER_MAIN_SOCK;
535 local_client_head.callback = clops->cluster_fd_callback;
537 /* Add the local socket to the list */
538 newfd = malloc(sizeof(struct local_client));
540 child_init_signal_and_exit(DFAIL_MALLOC);
544 newfd->fd = local_sock;
546 newfd->type = LOCAL_RENDEZVOUS;
547 newfd->callback = local_rendezvous_callback;
548 newfd->next = local_client_head.next;
549 local_client_head.next = newfd;
551 /* This needs to be started after cluster initialisation
552 as it may need to take out locks */
553 DEBUGLOG("starting LVM thread\n");
555 /* Don't let anyone else do work until we are started */
556 pthread_mutex_lock(&lvm_start_mutex);
557 lvm_params.using_gulm = using_gulm;
558 lvm_params.argv = argv;
559 pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
560 (void *)&lvm_params);
562 /* Tell the rest of the cluster our version number */
563 /* CMAN can do this immediately, gulm needs to wait until
564 the core initialisation has finished and the node list
566 if (clops->cluster_init_completed)
567 clops->cluster_init_completed();
569 DEBUGLOG("clvmd ready for work\n");
570 child_init_signal(SUCCESS);
572 /* Try to shutdown neatly */
573 signal(SIGTERM, sigterm_handler);
574 signal(SIGINT, sigterm_handler);
577 main_loop(local_sock, cmd_timeout);
579 close_local_sock(local_sock);
585 /* Called when the GuLM cluster layer has completed initialisation.
586 We send the version message */
/* Non-static entry point; its only visible action is the call to
   send_version_message(). */
587 void clvmd_cluster_init_completed()
589 send_version_message();
592 /* Data on a connected socket */
/* Callback installed on accepted client sockets by local_rendezvous_callback();
   hands the client to read_from_local_sock() and returns its result. */
593 static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
595 struct local_client **new_client)
598 return read_from_local_sock(thisfd);
601 /* New connection pending on the local rendezvous (listening) socket */
/* Accepts the connection on thisfd->fd and fills in a freshly allocated
   LOCAL_SOCK client structure for it, with local_sock_callback as its handler
   and all per-command state cleared. */
602 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
603 int len, const char *csid,
604 struct local_client **new_client)
606 /* Someone connected to our local socket, accept it. */
608 struct sockaddr_un socka;
609 struct local_client *newfd;
610 socklen_t sl = sizeof(socka);
611 int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
613 if (client_fd == -1 && errno == EINTR)
616 if (client_fd >= 0) {
617 newfd = malloc(sizeof(struct local_client));
/* Flag value 1 is the close-on-exec flag, as the debug message says;
   a failure here is only logged */
623 if (fcntl(client_fd, F_SETFD, 1))
624 DEBUGLOG("setting CLOEXEC on client fd failed: %s\n", strerror(errno));
626 newfd->fd = client_fd;
627 newfd->type = LOCAL_SOCK;
630 newfd->callback = local_sock_callback;
631 newfd->bits.localsock.replies = NULL;
632 newfd->bits.localsock.expected_replies = 0;
633 newfd->bits.localsock.cmd = NULL;
634 newfd->bits.localsock.in_progress = FALSE;
635 newfd->bits.localsock.sent_out = FALSE;
636 newfd->bits.localsock.threadid = 0;
637 newfd->bits.localsock.finished = 0;
638 newfd->bits.localsock.pipe_client = NULL;
639 newfd->bits.localsock.private = NULL;
640 newfd->bits.localsock.all_success = 1;
641 DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
/* Callback for the read end of a command pipe (a THREAD_PIPE client).
   Reads an int-sized status from thisfd->fd. The visible branches then either
   tear the pipe down (clearing the cross-link to the socket client and joining
   the thread recorded in bits.pipe.threadid) or act on the status for the
   owning socket client: send a local reply, or call distribute_command() once
   the PRE_COMMAND stage has finished. A response for a client that has
   already gone away is ignored. */
647 static int local_pipe_callback(struct local_client *thisfd, char *buf,
648 int maxlen, const char *csid,
649 struct local_client **new_client)
652 char buffer[PIPE_BUF];
653 struct local_client *sock_client = thisfd->bits.pipe.client;
654 int status = -1; /* in error by default */
656 len = read(thisfd->fd, buffer, sizeof(int));
657 if (len == -1 && errno == EINTR)
/* Only a complete int is accepted as a status; otherwise status keeps its
   error default */
660 if (len == sizeof(int)) {
661 memcpy(&status, buffer, sizeof(int));
664 DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
665 thisfd->fd, len, status);
667 /* EOF on pipe or an error, close it */
673 /* Clear out the cross-link */
674 if (thisfd->bits.pipe.client != NULL)
675 thisfd->bits.pipe.client->bits.localsock.pipe_client =
678 /* Reap child thread */
679 if (thisfd->bits.pipe.threadid) {
680 jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
681 thisfd->bits.pipe.threadid = 0;
682 if (thisfd->bits.pipe.client != NULL)
683 thisfd->bits.pipe.client->bits.localsock.
688 DEBUGLOG("background routine status was %d, sock_client=%p\n",
689 status, sock_client);
690 /* But has the client gone away ?? */
691 if (sock_client == NULL) {
693 ("Got PIPE response for dead client, ignoring it\n");
695 /* If error then just return that code */
697 send_local_reply(sock_client, status,
700 if (sock_client->bits.localsock.state ==
702 send_local_reply(sock_client, 0,
704 } else // PRE_COMMAND finished.
708 distribute_command(sock_client)) !=
709 0) send_local_reply(sock_client,
720 /* If a node is up, look for it in the reply array, if it's not there then
721 add one with "ETIMEDOUT".
722 NOTE: This won't race with real replies because they happen in the same thread.
/* Node callback handed to clops->cluster_do_node_callback() by
   request_timed_out(). Looks up the name of the node identified by csid,
   walks the reply list of client under reply_mutex looking for that name and,
   as the debug message below indicates, records an ETIMEDOUT reply for a node
   that timed out. */
724 static void timedout_callback(struct local_client *client, const char *csid,
728 struct node_reply *reply;
729 char nodename[max_cluster_member_name_len];
731 clops->name_from_csid(csid, nodename);
732 DEBUGLOG("Checking for a reply from %s\n", nodename);
733 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
735 reply = client->bits.localsock.replies;
736 while (reply && strcmp(reply->node, nodename) != 0) {
740 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
743 DEBUGLOG("Node %s timed-out\n", nodename);
/* 18 is the length of the text including its terminating NUL */
744 add_reply_to_list(client, ETIMEDOUT, csid,
745 "Command timed out", 18);
750 /* Called when the request has timed out on at least one node. We fill in
751 the remaining node entries with ETIMEDOUT and return.
753 By the time we get here the node that caused
754 the timeout could have gone down, in which case we will never get the expected
755 number of replies that triggers the post command so we need to do it here
/* Runs timedout_callback through clops->cluster_do_node_callback() to pad the
   reply list. If the number of replies still differs from the expected count
   and the client has a worker thread, the client is moved to POST_COMMAND and
   its condition variable is signalled while holding the client mutex. */
757 static void request_timed_out(struct local_client *client)
759 DEBUGLOG("Request timed-out. padding\n");
760 clops->cluster_do_node_callback(client, timedout_callback);
762 if (client->bits.localsock.num_replies !=
763 client->bits.localsock.expected_replies) {
764 /* Post-process the command */
765 if (client->bits.localsock.threadid) {
766 pthread_mutex_lock(&client->bits.localsock.mutex);
767 client->bits.localsock.state = POST_COMMAND;
768 pthread_cond_signal(&client->bits.localsock.cond);
769 pthread_mutex_unlock(&client->bits.localsock.mutex);
774 /* This is where the real work happens */
/* select()-based dispatcher over the client list that starts at
   local_client_head.
   local_sock:  the local listening socket (not referenced in the visible lines).
   cmd_timeout: select() timeout in seconds, also the age after which a
                sent-out request is treated as timed out.
   Visible steps: unblock SIGINT/SIGTERM for this thread, build the fd set,
   call select(), run the callback of every ready client, unlink clients that
   are flagged removeme or whose callback reported EOF or an error, and on a
   select() timeout call request_timed_out() for overdue requests. Finishes by
   calling clops->cluster_closedown(). */
775 static void main_loop(int local_sock, int cmd_timeout)
777 DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
781 sigaddset(&ss, SIGINT);
782 sigaddset(&ss, SIGTERM);
783 pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
788 struct local_client *thisfd;
789 struct timeval tv = { cmd_timeout, 0 };
790 int quorate = clops->is_quorate();
792 /* Wait on the cluster FD and all local sockets/pipes */
793 local_client_head.fd = clops->get_main_cluster_fd();
795 for (thisfd = &local_client_head; thisfd != NULL;
796 thisfd = thisfd->next) {
798 if (thisfd->removeme)
801 /* if the cluster is not quorate then don't listen for new requests */
802 if ((thisfd->type != LOCAL_RENDEZVOUS &&
803 thisfd->type != LOCAL_SOCK) || quorate)
804 FD_SET(thisfd->fd, &in);
807 select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
810 int saved_errno = errno;
/* reread_config is an optional operation of the cluster interface */
813 if (clops->reread_config)
814 clops->reread_config();
818 if (select_status > 0) {
819 struct local_client *lastfd = NULL;
820 char csid[MAX_CSID_LEN];
821 char buf[max_cluster_message];
823 for (thisfd = &local_client_head; thisfd != NULL;
824 thisfd = thisfd->next) {
826 if (thisfd->removeme) {
827 struct local_client *free_fd;
828 lastfd->next = thisfd->next;
832 DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
834 /* Queue cleanup, this also frees the client struct */
835 add_to_lvmqueue(free_fd, NULL, 0, NULL);
839 if (FD_ISSET(thisfd->fd, &in)) {
840 struct local_client *newfd = NULL;
845 thisfd->callback(thisfd, buf,
/* Transient conditions: leave the client in place and move on */
849 if (ret < 0 && (errno == EAGAIN ||
850 errno == EINTR)) continue;
852 /* Got error or EOF: Remove it from the list safely */
854 struct local_client *free_fd;
855 int type = thisfd->type;
857 /* If the cluster socket shuts down, so do we */
858 if (type == CLUSTER_MAIN_SOCK ||
859 type == CLUSTER_INTERNAL)
862 DEBUGLOG("ret == %d, errno = %d. removing client\n",
864 lastfd->next = thisfd->next;
867 safe_close(&(free_fd->fd));
869 /* Queue cleanup, this also frees the client struct */
870 add_to_lvmqueue(free_fd, NULL, 0, NULL);
874 /* New client...simply add it to the list */
876 newfd->next = thisfd->next;
877 thisfd->next = newfd;
885 /* Select timed out. Check for clients that have been waiting too long for a response */
886 if (select_status == 0) {
887 time_t the_time = time(NULL);
889 for (thisfd = &local_client_head; thisfd != NULL;
890 thisfd = thisfd->next) {
891 if (thisfd->type == LOCAL_SOCK
892 && thisfd->bits.localsock.sent_out
893 && thisfd->bits.localsock.sent_time +
894 cmd_timeout < the_time
895 && thisfd->bits.localsock.
897 thisfd->bits.localsock.num_replies) {
898 /* Send timed out message + replies we already have */
900 ("Request timed-out (send: %ld, now: %ld)\n",
901 thisfd->bits.localsock.sent_time,
904 thisfd->bits.localsock.all_success = 0;
906 request_timed_out(thisfd);
910 if (select_status < 0) {
915 perror("select error");
922 clops->cluster_closedown();
/* Parent side of the start-up handshake; declared noreturn.
   c_pipe:  read end of the status pipe.
   timeout: seconds to wait for the child; 0 makes select() wait without a
            time limit.
   Waits for c_pipe to become readable, reads the int status written by the
   child and prints a diagnostic on stderr for the failure cases. */
925 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
930 struct timeval tv = {timeout, 0};
933 FD_SET(c_pipe, &fds);
935 sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
937 fprintf(stderr, "clvmd startup timed out\n");
/* A short or failed read is reported as an initialisation failure */
941 if (read(c_pipe, &child_status, sizeof(child_status)) !=
942 sizeof(child_status)) {
944 fprintf(stderr, "clvmd failed in initialisation\n");
948 switch (child_status) {
952 fprintf(stderr, "clvmd failed in initialisation\n");
954 case DFAIL_LOCAL_SOCK:
955 fprintf(stderr, "clvmd could not create local socket\n");
956 fprintf(stderr, "Another clvmd is probably already running\n");
958 case DFAIL_CLUSTER_IF:
959 fprintf(stderr, "clvmd could not connect to cluster manager\n");
960 fprintf(stderr, "Consult syslog for more information\n");
963 fprintf(stderr, "clvmd failed, not enough memory\n");
966 fprintf(stderr, "clvmd failed, error was %d\n", child_status);
972 fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
977 * Fork into the background and detach from our parent process.
978 * In the interests of user-friendliness we wait for the daemon
979 * to complete initialisation before returning its status
/* Detach from the caller.
   timeout: start-up timeout in seconds, passed on to wait_for_child().
   Visible steps: open /dev/null; in the parent close the write end of
   child_pipe and wait in wait_for_child(); in the child close the read end,
   close fds 0-2 and re-create them from /dev/null with dup2(). Failures are
   reported with perror() and log_error(). */
982 static void be_daemon(int timeout)
984 int devnull = open("/dev/null", O_RDWR);
986 perror("Can't open /dev/null");
994 perror("clvmd: can't fork");
998 close(child_pipe[0]);
1001 default: /* Parent */
1002 close(child_pipe[1]);
1003 wait_for_child(child_pipe[0], timeout);
1006 /* Detach ourself from the calling environment */
1007 if (close(0) || close(1) || close(2)) {
1008 perror("Error closing terminal FDs");
1013 if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
1014 || dup2(devnull, 2) < 0) {
1015 perror("Error setting terminal FDs to /dev/null");
1016 log_error("Error setting terminal FDs to /dev/null: %m");
1020 log_error("Error setting current directory to /: %m");
1026 /* Called when we have a read from the local socket.
1027 was in the main loop but it's grown up and is a big girl now */
/* Handles readable data, EOF or an error on a connected local client socket.
   EOF/error path (as far as visible): mark the client finished, wake and join
   its worker thread, unlink and close the associated pipe client, free the
   command buffer and close the socket.
   Data path: treat the bytes as a struct clvm_header request, reject it with
   EBUSY while a command is in progress, copy it into a buffer large enough
   for the whole message and read any missing argument bytes, validate the
   target node, then either signal the existing worker thread or create the
   pipe client and a new pre_and_post_thread for this client. */
1028 static int read_from_local_sock(struct local_client *thisfd)
1033 char buffer[PIPE_BUF];
1035 len = read(thisfd->fd, buffer, sizeof(buffer));
1036 if (len == -1 && errno == EINTR)
1039 DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
1041 /* EOF or error on socket */
1046 DEBUGLOG("EOF on local socket: inprogress=%d\n",
1047 thisfd->bits.localsock.in_progress);
1049 thisfd->bits.localsock.finished = 1;
1051 /* If the client went away in mid command then tidy up */
1052 if (thisfd->bits.localsock.in_progress) {
1053 pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
1054 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1055 thisfd->bits.localsock.state = POST_COMMAND;
1056 pthread_cond_signal(&thisfd->bits.localsock.cond);
1057 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1059 /* Free any unsent buffers */
1063 /* Kill the subthread & free resources */
1064 if (thisfd->bits.localsock.threadid) {
1065 DEBUGLOG("Waiting for child thread\n");
1066 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1067 thisfd->bits.localsock.state = PRE_COMMAND;
1068 pthread_cond_signal(&thisfd->bits.localsock.cond);
1069 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1072 pthread_join(thisfd->bits.localsock.threadid,
1074 DEBUGLOG("Joined child thread\n");
1076 thisfd->bits.localsock.threadid = 0;
1077 pthread_cond_destroy(&thisfd->bits.localsock.cond);
1078 pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
1080 /* Remove the pipe client */
1081 if (thisfd->bits.localsock.pipe_client != NULL) {
1082 struct local_client *newfd;
1083 struct local_client *lastfd = NULL;
1084 struct local_client *free_fd = NULL;
1086 close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */
1087 close(thisfd->bits.localsock.pipe);
1089 /* Remove pipe client */
1090 for (newfd = &local_client_head; newfd != NULL;
1091 newfd = newfd->next) {
1092 if (thisfd->bits.localsock.
1093 pipe_client == newfd) {
1094 thisfd->bits.localsock.
1097 lastfd->next = newfd->next;
1099 newfd->next = lastfd;
1108 /* Free the command buffer */
1109 free(thisfd->bits.localsock.cmd);
1111 /* Clear out the cross-link */
1112 if (thisfd->bits.localsock.pipe_client != NULL)
1113 thisfd->bits.localsock.pipe_client->bits.pipe.client =
1116 safe_close(&(thisfd->fd));
1120 struct local_client *newfd;
1121 char csid[MAX_CSID_LEN];
1122 struct clvm_header *inheader;
1125 inheader = (struct clvm_header *) buffer;
1127 /* Fill in the client ID */
1128 inheader->clientid = htonl(thisfd->fd);
1130 /* If we are already busy then return an error */
1131 if (thisfd->bits.localsock.in_progress) {
1132 struct clvm_header reply;
1133 reply.cmd = CLVMD_CMD_REPLY;
1134 reply.status = EBUSY;
1137 send_message(&reply, sizeof(reply), our_csid,
1139 "Error sending EBUSY reply to local user");
1143 /* Free any old buffer space */
1144 free(thisfd->bits.localsock.cmd);
1146 /* See if we have the whole message */
1148 len - strlen(inheader->node) - sizeof(struct clvm_header);
/* Argument bytes announced by the header that have not been read yet */
1149 missing_len = inheader->arglen - argslen;
1151 if (missing_len < 0)
1154 /* Save the message */
1155 thisfd->bits.localsock.cmd = malloc(len + missing_len);
1157 if (!thisfd->bits.localsock.cmd) {
1158 struct clvm_header reply;
1159 reply.cmd = CLVMD_CMD_REPLY;
1160 reply.status = ENOMEM;
1163 send_message(&reply, sizeof(reply), our_csid,
1165 "Error sending ENOMEM reply to local user");
1168 memcpy(thisfd->bits.localsock.cmd, buffer, len);
1169 thisfd->bits.localsock.cmd_len = len + missing_len;
/* From here on inheader refers to the saved copy, not the stack buffer */
1170 inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1172 /* If we don't have the full message then read the rest now */
1175 inheader->node + strlen(inheader->node) + 1;
1177 while (missing_len > 0 && len >= 0) {
1179 ("got %d bytes, need another %d (total %d)\n",
1180 argslen, missing_len, inheader->arglen);
1181 len = read(thisfd->fd, argptr + argslen,
1190 /* Initialise and lock the mutex so the subthread will wait after
1191 finishing the PRE routine */
1192 if (!thisfd->bits.localsock.threadid) {
1193 pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1194 pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1195 pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1198 /* Only run the command if all the cluster nodes are running CLVMD */
1199 if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1200 (check_all_clvmds_running(thisfd) == -1)) {
1201 thisfd->bits.localsock.expected_replies = 0;
1202 thisfd->bits.localsock.num_replies = 0;
1203 send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1207 /* Check the node name for validity */
1208 if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1209 /* Error, node is not in the cluster */
1210 struct clvm_header reply;
1211 DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1213 reply.cmd = CLVMD_CMD_REPLY;
1214 reply.status = ENOENT;
1217 send_message(&reply, sizeof(reply), our_csid,
1219 "Error sending ENOENT reply to local user");
1220 thisfd->bits.localsock.expected_replies = 0;
1221 thisfd->bits.localsock.num_replies = 0;
1222 thisfd->bits.localsock.in_progress = FALSE;
1223 thisfd->bits.localsock.sent_out = FALSE;
1227 /* If we already have a subthread then just signal it to start */
1228 if (thisfd->bits.localsock.threadid) {
1229 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1230 thisfd->bits.localsock.state = PRE_COMMAND;
1231 pthread_cond_signal(&thisfd->bits.localsock.cond);
1232 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1236 /* Create a pipe and add the reading end to our FD list */
1238 newfd = malloc(sizeof(struct local_client));
1240 struct clvm_header reply;
1241 close(comms_pipe[0]);
1242 close(comms_pipe[1]);
1244 reply.cmd = CLVMD_CMD_REPLY;
1245 reply.status = ENOMEM;
1248 send_message(&reply, sizeof(reply), our_csid,
1250 "Error sending ENOMEM reply to local user");
1253 DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1256 if (fcntl(comms_pipe[0], F_SETFD, 1))
1257 DEBUGLOG("setting CLOEXEC on pipe[0] failed: %s\n", strerror(errno));
1258 if (fcntl(comms_pipe[1], F_SETFD, 1))
1259 DEBUGLOG("setting CLOEXEC on pipe[1] failed: %s\n", strerror(errno));
/* The pipe client is linked into the list directly after this client */
1261 newfd->fd = comms_pipe[0];
1262 newfd->removeme = 0;
1263 newfd->type = THREAD_PIPE;
1264 newfd->callback = local_pipe_callback;
1265 newfd->next = thisfd->next;
1266 newfd->bits.pipe.client = thisfd;
1267 newfd->bits.pipe.threadid = 0;
1268 thisfd->next = newfd;
1270 /* Store a cross link to the pipe */
1271 thisfd->bits.localsock.pipe_client = newfd;
1273 thisfd->bits.localsock.pipe = comms_pipe[1];
1275 /* Make sure the thread has a copy of its own ID */
1276 newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1278 /* Run the pre routine */
1279 thisfd->bits.localsock.in_progress = TRUE;
1280 thisfd->bits.localsock.state = PRE_COMMAND;
1281 DEBUGLOG("Creating pre&post thread\n");
1282 status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1283 pre_and_post_thread, thisfd);
1284 DEBUGLOG("Created pre&post thread, state = %d\n", status);
1289 /* Add a file descriptor from the cluster or comms interface to
1290 our list of FDs for select
/* Links new_client into the client list directly after local_client_head. */
1292 int add_client(struct local_client *new_client)
1294 new_client->next = local_client_head.next;
1295 local_client_head.next = new_client;
1300 /* Called when the pre-command has completed successfully - we
1301 now execute the real command on all the requested nodes */
/* Takes the next transaction id from global_xid for the client, then:
   - CLVMD_FLAG_LOCAL set: queue the command for the local LVM thread only,
     expecting one reply;
   - empty node name: expect clops->get_num_nodes() replies, queue the command
     locally and send it to all cluster nodes;
   - named node: expect one reply; queue the command locally when the csid is
     our own, otherwise send it to that node. */
1302 static int distribute_command(struct local_client *thisfd)
1304 struct clvm_header *inheader =
1305 (struct clvm_header *) thisfd->bits.localsock.cmd;
1306 int len = thisfd->bits.localsock.cmd_len;
1308 thisfd->xid = global_xid++;
1309 DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1311 /* Forward it to other nodes in the cluster if needed */
1312 if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1313 /* if node is empty then do it on the whole cluster */
1314 if (inheader->node[0] == '\0') {
1315 thisfd->bits.localsock.expected_replies =
1316 clops->get_num_nodes();
1317 thisfd->bits.localsock.num_replies = 0;
1318 thisfd->bits.localsock.sent_time = time(NULL);
1319 thisfd->bits.localsock.in_progress = TRUE;
1320 thisfd->bits.localsock.sent_out = TRUE;
1322 /* Do it here first */
1323 add_to_lvmqueue(thisfd, inheader, len, NULL);
1325 DEBUGLOG("Sending message to all cluster nodes\n");
1326 inheader->xid = thisfd->xid;
1327 send_message(inheader, len, NULL, -1,
1328 "Error forwarding message to cluster");
1330 /* Do it on a single node */
1331 char csid[MAX_CSID_LEN];
1333 if (clops->csid_from_name(csid, inheader->node)) {
1334 /* This has already been checked so should not happen */
1337 /* OK, found a node... */
1338 thisfd->bits.localsock.expected_replies = 1;
1339 thisfd->bits.localsock.num_replies = 0;
1340 thisfd->bits.localsock.in_progress = TRUE;
1342 /* Are we the requested node ?? */
1343 if (memcmp(csid, our_csid, max_csid_len) == 0) {
1344 DEBUGLOG("Doing command on local node only\n");
1345 add_to_lvmqueue(thisfd, inheader, len, NULL);
1347 DEBUGLOG("Sending message to single node: %s\n",
1349 inheader->xid = thisfd->xid;
1350 send_message(inheader, len,
1352 "Error forwarding message to cluster node");
1357 /* Local explicitly requested, ignore nodes */
1358 thisfd->bits.localsock.in_progress = TRUE;
1359 thisfd->bits.localsock.expected_replies = 1;
1360 thisfd->bits.localsock.num_replies = 0;
1361 add_to_lvmqueue(thisfd, inheader, len, NULL);
1366 /* Process a command from a remote node and return the result */
/*
 * Handle a message that arrived from another node.
 *
 * CLVMD_CMD_GOAWAY and CLVMD_CMD_VERSION are dealt with in this function.
 * For VERSION the three integers in msg->args are converted with ntohl();
 * a major number different from CLVMD_MAJOR_VERSION makes us send a
 * CLVMD_CMD_GOAWAY header, and clops->add_up_node(csid) is called in the
 * same block (presumably for the matching-version case -- TODO confirm).
 *
 * Other commands are run through do_command() with a freshly malloc()ed
 * reply buffer.  Unless the message was itself a CLVMD_CMD_REPLY, the
 * reply buffer is realloc()ed to replylen + sizeof(struct clvm_header),
 * the payload is moved up with memmove() to leave room for the header,
 * and the result is handed to send_message().  If that realloc() fails a
 * bare header with status ENOMEM is sent instead.
 *
 * NOTE(review): nodename is a variable-length array sized from the
 * run-time value max_cluster_member_name_len.
 * NOTE(review): msg_malloced is initialised to 0 and is not assigned in
 * any statement shown in this function.
 */
1367 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1371 char nodename[max_cluster_member_name_len];
1373 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1375 int msg_malloced = 0;
1377 /* Get the node name as we /may/ need it later */
1378 clops->name_from_csid(csid, nodename);
1380 DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1381 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1383 /* Check for GOAWAY and sulk */
1384 if (msg->cmd == CLVMD_CMD_GOAWAY) {
1386 DEBUGLOG("Told to go away by %s\n", nodename);
1387 log_error("Told to go away by %s\n", nodename);
1391 /* Version check is internal - don't bother exposing it in
1393 if (msg->cmd == CLVMD_CMD_VERSION) {
1394 int version_nums[3];
1397 memcpy(version_nums, msg->args, sizeof(version_nums));
1399 clops->name_from_csid(csid, node);
1400 DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1402 ntohl(version_nums[0]),
1403 ntohl(version_nums[1]), ntohl(version_nums[2]));
1405 if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1406 struct clvm_header byebyemsg;
1408 ("Telling node %s to go away because of incompatible version number\n",
1411 ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1412 node, ntohl(version_nums[0]),
1413 ntohl(version_nums[1]), ntohl(version_nums[2]));
1415 byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1416 byebyemsg.status = 0;
1417 byebyemsg.flags = 0;
1418 byebyemsg.arglen = 0;
1419 byebyemsg.clientid = 0;
1420 clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1422 "Error Sending GOAWAY message");
1424 clops->add_up_node(csid);
1429 /* Allocate a default reply buffer */
1430 replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1432 if (replyargs != NULL) {
1433 /* Run the command */
1435 do_command(NULL, msg, msglen, &replyargs, buflen,
1441 /* If it wasn't a reply, then reply */
1442 if (msg->cmd != CLVMD_CMD_REPLY) {
1446 realloc(replyargs, replylen + sizeof(struct clvm_header));
1448 struct clvm_header *agghead =
1449 (struct clvm_header *) aggreply;
1451 replyargs = aggreply;
1452 /* Move it up so there's room for a header in front of the data */
1453 memmove(aggreply + offsetof(struct clvm_header, args),
1454 replyargs, replylen);
1456 agghead->xid = msg->xid;
1457 agghead->cmd = CLVMD_CMD_REPLY;
1458 agghead->status = status;
1460 agghead->clientid = msg->clientid;
1461 agghead->arglen = replylen;
1462 agghead->node[0] = '\0';
1463 send_message(aggreply,
1464 sizeof(struct clvm_header) +
1466 "Error sending command reply");
1468 struct clvm_header head;
1470 DEBUGLOG("Error attempting to realloc return buffer\n");
1471 /* Return a failure response */
1472 head.cmd = CLVMD_CMD_REPLY;
1473 head.status = ENOMEM;
1475 head.clientid = msg->clientid;
1477 head.node[0] = '\0';
1478 send_message(&head, sizeof(struct clvm_header), csid,
1479 fd, "Error sending ENOMEM command reply");
1484 /* Free buffer if it was malloced */
1491 /* Add a reply to a command to the list of replies for this client.
1492 If we have got a full set then send them to the waiting client down the local
/*
 * Record one node's reply for `client` and wake the client's worker
 * thread once every expected reply has arrived.
 *
 * client - local client the reply belongs to
 * status - status reported by the node
 * csid   - id of the replying node; turned into a name with
 *          clops->name_from_csid() and stored in the reply
 * buf    - reply payload; copied into a malloc()ed buffer of len bytes
 * len    - payload length in bytes
 *
 * reply_mutex is taken on entry.  New replies are pushed onto the front of
 * client->bits.localsock.replies.  When the incremented num_replies equals
 * expected_replies and the client has a thread id, the state is switched
 * to POST_COMMAND and the client's condition variable is signalled while
 * holding the client's mutex.  The ENOMEM path drops reply_mutex and calls
 * send_local_reply() with ENOMEM.
 */
1494 static void add_reply_to_list(struct local_client *client, int status,
1495 const char *csid, const char *buf, int len)
1497 struct node_reply *reply;
1499 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1501 /* Add it to the list of replies */
1502 reply = malloc(sizeof(struct node_reply));
1504 reply->status = status;
1505 clops->name_from_csid(csid, reply->node);
1506 DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1509 reply->replymsg = malloc(len);
1510 if (!reply->replymsg) {
/* Payload could not be stored: report ENOMEM in place of the node's status */
1511 reply->status = ENOMEM;
1513 memcpy(reply->replymsg, buf, len);
1516 reply->replymsg = NULL;
1518 /* Hook it onto the reply chain */
1519 reply->next = client->bits.localsock.replies;
1520 client->bits.localsock.replies = reply;
1522 /* It's all gone horribly wrong... */
1523 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1524 send_local_reply(client, ENOMEM, client->fd);
/* The log line below prints num_replies + 1 because the counter is only
   incremented inside the comparison that follows it */
1527 DEBUGLOG("Got %d replies, expecting: %d\n",
1528 client->bits.localsock.num_replies + 1,
1529 client->bits.localsock.expected_replies);
1531 /* If we have the whole lot then do the post-process */
1532 if (++client->bits.localsock.num_replies ==
1533 client->bits.localsock.expected_replies) {
1534 /* Post-process the command */
1535 if (client->bits.localsock.threadid) {
1536 pthread_mutex_lock(&client->bits.localsock.mutex);
1537 client->bits.localsock.state = POST_COMMAND;
1538 pthread_cond_signal(&client->bits.localsock.cond);
1539 pthread_mutex_unlock(&client->bits.localsock.mutex);
1542 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1545 /* This is the thread that runs the PRE and post commands for a particular connection */
/*
 * Thread body for one local client connection; arg is the client's
 * struct local_client.
 *
 * The thread takes client->bits.localsock.mutex at the start and releases
 * it just before exiting.  It locks and immediately unlocks
 * lvm_start_mutex, which makes it wait until that mutex is free, then
 * blocks SIGUSR1 and unblocks SIGUSR2 for itself.
 *
 * Each pass of the loop runs do_pre_command(), writes the int status down
 * the client's pipe, waits on the client's condition variable unless the
 * state is already POST_COMMAND or the client has finished, runs
 * do_post_command(), writes to the pipe again and finally waits for the
 * state to become PRE_COMMAND.  The loop ends when localsock.finished is
 * set, after which the thread leaves through pthread_exit().
 */
1546 static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
1548 struct local_client *client = (struct local_client *) arg;
1552 int pipe_fd = client->bits.localsock.pipe;
1554 DEBUGLOG("in sub thread: client = %p\n", client);
1555 pthread_mutex_lock(&client->bits.localsock.mutex);
1557 /* Don't start until the LVM thread is ready */
1558 pthread_mutex_lock(&lvm_start_mutex);
1559 pthread_mutex_unlock(&lvm_start_mutex);
1560 DEBUGLOG("Sub thread ready for work.\n");
1562 /* Ignore SIGUSR1 (handled by master process) but enable
1563 SIGUSR2 (kills subthreads) */
1565 sigaddset(&ss, SIGUSR1);
1566 pthread_sigmask(SIG_BLOCK, &ss, NULL);
1568 sigdelset(&ss, SIGUSR1);
1569 sigaddset(&ss, SIGUSR2);
1570 pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
1572 /* Loop around doing PRE and POST functions until the client goes away */
1573 while (!client->bits.localsock.finished) {
1574 /* Execute the code */
1575 status = do_pre_command(client);
1578 client->bits.localsock.all_success = 0;
1580 DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);
1582 /* Tell the parent process we have finished this bit */
/* A full-size write ends the attempt; EINTR and EAGAIN are singled out
   from other write errors, which are logged */
1584 write_status = write(pipe_fd, &status, sizeof(int));
1585 if (write_status == sizeof(int))
1587 if (write_status < 0 &&
1588 (errno == EINTR || errno == EAGAIN))
1590 log_error("Error sending to pipe: %m\n");
1595 client->bits.localsock.state = POST_COMMAND;
1599 /* We may need to wait for the condition variable before running the post command */
1600 DEBUGLOG("Waiting to do post command - state = %d\n",
1601 client->bits.localsock.state);
/* NOTE(review): both waits in this loop sit under an `if`, not a `while`,
   so the predicate is not re-tested after pthread_cond_wait() returns;
   POSIX permits spurious wakeups -- confirm this is acceptable here */
1603 if (client->bits.localsock.state != POST_COMMAND &&
1604 !client->bits.localsock.finished) {
1605 pthread_cond_wait(&client->bits.localsock.cond,
1606 &client->bits.localsock.mutex);
1609 DEBUGLOG("Got post command condition...\n");
1611 /* POST function must always run, even if the client aborts */
1613 do_post_command(client);
1616 write_status = write(pipe_fd, &status, sizeof(int));
1617 if (write_status == sizeof(int))
1619 if (write_status < 0 &&
1620 (errno == EINTR || errno == EAGAIN))
1622 log_error("Error sending to pipe: %m\n");
1626 DEBUGLOG("Waiting for next pre command\n");
1628 if (client->bits.localsock.state != PRE_COMMAND &&
1629 !client->bits.localsock.finished) {
1630 pthread_cond_wait(&client->bits.localsock.cond,
1631 &client->bits.localsock.mutex);
1634 DEBUGLOG("Got pre command condition...\n");
1636 pthread_mutex_unlock(&client->bits.localsock.mutex);
1637 DEBUGLOG("Subthread finished\n");
1638 pthread_exit((void *) 0);
1641 /* Process a command on the local node and store the result */
/*
 * Run a command on this node through do_command() and, if the transaction
 * id still matches the client's current one, file the result as this
 * node's reply via add_reply_to_list() with our_csid as the sender.
 *
 * A reply buffer of max_cluster_message bytes is malloc()ed up front;
 * buflen, the size passed to do_command(), is that amount less
 * sizeof(struct clvm_header) and one further byte.  When xid differs from
 * client->xid the reply is treated as late and only a debug message is
 * produced.
 */
1642 static int process_local_command(struct clvm_header *msg, int msglen,
1643 struct local_client *client,
1646 char *replybuf = malloc(max_cluster_message);
1647 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1651 DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1652 decode_cmd(msg->cmd), msg, msglen, client);
1654 if (replybuf == NULL)
/* replybuf is passed by address, so do_command() is able to replace it */
1657 status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1660 client->bits.localsock.all_success = 0;
1662 /* If we took too long then discard the reply */
1663 if (xid == client->xid) {
1664 add_reply_to_list(client, status, our_csid, replybuf, replylen);
1667 ("Local command took too long, discarding xid %d, current is %d\n",
/*
 * Handle a reply message from node csid.
 *
 * The owning client is looked up with find_client(msg->clientid).  If the
 * reply's xid equals the client's current xid, the payload (msg->args) is
 * added to that client's reply list with msg->status; otherwise the reply
 * is discarded with a debug message.  A message for an unknown client is
 * reported through both DEBUGLOG and log_error.
 */
1675 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1677 struct local_client *client = NULL;
1679 client = find_client(msg->clientid);
1681 DEBUGLOG("Got message for unknown client 0x%x\n",
1683 log_error("Got message for unknown client 0x%x\n",
1689 client->bits.localsock.all_success = 0;
1691 /* Gather replies together for this client id */
1692 if (msg->xid == client->xid) {
1693 add_reply_to_list(client, msg->status, csid, msg->args,
1696 DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1697 msg->xid, client->xid);
1702 /* Send an aggregated reply back to the client */
1703 static void send_local_reply(struct local_client *client, int status, int fd)
1705 struct clvm_header *clientreply;
1706 struct node_reply *thisreply = client->bits.localsock.replies;
1709 int message_len = 0;
1711 DEBUGLOG("Send local reply\n");
1713 /* Work out the total size of the reply */
1715 if (thisreply->replymsg)
1716 message_len += strlen(thisreply->replymsg) + 1;
1720 message_len += strlen(thisreply->node) + 1 + sizeof(int);
1722 thisreply = thisreply->next;
1725 /* Add in the size of our header */
1726 message_len = message_len + sizeof(struct clvm_header) + 1;
1727 replybuf = malloc(message_len);
1729 clientreply = (struct clvm_header *) replybuf;
1730 clientreply->status = status;
1731 clientreply->cmd = CLVMD_CMD_REPLY;
1732 clientreply->node[0] = '\0';
1733 clientreply->flags = 0;
1735 ptr = clientreply->args;
1737 /* Add in all the replies, and free them as we go */
1738 thisreply = client->bits.localsock.replies;
1740 struct node_reply *tempreply = thisreply;
1742 strcpy(ptr, thisreply->node);
1743 ptr += strlen(thisreply->node) + 1;
1745 if (thisreply->status)
1746 clientreply->flags |= CLVMD_FLAG_NODEERRS;
1748 memcpy(ptr, &thisreply->status, sizeof(int));
1751 if (thisreply->replymsg) {
1752 strcpy(ptr, thisreply->replymsg);
1753 ptr += strlen(thisreply->replymsg) + 1;
1758 thisreply = thisreply->next;
1760 free(tempreply->replymsg);
1764 /* Terminate with an empty node name */
1767 clientreply->arglen = ptr - clientreply->args + 1;
1770 send_message(replybuf, message_len, our_csid, fd,
1771 "Error sending REPLY to client");
1774 /* Reset comms variables */
1775 client->bits.localsock.replies = NULL;
1776 client->bits.localsock.expected_replies = 0;
1777 client->bits.localsock.in_progress = FALSE;
1778 client->bits.localsock.sent_out = FALSE;
1781 /* Just free a reply chain baceuse it wasn't used. */
1782 static void free_reply(struct local_client *client)
1784 /* Add in all the replies, and free them as we go */
1785 struct node_reply *thisreply = client->bits.localsock.replies;
1787 struct node_reply *tempreply = thisreply;
1789 thisreply = thisreply->next;
1791 free(tempreply->replymsg);
1794 client->bits.localsock.replies = NULL;
1797 /* Send our version number to the cluster */
/*
 * Build a CLVMD_CMD_VERSION message in a stack buffer and hand it to
 * clops->cluster_send_message() with a NULL csid.
 *
 * The buffer is sized for one struct clvm_header plus three ints.  The
 * payload is the major, minor and patch version numbers, each converted
 * with htonl() and then copied into msg->args with memcpy().
 */
1798 static void send_version_message()
1800 char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1801 struct clvm_header *msg = (struct clvm_header *) message;
1802 int version_nums[3];
1804 msg->cmd = CLVMD_CMD_VERSION;
1808 msg->arglen = sizeof(version_nums);
1810 version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1811 version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1812 version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1814 memcpy(&msg->args, version_nums, sizeof(version_nums));
1818 clops->cluster_send_message(message, sizeof(message), NULL,
1819 "Error Sending version number");
1822 /* Send a message to either a local client or another server */
/*
 * Deliver msglen bytes from buf either to the cluster or to a local fd.
 *
 * buf     - message, starting with a struct clvm_header
 * msglen  - total number of bytes to send
 * csid    - destination node; NULL or a csid other than our own selects
 *           the cluster path
 * fd      - file descriptor written to on the local path
 * errtext - text passed to log_error() / the cluster layer on failure
 *
 * Cluster path: the header is converted with hton_clvm() and the message
 * is passed to clops->cluster_send_message(), whose result is returned.
 * Local path: the bytes are written to fd in a loop that continues while
 * ptr < msglen.  On the EAGAIN branch the errno is saved and a 100000 ns
 * nanosleep() is taken (apparently before retrying -- TODO confirm); once
 * retry_cnt exceeds MAX_RETRIES the error is logged with errno restored.
 *
 * NOTE(review): hton_clvm() converts the header in place, so after a
 * cluster send the caller's buffer holds network-byte-order fields.
 * NOTE(review): `buf + ptr` is arithmetic on a void pointer, which is a
 * GNU C extension rather than standard C.
 */
1823 static int send_message(void *buf, int msglen, const char *csid, int fd,
1824 const char *errtext)
1827 int saved_errno = 0;
1828 struct timespec delay;
1829 struct timespec remtime;
1833 /* Send remote messages down the cluster socket */
1834 if (csid == NULL || !ISLOCAL_CSID(csid)) {
1835 hton_clvm((struct clvm_header *) buf);
1836 return clops->cluster_send_message(buf, msglen, csid, errtext);
1840 /* Make sure it all goes */
1842 if (retry_cnt > MAX_RETRIES)
/* errno is set both before and after log_error() so that the value seen
   by the logger and the value left behind are the saved one */
1844 errno = saved_errno;
1845 log_error("%s", errtext);
1846 errno = saved_errno;
1850 len = write(fd, buf + ptr, msglen - ptr);
1855 if (errno == EAGAIN ||
1858 saved_errno = errno;
1862 delay.tv_nsec = 100000;
1864 remtime.tv_nsec = 0;
1865 (void) nanosleep (&delay, &remtime);
1869 log_error("%s", errtext);
1873 } while (ptr < msglen);
/*
 * Execute one item taken from the LVM thread's queue.
 *
 * A NULL cmd->msg marks a cleanup request and leads to
 * cmd_client_cleanup(cmd->client).  Otherwise the message is passed to
 * process_local_command() or to process_remote_command() (the latter
 * with the client's fd); presumably cmd->remote selects between the two
 * -- TODO confirm.
 */
1878 static int process_work_item(struct lvm_thread_cmd *cmd)
1880 /* If msg is NULL then this is a cleanup request */
1881 if (cmd->msg == NULL) {
1882 DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1883 cmd_client_cleanup(cmd->client);
1889 DEBUGLOG("process_work_item: local\n");
1890 process_local_command(cmd->msg, cmd->msglen, cmd->client,
1893 DEBUGLOG("process_work_item: remote\n");
1894 process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1901 * Routine that runs in the "LVM thread".
/*
 * Body of the LVM worker thread; arg points to a struct
 * lvm_startup_params.
 *
 * The thread blocks SIGUSR1 and SIGUSR2, initialises the liblvm interface
 * through init_clvm() and then releases lvm_start_mutex so that threads
 * waiting on it can proceed.  After that it waits on lvm_thread_cond when
 * lvm_cmd_head is empty and drains the list: each entry is unlinked while
 * lvm_thread_mutex is held, the mutex is dropped around
 * process_work_item(), and it is re-taken before moving on to the next
 * entry.
 */
1903 static void lvm_thread_fn(void *arg)
1905 struct dm_list *cmdl, *tmp;
1907 struct lvm_startup_params *lvm_params = arg;
1909 DEBUGLOG("LVM thread function started\n");
1911 /* Ignore SIGUSR1 & 2 */
1913 sigaddset(&ss, SIGUSR1);
1914 sigaddset(&ss, SIGUSR2);
1915 pthread_sigmask(SIG_BLOCK, &ss, NULL);
1917 /* Initialise the interface to liblvm */
1918 init_clvm(lvm_params->using_gulm, lvm_params->argv);
1920 /* Allow others to get moving */
1921 pthread_mutex_unlock(&lvm_start_mutex);
1923 /* Now wait for some actual work */
1925 DEBUGLOG("LVM thread waiting for work\n");
1927 pthread_mutex_lock(&lvm_thread_mutex);
1928 if (dm_list_empty(&lvm_cmd_head))
1929 pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
1931 dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
1932 struct lvm_thread_cmd *cmd;
1935 dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
1936 dm_list_del(&cmd->list);
/* The queue lock is released while the command runs, so other threads
   can lock lvm_thread_mutex in add_to_lvmqueue() meanwhile */
1937 pthread_mutex_unlock(&lvm_thread_mutex);
1939 process_work_item(cmd);
1943 pthread_mutex_lock(&lvm_thread_mutex);
1945 pthread_mutex_unlock(&lvm_thread_mutex);
1949 /* Pass down some work to the LVM thread */
/*
 * Queue a copy of msg for the LVM thread.
 *
 * client - client the work belongs to; its current xid is recorded
 * msg    - message to run; duplicated into a freshly malloc()ed buffer
 * msglen - length of msg in bytes
 * csid   - originating node id; max_csid_len bytes are copied into
 *          cmd->csid
 *
 * The new struct lvm_thread_cmd is added to lvm_cmd_head and
 * lvm_thread_cond is signalled, both while holding lvm_thread_mutex.
 *
 * NOTE(review): callers in this file pass csid == NULL for local work, so
 * the memcpy() from csid is presumably guarded -- confirm.
 */
1950 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1951 int msglen, const char *csid)
1953 struct lvm_thread_cmd *cmd;
1955 cmd = malloc(sizeof(struct lvm_thread_cmd));
1960 cmd->msg = malloc(msglen);
1962 log_error("Unable to allocate buffer space\n");
1966 memcpy(cmd->msg, msg, msglen);
1971 cmd->client = client;
1972 cmd->msglen = msglen;
1973 cmd->xid = client->xid;
1976 memcpy(cmd->csid, csid, max_csid_len);
1983 ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1984 cmd, client, msg, msglen, csid, cmd->xid);
1985 pthread_mutex_lock(&lvm_thread_mutex);
1986 dm_list_add(&lvm_cmd_head, &cmd->list);
1987 pthread_cond_signal(&lvm_thread_cond);
1988 pthread_mutex_unlock(&lvm_thread_mutex);
1993 /* Return 0 if we can talk to an existing clvmd */
/*
 * Probe for a running clvmd: open a PF_UNIX stream socket and connect()
 * it to CLVMD_SOCKNAME.  The socket is closed again afterwards; it is
 * only used for the probe.
 */
1994 static int check_local_clvmd(void)
1997 struct sockaddr_un sockaddr;
2000 /* Open local socket */
2001 if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
2005 memset(&sockaddr, 0, sizeof(sockaddr));
/* sizeof on the name macro also copies its terminating NUL */
2006 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
2007 sockaddr.sun_family = AF_UNIX;
2009 if (connect(local_socket,(struct sockaddr *) &sockaddr,
2010 sizeof(sockaddr))) {
2014 close(local_socket);
/*
 * Close local_socket unless it is -1, and unlink CLVMD_SOCKNAME unless
 * that name is empty.  The two steps are tested independently, so the
 * unlink is attempted even when there is no socket to close.
 */
2018 static void close_local_sock(int local_socket)
2020 if (local_socket != -1 && close(local_socket))
2023 if (CLVMD_SOCKNAME[0] != '\0' && unlink(CLVMD_SOCKNAME))
2027 /* Open the local socket, that's the one we talk to libclvm down */
/*
 * Create, bind and listen on the PF_UNIX stream socket at CLVMD_SOCKNAME.
 *
 * close_local_sock() is called first with local_socket still -1, so it
 * skips the close() and only unlinks the socket path.  The socket is then
 * created with the SELinux context prepared for CLVMD_SOCKNAME and under
 * umask 0077, marked close-on-exec and non-blocking, bound, and put into
 * listen state with a backlog of 1.
 *
 * Returns the listening fd on success.  The failure path calls
 * close_local_sock() on the socket and resets the SELinux context.
 */
2028 static int open_local_sock()
2030 int local_socket = -1;
2031 struct sockaddr_un sockaddr;
2034 close_local_sock(local_socket);
2036 (void) dm_prepare_selinux_context(CLVMD_SOCKNAME, S_IFSOCK);
2037 old_mask = umask(0077);
2039 /* Open local socket */
2040 local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
2041 if (local_socket < 0) {
2042 log_error("Can't create local socket: %m");
2046 /* Set Close-on-exec & non-blocking */
/* The literal 1 passed to F_SETFD stands in for FD_CLOEXEC */
2047 if (fcntl(local_socket, F_SETFD, 1))
2048 DEBUGLOG("setting CLOEXEC on local_socket failed: %s\n", strerror(errno));
/* NOTE(review): neither fcntl() result on the next line is checked; a
   failed F_GETFL would feed -1 into the flag word passed to F_SETFL */
2049 fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
2051 memset(&sockaddr, 0, sizeof(sockaddr));
2052 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
2053 sockaddr.sun_family = AF_UNIX;
2055 if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
2056 log_error("can't bind local socket: %m");
2059 if (listen(local_socket, 1) != 0) {
2060 log_error("listen local: %m");
2065 (void) dm_prepare_selinux_context(NULL, 0);
2066 return local_socket;
2068 close_local_sock(local_socket);
2070 (void) dm_prepare_selinux_context(NULL, 0);
2074 void process_message(struct local_client *client, const char *buf, int len,
2077 struct clvm_header *inheader;
2079 inheader = (struct clvm_header *) buf;
2080 ntoh_clvm(inheader); /* Byteswap fields */
2081 if (inheader->cmd == CLVMD_CMD_REPLY)
2082 process_reply(inheader, len, csid);
2084 add_to_lvmqueue(client, inheader, len, csid);
/*
 * Per-node callback handed to clops->cluster_do_node_callback() by
 * check_all_clvmds_running(): adds an EHOSTDOWN reply carrying the text
 * "CLVMD not running" to the client's reply list for node csid.
 */
2088 static void check_all_callback(struct local_client *client, const char *csid,
2092 add_reply_to_list(client, EHOSTDOWN, csid, "CLVMD not running",
2096 /* Check to see if all CLVMDs are running (ie one on
2097 every node in the cluster).
2098 If not, returns -1 and prints out a list of errant nodes */
2099 static int check_all_clvmds_running(struct local_client *client)
2101 DEBUGLOG("check_all_clvmds_running\n");
2102 return clops->cluster_do_node_callback(client, check_all_callback);
2105 /* Return a local_client struct given a client ID.
2106 client IDs are in network byte order */
2107 static struct local_client *find_client(int clientid)
2109 struct local_client *thisfd;
2110 for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2111 if (thisfd->fd == ntohl(clientid))
2117 /* Byte-swapping routines for the header so we
2118 work in a heterogeneous environment */
2119 static void hton_clvm(struct clvm_header *hdr)
2121 hdr->status = htonl(hdr->status);
2122 hdr->arglen = htonl(hdr->arglen);
2123 hdr->xid = htons(hdr->xid);
2124 /* Don't swap clientid as it's only a token as far as
2125 remote nodes are concerned */
2128 static void ntoh_clvm(struct clvm_header *hdr)
2130 hdr->status = ntohl(hdr->status);
2131 hdr->arglen = ntohl(hdr->arglen);
2132 hdr->xid = ntohs(hdr->xid);
2135 /* Handler for SIGUSR2 - sent to kill subthreads */
/* SIGUSR2 handler: emits a debug message.
   NOTE(review): DEBUGLOG is invoked from signal context -- confirm that
   it only uses async-signal-safe calls. */
2136 static void sigusr2_handler(int sig)
2138 DEBUGLOG("SIGUSR2 received\n");
/* SIGTERM handler: emits a debug message; presumably it also raises the
   file-level `quit` flag -- TODO confirm.
   NOTE(review): DEBUGLOG is invoked from signal context -- confirm that
   it only uses async-signal-safe calls. */
2142 static void sigterm_handler(int sig)
2144 DEBUGLOG("SIGTERM received\n");
/* SIGHUP handler: emits a debug message; presumably it also raises the
   file-level `reread_config` flag -- TODO confirm.
   NOTE(review): DEBUGLOG is invoked from signal context -- confirm that
   it only uses async-signal-safe calls. */
2149 static void sighup_handler(int sig)
2151 DEBUGLOG("got SIGHUP\n");
2155 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2157 return clops->sync_lock(resource, mode, flags, lockid);
2160 int sync_unlock(const char *resource, int lockid)
2162 return clops->sync_unlock(resource, lockid);
/*
 * Translate a cluster interface name into an if_type_t.
 *
 * ifname is compared with strcmp() against "auto", "cman", "gulm",
 * "openais", "corosync" and "singlenode" in turn; the tests are
 * independent `if`s, not an else-if chain.  iface starts out as IF_AUTO,
 * so that is presumably the result for an unrecognised name -- TODO
 * confirm.  ifname must not be NULL, since it is passed straight to
 * strcmp().
 */
2165 static if_type_t parse_cluster_interface(char *ifname)
2167 if_type_t iface = IF_AUTO;
2169 if (!strcmp(ifname, "auto"))
2171 if (!strcmp(ifname, "cman"))
2173 if (!strcmp(ifname, "gulm"))
2175 if (!strcmp(ifname, "openais"))
2177 if (!strcmp(ifname, "corosync"))
2178 iface = IF_COROSYNC;
2179 if (!strcmp(ifname, "singlenode"))
2180 iface = IF_SINGLENODE;
2186 * Try and find a cluster system in corosync's objdb, if it is running. This is
2187 * only called if the command-line option is not present, and if it fails
2188 * we still try the interfaces in order.
2190 static if_type_t get_cluster_type()
2192 #ifdef HAVE_COROSYNC_CONFDB_H
2193 confdb_handle_t handle;
2194 if_type_t type = IF_AUTO;
2197 size_t namelen = sizeof(buf);
2198 hdb_handle_t cluster_handle;
2199 hdb_handle_t clvmd_handle;
2200 confdb_callbacks_t callbacks = {
2201 .confdb_key_change_notify_fn = NULL,
2202 .confdb_object_create_change_notify_fn = NULL,
2203 .confdb_object_delete_change_notify_fn = NULL
2206 result = confdb_initialize (&handle, &callbacks);
2207 if (result != CS_OK)
2210 result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2211 if (result != CS_OK)
2214 result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2215 if (result != CS_OK)
2218 result = confdb_object_find_start(handle, cluster_handle);
2219 if (result != CS_OK)
2222 result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2223 if (result != CS_OK)
2226 result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2227 if (result != CS_OK)
2230 buf[namelen] = '\0';
2231 type = parse_cluster_interface(buf);
2232 DEBUGLOG("got interface type '%s' from confdb\n", buf);
2234 confdb_finalize(handle);