2 * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
4 * This copyrighted material is made available to anyone wishing to use,
5 * modify, copy, or redistribute it subject to the terms and conditions
6 * of the GNU Lesser General Public License v.2.1.
8 * You should have received a copy of the GNU Lesser General Public License
9 * along with this program; if not, write to the Free Software Foundation,
10 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 #include "functions.h"
21 #include <corosync/cpg.h>
23 #include <openais/saAis.h>
24 #include <openais/saCkpt.h>
// str_ais_error(x): expands to the symbolic name of an SaAisErrorT value, for
// log messages about cpg_* / saCkpt* results in the functions below.
// NOTE(review): x is evaluated once per comparison arm, so pass only
// side-effect-free expressions (the call sites here pass plain variables).
// The fallback arm that terminates this ?: chain is not visible in this
// listing -- confirm it yields a string for codes not listed.
// The last three lines below are the tail of a second lookup macro whose
// #define line is not shown (presumably the _RQ_TYPE used throughout): it names
// the daemon-private DM_ULOG_CHECKPOINT_READY and DM_ULOG_MEMBER_JOIN request
// types and otherwise defers to RQ_TYPE() with the DM_ULOG_RESPONSE bit masked.
28 /* Open AIS error codes */
29 #define str_ais_error(x) \
30 ((x) == SA_AIS_OK) ? "SA_AIS_OK" : \
31 ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \
32 ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \
33 ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \
34 ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \
35 ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \
36 ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
37 ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \
38 ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \
39 ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \
40 ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \
41 ((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \
42 ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
43 ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \
44 ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \
45 ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \
46 ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
47 ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \
48 ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
49 ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
50 ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
51 ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
52 ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \
53 ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
54 ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \
55 ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \
56 ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \
60 ((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \
61 ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \
62 RQ_TYPE((x) & ~DM_ULOG_RESPONSE)
// File-scope state shared by the CPG and checkpoint code below.
// log_resp_rec (declared after the LOG_SPRINT macro): when > 0,
// handle_cluster_response logs every response it matches via
// LOG_COND(log_resend_requests, ...).
//
// my_cluster_id: this node's CPG nodeid. 0xDEAD is the "not yet known"
// sentinel; cpg_join_callback replaces it with joined->nodeid when the joining
// pid is our own. It is compared against request originators and member ids.
64 static uint32_t my_cluster_id = 0xDEAD;
// Handle passed to saCkptCheckpointOpen/saCkptCheckpointUnlink when exporting
// and importing checkpoints.
65 static SaCkptHandleT ckpt_handle = 0;
// NOTE(review): presumably the callback table and API version handed to
// saCkptInitialize; that call is not visible in this listing.
66 static SaCkptCallbacksT callbacks = { 0, 0 };
67 static SaVersionT version = { 'B', 1, 1 };
// Number of 128-byte entries in each clog_cpg's debugging ring buffer. The
// commented-out global below is superseded by the per-log clog_cpg.debugging
// array.
69 #define DEBUGGING_HISTORY 100
70 //static char debugging[DEBUGGING_HISTORY][128];
// LOG_SPRINT(cc, fmt, ...): record a formatted trace line in cc's debugging
// ring, keeping cc->idx within DEBUGGING_HISTORY. cpg_message_callback dumps
// the ring when processing a CPG message fails.
// NOTE(review): sprintf is unbounded; a message longer than 127 characters
// would overrun its 128-byte slot -- consider snprintf with
// sizeof(cc->debugging[0]).
72 #define LOG_SPRINT(cc, f, arg...) do { \
74 cc->idx = cc->idx % DEBUGGING_HISTORY; \
75 sprintf(cc->debugging[cc->idx], f, ## arg); \
78 static int log_resp_rec = 0;
// Snapshot of one log's state prepared for a single joining node: built by
// prepare_checkpoint, published by export_checkpoint, released by
// free_checkpoint. Entries are chained on clog_cpg.checkpoint_list via next.
80 struct checkpoint_data {
82 char uuid[CPG_MAX_NAME_LENGTH];
// Byte length of the clean/sync bitmaps, as returned by push_state.
84 int bitmap_size; /* in bytes */
// NUL-terminated string (export_checkpoint sizes it with strlen).
87 char *recovering_region;
88 struct checkpoint_data *next;
// Capacity of clog_cpg.checkpoint_requesters; cpg_join_callback stops
// recording requesters there once it is full.
95 #define MAX_CHECKPOINT_REQUESTERS 10
// The lines below are members of the per-log CPG state, struct clog_cpg (its
// opening line is not shown in this listing).
// CPG group name; cluster_send matches it against request uuids.
101 struct cpg_name name;
104 /* Are we the first, or have we received checkpoint? */
106 int cpg_state; /* FIXME: debugging */
// Requests (and DM_ULOG_MEMBER_JOIN markers) queued while the log is still
// INVALID; replayed by flush_startup_list once the checkpoint arrives.
110 struct dm_list startup_list;
// Copies of this node's own outstanding requests, matched to responses by
// handle_cluster_response and replayed by resend_requests.
111 struct dm_list working_list;
// Nodeids of joiners still owed a checkpoint, and the number of valid entries.
113 int checkpoints_needed;
114 uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS];
// Prepared checkpoints waiting to be exported by do_checkpoints.
115 struct checkpoint_data *checkpoint_list;
// Ring buffer written by LOG_SPRINT.
117 char debugging[DEBUGGING_HISTORY][128];
// Every active clog_cpg; searched by uuid (cluster_send) and by CPG handle
// (find_clog_cpg).
120 static struct dm_list clog_cpg_list;
126 * Returns: 0 on success, -Exxx on error
// cluster_send: multicast rq to the CPG group of the log named by
// rq->u_rq.uuid.
// - If no clog_cpg in clog_cpg_list matches the uuid, rq->u_rq.error is set
//   to -ENOENT.
// - rq is converted in place by clog_request_to_network before it is sent.
// - cpg_mcast_joined is retried while it reports SA_AIS_ERR_TRY_AGAIN; retry
//   messages are logged progressively less often as the retry count grows.
//   Any other failure is logged and recorded as -EBADE in rq->u_rq.error.
// NOTE(review): the -EBADE store happens after the network conversion; confirm
// callers that inspect rq afterwards expect its fields in wire format.
128 int cluster_send(struct clog_request *rq)
134 struct clog_cpg *entry;
136 dm_list_iterate_items(entry, &clog_cpg_list)
137 if (!strncmp(entry->name.value, rq->u_rq.uuid,
138 CPG_MAX_NAME_LENGTH)) {
144 rq->u_rq.error = -ENOENT;
149 * Once the request heads for the cluster, the luid loses
// The payload length is taken before clog_request_to_network runs, since that
// call presumably byte-swaps data_size.
155 iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size;
// version[0] goes through xlate64 while version[1] is stored as-is.
// NOTE(review): presumably this lets the receiver tell whether byte-swapping
// is needed -- confirm against clog_request_from_network.
157 rq->u.version[0] = xlate64(CLOG_TFR_VERSION);
158 rq->u.version[1] = CLOG_TFR_VERSION;
160 r = clog_request_to_network(rq);
162 /* FIXME: Better error code for byteswap failure? */
165 if (entry->cpg_state != VALID)
169 r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
170 if (r != SA_AIS_ERR_TRY_AGAIN)
174 LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s",
175 SHORT_UUID(rq->u_rq.uuid), count,
177 else if ((count < 100) && !(count % 10))
178 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
179 SHORT_UUID(rq->u_rq.uuid), count,
181 else if ((count < 1000) && !(count % 100))
182 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
183 SHORT_UUID(rq->u_rq.uuid), count,
185 else if ((count < 10000) && !(count % 1000))
186 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - "
187 "OpenAIS not handling the load?",
188 SHORT_UUID(rq->u_rq.uuid), count,
196 /* error codes found in openais/cpg.h */
197 LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
199 rq->u_rq.error = -EBADE;
// get_matching_rq: find the request on list l whose u_rq.seq equals rq's and
// unlink it from that list. Only the sequence number is compared. The return
// statements are not visible here; presumably it returns the unlinked match,
// or NULL when there is none (handle_cluster_response treats a failed lookup
// as "no match").
203 static struct clog_request *get_matching_rq(struct clog_request *rq,
206 struct clog_request *match, *n;
208 dm_list_iterate_items_gen_safe(match, n, l, u.list)
209 if (match->u_rq.seq == rq->u_rq.seq) {
210 dm_list_del(&match->u.list);
// Scratch request used by handle_cluster_request so that do_request can build
// a reply payload without writing past the end of the delivered message. It is
// a single file-scope buffer, reused (and overwritten) by every call.
217 static char rq_buffer[DM_ULOG_REQUEST_SIZE];
// handle_cluster_request: act on a request delivered through CPG.
// @entry:  unused.
// @rq:     the request, already converted from network order by the caller.
// @server: forwarded to do_request.
// DM_ULOG_RESUME is executed only on its originating node, which then passes
// the result to the kernel with kernel_send. Other requests are run through
// do_request. Unless the type is exempt (the visible exclusions are
// DM_ULOG_CLEAR_REGION and DM_ULOG_POSTSUSPEND), the result is then
// multicast back with DM_ULOG_RESPONSE set.
218 static int handle_cluster_request(struct clog_cpg *entry __attribute__((unused)),
219 struct clog_request *rq, int server)
222 struct clog_request *tmp = (struct clog_request *)rq_buffer;
225 * We need a separate dm_ulog_request struct, one that can carry
226 * a return payload. Otherwise, the memory address after
227 * rq will be altered - leading to problems
229 memset(rq_buffer, 0, sizeof(rq_buffer));
// NOTE(review): the copy length is not checked against sizeof(rq_buffer)
// here; confirm data_size received from the network is bounded before this
// point (e.g. in clog_request_from_network).
230 memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size);
233 * With resumes, we only handle our own.
234 * Resume is a special case that requires
235 * local action (to set up CPG), followed by
236 * a cluster action to co-ordinate reading
237 * the disk and checkpointing
239 if (tmp->u_rq.request_type == DM_ULOG_RESUME) {
240 if (tmp->originator == my_cluster_id) {
241 r = do_request(tmp, server);
243 r = kernel_send(&tmp->u_rq);
245 LOG_ERROR("Failed to send resume response to kernel");
250 r = do_request(tmp, server);
253 (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
254 (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) {
255 tmp->u_rq.request_type |= DM_ULOG_RESPONSE;
258 * Errors from previous functions are in the rq struct.
260 r = cluster_send(tmp);
262 LOG_ERROR("cluster_send failed: %s", strerror(-r));
// handle_cluster_response: complete one of this node's own requests.
// Responses to requests that another node originated are ignored. The
// DM_ULOG_RESPONSE bit is cleared, and the original request is pulled off
// entry->working_list by sequence number. It is then overwritten with the
// response and handed to the kernel with kernel_send. If no match is found,
// the response and the current working_list are logged as errors.
268 static int handle_cluster_response(struct clog_cpg *entry,
269 struct clog_request *rq)
272 struct clog_request *orig_rq;
275 * If I didn't send it, then I don't care about the response
277 if (rq->originator != my_cluster_id)
280 rq->u_rq.request_type &= ~DM_ULOG_RESPONSE;
281 orig_rq = get_matching_rq(rq, &entry->working_list);
284 /* Unable to find match for response */
286 LOG_ERROR("[%s] No match for cluster response: %s:%u",
287 SHORT_UUID(rq->u_rq.uuid),
288 _RQ_TYPE(rq->u_rq.request_type),
291 LOG_ERROR("Current local list:");
292 if (dm_list_empty(&entry->working_list))
293 LOG_ERROR(" [none]");
295 dm_list_iterate_items_gen(orig_rq, &entry->working_list, u.list)
296 LOG_ERROR(" [%s] %s:%u",
297 SHORT_UUID(orig_rq->u_rq.uuid),
298 _RQ_TYPE(orig_rq->u_rq.request_type),
304 if (log_resp_rec > 0) {
305 LOG_COND(log_resend_requests,
306 "[%s] Response received to %s/#%u",
307 SHORT_UUID(rq->u_rq.uuid),
308 _RQ_TYPE(rq->u_rq.request_type),
// working_list copies are allocated with DM_ULOG_REQUEST_SIZE bytes
// (cpg_message_callback), so this copy is only safe while
// sizeof(*rq) + data_size fits in that size. The copy also overwrites
// orig_rq->u.list, which is harmless because get_matching_rq already unlinked
// the entry.
313 /* FIXME: Ensure memcpy cannot explode */
314 memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
316 r = kernel_send(&orig_rq->u_rq);
318 LOG_ERROR("Failed to send response to kernel");
// find_clog_cpg: linear search of clog_cpg_list for the entry that owns the
// given CPG handle. The return statements are not visible here; presumably
// it returns the match or NULL (cpg_message_callback logs "Unable to find
// clog_cpg" on a failed lookup).
324 static struct clog_cpg *find_clog_cpg(cpg_handle_t handle)
326 struct clog_cpg *match;
328 dm_list_iterate_items(match, &clog_cpg_list)
329 if (match->handle == handle)
337 * @entry: clog_cpg describing the log
338 * @cp_requester: nodeid requesting the checkpoint
340 * Creates and fills in a new checkpoint_data struct.
342 * Returns: checkpoint_data on success, NULL on error
// The request is refused (with an error log) while entry->state != VALID. The
// clean bitmap, sync bitmap and recovering region are captured with
// push_state. If a later capture fails, the buffers captured so far are freed.
// NOTE(review): bitmap_size is assigned by both bitmap captures, so the sync
// bitmap's size is the one kept; this assumes the two bitmaps always have the
// same length.
344 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
345 uint32_t cp_requester)
348 struct checkpoint_data *new;
350 if (entry->state != VALID) {
352 * We can't store bitmaps yet, because the log is not
355 LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
360 new = malloc(sizeof(*new));
362 LOG_ERROR("Unable to create checkpoint data for %u",
366 memset(new, 0, sizeof(*new));
367 new->requester = cp_requester;
// NOTE(review): NUL termination relies on the memset above and on
// name.length < sizeof(new->uuid); confirm CPG names never fill the buffer.
368 strncpy(new->uuid, entry->name.value, entry->name.length);
370 new->bitmap_size = push_state(entry->name.value, entry->luid,
372 &new->clean_bits, cp_requester);
373 if (new->bitmap_size <= 0) {
374 LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
380 new->bitmap_size = push_state(entry->name.value, entry->luid,
382 &new->sync_bits, cp_requester);
383 if (new->bitmap_size <= 0) {
384 LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
386 free(new->clean_bits);
391 r = push_state(entry->name.value, entry->luid,
393 &new->recovering_region, cp_requester);
395 LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
397 free(new->sync_bits);
398 free(new->clean_bits);
402 LOG_DBG("[%s] Checkpoint prepared for node %u:",
403 SHORT_UUID(new->uuid), new->requester);
404 LOG_DBG(" bitmap_size = %d", new->bitmap_size);
411 * @cp: the checkpoint_data struct to free
// Releases the buffers owned by a checkpoint_data built by prepare_checkpoint.
// Only the recovering_region and clean_bits frees are visible in this
// listing. NOTE(review): confirm that sync_bits and cp itself are freed too.
414 static void free_checkpoint(struct checkpoint_data *cp)
416 free(cp->recovering_region);
418 free(cp->clean_bits);
// export_checkpoint: publish cp to its requester through the OpenAIS
// checkpoint service, then announce it on the CPG.
// The checkpoint is named "bitmaps_<short uuid>_<requester>", the same form
// import_checkpoint opens using its own nodeid. It holds three sections:
// "sync_bits", "clean_bits" and "recovering_region". Each saCkpt* call is
// retried on SA_AIS_ERR_TRY_AGAIN. An already-existing checkpoint or section
// takes its own path; other failures return -EIO. Once the checkpoint is
// closed, a DM_ULOG_CHECKPOINT_READY request is multicast with originator set
// to the requester and seq set to this node's id.
422 static int export_checkpoint(struct checkpoint_data *cp)
424 SaCkptCheckpointCreationAttributesT attr;
425 SaCkptCheckpointHandleT h;
426 SaCkptSectionIdT section_id;
427 SaCkptSectionCreationAttributesT section_attr;
428 SaCkptCheckpointOpenFlagsT flags;
431 struct clog_request *rq;
435 LOG_DBG("Sending checkpointed data to %u", cp->requester);
437 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH,
438 "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester);
439 name.length = (SaUint16T)len;
// From here on, len is the size of the recovering_region string including its
// terminating NUL.
441 len = (int)strlen(cp->recovering_region) + 1;
// The checkpoint needs room for both bitmaps plus the recovering_region
// string; the largest of the three sections sets maxSectionSize.
443 attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
444 attr.checkpointSize = cp->bitmap_size * 2 + len;
446 attr.retentionDuration = SA_TIME_MAX;
447 attr.maxSections = 4; /* don't know why we need +1 */
449 attr.maxSectionSize = (cp->bitmap_size > len) ? cp->bitmap_size : len;
450 attr.maxSectionIdSize = 22;
452 flags = SA_CKPT_CHECKPOINT_READ |
453 SA_CKPT_CHECKPOINT_WRITE |
454 SA_CKPT_CHECKPOINT_CREATE;
457 rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h);
458 if (rv == SA_AIS_ERR_TRY_AGAIN) {
459 LOG_ERROR("export_checkpoint: ckpt open retry");
464 if (rv == SA_AIS_ERR_EXIST) {
465 LOG_DBG("export_checkpoint: checkpoint already exists");
469 if (rv != SA_AIS_OK) {
470 LOG_ERROR("[%s] Failed to open checkpoint for %u: %s",
471 SHORT_UUID(cp->uuid), cp->requester,
473 return -EIO; /* FIXME: better error */
477 * Add section for sync_bits
// NOTE(review): '§ion_id' in this function looks like '&section_id' mangled
// by HTML-entity decoding ("&sect" -> "§"); the intended value is the address
// of section_id.
479 section_id.idLen = (SaUint16T)snprintf(buf, 32, "sync_bits");
480 section_id.id = (unsigned char *)buf;
481 section_attr.sectionId = §ion_id;
482 section_attr.expirationTime = SA_TIME_END;
485 rv = saCkptSectionCreate(h, §ion_attr,
486 cp->sync_bits, cp->bitmap_size);
487 if (rv == SA_AIS_ERR_TRY_AGAIN) {
488 LOG_ERROR("Sync checkpoint section create retry");
490 goto sync_create_retry;
493 if (rv == SA_AIS_ERR_EXIST) {
494 LOG_DBG("Sync checkpoint section already exists");
495 saCkptCheckpointClose(h);
499 if (rv != SA_AIS_OK) {
500 LOG_ERROR("Sync checkpoint section creation failed: %s",
502 saCkptCheckpointClose(h);
503 return -EIO; /* FIXME: better error */
507 * Add section for clean_bits
509 section_id.idLen = snprintf(buf, 32, "clean_bits");
510 section_id.id = (unsigned char *)buf;
511 section_attr.sectionId = §ion_id;
512 section_attr.expirationTime = SA_TIME_END;
515 rv = saCkptSectionCreate(h, §ion_attr, cp->clean_bits, cp->bitmap_size);
516 if (rv == SA_AIS_ERR_TRY_AGAIN) {
517 LOG_ERROR("Clean checkpoint section create retry");
519 goto clean_create_retry;
522 if (rv == SA_AIS_ERR_EXIST) {
523 LOG_DBG("Clean checkpoint section already exists");
524 saCkptCheckpointClose(h);
528 if (rv != SA_AIS_OK) {
529 LOG_ERROR("Clean checkpoint section creation failed: %s",
531 saCkptCheckpointClose(h);
532 return -EIO; /* FIXME: better error */
536 * Add section for recovering_region
538 section_id.idLen = snprintf(buf, 32, "recovering_region");
539 section_id.id = (unsigned char *)buf;
540 section_attr.sectionId = §ion_id;
541 section_attr.expirationTime = SA_TIME_END;
544 rv = saCkptSectionCreate(h, §ion_attr, cp->recovering_region,
545 strlen(cp->recovering_region) + 1);
546 if (rv == SA_AIS_ERR_TRY_AGAIN) {
547 LOG_ERROR("RR checkpoint section create retry");
549 goto rr_create_retry;
552 if (rv == SA_AIS_ERR_EXIST) {
553 LOG_DBG("RR checkpoint section already exists");
554 saCkptCheckpointClose(h);
558 if (rv != SA_AIS_OK) {
559 LOG_ERROR("RR checkpoint section creation failed: %s",
561 saCkptCheckpointClose(h);
562 return -EIO; /* FIXME: better error */
565 LOG_DBG("export_checkpoint: closing checkpoint");
566 saCkptCheckpointClose(h);
568 rq = malloc(DM_ULOG_REQUEST_SIZE);
570 LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
// Only the clog_request header is zeroed. data_size is therefore 0, and
// cluster_send transmits just the header.
573 memset(rq, 0, sizeof(*rq));
575 dm_list_init(&rq->u.list);
576 rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
577 rq->originator = cp->requester; /* FIXME: hack to overload meaning of originator */
578 strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
579 rq->u_rq.seq = my_cluster_id;
581 r = cluster_send(rq);
583 LOG_ERROR("Failed to send checkpoint ready notice: %s",
// import_checkpoint: fetch the checkpoint another node exported for this node
// ("bitmaps_<short uuid>_<my_cluster_id>") and load its sections with
// pull_state. The checkpoint is unlinked right after it is opened.
// @no_read: cpg_message_callback passes (state != INVALID). In that case the
//           "already received" message is logged; presumably the contents are
//           then not loaded.
// Sections are counted in a first iteration pass and read in a second. Each
// section is read into a 1 MiB scratch buffer and passed to pull_state along
// with its section-id string.
590 static int import_checkpoint(struct clog_cpg *entry, int no_read)
593 SaCkptCheckpointHandleT h;
594 SaCkptSectionIterationHandleT itr;
595 SaCkptSectionDescriptorT desc;
596 SaCkptIOVectorElementT iov;
602 bitmap = malloc(1024*1024);
606 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
607 SHORT_UUID(entry->name.value), my_cluster_id);
608 name.length = (SaUint16T)len;
611 rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
612 SA_CKPT_CHECKPOINT_READ, 0, &h);
613 if (rv == SA_AIS_ERR_TRY_AGAIN) {
614 LOG_ERROR("import_checkpoint: ckpt open retry");
// NOTE(review): this -EIO return, and the one after the iteration-initialize
// failure below, show no free(bitmap) (nor a checkpoint close for the latter)
// on the lines leading to the return -- they appear to leak.
619 if (rv != SA_AIS_OK) {
620 LOG_ERROR("[%s] Failed to open checkpoint: %s",
621 SHORT_UUID(entry->name.value), str_ais_error(rv));
622 return -EIO; /* FIXME: better error */
626 rv = saCkptCheckpointUnlink(ckpt_handle, &name);
627 if (rv == SA_AIS_ERR_TRY_AGAIN) {
628 LOG_ERROR("import_checkpoint: ckpt unlink retry");
634 LOG_DBG("Checkpoint for this log already received");
639 rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
641 if (rv == SA_AIS_ERR_TRY_AGAIN) {
642 LOG_ERROR("import_checkpoint: sync create retry");
// NOTE(review): the messages in this function that mention "section
// creation" actually report section-iteration failures.
647 if (rv != SA_AIS_OK) {
648 LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
649 SHORT_UUID(entry->name.value), str_ais_error(rv));
650 return -EIO; /* FIXME: better error */
// First pass: walk the sections to count them.
655 rv = saCkptSectionIterationNext(itr, &desc);
658 else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len)
660 else if (rv != SA_AIS_ERR_TRY_AGAIN) {
661 LOG_ERROR("saCkptSectionIterationNext failure: %d", rv);
665 saCkptSectionIterationFinalize(itr);
667 LOG_ERROR("import_checkpoint: %d checkpoint sections found",
// Second pass: read each section and hand it to pull_state.
// NOTE(review): the return value of this re-initialization is not checked.
672 saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
676 rv = saCkptSectionIterationNext(itr, &desc);
677 if (rv == SA_AIS_ERR_NO_SECTIONS)
680 if (rv == SA_AIS_ERR_TRY_AGAIN) {
681 LOG_ERROR("import_checkpoint: ckpt iternext retry");
686 if (rv != SA_AIS_OK) {
687 LOG_ERROR("import_checkpoint: clean checkpoint section "
688 "creation failed: %s", str_ais_error(rv));
689 rtn = -EIO; /* FIXME: better error */
693 if (!desc.sectionSize) {
694 LOG_ERROR("Checkpoint section empty");
// NOTE(review): sizeof(*bitmap) is the size of one element, not of the 1 MiB
// allocation, so this clears at most a few bytes; it was presumably meant to
// clear the whole buffer. Also, desc.sectionSize is not compared with the
// 1 MiB buffer before it is used as the read size.
698 memset(bitmap, 0, sizeof(*bitmap));
699 iov.sectionId = desc.sectionId;
700 iov.dataBuffer = bitmap;
701 iov.dataSize = desc.sectionSize;
705 rv = saCkptCheckpointRead(h, &iov, 1, NULL);
706 if (rv == SA_AIS_ERR_TRY_AGAIN) {
707 LOG_ERROR("ckpt read retry");
712 if (rv != SA_AIS_OK) {
713 LOG_ERROR("import_checkpoint: ckpt read error: %s",
715 rtn = -EIO; /* FIXME: better error */
720 if (pull_state(entry->name.value, entry->luid,
721 (char *)desc.sectionId.id, bitmap,
723 LOG_ERROR("Error loading state");
728 /* Need to request new checkpoint */
735 saCkptSectionIterationFinalize(itr);
737 saCkptCheckpointClose(h);
// do_checkpoints: export the checkpoints queued on entry->checkpoint_list.
// Only the head of the list is examined: after an entry is removed, cp is
// reloaded from entry->checkpoint_list. An entry is unlinked both when
// export_checkpoint reports it as already handled and when it exports
// successfully. The case labels and the release of cp are not visible in
// this listing. On any other result the failure is logged; the FIXME warns
// that skipping the entry would corrupt the list.
// @leaving: in the visible code, only adds an "(L)" suffix to the log lines.
743 static void do_checkpoints(struct clog_cpg *entry, int leaving)
745 struct checkpoint_data *cp;
747 for (cp = entry->checkpoint_list; cp;) {
749 * FIXME: Check return code. Could send failure
750 * notice in rq in export_checkpoint function
751 * by setting rq->error
753 switch (export_checkpoint(cp)) {
755 LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s",
756 SHORT_UUID(entry->name.value), cp->requester,
757 (leaving) ? "(L)": "");
758 LOG_COND(log_checkpoint,
759 "[%s] Checkpoint for %u already handled%s",
760 SHORT_UUID(entry->name.value), cp->requester,
761 (leaving) ? "(L)": "");
762 entry->checkpoint_list = cp->next;
764 cp = entry->checkpoint_list;
767 LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s",
768 SHORT_UUID(entry->name.value), cp->requester,
769 (leaving) ? "(L)": "");
770 LOG_COND(log_checkpoint,
771 "[%s] Checkpoint data available for node %u%s",
772 SHORT_UUID(entry->name.value), cp->requester,
773 (leaving) ? "(L)": "");
774 entry->checkpoint_list = cp->next;
776 cp = entry->checkpoint_list;
779 /* FIXME: Skipping will cause list corruption */
780 LOG_ERROR("[%s] Failed to export checkpoint for %u%s",
781 SHORT_UUID(entry->name.value), cp->requester,
782 (leaving) ? "(L)": "");
// resend_requests: after a server change, replay this node's outstanding
// requests from entry->working_list.
// Nothing happens unless resend_requests is set and delay is 0, and the log
// must be VALID. The flag is cleared and the list is drained:
// - requests whose uuid belongs to a different log are logged as strays;
// - state-only requests (the visible case is DM_ULOG_SET_REGION_SYNC) are not
//   resent; they are completed to the kernel with no payload;
// - everything else is multicast again with cluster_send.
// Each request is unlinked before it is resent. When the multicast comes back
// to this node, cpg_message_callback presumably records a fresh copy on
// working_list.
787 static int resend_requests(struct clog_cpg *entry)
790 struct clog_request *rq, *n;
792 if (!entry->resend_requests || entry->delay)
795 if (entry->state != VALID)
798 entry->resend_requests = 0;
800 dm_list_iterate_items_gen_safe(rq, n, &entry->working_list, u.list) {
801 dm_list_del(&rq->u.list);
803 if (strcmp(entry->name.value, rq->u_rq.uuid)) {
804 LOG_ERROR("[%s] Stray request from another log (%s)",
805 SHORT_UUID(entry->name.value),
806 SHORT_UUID(rq->u_rq.uuid));
811 switch (rq->u_rq.request_type) {
812 case DM_ULOG_SET_REGION_SYNC:
814 * Some requests simply do not need to be resent.
815 * If it is a request that just changes log state,
816 * then it doesn't need to be resent (everyone makes
819 LOG_COND(log_resend_requests,
820 "[%s] Skipping resend of %s/#%u...",
821 SHORT_UUID(entry->name.value),
822 _RQ_TYPE(rq->u_rq.request_type),
824 LOG_SPRINT(entry, "### No resend: [%s] %s/%u ###",
825 SHORT_UUID(entry->name.value),
826 _RQ_TYPE(rq->u_rq.request_type),
// Complete the request locally: reply to the kernel with an empty payload.
829 rq->u_rq.data_size = 0;
830 kernel_send(&rq->u_rq);
836 * If an action or a response is required, then
837 * the request must be resent.
839 LOG_COND(log_resend_requests,
840 "[%s] Resending %s(#%u) due to new server(%u)",
841 SHORT_UUID(entry->name.value),
842 _RQ_TYPE(rq->u_rq.request_type),
843 rq->u_rq.seq, entry->lowest_id);
844 LOG_SPRINT(entry, "*** Resending: [%s] %s/%u ***",
845 SHORT_UUID(entry->name.value),
846 _RQ_TYPE(rq->u_rq.request_type),
848 r = cluster_send(rq);
850 LOG_ERROR("Failed resend");
// do_cluster_work: service every log's CPG connection.
// For each clog_cpg it dispatches all pending CPG events, handles a pending
// free_me (those lines are not visible), exports queued checkpoints and
// resends requests if needed. The _safe iterator is used because the
// dispatched callbacks can unlink the current entry (cpg_leave_callback calls
// dm_list_del(&match->list)).
// NOTE(review): r is overwritten on every iteration, so the return value
// reflects only the last cpg_dispatch; earlier failures are logged but not
// returned. If clog_cpg_list is empty, r keeps its initial value from a
// declaration that is not visible here -- confirm it is initialized.
858 static int do_cluster_work(void *data __attribute__((unused)))
861 struct clog_cpg *entry, *tmp;
863 dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) {
864 r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
866 LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
868 if (entry->free_me) {
872 do_checkpoints(entry, 0);
874 resend_requests(entry);
877 return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */
// flush_startup_list: replay what was queued while the log was INVALID, once
// its checkpoint has been imported.
// - DM_ULOG_MEMBER_JOIN markers become prepared checkpoints, pushed onto the
//   head of entry->checkpoint_list for do_checkpoints.
// - Other requests go to handle_cluster_request.
// A failure on one entry is logged and does not stop the rest of the list.
880 static int flush_startup_list(struct clog_cpg *entry)
884 struct clog_request *rq, *n;
885 struct checkpoint_data *new;
887 dm_list_iterate_items_gen_safe(rq, n, &entry->startup_list, u.list) {
888 dm_list_del(&rq->u.list);
890 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) {
891 new = prepare_checkpoint(entry, rq->originator);
894 * FIXME: Need better error handling. Other nodes
895 * will be trying to send the checkpoint too, and we
896 * must continue processing the list; so report error
899 LOG_ERROR("Failed to prepare checkpoint for %u!!!",
904 LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u",
905 SHORT_UUID(entry->name.value), rq->originator);
906 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
907 SHORT_UUID(entry->name.value), rq->originator);
908 new->next = entry->checkpoint_list;
909 entry->checkpoint_list = new;
911 LOG_DBG("[%s] Processing delayed request: %s",
912 SHORT_UUID(rq->u_rq.uuid),
913 _RQ_TYPE(rq->u_rq.request_type));
// Act as server according to pit_server, the server recorded when the request
// was queued (cpg_message_callback), not the current one.
914 i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0;
915 r = handle_cluster_request(entry, rq, i_was_server);
919 * FIXME: If we error out here, we will never get
920 * another opportunity to retry these requests
922 LOG_ERROR("Error while processing delayed CPG message");
// cpg_message_callback: handle one message delivered on a log's CPG group.
// The message is converted from network order in place and routed to the
// clog_cpg that owns @handle. The visible code then, in order:
//  - keeps a copy of this node's own outgoing requests on working_list;
//  - tracks DM_ULOG_POSTSUSPEND (a member leaving) to decide whether
//    outstanding requests must be resent to a new server;
//  - stops processing once this node is LEAVING;
//  - imports a checkpoint addressed to this node (DM_ULOG_CHECKPOINT_READY);
//  - passes responses to handle_cluster_response and requests to
//    handle_cluster_request, or queues requests on startup_list while the
//    log is INVALID;
//  - prepares checkpoints owed to joining nodes once the log is resumed;
//  - records a trace line in the debugging ring, or dumps the ring on error.
930 static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)),
931 uint32_t nodeid, uint32_t pid __attribute__((unused)),
932 void *msg, size_t msg_len)
938 struct clog_request *rq = msg;
939 struct clog_request *tmp_rq;
940 struct clog_cpg *match;
942 if (clog_request_from_network(rq, msg_len) < 0)
943 /* Error message comes from 'clog_request_from_network' */
946 match = find_clog_cpg(handle);
948 LOG_ERROR("Unable to find clog_cpg for cluster message");
// Keep a private copy of each request this node sent, so its response can be
// matched (handle_cluster_response) and it can be resent after a server
// change (resend_requests). Responses, RESUME, CLEAR_REGION and
// CHECKPOINT_READY are not recorded.
952 if ((nodeid == my_cluster_id) &&
953 !(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
954 (rq->u_rq.request_type != DM_ULOG_RESUME) &&
955 (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
956 (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) {
957 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
960 * FIXME: It may be possible to continue... but we
961 * would not be able to resend any messages that might
962 * be necessary during membership changes
964 LOG_ERROR("[%s] Unable to record request: -ENOMEM",
965 SHORT_UUID(rq->u_rq.uuid));
// NOTE(review): safe only while sizeof(*rq) + data_size fits in
// DM_ULOG_REQUEST_SIZE; confirm data_size is bounded on receipt.
968 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
969 dm_list_init(&tmp_rq->u.list);
970 dm_list_add( &match->working_list, &tmp_rq->u.list);
973 if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
975 * If the server (lowest_id) indicates it is leaving,
976 * then we must resend any outstanding requests. However,
977 * we do not want to resend them if the next server in
978 * line is in the process of leaving.
980 if (nodeid == my_cluster_id) {
981 LOG_COND(log_resend_requests, "[%s] I am leaving.1.....",
982 SHORT_UUID(rq->u_rq.uuid));
984 if (nodeid < my_cluster_id) {
985 if (nodeid == match->lowest_id) {
986 match->resend_requests = 1;
987 LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
988 SHORT_UUID(rq->u_rq.uuid), nodeid,
989 (dm_list_empty(&match->working_list)) ? " -- working_list empty": "");
991 dm_list_iterate_items_gen(tmp_rq, &match->working_list, u.list)
992 LOG_COND(log_resend_requests,
994 SHORT_UUID(tmp_rq->u_rq.uuid),
995 _RQ_TYPE(tmp_rq->u_rq.request_type),
1000 LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
1001 SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay);
1003 rq->originator = nodeid; /* don't really need this, but nice for debug */
1009 * We can receive messages after we do a cpg_leave but before we
1010 * get our config callback. However, since we can't respond after
1011 * leaving, we simply return.
1013 if (match->state == LEAVING)
// This node is the server when it has the lowest nodeid in the group.
1016 i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
// For CHECKPOINT_READY, originator holds the nodeid the checkpoint was
// exported for (set by export_checkpoint), so only that node imports it. If
// the import fails, the process sends SIGUSR1 to itself.
1018 if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) {
1019 if (my_cluster_id == rq->originator) {
1020 /* Redundant checkpoints ignored if match->valid */
1021 LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
1022 SHORT_UUID(rq->u_rq.uuid), nodeid);
1023 if (import_checkpoint(match, (match->state != INVALID))) {
1025 "[%s] Failed to import checkpoint from %u",
1026 SHORT_UUID(rq->u_rq.uuid), nodeid);
1027 LOG_ERROR("[%s] Failed to import checkpoint from %u",
1028 SHORT_UUID(rq->u_rq.uuid), nodeid);
1029 kill(getpid(), SIGUSR1);
1030 /* Could we retry? */
1032 } else if (match->state == INVALID) {
1034 "[%s] Checkpoint data received from %u. Log is now valid",
1035 SHORT_UUID(match->name.value), nodeid);
1036 LOG_COND(log_checkpoint,
1037 "[%s] Checkpoint data received from %u. Log is now valid",
1038 SHORT_UUID(match->name.value), nodeid);
1039 match->state = VALID;
1041 flush_startup_list(match);
1044 "[%s] Redundant checkpoint from %u ignored.",
1045 SHORT_UUID(rq->u_rq.uuid), nodeid);
1051 if (rq->u_rq.request_type & DM_ULOG_RESPONSE) {
1053 r = handle_cluster_response(match, rq);
1055 rq->originator = nodeid;
1057 if (match->state == LEAVING) {
1058 LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving",
1059 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type),
// The log is not valid yet: queue a copy on startup_list. pit_server records
// the server at this moment, so flush_startup_list can replay the request
// with the correct server role.
1064 if (match->state == INVALID) {
1065 LOG_DBG("Log not valid yet, storing request");
1066 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
1068 LOG_ERROR("cpg_message_callback: Unable to"
1069 " allocate transfer structs");
1070 r = -ENOMEM; /* FIXME: Better error #? */
1074 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
1075 tmp_rq->pit_server = match->lowest_id;
1076 dm_list_init(&tmp_rq->u.list);
1077 dm_list_add(&match->startup_list, &tmp_rq->u.list);
1081 r = handle_cluster_request(match, rq, i_am_server);
1085 * If the log is now valid, we can queue the checkpoints
// NOTE(review): i starts at checkpoints_needed and is used below as an index
// into checkpoint_requesters; the decrement is not visible here -- confirm it
// happens before the first read, so the slot one past the last valid entry is
// never read.
1087 for (i = match->checkpoints_needed; i; ) {
1088 struct checkpoint_data *new;
1090 if (log_get_state(&rq->u_rq) != LOG_RESUMED) {
1091 LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
1092 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid);
1097 new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
1099 /* FIXME: Need better error handling */
1100 LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
1101 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
// NOTE(review): the label below looks inverted -- it prints "LOG_RESUMED"
// when the state is NOT LOG_RESUMED. If the withholding branch above stops
// the loop, a resumed log is logged as "LOG_SUSPENDED" here.
1104 LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)",
1105 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i],
1106 (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
1107 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
1108 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1109 match->checkpoints_needed--;
1111 new->next = match->checkpoint_list;
1112 match->checkpoint_list = new;
1116 /* nothing happens after this point. It is just for debugging */
1118 LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
1119 SHORT_UUID(rq->u_rq.uuid),
1120 _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE),
1122 LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq->u_rq.uuid),
1123 (response) ? "YES" : "NO");
1124 LOG_ERROR("[%s] Originator: %u",
1125 SHORT_UUID(rq->u_rq.uuid), rq->originator);
1127 LOG_ERROR("[%s] Responder : %u",
1128 SHORT_UUID(rq->u_rq.uuid), nodeid);
// Dump the debugging ring, stopping at the first empty slot.
1130 LOG_ERROR("HISTORY::");
1131 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1133 match->idx = match->idx % DEBUGGING_HISTORY;
1134 if (match->debugging[match->idx][0] == '\0')
1136 LOG_ERROR("%d:%d) %s", i, match->idx,
1137 match->debugging[match->idx]);
1139 } else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) ||
1140 (rq->originator == my_cluster_id)) {
1142 LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
1143 rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1144 _RQ_TYPE(rq->u_rq.request_type),
1145 rq->originator, (response) ? "YES" : "NO");
1147 LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
1148 rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1149 _RQ_TYPE(rq->u_rq.request_type),
1150 rq->originator, (response) ? "YES" : "NO",
// cpg_join_callback: membership update for a node joining match's CPG group.
// - my_cluster_id is learned the first time the joining pid is our own.
// - A sole member becomes the server and its log is marked VALID.
// - When another node joins, this node arranges a checkpoint for it. If
//   nothing is pending and the log is VALID, the joiner is recorded directly
//   in checkpoint_requesters. Otherwise a DM_ULOG_MEMBER_JOIN marker is
//   queued on startup_list.
// - The server (lowest nodeid in member_list) is recomputed and any change is
//   logged.
1155 static void cpg_join_callback(struct clog_cpg *match,
1156 const struct cpg_address *joined,
1157 const struct cpg_address *member_list,
1158 size_t member_list_entries)
1161 uint32_t my_pid = (uint32_t)getpid();
1162 uint32_t lowest = match->lowest_id;
1163 struct clog_request *rq;
1166 /* Assign my_cluster_id */
1167 if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
1168 my_cluster_id = joined->nodeid;
1170 /* Am I the very first to join? */
1171 if (member_list_entries == 1) {
1172 match->lowest_id = joined->nodeid;
1173 match->state = VALID;
1176 /* If I am part of the joining list, I do not send checkpoints */
1177 if (joined->nodeid == my_cluster_id)
// Build a "n1-n2-...(joiner)" string for the log message. The loop skips the
// last member_list entry and prints joined->nodeid separately.
// NOTE(review): presumably this assumes the joiner is the last member_list
// entry. dbuf's size is not visible and these sprintf calls are unbounded, so
// a large membership could overflow it -- consider snprintf with the space
// remaining.
1180 memset(dbuf, 0, sizeof(dbuf));
1181 for (i = 0; i < member_list_entries - 1; i++)
1182 sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid);
1183 sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid);
1184 LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]",
1185 SHORT_UUID(match->name.value), joined->nodeid, dbuf);
1188 * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
1189 * the startup_list interface exclusively
1191 if (dm_list_empty(&match->startup_list) && (match->state == VALID) &&
1192 (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) {
1193 match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
// Deferred path: queue a DM_ULOG_MEMBER_JOIN marker so flush_startup_list
// prepares the checkpoint in order with the other queued requests.
// NOTE(review): only request_type, originator and the list link are set on
// this malloc'd request (no memset is visible). The consumers shown
// (flush_startup_list, cpg_leave_callback) read only those fields. The log
// strings below name "cpg_config_callback", not this function.
1197 rq = malloc(DM_ULOG_REQUEST_SIZE);
1199 LOG_ERROR("cpg_config_callback: "
1200 "Unable to allocate transfer structs");
1201 LOG_ERROR("cpg_config_callback: "
1202 "Unable to perform checkpoint");
1205 rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN;
1206 rq->originator = joined->nodeid;
1207 dm_list_init(&rq->u.list);
1208 dm_list_add(&match->startup_list, &rq->u.list);
1211 /* Find the lowest_id, i.e. the server */
1212 match->lowest_id = member_list[0].nodeid;
1213 for (i = 0; i < member_list_entries; i++)
1214 if (match->lowest_id > member_list[i].nodeid)
1215 match->lowest_id = member_list[i].nodeid;
1217 if (lowest == 0xDEAD)
1218 LOG_COND(log_membership_change, "[%s] Server change <none> -> %u (%u %s)",
1219 SHORT_UUID(match->name.value), match->lowest_id,
1220 joined->nodeid, (member_list_entries == 1) ?
1221 "is first to join" : "joined");
1222 else if (lowest != match->lowest_id)
1223 LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u joined)",
1224 SHORT_UUID(match->name.value), lowest,
1225 match->lowest_id, joined->nodeid);
1227 LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u joined)",
1228 SHORT_UUID(match->name.value),
1229 lowest, joined->nodeid);
1230 LOG_SPRINT(match, "+++ UUID=%s %u join +++",
1231 SHORT_UUID(match->name.value), joined->nodeid);
1234 static void cpg_leave_callback(struct clog_cpg *match,
1235 const struct cpg_address *left,
1236 const struct cpg_address *member_list,
1237 size_t member_list_entries)
1241 uint32_t lowest = match->lowest_id;
1242 struct clog_request *rq, *n;
1243 struct checkpoint_data *p_cp, *c_cp;
1245 LOG_SPRINT(match, "--- UUID=%s %u left ---",
1246 SHORT_UUID(match->name.value), left->nodeid);
1249 if (my_cluster_id == left->nodeid) {
1250 LOG_DBG("Finalizing leave...");
1251 dm_list_del(&match->list);
1253 cpg_fd_get(match->handle, &fd);
1254 links_unregister(fd);
1256 cluster_postsuspend(match->name.value, match->luid);
1258 dm_list_iterate_items_gen_safe(rq, n, &match->working_list, u.list) {
1259 dm_list_del(&rq->u.list);
1261 if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
1262 kernel_send(&rq->u_rq);
1266 cpg_finalize(match->handle);
1269 match->lowest_id = 0xDEAD;
1270 match->state = INVALID;
1273 /* Remove any pending checkpoints for the leaving node. */
1274 for (p_cp = NULL, c_cp = match->checkpoint_list;
1275 c_cp && (c_cp->requester != left->nodeid);
1276 p_cp = c_cp, c_cp = c_cp->next);
1279 p_cp->next = c_cp->next;
1281 match->checkpoint_list = c_cp->next;
1283 LOG_COND(log_checkpoint,
1284 "[%s] Removing pending checkpoint (%u is leaving)",
1285 SHORT_UUID(match->name.value), left->nodeid);
1286 free_checkpoint(c_cp);
1288 dm_list_iterate_items_gen_safe(rq, n, &match->startup_list, u.list) {
1289 if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) &&
1290 (rq->originator == left->nodeid)) {
1291 LOG_COND(log_checkpoint,
1292 "[%s] Removing pending ckpt from startup list (%u is leaving)",
1293 SHORT_UUID(match->name.value), left->nodeid);
1294 dm_list_del(&rq->u.list);
1298 for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) {
1299 match->checkpoint_requesters[j] = match->checkpoint_requesters[i];
1300 if (match->checkpoint_requesters[i] == left->nodeid) {
1301 LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)",
1302 SHORT_UUID(match->name.value), left->nodeid);
1306 match->checkpoints_needed = j;
1308 if (left->nodeid < my_cluster_id) {
1309 match->delay = (match->delay > 0) ? match->delay - 1 : 0;
1310 if (!match->delay && dm_list_empty(&match->working_list))
1311 match->resend_requests = 0;
1312 LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
1313 SHORT_UUID(match->name.value), left->nodeid,
1314 match->delay, (dm_list_empty(&match->working_list)) ?
1315 " -- working_list empty": "");
1318 /* Find the lowest_id, i.e. the server */
1319 if (!member_list_entries) {
1320 match->lowest_id = 0xDEAD;
1321 LOG_COND(log_membership_change, "[%s] Server change %u -> <none> "
1322 "(%u is last to leave)",
1323 SHORT_UUID(match->name.value), left->nodeid,
1328 match->lowest_id = member_list[0].nodeid;
1329 for (i = 0; i < member_list_entries; i++)
1330 if (match->lowest_id > member_list[i].nodeid)
1331 match->lowest_id = member_list[i].nodeid;
1333 if (lowest != match->lowest_id) {
1334 LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)",
1335 SHORT_UUID(match->name.value), lowest,
1336 match->lowest_id, left->nodeid);
1338 LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)",
1339 SHORT_UUID(match->name.value), lowest, left->nodeid);
1341 if ((match->state == INVALID) && !match->free_me) {
1343 * If all CPG members are waiting for checkpoints and they
1344 * are all present in my startup_list, then I was the first to
1345 * join and I must assume control.
1347 * We do not normally end up here, but if there was a quick
1348 * 'resume -> suspend -> resume' across the cluster, we may
1349 * have initially thought we were not the first to join because
1350 * of the presence of out-going (and unable to respond) members.
1353 i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
1354 dm_list_iterate_items_gen(rq, &match->startup_list, u.list)
1355 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN)
1358 if (i == member_list_entries) {
1360 * Last node who could have given me a checkpoint just left.
1361 * Setting log state to VALID and acting as 'first join'.
1363 match->state = VALID;
1364 flush_startup_list(match);
1369 static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)),
1370 const struct cpg_address *member_list,
1371 size_t member_list_entries,
1372 const struct cpg_address *left_list,
1373 size_t left_list_entries,
1374 const struct cpg_address *joined_list,
1375 size_t joined_list_entries)
1377 struct clog_cpg *match;
1380 dm_list_iterate_items(match, &clog_cpg_list)
1381 if (match->handle == handle) {
1387 LOG_ERROR("Unable to find match for CPG config callback");
1391 if ((joined_list_entries + left_list_entries) > 1)
1392 LOG_ERROR("[%s] More than one node joining/leaving",
1393 SHORT_UUID(match->name.value));
1395 if (joined_list_entries)
1396 cpg_join_callback(match, joined_list,
1397 member_list, member_list_entries);
1399 cpg_leave_callback(match, left_list,
1400 member_list, member_list_entries);
1403 cpg_callbacks_t cpg_callbacks = {
1404 .cpg_deliver_fn = cpg_message_callback,
1405 .cpg_confchg_fn = cpg_config_callback,
1412 * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error
1414 static int remove_checkpoint(struct clog_cpg *entry)
1419 SaCkptCheckpointHandleT h;
1421 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
1422 SHORT_UUID(entry->name.value), my_cluster_id);
1426 rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
1427 SA_CKPT_CHECKPOINT_READ, 0, &h);
1428 if (rv == SA_AIS_ERR_TRY_AGAIN) {
1429 LOG_ERROR("abort_startup: ckpt open retry");
1434 if (rv != SA_AIS_OK)
1437 LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(entry->name.value));
1439 rv = saCkptCheckpointUnlink(ckpt_handle, &name);
1440 if (rv == SA_AIS_ERR_TRY_AGAIN) {
1441 LOG_ERROR("abort_startup: ckpt unlink retry");
1446 if (rv != SA_AIS_OK) {
1447 LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
1448 SHORT_UUID(entry->name.value), str_ais_error(rv));
1452 saCkptCheckpointClose(h);
1457 int create_cluster_cpg(char *uuid, uint64_t luid)
1461 struct clog_cpg *new = NULL;
1462 struct clog_cpg *tmp;
1464 dm_list_iterate_items(tmp, &clog_cpg_list)
1465 if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) {
1466 LOG_ERROR("Log entry already exists: %s", uuid);
1470 new = malloc(sizeof(*new));
1472 LOG_ERROR("Unable to allocate memory for clog_cpg");
1475 memset(new, 0, sizeof(*new));
1476 dm_list_init(&new->list);
1477 new->lowest_id = 0xDEAD;
1478 dm_list_init(&new->startup_list);
1479 dm_list_init(&new->working_list);
1481 size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ?
1482 CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1);
1483 strncpy(new->name.value, uuid, size);
1484 new->name.length = (uint32_t)size;
1488 * Ensure there are no stale checkpoints around before we join
1490 if (remove_checkpoint(new) == 1)
1491 LOG_COND(log_checkpoint,
1492 "[%s] Removing checkpoints left from previous session",
1493 SHORT_UUID(new->name.value));
1495 r = cpg_initialize(&new->handle, &cpg_callbacks);
1496 if (r != SA_AIS_OK) {
1497 LOG_ERROR("cpg_initialize failed: Cannot join cluster");
1502 r = cpg_join(new->handle, &new->name);
1503 if (r != SA_AIS_OK) {
1504 LOG_ERROR("cpg_join failed: Cannot join cluster");
1509 new->cpg_state = VALID;
1510 dm_list_add(&clog_cpg_list, &new->list);
1511 LOG_DBG("New handle: %llu", (unsigned long long)new->handle);
1512 LOG_DBG("New name: %s", new->name.value);
1514 /* FIXME: better variable */
1515 cpg_fd_get(new->handle, &r);
1516 links_register(r, "cluster", do_cluster_work, NULL);
1521 static void abort_startup(struct clog_cpg *del)
1523 struct clog_request *rq, *n;
1525 LOG_DBG("[%s] CPG teardown before checkpoint received",
1526 SHORT_UUID(del->name.value));
1528 dm_list_iterate_items_gen_safe(rq, n, &del->startup_list, u.list) {
1529 dm_list_del(&rq->u.list);
1531 LOG_DBG("[%s] Ignoring request from %u: %s",
1532 SHORT_UUID(del->name.value), rq->originator,
1533 _RQ_TYPE(rq->u_rq.request_type));
1537 remove_checkpoint(del);
1540 static int _destroy_cluster_cpg(struct clog_cpg *del)
1545 LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
1546 SHORT_UUID(del->name.value));
1549 * We must send any left over checkpoints before
1550 * leaving. If we don't, an incoming node could
1551 * be stuck with no checkpoint and stall.
1552 do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
1554 - Incoming node deletes old checkpoints before joining
1555 - A stale checkpoint is issued here by leaving node
1556 - (leaving node leaves)
1557 - Incoming node joins cluster and finds stale checkpoint.
1558 - (leaving node leaves - option 2)
1560 do_checkpoints(del, 1);
1564 del->cpg_state = INVALID;
1565 del->state = LEAVING;
1568 * If the state is VALID, we might be processing the
1569 * startup list. If so, we certainly don't want to
1570 * clear the startup_list here by calling abort_startup
1572 if (!dm_list_empty(&del->startup_list) && (state != VALID))
1575 r = cpg_leave(del->handle, &del->name);
1577 LOG_ERROR("Error leaving CPG!");
1581 int destroy_cluster_cpg(char *uuid)
1583 struct clog_cpg *del, *tmp;
1585 dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
1586 if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
1587 _destroy_cluster_cpg(del);
1592 int init_cluster(void)
1596 dm_list_init(&clog_cpg_list);
1597 rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
1599 if (rv != SA_AIS_OK)
1600 return EXIT_CLUSTER_CKPT_INIT;
1605 void cleanup_cluster(void)
1609 err = saCkptFinalize(ckpt_handle);
1610 if (err != SA_AIS_OK)
1611 LOG_ERROR("Failed to finalize checkpoint handle");
1614 void cluster_debug(void)
1616 struct checkpoint_data *cp;
1617 struct clog_cpg *entry;
1618 struct clog_request *rq;
1622 LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
1623 dm_list_iterate_items(entry, &clog_cpg_list) {
1624 LOG_ERROR("%s::", SHORT_UUID(entry->name.value));
1625 LOG_ERROR(" lowest_id : %u", entry->lowest_id);
1626 LOG_ERROR(" state : %s", (entry->state == INVALID) ?
1627 "INVALID" : (entry->state == VALID) ? "VALID" :
1628 (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN");
1629 LOG_ERROR(" cpg_state : %d", entry->cpg_state);
1630 LOG_ERROR(" free_me : %d", entry->free_me);
1631 LOG_ERROR(" delay : %d", entry->delay);
1632 LOG_ERROR(" resend_requests : %d", entry->resend_requests);
1633 LOG_ERROR(" checkpoints_needed: %d", entry->checkpoints_needed);
1634 for (i = 0, cp = entry->checkpoint_list;
1635 i < MAX_CHECKPOINT_REQUESTERS; i++)
1640 LOG_ERROR(" CKPTs waiting : %d", i);
1641 LOG_ERROR(" Working list:");
1642 dm_list_iterate_items_gen(rq, &entry->working_list, u.list)
1643 LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1646 LOG_ERROR(" Startup list:");
1647 dm_list_iterate_items_gen(rq, &entry->startup_list, u.list)
1648 LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1651 LOG_ERROR("Command History:");
1652 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1654 entry->idx = entry->idx % DEBUGGING_HISTORY;
1655 if (entry->debugging[entry->idx][0] == '\0')
1657 LOG_ERROR("%d:%d) %s", i, entry->idx,
1658 entry->debugging[entry->idx]);