2 * Copyright (C) 2015, SUSE
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2, or (at your option)
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
18 #include "md-cluster.h"
/*
 * NOTE(review): presumably the timeout for the new-disk wait in
 * process_add_new_disk(); the argument line of that
 * wait_for_completion_timeout() call is not visible here. That API counts in
 * jiffies - confirm the unit intended for this value.
 */
21 #define NEW_DEV_TIMEOUT 5000
/*
 * Bookkeeping for one DLM lock used by md-cluster.
 * NOTE(review): the embedded numbering skips lines 24-25; other code in this
 * file dereferences res->ls and res->lksb, so those members are presumably
 * declared on the missing lines - confirm against the full file.
 */
23 struct dlm_lock_resource {
26 char *name; /* lock name. */
27 uint32_t flags; /* flags to pass to dlm_lock() */
28 struct completion completion; /* completion for synchronized locking */
29 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
30 struct mddev *mddev; /* pointing back to mddev. */
/*
 * NOTE(review): the numbering jumps from 30 to 37, so the member below
 * presumably belongs to a different struct whose header is not visible
 * (struct suspend_info is linked through a "list" member elsewhere in this
 * file) - confirm.
 */
37 struct list_head list;
45 /* md_cluster_info flags */
/*
 * Bit number in cinfo->state: set by process_add_new_disk() for the duration
 * of its wait and tested by new_disk_ack().
 */
46 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1
/*
 * Per-array cluster state; join() allocates it and stores it in
 * mddev->cluster_info.
 * NOTE(review): slot_number and state are accessed through this struct
 * elsewhere in the file, but their declarations are not visible in this
 * excerpt (the embedded numbering skips lines 52 and 68).
 */
49 struct md_cluster_info {
50 /* dlm lock space and resources for clustered raid. */
51 dlm_lockspace_t *lockspace;
/* Signalled by recover_done(); join() waits on it after creating the lockspace. */
53 struct completion completion;
54 struct dlm_lock_resource *sb_lock;
55 struct mutex sb_mutex;
/* This node's own "bitmapNNNN" resource; join() takes it in PW mode. */
56 struct dlm_lock_resource *bitmap_lockres;
/* List of struct suspend_info entries, accessed under suspend_lock. */
57 struct list_head suspend_list;
58 spinlock_t suspend_lock;
/* Thread running recover_bitmaps(); registered on demand by recover_slot(). */
59 struct md_thread *recovery_thread;
/* One bit per slot awaiting recovery: set in recover_slot(), cleared in recover_bitmaps(). */
60 unsigned long recovery_map;
61 /* communication lock resources */
62 struct dlm_lock_resource *ack_lockres;
63 struct dlm_lock_resource *message_lockres;
64 struct dlm_lock_resource *token_lockres;
65 struct dlm_lock_resource *no_new_dev_lockres;
/* Thread running recv_daemon(); woken from ack_bast(). */
66 struct md_thread *recv_thread;
/* Completed by new_disk_ack(); process_add_new_disk() waits on it with a timeout. */
67 struct completion newdisk_completion;
80 /* TODO: Unionize this for smaller footprint */
87 static void sync_ast(void *arg)
89 struct dlm_lock_resource *res;
91 res = (struct dlm_lock_resource *) arg;
92 complete(&res->completion);
/*
 * Issue a DLM request for @res in @mode and block until sync_ast() signals
 * res->completion; the visible return path hands back res->lksb.sb_status.
 * NOTE(review): embedded lines 96-98 and 103-104 are not visible in this
 * excerpt (presumably the declaration of ret and a check of dlm_lock()'s
 * return value before waiting) - confirm against the full file.
 */
95 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
/* The completion is re-armed before every request because @res is reused. */
99 init_completion(&res->completion);
100 ret = dlm_lock(res->ls, mode, &res->lksb,
101 res->flags, res->name, strlen(res->name),
102 0, sync_ast, res, res->bast);
105 wait_for_completion(&res->completion);
106 return res->lksb.sb_status;
109 static int dlm_unlock_sync(struct dlm_lock_resource *res)
111 return dlm_lock_sync(res, DLM_LOCK_NL);
/*
 * Create the bookkeeping for one DLM lock called @name in the lockspace of
 * @mddev's cluster_info and take an initial NL lock on it.
 * NOTE(review): this excerpt elides several lines of the function (local
 * declarations, the tests on @bastfn and @with_lvb, the error label and the
 * return statements); the comments below describe only the visible lines.
 */
114 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
115 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
117 struct dlm_lock_resource *res = NULL;
119 struct md_cluster_info *cinfo = mddev->cluster_info;
121 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
124 res->ls = cinfo->lockspace;
/* Keep a private, NUL-terminated copy of the caller's name. */
126 namelen = strlen(name);
127 res->name = kzalloc(namelen + 1, GFP_KERNEL);
129 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
132 strlcpy(res->name, name, namelen + 1);
/* Zero-filled lock value block of LVB_SIZE bytes; DLM_LKF_VALBLK is set alongside it. */
134 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
135 if (!res->lksb.sb_lvbptr) {
136 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
139 res->flags = DLM_LKF_VALBLK;
/* DLM_LKF_EXPEDITE is set only for the initial NL request and cleared again after it. */
145 res->flags |= DLM_LKF_EXPEDITE;
147 ret = dlm_lock_sync(res, DLM_LOCK_NL);
149 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
/*
 * The stored flags now carry DLM_LKF_CONVERT, so later dlm_lock_sync() calls
 * on this resource are issued with the conversion flag.
 */
152 res->flags &= ~DLM_LKF_EXPEDITE;
153 res->flags |= DLM_LKF_CONVERT;
/* Visible part of the failure cleanup (its label is elided). */
157 kfree(res->lksb.sb_lvbptr);
/*
 * Drop the DLM lock held through @res and free its LVB.
 * NOTE(review): the lines between the signature and init_completion(), and
 * those around the kfree() below, are elided in this excerpt (presumably a
 * NULL check and the frees of res->name and res) - confirm.
 */
163 static void lockres_free(struct dlm_lock_resource *res)
/*
 * @res is passed to dlm_unlock() as the AST argument and the function then
 * waits on res->completion.
 * NOTE(review): the return value of dlm_unlock() is ignored; if the call
 * fails, presumably no AST fires and this wait never returns - confirm.
 */
168 init_completion(&res->completion);
169 dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
170 wait_for_completion(&res->completion);
173 kfree(res->lksb.sb_lvbptr);
/*
 * Format the 16 raw bytes at @src as lowercase hex in the 8-4-4-4-12 UUID
 * layout and store the NUL-terminated text in @dest.
 * @dest must have room for at least 37 bytes. Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	char *out = dest;
	int idx = 0;

	while (idx < 16) {
		/* A dash precedes bytes 4, 6, 8 and 10. */
		switch (idx) {
		case 4:
		case 6:
		case 8:
		case 10:
			*out++ = '-';
			break;
		default:
			break;
		}
		/* sprintf() terminates the string after every byte written. */
		out += sprintf(out, "%02x", (unsigned char)src[idx]);
		idx++;
	}
	return dest;
}
189 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
190 sector_t lo, sector_t hi)
192 struct resync_info *ri;
194 ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
195 ri->lo = cpu_to_le64(lo);
196 ri->hi = cpu_to_le64(hi);
/*
 * Take @lockres in CR mode, decode the struct resync_info stored in its LVB
 * and, on the visible path, allocate a suspend_info describing it.
 * NOTE(review): the condition guarding the allocation, the allocation-failure
 * handling and the return statement are elided in this excerpt. Confirm that
 * every exit taken after dlm_lock_sync() also reaches dlm_unlock_sync().
 * The return value of dlm_lock_sync() is not checked on the visible line.
 */
