1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
24 #include "requestqueue.h"
/* Mutex held by dlm_new_lockspace() and dlm_release_lockspace() around
   lockspace creation and release. */
29 static struct mutex ls_lock;
/* List that lockspaces are linked onto through their ls_list member. */
30 static struct list_head lslist;
/* Spinlock taken around the lslist walks and updates visible in this file. */
31 static spinlock_t lslist_lock;
/* Task handed to kthread_stop() in dlm_scand_stop().
   NOTE(review): presumably set from kthread_run() in dlm_scand_start();
   the assignment is not visible in this excerpt -- confirm. */
32 static struct task_struct * scand_task;
/* sysfs store handler installed as dlm_attr_control.store ("control").
   Parses buf as an integer with simple_strtol() (base 0), looks the
   lockspace up again through its ls_local_handle and pairs that lookup
   with dlm_put_lockspace().
   NOTE(review): how n is acted upon, the handling of a failed lookup and
   the return value are not visible in this excerpt -- confirm. */
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
38 int n = simple_strtol(buf, NULL, 0);
40 ls = dlm_find_lockspace_local(ls->ls_local_handle);
54 dlm_put_lockspace(ls);
/* sysfs store handler installed as dlm_attr_event.store ("event_done").
   Records the integer parsed from buf in ls_uevent_result, sets
   LSFL_UEVENT_WAIT and wakes ls_uevent_wait; do_uevent() sleeps on that
   wait queue until the bit is set.
   NOTE(review): the return value is not visible in this excerpt. */
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
/* sysfs show helper: formats ls_global_id as unsigned decimal followed by
   a newline into buf (bounded by PAGE_SIZE) and returns snprintf()'s
   result. */
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
68 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
/* sysfs store handler installed as dlm_attr_id.store ("id"): parses buf
   with simple_strtoul() (base 0) and stores the value in ls_global_id.
   NOTE(review): the return value is not visible in this excerpt. */
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
/* sysfs show handler for "nodir": prints the value of dlm_no_directory(ls)
   as unsigned decimal plus newline and returns snprintf()'s result. */
77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
79 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
/* sysfs store handler for "nodir": parses buf into val and sets the
   LSFL_NODIR flag on the lockspace.
   NOTE(review): whether set_bit() is conditional on val, and the return
   value, are not visible in this excerpt -- confirm. */
82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
84 int val = simple_strtoul(buf, NULL, 0);
86 set_bit(LSFL_NODIR, &ls->ls_flags);
/* sysfs show handler for "recover_status": prints the value returned by
   dlm_recover_status(ls) in hexadecimal followed by a newline. */
90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
92 uint32_t status = dlm_recover_status(ls);
93 return snprintf(buf, PAGE_SIZE, "%x\n", status);
/* sysfs show handler for "recover_nodeid": prints ls_recover_nodeid as a
   signed decimal followed by a newline. */
96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
98 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
/* Members of the dlm_attr descriptor used by the dlm_attr_* objects below:
   the generic sysfs attribute, plus optional lockspace-typed callbacks.
   dlm_attr_show() and dlm_attr_store() recover the descriptor from the
   embedded attr with container_of() and dispatch to these. */
102 struct attribute attr;
/* Formats the attribute value for ls into the supplied buffer. */
103 ssize_t (*show)(struct dlm_ls *, char *);
/* Consumes a value written to the attribute (buffer and its length). */
104 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
/* Per-lockspace sysfs attribute descriptors.  Each one names the sysfs
   file, gives its mode bits and binds the show/store helpers above. */

