Tizen 2.1 base
[external/device-mapper.git] / daemons / clvmd / clvmd.c
1 /*
2  * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
3  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
4  *
5  * This file is part of LVM2.
6  *
7  * This copyrighted material is made available to anyone wishing to use,
8  * modify, copy, or redistribute it subject to the terms and conditions
9  * of the GNU General Public License v.2.
10  *
11  * You should have received a copy of the GNU General Public License
12  * along with this program; if not, write to the Free Software Foundation,
13  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
14  */
15
16 /*
17  * CLVMD: Cluster LVM daemon
18  */
19
20 #include "clvmd-common.h"
21
22 #include <pthread.h>
23
24 #include "clvmd-comms.h"
25 #include "clvm.h"
26 #include "clvmd.h"
27 #include "lvm-functions.h"
28 #include "lvm-version.h"
29 #include "refresh_clvmd.h"
30
31 #ifdef HAVE_COROSYNC_CONFDB_H
32 #include <corosync/confdb.h>
33 #endif
34
35 #include <fcntl.h>
36 #include <netinet/in.h>
37 #include <signal.h>
38 #include <stddef.h>
39 #include <syslog.h>
40 #include <sys/un.h>
41 #include <sys/utsname.h>
42
/* Fallback boolean constants in case no included header already defines them. */
43 #ifndef TRUE
44 #define TRUE 1
45 #endif
46 #ifndef FALSE
47 #define FALSE 0
48 #endif
49
/* NOTE(review): not referenced in the visible part of this file - confirm it is still used. */
50 #define MAX_RETRIES 4
51
/*
 * Non-zero when cluster id `c` is this node's own id (our_csid).  Only the
 * first max_csid_len bytes are compared; that length is set in main() from
 * whichever cluster manager was initialised.
 */
52 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
53
54 /* Head of the fd list. Also contains
55    the cluster_socket details */
/* Its fd is the cluster manager's main fd; main_loop() refreshes it on every pass. */
56 static struct local_client local_client_head;
57
58 static unsigned short global_xid = 0;   /* Last transaction ID issued */
59
/* Operations of the cluster manager chosen in main(); NULL until one initialises. */
60 struct cluster_ops *clops = NULL;
61
/* This node's cluster id, filled in via clops->get_our_csid() during startup. */
62 static char our_csid[MAX_CSID_LEN];
/* Limits of the active cluster manager, assigned in main() when it is selected. */
63 static unsigned max_csid_len;
64 static unsigned max_cluster_message;
65 static unsigned max_cluster_member_name_len;
66
67 /* Structure of items on the LVM thread list */
68 struct lvm_thread_cmd {
69         struct dm_list list;    /* linkage on lvm_cmd_head (presumably) */
70
71         struct local_client *client;    /* client the command belongs to */
72         struct clvm_header *msg;        /* command message; NULL for a cleanup-only entry (see main_loop) */
73         char csid[MAX_CSID_LEN];        /* node id the command came from */
74         int remote;             /* Flag: presumably non-zero for commands from another node - confirm */
75         int msglen;                     /* length of msg in bytes */
76         unsigned short xid;             /* transaction id */
77 };
78
/* Start-up arguments handed to lvm_thread_fn() by main() via pthread_create(). */
79 struct lvm_startup_params {
80         int using_gulm;         /* non-zero when the GULM cluster interface was chosen */
81         char **argv;            /* main()'s argv */
82 };
83
84 debug_t debug;          /* debug output mode (DEBUG_STDERR / DEBUG_SYSLOG), set by -d */
85 static pthread_t lvm_thread;
/* Protect / signal the LVM thread's command queue (lvm_cmd_head). */
86 static pthread_mutex_t lvm_thread_mutex;
87 static pthread_cond_t lvm_thread_cond;
/* Locked by main() before starting the LVM thread so nobody else does work until it is up. */
88 static pthread_mutex_t lvm_start_mutex;
89 static struct dm_list lvm_cmd_head;
/* main_loop() runs while this is zero; presumably set by sigterm_handler(). */
90 static volatile sig_atomic_t quit = 0;
/* Checked after each select() in main_loop(); when set, clops->reread_config() is called. */
91 static volatile sig_atomic_t reread_config = 0;
/* Write end [1] is used by child_init_signal() to report start-up status to the parent. */
92 static int child_pipe[2];
93
94 /* Reasons the daemon failed initialisation */
/* Sent through child_pipe and used as the exit status by child_init_signal_and_exit(). */
95 #define DFAIL_INIT       1
96 #define DFAIL_LOCAL_SOCK 2
97 #define DFAIL_CLUSTER_IF 3
98 #define DFAIL_MALLOC     4
99 #define DFAIL_TIMEOUT    5
100 #define SUCCESS          0
101
/* Cluster manager requested with -I; IF_AUTO means detect via get_cluster_type(). */
102 typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC, IF_SINGLENODE} if_type_t;
103
/* Thread-start function type; main() casts lvm_thread_fn to it for pthread_create(). */
104 typedef void *(lvm_pthread_fn_t)(void*);
105
106 /* Prototypes for code further down */
/* Signal handlers installed by main(). */
107 static void sigusr2_handler(int sig);
108 static void sighup_handler(int sig);
109 static void sigterm_handler(int sig);
110 static void send_local_reply(struct local_client *client, int status,
111                              int clientid);
112 static void free_reply(struct local_client *client);
113 static void send_version_message(void);
114 static void *pre_and_post_thread(void *arg);
115 static int send_message(void *buf, int msglen, const char *csid, int fd,
116                         const char *errtext);
117 static int read_from_local_sock(struct local_client *thisfd);
118 static int process_local_command(struct clvm_header *msg, int msglen,
119                                  struct local_client *client,
120                                  unsigned short xid);
121 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
122                                    const char *csid);
123 static int process_reply(const struct clvm_header *msg, int msglen,
124                          const char *csid);
125 static int open_local_sock(void);
126 static void close_local_sock(int local_socket);
127 static int check_local_clvmd(void);
128 static struct local_client *find_client(int clientid);
129 static void main_loop(int local_sock, int cmd_timeout);
130 static void be_daemon(int start_timeout);
131 static int check_all_clvmds_running(struct local_client *client);
132 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
133                                      int len, const char *csid,
134                                      struct local_client **new_client);
/* Returns void (and never returns); main() casts it to lvm_pthread_fn_t for pthread_create(). */
135 static void lvm_thread_fn(void *) __attribute__ ((noreturn));
136 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
137                            int msglen, const char *csid);
138 static int distribute_command(struct local_client *thisfd);
139 static void hton_clvm(struct clvm_header *hdr);
140 static void ntoh_clvm(struct clvm_header *hdr);
141 static void add_reply_to_list(struct local_client *client, int status,
142                               const char *csid, const char *buf, int len);
143 static if_type_t parse_cluster_interface(char *ifname);
144 static if_type_t get_cluster_type(void);
145
/*
 * Print command-line help for clvmd to @file.
 *
 * @prog is the program name shown in the synopsis line.  The list of
 * available cluster managers depends on which USE_* options this binary was
 * built with.  The synopsis lists every option the daemon handles (the old
 * "[Vhd]" synopsis omitted most of them).
 */
static void usage(const char *prog, FILE *file)
{
	fprintf(file, "Usage:\n"
		"%s [-C] [-d[<level>]] [-h] [-I <cmgr>] [-R] [-S] [-t <secs>] [-T <secs>] [-V]\n\n"
		"   -V       Show version of clvmd\n"
		"   -h       Show this help information\n"
		"   -d       Set debug level\n"
		"            If starting clvmd then don't fork, run in the foreground\n"
		"   -R       Tell all running clvmds in the cluster to reload their device cache\n"
		"   -S       Restart clvmd, preserving exclusive locks\n"
		"   -C       Sets debug level (from -d) on all clvmd instances clusterwide\n"
		"   -t<secs> Command timeout (default 60 seconds)\n"
		"   -T<secs> Startup timeout (default none)\n"
		"   -I<cmgr> Cluster manager (default: auto)\n"
		"            Available cluster managers: "
#ifdef USE_COROSYNC
		"corosync "
#endif
#ifdef USE_CMAN
		"cman "
#endif
#ifdef USE_OPENAIS
		"openais "
#endif
#ifdef USE_GULM
		"gulm "
#endif
#ifdef USE_SINGLENODE
		"singlenode "
#endif
		"\n", prog);
}
178
179 /* Called to signal the parent how well we got on during initialisation */
180 static void child_init_signal(int status)
181 {
182         if (child_pipe[1]) {
183                 /* FIXME Use a proper wrapper around write */
184                 if (write(child_pipe[1], &status, sizeof(status)) < 0)
185                         log_sys_error("write", "child_pipe");
186                 if (close(child_pipe[1]))
187                         log_sys_error("close", "child_pipe");
188         }
189 }
190
/*
 * Report @status to the waiting parent via child_init_signal(), then
 * terminate this process using the same value as the exit status.
 */
static void __attribute__((noreturn)) child_init_signal_and_exit(int status)
{
	child_init_signal(status);	/* parent learns why we stopped */
	exit(status);
}
196
/*
 * Close the descriptor stored in *fd, if any (negative means "none"),
 * and mark it as closed by storing -1.  *fd is reset before close() is
 * called; close() errors are ignored.
 */
