// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
*******************************************************************************
******************************************************************************/
#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"
static int		ls_count;
static struct mutex	ls_lock;
static struct list_head	lslist;
static spinlock_t	lslist_lock;
static struct task_struct *scand_task;
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}
static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}
static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}
static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}
static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}
static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};
static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};
static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};
static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};
static struct dlm_attr dlm_attr_recover_status = {
	.attr = {.name = "recover_status", .mode = S_IRUGO},
	.show = dlm_recover_status_show
};
static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show = dlm_recover_nodeid_show
};
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);
static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}
static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}
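/* Dispatch sketch (illustrative, not part of this file): a userspace write
 * such as
 *
 *	write(open("/sys/kernel/dlm/<lsname>/control", O_WRONLY), "1", 1);
 *
 * travels sysfs -> dlm_attr_store() -> dlm_control_store(), which calls
 * dlm_ls_stop() for "0" or dlm_ls_start() for "1" on that lockspace.
 */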
static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);

	kfree(ls);
}
static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};
static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops = &dlm_attr_ops,
	.release = lockspace_kobj_release,
};
static struct kset *dlm_kset;
static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}
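/* Userspace side of the handshake above, a minimal sketch assuming the
 * standard sysfs layout: after handling the ONLINE/OFFLINE uevent,
 * dlm_controld does the equivalent of
 *
 *	int fd = open("/sys/kernel/dlm/<lsname>/event_done", O_WRONLY);
 *	write(fd, "0", 1);	// result parsed by dlm_event_store()
 *	close(fd);
 *
 * which stores the result in ls->ls_uevent_result, sets LSFL_UEVENT_WAIT
 * and wakes the wait_event() above.
 */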
static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}
static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					   dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_scan_waiters(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}
static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}
struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}
struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}
void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}
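/* Typical lookup/put pairing for the three finders above (sketch):
 *
 *	struct dlm_ls *ls = dlm_find_lockspace_global(id);
 *	if (ls) {
 *		// ls_count holds the lockspace while we use it
 *		dlm_put_lockspace(ls);	// may wake remove_lockspace()
 *	}
 */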
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	/* the wait above is racy without the lock, so recheck under it */
	spin_lock(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
}
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;
	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}
	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}
	spin_lock_init(&ls->ls_remove_spin);
	init_waitqueue_head(&ls->ls_remove_wait);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);
	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	atomic_set(&ls->ls_requestqueue_cnt, 0);
	init_waitqueue_head(&ls->ls_requestqueue_wait);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);
	/* For backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and
	 * to avoid out of bounds issues. However, the 3.2 sending side
	 * might send less than this.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;
	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);
	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}
	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_scand_stop();
		dlm_midcomms_shutdown();
		dlm_lowcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}
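/* Caller sketch (hypothetical module code; "example-ls" and the 32-byte
 * LVB length are illustrative values only, error handling trimmed):
 *
 *	dlm_lockspace_t *lockspace;
 *	int error = dlm_new_lockspace("example-ls", NULL, DLM_LSFL_NEWEXCL,
 *				      32, NULL, NULL, NULL, &lockspace);
 *	if (!error)
 *		error = dlm_release_lockspace(lockspace, 2);
 */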
static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}
/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */
static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}
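/* idr_for_each() stops at, and returns, the first nonzero value a callback
 * returns, so rv above is nonzero iff a matching lkb exists: force 0 counts
 * any lkb as busy, force 1 only local lkbs holding a granted mode. */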
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);
	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	if (ls_count == 1) {
		dlm_scand_stop();
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}
	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);
	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}
	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}
/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */
int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_lowcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}
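/* Usage sketch of the force levels above: a clean shutdown can pass 0 and
 * handle -EBUSY, while an unclean teardown passes a higher level:
 *
 *	error = dlm_release_lockspace(lockspace, 0);
 *	if (error == -EBUSY)
 *		;	// locks remain; release them and retry, or force
 */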
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}