/* "control": mode S_IWUSR, store callback only. */
107 static struct dlm_attr dlm_attr_control = {
108 .attr = {.name = "control", .mode = S_IWUSR},
109 .store = dlm_control_store
/* "event_done": mode S_IWUSR; written to complete a pending do_uevent(). */
112 static struct dlm_attr dlm_attr_event = {
113 .attr = {.name = "event_done", .mode = S_IWUSR},
114 .store = dlm_event_store
/* "id": mode S_IRUGO | S_IWUSR.
   NOTE(review): only the store binding is visible in this excerpt; given
   the readable mode, confirm that .show is set to dlm_id_show. */
117 static struct dlm_attr dlm_attr_id = {
118 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
120 .store = dlm_id_store
/* "nodir": mode S_IRUGO | S_IWUSR, both callbacks bound. */
123 static struct dlm_attr dlm_attr_nodir = {
124 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125 .show = dlm_nodir_show,
126 .store = dlm_nodir_store
/* "recover_status": mode S_IRUGO, show callback only. */
129 static struct dlm_attr dlm_attr_recover_status = {
130 .attr = {.name = "recover_status", .mode = S_IRUGO},
131 .show = dlm_recover_status_show
/* "recover_nodeid": mode S_IRUGO, show callback only. */
134 static struct dlm_attr dlm_attr_recover_nodeid = {
135 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
136 .show = dlm_recover_nodeid_show
/* Attribute table installed as dlm_ktype.default_attrs.
   NOTE(review): no entry for dlm_attr_id and no terminating entry are
   visible in this excerpt -- confirm against the complete file. */
139 static struct attribute *dlm_attrs[] = {
140 &dlm_attr_control.attr,
141 &dlm_attr_event.attr,
143 &dlm_attr_nodir.attr,
144 &dlm_attr_recover_status.attr,
145 &dlm_attr_recover_nodeid.attr,
/* sysfs_ops.show entry point (see dlm_attr_ops).  Recovers the lockspace
   from the embedded ls_kobj and the dlm_attr from the embedded attribute,
   then calls the attribute's show callback.  Returns 0 when the attribute
   has no show callback. */
149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
152 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
153 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154 return a->show ? a->show(ls, buf) : 0;
/* sysfs_ops.store entry point (see dlm_attr_ops).  Recovers the lockspace
   and the dlm_attr with container_of() and calls the attribute's store
   callback with buf and len.  When the attribute has no store callback
   the write is reported as fully consumed (returns len). */
157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158 const char *buf, size_t len)
160 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
161 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162 return a->store ? a->store(ls, buf, len) : len;
/* kobject release callback installed as dlm_ktype.release.  Recovers the
   dlm_ls that embeds the kobject.
   NOTE(review): what is done with ls is not visible in this excerpt;
   comments in new_lockspace() and release_lockspace() say the kobject
   handles freeing of ls, so presumably ls is freed here -- confirm. */
165 static void lockspace_kobj_release(struct kobject *k)
167 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
/* sysfs operations shared by all lockspace attributes; both entries
   dispatch to the per-attribute callbacks stored in struct dlm_attr. */
171 static const struct sysfs_ops dlm_attr_ops = {
172 .show = dlm_attr_show,
173 .store = dlm_attr_store,
/* kobject type passed to kobject_init_and_add() in new_lockspace(): ties
   together the attribute table, the sysfs ops and the release callback. */
176 static struct kobj_type dlm_ktype = {
177 .default_attrs = dlm_attrs,
178 .sysfs_ops = &dlm_attr_ops,
179 .release = lockspace_kobj_release,
/* kset created as "dlm" in dlm_lockspace_init(); assigned to each
   lockspace's ls_kobj.kset in new_lockspace(). */
182 static struct kset *dlm_kset;
/* Emit a KOBJ_ONLINE or KOBJ_OFFLINE uevent for the lockspace kobject and
   sleep (interruptibly) until LSFL_UEVENT_WAIT is set, which
   dlm_event_store() does after recording a result in ls_uevent_result.
   The bit is cleared again by the test_and_clear_bit() in the wait
   condition.
   NOTE(review): the branch choosing ONLINE vs OFFLINE is not visible in
   this excerpt; the log text suggests ONLINE when in is non-zero
   ("joining") and OFFLINE otherwise ("leaving").  The conditions guarding
   the error assignment and the log_error(), and the return statement,
   are likewise not visible -- confirm. */
184 static int do_uevent(struct dlm_ls *ls, int in)
189 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
191 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
193 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
195 /* dlm_controld will see the uevent, do the necessary group management
196 and then write to sysfs to wake us */
198 error = wait_event_interruptible(ls->ls_uevent_wait,
199 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
201 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
206 error = ls->ls_uevent_result;
209 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210 error, ls->ls_uevent_result);
/* kset uevent callback (see dlm_uevent_ops): adds a LOCKSPACE=<ls_name>
   variable to the uevent environment for the lockspace that embeds kobj.
   NOTE(review): the return statement is not visible in this excerpt. */
214 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215 struct kobj_uevent_env *env)
217 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
/* uevent operations passed to kset_create_and_add() for the "dlm" kset. */
223 static struct kset_uevent_ops dlm_uevent_ops = {
224 .uevent = dlm_uevent,
/* Module-init helper: initialises ls_lock, lslist and lslist_lock, then
   creates the "dlm" kset under kernel_kobj with dlm_uevent_ops.  A warning
   naming this function is printed for the kset-creation failure case.
   NOTE(review): the failure test and the return values are not visible in
   this excerpt -- confirm. */
227 int __init dlm_lockspace_init(void)
230 mutex_init(&ls_lock);
231 INIT_LIST_HEAD(&lslist);
232 spin_lock_init(&lslist_lock);
234 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
236 printk(KERN_WARNING "%s: can not create kset\n", __func__);
/* Unregisters the "dlm" kset created by dlm_lockspace_init(). */
242 void dlm_lockspace_exit(void)
244 kset_unregister(dlm_kset);
/* Walk lslist under lslist_lock looking for a lockspace whose next scan is
   due, i.e. jiffies has reached ls_scan_time + ci_scan_secs seconds.  The
   lock is dropped both on the match path and after an unsuccessful walk.
   NOTE(review): the return statements are not visible in this excerpt;
   presumably the matching lockspace, or NULL when none is due. */
247 static struct dlm_ls *find_ls_to_scan(void)
251 spin_lock(&lslist_lock);
252 list_for_each_entry(ls, &lslist, ls_list) {
253 if (time_after_eq(jiffies, ls->ls_scan_time +
254 dlm_config.ci_scan_secs * HZ)) {
255 spin_unlock(&lslist_lock);
259 spin_unlock(&lslist_lock);
/* Thread function started by dlm_scand_start().  Until asked to stop, it
   picks a lockspace that is due for scanning; when
   dlm_lock_recovery_try() succeeds it stamps ls_scan_time with jiffies,
   runs dlm_scan_timeout() and dlm_scan_waiters(), and releases the
   recovery lock again.  It sleeps interruptibly for ci_scan_secs seconds.
   NOTE(review): the branch structure is not visible in this excerpt; the
   "ls_scan_time += HZ" line looks like the path taken when the recovery
   lock could not be obtained (retry about a second later), and the sleep
   like the path taken when no lockspace is due -- confirm. */
263 static int dlm_scand(void *data)
267 while (!kthread_should_stop()) {
268 ls = find_ls_to_scan();
270 if (dlm_lock_recovery_try(ls)) {
271 ls->ls_scan_time = jiffies;
273 dlm_scan_timeout(ls);
274 dlm_scan_waiters(ls);
275 dlm_unlock_recovery(ls);
277 ls->ls_scan_time += HZ;
281 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
/* Starts the scanning thread: runs dlm_scand() with no argument in a
   kthread named "dlm_scand".
   NOTE(review): error handling for kthread_run() and the store into
   scand_task are not visible in this excerpt -- confirm. */
286 static int dlm_scand_start(void)
288 struct task_struct *p;
291 p = kthread_run(dlm_scand, NULL, "dlm_scand");
/* Stops the scanning thread by calling kthread_stop() on scand_task. */
299 static void dlm_scand_stop(void)
301 kthread_stop(scand_task);
/* Search lslist, under lslist_lock, for the lockspace whose ls_global_id
   equals id.
   NOTE(review): the statements executed on a match and the return value
   are not visible in this excerpt; presumably a usage count is taken that
   the caller drops with dlm_put_lockspace() -- confirm. */
304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
308 spin_lock(&lslist_lock);
310 list_for_each_entry(ls, &lslist, ls_list) {
311 if (ls->ls_global_id == id) {
318 spin_unlock(&lslist_lock);
/* Search lslist, under lslist_lock, for the lockspace whose
   ls_local_handle equals the given handle (new_lockspace() sets
   ls_local_handle to the dlm_ls pointer itself).  Callers in this file
   pair the lookup with dlm_put_lockspace().
   NOTE(review): the statements executed on a match and the return value
   are not visible in this excerpt -- confirm. */
322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
326 spin_lock(&lslist_lock);
327 list_for_each_entry(ls, &lslist, ls_list) {
328 if (ls->ls_local_handle == lockspace) {
335 spin_unlock(&lslist_lock);
/* Search lslist, under lslist_lock, for the lockspace whose
   ls_device.minor equals minor.
   NOTE(review): the statements executed on a match and the return value
   are not visible in this excerpt -- confirm. */
339 struct dlm_ls *dlm_find_lockspace_device(int minor)
343 spin_lock(&lslist_lock);
344 list_for_each_entry(ls, &lslist, ls_list) {
345 if (ls->ls_device.minor == minor) {
352 spin_unlock(&lslist_lock);
/* Counterpart that callers in this file invoke after
   dlm_find_lockspace_local().  Takes and releases lslist_lock.
   NOTE(review): the statement executed under the lock is not visible in
   this excerpt; presumably it drops the usage count (ls_count) that
   remove_lockspace() waits on -- confirm. */
356 void dlm_put_lockspace(struct dlm_ls *ls)
358 spin_lock(&lslist_lock);
360 spin_unlock(&lslist_lock);
/* Unlink ls from lslist once its ls_count is 0.  Under lslist_lock: when
   ls_count is 0, warn if ls_create_count is still non-zero, delete the
   list entry and unlock.  The second unlock covers the ls_count != 0 case.
   NOTE(review): what follows that second unlock (retry/wait?) is not
   visible in this excerpt -- confirm. */
363 static void remove_lockspace(struct dlm_ls *ls)
366 spin_lock(&lslist_lock);
367 if (ls->ls_count == 0) {
368 WARN_ON(ls->ls_create_count != 0);
369 list_del(&ls->ls_list);
370 spin_unlock(&lslist_lock);
373 spin_unlock(&lslist_lock);
/* Start the threads shared by all lockspaces: first dlm_scand, then the
   lowcomms layer.  A message including the error code is logged for each
   start failure.
   NOTE(review): the failure tests, any unwinding of dlm_scand when
   lowcomms fails, and the return statement are not visible in this
   excerpt -- confirm. */
378 static int threads_start(void)
382 error = dlm_scand_start();
384 log_print("cannot start dlm_scand thread %d", error);
388 /* Thread for sending/receiving messages for all lockspaces */
389 error = dlm_lowcomms_start();
391 log_print("cannot start dlm lowcomms %d", error);
/* NOTE(review): only the signature is visible in this excerpt; presumably
   the teardown counterpart of threads_start() -- confirm. */
403 static void threads_stop(void)
/* Create the lockspace called name, or attach to one of the same name that
   is already on lslist.

   Steps visible here, in order:
   - validate the name length against DLM_LOCKSPACE_LEN and require lvblen
     to be a non-zero multiple of 8;
   - pin the module and require the dlm user daemon to be available;
   - when ops and ops_result are both given but ci_recover_callbacks is
     off, report -EOPNOTSUPP through *ops_result;
   - compare cluster with the configured cluster name;
   - look for an existing lockspace with the same name and bump its
     ls_create_count;
   - otherwise allocate and initialise a new dlm_ls, add it to lslist,
     start the callback machinery (DLM_LSFL_FS only) and dlm_recoverd,
     register the kobject, announce the join with do_uevent(ls, 1), wait
     for ls_members_done and create the debug file.
   The trailing statements undo those steps for the failure paths.

   NOTE(review): error codes, goto labels, the final store through
   *lockspace and several conditions are not visible in this excerpt, so
   the exact control flow must be confirmed against the complete file. */
409 static int new_lockspace(const char *name, const char *cluster,
410 uint32_t flags, int lvblen,
411 const struct dlm_lockspace_ops *ops, void *ops_arg,
412 int *ops_result, dlm_lockspace_t **lockspace)
417 int namelen = strlen(name);
419 if (namelen > DLM_LOCKSPACE_LEN)
422 if (!lvblen || (lvblen % 8))
425 if (!try_module_get(THIS_MODULE))
428 if (!dlm_user_daemon_available()) {
429 log_print("dlm user daemon not available");
434 if (ops && ops_result) {
435 if (!dlm_config.ci_recover_callbacks)
436 *ops_result = -EOPNOTSUPP;
441 if (dlm_config.ci_recover_callbacks && cluster &&
442 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443 log_print("dlm cluster name %s mismatch %s",
444 dlm_config.ci_cluster_name, cluster);
/* Look for an existing lockspace with the same name (length first, then
   bytes).  DLM_LSFL_NEWEXCL is tested before ls_create_count is bumped;
   presumably it turns a duplicate into an error -- confirm. */
451 spin_lock(&lslist_lock);
452 list_for_each_entry(ls, &lslist, ls_list) {
453 WARN_ON(ls->ls_create_count <= 0);
454 if (ls->ls_namelen != namelen)
456 if (memcmp(ls->ls_name, name, namelen))
458 if (flags & DLM_LSFL_NEWEXCL) {
462 ls->ls_create_count++;
467 spin_unlock(&lslist_lock);
/* Zeroed allocation sized for the struct plus namelen bytes for the name
   copied in just below. */
474 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
477 memcpy(ls->ls_name, name, namelen);
478 ls->ls_namelen = namelen;
479 ls->ls_lvblen = lvblen;
482 ls->ls_scan_time = jiffies;
484 if (ops && dlm_config.ci_recover_callbacks) {
486 ls->ls_ops_arg = ops_arg;
489 if (flags & DLM_LSFL_TIMEWARN)
490 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
492 /* ls_exflags are forced to match among nodes, and we don't
493 need to require all nodes to have some flags set */
494 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
/* Resource table: ci_rsbtbl_size buckets, each with empty keep and toss
   trees and its own spinlock. */
497 size = dlm_config.ci_rsbtbl_size;
498 ls->ls_rsbtbl_size = size;
500 ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
503 for (i = 0; i < size; i++) {
504 ls->ls_rsbtbl[i].keep.rb_node = NULL;
505 ls->ls_rsbtbl[i].toss.rb_node = NULL;
506 spin_lock_init(&ls->ls_rsbtbl[i].lock);
509 spin_lock_init(&ls->ls_remove_spin);
511 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
512 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
514 if (!ls->ls_remove_names[i])
518 idr_init(&ls->ls_lkbidr);
519 spin_lock_init(&ls->ls_lkbidr_spin);
521 INIT_LIST_HEAD(&ls->ls_waiters);
522 mutex_init(&ls->ls_waiters_mutex);
523 INIT_LIST_HEAD(&ls->ls_orphans);
524 mutex_init(&ls->ls_orphans_mutex);
525 INIT_LIST_HEAD(&ls->ls_timeout);
526 mutex_init(&ls->ls_timeout_mutex);
528 INIT_LIST_HEAD(&ls->ls_new_rsb);
529 spin_lock_init(&ls->ls_new_rsb_spin);
531 INIT_LIST_HEAD(&ls->ls_nodes);
532 INIT_LIST_HEAD(&ls->ls_nodes_gone);
533 ls->ls_num_nodes = 0;
534 ls->ls_low_nodeid = 0;
535 ls->ls_total_weight = 0;
536 ls->ls_node_array = NULL;
538 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
539 ls->ls_stub_rsb.res_ls = ls;
541 ls->ls_debug_rsb_dentry = NULL;
542 ls->ls_debug_waiters_dentry = NULL;
544 init_waitqueue_head(&ls->ls_uevent_wait);
545 ls->ls_uevent_result = 0;
546 init_completion(&ls->ls_members_done);
547 ls->ls_members_result = -1;
549 mutex_init(&ls->ls_cb_mutex);
550 INIT_LIST_HEAD(&ls->ls_cb_delay);
552 ls->ls_recoverd_task = NULL;
553 mutex_init(&ls->ls_recoverd_active);
554 spin_lock_init(&ls->ls_recover_lock);
555 spin_lock_init(&ls->ls_rcom_spin);
556 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
557 ls->ls_recover_status = 0;
558 ls->ls_recover_seq = 0;
559 ls->ls_recover_args = NULL;
560 init_rwsem(&ls->ls_in_recovery);
561 init_rwsem(&ls->ls_recv_active);
562 INIT_LIST_HEAD(&ls->ls_requestqueue);
563 mutex_init(&ls->ls_requestqueue_mutex);
564 mutex_init(&ls->ls_clear_proc_locks);
566 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
567 if (!ls->ls_recover_buf)
571 ls->ls_num_slots = 0;
572 ls->ls_slots_size = 0;
575 INIT_LIST_HEAD(&ls->ls_recover_list);
576 spin_lock_init(&ls->ls_recover_list_lock);
577 idr_init(&ls->ls_recover_idr);
578 spin_lock_init(&ls->ls_recover_idr_lock);
579 ls->ls_recover_list_count = 0;
/* The handle compared by dlm_find_lockspace_local() is the dlm_ls pointer
   itself. */
580 ls->ls_local_handle = ls;
581 init_waitqueue_head(&ls->ls_wait_general);
582 INIT_LIST_HEAD(&ls->ls_root_list);
583 init_rwsem(&ls->ls_root_sem);
/* Publish the new lockspace on lslist with a create count of one. */
585 spin_lock(&lslist_lock);
586 ls->ls_create_count = 1;
587 list_add(&ls->ls_list, &lslist);
588 spin_unlock(&lslist_lock);
590 if (flags & DLM_LSFL_FS) {
591 error = dlm_callback_start(ls);
593 log_error(ls, "can't start dlm_callback %d", error);
598 init_waitqueue_head(&ls->ls_recover_lock_wait);
601 * Once started, dlm_recoverd first looks for ls in lslist, then
602 * initializes ls_in_recovery as locked in "down" mode. We need
603 * to wait for the wakeup from dlm_recoverd because in_recovery
604 * has to start out in down mode.
607 error = dlm_recoverd_start(ls);
609 log_error(ls, "can't start dlm_recoverd %d", error);
613 wait_event(ls->ls_recover_lock_wait,
614 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
616 ls->ls_kobj.kset = dlm_kset;
617 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
621 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
623 /* let kobject handle freeing of ls if there's an error */
626 /* This uevent triggers dlm_controld in userspace to add us to the
627 group of nodes that are members of this lockspace (managed by the
628 cluster infrastructure.) Once it's done that, it tells us who the
629 current lockspace members are (via configfs) and then tells the
630 lockspace to start running (via sysfs) in dlm_ls_start(). */
632 error = do_uevent(ls, 1);
636 wait_for_completion(&ls->ls_members_done);
637 error = ls->ls_members_result;
641 dlm_create_debug_file(ls);
643 log_debug(ls, "join complete");
/* Cleanup statements for the failure paths: they undo the setup above
   roughly in reverse order.
   NOTE(review): the labels separating the individual unwind stages are not
   visible in this excerpt. */
649 dlm_clear_members(ls);
650 kfree(ls->ls_node_array);
652 dlm_recoverd_stop(ls);
654 dlm_callback_stop(ls);
656 spin_lock(&lslist_lock);
657 list_del(&ls->ls_list);
658 spin_unlock(&lslist_lock);
659 idr_destroy(&ls->ls_recover_idr);
660 kfree(ls->ls_recover_buf);
662 idr_destroy(&ls->ls_lkbidr);
663 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
664 if (ls->ls_remove_names[i])
665 kfree(ls->ls_remove_names[i]);
668 vfree(ls->ls_rsbtbl);
671 kobject_put(&ls->ls_kobj);
675 module_put(THIS_MODULE);
/* Public entry point for creating or attaching to a lockspace.  With
   ls_lock held it starts the shared threads via threads_start() and then
   forwards all of its arguments unchanged to new_lockspace().
   NOTE(review): the conditions around threads_start(), the handling of
   new_lockspace()'s result and the return statement are not visible in
   this excerpt -- confirm. */
679 int dlm_new_lockspace(const char *name, const char *cluster,
680 uint32_t flags, int lvblen,
681 const struct dlm_lockspace_ops *ops, void *ops_arg,
682 int *ops_result, dlm_lockspace_t **lockspace)
686 mutex_lock(&ls_lock);
688 error = threads_start();
692 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
693 ops_result, lockspace);
701 mutex_unlock(&ls_lock);
/* idr_for_each() callback used by lockspace_busy() when force == 1.  p is
   the dlm_lkb stored in the idr; the callback tests whether its
   lkb_nodeid is zero.
   NOTE(review): the return values are not visible in this excerpt;
   presumably non-zero for an lkb with nodeid 0 (a local one) -- confirm. */
705 static int lkb_idr_is_local(int id, void *p, void *data)
707 struct dlm_lkb *lkb = p;
709 if (!lkb->lkb_nodeid)
/* idr_for_each() callback used by lockspace_busy().
   NOTE(review): only the signature is visible in this excerpt; presumably
   it reports every lkb as a match -- confirm. */
714 static int lkb_idr_is_any(int id, void *p, void *data)
/* idr_for_each() callback used by release_lockspace() to dispose of every
   remaining lkb.  The lvb buffer is released with dlm_free_lvb() only when
   the lkb has one and carries the DLM_IFL_MSTCPY flag.
   NOTE(review): the freeing of the lkb itself and the return value are
   not visible in this excerpt -- confirm. */
719 static int lkb_idr_free(int id, void *p, void *data)
721 struct dlm_lkb *lkb = p;
723 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
724 dlm_free_lvb(lkb->lkb_lvbptr);
730 /* NOTE: We check the lkbidr here rather than the resource table.
731 This is because there may be LKBs queued as ASTs that have been unlinked
732 from their RSBs and are pending deletion once the AST has been delivered */
/* Decide whether the lockspace still holds lkbs that should block its
   release.  With ls_lkbidr_spin held, the lkb idr is walked with
   lkb_idr_is_any in the first branch and with lkb_idr_is_local when
   force == 1; rv receives idr_for_each()'s result.
   NOTE(review): the first branch's condition (presumably force == 0), the
   remaining branch and the return statement are not visible in this
   excerpt -- confirm. */
734 static int lockspace_busy(struct dlm_ls *ls, int force)
738 spin_lock(&ls->ls_lkbidr_spin);
740 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
741 } else if (force == 1) {
742 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
746 spin_unlock(&ls->ls_lkbidr_spin);
/* Drop one creation reference on ls and, when it was the last one, tear
   the lockspace down.

   Under lslist_lock: a create count of exactly 1 is reset to 0 (the final
   release); a larger count is decremented and the new count stored in rv.
   The "release_lockspace no remove" debug message covers the case where
   no teardown happens.  The teardown stops the device, recoverd and
   callback machinery, unlinks ls from lslist, then frees the lkbs, rsbs
   and the auxiliary buffers before dropping the kobject reference (which,
   per the comment below, is what frees ls) and the module reference.

   NOTE(review): how the busy result from lockspace_busy() affects the
   outcome, the statement guarded by the force < 3 test, the per-rsb free
   calls and the return value are not visible in this excerpt -- confirm. */
750 static int release_lockspace(struct dlm_ls *ls, int force)
756 busy = lockspace_busy(ls, force);
758 spin_lock(&lslist_lock);
759 if (ls->ls_create_count == 1) {
763 /* remove_lockspace takes ls off lslist */
764 ls->ls_create_count = 0;
767 } else if (ls->ls_create_count > 1) {
768 rv = --ls->ls_create_count;
772 spin_unlock(&lslist_lock);
775 log_debug(ls, "release_lockspace no remove %d", rv);
779 dlm_device_deregister(ls);
781 if (force < 3 && dlm_user_daemon_available())
784 dlm_recoverd_stop(ls);
786 dlm_callback_stop(ls);
788 remove_lockspace(ls);
790 dlm_delete_debug_file(ls);
792 kfree(ls->ls_recover_buf);
795 * Free all lkb's in idr
798 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
799 idr_destroy(&ls->ls_lkbidr);
802 * Free all rsb's on rsbtbl[] lists
/* Drain both trees of every bucket: take the first node, erase it from
   the tree and repeat until the tree is empty. */
805 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
806 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
807 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
808 rb_erase(n, &ls->ls_rsbtbl[i].keep);
812 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
813 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
814 rb_erase(n, &ls->ls_rsbtbl[i].toss);
819 vfree(ls->ls_rsbtbl);
821 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
822 kfree(ls->ls_remove_names[i]);
824 while (!list_empty(&ls->ls_new_rsb)) {
825 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
827 list_del(&rsb->res_hashchain);
832 * Free structures on any other lists
835 dlm_purge_requestqueue(ls);
836 kfree(ls->ls_recover_args);
837 dlm_clear_members(ls);
838 dlm_clear_members_gone(ls);
839 kfree(ls->ls_node_array);
840 log_debug(ls, "release_lockspace final free");
841 kobject_put(&ls->ls_kobj);
842 /* The ls structure will be freed when the kobject is done with */
844 module_put(THIS_MODULE);
849 * Called when a system has released all its locks and is not going to use the
850 * lockspace any longer. We free everything we're managing for this lockspace.
851 * Remaining nodes will go through the recovery process as if we'd died. The
852 * lockspace must continue to function as usual, participating in recoveries,
853 * until this returns.
855 * Force has 4 possible values:
856 * 0 - don't destroy lockspace if it has any LKBs
857 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
858 * 2 - destroy lockspace regardless of LKBs
859 * 3 - destroy lockspace as part of a forced shutdown
/* Public entry point for releasing a lockspace handle.  The handle is
   resolved with dlm_find_lockspace_local() and that lookup is immediately
   balanced with dlm_put_lockspace(); release_lockspace() then runs with
   ls_lock held.
   NOTE(review): the handling of a failed lookup, what is done with the
   error after release_lockspace() and the return statement are not
   visible in this excerpt -- confirm. */
862 int dlm_release_lockspace(void *lockspace, int force)
867 ls = dlm_find_lockspace_local(lockspace);
870 dlm_put_lockspace(ls);
872 mutex_lock(&ls_lock);
873 error = release_lockspace(ls, force);
878 mutex_unlock(&ls_lock);
/* Walk lslist under lslist_lock, testing LSFL_RUNNING on each lockspace.
   The path that drops the lock inside the loop logs "no userland control
   daemon, stopping lockspace" for the current lockspace.
   NOTE(review): the statements selected by the LSFL_RUNNING test, the
   actual stop call and any restart of the walk after the lock is dropped
   are not visible in this excerpt, and the function may continue past the
   last visible line -- confirm. */
883 void dlm_stop_lockspaces(void)
888 spin_lock(&lslist_lock);
889 list_for_each_entry(ls, &lslist, ls_list) {
890 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
892 spin_unlock(&lslist_lock);
893 log_error(ls, "no userland control daemon, stopping lockspace");
897 spin_unlock(&lslist_lock);