static void safe_close(int *fd)
{
	int victim = *fd;

	if (victim < 0)
		return;

	*fd = -1;
	close(victim);
}
205
206 void debuglog(const char *fmt, ...)
207 {
208         time_t P;
209         va_list ap;
210         static int syslog_init = 0;
211
212         if (debug == DEBUG_STDERR) {
213                 va_start(ap,fmt);
214                 time(&P);
215                 fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
216                 vfprintf(stderr, fmt, ap);
217                 va_end(ap);
218         }
219         if (debug == DEBUG_SYSLOG) {
220                 if (!syslog_init) {
221                         openlog("clvmd", LOG_PID, LOG_DAEMON);
222                         syslog_init = 1;
223                 }
224
225                 va_start(ap,fmt);
226                 vsyslog(LOG_DEBUG, fmt, ap);
227                 va_end(ap);
228         }
229 }
230
231 static const char *decode_cmd(unsigned char cmdl)
232 {
233         static char buf[128];
234         const char *command;
235
236         switch (cmdl) {
237         case CLVMD_CMD_TEST:
238                 command = "TEST";
239                 break;
240         case CLVMD_CMD_LOCK_VG:
241                 command = "LOCK_VG";
242                 break;
243         case CLVMD_CMD_LOCK_LV:
244                 command = "LOCK_LV";
245                 break;
246         case CLVMD_CMD_REFRESH:
247                 command = "REFRESH";
248                 break;
249         case CLVMD_CMD_SET_DEBUG:
250                 command = "SET_DEBUG";
251                 break;
252         case CLVMD_CMD_GET_CLUSTERNAME:
253                 command = "GET_CLUSTERNAME";
254                 break;
255         case CLVMD_CMD_VG_BACKUP:
256                 command = "VG_BACKUP";
257                 break;
258         case CLVMD_CMD_REPLY:
259                 command = "REPLY";
260                 break;
261         case CLVMD_CMD_VERSION:
262                 command = "VERSION";
263                 break;
264         case CLVMD_CMD_GOAWAY:
265                 command = "GOAWAY";
266                 break;
267         case CLVMD_CMD_LOCK:
268                 command = "LOCK";
269                 break;
270         case CLVMD_CMD_UNLOCK:
271                 command = "UNLOCK";
272                 break;
273         case CLVMD_CMD_LOCK_QUERY:
274                 command = "LOCK_QUERY";
275                 break;
276         case CLVMD_CMD_RESTART:
277                 command = "RESTART";
278                 break;
279         default:
280                 command = "unknown";
281                 break;
282         }
283
284         sprintf(buf, "%s (0x%x)", command, cmdl);
285
286         return buf;
287 }
288
289 static void remove_lockfile(void)
290 {
291         if (unlink(CLVMD_PIDFILE))
292                 log_sys_error("unlink", CLVMD_PIDFILE);
293 }
294
/*
 * clvmd requires dm-ioctl capability, so it must run as root.
 *
 * Returns only when both the real and the effective uid are 0.  Otherwise
 * it logs an error and exits with status 4.  This fails cleanly up front
 * instead of failing later on the first root-only operation.  Status 4 is
 * the exit code init scripts prefer for this case (LSB: "user had
 * insufficient privilege").
 */
static void check_permissions(void)
{
	if (!getuid() && !geteuid())
		return;

	log_error("Cannot run as a non-root user.");
	exit(4);
}
311
312 int main(int argc, char *argv[])
313 {
314         int local_sock;
315         struct local_client *newfd;
316         struct utsname nodeinfo;
317         struct lvm_startup_params lvm_params;
318         signed char opt;
319         int cmd_timeout = DEFAULT_CMD_TIMEOUT;
320         int start_timeout = 0;
321         if_type_t cluster_iface = IF_AUTO;
322         sigset_t ss;
323         int using_gulm = 0;
324         int debug_opt = 0;
325         int clusterwide_opt = 0;
326         mode_t old_mask;
327
328         /* Deal with command-line arguments */
329         opterr = 0;
330         optind = 0;
331         while ((opt = getopt(argc, argv, "?vVhd::t:RST:CI:E:")) != EOF) {
332                 switch (opt) {
333                 case 'h':
334                         usage(argv[0], stdout);
335                         exit(0);
336
337                 case '?':
338                         usage(argv[0], stderr);
339                         exit(0);
340
341                 case 'R':
342                         check_permissions();
343                         return refresh_clvmd(1)==1?0:1;
344
345                 case 'S':
346                         check_permissions();
347                         return restart_clvmd(clusterwide_opt)==1?0:1;
348
349                 case 'C':
350                         clusterwide_opt = 1;
351                         break;
352
353                 case 'd':
354                         debug_opt = 1;
355                         if (optarg)
356                                 debug = atoi(optarg);
357                         else
358                                 debug = DEBUG_STDERR;
359                         break;
360
361                 case 't':
362                         cmd_timeout = atoi(optarg);
363                         if (!cmd_timeout) {
364                                 fprintf(stderr, "command timeout is invalid\n");
365                                 usage(argv[0], stderr);
366                                 exit(1);
367                         }
368                         break;
369                 case 'I':
370                         cluster_iface = parse_cluster_interface(optarg);
371                         break;
372                 case 'T':
373                         start_timeout = atoi(optarg);
374                         if (start_timeout <= 0) {
375                                 fprintf(stderr, "startup timeout is invalid\n");
376                                 usage(argv[0], stderr);
377                                 exit(1);
378                         }
379                         break;
380
381                 case 'V':
382                         printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
383                         printf("Protocol version:           %d.%d.%d\n",
384                                CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
385                                CLVMD_PATCH_VERSION);
386                         exit(0);
387                         break;
388
389                 }
390         }
391
392         check_permissions();
393
394         /* Setting debug options on an existing clvmd */
395         if (debug_opt && !check_local_clvmd()) {
396
397                 /* Sending to stderr makes no sense for a detached daemon */
398                 if (debug == DEBUG_STDERR)
399                         debug = DEBUG_SYSLOG;
400                 return debug_clvmd(debug, clusterwide_opt)==1?0:1;
401         }
402
403         /*
404          * Switch to C locale to avoid reading large locale-archive file
405          * used by some glibc (on some distributions it takes over 100MB).
406          * Daemon currently needs to use mlockall().
407          */
408         if (setenv("LANG", "C", 1))
409                 perror("Cannot set LANG to C");
410
411         /* Fork into the background (unless requested not to) */
412         if (debug != DEBUG_STDERR) {
413                 be_daemon(start_timeout);
414         }
415
416         dm_prepare_selinux_context(DEFAULT_RUN_DIR, S_IFDIR);
417         old_mask = umask(0077);
418         if (dm_create_dir(DEFAULT_RUN_DIR) == 0) {
419                 DEBUGLOG("clvmd: unable to create %s directory\n",
420                           DEFAULT_RUN_DIR);
421                 umask(old_mask);
422                 exit(1);
423         }
424         umask(old_mask);
425
426         /* Create pidfile */
427         (void) dm_prepare_selinux_context(CLVMD_PIDFILE, S_IFREG);
428         if (dm_create_lockfile(CLVMD_PIDFILE) == 0) {
429                 DEBUGLOG("clvmd: unable to create lockfile\n");
430                 exit(1);
431         }
432         (void) dm_prepare_selinux_context(NULL, 0);
433
434         atexit(remove_lockfile);
435
436         DEBUGLOG("CLVMD started\n");
437
438         /* Open the Unix socket we listen for commands on.
439            We do this before opening the cluster socket so that
440            potential clients will block rather than error if we are running
441            but the cluster is not ready yet */
442         local_sock = open_local_sock();
443         if (local_sock < 0) {
444                 child_init_signal_and_exit(DFAIL_LOCAL_SOCK);
445                 /* NOTREACHED */
446         }
447
448         /* Set up signal handlers, USR1 is for cluster change notifications (in cman)
449            USR2 causes child threads to exit.
450            HUP causes gulm version to re-read nodes list from CCS.
451            PIPE should be ignored */
452         signal(SIGUSR2, sigusr2_handler);
453         signal(SIGHUP,  sighup_handler);
454         signal(SIGPIPE, SIG_IGN);
455
456         /* Block SIGUSR2/SIGINT/SIGTERM in process */
457         sigemptyset(&ss);
458         sigaddset(&ss, SIGUSR2);
459         sigaddset(&ss, SIGINT);
460         sigaddset(&ss, SIGTERM);
461         sigprocmask(SIG_BLOCK, &ss, NULL);
462
463         /* Initialise the LVM thread variables */
464         dm_list_init(&lvm_cmd_head);
465         pthread_mutex_init(&lvm_thread_mutex, NULL);
466         pthread_cond_init(&lvm_thread_cond, NULL);
467         pthread_mutex_init(&lvm_start_mutex, NULL);
468         init_lvhash();
469
470         /* Start the cluster interface */
471         if (cluster_iface == IF_AUTO)
472                 cluster_iface = get_cluster_type();
473
474 #ifdef USE_CMAN
475         if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
476                 max_csid_len = CMAN_MAX_CSID_LEN;
477                 max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
478                 max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
479                 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
480         }
481 #endif
482 #ifdef USE_GULM
483         if (!clops)
484                 if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
485                         max_csid_len = GULM_MAX_CSID_LEN;
486                         max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
487                         max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
488                         using_gulm = 1;
489                         syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
490                 }
491 #endif
492 #ifdef USE_COROSYNC
493         if (!clops)
494                 if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
495                         max_csid_len = COROSYNC_CSID_LEN;
496                         max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
497                         max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
498                         syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
499                 }
500 #endif
501 #ifdef USE_OPENAIS
502         if (!clops)
503                 if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
504                         max_csid_len = OPENAIS_CSID_LEN;
505                         max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
506                         max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
507                         syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
508                 }
509 #endif
510 #ifdef USE_SINGLENODE
511         if (!clops)
512                 if (cluster_iface == IF_SINGLENODE && (clops = init_singlenode_cluster())) {
513                         max_csid_len = SINGLENODE_CSID_LEN;
514                         max_cluster_message = SINGLENODE_MAX_CLUSTER_MESSAGE;
515                         max_cluster_member_name_len = MAX_CLUSTER_MEMBER_NAME_LEN;
516                         syslog(LOG_NOTICE, "Cluster LVM daemon started - running in single-node mode");
517                 }
518 #endif
519
520         if (!clops) {
521                 DEBUGLOG("Can't initialise cluster interface\n");
522                 log_error("Can't initialise cluster interface\n");
523                 child_init_signal_and_exit(DFAIL_CLUSTER_IF);
524                 /* NOTREACHED */
525         }
526         DEBUGLOG("Cluster ready, doing some more initialisation\n");
527
528         /* Save our CSID */
529         uname(&nodeinfo);
530         clops->get_our_csid(our_csid);
531
532         /* Initialise the FD list head */
533         local_client_head.fd = clops->get_main_cluster_fd();
534         local_client_head.type = CLUSTER_MAIN_SOCK;
535         local_client_head.callback = clops->cluster_fd_callback;
536
537         /* Add the local socket to the list */
538         newfd = malloc(sizeof(struct local_client));
539         if (!newfd) {
540                 child_init_signal_and_exit(DFAIL_MALLOC);
541                 /* NOTREACHED */
542         }
543
544         newfd->fd = local_sock;
545         newfd->removeme = 0;
546         newfd->type = LOCAL_RENDEZVOUS;
547         newfd->callback = local_rendezvous_callback;
548         newfd->next = local_client_head.next;
549         local_client_head.next = newfd;
550
551         /* This needs to be started after cluster initialisation
552            as it may need to take out locks */
553         DEBUGLOG("starting LVM thread\n");
554
555         /* Don't let anyone else to do work until we are started */
556         pthread_mutex_lock(&lvm_start_mutex);
557         lvm_params.using_gulm = using_gulm;
558         lvm_params.argv = argv;
559         pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
560                         (void *)&lvm_params);
561
562         /* Tell the rest of the cluster our version number */
563         /* CMAN can do this immediately, gulm needs to wait until
564            the core initialisation has finished and the node list
565            has been gathered */
566         if (clops->cluster_init_completed)
567                 clops->cluster_init_completed();
568
569         DEBUGLOG("clvmd ready for work\n");
570         child_init_signal(SUCCESS);
571
572         /* Try to shutdown neatly */
573         signal(SIGTERM, sigterm_handler);
574         signal(SIGINT, sigterm_handler);
575
576         /* Do some work */
577         main_loop(local_sock, cmd_timeout);
578
579         close_local_sock(local_sock);
580         destroy_lvm();
581
582         return 0;
583 }
584
/*
 * Called by a cluster layer once its initialisation has finished (the
 * original notes this is for GuLM, which must gather its node list first).
 * Broadcasts this node's clvmd version to the cluster.
 *
 * Defined with (void) so the definition is a real prototype; a declaration
 * written as () elsewhere is still compatible.
 */
