// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

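/*
 * sysfs "control" attribute: writing 0 stops the lockspace, writing 1
 * starts it; any other value is rejected with -EINVAL.
 */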
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n;
        int rc = kstrtoint(buf, 0, &n);

        if (rc)
                return rc;
        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

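/*
 * sysfs "event_done" attribute: dlm_controld writes the result of a
 * join/leave here, which wakes the do_uevent() waiter below.
 */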
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

        if (rc)
                return rc;
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtouint(buf, 0, &ls->ls_global_id);

        if (rc)
                return rc;
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val;
        int rc = kstrtoint(buf, 0, &val);

        if (rc)
                return rc;
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_groups = dlm_groups,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

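/*
 * Send an online/offline uevent for the lockspace and wait for
 * dlm_controld to report the result through the "event_done" attribute.
 */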
static int do_uevent(struct dlm_ls *ls, int in)
{
        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        wait_event(ls->ls_uevent_wait,
                   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

        return ls->ls_uevent_result;
}

static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: cannot create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

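/*
 * The dlm_scand kthread periodically walks all lockspaces, scanning
 * their rsb tables and lock timeouts once ci_scan_secs have elapsed.
 */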
static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

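/*
 * Each dlm_find_lockspace_*() helper takes a reference on the lockspace;
 * callers must drop it with dlm_put_lockspace().
 */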
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        if (atomic_dec_and_test(&ls->ls_count))
                wake_up(&ls->ls_count_wait);
}

static void remove_lockspace(struct dlm_ls *ls)
{
retry:
        wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

        spin_lock(&lslist_lock);
        if (atomic_read(&ls->ls_count) != 0) {
                spin_unlock(&lslist_lock);
                goto retry;
        }

        WARN_ON(ls->ls_create_count != 0);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
}

static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_midcomms_start();
        if (error) {
                log_print("cannot start dlm midcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

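/*
 * Returns 0 if a new lockspace was created, 1 if an existing lockspace
 * was found (its create count is bumped), or a negative errno.
 */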
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
                return -EINVAL;

        if (lvblen % 8)
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (!cluster)
                log_print("dlm cluster name '%s' is being used without an application-provided cluster name",
                          dlm_config.ci_cluster_name);

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name '%s' does not match "
                          "the application cluster name '%s'",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        atomic_set(&ls->ls_count, 0);
        init_waitqueue_head(&ls->ls_count_wait);
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

#ifdef CONFIG_DLM_DEPRECATED_API
        if (flags & DLM_LSFL_TIMEWARN) {
                pr_warn_once("===============================================================\n"
                             "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
                             "         will be removed in v6.2!\n"
                             "         This includes the DLM_LSFL_TIMEWARN define in the UAPI header!\n"
                             "===============================================================\n");

                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
        }

        /* ls_exflags are forced to match among nodes, and we don't
         * need to require all nodes to have some flags set
         */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));
#else
        /* ls_exflags are forced to match among nodes, and we don't
         * need to require all nodes to have some flags set
         */
        ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
#endif

        size = READ_ONCE(dlm_config.ci_rsbtbl_size);
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);
        init_waitqueue_head(&ls->ls_remove_wait);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
#ifdef CONFIG_DLM_DEPRECATED_API
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);
#endif

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_recovery_done);
        ls->ls_recovery_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        atomic_set(&ls->ls_requestqueue_cnt, 0);
        init_waitqueue_head(&ls->ls_requestqueue_wait);
        mutex_init(&ls->ls_requestqueue_mutex);
        spin_lock_init(&ls->ls_clear_proc_locks);

        /* For backwards compatibility with 3.1 we need to use the maximum
         * possible dlm message size to be sure the message will fit and
         * to avoid out-of-bounds issues. However, a 3.2 sending side
         * might send less.
         */
        ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        /* wait until recovery is successful or failed */
        wait_for_completion(&ls->ls_recovery_done);
        error = ls->ls_recovery_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

static int __dlm_new_lockspace(const char *name, const char *cluster,
                               uint32_t flags, int lvblen,
                               const struct dlm_lockspace_ops *ops,
                               void *ops_arg, int *ops_result,
                               dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count) {
                dlm_scand_stop();
                dlm_midcomms_shutdown();
                dlm_lowcomms_stop();
        }
 out:
        mutex_unlock(&ls_lock);
        return error;
}

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
                      int lvblen, const struct dlm_lockspace_ops *ops,
                      void *ops_arg, int *ops_result,
                      dlm_lockspace_t **lockspace)
{
        return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
                                   ops, ops_arg, ops_result, lockspace);
}

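/*
 * A minimal caller sketch (hypothetical values, not taken from this file).
 * The name must fit in DLM_LOCKSPACE_LEN bytes and lvblen must be a
 * multiple of 8; ops, ops_arg and ops_result may all be NULL:
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", NULL, DLM_LSFL_NEWEXCL, 32,
 *				  NULL, NULL, NULL, &ls);
 *	if (!error)
 *		dlm_release_lockspace(ls, 2);
 */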
int dlm_new_user_lockspace(const char *name, const char *cluster,
                           uint32_t flags, int lvblen,
                           const struct dlm_lockspace_ops *ops,
                           void *ops_arg, int *ops_result,
                           dlm_lockspace_t **lockspace)
{
        return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
                                   ops_arg, ops_result, lockspace);
}

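/*
 * idr_for_each() callbacks used by lockspace_busy() and
 * release_lockspace() below to decide whether any (or any local)
 * lkbs remain, and to free them on release.
 */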
static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        if (ls_count == 1) {
                dlm_scand_stop();
                dlm_clear_members(ls);
                dlm_midcomms_shutdown();
        }

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is released */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                dlm_lowcomms_stop();
        mutex_unlock(&ls_lock);

        return error;
}

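/*
 * Stop any lockspaces still running once the userland control daemon
 * is gone; without it, recovery can no longer be driven.
 */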
void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;
        int count;

 restart:
        count = 0;
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
                        count++;
                        continue;
                }
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);

        if (count)
                log_print("dlm user daemon left %d lockspaces", count);
}

void dlm_stop_lockspaces_check(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
                            !dlm_locking_stopped(ls)))
                        break;
        }
        spin_unlock(&lslist_lock);
}