199 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
201 struct resync_info ri;
202 struct suspend_info *s = NULL;
205 dlm_lock_sync(lockres, DLM_LOCK_CR);
/* Copy out of the LVB first; the fields are little-endian on the wire. */
206 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
207 hi = le64_to_cpu(ri.hi);
209 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
213 s->lo = le64_to_cpu(ri.lo);
215 dlm_unlock_sync(lockres);
/*
 * Body of the recovery thread registered by recover_slot().
 * While cinfo->recovery_map has bits set it walks the suspend list for the
 * chosen slot, takes that slot's "bitmapNNNN" lock in PW mode, copies the
 * slot's bitmap with bitmap_copy_from_slot(), flags the array for recovery
 * and finally clears the slot's bit.
 * NOTE(review): local declarations, the bodies of the error branches and the
 * action taken on matching suspend entries are elided in this excerpt.
 */
220 static void recover_bitmaps(struct md_thread *thread)
222 struct mddev *mddev = thread->mddev;
223 struct md_cluster_info *cinfo = mddev->cluster_info;
224 struct dlm_lock_resource *bm_lockres;
227 struct suspend_info *s, *tmp;
230 while (cinfo->recovery_map) {
/* fls64() - 1 is the index of the highest set bit, so higher slots go first. */
231 slot = fls64((u64)cinfo->recovery_map) - 1;
233 /* Clear suspend_area associated with the bitmap */
234 spin_lock_irq(&cinfo->suspend_lock);
235 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
236 if (slot == s->slot) {
240 spin_unlock_irq(&cinfo->suspend_lock);
/* Same resource name format that join() uses for this node's own bitmap lock. */
242 snprintf(str, 64, "bitmap%04d", slot);
243 bm_lockres = lockres_init(mddev, str, NULL, 1);
245 pr_err("md-cluster: Cannot initialize bitmaps\n");
249 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
251 pr_err("md-cluster: Could not DLM lock %s: %d\n",
255 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi);
257 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
261 /* TODO:Wait for current resync to get over */
262 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
/* Only ever move the recovery checkpoint backwards. */
263 if (lo < mddev->recovery_cp)
264 mddev->recovery_cp = lo;
265 md_check_recovery(mddev);
/*
 * NOTE(review): no lockres_free(bm_lockres) is visible after this unlock;
 * confirm the resource created by lockres_init() above is not leaked on
 * every iteration.
 */
268 dlm_unlock_sync(bm_lockres);
270 clear_bit(slot, &cinfo->recovery_map);
/*
 * .recover_prep callback of md_ls_ops.
 * NOTE(review): the body is not visible in this excerpt.
 */
274 static void recover_prep(void *arg)
/*
 * .recover_slot callback of md_ls_ops; @arg is the mddev given to
 * dlm_new_lockspace(). Logs the failed node, marks its slot in
 * cinfo->recovery_map and makes sure the recovery thread exists and is woken.
 * NOTE(review): the trailing arguments of pr_info() and md_register_thread()
 * and the body of the failure branch are elided in this excerpt.
 */
278 static void recover_slot(void *arg, struct dlm_slot *slot)
280 struct mddev *mddev = arg;
281 struct md_cluster_info *cinfo = mddev->cluster_info;
283 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
284 mddev->bitmap_info.cluster_name,
285 slot->nodeid, slot->slot,
/* recovery_map is indexed from zero, hence slot->slot - 1. */
287 set_bit(slot->slot - 1, &cinfo->recovery_map);
/* The recovery thread is created the first time a slot needs recovering. */
288 if (!cinfo->recovery_thread) {
289 cinfo->recovery_thread = md_register_thread(recover_bitmaps,
291 if (!cinfo->recovery_thread) {
292 pr_warn("md-cluster: Could not create recovery thread\n");
296 md_wakeup_thread(cinfo->recovery_thread);
/*
 * .recover_done callback of md_ls_ops: record @our_slot in
 * cinfo->slot_number and signal cinfo->completion, which join() waits on.
 * NOTE(review): the last parameter line of the signature is elided in this
 * excerpt; @slots and @num_slots are not used on the visible lines.
 */
299 static void recover_done(void *arg, struct dlm_slot *slots,
300 int num_slots, int our_slot,
303 struct mddev *mddev = arg;
304 struct md_cluster_info *cinfo = mddev->cluster_info;
306 cinfo->slot_number = our_slot;
307 complete(&cinfo->completion);
/* Recovery callbacks handed to dlm_new_lockspace() by join(). */
310 static const struct dlm_lockspace_ops md_ls_ops = {
311 .recover_prep = recover_prep,
312 .recover_slot = recover_slot,
313 .recover_done = recover_done,
317 * The BAST function for the ack lock resource
318 * This function wakes up the receive thread in
319 * order to receive and process the message.
321 static void ack_bast(void *arg, int mode)
323 struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg;
324 struct md_cluster_info *cinfo = res->mddev->cluster_info;
326 if (mode == DLM_LOCK_EX)
327 md_wakeup_thread(cinfo->recv_thread);
/*
 * Walk cinfo->suspend_list looking for entries that belong to @slot.
 * Both visible callers (remove_suspend_info() and process_suspend_info())
 * hold cinfo->suspend_lock around the call.
 * NOTE(review): what happens to a matching entry after the log line is
 * elided in this excerpt.
 */
330 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
332 struct suspend_info *s, *tmp;
334 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
335 if (slot == s->slot) {
336 pr_info("%s:%d Deleting suspend_info: %d\n",
337 __func__, __LINE__, slot);
344 static void remove_suspend_info(struct md_cluster_info *cinfo, int slot)
346 spin_lock_irq(&cinfo->suspend_lock);
347 __remove_suspend_info(cinfo, slot);
348 spin_unlock_irq(&cinfo->suspend_lock);
/*
 * Handle a resync range announced by the node in @slot: on the visible path a
 * new suspend_info is allocated and put on cinfo->suspend_list in place of
 * any existing entry for that slot.
 * NOTE(review): the condition in front of the first remove_suspend_info()
 * call, the allocation-failure check and the assignments to the new entry
 * are elided in this excerpt.
 */
352 static void process_suspend_info(struct md_cluster_info *cinfo,
353 int slot, sector_t lo, sector_t hi)
355 struct suspend_info *s;
358 remove_suspend_info(cinfo, slot);
/* GFP_KERNEL allocation is done before the spinlock is taken. */
361 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
367 spin_lock_irq(&cinfo->suspend_lock);
368 /* Remove existing entry (if exists) before adding */
369 __remove_suspend_info(cinfo, slot);
370 list_add(&s->list, &cinfo->suspend_list);
371 spin_unlock_irq(&cinfo->suspend_lock);
/*
 * Handle a NEWDISK message: emit a KOBJ_CHANGE uevent on the array's disk
 * carrying EVENT=ADD_DEVICE, DEVICE_UUID=<formatted uuid> and
 * RAID_DISK=<slot>, then wait (with a timeout) for new_disk_ack() to signal
 * cinfo->newdisk_completion.
 * NOTE(review): the declarations of disk_uuid, raid_slot and len, and the
 * timeout argument of wait_for_completion_timeout(), are elided in this
 * excerpt.
 */
374 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
377 struct md_cluster_info *cinfo = mddev->cluster_info;
378 char event_name[] = "EVENT=ADD_DEVICE";
380 char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
/* The formatted UUID is appended right after the "DEVICE_UUID=" prefix. */
383 len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
384 pretty_uuid(disk_uuid + len, cmsg->uuid);
385 snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot);
386 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
/* The WAITING bit is what lets new_disk_ack() tell an expected reply from a spurious one. */
387 init_completion(&cinfo->newdisk_completion);
388 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
389 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
390 wait_for_completion_timeout(&cinfo->newdisk_completion,
392 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
/*
 * Handle a METADATA_UPDATED message. On the visible line it takes the
 * no-new-dev lock resource in CR mode; @msg is not used on the visible lines.
 * NOTE(review): at least one statement before the dlm_lock_sync() call is
 * elided in this excerpt, and the lock's return value is not checked.
 */
396 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
398 struct md_cluster_info *cinfo = mddev->cluster_info;
401 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
/*
 * Dispatch one received cluster message to its handler: METADATA_UPDATED to
 * process_metadata_update(), the RESYNCING log branch to
 * process_suspend_info(), and the NEWDISK log branch to
 * process_add_new_disk().
 * NOTE(review): the switch statement, two of the case labels and the break
 * statements are elided in this excerpt.
 * NOTE(review): senders store slot/low/high with cpu_to_le32()/cpu_to_le64()
 * (see __sendmsg() and resync_send()), but the fields are used here without
 * the matching le*_to_cpu() conversion - confirm behaviour on big-endian
 * hosts.
 */
404 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
407 case METADATA_UPDATED:
408 pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
409 __func__, __LINE__, msg->slot);
410 process_metadata_update(mddev, msg);
413 pr_info("%s: %d Received message: RESYNCING from %d\n",
414 __func__, __LINE__, msg->slot);
415 process_suspend_info(mddev->cluster_info, msg->slot,
416 msg->low, msg->high);
419 pr_info("%s: %d Received message: NEWDISK from %d\n",
420 __func__, __LINE__, msg->slot);
421 process_add_new_disk(mddev, msg);
/*
 * Body of the "cluster_recv" thread registered by join(); ack_bast() wakes
 * it. Each run reads one message from the message lock's LVB, processes it,
 * and then cycles the ack and message locks as the step comments below
 * describe.
 * NOTE(review): the statement that follows the pr_err() in the failure
 * branch is elided in this excerpt. The return values of the lock calls
 * after the first one are not checked.
 */
426 * thread for receiving message
428 static void recv_daemon(struct md_thread *thread)
430 struct md_cluster_info *cinfo = thread->mddev->cluster_info;
431 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
432 struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
433 struct cluster_msg msg;
435 /*get CR on Message*/
436 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
437 pr_err("md/raid1:failed to get CR on MESSAGE\n");
441 /* read lvb and wake up thread to process this message_lockres */
/* Work on a private copy of the LVB contents. */
442 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
443 process_recvd_msg(thread->mddev, &msg);
445 /*release CR on ack_lockres*/
446 dlm_unlock_sync(ack_lockres);
447 /*up-convert to EX on message_lockres*/
448 dlm_lock_sync(message_lockres, DLM_LOCK_EX);
449 /*get CR on ack_lockres again*/
450 dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
451 /*release CR on message_lockres*/
452 dlm_unlock_sync(message_lockres);
/*
 * Acquire the TOKEN lock resource in EX mode and log a failure.
 * NOTE(review): the declaration of error, the test in front of pr_err() and
 * the return statement are elided in this excerpt.
 */
456 * Takes the lock on the TOKEN lock resource so no other
457 * node can communicate while the operation is underway.
459 static int lock_comm(struct md_cluster_info *cinfo)
463 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
465 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
466 __func__, __LINE__, error);
470 static void unlock_comm(struct md_cluster_info *cinfo)
472 dlm_unlock_sync(cinfo->token_lockres);
/*
 * Send @cmsg to the other nodes through the message/ack lock resources,
 * following the five steps listed below. The sender's slot
 * (cinfo->slot_number - 1) is stamped into cmsg->slot before the copy.
 * The visible code does not take the TOKEN lock itself.
 * NOTE(review): the error tests, goto targets/labels and the return statement
 * are elided in this excerpt.
 */
476 * This function performs the actual sending of the message. This function is
477 * usually called after performing the encompassing operation
479 * 1. Grabs the message lockresource in EX mode
480 * 2. Copies the message to the message LVB
481 * 3. Downconverts message lockresource to CR
482 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
483 * and the other nodes read the message. The thread will wait here until all other
484 * nodes have released ack lock resource.
485 * 5. Downconvert ack lockresource to CR
487 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
490 int slot = cinfo->slot_number - 1;
492 cmsg->slot = cpu_to_le32(slot);
493 /*get EX on Message*/
494 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
496 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
/* The whole struct is copied, so every byte of *cmsg reaches the other nodes. */
500 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
501 sizeof(struct cluster_msg));
502 /*down-convert EX to CR on Message*/
503 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CR);
505 pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
510 /*up-convert CR to EX on Ack*/
511 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
513 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
518 /*down-convert EX to CR on Ack*/
519 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
521 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
527 dlm_unlock_sync(cinfo->message_lockres);
/*
 * Send @cmsg by way of __sendmsg().
 * NOTE(review): the lines around the call are elided in this excerpt;
 * presumably they bracket it with lock_comm()/unlock_comm() - confirm.
 */
532 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
537 ret = __sendmsg(cinfo, cmsg);
/*
 * Probe the bitmap lock of every slot below @total_slots. When the PW
 * try-lock fails with -EAGAIN, the resync range published in that lock's LVB
 * is read and, per the visible lines, added to cinfo->suspend_list.
 * NOTE(review): local declarations, the lockres_init() failure check, the
 * statement under the own-slot test, the test on the value returned by
 * read_resync_info() and the return statement are elided in this excerpt.
 */
542 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
544 struct md_cluster_info *cinfo = mddev->cluster_info;
546 struct dlm_lock_resource *bm_lockres;
547 struct suspend_info *s;
551 for (i = 0; i < total_slots; i++) {
552 memset(str, '\0', 64);
553 snprintf(str, 64, "bitmap%04d", i);
554 bm_lockres = lockres_init(mddev, str, NULL, 1);
/*
 * NOTE(review): this is the node's own slot. If the elided statement skips
 * the iteration, bm_lockres from lockres_init() above would not be freed -
 * confirm.
 */
557 if (i == (cinfo->slot_number - 1))
/* NOQUEUE is set before the PW request; -EAGAIN is the only result acted on below. */
560 bm_lockres->flags |= DLM_LKF_NOQUEUE;
561 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
562 if (ret == -EAGAIN) {
/* Clear the local LVB copy before read_resync_info() re-reads it under CR. */
563 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
564 s = read_resync_info(mddev, bm_lockres);
566 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
568 (unsigned long long) s->lo,
569 (unsigned long long) s->hi, i);
570 spin_lock_irq(&cinfo->suspend_lock);
572 list_add(&s->list, &cinfo->suspend_list);
573 spin_unlock_irq(&cinfo->suspend_lock);
576 lockres_free(bm_lockres);
581 /* TODO: Read the disk bitmap sb and check if it needs recovery */
582 dlm_unlock_sync(bm_lockres);
583 lockres_free(bm_lockres);
/*
 * Join the cluster for @mddev. Allocates the md_cluster_info, creates the DLM
 * lockspace named after the array UUID, waits for recover_done() to report
 * our slot, sets up the receive thread and the communication lock resources,
 * takes this node's bitmap lock in PW mode and finally collects the resync
 * ranges of the other nodes.
 * @nodes: number of available slots; a larger slot number is reported as an
 *         error on the visible lines.
 * NOTE(review): local declarations, every return statement, the error codes,
 * the goto statements and the error label are elided in this excerpt.
 */
589 static int join(struct mddev *mddev, int nodes)
591 struct md_cluster_info *cinfo;
/* Hold a module reference; the cleanup sequence at the end drops it with module_put(). */
595 if (!try_module_get(THIS_MODULE))
598 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
602 init_completion(&cinfo->completion);
604 mutex_init(&cinfo->sb_mutex);
605 mddev->cluster_info = cinfo;
/* The lockspace name is the array UUID in text form. */
608 pretty_uuid(str, mddev->uuid);
609 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
610 DLM_LSFL_FS, LVB_SIZE,
611 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
/* recover_done() completes this after storing our slot number. */
614 wait_for_completion(&cinfo->completion);
/* NOTE(review): the pr_err() format below has no trailing "\n", unlike the other messages in this file. */
615 if (nodes < cinfo->slot_number) {
616 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
617 cinfo->slot_number, nodes);
621 cinfo->sb_lock = lockres_init(mddev, "cmd-super",
623 if (!cinfo->sb_lock) {
627 /* Initiate the communication resources */
/*
 * NOTE(review): the receive thread is registered here, while suspend_list and
 * suspend_lock are only initialised further down; recv_daemon() can reach
 * process_suspend_info(), which uses both - confirm no message can be
 * processed in between.
 */
629 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
630 if (!cinfo->recv_thread) {
631 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
634 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
635 if (!cinfo->message_lockres)
637 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
638 if (!cinfo->token_lockres)
640 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
641 if (!cinfo->ack_lockres)
643 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
644 if (!cinfo->no_new_dev_lockres)
/*
 * NOTE(review): the result of dlm_lock_sync() is not stored in either test
 * below, so the "ret" printed by the no-new-dev message is a stale value
 * rather than the lock error; the failures are only logged.
 */
647 /* get sync CR lock on ACK. */
648 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
649 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
651 /* get sync CR lock on no-new-dev. */
652 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
653 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
656 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
/* str is reused: from here on it holds this node's bitmap resource name. */
657 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
658 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
659 if (!cinfo->bitmap_lockres)
661 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
662 pr_err("Failed to get bitmap lock\n");
667 INIT_LIST_HEAD(&cinfo->suspend_list);
668 spin_lock_init(&cinfo->suspend_lock);
670 ret = gather_all_resync_info(mddev, nodes);
/*
 * Cleanup sequence (its label is elided).
 * NOTE(review): no md_unregister_thread(&cinfo->recv_thread) is visible here
 * although the thread is registered above - confirm it is not left running
 * on failure.
 */
676 lockres_free(cinfo->message_lockres);
677 lockres_free(cinfo->token_lockres);
678 lockres_free(cinfo->ack_lockres);
679 lockres_free(cinfo->no_new_dev_lockres);
680 lockres_free(cinfo->bitmap_lockres);
681 lockres_free(cinfo->sb_lock);
682 if (cinfo->lockspace)
683 dlm_release_lockspace(cinfo->lockspace, 2);
684 mddev->cluster_info = NULL;
686 module_put(THIS_MODULE);
/*
 * Tear down the cluster state of @mddev: stop both helper threads, free every
 * lock resource and release the lockspace.
 * NOTE(review): the lines between the declaration and the first
 * md_unregister_thread() call, and the tail of the function, are elided in
 * this excerpt.
 */
690 static int leave(struct mddev *mddev)
692 struct md_cluster_info *cinfo = mddev->cluster_info;
/* Threads go first: recv_daemon() uses the message/ack lock resources freed below. */
696 md_unregister_thread(&cinfo->recovery_thread);
697 md_unregister_thread(&cinfo->recv_thread);
698 lockres_free(cinfo->message_lockres);
699 lockres_free(cinfo->token_lockres);
700 lockres_free(cinfo->ack_lockres);
701 lockres_free(cinfo->no_new_dev_lockres);
702 lockres_free(cinfo->sb_lock);
703 lockres_free(cinfo->bitmap_lockres);
704 dlm_release_lockspace(cinfo->lockspace, 2);
708 /* slot_number(): Returns the MD slot number to use
709 * DLM starts the slot numbers from 1, wheras cluster-md
710 * wants the number to be from zero, so we deduct one
712 static int slot_number(struct mddev *mddev)
714 struct md_cluster_info *cinfo = mddev->cluster_info;
716 return cinfo->slot_number - 1;
719 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
721 struct md_cluster_info *cinfo = mddev->cluster_info;
723 add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
724 /* Re-acquire the lock to refresh LVB */
725 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
728 static int metadata_update_start(struct mddev *mddev)
730 return lock_comm(mddev->cluster_info);
/*
 * Finish a metadata update by sending a zeroed cluster message of type
 * METADATA_UPDATED through __sendmsg().
 * NOTE(review): the declaration of ret and the statements after the
 * __sendmsg() call are elided in this excerpt.
 */
733 static int metadata_update_finish(struct mddev *mddev)
735 struct md_cluster_info *cinfo = mddev->cluster_info;
736 struct cluster_msg cmsg;
/* Zero the message first: __sendmsg() copies the whole struct into the LVB. */
739 memset(&cmsg, 0, sizeof(cmsg));
740 cmsg.type = cpu_to_le32(METADATA_UPDATED);
741 ret = __sendmsg(cinfo, &cmsg);
746 static int metadata_update_cancel(struct mddev *mddev)
748 struct md_cluster_info *cinfo = mddev->cluster_info;
750 return dlm_unlock_sync(cinfo->token_lockres);
753 static int resync_send(struct mddev *mddev, enum msg_type type,
754 sector_t lo, sector_t hi)
756 struct md_cluster_info *cinfo = mddev->cluster_info;
757 struct cluster_msg cmsg;
758 int slot = cinfo->slot_number - 1;
760 pr_info("%s:%d lo: %llu hi: %llu\n", __func__, __LINE__,
761 (unsigned long long)lo,
762 (unsigned long long)hi);
763 resync_info_update(mddev, lo, hi);
764 cmsg.type = cpu_to_le32(type);
765 cmsg.slot = cpu_to_le32(slot);
766 cmsg.low = cpu_to_le64(lo);
767 cmsg.high = cpu_to_le64(hi);
768 return sendmsg(cinfo, &cmsg);
771 static int resync_start(struct mddev *mddev, sector_t lo, sector_t hi)
773 pr_info("%s:%d\n", __func__, __LINE__);
774 return resync_send(mddev, RESYNCING, lo, hi);
777 static void resync_finish(struct mddev *mddev)
779 pr_info("%s:%d\n", __func__, __LINE__);
780 resync_send(mddev, RESYNCING, 0, 0);
/*
 * Check, under cinfo->suspend_lock, whether the range [@lo, @hi) overlaps any
 * range recorded on cinfo->suspend_list.
 * NOTE(review): the declaration and assignments of the result, the statement
 * under the list_empty() test, the exit label and the return statement are
 * elided in this excerpt.
 */
783 static int area_resyncing(struct mddev *mddev, sector_t lo, sector_t hi)
785 struct md_cluster_info *cinfo = mddev->cluster_info;
787 struct suspend_info *s;
789 spin_lock_irq(&cinfo->suspend_lock);
790 if (list_empty(&cinfo->suspend_list))
792 list_for_each_entry(s, &cinfo->suspend_list, list)
/* Two ranges overlap when each one starts before the other ends. */
793 if (hi > s->lo && lo < s->hi) {
798 spin_unlock_irq(&cinfo->suspend_lock);
/*
 * Start adding @rdev to a clustered array: broadcast a NEWDISK message with
 * the device UUID and descriptor number, then try to take the no-new-dev
 * lock in EX mode without queueing.
 * NOTE(review): the declaration of ret, the statement just before
 * __sendmsg(), the tests on ret and the return statements are elided in this
 * excerpt.
 * NOTE(review): raid_slot is stored without an endian conversion while type
 * uses cpu_to_le32(); the receiver (process_add_new_disk()) also reads it
 * raw - confirm this is intended.
 */
802 static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
804 struct md_cluster_info *cinfo = mddev->cluster_info;
805 struct cluster_msg cmsg;
807 struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
808 char *uuid = sb->device_uuid;
810 memset(&cmsg, 0, sizeof(cmsg));
811 cmsg.type = cpu_to_le32(NEWDISK);
812 memcpy(cmsg.uuid, uuid, 16);
813 cmsg.raid_slot = rdev->desc_nr;
815 ret = __sendmsg(cinfo, &cmsg);
/* NOQUEUE is set only around this one EX request and cleared right after it. */
818 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
819 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
820 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
821 /* Some node does not "see" the device */
825 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
829 static int add_new_disk_finish(struct mddev *mddev)
831 struct cluster_msg cmsg;
832 struct md_cluster_info *cinfo = mddev->cluster_info;
834 /* Write sb and inform others */
835 md_update_sb(mddev, 1);
836 cmsg.type = METADATA_UPDATED;
837 ret = __sendmsg(cinfo, &cmsg);
/*
 * Deliver the local answer to a pending NEWDISK request. A call made while
 * MD_CLUSTER_WAITING_FOR_NEWDISK is not set is reported as spurious;
 * otherwise the waiter in process_add_new_disk() is released through
 * cinfo->newdisk_completion.
 * NOTE(review): the return statements and the line in front of
 * dlm_unlock_sync() (presumably a test on @ack) are elided in this excerpt.
 */
842 static int new_disk_ack(struct mddev *mddev, bool ack)
844 struct md_cluster_info *cinfo = mddev->cluster_info;
846 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
847 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
852 dlm_unlock_sync(cinfo->no_new_dev_lockres);
853 complete(&cinfo->newdisk_completion);
/*
 * Operations table that cluster_init() registers through
 * register_md_cluster_operations().
 * NOTE(review): the embedded numbering skips lines 858-859, so the first
 * entries of the table (presumably join and leave) are not visible here.
 */
857 static struct md_cluster_operations cluster_ops = {
860 .slot_number = slot_number,
861 .resync_info_update = resync_info_update,
862 .resync_start = resync_start,
863 .resync_finish = resync_finish,
864 .metadata_update_start = metadata_update_start,
865 .metadata_update_finish = metadata_update_finish,
866 .metadata_update_cancel = metadata_update_cancel,
867 .area_resyncing = area_resyncing,
868 .add_new_disk_start = add_new_disk_start,
869 .add_new_disk_finish = add_new_disk_finish,
870 .new_disk_ack = new_disk_ack,
/*
 * Module entry point: log the experimental status and register cluster_ops
 * with register_md_cluster_operations().
 * NOTE(review): the return statement is elided in this excerpt.
 */
873 static int __init cluster_init(void)
875 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
876 pr_info("Registering Cluster MD functions\n");
877 register_md_cluster_operations(&cluster_ops, THIS_MODULE);
/* Module exit point: undo the registration performed by cluster_init(). */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
/* Module entry/exit hooks and metadata. */
886 module_init(cluster_init);
887 module_exit(cluster_exit);
888 MODULE_LICENSE("GPL");
889 MODULE_DESCRIPTION("Clustering support for MD");