Tizen 2.1 base
[external/device-mapper.git] / daemons / cmirrord / cluster.c
1 /*
2  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
3  *
4  * This copyrighted material is made available to anyone wishing to use,
5  * modify, copy, or redistribute it subject to the terms and conditions
6  * of the GNU Lesser General Public License v.2.1.
7  *
8  * You should have received a copy of the GNU Lesser General Public License
9  * along with this program; if not, write to the Free Software Foundation,
10  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
11  */
12 #include "logging.h"
13 #include "cluster.h"
14 #include "common.h"
15 #include "compat.h"
16 #include "functions.h"
17 #include "link_mon.h"
18 #include "local.h"
19 #include "xlate.h"
20
21 #include <corosync/cpg.h>
22 #include <errno.h>
23 #include <openais/saAis.h>
24 #include <openais/saCkpt.h>
25 #include <signal.h>
26 #include <unistd.h>
27
/*
 * str_ais_error
 * @x: an Open AIS error code
 *
 * Expands to the string literal naming @x, or "ais_error_unknown" when
 * @x is not one of the codes listed below.
 *
 * The whole expansion is parenthesised so the macro can be embedded in
 * a larger expression without precedence surprises.  @x is evaluated
 * repeatedly, so it must not have side effects.
 */
#define str_ais_error(x)                                                \
        (((x) == SA_AIS_OK) ? "SA_AIS_OK" :                             \
         ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" :           \
         ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" :           \
         ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" :                 \
         ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" :           \
         ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" :       \
         ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
         ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" :       \
         ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" :     \
         ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" :                 \
         ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" :             \
         ((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" :       \
         ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
         ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" :               \
         ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" :         \
         ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" :       \
         ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
         ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \
         ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
         ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
         ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
         ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
         ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" :     \
         ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
         ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" :       \
         ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" :           \
         ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" :   \
         "ais_error_unknown")
58
/*
 * _RQ_TYPE
 * @x: a request type, possibly carrying the DM_ULOG_RESPONSE bit
 *
 * Expands to a printable name for @x.  The two types handled here
 * directly are DM_ULOG_CHECKPOINT_READY and DM_ULOG_MEMBER_JOIN; any
 * other value has the DM_ULOG_RESPONSE bit masked off and is handed to
 * RQ_TYPE().
 *
 * The expansion is parenthesised as a whole so it is safe inside larger
 * expressions.  @x is evaluated more than once.
 */
#define _RQ_TYPE(x)                                                     \
        (((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \
         ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN":          \
         RQ_TYPE((x) & ~DM_ULOG_RESPONSE))
63
64 static uint32_t my_cluster_id = 0xDEAD;
65 static SaCkptHandleT ckpt_handle = 0;
66 static SaCkptCallbacksT callbacks = { 0, 0 };
67 static SaVersionT version = { 'B', 1, 1 };
68
/* Number of slots in the per-log debugging history ring. */
#define DEBUGGING_HISTORY 100

/*
 * LOG_SPRINT
 * @cc: pointer to an object with an 'idx' counter and a
 *      'debugging[DEBUGGING_HISTORY][...]' array of text slots
 * @f:  printf-style format string
 *
 * Advances the ring index (wrapping at DEBUGGING_HISTORY) and formats
 * the message into the slot it now points at.  The write is bounded by
 * the size of the slot, so an over-long message is truncated instead of
 * overrunning into the neighbouring slot.
 */
#define LOG_SPRINT(cc, f, arg...) do {                                  \
                (cc)->idx = ((cc)->idx + 1) % DEBUGGING_HISTORY;        \
                snprintf((cc)->debugging[(cc)->idx],                    \
                         sizeof((cc)->debugging[(cc)->idx]),            \
                         f, ## arg);                                    \
        } while (0)
77
78 static int log_resp_rec = 0;
79
80 struct checkpoint_data {
81         uint32_t requester;
82         char uuid[CPG_MAX_NAME_LENGTH];
83
84         int bitmap_size; /* in bytes */
85         char *sync_bits;
86         char *clean_bits;
87         char *recovering_region;
88         struct checkpoint_data *next;
89 };      
90
91 #define INVALID 0
92 #define VALID   1
93 #define LEAVING 2
94
95 #define MAX_CHECKPOINT_REQUESTERS 10
96 struct clog_cpg {
97         struct dm_list list;
98
99         uint32_t lowest_id;
100         cpg_handle_t handle;
101         struct cpg_name name;
102         uint64_t luid;
103
104         /* Are we the first, or have we received checkpoint? */
105         int state;
106         int cpg_state;  /* FIXME: debugging */
107         int free_me;
108         int delay;
109         int resend_requests;
110         struct dm_list startup_list;
111         struct dm_list working_list;
112
113         int checkpoints_needed;
114         uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS];
115         struct checkpoint_data *checkpoint_list;
116         int idx;
117         char debugging[DEBUGGING_HISTORY][128];
118 };
119
120 static struct dm_list clog_cpg_list;
121
122 /*
123  * cluster_send
124  * @rq
125  *
126  * Returns: 0 on success, -Exxx on error
127  */
128 int cluster_send(struct clog_request *rq)
129 {
130         int r;
131         int count=0;
132         int found = 0;
133         struct iovec iov;
134         struct clog_cpg *entry;
135
136         dm_list_iterate_items(entry, &clog_cpg_list)
137                 if (!strncmp(entry->name.value, rq->u_rq.uuid,
138                              CPG_MAX_NAME_LENGTH)) {
139                         found = 1;
140                         break;
141                 }
142
143         if (!found) {
144                 rq->u_rq.error = -ENOENT;
145                 return -ENOENT;
146         }
147
148         /*
149          * Once the request heads for the cluster, the luid looses
150          * all its meaning.
151          */
152         rq->u_rq.luid = 0;
153
154         iov.iov_base = rq;
155         iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size;
156
157         rq->u.version[0] = xlate64(CLOG_TFR_VERSION);
158         rq->u.version[1] = CLOG_TFR_VERSION;
159
160         r = clog_request_to_network(rq);
161         if (r < 0)
162                 /* FIXME: Better error code for byteswap failure? */
163                 return -EINVAL;
164
165         if (entry->cpg_state != VALID)
166                 return -EINVAL;
167
168         do {
169                 r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
170                 if (r != SA_AIS_ERR_TRY_AGAIN)
171                         break;
172                 count++;
173                 if (count < 10)
174                         LOG_PRINT("[%s]  Retry #%d of cpg_mcast_joined: %s",
175                                   SHORT_UUID(rq->u_rq.uuid), count,
176                                   str_ais_error(r));
177                 else if ((count < 100) && !(count % 10))
178                         LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s",
179                                   SHORT_UUID(rq->u_rq.uuid), count,
180                                   str_ais_error(r));
181                 else if ((count < 1000) && !(count % 100))
182                         LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s",
183                                   SHORT_UUID(rq->u_rq.uuid), count,
184                                   str_ais_error(r));
185                 else if ((count < 10000) && !(count % 1000))
186                         LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s - "
187                                   "OpenAIS not handling the load?",
188                                   SHORT_UUID(rq->u_rq.uuid), count,
189                                   str_ais_error(r));
190                 usleep(1000);
191         } while (1);
192
193         if (r == CPG_OK)
194                 return 0;
195
196         /* error codes found in openais/cpg.h */
197         LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
198
199         rq->u_rq.error = -EBADE;
200         return -EBADE;
201 }
202
203 static struct clog_request *get_matching_rq(struct clog_request *rq,
204                                             struct dm_list *l)
205 {
206         struct clog_request *match, *n;
207
208         dm_list_iterate_items_gen_safe(match, n, l, u.list)
209                 if (match->u_rq.seq == rq->u_rq.seq) {
210                         dm_list_del(&match->u.list);
211                         return match;
212                 }
213
214         return NULL;
215 }
216
217 static char rq_buffer[DM_ULOG_REQUEST_SIZE];
218 static int handle_cluster_request(struct clog_cpg *entry __attribute__((unused)),
219                                   struct clog_request *rq, int server)
220 {
221         int r = 0;
222         struct clog_request *tmp = (struct clog_request *)rq_buffer;
223
224         /*
225          * We need a separate dm_ulog_request struct, one that can carry
226          * a return payload.  Otherwise, the memory address after
227          * rq will be altered - leading to problems
228          */
229         memset(rq_buffer, 0, sizeof(rq_buffer));
230         memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size);
231
232         /*
233          * With resumes, we only handle our own.
234          * Resume is a special case that requires
235          * local action (to set up CPG), followed by
236          * a cluster action to co-ordinate reading
237          * the disk and checkpointing
238          */
239         if (tmp->u_rq.request_type == DM_ULOG_RESUME) {
240                 if (tmp->originator == my_cluster_id) {
241                         r = do_request(tmp, server);
242
243                         r = kernel_send(&tmp->u_rq);
244                         if (r < 0)
245                                 LOG_ERROR("Failed to send resume response to kernel");
246                 }
247                 return r;
248         }
249
250         r = do_request(tmp, server);
251
252         if (server &&
253             (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
254             (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) {
255                 tmp->u_rq.request_type |= DM_ULOG_RESPONSE;
256
257                 /*
258                  * Errors from previous functions are in the rq struct.
259                  */
260                 r = cluster_send(tmp);
261                 if (r < 0)
262                         LOG_ERROR("cluster_send failed: %s", strerror(-r));
263         }
264
265         return r;
266 }
267
268 static int handle_cluster_response(struct clog_cpg *entry,
269                                    struct clog_request *rq)
270 {
271         int r = 0;
272         struct clog_request *orig_rq;
273
274         /*
275          * If I didn't send it, then I don't care about the response
276          */
277         if (rq->originator != my_cluster_id)
278                 return 0;
279
280         rq->u_rq.request_type &= ~DM_ULOG_RESPONSE;
281         orig_rq = get_matching_rq(rq, &entry->working_list);
282
283         if (!orig_rq) {
284                 /* Unable to find match for response */
285
286                 LOG_ERROR("[%s] No match for cluster response: %s:%u",
287                           SHORT_UUID(rq->u_rq.uuid),
288                           _RQ_TYPE(rq->u_rq.request_type),
289                           rq->u_rq.seq);
290
291                 LOG_ERROR("Current local list:");
292                 if (dm_list_empty(&entry->working_list))
293                         LOG_ERROR("   [none]");
294
295                 dm_list_iterate_items_gen(orig_rq, &entry->working_list, u.list)
296                         LOG_ERROR("   [%s]  %s:%u",
297                                   SHORT_UUID(orig_rq->u_rq.uuid),
298                                   _RQ_TYPE(orig_rq->u_rq.request_type),
299                                   orig_rq->u_rq.seq);
300
301                 return -EINVAL;
302         }
303
304         if (log_resp_rec > 0) {
305                 LOG_COND(log_resend_requests,
306                          "[%s] Response received to %s/#%u",
307                          SHORT_UUID(rq->u_rq.uuid),
308                          _RQ_TYPE(rq->u_rq.request_type),
309                          rq->u_rq.seq);
310                 log_resp_rec--;
311         }
312
313         /* FIXME: Ensure memcpy cannot explode */
314         memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
315
316         r = kernel_send(&orig_rq->u_rq);
317         if (r)
318                 LOG_ERROR("Failed to send response to kernel");
319
320         free(orig_rq);
321         return r;
322 }
323
324 static struct clog_cpg *find_clog_cpg(cpg_handle_t handle)
325 {
326         struct clog_cpg *match;
327
328         dm_list_iterate_items(match, &clog_cpg_list)
329                 if (match->handle == handle)
330                         return match;
331
332         return NULL;
333 }
334
335 /*
336  * prepare_checkpoint
337  * @entry: clog_cpg describing the log
338  * @cp_requester: nodeid requesting the checkpoint
339  *
340  * Creates and fills in a new checkpoint_data struct.
341  *
342  * Returns: checkpoint_data on success, NULL on error
343  */
344 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
345                                                   uint32_t cp_requester)
346 {
347         int r;
348         struct checkpoint_data *new;
349
350         if (entry->state != VALID) {
351                 /*
352                  * We can't store bitmaps yet, because the log is not
353                  * valid yet.
354                  */
355                 LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
356                           cp_requester);
357                 return NULL;
358         }
359
360         new = malloc(sizeof(*new));
361         if (!new) {
362                 LOG_ERROR("Unable to create checkpoint data for %u",
363                           cp_requester);
364                 return NULL;
365         }
366         memset(new, 0, sizeof(*new));
367         new->requester = cp_requester;
368         strncpy(new->uuid, entry->name.value, entry->name.length);
369
370         new->bitmap_size = push_state(entry->name.value, entry->luid,
371                                       "clean_bits",
372                                       &new->clean_bits, cp_requester);
373         if (new->bitmap_size <= 0) {
374                 LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
375                           new->requester);
376                 free(new);
377                 return NULL;
378         }
379
380         new->bitmap_size = push_state(entry->name.value, entry->luid,
381                                       "sync_bits",
382                                       &new->sync_bits, cp_requester);
383         if (new->bitmap_size <= 0) {
384                 LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
385                           new->requester);
386                 free(new->clean_bits);
387                 free(new);
388                 return NULL;
389         }
390
391         r = push_state(entry->name.value, entry->luid,
392                        "recovering_region",
393                        &new->recovering_region, cp_requester);
394         if (r <= 0) {
395                 LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
396                           new->requester);
397                 free(new->sync_bits);
398                 free(new->clean_bits);
399                 free(new);
400                 return NULL;
401         }
402         LOG_DBG("[%s] Checkpoint prepared for node %u:",
403                 SHORT_UUID(new->uuid), new->requester);
404         LOG_DBG("  bitmap_size = %d", new->bitmap_size);
405
406         return new;
407 }
408
409 /*
410  * free_checkpoint
411  * @cp: the checkpoint_data struct to free
412  *
413  */
414 static void free_checkpoint(struct checkpoint_data *cp)
415 {
416         free(cp->recovering_region);
417         free(cp->sync_bits);
418         free(cp->clean_bits);
419         free(cp);
420 }
421
422 static int export_checkpoint(struct checkpoint_data *cp)
423 {
424         SaCkptCheckpointCreationAttributesT attr;
425         SaCkptCheckpointHandleT h;
426         SaCkptSectionIdT section_id;
427         SaCkptSectionCreationAttributesT section_attr;
428         SaCkptCheckpointOpenFlagsT flags;
429         SaNameT name;
430         SaAisErrorT rv;
431         struct clog_request *rq;
432         int len, r = 0;
433         char buf[32];
434
435         LOG_DBG("Sending checkpointed data to %u", cp->requester);
436
437         len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH,
438                        "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester);
439         name.length = (SaUint16T)len;
440
441         len = (int)strlen(cp->recovering_region) + 1;
442
443         attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
444         attr.checkpointSize = cp->bitmap_size * 2 + len;
445
446         attr.retentionDuration = SA_TIME_MAX;
447         attr.maxSections = 4;      /* don't know why we need +1 */
448
449         attr.maxSectionSize = (cp->bitmap_size > len) ? cp->bitmap_size : len;
450         attr.maxSectionIdSize = 22;
451
452         flags = SA_CKPT_CHECKPOINT_READ |
453                 SA_CKPT_CHECKPOINT_WRITE |
454                 SA_CKPT_CHECKPOINT_CREATE;
455
456 open_retry:
457         rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h);
458         if (rv == SA_AIS_ERR_TRY_AGAIN) {
459                 LOG_ERROR("export_checkpoint: ckpt open retry");
460                 usleep(1000);
461                 goto open_retry;
462         }
463
464         if (rv == SA_AIS_ERR_EXIST) {
465                 LOG_DBG("export_checkpoint: checkpoint already exists");
466                 return -EEXIST;
467         }
468
469         if (rv != SA_AIS_OK) {
470                 LOG_ERROR("[%s] Failed to open checkpoint for %u: %s",
471                           SHORT_UUID(cp->uuid), cp->requester,
472                           str_ais_error(rv));
473                 return -EIO; /* FIXME: better error */
474         }
475
476         /*
477          * Add section for sync_bits
478          */
479         section_id.idLen = (SaUint16T)snprintf(buf, 32, "sync_bits");
480         section_id.id = (unsigned char *)buf;
481         section_attr.sectionId = &section_id;
482         section_attr.expirationTime = SA_TIME_END;
483
484 sync_create_retry:
485         rv = saCkptSectionCreate(h, &section_attr,
486                                  cp->sync_bits, cp->bitmap_size);
487         if (rv == SA_AIS_ERR_TRY_AGAIN) {
488                 LOG_ERROR("Sync checkpoint section create retry");
489                 usleep(1000);
490                 goto sync_create_retry;
491         }
492
493         if (rv == SA_AIS_ERR_EXIST) {
494                 LOG_DBG("Sync checkpoint section already exists");
495                 saCkptCheckpointClose(h);
496                 return -EEXIST;
497         }
498
499         if (rv != SA_AIS_OK) {
500                 LOG_ERROR("Sync checkpoint section creation failed: %s",
501                           str_ais_error(rv));
502                 saCkptCheckpointClose(h);
503                 return -EIO; /* FIXME: better error */
504         }
505
506         /*
507          * Add section for clean_bits
508          */
509         section_id.idLen = snprintf(buf, 32, "clean_bits");
510         section_id.id = (unsigned char *)buf;
511         section_attr.sectionId = &section_id;
512         section_attr.expirationTime = SA_TIME_END;
513
514 clean_create_retry:
515         rv = saCkptSectionCreate(h, &section_attr, cp->clean_bits, cp->bitmap_size);
516         if (rv == SA_AIS_ERR_TRY_AGAIN) {
517                 LOG_ERROR("Clean checkpoint section create retry");
518                 usleep(1000);
519                 goto clean_create_retry;
520         }
521
522         if (rv == SA_AIS_ERR_EXIST) {
523                 LOG_DBG("Clean checkpoint section already exists");
524                 saCkptCheckpointClose(h);
525                 return -EEXIST;
526         }
527
528         if (rv != SA_AIS_OK) {
529                 LOG_ERROR("Clean checkpoint section creation failed: %s",
530                           str_ais_error(rv));
531                 saCkptCheckpointClose(h);
532                 return -EIO; /* FIXME: better error */
533         }
534
535         /*
536          * Add section for recovering_region
537          */
538         section_id.idLen = snprintf(buf, 32, "recovering_region");
539         section_id.id = (unsigned char *)buf;
540         section_attr.sectionId = &section_id;
541         section_attr.expirationTime = SA_TIME_END;
542
543 rr_create_retry:
544         rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
545                                  strlen(cp->recovering_region) + 1);
546         if (rv == SA_AIS_ERR_TRY_AGAIN) {
547                 LOG_ERROR("RR checkpoint section create retry");
548                 usleep(1000);
549                 goto rr_create_retry;
550         }
551
552         if (rv == SA_AIS_ERR_EXIST) {
553                 LOG_DBG("RR checkpoint section already exists");
554                 saCkptCheckpointClose(h);
555                 return -EEXIST;
556         }
557
558         if (rv != SA_AIS_OK) {
559                 LOG_ERROR("RR checkpoint section creation failed: %s",
560                           str_ais_error(rv));
561                 saCkptCheckpointClose(h);
562                 return -EIO; /* FIXME: better error */
563         }
564
565         LOG_DBG("export_checkpoint: closing checkpoint");
566         saCkptCheckpointClose(h);
567
568         rq = malloc(DM_ULOG_REQUEST_SIZE);
569         if (!rq) {
570                 LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
571                 return -ENOMEM;
572         }
573         memset(rq, 0, sizeof(*rq));
574
575         dm_list_init(&rq->u.list);
576         rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
577         rq->originator = cp->requester;  /* FIXME: hack to overload meaning of originator */
578         strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
579         rq->u_rq.seq = my_cluster_id;
580
581         r = cluster_send(rq);
582         if (r)
583                 LOG_ERROR("Failed to send checkpoint ready notice: %s",
584                           strerror(-r));
585
586         free(rq);
587         return 0;
588 }
589
590 static int import_checkpoint(struct clog_cpg *entry, int no_read)
591 {
592         int rtn = 0;
593         SaCkptCheckpointHandleT h;
594         SaCkptSectionIterationHandleT itr;
595         SaCkptSectionDescriptorT desc;
596         SaCkptIOVectorElementT iov;
597         SaNameT name;
598         SaAisErrorT rv;
599         char *bitmap = NULL;
600         int len;
601
602         bitmap = malloc(1024*1024);
603         if (!bitmap)
604                 return -ENOMEM;
605
606         len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
607                        SHORT_UUID(entry->name.value), my_cluster_id);
608         name.length = (SaUint16T)len;
609
610 open_retry:
611         rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
612                                   SA_CKPT_CHECKPOINT_READ, 0, &h);
613         if (rv == SA_AIS_ERR_TRY_AGAIN) {
614                 LOG_ERROR("import_checkpoint: ckpt open retry");
615                 usleep(1000);
616                 goto open_retry;
617         }
618
619         if (rv != SA_AIS_OK) {
620                 LOG_ERROR("[%s] Failed to open checkpoint: %s",
621                           SHORT_UUID(entry->name.value), str_ais_error(rv));
622                 return -EIO; /* FIXME: better error */
623         }
624
625 unlink_retry:
626         rv = saCkptCheckpointUnlink(ckpt_handle, &name);
627         if (rv == SA_AIS_ERR_TRY_AGAIN) {
628                 LOG_ERROR("import_checkpoint: ckpt unlink retry");
629                 usleep(1000);
630                 goto unlink_retry;
631         }
632
633         if (no_read) {
634                 LOG_DBG("Checkpoint for this log already received");
635                 goto no_read;
636         }
637
638 init_retry:
639         rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
640                                               SA_TIME_END, &itr);
641         if (rv == SA_AIS_ERR_TRY_AGAIN) {
642                 LOG_ERROR("import_checkpoint: sync create retry");
643                 usleep(1000);
644                 goto init_retry;
645         }
646
647         if (rv != SA_AIS_OK) {
648                 LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
649                           SHORT_UUID(entry->name.value), str_ais_error(rv));
650                 return -EIO; /* FIXME: better error */
651         }
652
653         len = 0;
654         while (1) {
655                 rv = saCkptSectionIterationNext(itr, &desc);
656                 if (rv == SA_AIS_OK)
657                         len++;
658                 else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len)
659                         break;
660                 else if (rv != SA_AIS_ERR_TRY_AGAIN) {
661                         LOG_ERROR("saCkptSectionIterationNext failure: %d", rv);
662                         break;
663                 }
664         }
665         saCkptSectionIterationFinalize(itr);
666         if (len != 3) {
667                 LOG_ERROR("import_checkpoint: %d checkpoint sections found",
668                           len);
669                 usleep(1000);
670                 goto init_retry;
671         }
672         saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
673                                          SA_TIME_END, &itr);
674
675         while (1) {
676                 rv = saCkptSectionIterationNext(itr, &desc);
677                 if (rv == SA_AIS_ERR_NO_SECTIONS)
678                         break;
679
680                 if (rv == SA_AIS_ERR_TRY_AGAIN) {
681                         LOG_ERROR("import_checkpoint: ckpt iternext retry");
682                         usleep(1000);
683                         continue;
684                 }
685
686                 if (rv != SA_AIS_OK) {
687                         LOG_ERROR("import_checkpoint: clean checkpoint section "
688                                   "creation failed: %s", str_ais_error(rv));
689                         rtn = -EIO; /* FIXME: better error */
690                         goto fail;
691                 }
692
693                 if (!desc.sectionSize) {
694                         LOG_ERROR("Checkpoint section empty");
695                         continue;
696                 }
697
698                 memset(bitmap, 0, sizeof(*bitmap));
699                 iov.sectionId = desc.sectionId;
700                 iov.dataBuffer = bitmap;
701                 iov.dataSize = desc.sectionSize;
702                 iov.dataOffset = 0;
703
704         read_retry:
705                 rv = saCkptCheckpointRead(h, &iov, 1, NULL);
706                 if (rv == SA_AIS_ERR_TRY_AGAIN) {
707                         LOG_ERROR("ckpt read retry");
708                         usleep(1000);
709                         goto read_retry;
710                 }
711
712                 if (rv != SA_AIS_OK) {
713                         LOG_ERROR("import_checkpoint: ckpt read error: %s",
714                                   str_ais_error(rv));
715                         rtn = -EIO; /* FIXME: better error */
716                         goto fail;
717                 }
718
719                 if (iov.readSize) {
720                         if (pull_state(entry->name.value, entry->luid,
721                                        (char *)desc.sectionId.id, bitmap,
722                                        iov.readSize)) {
723                                 LOG_ERROR("Error loading state");
724                                 rtn = -EIO;
725                                 goto fail;
726                         }
727                 } else {
728                         /* Need to request new checkpoint */
729                         rtn = -EAGAIN;
730                         goto fail;
731                 }
732         }
733
734 fail:
735         saCkptSectionIterationFinalize(itr);
736 no_read:
737         saCkptCheckpointClose(h);
738
739         free(bitmap);
740         return rtn;
741 }
742
743 static void do_checkpoints(struct clog_cpg *entry, int leaving)
744 {
745         struct checkpoint_data *cp;
746
747         for (cp = entry->checkpoint_list; cp;) {
748                 /*
749                  * FIXME: Check return code.  Could send failure
750                  * notice in rq in export_checkpoint function
751                  * by setting rq->error
752                  */
753                 switch (export_checkpoint(cp)) {
754                 case -EEXIST:
755                         LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s",
756                                    SHORT_UUID(entry->name.value), cp->requester,
757                                    (leaving) ? "(L)": "");
758                         LOG_COND(log_checkpoint,
759                                  "[%s] Checkpoint for %u already handled%s",
760                                  SHORT_UUID(entry->name.value), cp->requester,
761                                  (leaving) ? "(L)": "");
762                         entry->checkpoint_list = cp->next;
763                         free_checkpoint(cp);
764                         cp = entry->checkpoint_list;
765                         break;
766                 case 0:
767                         LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s",
768                                    SHORT_UUID(entry->name.value), cp->requester,
769                                    (leaving) ? "(L)": "");
770                         LOG_COND(log_checkpoint,
771                                  "[%s] Checkpoint data available for node %u%s",
772                                  SHORT_UUID(entry->name.value), cp->requester,
773                                  (leaving) ? "(L)": "");
774                         entry->checkpoint_list = cp->next;
775                         free_checkpoint(cp);
776                         cp = entry->checkpoint_list;
777                         break;
778                 default:
779                         /* FIXME: Skipping will cause list corruption */
780                         LOG_ERROR("[%s] Failed to export checkpoint for %u%s",
781                                   SHORT_UUID(entry->name.value), cp->requester,
782                                   (leaving) ? "(L)": "");
783                 }
784         }
785 }
786
787 static int resend_requests(struct clog_cpg *entry)
788 {
789         int r = 0;
790         struct clog_request *rq, *n;
791
792         if (!entry->resend_requests || entry->delay)
793                 return 0;
794
795         if (entry->state != VALID)
796                 return 0;
797
798         entry->resend_requests = 0;
799
800         dm_list_iterate_items_gen_safe(rq, n, &entry->working_list, u.list) {
801                 dm_list_del(&rq->u.list);
802
803                 if (strcmp(entry->name.value, rq->u_rq.uuid)) {
804                         LOG_ERROR("[%s]  Stray request from another log (%s)",
805                                   SHORT_UUID(entry->name.value),
806                                   SHORT_UUID(rq->u_rq.uuid));
807                         free(rq);
808                         continue;
809                 }
810
811                 switch (rq->u_rq.request_type) {
812                 case DM_ULOG_SET_REGION_SYNC:
813                         /*
814                          * Some requests simply do not need to be resent.
815                          * If it is a request that just changes log state,
816                          * then it doesn't need to be resent (everyone makes
817                          * updates).
818                          */
819                         LOG_COND(log_resend_requests,
820                                  "[%s] Skipping resend of %s/#%u...",
821                                  SHORT_UUID(entry->name.value),
822                                  _RQ_TYPE(rq->u_rq.request_type),
823                                  rq->u_rq.seq);
824                         LOG_SPRINT(entry, "###  No resend: [%s] %s/%u ###",
825                                    SHORT_UUID(entry->name.value),
826                                    _RQ_TYPE(rq->u_rq.request_type),
827                                    rq->u_rq.seq);
828
829                         rq->u_rq.data_size = 0;
830                         kernel_send(&rq->u_rq);
831                                 
832                         break;
833                         
834                 default:
835                         /*
836                          * If an action or a response is required, then
837                          * the request must be resent.
838                          */
839                         LOG_COND(log_resend_requests,
840                                  "[%s] Resending %s(#%u) due to new server(%u)",
841                                  SHORT_UUID(entry->name.value),
842                                  _RQ_TYPE(rq->u_rq.request_type),
843                                  rq->u_rq.seq, entry->lowest_id);
844                         LOG_SPRINT(entry, "***  Resending: [%s] %s/%u ***",
845                                    SHORT_UUID(entry->name.value),
846                                    _RQ_TYPE(rq->u_rq.request_type),
847                                    rq->u_rq.seq);
848                         r = cluster_send(rq);
849                         if (r < 0)
850                                 LOG_ERROR("Failed resend");
851                 }
852                 free(rq);
853         }
854
855         return r;
856 }
857
858 static int do_cluster_work(void *data __attribute__((unused)))
859 {
860         int r = SA_AIS_OK;
861         struct clog_cpg *entry, *tmp;
862
863         dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) {
864                 r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
865                 if (r != SA_AIS_OK)
866                         LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
867
868                 if (entry->free_me) {
869                         free(entry);
870                         continue;
871                 }
872                 do_checkpoints(entry, 0);
873
874                 resend_requests(entry);
875         }
876
877         return (r == SA_AIS_OK) ? 0 : -1;  /* FIXME: good error number? */
878 }
879
880 static int flush_startup_list(struct clog_cpg *entry)
881 {
882         int r = 0;
883         int i_was_server;
884         struct clog_request *rq, *n;
885         struct checkpoint_data *new;
886
887         dm_list_iterate_items_gen_safe(rq, n, &entry->startup_list, u.list) {
888                 dm_list_del(&rq->u.list);
889
890                 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) {
891                         new = prepare_checkpoint(entry, rq->originator);
892                         if (!new) {
893                                 /*
894                                  * FIXME: Need better error handling.  Other nodes
895                                  * will be trying to send the checkpoint too, and we
896                                  * must continue processing the list; so report error
897                                  * but continue.
898                                  */
899                                 LOG_ERROR("Failed to prepare checkpoint for %u!!!",
900                                           rq->originator);
901                                 free(rq);
902                                 continue;
903                         }
904                         LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u",
905                                    SHORT_UUID(entry->name.value), rq->originator);
906                         LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
907                                  SHORT_UUID(entry->name.value), rq->originator);
908                         new->next = entry->checkpoint_list;
909                         entry->checkpoint_list = new;
910                 } else {
911                         LOG_DBG("[%s] Processing delayed request: %s",
912                                 SHORT_UUID(rq->u_rq.uuid),
913                                 _RQ_TYPE(rq->u_rq.request_type));
914                         i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0;
915                         r = handle_cluster_request(entry, rq, i_was_server);
916
917                         if (r)
918                                 /*
919                                  * FIXME: If we error out here, we will never get
920                                  * another opportunity to retry these requests
921                                  */
922                                 LOG_ERROR("Error while processing delayed CPG message");
923                 }
924                 free(rq);
925         }
926
927         return 0;
928 }
929
930 static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)),
931                                  uint32_t nodeid, uint32_t pid __attribute__((unused)),
932                                  void *msg, size_t msg_len)
933 {
934         int i;
935         int r = 0;
936         int i_am_server;
937         int response = 0;
938         struct clog_request *rq = msg;
939         struct clog_request *tmp_rq;
940         struct clog_cpg *match;
941
942         if (clog_request_from_network(rq, msg_len) < 0)
943                 /* Error message comes from 'clog_request_from_network' */
944                 return;
945
946         match = find_clog_cpg(handle);
947         if (!match) {
948                 LOG_ERROR("Unable to find clog_cpg for cluster message");
949                 return;
950         }
951
952         if ((nodeid == my_cluster_id) &&
953             !(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
954             (rq->u_rq.request_type != DM_ULOG_RESUME) &&
955             (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
956             (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) {
957                 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
958                 if (!tmp_rq) {
959                         /*
960                          * FIXME: It may be possible to continue... but we
961                          * would not be able to resend any messages that might
962                          * be necessary during membership changes
963                          */
964                         LOG_ERROR("[%s] Unable to record request: -ENOMEM",
965                                   SHORT_UUID(rq->u_rq.uuid));
966                         return;
967                 }
968                 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
969                 dm_list_init(&tmp_rq->u.list);
970                 dm_list_add( &match->working_list, &tmp_rq->u.list);
971         }
972
973         if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
974                 /*
975                  * If the server (lowest_id) indicates it is leaving,
976                  * then we must resend any outstanding requests.  However,
977                  * we do not want to resend them if the next server in
978                  * line is in the process of leaving.
979                  */
980                 if (nodeid == my_cluster_id) {
981                         LOG_COND(log_resend_requests, "[%s] I am leaving.1.....",
982                                  SHORT_UUID(rq->u_rq.uuid));
983                 } else {
984                         if (nodeid < my_cluster_id) {
985                                 if (nodeid == match->lowest_id) {
986                                         match->resend_requests = 1;
987                                         LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
988                                                  SHORT_UUID(rq->u_rq.uuid), nodeid,
989                                                  (dm_list_empty(&match->working_list)) ? " -- working_list empty": "");
990
991                                         dm_list_iterate_items_gen(tmp_rq, &match->working_list, u.list)
992                                                 LOG_COND(log_resend_requests,
993                                                          "[%s]                %s/%u",
994                                                          SHORT_UUID(tmp_rq->u_rq.uuid),
995                                                          _RQ_TYPE(tmp_rq->u_rq.request_type),
996                                                          tmp_rq->u_rq.seq);
997                                 }
998
999                                 match->delay++;
1000                                 LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
1001                                          SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay);
1002                         }
1003                         rq->originator = nodeid; /* don't really need this, but nice for debug */
1004                         goto out;
1005                 }
1006         }
1007
1008         /*
1009          * We can receive messages after we do a cpg_leave but before we
1010          * get our config callback.  However, since we can't respond after
1011          * leaving, we simply return.
1012          */
1013         if (match->state == LEAVING)
1014                 return;
1015
1016         i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
1017
1018         if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) {
1019                 if (my_cluster_id == rq->originator) {
1020                         /* Redundant checkpoints ignored if match->valid */
1021                         LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
1022                                    SHORT_UUID(rq->u_rq.uuid), nodeid);
1023                         if (import_checkpoint(match, (match->state != INVALID))) {
1024                                 LOG_SPRINT(match,
1025                                            "[%s] Failed to import checkpoint from %u",
1026                                            SHORT_UUID(rq->u_rq.uuid), nodeid);
1027                                 LOG_ERROR("[%s] Failed to import checkpoint from %u",
1028                                           SHORT_UUID(rq->u_rq.uuid), nodeid);
1029                                 kill(getpid(), SIGUSR1);
1030                                 /* Could we retry? */
1031                                 goto out;
1032                         } else if (match->state == INVALID) {
1033                                 LOG_SPRINT(match,
1034                                            "[%s] Checkpoint data received from %u.  Log is now valid",
1035                                            SHORT_UUID(match->name.value), nodeid);
1036                                 LOG_COND(log_checkpoint,
1037                                          "[%s] Checkpoint data received from %u.  Log is now valid",
1038                                          SHORT_UUID(match->name.value), nodeid);
1039                                 match->state = VALID;
1040
1041                                 flush_startup_list(match);
1042                         } else {
1043                                 LOG_SPRINT(match,
1044                                            "[%s] Redundant checkpoint from %u ignored.",
1045                                            SHORT_UUID(rq->u_rq.uuid), nodeid);
1046                         }
1047                 }
1048                 goto out;
1049         }
1050
1051         if (rq->u_rq.request_type & DM_ULOG_RESPONSE) {
1052                 response = 1;
1053                 r = handle_cluster_response(match, rq);
1054         } else {
1055                 rq->originator = nodeid;
1056
1057                 if (match->state == LEAVING) {
1058                         LOG_ERROR("[%s]  Ignoring %s from %u.  Reason: I'm leaving",
1059                                   SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type),
1060                                   rq->originator);
1061                         goto out;
1062                 }
1063
1064                 if (match->state == INVALID) {
1065                         LOG_DBG("Log not valid yet, storing request");
1066                         tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
1067                         if (!tmp_rq) {
1068                                 LOG_ERROR("cpg_message_callback:  Unable to"
1069                                           " allocate transfer structs");
1070                                 r = -ENOMEM; /* FIXME: Better error #? */
1071                                 goto out;
1072                         }
1073
1074                         memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
1075                         tmp_rq->pit_server = match->lowest_id;
1076                         dm_list_init(&tmp_rq->u.list);
1077                         dm_list_add(&match->startup_list, &tmp_rq->u.list);
1078                         goto out;
1079                 }
1080
1081                 r = handle_cluster_request(match, rq, i_am_server);
1082         }
1083
1084         /*
1085          * If the log is now valid, we can queue the checkpoints
1086          */
1087         for (i = match->checkpoints_needed; i; ) {
1088                 struct checkpoint_data *new;
1089
1090                 if (log_get_state(&rq->u_rq) != LOG_RESUMED) {
1091                         LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
1092                                 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid);
1093                         break;
1094                 }
1095
1096                 i--;
1097                 new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
1098                 if (!new) {
1099                         /* FIXME: Need better error handling */
1100                         LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
1101                                   SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1102                         break;
1103                 }
1104                 LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)",
1105                            SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i],
1106                            (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
1107                 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
1108                          SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1109                 match->checkpoints_needed--;
1110
1111                 new->next = match->checkpoint_list;
1112                 match->checkpoint_list = new;
1113         }
1114
1115 out:
1116         /* nothing happens after this point.  It is just for debugging */
1117         if (r) {
1118                 LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
1119                           SHORT_UUID(rq->u_rq.uuid),
1120                           _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE),
1121                           strerror(-r));
1122                 LOG_ERROR("[%s]    Response  : %s", SHORT_UUID(rq->u_rq.uuid),
1123                           (response) ? "YES" : "NO");
1124                 LOG_ERROR("[%s]    Originator: %u",
1125                           SHORT_UUID(rq->u_rq.uuid), rq->originator);
1126                 if (response)
1127                         LOG_ERROR("[%s]    Responder : %u",
1128                                   SHORT_UUID(rq->u_rq.uuid), nodeid);
1129
1130                 LOG_ERROR("HISTORY::");
1131                 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1132                         match->idx++;
1133                         match->idx = match->idx % DEBUGGING_HISTORY;
1134                         if (match->debugging[match->idx][0] == '\0')
1135                                 continue;
1136                         LOG_ERROR("%d:%d) %s", i, match->idx,
1137                                   match->debugging[match->idx]);
1138                 }
1139         } else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) ||
1140                    (rq->originator == my_cluster_id)) {
1141                 if (!response)
1142                         LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
1143                                    rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1144                                    _RQ_TYPE(rq->u_rq.request_type),
1145                                    rq->originator, (response) ? "YES" : "NO");
1146                 else
1147                         LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
1148                                    rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1149                                    _RQ_TYPE(rq->u_rq.request_type),
1150                                    rq->originator, (response) ? "YES" : "NO",
1151                                    nodeid);
1152         }
1153 }
1154
1155 static void cpg_join_callback(struct clog_cpg *match,
1156                               const struct cpg_address *joined,
1157                               const struct cpg_address *member_list,
1158                               size_t member_list_entries)
1159 {
1160         unsigned i;
1161         uint32_t my_pid = (uint32_t)getpid();
1162         uint32_t lowest = match->lowest_id;
1163         struct clog_request *rq;
1164         char dbuf[32];
1165
1166         /* Assign my_cluster_id */
1167         if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
1168                 my_cluster_id = joined->nodeid;
1169
1170         /* Am I the very first to join? */
1171         if (member_list_entries == 1) {
1172                 match->lowest_id = joined->nodeid;
1173                 match->state = VALID;
1174         }
1175
1176         /* If I am part of the joining list, I do not send checkpoints */
1177         if (joined->nodeid == my_cluster_id)
1178                 goto out;
1179
1180         memset(dbuf, 0, sizeof(dbuf));
1181         for (i = 0; i < member_list_entries - 1; i++)
1182                 sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid);
1183         sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid);
1184         LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]",
1185                  SHORT_UUID(match->name.value), joined->nodeid, dbuf);
1186
1187         /*
1188          * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
1189          * the startup_list interface exclusively
1190          */
1191         if (dm_list_empty(&match->startup_list) && (match->state == VALID) &&
1192             (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) {
1193                 match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
1194                 goto out;
1195         }
1196
1197         rq = malloc(DM_ULOG_REQUEST_SIZE);
1198         if (!rq) {
1199                 LOG_ERROR("cpg_config_callback: "
1200                           "Unable to allocate transfer structs");
1201                 LOG_ERROR("cpg_config_callback: "
1202                           "Unable to perform checkpoint");
1203                 goto out;
1204         }
1205         rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN;
1206         rq->originator = joined->nodeid;
1207         dm_list_init(&rq->u.list);
1208         dm_list_add(&match->startup_list, &rq->u.list);
1209
1210 out:
1211         /* Find the lowest_id, i.e. the server */
1212         match->lowest_id = member_list[0].nodeid;
1213         for (i = 0; i < member_list_entries; i++)
1214                 if (match->lowest_id > member_list[i].nodeid)
1215                         match->lowest_id = member_list[i].nodeid;
1216
1217         if (lowest == 0xDEAD)
1218                 LOG_COND(log_membership_change, "[%s]  Server change <none> -> %u (%u %s)",
1219                          SHORT_UUID(match->name.value), match->lowest_id,
1220                          joined->nodeid, (member_list_entries == 1) ?
1221                          "is first to join" : "joined");
1222         else if (lowest != match->lowest_id)
1223                 LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u joined)",
1224                          SHORT_UUID(match->name.value), lowest,
1225                          match->lowest_id, joined->nodeid);
1226         else
1227                 LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u joined)",
1228                          SHORT_UUID(match->name.value),
1229                          lowest, joined->nodeid);
1230         LOG_SPRINT(match, "+++  UUID=%s  %u join  +++",
1231                    SHORT_UUID(match->name.value), joined->nodeid);
1232 }
1233
1234 static void cpg_leave_callback(struct clog_cpg *match,
1235                                const struct cpg_address *left,
1236                                const struct cpg_address *member_list,
1237                                size_t member_list_entries)
1238 {
1239         unsigned i;
1240         int j, fd;
1241         uint32_t lowest = match->lowest_id;
1242         struct clog_request *rq, *n;
1243         struct checkpoint_data *p_cp, *c_cp;
1244
1245         LOG_SPRINT(match, "---  UUID=%s  %u left  ---",
1246                    SHORT_UUID(match->name.value), left->nodeid);
1247
1248         /* Am I leaving? */
1249         if (my_cluster_id == left->nodeid) {
1250                 LOG_DBG("Finalizing leave...");
1251                 dm_list_del(&match->list);
1252
1253                 cpg_fd_get(match->handle, &fd);
1254                 links_unregister(fd);
1255
1256                 cluster_postsuspend(match->name.value, match->luid);
1257
1258                 dm_list_iterate_items_gen_safe(rq, n, &match->working_list, u.list) {
1259                         dm_list_del(&rq->u.list);
1260
1261                         if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
1262                                 kernel_send(&rq->u_rq);
1263                         free(rq);
1264                 }
1265
1266                 cpg_finalize(match->handle);
1267
1268                 match->free_me = 1;
1269                 match->lowest_id = 0xDEAD;
1270                 match->state = INVALID;
1271         }                       
1272
1273         /* Remove any pending checkpoints for the leaving node. */
1274         for (p_cp = NULL, c_cp = match->checkpoint_list;
1275              c_cp && (c_cp->requester != left->nodeid);
1276              p_cp = c_cp, c_cp = c_cp->next);
1277         if (c_cp) {
1278                 if (p_cp)
1279                         p_cp->next = c_cp->next;
1280                 else
1281                         match->checkpoint_list = c_cp->next;
1282
1283                 LOG_COND(log_checkpoint,
1284                          "[%s] Removing pending checkpoint (%u is leaving)",
1285                          SHORT_UUID(match->name.value), left->nodeid);
1286                 free_checkpoint(c_cp);
1287         }
1288         dm_list_iterate_items_gen_safe(rq, n, &match->startup_list, u.list) {
1289                 if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) &&
1290                     (rq->originator == left->nodeid)) {
1291                         LOG_COND(log_checkpoint,
1292                                  "[%s] Removing pending ckpt from startup list (%u is leaving)",
1293                                  SHORT_UUID(match->name.value), left->nodeid);
1294                         dm_list_del(&rq->u.list);
1295                         free(rq);
1296                 }
1297         }
1298         for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) {
1299                 match->checkpoint_requesters[j] = match->checkpoint_requesters[i];
1300                 if (match->checkpoint_requesters[i] == left->nodeid) {
1301                         LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)",
1302                                   SHORT_UUID(match->name.value), left->nodeid);
1303                         j--;
1304                 }
1305         }
1306         match->checkpoints_needed = j;
1307
1308         if (left->nodeid < my_cluster_id) {
1309                 match->delay = (match->delay > 0) ? match->delay - 1 : 0;
1310                 if (!match->delay && dm_list_empty(&match->working_list))
1311                         match->resend_requests = 0;
1312                 LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
1313                          SHORT_UUID(match->name.value), left->nodeid,
1314                          match->delay, (dm_list_empty(&match->working_list)) ?
1315                          " -- working_list empty": "");
1316         }
1317
1318         /* Find the lowest_id, i.e. the server */
1319         if (!member_list_entries) {
1320                 match->lowest_id = 0xDEAD;
1321                 LOG_COND(log_membership_change, "[%s]  Server change %u -> <none> "
1322                          "(%u is last to leave)",
1323                          SHORT_UUID(match->name.value), left->nodeid,
1324                          left->nodeid);
1325                 return;
1326         }
1327                 
1328         match->lowest_id = member_list[0].nodeid;
1329         for (i = 0; i < member_list_entries; i++)
1330                 if (match->lowest_id > member_list[i].nodeid)
1331                         match->lowest_id = member_list[i].nodeid;
1332
1333         if (lowest != match->lowest_id) {
1334                 LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u left)",
1335                          SHORT_UUID(match->name.value), lowest,
1336                          match->lowest_id, left->nodeid);
1337         } else
1338                 LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u left)",
1339                          SHORT_UUID(match->name.value), lowest, left->nodeid);
1340
1341         if ((match->state == INVALID) && !match->free_me) {
1342                 /*
1343                  * If all CPG members are waiting for checkpoints and they
1344                  * are all present in my startup_list, then I was the first to
1345                  * join and I must assume control.
1346                  *
1347                  * We do not normally end up here, but if there was a quick
1348                  * 'resume -> suspend -> resume' across the cluster, we may
1349                  * have initially thought we were not the first to join because
1350                  * of the presence of out-going (and unable to respond) members.
1351                  */
1352
1353                 i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
1354                 dm_list_iterate_items_gen(rq, &match->startup_list, u.list)
1355                         if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN)
1356                                 i++;
1357
1358                 if (i == member_list_entries) {
1359                         /* 
1360                          * Last node who could have given me a checkpoint just left.
1361                          * Setting log state to VALID and acting as 'first join'.
1362                          */
1363                         match->state = VALID;
1364                         flush_startup_list(match);
1365                 }
1366         }
1367 }
1368
1369 static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)),
1370                                 const struct cpg_address *member_list,
1371                                 size_t member_list_entries,
1372                                 const struct cpg_address *left_list,
1373                                 size_t left_list_entries,
1374                                 const struct cpg_address *joined_list,
1375                                 size_t joined_list_entries)
1376 {
1377         struct clog_cpg *match;
1378         int found = 0;
1379
1380         dm_list_iterate_items(match, &clog_cpg_list)
1381                 if (match->handle == handle) {
1382                         found = 1;
1383                         break;
1384                 }
1385
1386         if (!found) {
1387                 LOG_ERROR("Unable to find match for CPG config callback");
1388                 return;
1389         }
1390
1391         if ((joined_list_entries + left_list_entries) > 1)
1392                 LOG_ERROR("[%s]  More than one node joining/leaving",
1393                           SHORT_UUID(match->name.value));
1394
1395         if (joined_list_entries)
1396                 cpg_join_callback(match, joined_list,
1397                                   member_list, member_list_entries);
1398         else
1399                 cpg_leave_callback(match, left_list,
1400                                    member_list, member_list_entries);
1401 }
1402
1403 cpg_callbacks_t cpg_callbacks = {
1404         .cpg_deliver_fn = cpg_message_callback,
1405         .cpg_confchg_fn = cpg_config_callback,
1406 };
1407
1408 /*
1409  * remove_checkpoint
1410  * @entry
1411  *
1412  * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error
1413  */
1414 static int remove_checkpoint(struct clog_cpg *entry)
1415 {
1416         int len;
1417         SaNameT name;
1418         SaAisErrorT rv;
1419         SaCkptCheckpointHandleT h;
1420
1421         len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
1422                        SHORT_UUID(entry->name.value), my_cluster_id);
1423         name.length = len;
1424
1425 open_retry:
1426         rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
1427                                   SA_CKPT_CHECKPOINT_READ, 0, &h);
1428         if (rv == SA_AIS_ERR_TRY_AGAIN) {
1429                 LOG_ERROR("abort_startup: ckpt open retry");
1430                 usleep(1000);
1431                 goto open_retry;
1432         }
1433
1434         if (rv != SA_AIS_OK)
1435                 return 0;
1436
1437         LOG_DBG("[%s]  Removing checkpoint", SHORT_UUID(entry->name.value));
1438 unlink_retry:
1439         rv = saCkptCheckpointUnlink(ckpt_handle, &name);
1440         if (rv == SA_AIS_ERR_TRY_AGAIN) {
1441                 LOG_ERROR("abort_startup: ckpt unlink retry");
1442                 usleep(1000);
1443                 goto unlink_retry;
1444         }
1445         
1446         if (rv != SA_AIS_OK) {
1447                 LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
1448                           SHORT_UUID(entry->name.value), str_ais_error(rv));
1449                 return -EIO;
1450         }
1451
1452         saCkptCheckpointClose(h);
1453
1454         return 1;
1455 }
1456
1457 int create_cluster_cpg(char *uuid, uint64_t luid)
1458 {
1459         int r;
1460         size_t size;
1461         struct clog_cpg *new = NULL;
1462         struct clog_cpg *tmp;
1463
1464         dm_list_iterate_items(tmp, &clog_cpg_list)
1465                 if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) {
1466                         LOG_ERROR("Log entry already exists: %s", uuid);
1467                         return -EEXIST;
1468                 }
1469
1470         new = malloc(sizeof(*new));
1471         if (!new) {
1472                 LOG_ERROR("Unable to allocate memory for clog_cpg");
1473                 return -ENOMEM;
1474         }
1475         memset(new, 0, sizeof(*new));
1476         dm_list_init(&new->list);
1477         new->lowest_id = 0xDEAD;
1478         dm_list_init(&new->startup_list);
1479         dm_list_init(&new->working_list);
1480
1481         size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ?
1482                 CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1);
1483         strncpy(new->name.value, uuid, size);
1484         new->name.length = (uint32_t)size;
1485         new->luid = luid;
1486
1487         /*
1488          * Ensure there are no stale checkpoints around before we join
1489          */
1490         if (remove_checkpoint(new) == 1)
1491                 LOG_COND(log_checkpoint,
1492                          "[%s]  Removing checkpoints left from previous session",
1493                          SHORT_UUID(new->name.value));
1494
1495         r = cpg_initialize(&new->handle, &cpg_callbacks);
1496         if (r != SA_AIS_OK) {
1497                 LOG_ERROR("cpg_initialize failed:  Cannot join cluster");
1498                 free(new);
1499                 return -EPERM;
1500         }
1501
1502         r = cpg_join(new->handle, &new->name);
1503         if (r != SA_AIS_OK) {
1504                 LOG_ERROR("cpg_join failed:  Cannot join cluster");
1505                 free(new);
1506                 return -EPERM;
1507         }
1508
1509         new->cpg_state = VALID;
1510         dm_list_add(&clog_cpg_list, &new->list);
1511         LOG_DBG("New   handle: %llu", (unsigned long long)new->handle);
1512         LOG_DBG("New   name: %s", new->name.value);
1513
1514         /* FIXME: better variable */
1515         cpg_fd_get(new->handle, &r);
1516         links_register(r, "cluster", do_cluster_work, NULL);
1517
1518         return 0;
1519 }
1520
1521 static void abort_startup(struct clog_cpg *del)
1522 {
1523         struct clog_request *rq, *n;
1524
1525         LOG_DBG("[%s]  CPG teardown before checkpoint received",
1526                 SHORT_UUID(del->name.value));
1527
1528         dm_list_iterate_items_gen_safe(rq, n, &del->startup_list, u.list) {
1529                 dm_list_del(&rq->u.list);
1530
1531                 LOG_DBG("[%s]  Ignoring request from %u: %s",
1532                         SHORT_UUID(del->name.value), rq->originator,
1533                         _RQ_TYPE(rq->u_rq.request_type));
1534                 free(rq);
1535         }
1536
1537         remove_checkpoint(del);
1538 }
1539
1540 static int _destroy_cluster_cpg(struct clog_cpg *del)
1541 {
1542         int r;
1543         int state;
1544         
1545         LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
1546                  SHORT_UUID(del->name.value));
1547
1548         /*
1549          * We must send any left over checkpoints before
1550          * leaving.  If we don't, an incoming node could
1551          * be stuck with no checkpoint and stall.
1552          do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
1553
1554          - Incoming node deletes old checkpoints before joining
1555          - A stale checkpoint is issued here by leaving node
1556          - (leaving node leaves)
1557          - Incoming node joins cluster and finds stale checkpoint.
1558          - (leaving node leaves - option 2)
1559         */
1560         do_checkpoints(del, 1);
1561
1562         state = del->state;
1563
1564         del->cpg_state = INVALID;
1565         del->state = LEAVING;
1566
1567         /*
1568          * If the state is VALID, we might be processing the
1569          * startup list.  If so, we certainly don't want to
1570          * clear the startup_list here by calling abort_startup
1571          */
1572         if (!dm_list_empty(&del->startup_list) && (state != VALID))
1573                 abort_startup(del);
1574
1575         r = cpg_leave(del->handle, &del->name);
1576         if (r != CPG_OK)
1577                 LOG_ERROR("Error leaving CPG!");
1578         return 0;
1579 }
1580
1581 int destroy_cluster_cpg(char *uuid)
1582 {
1583         struct clog_cpg *del, *tmp;
1584
1585         dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
1586                 if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
1587                         _destroy_cluster_cpg(del);
1588
1589         return 0;
1590 }
1591
1592 int init_cluster(void)
1593 {
1594         SaAisErrorT rv;
1595
1596         dm_list_init(&clog_cpg_list);
1597         rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
1598
1599         if (rv != SA_AIS_OK)
1600                 return EXIT_CLUSTER_CKPT_INIT;
1601
1602         return 0;
1603 }
1604
1605 void cleanup_cluster(void)
1606 {
1607         SaAisErrorT err;
1608
1609         err = saCkptFinalize(ckpt_handle);
1610         if (err != SA_AIS_OK)
1611                 LOG_ERROR("Failed to finalize checkpoint handle");
1612 }
1613
1614 void cluster_debug(void)
1615 {
1616         struct checkpoint_data *cp;
1617         struct clog_cpg *entry;
1618         struct clog_request *rq;
1619         int i;
1620
1621         LOG_ERROR("");
1622         LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
1623         dm_list_iterate_items(entry, &clog_cpg_list) {
1624                 LOG_ERROR("%s::", SHORT_UUID(entry->name.value));
1625                 LOG_ERROR("  lowest_id         : %u", entry->lowest_id);
1626                 LOG_ERROR("  state             : %s", (entry->state == INVALID) ?
1627                           "INVALID" : (entry->state == VALID) ? "VALID" :
1628                           (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN");
1629                 LOG_ERROR("  cpg_state         : %d", entry->cpg_state);
1630                 LOG_ERROR("  free_me           : %d", entry->free_me);
1631                 LOG_ERROR("  delay             : %d", entry->delay);
1632                 LOG_ERROR("  resend_requests   : %d", entry->resend_requests);
1633                 LOG_ERROR("  checkpoints_needed: %d", entry->checkpoints_needed);
1634                 for (i = 0, cp = entry->checkpoint_list;
1635                      i < MAX_CHECKPOINT_REQUESTERS; i++)
1636                         if (cp)
1637                                 cp = cp->next;
1638                         else
1639                                 break;
1640                 LOG_ERROR("  CKPTs waiting     : %d", i);
1641                 LOG_ERROR("  Working list:");
1642                 dm_list_iterate_items_gen(rq, &entry->working_list, u.list)
1643                         LOG_ERROR("  %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1644                                   rq->u_rq.seq);
1645
1646                 LOG_ERROR("  Startup list:");
1647                 dm_list_iterate_items_gen(rq, &entry->startup_list, u.list)
1648                         LOG_ERROR("  %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1649                                   rq->u_rq.seq);
1650
1651                 LOG_ERROR("Command History:");
1652                 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1653                         entry->idx++;
1654                         entry->idx = entry->idx % DEBUGGING_HISTORY;
1655                         if (entry->debugging[entry->idx][0] == '\0')
1656                                 continue;
1657                         LOG_ERROR("%d:%d) %s", i, entry->idx,
1658                                   entry->debugging[entry->idx]);
1659                 }
1660         }
1661 }