void clvmd_cluster_init_completed(void)
{
	send_version_message();
}
591
592 /* Data on a connected socket */
593 static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
594                                const char *csid,
595                                struct local_client **new_client)
596 {
597         *new_client = NULL;
598         return read_from_local_sock(thisfd);
599 }
600
601 /* Data on a connected socket */
602 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
603                                      int len, const char *csid,
604                                      struct local_client **new_client)
605 {
606         /* Someone connected to our local socket, accept it. */
607
608         struct sockaddr_un socka;
609         struct local_client *newfd;
610         socklen_t sl = sizeof(socka);
611         int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
612
613         if (client_fd == -1 && errno == EINTR)
614                 return 1;
615
616         if (client_fd >= 0) {
617                 newfd = malloc(sizeof(struct local_client));
618                 if (!newfd) {
619                         close(client_fd);
620                         return 1;
621                 }
622
623                 if (fcntl(client_fd, F_SETFD, 1))
624                         DEBUGLOG("setting CLOEXEC on client fd failed: %s\n", strerror(errno));
625
626                 newfd->fd = client_fd;
627                 newfd->type = LOCAL_SOCK;
628                 newfd->xid = 0;
629                 newfd->removeme = 0;
630                 newfd->callback = local_sock_callback;
631                 newfd->bits.localsock.replies = NULL;
632                 newfd->bits.localsock.expected_replies = 0;
633                 newfd->bits.localsock.cmd = NULL;
634                 newfd->bits.localsock.in_progress = FALSE;
635                 newfd->bits.localsock.sent_out = FALSE;
636                 newfd->bits.localsock.threadid = 0;
637                 newfd->bits.localsock.finished = 0;
638                 newfd->bits.localsock.pipe_client = NULL;
639                 newfd->bits.localsock.private = NULL;
640                 newfd->bits.localsock.all_success = 1;
641                 DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
642                 *new_client = newfd;
643         }
644         return 1;
645 }
646
647 static int local_pipe_callback(struct local_client *thisfd, char *buf,
648                                int maxlen, const char *csid,
649                                struct local_client **new_client)
650 {
651         int len;
652         char buffer[PIPE_BUF];
653         struct local_client *sock_client = thisfd->bits.pipe.client;
654         int status = -1;        /* in error by default */
655
656         len = read(thisfd->fd, buffer, sizeof(int));
657         if (len == -1 && errno == EINTR)
658                 return 1;
659
660         if (len == sizeof(int)) {
661                 memcpy(&status, buffer, sizeof(int));
662         }
663
664         DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
665                  thisfd->fd, len, status);
666
667         /* EOF on pipe or an error, close it */
668         if (len <= 0) {
669                 int jstat;
670                 void *ret = &status;
671                 close(thisfd->fd);
672
673                 /* Clear out the cross-link */
674                 if (thisfd->bits.pipe.client != NULL)
675                         thisfd->bits.pipe.client->bits.localsock.pipe_client =
676                             NULL;
677
678                 /* Reap child thread */
679                 if (thisfd->bits.pipe.threadid) {
680                         jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
681                         thisfd->bits.pipe.threadid = 0;
682                         if (thisfd->bits.pipe.client != NULL)
683                                 thisfd->bits.pipe.client->bits.localsock.
684                                     threadid = 0;
685                 }
686                 return -1;
687         } else {
688                 DEBUGLOG("background routine status was %d, sock_client=%p\n",
689                          status, sock_client);
690                 /* But has the client gone away ?? */
691                 if (sock_client == NULL) {
692                         DEBUGLOG
693                             ("Got PIPE response for dead client, ignoring it\n");
694                 } else {
695                         /* If error then just return that code */
696                         if (status)
697                                 send_local_reply(sock_client, status,
698                                                  sock_client->fd);
699                         else {
700                                 if (sock_client->bits.localsock.state ==
701                                     POST_COMMAND) {
702                                         send_local_reply(sock_client, 0,
703                                                          sock_client->fd);
704                                 } else  // PRE_COMMAND finished.
705                                 {
706                                         if (
707                                             (status =
708                                              distribute_command(sock_client)) !=
709                                             0) send_local_reply(sock_client,
710                                                                 EFBIG,
711                                                                 sock_client->
712                                                                 fd);
713                                 }
714                         }
715                 }
716         }
717         return len;
718 }
719
720 /* If a noed is up, look for it in the reply array, if it's not there then
721    add one with "ETIMEDOUT".
722    NOTE: This won't race with real replies because they happen in the same thread.
723 */
724 static void timedout_callback(struct local_client *client, const char *csid,
725                               int node_up)
726 {
727         if (node_up) {
728                 struct node_reply *reply;
729                 char nodename[max_cluster_member_name_len];
730
731                 clops->name_from_csid(csid, nodename);
732                 DEBUGLOG("Checking for a reply from %s\n", nodename);
733                 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
734
735                 reply = client->bits.localsock.replies;
736                 while (reply && strcmp(reply->node, nodename) != 0) {
737                         reply = reply->next;
738                 }
739
740                 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
741
742                 if (!reply) {
743                         DEBUGLOG("Node %s timed-out\n", nodename);
744                         add_reply_to_list(client, ETIMEDOUT, csid,
745                                           "Command timed out", 18);
746                 }
747         }
748 }
749
750 /* Called when the request has timed out on at least one node. We fill in
751    the remaining node entries with ETIMEDOUT and return.
752
753    By the time we get here the node that caused
754    the timeout could have gone down, in which case we will never get the expected
755    number of replies that triggers the post command so we need to do it here
756 */
757 static void request_timed_out(struct local_client *client)
758 {
759         DEBUGLOG("Request timed-out. padding\n");
760         clops->cluster_do_node_callback(client, timedout_callback);
761
762         if (client->bits.localsock.num_replies !=
763             client->bits.localsock.expected_replies) {
764                 /* Post-process the command */
765                 if (client->bits.localsock.threadid) {
766                         pthread_mutex_lock(&client->bits.localsock.mutex);
767                         client->bits.localsock.state = POST_COMMAND;
768                         pthread_cond_signal(&client->bits.localsock.cond);
769                         pthread_mutex_unlock(&client->bits.localsock.mutex);
770                 }
771         }
772 }
773
774 /* This is where the real work happens */
775 static void main_loop(int local_sock, int cmd_timeout)
776 {
777         DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
778
779         sigset_t ss;
780         sigemptyset(&ss);
781         sigaddset(&ss, SIGINT);
782         sigaddset(&ss, SIGTERM);
783         pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
784         /* Main loop */
785         while (!quit) {
786                 fd_set in;
787                 int select_status;
788                 struct local_client *thisfd;
789                 struct timeval tv = { cmd_timeout, 0 };
790                 int quorate = clops->is_quorate();
791
792                 /* Wait on the cluster FD and all local sockets/pipes */
793                 local_client_head.fd = clops->get_main_cluster_fd();
794                 FD_ZERO(&in);
795                 for (thisfd = &local_client_head; thisfd != NULL;
796                      thisfd = thisfd->next) {
797
798                         if (thisfd->removeme)
799                                 continue;
800
801                         /* if the cluster is not quorate then don't listen for new requests */
802                         if ((thisfd->type != LOCAL_RENDEZVOUS &&
803                              thisfd->type != LOCAL_SOCK) || quorate)
804                                 FD_SET(thisfd->fd, &in);
805                 }
806
807                 select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
808
809                 if (reread_config) {
810                         int saved_errno = errno;
811
812                         reread_config = 0;
813                         if (clops->reread_config)
814                                 clops->reread_config();
815                         errno = saved_errno;
816                 }
817
818                 if (select_status > 0) {
819                         struct local_client *lastfd = NULL;
820                         char csid[MAX_CSID_LEN];
821                         char buf[max_cluster_message];
822
823                         for (thisfd = &local_client_head; thisfd != NULL;
824                              thisfd = thisfd->next) {
825
826                                 if (thisfd->removeme) {
827                                         struct local_client *free_fd;
828                                         lastfd->next = thisfd->next;
829                                         free_fd = thisfd;
830                                         thisfd = lastfd;
831
832                                         DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
833
834                                         /* Queue cleanup, this also frees the client struct */
835                                         add_to_lvmqueue(free_fd, NULL, 0, NULL);
836                                         break;
837                                 }
838
839                                 if (FD_ISSET(thisfd->fd, &in)) {
840                                         struct local_client *newfd = NULL;
841                                         int ret;
842
843                                         /* Do callback */
844                                         ret =
845                                             thisfd->callback(thisfd, buf,
846                                                              sizeof(buf), csid,
847                                                              &newfd);
848                                         /* Ignore EAGAIN */
849                                         if (ret < 0 && (errno == EAGAIN ||
850                                                         errno == EINTR)) continue;
851
852                                         /* Got error or EOF: Remove it from the list safely */
853                                         if (ret <= 0) {
854                                                 struct local_client *free_fd;
855                                                 int type = thisfd->type;
856
857                                                 /* If the cluster socket shuts down, so do we */
858                                                 if (type == CLUSTER_MAIN_SOCK ||
859                                                     type == CLUSTER_INTERNAL)
860                                                         goto closedown;
861
862                                                 DEBUGLOG("ret == %d, errno = %d. removing client\n",
863                                                          ret, errno);
864                                                 lastfd->next = thisfd->next;
865                                                 free_fd = thisfd;
866                                                 thisfd = lastfd;
867                                                 safe_close(&(free_fd->fd));
868
869                                                 /* Queue cleanup, this also frees the client struct */
870                                                 add_to_lvmqueue(free_fd, NULL, 0, NULL);
871                                                 break;
872                                         }
873
874                                         /* New client...simply add it to the list */
875                                         if (newfd) {
876                                                 newfd->next = thisfd->next;
877                                                 thisfd->next = newfd;
878                                                 break;
879                                         }
880                                 }
881                                 lastfd = thisfd;
882                         }
883                 }
884
885                 /* Select timed out. Check for clients that have been waiting too long for a response */
886                 if (select_status == 0) {
887                         time_t the_time = time(NULL);
888
889                         for (thisfd = &local_client_head; thisfd != NULL;
890                              thisfd = thisfd->next) {
891                                 if (thisfd->type == LOCAL_SOCK
892                                     && thisfd->bits.localsock.sent_out
893                                     && thisfd->bits.localsock.sent_time +
894                                     cmd_timeout < the_time
895                                     && thisfd->bits.localsock.
896                                     expected_replies !=
897                                     thisfd->bits.localsock.num_replies) {
898                                         /* Send timed out message + replies we already have */
899                                         DEBUGLOG
900                                             ("Request timed-out (send: %ld, now: %ld)\n",
901                                              thisfd->bits.localsock.sent_time,
902                                              the_time);
903
904                                         thisfd->bits.localsock.all_success = 0;
905
906                                         request_timed_out(thisfd);
907                                 }
908                         }
909                 }
910                 if (select_status < 0) {
911                         if (errno == EINTR)
912                                 continue;
913
914 #ifdef DEBUG
915                         perror("select error");
916                         exit(-1);
917 #endif
918                 }
919         }
920
921       closedown:
922         clops->cluster_closedown();
923 }
924
925 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
926 {
927         int child_status;
928         int sstat;
929         fd_set fds;
930         struct timeval tv = {timeout, 0};
931
932         FD_ZERO(&fds);
933         FD_SET(c_pipe, &fds);
934
935         sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
936         if (sstat == 0) {
937                 fprintf(stderr, "clvmd startup timed out\n");
938                 exit(DFAIL_TIMEOUT);
939         }
940         if (sstat == 1) {
941                 if (read(c_pipe, &child_status, sizeof(child_status)) !=
942                     sizeof(child_status)) {
943
944                         fprintf(stderr, "clvmd failed in initialisation\n");
945                         exit(DFAIL_INIT);
946                 }
947                 else {
948                         switch (child_status) {
949                         case SUCCESS:
950                                 break;
951                         case DFAIL_INIT:
952                                 fprintf(stderr, "clvmd failed in initialisation\n");
953                                 break;
954                         case DFAIL_LOCAL_SOCK:
955                                 fprintf(stderr, "clvmd could not create local socket\n");
956                                 fprintf(stderr, "Another clvmd is probably already running\n");
957                                 break;
958                         case DFAIL_CLUSTER_IF:
959                                 fprintf(stderr, "clvmd could not connect to cluster manager\n");
960                                 fprintf(stderr, "Consult syslog for more information\n");
961                                 break;
962                         case DFAIL_MALLOC:
963                                 fprintf(stderr, "clvmd failed, not enough memory\n");
964                                 break;
965                         default:
966                                 fprintf(stderr, "clvmd failed, error was %d\n", child_status);
967                                 break;
968                         }
969                         exit(child_status);
970                 }
971         }
972         fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
973         exit(DFAIL_INIT);
974 }
975
976 /*
977  * Fork into the background and detach from our parent process.
978  * In the interests of user-friendliness we wait for the daemon
979  * to complete initialisation before returning its status
980  * the the user.
981  */
982 static void be_daemon(int timeout)
983 {
984         int devnull = open("/dev/null", O_RDWR);
985         if (devnull == -1) {
986                 perror("Can't open /dev/null");
987                 exit(3);
988         }
989
990         pipe(child_pipe);
991
992         switch (fork()) {
993         case -1:
994                 perror("clvmd: can't fork");
995                 exit(2);
996
997         case 0:         /* Child */
998                 close(child_pipe[0]);
999                 break;
1000
1001         default:       /* Parent */
1002                 close(child_pipe[1]);
1003                 wait_for_child(child_pipe[0], timeout);
1004         }
1005
1006         /* Detach ourself from the calling environment */
1007         if (close(0) || close(1) || close(2)) {
1008                 perror("Error closing terminal FDs");
1009                 exit(4);
1010         }
1011         setsid();
1012
1013         if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
1014             || dup2(devnull, 2) < 0) {
1015                 perror("Error setting terminal FDs to /dev/null");
1016                 log_error("Error setting terminal FDs to /dev/null: %m");
1017                 exit(5);
1018         }
1019         if (chdir("/")) {
1020                 log_error("Error setting current directory to /: %m");
1021                 exit(6);
1022         }
1023
1024 }
1025
1026 /* Called when we have a read from the local socket.
1027    was in the main loop but it's grown up and is a big girl now */
1028 static int read_from_local_sock(struct local_client *thisfd)
1029 {
1030         int len;
1031         int argslen;
1032         int missing_len;
1033         char buffer[PIPE_BUF];
1034
1035         len = read(thisfd->fd, buffer, sizeof(buffer));
1036         if (len == -1 && errno == EINTR)
1037                 return 1;
1038
1039         DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
1040
1041         /* EOF or error on socket */
1042         if (len <= 0) {
1043                 int *status;
1044                 int jstat;
1045
1046                 DEBUGLOG("EOF on local socket: inprogress=%d\n",
1047                          thisfd->bits.localsock.in_progress);
1048
1049                 thisfd->bits.localsock.finished = 1;
1050
1051                 /* If the client went away in mid command then tidy up */
1052                 if (thisfd->bits.localsock.in_progress) {
1053                         pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
1054                         pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1055                         thisfd->bits.localsock.state = POST_COMMAND;
1056                         pthread_cond_signal(&thisfd->bits.localsock.cond);
1057                         pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1058
1059                         /* Free any unsent buffers */
1060                         free_reply(thisfd);
1061                 }
1062
1063                 /* Kill the subthread & free resources */
1064                 if (thisfd->bits.localsock.threadid) {
1065                         DEBUGLOG("Waiting for child thread\n");
1066                         pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1067                         thisfd->bits.localsock.state = PRE_COMMAND;
1068                         pthread_cond_signal(&thisfd->bits.localsock.cond);
1069                         pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1070
1071                         jstat =
1072                             pthread_join(thisfd->bits.localsock.threadid,
1073                                          (void **) &status);
1074                         DEBUGLOG("Joined child thread\n");
1075
1076                         thisfd->bits.localsock.threadid = 0;
1077                         pthread_cond_destroy(&thisfd->bits.localsock.cond);
1078                         pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
1079
1080                         /* Remove the pipe client */
1081                         if (thisfd->bits.localsock.pipe_client != NULL) {
1082                                 struct local_client *newfd;
1083                                 struct local_client *lastfd = NULL;
1084                                 struct local_client *free_fd = NULL;
1085
1086                                 close(thisfd->bits.localsock.pipe_client->fd);  /* Close pipe */
1087                                 close(thisfd->bits.localsock.pipe);
1088
1089                                 /* Remove pipe client */
1090                                 for (newfd = &local_client_head; newfd != NULL;
1091                                      newfd = newfd->next) {
1092                                         if (thisfd->bits.localsock.
1093                                             pipe_client == newfd) {
1094                                                 thisfd->bits.localsock.
1095                                                     pipe_client = NULL;
1096
1097                                                 lastfd->next = newfd->next;
1098                                                 free_fd = newfd;
1099                                                 newfd->next = lastfd;
1100                                                 free(free_fd);
1101                                                 break;
1102                                         }
1103                                         lastfd = newfd;
1104                                 }
1105                         }
1106                 }
1107
1108                 /* Free the command buffer */
1109                 free(thisfd->bits.localsock.cmd);
1110
1111                 /* Clear out the cross-link */
1112                 if (thisfd->bits.localsock.pipe_client != NULL)
1113                         thisfd->bits.localsock.pipe_client->bits.pipe.client =
1114                             NULL;
1115
1116                 safe_close(&(thisfd->fd));
1117                 return 0;
1118         } else {
1119                 int comms_pipe[2];
1120                 struct local_client *newfd;
1121                 char csid[MAX_CSID_LEN];
1122                 struct clvm_header *inheader;
1123                 int status;
1124
1125                 inheader = (struct clvm_header *) buffer;
1126
1127                 /* Fill in the client ID */
1128                 inheader->clientid = htonl(thisfd->fd);
1129
1130                 /* If we are already busy then return an error */
1131                 if (thisfd->bits.localsock.in_progress) {
1132                         struct clvm_header reply;
1133                         reply.cmd = CLVMD_CMD_REPLY;
1134                         reply.status = EBUSY;
1135                         reply.arglen = 0;
1136                         reply.flags = 0;
1137                         send_message(&reply, sizeof(reply), our_csid,
1138                                      thisfd->fd,
1139                                      "Error sending EBUSY reply to local user");
1140                         return len;
1141                 }
1142
1143                 /* Free any old buffer space */
1144                 free(thisfd->bits.localsock.cmd);
1145
1146                 /* See if we have the whole message */
1147                 argslen =
1148                     len - strlen(inheader->node) - sizeof(struct clvm_header);
1149                 missing_len = inheader->arglen - argslen;
1150
1151                 if (missing_len < 0)
1152                         missing_len = 0;
1153
1154                 /* Save the message */
1155                 thisfd->bits.localsock.cmd = malloc(len + missing_len);
1156
1157                 if (!thisfd->bits.localsock.cmd) {
1158                         struct clvm_header reply;
1159                         reply.cmd = CLVMD_CMD_REPLY;
1160                         reply.status = ENOMEM;
1161                         reply.arglen = 0;
1162                         reply.flags = 0;
1163                         send_message(&reply, sizeof(reply), our_csid,
1164                                      thisfd->fd,
1165                                      "Error sending ENOMEM reply to local user");
1166                         return 0;
1167                 }
1168                 memcpy(thisfd->bits.localsock.cmd, buffer, len);
1169                 thisfd->bits.localsock.cmd_len = len + missing_len;
1170                 inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1171
1172                 /* If we don't have the full message then read the rest now */
1173                 if (missing_len) {
1174                         char *argptr =
1175                             inheader->node + strlen(inheader->node) + 1;
1176
1177                         while (missing_len > 0 && len >= 0) {
1178                                 DEBUGLOG
1179                                     ("got %d bytes, need another %d (total %d)\n",
1180                                      argslen, missing_len, inheader->arglen);
1181                                 len = read(thisfd->fd, argptr + argslen,
1182                                            missing_len);
1183                                 if (len >= 0) {
1184                                         missing_len -= len;
1185                                         argslen += len;
1186                                 }
1187                         }
1188                 }
1189
1190                 /* Initialise and lock the mutex so the subthread will wait after
1191                    finishing the PRE routine */
1192                 if (!thisfd->bits.localsock.threadid) {
1193                         pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1194                         pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1195                         pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1196                 }
1197
1198                 /* Only run the command if all the cluster nodes are running CLVMD */
1199                 if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1200                     (check_all_clvmds_running(thisfd) == -1)) {
1201                         thisfd->bits.localsock.expected_replies = 0;
1202                         thisfd->bits.localsock.num_replies = 0;
1203                         send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1204                         return len;
1205                 }
1206
1207                 /* Check the node name for validity */
1208                 if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1209                         /* Error, node is not in the cluster */
1210                         struct clvm_header reply;
1211                         DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1212
1213                         reply.cmd = CLVMD_CMD_REPLY;
1214                         reply.status = ENOENT;
1215                         reply.flags = 0;
1216                         reply.arglen = 0;
1217                         send_message(&reply, sizeof(reply), our_csid,
1218                                      thisfd->fd,
1219                                      "Error sending ENOENT reply to local user");
1220                         thisfd->bits.localsock.expected_replies = 0;
1221                         thisfd->bits.localsock.num_replies = 0;
1222                         thisfd->bits.localsock.in_progress = FALSE;
1223                         thisfd->bits.localsock.sent_out = FALSE;
1224                         return len;
1225                 }
1226
1227                 /* If we already have a subthread then just signal it to start */
1228                 if (thisfd->bits.localsock.threadid) {
1229                         pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1230                         thisfd->bits.localsock.state = PRE_COMMAND;
1231                         pthread_cond_signal(&thisfd->bits.localsock.cond);
1232                         pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1233                         return len;
1234                 }
1235
1236                 /* Create a pipe and add the reading end to our FD list */
1237                 pipe(comms_pipe);
1238                 newfd = malloc(sizeof(struct local_client));
1239                 if (!newfd) {
1240                         struct clvm_header reply;
1241                         close(comms_pipe[0]);
1242                         close(comms_pipe[1]);
1243
1244                         reply.cmd = CLVMD_CMD_REPLY;
1245                         reply.status = ENOMEM;
1246                         reply.arglen = 0;
1247                         reply.flags = 0;
1248                         send_message(&reply, sizeof(reply), our_csid,
1249                                      thisfd->fd,
1250                                      "Error sending ENOMEM reply to local user");
1251                         return len;
1252                 }
1253                 DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1254                          comms_pipe[1]);
1255
1256                 if (fcntl(comms_pipe[0], F_SETFD, 1))
1257                         DEBUGLOG("setting CLOEXEC on pipe[0] failed: %s\n", strerror(errno));
1258                 if (fcntl(comms_pipe[1], F_SETFD, 1))
1259                         DEBUGLOG("setting CLOEXEC on pipe[1] failed: %s\n", strerror(errno));
1260
1261                 newfd->fd = comms_pipe[0];
1262                 newfd->removeme = 0;
1263                 newfd->type = THREAD_PIPE;
1264                 newfd->callback = local_pipe_callback;
1265                 newfd->next = thisfd->next;
1266                 newfd->bits.pipe.client = thisfd;
1267                 newfd->bits.pipe.threadid = 0;
1268                 thisfd->next = newfd;
1269
1270                 /* Store a cross link to the pipe */
1271                 thisfd->bits.localsock.pipe_client = newfd;
1272
1273                 thisfd->bits.localsock.pipe = comms_pipe[1];
1274
1275                 /* Make sure the thread has a copy of it's own ID */
1276                 newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1277
1278                 /* Run the pre routine */
1279                 thisfd->bits.localsock.in_progress = TRUE;
1280                 thisfd->bits.localsock.state = PRE_COMMAND;
1281                 DEBUGLOG("Creating pre&post thread\n");
1282                 status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1283                                pre_and_post_thread, thisfd);
1284                 DEBUGLOG("Created pre&post thread, state = %d\n", status);
1285         }
1286         return len;
1287 }
1288
1289 /* Add a file descriptor from the cluster or comms interface to
1290    our list of FDs for select
1291 */
1292 int add_client(struct local_client *new_client)
1293 {
1294         new_client->next = local_client_head.next;
1295         local_client_head.next = new_client;
1296
1297         return 0;
1298 }
1299
1300 /* Called when the pre-command has completed successfully - we
1301    now execute the real command on all the requested nodes */
1302 static int distribute_command(struct local_client *thisfd)
1303 {
1304         struct clvm_header *inheader =
1305             (struct clvm_header *) thisfd->bits.localsock.cmd;
1306         int len = thisfd->bits.localsock.cmd_len;
1307
1308         thisfd->xid = global_xid++;
1309         DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1310
1311         /* Forward it to other nodes in the cluster if needed */
1312         if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1313                 /* if node is empty then do it on the whole cluster */
1314                 if (inheader->node[0] == '\0') {
1315                         thisfd->bits.localsock.expected_replies =
1316                             clops->get_num_nodes();
1317                         thisfd->bits.localsock.num_replies = 0;
1318                         thisfd->bits.localsock.sent_time = time(NULL);
1319                         thisfd->bits.localsock.in_progress = TRUE;
1320                         thisfd->bits.localsock.sent_out = TRUE;
1321
1322                         /* Do it here first */
1323                         add_to_lvmqueue(thisfd, inheader, len, NULL);
1324
1325                         DEBUGLOG("Sending message to all cluster nodes\n");
1326                         inheader->xid = thisfd->xid;
1327                         send_message(inheader, len, NULL, -1,
1328                                      "Error forwarding message to cluster");
1329                 } else {
1330                         /* Do it on a single node */
1331                         char csid[MAX_CSID_LEN];
1332
1333                         if (clops->csid_from_name(csid, inheader->node)) {
1334                                 /* This has already been checked so should not happen */
1335                                 return 0;
1336                         } else {
1337                                 /* OK, found a node... */
1338                                 thisfd->bits.localsock.expected_replies = 1;
1339                                 thisfd->bits.localsock.num_replies = 0;
1340                                 thisfd->bits.localsock.in_progress = TRUE;
1341
1342                                 /* Are we the requested node ?? */
1343                                 if (memcmp(csid, our_csid, max_csid_len) == 0) {
1344                                         DEBUGLOG("Doing command on local node only\n");
1345                                         add_to_lvmqueue(thisfd, inheader, len, NULL);
1346                                 } else {
1347                                         DEBUGLOG("Sending message to single node: %s\n",
1348                                                  inheader->node);
1349                                         inheader->xid = thisfd->xid;
1350                                         send_message(inheader, len,
1351                                                      csid, -1,
1352                                                      "Error forwarding message to cluster node");
1353                                 }
1354                         }
1355                 }
1356         } else {
1357                 /* Local explicitly requested, ignore nodes */
1358                 thisfd->bits.localsock.in_progress = TRUE;
1359                 thisfd->bits.localsock.expected_replies = 1;
1360                 thisfd->bits.localsock.num_replies = 0;
1361                 add_to_lvmqueue(thisfd, inheader, len, NULL);
1362         }
1363         return 0;
1364 }
1365
1366 /* Process a command from a remote node and return the result */
1367 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1368                                    const char *csid)
1369 {
1370         char *replyargs;
1371         char nodename[max_cluster_member_name_len];
1372         int replylen = 0;
1373         int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1374         int status;
1375         int msg_malloced = 0;
1376
1377         /* Get the node name as we /may/ need it later */
1378         clops->name_from_csid(csid, nodename);
1379
1380         DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1381                  decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1382
1383         /* Check for GOAWAY and sulk */
1384         if (msg->cmd == CLVMD_CMD_GOAWAY) {
1385
1386                 DEBUGLOG("Told to go away by %s\n", nodename);
1387                 log_error("Told to go away by %s\n", nodename);
1388                 exit(99);
1389         }
1390
1391         /* Version check is internal - don't bother exposing it in
1392            clvmd-command.c */
1393         if (msg->cmd == CLVMD_CMD_VERSION) {
1394                 int version_nums[3];
1395                 char node[256];
1396
1397                 memcpy(version_nums, msg->args, sizeof(version_nums));
1398
1399                 clops->name_from_csid(csid, node);
1400                 DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1401                          node,
1402                          ntohl(version_nums[0]),
1403                          ntohl(version_nums[1]), ntohl(version_nums[2]));
1404
1405                 if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1406                         struct clvm_header byebyemsg;
1407                         DEBUGLOG
1408                             ("Telling node %s to go away because of incompatible version number\n",
1409                              node);
1410                         log_notice
1411                             ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1412                              node, ntohl(version_nums[0]),
1413                              ntohl(version_nums[1]), ntohl(version_nums[2]));
1414
1415                         byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1416                         byebyemsg.status = 0;
1417                         byebyemsg.flags = 0;
1418                         byebyemsg.arglen = 0;
1419                         byebyemsg.clientid = 0;
1420                         clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1421                                              our_csid,
1422                                              "Error Sending GOAWAY message");
1423                 } else {
1424                         clops->add_up_node(csid);
1425                 }
1426                 return;
1427         }
1428
1429         /* Allocate a default reply buffer */
1430         replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1431
1432         if (replyargs != NULL) {
1433                 /* Run the command */
1434                 status =
1435                     do_command(NULL, msg, msglen, &replyargs, buflen,
1436                                &replylen);
1437         } else {
1438                 status = ENOMEM;
1439         }
1440
1441         /* If it wasn't a reply, then reply */
1442         if (msg->cmd != CLVMD_CMD_REPLY) {
1443                 char *aggreply;
1444
1445                 aggreply =
1446                     realloc(replyargs, replylen + sizeof(struct clvm_header));
1447                 if (aggreply) {
1448                         struct clvm_header *agghead =
1449                             (struct clvm_header *) aggreply;
1450
1451                         replyargs = aggreply;
1452                         /* Move it up so there's room for a header in front of the data */
1453                         memmove(aggreply + offsetof(struct clvm_header, args),
1454                                 replyargs, replylen);
1455
1456                         agghead->xid = msg->xid;
1457                         agghead->cmd = CLVMD_CMD_REPLY;
1458                         agghead->status = status;
1459                         agghead->flags = 0;
1460                         agghead->clientid = msg->clientid;
1461                         agghead->arglen = replylen;
1462                         agghead->node[0] = '\0';
1463                         send_message(aggreply,
1464                                      sizeof(struct clvm_header) +
1465                                      replylen, csid, fd,
1466                                      "Error sending command reply");
1467                 } else {
1468                         struct clvm_header head;
1469
1470                         DEBUGLOG("Error attempting to realloc return buffer\n");
1471                         /* Return a failure response */
1472                         head.cmd = CLVMD_CMD_REPLY;
1473                         head.status = ENOMEM;
1474                         head.flags = 0;
1475                         head.clientid = msg->clientid;
1476                         head.arglen = 0;
1477                         head.node[0] = '\0';
1478                         send_message(&head, sizeof(struct clvm_header), csid,
1479                                      fd, "Error sending ENOMEM command reply");
1480                         return;
1481                 }
1482         }
1483
1484         /* Free buffer if it was malloced */
1485         if (msg_malloced) {
1486                 free(msg);
1487         }
1488         free(replyargs);
1489 }
1490
1491 /* Add a reply to a command to the list of replies for this client.
1492    If we have got a full set then send them to the waiting client down the local
1493    socket */
1494 static void add_reply_to_list(struct local_client *client, int status,
1495                               const char *csid, const char *buf, int len)
1496 {
1497         struct node_reply *reply;
1498
1499         pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1500
1501         /* Add it to the list of replies */
1502         reply = malloc(sizeof(struct node_reply));
1503         if (reply) {
1504                 reply->status = status;
1505                 clops->name_from_csid(csid, reply->node);
1506                 DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1507
1508                 if (len > 0) {
1509                         reply->replymsg = malloc(len);
1510                         if (!reply->replymsg) {
1511                                 reply->status = ENOMEM;
1512                         } else {
1513                                 memcpy(reply->replymsg, buf, len);
1514                         }
1515                 } else {
1516                         reply->replymsg = NULL;
1517                 }
1518                 /* Hook it onto the reply chain */
1519                 reply->next = client->bits.localsock.replies;
1520                 client->bits.localsock.replies = reply;
1521         } else {
1522                 /* It's all gone horribly wrong... */
1523                 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1524                 send_local_reply(client, ENOMEM, client->fd);
1525                 return;
1526         }
1527         DEBUGLOG("Got %d replies, expecting: %d\n",
1528                  client->bits.localsock.num_replies + 1,
1529                  client->bits.localsock.expected_replies);
1530
1531         /* If we have the whole lot then do the post-process */
1532         if (++client->bits.localsock.num_replies ==
1533             client->bits.localsock.expected_replies) {
1534                 /* Post-process the command */
1535                 if (client->bits.localsock.threadid) {
1536                         pthread_mutex_lock(&client->bits.localsock.mutex);
1537                         client->bits.localsock.state = POST_COMMAND;
1538                         pthread_cond_signal(&client->bits.localsock.cond);
1539                         pthread_mutex_unlock(&client->bits.localsock.mutex);
1540                 }
1541         }
1542         pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1543 }
1544
1545 /* This is the thread that runs the PRE and post commands for a particular connection */
1546 static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
1547 {
1548         struct local_client *client = (struct local_client *) arg;
1549         int status;
1550         int write_status;
1551         sigset_t ss;
1552         int pipe_fd = client->bits.localsock.pipe;
1553
1554         DEBUGLOG("in sub thread: client = %p\n", client);
1555         pthread_mutex_lock(&client->bits.localsock.mutex);
1556
1557         /* Don't start until the LVM thread is ready */
1558         pthread_mutex_lock(&lvm_start_mutex);
1559         pthread_mutex_unlock(&lvm_start_mutex);
1560         DEBUGLOG("Sub thread ready for work.\n");
1561
1562         /* Ignore SIGUSR1 (handled by master process) but enable
1563            SIGUSR2 (kills subthreads) */
1564         sigemptyset(&ss);
1565         sigaddset(&ss, SIGUSR1);
1566         pthread_sigmask(SIG_BLOCK, &ss, NULL);
1567
1568         sigdelset(&ss, SIGUSR1);
1569         sigaddset(&ss, SIGUSR2);
1570         pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
1571
1572         /* Loop around doing PRE and POST functions until the client goes away */
1573         while (!client->bits.localsock.finished) {
1574                 /* Execute the code */
1575                 status = do_pre_command(client);
1576
1577                 if (status)
1578                         client->bits.localsock.all_success = 0;
1579
1580                 DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);
1581
1582                 /* Tell the parent process we have finished this bit */
1583                 do {
1584                         write_status = write(pipe_fd, &status, sizeof(int));
1585                         if (write_status == sizeof(int))
1586                                 break;
1587                         if (write_status < 0 &&
1588                             (errno == EINTR || errno == EAGAIN))
1589                                 continue;
1590                         log_error("Error sending to pipe: %m\n");
1591                         break;
1592                 } while(1);
1593
1594                 if (status) {
1595                         client->bits.localsock.state = POST_COMMAND;
1596                         goto next_pre;
1597                 }
1598
1599                 /* We may need to wait for the condition variable before running the post command */
1600                 DEBUGLOG("Waiting to do post command - state = %d\n",
1601                          client->bits.localsock.state);
1602
1603                 if (client->bits.localsock.state != POST_COMMAND &&
1604                     !client->bits.localsock.finished) {
1605                         pthread_cond_wait(&client->bits.localsock.cond,
1606                                           &client->bits.localsock.mutex);
1607                 }
1608
1609                 DEBUGLOG("Got post command condition...\n");
1610
1611                 /* POST function must always run, even if the client aborts */
1612                 status = 0;
1613                 do_post_command(client);
1614
1615                 do {
1616                         write_status = write(pipe_fd, &status, sizeof(int));
1617                         if (write_status == sizeof(int))
1618                                 break;
1619                         if (write_status < 0 &&
1620                             (errno == EINTR || errno == EAGAIN))
1621                                 continue;
1622                         log_error("Error sending to pipe: %m\n");
1623                         break;
1624                 } while(1);
1625 next_pre:
1626                 DEBUGLOG("Waiting for next pre command\n");
1627
1628                 if (client->bits.localsock.state != PRE_COMMAND &&
1629                     !client->bits.localsock.finished) {
1630                         pthread_cond_wait(&client->bits.localsock.cond,
1631                                           &client->bits.localsock.mutex);
1632                 }
1633
1634                 DEBUGLOG("Got pre command condition...\n");
1635         }
1636         pthread_mutex_unlock(&client->bits.localsock.mutex);
1637         DEBUGLOG("Subthread finished\n");
1638         pthread_exit((void *) 0);
1639 }
1640
1641 /* Process a command on the local node and store the result */
1642 static int process_local_command(struct clvm_header *msg, int msglen,
1643                                  struct local_client *client,
1644                                  unsigned short xid)
1645 {
1646         char *replybuf = malloc(max_cluster_message);
1647         int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1648         int replylen = 0;
1649         int status;
1650
1651         DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1652                  decode_cmd(msg->cmd), msg, msglen, client);
1653
1654         if (replybuf == NULL)
1655                 return -1;
1656
1657         status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1658
1659         if (status)
1660                 client->bits.localsock.all_success = 0;
1661
1662         /* If we took too long then discard the reply */
1663         if (xid == client->xid) {
1664                 add_reply_to_list(client, status, our_csid, replybuf, replylen);
1665         } else {
1666                 DEBUGLOG
1667                     ("Local command took too long, discarding xid %d, current is %d\n",
1668                      xid, client->xid);
1669         }
1670
1671         free(replybuf);
1672         return status;
1673 }
1674
1675 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1676 {
1677         struct local_client *client = NULL;
1678
1679         client = find_client(msg->clientid);
1680         if (!client) {
1681                 DEBUGLOG("Got message for unknown client 0x%x\n",
1682                          msg->clientid);
1683                 log_error("Got message for unknown client 0x%x\n",
1684                           msg->clientid);
1685                 return -1;
1686         }
1687
1688         if (msg->status)
1689                 client->bits.localsock.all_success = 0;
1690
1691         /* Gather replies together for this client id */
1692         if (msg->xid == client->xid) {
1693                 add_reply_to_list(client, msg->status, csid, msg->args,
1694                                   msg->arglen);
1695         } else {
1696                 DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1697                          msg->xid, client->xid);
1698         }
1699         return 0;
1700 }
1701
1702 /* Send an aggregated reply back to the client */
1703 static void send_local_reply(struct local_client *client, int status, int fd)
1704 {
1705         struct clvm_header *clientreply;
1706         struct node_reply *thisreply = client->bits.localsock.replies;
1707         char *replybuf;
1708         char *ptr;
1709         int message_len = 0;
1710
1711         DEBUGLOG("Send local reply\n");
1712
1713         /* Work out the total size of the reply */
1714         while (thisreply) {
1715                 if (thisreply->replymsg)
1716                         message_len += strlen(thisreply->replymsg) + 1;
1717                 else
1718                         message_len++;
1719
1720                 message_len += strlen(thisreply->node) + 1 + sizeof(int);
1721
1722                 thisreply = thisreply->next;
1723         }
1724
1725         /* Add in the size of our header */
1726         message_len = message_len + sizeof(struct clvm_header) + 1;
1727         replybuf = malloc(message_len);
1728
1729         clientreply = (struct clvm_header *) replybuf;
1730         clientreply->status = status;
1731         clientreply->cmd = CLVMD_CMD_REPLY;
1732         clientreply->node[0] = '\0';
1733         clientreply->flags = 0;
1734
1735         ptr = clientreply->args;
1736
1737         /* Add in all the replies, and free them as we go */
1738         thisreply = client->bits.localsock.replies;
1739         while (thisreply) {
1740                 struct node_reply *tempreply = thisreply;
1741
1742                 strcpy(ptr, thisreply->node);
1743                 ptr += strlen(thisreply->node) + 1;
1744
1745                 if (thisreply->status)
1746                         clientreply->flags |= CLVMD_FLAG_NODEERRS;
1747
1748                 memcpy(ptr, &thisreply->status, sizeof(int));
1749                 ptr += sizeof(int);
1750
1751                 if (thisreply->replymsg) {
1752                         strcpy(ptr, thisreply->replymsg);
1753                         ptr += strlen(thisreply->replymsg) + 1;
1754                 } else {
1755                         ptr[0] = '\0';
1756                         ptr++;
1757                 }
1758                 thisreply = thisreply->next;
1759
1760                 free(tempreply->replymsg);
1761                 free(tempreply);
1762         }
1763
1764         /* Terminate with an empty node name */
1765         *ptr = '\0';
1766
1767         clientreply->arglen = ptr - clientreply->args + 1;
1768
1769         /* And send it */
1770         send_message(replybuf, message_len, our_csid, fd,
1771                      "Error sending REPLY to client");
1772         free(replybuf);
1773
1774         /* Reset comms variables */
1775         client->bits.localsock.replies = NULL;
1776         client->bits.localsock.expected_replies = 0;
1777         client->bits.localsock.in_progress = FALSE;
1778         client->bits.localsock.sent_out = FALSE;
1779 }
1780
1781 /* Just free a reply chain baceuse it wasn't used. */
1782 static void free_reply(struct local_client *client)
1783 {
1784         /* Add in all the replies, and free them as we go */
1785         struct node_reply *thisreply = client->bits.localsock.replies;
1786         while (thisreply) {
1787                 struct node_reply *tempreply = thisreply;
1788
1789                 thisreply = thisreply->next;
1790
1791                 free(tempreply->replymsg);
1792                 free(tempreply);
1793         }
1794         client->bits.localsock.replies = NULL;
1795 }
1796
1797 /* Send our version number to the cluster */
1798 static void send_version_message()
1799 {
1800         char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1801         struct clvm_header *msg = (struct clvm_header *) message;
1802         int version_nums[3];
1803
1804         msg->cmd = CLVMD_CMD_VERSION;
1805         msg->status = 0;
1806         msg->flags = 0;
1807         msg->clientid = 0;
1808         msg->arglen = sizeof(version_nums);
1809
1810         version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1811         version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1812         version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1813
1814         memcpy(&msg->args, version_nums, sizeof(version_nums));
1815
1816         hton_clvm(msg);
1817
1818         clops->cluster_send_message(message, sizeof(message), NULL,
1819                              "Error Sending version number");
1820 }
1821
1822 /* Send a message to either a local client or another server */
1823 static int send_message(void *buf, int msglen, const char *csid, int fd,
1824                         const char *errtext)
1825 {
1826         int len = 0;
1827         int saved_errno = 0;
1828         struct timespec delay;
1829         struct timespec remtime;
1830
1831         int retry_cnt = 0;
1832
1833         /* Send remote messages down the cluster socket */
1834         if (csid == NULL || !ISLOCAL_CSID(csid)) {
1835                 hton_clvm((struct clvm_header *) buf);
1836                 return clops->cluster_send_message(buf, msglen, csid, errtext);
1837         } else {
1838                 int ptr = 0;
1839
1840                 /* Make sure it all goes */
1841                 do {
1842                         if (retry_cnt > MAX_RETRIES)
1843                         {
1844                                 errno = saved_errno;
1845                                 log_error("%s", errtext);
1846                                 errno = saved_errno;
1847                                 break;
1848                         }
1849
1850                         len = write(fd, buf + ptr, msglen - ptr);
1851
1852                         if (len <= 0) {
1853                                 if (errno == EINTR)
1854                                         continue;
1855                                 if (errno == EAGAIN ||
1856                                     errno == EIO ||
1857                                     errno == ENOSPC) {
1858                                         saved_errno = errno;
1859                                         retry_cnt++;
1860
1861                                         delay.tv_sec = 0;
1862                                         delay.tv_nsec = 100000;
1863                                         remtime.tv_sec = 0;
1864                                         remtime.tv_nsec = 0;
1865                                         (void) nanosleep (&delay, &remtime);
1866
1867                                         continue;
1868                                 }
1869                                 log_error("%s", errtext);
1870                                 break;
1871                         }
1872                         ptr += len;
1873                 } while (ptr < msglen);
1874         }
1875         return len;
1876 }
1877
1878 static int process_work_item(struct lvm_thread_cmd *cmd)
1879 {
1880         /* If msg is NULL then this is a cleanup request */
1881         if (cmd->msg == NULL) {
1882                 DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1883                 cmd_client_cleanup(cmd->client);
1884                 free(cmd->client);
1885                 return 0;
1886         }
1887
1888         if (!cmd->remote) {
1889                 DEBUGLOG("process_work_item: local\n");
1890                 process_local_command(cmd->msg, cmd->msglen, cmd->client,
1891                                       cmd->xid);
1892         } else {
1893                 DEBUGLOG("process_work_item: remote\n");
1894                 process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1895                                        cmd->csid);
1896         }
1897         return 0;
1898 }
1899
1900 /*
1901  * Routine that runs in the "LVM thread".
1902  */
1903 static void lvm_thread_fn(void *arg)
1904 {
1905         struct dm_list *cmdl, *tmp;
1906         sigset_t ss;
1907         struct lvm_startup_params *lvm_params = arg;
1908
1909         DEBUGLOG("LVM thread function started\n");
1910
1911         /* Ignore SIGUSR1 & 2 */
1912         sigemptyset(&ss);
1913         sigaddset(&ss, SIGUSR1);
1914         sigaddset(&ss, SIGUSR2);
1915         pthread_sigmask(SIG_BLOCK, &ss, NULL);
1916
1917         /* Initialise the interface to liblvm */
1918         init_clvm(lvm_params->using_gulm, lvm_params->argv);
1919
1920         /* Allow others to get moving */
1921         pthread_mutex_unlock(&lvm_start_mutex);
1922
1923         /* Now wait for some actual work */
1924         for (;;) {
1925                 DEBUGLOG("LVM thread waiting for work\n");
1926
1927                 pthread_mutex_lock(&lvm_thread_mutex);
1928                 if (dm_list_empty(&lvm_cmd_head))
1929                         pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
1930
1931                 dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
1932                         struct lvm_thread_cmd *cmd;
1933
1934                         cmd =
1935                             dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
1936                         dm_list_del(&cmd->list);
1937                         pthread_mutex_unlock(&lvm_thread_mutex);
1938
1939                         process_work_item(cmd);
1940                         free(cmd->msg);
1941                         free(cmd);
1942
1943                         pthread_mutex_lock(&lvm_thread_mutex);
1944                 }
1945                 pthread_mutex_unlock(&lvm_thread_mutex);
1946         }
1947 }
1948
1949 /* Pass down some work to the LVM thread */
1950 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1951                            int msglen, const char *csid)
1952 {
1953         struct lvm_thread_cmd *cmd;
1954
1955         cmd = malloc(sizeof(struct lvm_thread_cmd));
1956         if (!cmd)
1957                 return ENOMEM;
1958
1959         if (msglen) {
1960                 cmd->msg = malloc(msglen);
1961                 if (!cmd->msg) {
1962                         log_error("Unable to allocate buffer space\n");
1963                         free(cmd);
1964                         return -1;
1965                 }
1966                 memcpy(cmd->msg, msg, msglen);
1967         }
1968         else {
1969                 cmd->msg = NULL;
1970         }
1971         cmd->client = client;
1972         cmd->msglen = msglen;
1973         cmd->xid = client->xid;
1974
1975         if (csid) {
1976                 memcpy(cmd->csid, csid, max_csid_len);
1977                 cmd->remote = 1;
1978         } else {
1979                 cmd->remote = 0;
1980         }
1981
1982         DEBUGLOG
1983             ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1984              cmd, client, msg, msglen, csid, cmd->xid);
1985         pthread_mutex_lock(&lvm_thread_mutex);
1986         dm_list_add(&lvm_cmd_head, &cmd->list);
1987         pthread_cond_signal(&lvm_thread_cond);
1988         pthread_mutex_unlock(&lvm_thread_mutex);
1989
1990         return 0;
1991 }
1992
1993 /* Return 0 if we can talk to an existing clvmd */
1994 static int check_local_clvmd(void)
1995 {
1996         int local_socket;
1997         struct sockaddr_un sockaddr;
1998         int ret = 0;
1999
2000         /* Open local socket */
2001         if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
2002                 return -1;
2003         }
2004
2005         memset(&sockaddr, 0, sizeof(sockaddr));
2006         memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
2007         sockaddr.sun_family = AF_UNIX;
2008
2009         if (connect(local_socket,(struct sockaddr *) &sockaddr,
2010                     sizeof(sockaddr))) {
2011                 ret = -1;
2012         }
2013
2014         close(local_socket);
2015         return ret;
2016 }
2017
2018 static void close_local_sock(int local_socket)
2019 {
2020         if (local_socket != -1 && close(local_socket))
2021                 stack;
2022
2023         if (CLVMD_SOCKNAME[0] != '\0' && unlink(CLVMD_SOCKNAME))
2024                 stack;
2025 }
2026
2027 /* Open the local socket, that's the one we talk to libclvm down */
2028 static int open_local_sock()
2029 {
2030         int local_socket = -1;
2031         struct sockaddr_un sockaddr;
2032         mode_t old_mask;
2033
2034         close_local_sock(local_socket);
2035
2036         (void) dm_prepare_selinux_context(CLVMD_SOCKNAME, S_IFSOCK);
2037         old_mask = umask(0077);
2038
2039         /* Open local socket */
2040         local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
2041         if (local_socket < 0) {
2042                 log_error("Can't create local socket: %m");
2043                 goto error;
2044         }
2045
2046         /* Set Close-on-exec & non-blocking */
2047         if (fcntl(local_socket, F_SETFD, 1))
2048                 DEBUGLOG("setting CLOEXEC on local_socket failed: %s\n", strerror(errno));
2049         fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
2050
2051         memset(&sockaddr, 0, sizeof(sockaddr));
2052         memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
2053         sockaddr.sun_family = AF_UNIX;
2054
2055         if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
2056                 log_error("can't bind local socket: %m");
2057                 goto error;
2058         }
2059         if (listen(local_socket, 1) != 0) {
2060                 log_error("listen local: %m");
2061                 goto error;
2062         }
2063
2064         umask(old_mask);
2065         (void) dm_prepare_selinux_context(NULL, 0);
2066         return local_socket;
2067 error:
2068         close_local_sock(local_socket);
2069         umask(old_mask);
2070         (void) dm_prepare_selinux_context(NULL, 0);
2071         return -1;
2072 }
2073
2074 void process_message(struct local_client *client, const char *buf, int len,
2075                      const char *csid)
2076 {
2077         struct clvm_header *inheader;
2078
2079         inheader = (struct clvm_header *) buf;
2080         ntoh_clvm(inheader);    /* Byteswap fields */
2081         if (inheader->cmd == CLVMD_CMD_REPLY)
2082                 process_reply(inheader, len, csid);
2083         else
2084                 add_to_lvmqueue(client, inheader, len, csid);
2085 }
2086
2087
/*
 * Per-node callback used by check_all_clvmds_running().
 *
 * Nodes reported as up are ignored. A node that is not up gets an
 * EHOSTDOWN reply with the text "CLVMD not running" added to the client's
 * reply list.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	static const char not_running[] = "CLVMD not running";

	if (node_up)
		return;

	/* sizeof counts the terminating NUL: 18 bytes */
	add_reply_to_list(client, EHOSTDOWN, csid, not_running,
			  sizeof(not_running));
}
2095
2096 /* Check to see if all CLVMDs are running (ie one on
2097    every node in the cluster).
2098    If not, returns -1 and prints out a list of errant nodes */
2099 static int check_all_clvmds_running(struct local_client *client)
2100 {
2101         DEBUGLOG("check_all_clvmds_running\n");
2102         return clops->cluster_do_node_callback(client, check_all_callback);
2103 }
2104
2105 /* Return a local_client struct given a client ID.
2106    client IDs are in network byte order */
2107 static struct local_client *find_client(int clientid)
2108 {
2109         struct local_client *thisfd;
2110         for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2111                 if (thisfd->fd == ntohl(clientid))
2112                         return thisfd;
2113         }
2114         return NULL;
2115 }
2116
2117 /* Byte-swapping routines for the header so we
2118    work in a heterogeneous environment */
2119 static void hton_clvm(struct clvm_header *hdr)
2120 {
2121         hdr->status = htonl(hdr->status);
2122         hdr->arglen = htonl(hdr->arglen);
2123         hdr->xid = htons(hdr->xid);
2124         /* Don't swap clientid as it's only a token as far as
2125            remote nodes are concerned */
2126 }
2127
2128 static void ntoh_clvm(struct clvm_header *hdr)
2129 {
2130         hdr->status = ntohl(hdr->status);
2131         hdr->arglen = ntohl(hdr->arglen);
2132         hdr->xid = ntohs(hdr->xid);
2133 }
2134
/*
 * Handler for SIGUSR2, which is sent to kill subthreads.
 * The handler itself only logs.
 * NOTE(review): presumably its purpose is just to interrupt a blocking call
 * in the target thread; confirm.
 */
static void sigusr2_handler(int sig)
{
	(void) sig;
	DEBUGLOG("SIGUSR2 received\n");
}
2141
2142 static void sigterm_handler(int sig)
2143 {
2144         DEBUGLOG("SIGTERM received\n");
2145         quit = 1;
2146         return;
2147 }
2148
2149 static void sighup_handler(int sig)
2150 {
2151         DEBUGLOG("got SIGHUP\n");
2152         reread_config = 1;
2153 }
2154
2155 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2156 {
2157         return clops->sync_lock(resource, mode, flags, lockid);
2158 }
2159
2160 int sync_unlock(const char *resource, int lockid)
2161 {
2162         return clops->sync_unlock(resource, lockid);
2163 }
2164
2165 static if_type_t parse_cluster_interface(char *ifname)
2166 {
2167         if_type_t iface = IF_AUTO;
2168
2169         if (!strcmp(ifname, "auto"))
2170                 iface = IF_AUTO;
2171         if (!strcmp(ifname, "cman"))
2172                 iface = IF_CMAN;
2173         if (!strcmp(ifname, "gulm"))
2174                 iface = IF_GULM;
2175         if (!strcmp(ifname, "openais"))
2176                 iface = IF_OPENAIS;
2177         if (!strcmp(ifname, "corosync"))
2178                 iface = IF_COROSYNC;
2179         if (!strcmp(ifname, "singlenode"))
2180                 iface = IF_SINGLENODE;
2181
2182         return iface;
2183 }
2184
2185 /*
2186  * Try and find a cluster system in corosync's objdb, if it is running. This is
2187  * only called if the command-line option is not present, and if it fails
2188  * we still try the interfaces in order.
2189  */
2190 static if_type_t get_cluster_type()
2191 {
2192 #ifdef HAVE_COROSYNC_CONFDB_H
2193         confdb_handle_t handle;
2194         if_type_t type = IF_AUTO;
2195         int result;
2196         char buf[255];
2197         size_t namelen = sizeof(buf);
2198         hdb_handle_t cluster_handle;
2199         hdb_handle_t clvmd_handle;
2200         confdb_callbacks_t callbacks = {
2201                 .confdb_key_change_notify_fn = NULL,
2202                 .confdb_object_create_change_notify_fn = NULL,
2203                 .confdb_object_delete_change_notify_fn = NULL
2204         };
2205
2206         result = confdb_initialize (&handle, &callbacks);
2207         if (result != CS_OK)
2208                 return type;
2209
2210         result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2211         if (result != CS_OK)
2212                 goto out;
2213
2214         result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2215         if (result != CS_OK)
2216                 goto out;
2217
2218         result = confdb_object_find_start(handle, cluster_handle);
2219         if (result != CS_OK)
2220                 goto out;
2221
2222         result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2223         if (result != CS_OK)
2224                 goto out;
2225
2226         result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2227         if (result != CS_OK)
2228                 goto out;
2229
2230         buf[namelen] = '\0';
2231         type = parse_cluster_interface(buf);
2232         DEBUGLOG("got interface type '%s' from confdb\n", buf);
2233 out:
2234         confdb_finalize(handle);
2235         return type;
2236 #else
2237         return IF_AUTO;
2238 #endif
2239 }