Merge tag 'for-6.4/block-2023-05-06' of git://git.kernel.dk/linux
[platform/kernel/linux-starfive.git] / fs / dlm / lock.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13    dlm_lock()
14    dlm_unlock()
15
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms, bool local);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void toss_rsb(struct kref *kref);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174                "rlc %d name %s\n",
175                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177                r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182         struct dlm_lkb *lkb;
183
184         dlm_print_rsb(r);
185
186         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188         printk(KERN_ERR "rsb lookup list\n");
189         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb grant queue:\n");
192         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb convert queue:\n");
195         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197         printk(KERN_ERR "rsb wait queue:\n");
198         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199                 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 static inline void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206         down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211         up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216         return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231         return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236         return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247         return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252         return lkb->lkb_nodeid &&
253                !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287                test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         if (rv == -DLM_ECANCEL &&
298             test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299                 rv = -EDEADLK;
300
301         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306         queue_cast(r, lkb,
307                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312         if (is_master_copy(lkb)) {
313                 send_bast(r, lkb, rqmode);
314         } else {
315                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316         }
317 }
318
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322
323 /* This is only called to add a reference when the code already holds
324    a valid reference to the rsb, so there's no need for locking. */
325
326 static inline void hold_rsb(struct dlm_rsb *r)
327 {
328         kref_get(&r->res_ref);
329 }
330
331 void dlm_hold_rsb(struct dlm_rsb *r)
332 {
333         hold_rsb(r);
334 }
335
336 /* When all references to the rsb are gone it's transferred to
337    the tossed list for later disposal. */
338
339 static void put_rsb(struct dlm_rsb *r)
340 {
341         struct dlm_ls *ls = r->res_ls;
342         uint32_t bucket = r->res_bucket;
343         int rv;
344
345         rv = kref_put_lock(&r->res_ref, toss_rsb,
346                            &ls->ls_rsbtbl[bucket].lock);
347         if (rv)
348                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
349 }
350
351 void dlm_put_rsb(struct dlm_rsb *r)
352 {
353         put_rsb(r);
354 }
355
356 static int pre_rsb_struct(struct dlm_ls *ls)
357 {
358         struct dlm_rsb *r1, *r2;
359         int count = 0;
360
361         spin_lock(&ls->ls_new_rsb_spin);
362         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
363                 spin_unlock(&ls->ls_new_rsb_spin);
364                 return 0;
365         }
366         spin_unlock(&ls->ls_new_rsb_spin);
367
368         r1 = dlm_allocate_rsb(ls);
369         r2 = dlm_allocate_rsb(ls);
370
371         spin_lock(&ls->ls_new_rsb_spin);
372         if (r1) {
373                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
374                 ls->ls_new_rsb_count++;
375         }
376         if (r2) {
377                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
378                 ls->ls_new_rsb_count++;
379         }
380         count = ls->ls_new_rsb_count;
381         spin_unlock(&ls->ls_new_rsb_spin);
382
383         if (!count)
384                 return -ENOMEM;
385         return 0;
386 }
387
388 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
389    unlock any spinlocks, go back and call pre_rsb_struct again.
390    Otherwise, take an rsb off the list and return it. */
391
392 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
393                           struct dlm_rsb **r_ret)
394 {
395         struct dlm_rsb *r;
396         int count;
397
398         spin_lock(&ls->ls_new_rsb_spin);
399         if (list_empty(&ls->ls_new_rsb)) {
400                 count = ls->ls_new_rsb_count;
401                 spin_unlock(&ls->ls_new_rsb_spin);
402                 log_debug(ls, "find_rsb retry %d %d %s",
403                           count, dlm_config.ci_new_rsb_count,
404                           (const char *)name);
405                 return -EAGAIN;
406         }
407
408         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
409         list_del(&r->res_hashchain);
410         /* Convert the empty list_head to a NULL rb_node for tree usage: */
411         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
412         ls->ls_new_rsb_count--;
413         spin_unlock(&ls->ls_new_rsb_spin);
414
415         r->res_ls = ls;
416         r->res_length = len;
417         memcpy(r->res_name, name, len);
418         mutex_init(&r->res_mutex);
419
420         INIT_LIST_HEAD(&r->res_lookup);
421         INIT_LIST_HEAD(&r->res_grantqueue);
422         INIT_LIST_HEAD(&r->res_convertqueue);
423         INIT_LIST_HEAD(&r->res_waitqueue);
424         INIT_LIST_HEAD(&r->res_root_list);
425         INIT_LIST_HEAD(&r->res_recover_list);
426
427         *r_ret = r;
428         return 0;
429 }
430
431 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
432 {
433         char maxname[DLM_RESNAME_MAXLEN];
434
435         memset(maxname, 0, DLM_RESNAME_MAXLEN);
436         memcpy(maxname, name, nlen);
437         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
438 }
439
440 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
441                         struct dlm_rsb **r_ret)
442 {
443         struct rb_node *node = tree->rb_node;
444         struct dlm_rsb *r;
445         int rc;
446
447         while (node) {
448                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
449                 rc = rsb_cmp(r, name, len);
450                 if (rc < 0)
451                         node = node->rb_left;
452                 else if (rc > 0)
453                         node = node->rb_right;
454                 else
455                         goto found;
456         }
457         *r_ret = NULL;
458         return -EBADR;
459
460  found:
461         *r_ret = r;
462         return 0;
463 }
464
465 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
466 {
467         struct rb_node **newn = &tree->rb_node;
468         struct rb_node *parent = NULL;
469         int rc;
470
471         while (*newn) {
472                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
473                                                res_hashnode);
474
475                 parent = *newn;
476                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
477                 if (rc < 0)
478                         newn = &parent->rb_left;
479                 else if (rc > 0)
480                         newn = &parent->rb_right;
481                 else {
482                         log_print("rsb_insert match");
483                         dlm_dump_rsb(rsb);
484                         dlm_dump_rsb(cur);
485                         return -EEXIST;
486                 }
487         }
488
489         rb_link_node(&rsb->res_hashnode, parent, newn);
490         rb_insert_color(&rsb->res_hashnode, tree);
491         return 0;
492 }
493
494 /*
495  * Find rsb in rsbtbl and potentially create/add one
496  *
497  * Delaying the release of rsb's has a similar benefit to applications keeping
498  * NL locks on an rsb, but without the guarantee that the cached master value
499  * will still be valid when the rsb is reused.  Apps aren't always smart enough
500  * to keep NL locks on an rsb that they may lock again shortly; this can lead
501  * to excessive master lookups and removals if we don't delay the release.
502  *
503  * Searching for an rsb means looking through both the normal list and toss
504  * list.  When found on the toss list the rsb is moved to the normal list with
505  * ref count of 1; when found on normal list the ref count is incremented.
506  *
507  * rsb's on the keep list are being used locally and refcounted.
508  * rsb's on the toss list are not being used locally, and are not refcounted.
509  *
510  * The toss list rsb's were either
511  * - previously used locally but not any more (were on keep list, then
512  *   moved to toss list when last refcount dropped)
513  * - created and put on toss list as a directory record for a lookup
514  *   (we are the dir node for the res, but are not using the res right now,
515  *   but some other node is)
516  *
517  * The purpose of find_rsb() is to return a refcounted rsb for local use.
518  * So, if the given rsb is on the toss list, it is moved to the keep list
519  * before being returned.
520  *
521  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
522  * more refcounts exist, so the rsb is moved from the keep list to the
523  * toss list.
524  *
525  * rsb's on both keep and toss lists are used for doing a name to master
526  * lookups.  rsb's that are in use locally (and being refcounted) are on
527  * the keep list, rsb's that are not in use locally (not refcounted) and
528  * only exist for name/master lookups are on the toss list.
529  *
530  * rsb's on the toss list who's dir_nodeid is not local can have stale
531  * name/master mappings.  So, remote requests on such rsb's can potentially
532  * return with an error, which means the mapping is stale and needs to
533  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
534  * first_lkid is to keep only a single outstanding request on an rsb
535  * while that rsb has a potentially stale master.)
536  */
537
538 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
539                         uint32_t hash, uint32_t b,
540                         int dir_nodeid, int from_nodeid,
541                         unsigned int flags, struct dlm_rsb **r_ret)
542 {
543         struct dlm_rsb *r = NULL;
544         int our_nodeid = dlm_our_nodeid();
545         int from_local = 0;
546         int from_other = 0;
547         int from_dir = 0;
548         int create = 0;
549         int error;
550
551         if (flags & R_RECEIVE_REQUEST) {
552                 if (from_nodeid == dir_nodeid)
553                         from_dir = 1;
554                 else
555                         from_other = 1;
556         } else if (flags & R_REQUEST) {
557                 from_local = 1;
558         }
559
560         /*
561          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
562          * from_nodeid has sent us a lock in dlm_recover_locks, believing
563          * we're the new master.  Our local recovery may not have set
564          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
565          * create the rsb; dlm_recover_process_copy() will handle EBADR
566          * by resending.
567          *
568          * If someone sends us a request, we are the dir node, and we do
569          * not find the rsb anywhere, then recreate it.  This happens if
570          * someone sends us a request after we have removed/freed an rsb
571          * from our toss list.  (They sent a request instead of lookup
572          * because they are using an rsb from their toss list.)
573          */
574
575         if (from_local || from_dir ||
576             (from_other && (dir_nodeid == our_nodeid))) {
577                 create = 1;
578         }
579
580  retry:
581         if (create) {
582                 error = pre_rsb_struct(ls);
583                 if (error < 0)
584                         goto out;
585         }
586
587         spin_lock(&ls->ls_rsbtbl[b].lock);
588
589         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
590         if (error)
591                 goto do_toss;
592         
593         /*
594          * rsb is active, so we can't check master_nodeid without lock_rsb.
595          */
596
597         kref_get(&r->res_ref);
598         goto out_unlock;
599
600
601  do_toss:
602         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
603         if (error)
604                 goto do_new;
605
606         /*
607          * rsb found inactive (master_nodeid may be out of date unless
608          * we are the dir_nodeid or were the master)  No other thread
609          * is using this rsb because it's on the toss list, so we can
610          * look at or update res_master_nodeid without lock_rsb.
611          */
612
613         if ((r->res_master_nodeid != our_nodeid) && from_other) {
614                 /* our rsb was not master, and another node (not the dir node)
615                    has sent us a request */
616                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
617                           from_nodeid, r->res_master_nodeid, dir_nodeid,
618                           r->res_name);
619                 error = -ENOTBLK;
620                 goto out_unlock;
621         }
622
623         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
624                 /* don't think this should ever happen */
625                 log_error(ls, "find_rsb toss from_dir %d master %d",
626                           from_nodeid, r->res_master_nodeid);
627                 dlm_print_rsb(r);
628                 /* fix it and go on */
629                 r->res_master_nodeid = our_nodeid;
630                 r->res_nodeid = 0;
631                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
632                 r->res_first_lkid = 0;
633         }
634
635         if (from_local && (r->res_master_nodeid != our_nodeid)) {
636                 /* Because we have held no locks on this rsb,
637                    res_master_nodeid could have become stale. */
638                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
639                 r->res_first_lkid = 0;
640         }
641
642         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
643         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
644         goto out_unlock;
645
646
647  do_new:
648         /*
649          * rsb not found
650          */
651
652         if (error == -EBADR && !create)
653                 goto out_unlock;
654
655         error = get_rsb_struct(ls, name, len, &r);
656         if (error == -EAGAIN) {
657                 spin_unlock(&ls->ls_rsbtbl[b].lock);
658                 goto retry;
659         }
660         if (error)
661                 goto out_unlock;
662
663         r->res_hash = hash;
664         r->res_bucket = b;
665         r->res_dir_nodeid = dir_nodeid;
666         kref_init(&r->res_ref);
667
668         if (from_dir) {
669                 /* want to see how often this happens */
670                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
671                           from_nodeid, r->res_name);
672                 r->res_master_nodeid = our_nodeid;
673                 r->res_nodeid = 0;
674                 goto out_add;
675         }
676
677         if (from_other && (dir_nodeid != our_nodeid)) {
678                 /* should never happen */
679                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
680                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
681                 dlm_free_rsb(r);
682                 r = NULL;
683                 error = -ENOTBLK;
684                 goto out_unlock;
685         }
686
687         if (from_other) {
688                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
689                           from_nodeid, dir_nodeid, r->res_name);
690         }
691
692         if (dir_nodeid == our_nodeid) {
693                 /* When we are the dir nodeid, we can set the master
694                    node immediately */
695                 r->res_master_nodeid = our_nodeid;
696                 r->res_nodeid = 0;
697         } else {
698                 /* set_master will send_lookup to dir_nodeid */
699                 r->res_master_nodeid = 0;
700                 r->res_nodeid = -1;
701         }
702
703  out_add:
704         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
705  out_unlock:
706         spin_unlock(&ls->ls_rsbtbl[b].lock);
707  out:
708         *r_ret = r;
709         return error;
710 }
711
712 /* During recovery, other nodes can send us new MSTCPY locks (from
713    dlm_recover_locks) before we've made ourself master (in
714    dlm_recover_masters). */
715
716 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
717                           uint32_t hash, uint32_t b,
718                           int dir_nodeid, int from_nodeid,
719                           unsigned int flags, struct dlm_rsb **r_ret)
720 {
721         struct dlm_rsb *r = NULL;
722         int our_nodeid = dlm_our_nodeid();
723         int recover = (flags & R_RECEIVE_RECOVER);
724         int error;
725
726  retry:
727         error = pre_rsb_struct(ls);
728         if (error < 0)
729                 goto out;
730
731         spin_lock(&ls->ls_rsbtbl[b].lock);
732
733         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
734         if (error)
735                 goto do_toss;
736
737         /*
738          * rsb is active, so we can't check master_nodeid without lock_rsb.
739          */
740
741         kref_get(&r->res_ref);
742         goto out_unlock;
743
744
745  do_toss:
746         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
747         if (error)
748                 goto do_new;
749
750         /*
751          * rsb found inactive. No other thread is using this rsb because
752          * it's on the toss list, so we can look at or update
753          * res_master_nodeid without lock_rsb.
754          */
755
756         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
757                 /* our rsb is not master, and another node has sent us a
758                    request; this should never happen */
759                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
760                           from_nodeid, r->res_master_nodeid, dir_nodeid);
761                 dlm_print_rsb(r);
762                 error = -ENOTBLK;
763                 goto out_unlock;
764         }
765
766         if (!recover && (r->res_master_nodeid != our_nodeid) &&
767             (dir_nodeid == our_nodeid)) {
768                 /* our rsb is not master, and we are dir; may as well fix it;
769                    this should never happen */
770                 log_error(ls, "find_rsb toss our %d master %d dir %d",
771                           our_nodeid, r->res_master_nodeid, dir_nodeid);
772                 dlm_print_rsb(r);
773                 r->res_master_nodeid = our_nodeid;
774                 r->res_nodeid = 0;
775         }
776
777         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
778         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
779         goto out_unlock;
780
781
782  do_new:
783         /*
784          * rsb not found
785          */
786
787         error = get_rsb_struct(ls, name, len, &r);
788         if (error == -EAGAIN) {
789                 spin_unlock(&ls->ls_rsbtbl[b].lock);
790                 goto retry;
791         }
792         if (error)
793                 goto out_unlock;
794
795         r->res_hash = hash;
796         r->res_bucket = b;
797         r->res_dir_nodeid = dir_nodeid;
798         r->res_master_nodeid = dir_nodeid;
799         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
800         kref_init(&r->res_ref);
801
802         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
803  out_unlock:
804         spin_unlock(&ls->ls_rsbtbl[b].lock);
805  out:
806         *r_ret = r;
807         return error;
808 }
809
810 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
811                     int from_nodeid, unsigned int flags,
812                     struct dlm_rsb **r_ret)
813 {
814         uint32_t hash, b;
815         int dir_nodeid;
816
817         if (len > DLM_RESNAME_MAXLEN)
818                 return -EINVAL;
819
820         hash = jhash(name, len, 0);
821         b = hash & (ls->ls_rsbtbl_size - 1);
822
823         dir_nodeid = dlm_hash2nodeid(ls, hash);
824
825         if (dlm_no_directory(ls))
826                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
827                                       from_nodeid, flags, r_ret);
828         else
829                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
830                                       from_nodeid, flags, r_ret);
831 }
832
833 /* we have received a request and found that res_master_nodeid != our_nodeid,
834    so we need to return an error or make ourself the master */
835
836 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
837                                   int from_nodeid)
838 {
839         if (dlm_no_directory(ls)) {
840                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
841                           from_nodeid, r->res_master_nodeid,
842                           r->res_dir_nodeid);
843                 dlm_print_rsb(r);
844                 return -ENOTBLK;
845         }
846
847         if (from_nodeid != r->res_dir_nodeid) {
848                 /* our rsb is not master, and another node (not the dir node)
849                    has sent us a request.  this is much more common when our
850                    master_nodeid is zero, so limit debug to non-zero.  */
851
852                 if (r->res_master_nodeid) {
853                         log_debug(ls, "validate master from_other %d master %d "
854                                   "dir %d first %x %s", from_nodeid,
855                                   r->res_master_nodeid, r->res_dir_nodeid,
856                                   r->res_first_lkid, r->res_name);
857                 }
858                 return -ENOTBLK;
859         } else {
860                 /* our rsb is not master, but the dir nodeid has sent us a
861                    request; this could happen with master 0 / res_nodeid -1 */
862
863                 if (r->res_master_nodeid) {
864                         log_error(ls, "validate master from_dir %d master %d "
865                                   "first %x %s",
866                                   from_nodeid, r->res_master_nodeid,
867                                   r->res_first_lkid, r->res_name);
868                 }
869
870                 r->res_master_nodeid = dlm_our_nodeid();
871                 r->res_nodeid = 0;
872                 return 0;
873         }
874 }
875
876 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
877                                 int from_nodeid, bool toss_list, unsigned int flags,
878                                 int *r_nodeid, int *result)
879 {
880         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
881         int from_master = (flags & DLM_LU_RECOVER_DIR);
882
883         if (r->res_dir_nodeid != our_nodeid) {
884                 /* should not happen, but may as well fix it and carry on */
885                 log_error(ls, "%s res_dir %d our %d %s", __func__,
886                           r->res_dir_nodeid, our_nodeid, r->res_name);
887                 r->res_dir_nodeid = our_nodeid;
888         }
889
890         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
891                 /* Recovery uses this function to set a new master when
892                  * the previous master failed.  Setting NEW_MASTER will
893                  * force dlm_recover_masters to call recover_master on this
894                  * rsb even though the res_nodeid is no longer removed.
895                  */
896
897                 r->res_master_nodeid = from_nodeid;
898                 r->res_nodeid = from_nodeid;
899                 rsb_set_flag(r, RSB_NEW_MASTER);
900
901                 if (toss_list) {
902                         /* I don't think we should ever find it on toss list. */
903                         log_error(ls, "%s fix_master on toss", __func__);
904                         dlm_dump_rsb(r);
905                 }
906         }
907
908         if (from_master && (r->res_master_nodeid != from_nodeid)) {
909                 /* this will happen if from_nodeid became master during
910                  * a previous recovery cycle, and we aborted the previous
911                  * cycle before recovering this master value
912                  */
913
914                 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
915                           __func__, from_nodeid, r->res_master_nodeid,
916                           r->res_nodeid, r->res_first_lkid, r->res_name);
917
918                 if (r->res_master_nodeid == our_nodeid) {
919                         log_error(ls, "from_master %d our_master", from_nodeid);
920                         dlm_dump_rsb(r);
921                         goto ret_assign;
922                 }
923
924                 r->res_master_nodeid = from_nodeid;
925                 r->res_nodeid = from_nodeid;
926                 rsb_set_flag(r, RSB_NEW_MASTER);
927         }
928
929         if (!r->res_master_nodeid) {
930                 /* this will happen if recovery happens while we're looking
931                  * up the master for this rsb
932                  */
933
934                 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
935                           from_nodeid, r->res_first_lkid, r->res_name);
936                 r->res_master_nodeid = from_nodeid;
937                 r->res_nodeid = from_nodeid;
938         }
939
940         if (!from_master && !fix_master &&
941             (r->res_master_nodeid == from_nodeid)) {
942                 /* this can happen when the master sends remove, the dir node
943                  * finds the rsb on the keep list and ignores the remove,
944                  * and the former master sends a lookup
945                  */
946
947                 log_limit(ls, "%s from master %d flags %x first %x %s",
948                           __func__, from_nodeid, flags, r->res_first_lkid,
949                           r->res_name);
950         }
951
952  ret_assign:
953         *r_nodeid = r->res_master_nodeid;
954         if (result)
955                 *result = DLM_LU_MATCH;
956 }
957
958 /*
959  * We're the dir node for this res and another node wants to know the
960  * master nodeid.  During normal operation (non recovery) this is only
961  * called from receive_lookup(); master lookups when the local node is
962  * the dir node are done by find_rsb().
963  *
964  * normal operation, we are the dir node for a resource
965  * . _request_lock
966  * . set_master
967  * . send_lookup
968  * . receive_lookup
969  * . dlm_master_lookup flags 0
970  *
971  * recover directory, we are rebuilding dir for all resources
972  * . dlm_recover_directory
973  * . dlm_rcom_names
974  *   remote node sends back the rsb names it is master of and we are dir of
975  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
976  *   we either create new rsb setting remote node as master, or find existing
977  *   rsb and set master to be the remote node.
978  *
979  * recover masters, we are finding the new master for resources
980  * . dlm_recover_masters
981  * . recover_master
982  * . dlm_send_rcom_lookup
983  * . receive_rcom_lookup
984  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
985  */
986
987 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
988                       unsigned int flags, int *r_nodeid, int *result)
989 {
990         struct dlm_rsb *r = NULL;
991         uint32_t hash, b;
992         int our_nodeid = dlm_our_nodeid();
993         int dir_nodeid, error;
994
995         if (len > DLM_RESNAME_MAXLEN)
996                 return -EINVAL;
997
998         if (from_nodeid == our_nodeid) {
999                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1000                           our_nodeid, flags);
1001                 return -EINVAL;
1002         }
1003
1004         hash = jhash(name, len, 0);
1005         b = hash & (ls->ls_rsbtbl_size - 1);
1006
1007         dir_nodeid = dlm_hash2nodeid(ls, hash);
1008         if (dir_nodeid != our_nodeid) {
1009                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1010                           from_nodeid, dir_nodeid, our_nodeid, hash,
1011                           ls->ls_num_nodes);
1012                 *r_nodeid = -1;
1013                 return -EINVAL;
1014         }
1015
1016  retry:
1017         error = pre_rsb_struct(ls);
1018         if (error < 0)
1019                 return error;
1020
1021         spin_lock(&ls->ls_rsbtbl[b].lock);
1022         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1023         if (!error) {
1024                 /* because the rsb is active, we need to lock_rsb before
1025                  * checking/changing re_master_nodeid
1026                  */
1027
1028                 hold_rsb(r);
1029                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1030                 lock_rsb(r);
1031
1032                 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1033                                     flags, r_nodeid, result);
1034
1035                 /* the rsb was active */
1036                 unlock_rsb(r);
1037                 put_rsb(r);
1038
1039                 return 0;
1040         }
1041
1042         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1043         if (error)
1044                 goto not_found;
1045
1046         /* because the rsb is inactive (on toss list), it's not refcounted
1047          * and lock_rsb is not used, but is protected by the rsbtbl lock
1048          */
1049
1050         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1051                             r_nodeid, result);
1052
1053         r->res_toss_time = jiffies;
1054         /* the rsb was inactive (on toss list) */
1055         spin_unlock(&ls->ls_rsbtbl[b].lock);
1056
1057         return 0;
1058
1059  not_found:
1060         error = get_rsb_struct(ls, name, len, &r);
1061         if (error == -EAGAIN) {
1062                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1063                 goto retry;
1064         }
1065         if (error)
1066                 goto out_unlock;
1067
1068         r->res_hash = hash;
1069         r->res_bucket = b;
1070         r->res_dir_nodeid = our_nodeid;
1071         r->res_master_nodeid = from_nodeid;
1072         r->res_nodeid = from_nodeid;
1073         kref_init(&r->res_ref);
1074         r->res_toss_time = jiffies;
1075
1076         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1077         if (error) {
1078                 /* should never happen */
1079                 dlm_free_rsb(r);
1080                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1081                 goto retry;
1082         }
1083
1084         if (result)
1085                 *result = DLM_LU_ADD;
1086         *r_nodeid = from_nodeid;
1087  out_unlock:
1088         spin_unlock(&ls->ls_rsbtbl[b].lock);
1089         return error;
1090 }
1091
1092 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1093 {
1094         struct rb_node *n;
1095         struct dlm_rsb *r;
1096         int i;
1097
1098         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1099                 spin_lock(&ls->ls_rsbtbl[i].lock);
1100                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1101                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1102                         if (r->res_hash == hash)
1103                                 dlm_dump_rsb(r);
1104                 }
1105                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1106         }
1107 }
1108
1109 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1110 {
1111         struct dlm_rsb *r = NULL;
1112         uint32_t hash, b;
1113         int error;
1114
1115         hash = jhash(name, len, 0);
1116         b = hash & (ls->ls_rsbtbl_size - 1);
1117
1118         spin_lock(&ls->ls_rsbtbl[b].lock);
1119         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1120         if (!error)
1121                 goto out_dump;
1122
1123         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1124         if (error)
1125                 goto out;
1126  out_dump:
1127         dlm_dump_rsb(r);
1128  out:
1129         spin_unlock(&ls->ls_rsbtbl[b].lock);
1130 }
1131
1132 static void toss_rsb(struct kref *kref)
1133 {
1134         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1135         struct dlm_ls *ls = r->res_ls;
1136
1137         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1138         kref_init(&r->res_ref);
1139         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1140         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1141         r->res_toss_time = jiffies;
1142         set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
1143         if (r->res_lvbptr) {
1144                 dlm_free_lvb(r->res_lvbptr);
1145                 r->res_lvbptr = NULL;
1146         }
1147 }
1148
1149 /* See comment for unhold_lkb */
1150
1151 static void unhold_rsb(struct dlm_rsb *r)
1152 {
1153         int rv;
1154         rv = kref_put(&r->res_ref, toss_rsb);
1155         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1156 }
1157
1158 static void kill_rsb(struct kref *kref)
1159 {
1160         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1161
1162         /* All work is done after the return from kref_put() so we
1163            can release the write_lock before the remove and free. */
1164
1165         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1166         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1167         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1168         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1169         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1170         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1171 }
1172
1173 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1174    The rsb must exist as long as any lkb's for it do. */
1175
1176 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1177 {
1178         hold_rsb(r);
1179         lkb->lkb_resource = r;
1180 }
1181
1182 static void detach_lkb(struct dlm_lkb *lkb)
1183 {
1184         if (lkb->lkb_resource) {
1185                 put_rsb(lkb->lkb_resource);
1186                 lkb->lkb_resource = NULL;
1187         }
1188 }
1189
1190 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1191                        int start, int end)
1192 {
1193         struct dlm_lkb *lkb;
1194         int rv;
1195
1196         lkb = dlm_allocate_lkb(ls);
1197         if (!lkb)
1198                 return -ENOMEM;
1199
1200         lkb->lkb_last_bast_mode = -1;
1201         lkb->lkb_nodeid = -1;
1202         lkb->lkb_grmode = DLM_LOCK_IV;
1203         kref_init(&lkb->lkb_ref);
1204         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1205         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1206         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1207         INIT_LIST_HEAD(&lkb->lkb_callbacks);
1208         spin_lock_init(&lkb->lkb_cb_lock);
1209         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1210
1211         idr_preload(GFP_NOFS);
1212         spin_lock(&ls->ls_lkbidr_spin);
1213         rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1214         if (rv >= 0)
1215                 lkb->lkb_id = rv;
1216         spin_unlock(&ls->ls_lkbidr_spin);
1217         idr_preload_end();
1218
1219         if (rv < 0) {
1220                 log_error(ls, "create_lkb idr error %d", rv);
1221                 dlm_free_lkb(lkb);
1222                 return rv;
1223         }
1224
1225         *lkb_ret = lkb;
1226         return 0;
1227 }
1228
1229 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1230 {
1231         return _create_lkb(ls, lkb_ret, 1, 0);
1232 }
1233
1234 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1235 {
1236         struct dlm_lkb *lkb;
1237
1238         spin_lock(&ls->ls_lkbidr_spin);
1239         lkb = idr_find(&ls->ls_lkbidr, lkid);
1240         if (lkb)
1241                 kref_get(&lkb->lkb_ref);
1242         spin_unlock(&ls->ls_lkbidr_spin);
1243
1244         *lkb_ret = lkb;
1245         return lkb ? 0 : -ENOENT;
1246 }
1247
1248 static void kill_lkb(struct kref *kref)
1249 {
1250         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1251
1252         /* All work is done after the return from kref_put() so we
1253            can release the write_lock before the detach_lkb */
1254
1255         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1256 }
1257
1258 /* __put_lkb() is used when an lkb may not have an rsb attached to
1259    it so we need to provide the lockspace explicitly */
1260
1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1262 {
1263         uint32_t lkid = lkb->lkb_id;
1264         int rv;
1265
1266         rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1267                            &ls->ls_lkbidr_spin);
1268         if (rv) {
1269                 idr_remove(&ls->ls_lkbidr, lkid);
1270                 spin_unlock(&ls->ls_lkbidr_spin);
1271
1272                 detach_lkb(lkb);
1273
1274                 /* for local/process lkbs, lvbptr points to caller's lksb */
1275                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1276                         dlm_free_lvb(lkb->lkb_lvbptr);
1277                 dlm_free_lkb(lkb);
1278         }
1279
1280         return rv;
1281 }
1282
1283 int dlm_put_lkb(struct dlm_lkb *lkb)
1284 {
1285         struct dlm_ls *ls;
1286
1287         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1288         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1289
1290         ls = lkb->lkb_resource->res_ls;
1291         return __put_lkb(ls, lkb);
1292 }
1293
1294 /* This is only called to add a reference when the code already holds
1295    a valid reference to the lkb, so there's no need for locking. */
1296
1297 static inline void hold_lkb(struct dlm_lkb *lkb)
1298 {
1299         kref_get(&lkb->lkb_ref);
1300 }
1301
1302 static void unhold_lkb_assert(struct kref *kref)
1303 {
1304         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1305
1306         DLM_ASSERT(false, dlm_print_lkb(lkb););
1307 }
1308
1309 /* This is called when we need to remove a reference and are certain
1310    it's not the last ref.  e.g. del_lkb is always called between a
1311    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1312    put_lkb would work fine, but would involve unnecessary locking */
1313
1314 static inline void unhold_lkb(struct dlm_lkb *lkb)
1315 {
1316         kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1317 }
1318
1319 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1320                             int mode)
1321 {
1322         struct dlm_lkb *lkb = NULL, *iter;
1323
1324         list_for_each_entry(iter, head, lkb_statequeue)
1325                 if (iter->lkb_rqmode < mode) {
1326                         lkb = iter;
1327                         list_add_tail(new, &iter->lkb_statequeue);
1328                         break;
1329                 }
1330
1331         if (!lkb)
1332                 list_add_tail(new, head);
1333 }
1334
1335 /* add/remove lkb to rsb's grant/convert/wait queue */
1336
1337 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1338 {
1339         kref_get(&lkb->lkb_ref);
1340
1341         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1342
1343         lkb->lkb_timestamp = ktime_get();
1344
1345         lkb->lkb_status = status;
1346
1347         switch (status) {
1348         case DLM_LKSTS_WAITING:
1349                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1350                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1351                 else
1352                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1353                 break;
1354         case DLM_LKSTS_GRANTED:
1355                 /* convention says granted locks kept in order of grmode */
1356                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1357                                 lkb->lkb_grmode);
1358                 break;
1359         case DLM_LKSTS_CONVERT:
1360                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1362                 else
1363                         list_add_tail(&lkb->lkb_statequeue,
1364                                       &r->res_convertqueue);
1365                 break;
1366         default:
1367                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1368         }
1369 }
1370
1371 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1372 {
1373         lkb->lkb_status = 0;
1374         list_del(&lkb->lkb_statequeue);
1375         unhold_lkb(lkb);
1376 }
1377
1378 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1379 {
1380         hold_lkb(lkb);
1381         del_lkb(r, lkb);
1382         add_lkb(r, lkb, sts);
1383         unhold_lkb(lkb);
1384 }
1385
1386 static int msg_reply_type(int mstype)
1387 {
1388         switch (mstype) {
1389         case DLM_MSG_REQUEST:
1390                 return DLM_MSG_REQUEST_REPLY;
1391         case DLM_MSG_CONVERT:
1392                 return DLM_MSG_CONVERT_REPLY;
1393         case DLM_MSG_UNLOCK:
1394                 return DLM_MSG_UNLOCK_REPLY;
1395         case DLM_MSG_CANCEL:
1396                 return DLM_MSG_CANCEL_REPLY;
1397         case DLM_MSG_LOOKUP:
1398                 return DLM_MSG_LOOKUP_REPLY;
1399         }
1400         return -1;
1401 }
1402
1403 /* add/remove lkb from global waiters list of lkb's waiting for
1404    a reply from a remote node */
1405
1406 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1407 {
1408         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1409         int error = 0;
1410
1411         mutex_lock(&ls->ls_waiters_mutex);
1412
1413         if (is_overlap_unlock(lkb) ||
1414             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1415                 error = -EINVAL;
1416                 goto out;
1417         }
1418
1419         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1420                 switch (mstype) {
1421                 case DLM_MSG_UNLOCK:
1422                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1423                         break;
1424                 case DLM_MSG_CANCEL:
1425                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1426                         break;
1427                 default:
1428                         error = -EBUSY;
1429                         goto out;
1430                 }
1431                 lkb->lkb_wait_count++;
1432                 hold_lkb(lkb);
1433
1434                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1435                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1436                           lkb->lkb_wait_count, dlm_iflags_val(lkb));
1437                 goto out;
1438         }
1439
1440         DLM_ASSERT(!lkb->lkb_wait_count,
1441                    dlm_print_lkb(lkb);
1442                    printk("wait_count %d\n", lkb->lkb_wait_count););
1443
1444         lkb->lkb_wait_count++;
1445         lkb->lkb_wait_type = mstype;
1446         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1447         hold_lkb(lkb);
1448         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1449  out:
1450         if (error)
1451                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1452                           lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1453                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1454         mutex_unlock(&ls->ls_waiters_mutex);
1455         return error;
1456 }
1457
1458 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1459    list as part of process_requestqueue (e.g. a lookup that has an optimized
1460    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1461    set RESEND and dlm_recover_waiters_post() */
1462
1463 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1464                                 struct dlm_message *ms)
1465 {
1466         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1467         int overlap_done = 0;
1468
1469         if (mstype == DLM_MSG_UNLOCK_REPLY &&
1470             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1471                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1472                 overlap_done = 1;
1473                 goto out_del;
1474         }
1475
1476         if (mstype == DLM_MSG_CANCEL_REPLY &&
1477             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1478                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1479                 overlap_done = 1;
1480                 goto out_del;
1481         }
1482
1483         /* Cancel state was preemptively cleared by a successful convert,
1484            see next comment, nothing to do. */
1485
1486         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1487             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1488                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1489                           lkb->lkb_id, lkb->lkb_wait_type);
1490                 return -1;
1491         }
1492
1493         /* Remove for the convert reply, and premptively remove for the
1494            cancel reply.  A convert has been granted while there's still
1495            an outstanding cancel on it (the cancel is moot and the result
1496            in the cancel reply should be 0).  We preempt the cancel reply
1497            because the app gets the convert result and then can follow up
1498            with another op, like convert.  This subsequent op would see the
1499            lingering state of the cancel and fail with -EBUSY. */
1500
1501         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1502             (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1503             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1504                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1505                           lkb->lkb_id);
1506                 lkb->lkb_wait_type = 0;
1507                 lkb->lkb_wait_count--;
1508                 unhold_lkb(lkb);
1509                 goto out_del;
1510         }
1511
1512         /* N.B. type of reply may not always correspond to type of original
1513            msg due to lookup->request optimization, verify others? */
1514
1515         if (lkb->lkb_wait_type) {
1516                 lkb->lkb_wait_type = 0;
1517                 goto out_del;
1518         }
1519
1520         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1521                   lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1522                   lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1523         return -1;
1524
1525  out_del:
1526         /* the force-unlock/cancel has completed and we haven't recvd a reply
1527            to the op that was in progress prior to the unlock/cancel; we
1528            give up on any reply to the earlier op.  FIXME: not sure when/how
1529            this would happen */
1530
1531         if (overlap_done && lkb->lkb_wait_type) {
1532                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1533                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1534                 lkb->lkb_wait_count--;
1535                 unhold_lkb(lkb);
1536                 lkb->lkb_wait_type = 0;
1537         }
1538
1539         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1540
1541         clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1542         lkb->lkb_wait_count--;
1543         if (!lkb->lkb_wait_count)
1544                 list_del_init(&lkb->lkb_wait_reply);
1545         unhold_lkb(lkb);
1546         return 0;
1547 }
1548
1549 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1550 {
1551         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1552         int error;
1553
1554         mutex_lock(&ls->ls_waiters_mutex);
1555         error = _remove_from_waiters(lkb, mstype, NULL);
1556         mutex_unlock(&ls->ls_waiters_mutex);
1557         return error;
1558 }
1559
1560 /* Handles situations where we might be processing a "fake" or "local" reply in
1561    which we can't try to take waiters_mutex again. */
1562
1563 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms,
1564                                   bool local)
1565 {
1566         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1567         int error;
1568
1569         if (!local)
1570                 mutex_lock(&ls->ls_waiters_mutex);
1571         error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1572         if (!local)
1573                 mutex_unlock(&ls->ls_waiters_mutex);
1574         return error;
1575 }
1576
1577 static void shrink_bucket(struct dlm_ls *ls, int b)
1578 {
1579         struct rb_node *n, *next;
1580         struct dlm_rsb *r;
1581         char *name;
1582         int our_nodeid = dlm_our_nodeid();
1583         int remote_count = 0;
1584         int need_shrink = 0;
1585         int i, len, rv;
1586
1587         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1588
1589         spin_lock(&ls->ls_rsbtbl[b].lock);
1590
1591         if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
1592                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1593                 return;
1594         }
1595
1596         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1597                 next = rb_next(n);
1598                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1599
1600                 /* If we're the directory record for this rsb, and
1601                    we're not the master of it, then we need to wait
1602                    for the master node to send us a dir remove for
1603                    before removing the dir record. */
1604
1605                 if (!dlm_no_directory(ls) &&
1606                     (r->res_master_nodeid != our_nodeid) &&
1607                     (dlm_dir_nodeid(r) == our_nodeid)) {
1608                         continue;
1609                 }
1610
1611                 need_shrink = 1;
1612
1613                 if (!time_after_eq(jiffies, r->res_toss_time +
1614                                    dlm_config.ci_toss_secs * HZ)) {
1615                         continue;
1616                 }
1617
1618                 if (!dlm_no_directory(ls) &&
1619                     (r->res_master_nodeid == our_nodeid) &&
1620                     (dlm_dir_nodeid(r) != our_nodeid)) {
1621
1622                         /* We're the master of this rsb but we're not
1623                            the directory record, so we need to tell the
1624                            dir node to remove the dir record. */
1625
1626                         ls->ls_remove_lens[remote_count] = r->res_length;
1627                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1628                                DLM_RESNAME_MAXLEN);
1629                         remote_count++;
1630
1631                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1632                                 break;
1633                         continue;
1634                 }
1635
1636                 if (!kref_put(&r->res_ref, kill_rsb)) {
1637                         log_error(ls, "tossed rsb in use %s", r->res_name);
1638                         continue;
1639                 }
1640
1641                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1642                 dlm_free_rsb(r);
1643         }
1644
1645         if (need_shrink)
1646                 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1647         else
1648                 clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1649         spin_unlock(&ls->ls_rsbtbl[b].lock);
1650
1651         /*
1652          * While searching for rsb's to free, we found some that require
1653          * remote removal.  We leave them in place and find them again here
1654          * so there is a very small gap between removing them from the toss
1655          * list and sending the removal.  Keeping this gap small is
1656          * important to keep us (the master node) from being out of sync
1657          * with the remote dir node for very long.
1658          */
1659
1660         for (i = 0; i < remote_count; i++) {
1661                 name = ls->ls_remove_names[i];
1662                 len = ls->ls_remove_lens[i];
1663
1664                 spin_lock(&ls->ls_rsbtbl[b].lock);
1665                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1666                 if (rv) {
1667                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1668                         log_debug(ls, "remove_name not toss %s", name);
1669                         continue;
1670                 }
1671
1672                 if (r->res_master_nodeid != our_nodeid) {
1673                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1674                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1675                                   r->res_master_nodeid, r->res_dir_nodeid,
1676                                   our_nodeid, name);
1677                         continue;
1678                 }
1679
1680                 if (r->res_dir_nodeid == our_nodeid) {
1681                         /* should never happen */
1682                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1683                         log_error(ls, "remove_name dir %d master %d our %d %s",
1684                                   r->res_dir_nodeid, r->res_master_nodeid,
1685                                   our_nodeid, name);
1686                         continue;
1687                 }
1688
1689                 if (!time_after_eq(jiffies, r->res_toss_time +
1690                                    dlm_config.ci_toss_secs * HZ)) {
1691                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1692                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1693                                   r->res_toss_time, jiffies, name);
1694                         continue;
1695                 }
1696
1697                 if (!kref_put(&r->res_ref, kill_rsb)) {
1698                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1699                         log_error(ls, "remove_name in use %s", name);
1700                         continue;
1701                 }
1702
1703                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1704                 send_remove(r);
1705                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1706
1707                 dlm_free_rsb(r);
1708         }
1709 }
1710
1711 void dlm_scan_rsbs(struct dlm_ls *ls)
1712 {
1713         int i;
1714
1715         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1716                 shrink_bucket(ls, i);
1717                 if (dlm_locking_stopped(ls))
1718                         break;
1719                 cond_resched();
1720         }
1721 }
1722
1723 /* lkb is master or local copy */
1724
1725 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1726 {
1727         int b, len = r->res_ls->ls_lvblen;
1728
1729         /* b=1 lvb returned to caller
1730            b=0 lvb written to rsb or invalidated
1731            b=-1 do nothing */
1732
1733         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1734
1735         if (b == 1) {
1736                 if (!lkb->lkb_lvbptr)
1737                         return;
1738
1739                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1740                         return;
1741
1742                 if (!r->res_lvbptr)
1743                         return;
1744
1745                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1746                 lkb->lkb_lvbseq = r->res_lvbseq;
1747
1748         } else if (b == 0) {
1749                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1750                         rsb_set_flag(r, RSB_VALNOTVALID);
1751                         return;
1752                 }
1753
1754                 if (!lkb->lkb_lvbptr)
1755                         return;
1756
1757                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1758                         return;
1759
1760                 if (!r->res_lvbptr)
1761                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1762
1763                 if (!r->res_lvbptr)
1764                         return;
1765
1766                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1767                 r->res_lvbseq++;
1768                 lkb->lkb_lvbseq = r->res_lvbseq;
1769                 rsb_clear_flag(r, RSB_VALNOTVALID);
1770         }
1771
1772         if (rsb_flag(r, RSB_VALNOTVALID))
1773                 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1774 }
1775
1776 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1777 {
1778         if (lkb->lkb_grmode < DLM_LOCK_PW)
1779                 return;
1780
1781         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1782                 rsb_set_flag(r, RSB_VALNOTVALID);
1783                 return;
1784         }
1785
1786         if (!lkb->lkb_lvbptr)
1787                 return;
1788
1789         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1790                 return;
1791
1792         if (!r->res_lvbptr)
1793                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1794
1795         if (!r->res_lvbptr)
1796                 return;
1797
1798         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1799         r->res_lvbseq++;
1800         rsb_clear_flag(r, RSB_VALNOTVALID);
1801 }
1802
1803 /* lkb is process copy (pc) */
1804
1805 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1806                             struct dlm_message *ms)
1807 {
1808         int b;
1809
1810         if (!lkb->lkb_lvbptr)
1811                 return;
1812
1813         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1814                 return;
1815
1816         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1817         if (b == 1) {
1818                 int len = receive_extralen(ms);
1819                 if (len > r->res_ls->ls_lvblen)
1820                         len = r->res_ls->ls_lvblen;
1821                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1822                 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1823         }
1824 }
1825
1826 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1827    remove_lock -- used for unlock, removes lkb from granted
1828    revert_lock -- used for cancel, moves lkb from convert to granted
1829    grant_lock  -- used for request and convert, adds lkb to granted or
1830                   moves lkb from convert or waiting to granted
1831
1832    Each of these is used for master or local copy lkb's.  There is
1833    also a _pc() variation used to make the corresponding change on
1834    a process copy (pc) lkb. */
1835
1836 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1837 {
1838         del_lkb(r, lkb);
1839         lkb->lkb_grmode = DLM_LOCK_IV;
1840         /* this unhold undoes the original ref from create_lkb()
1841            so this leads to the lkb being freed */
1842         unhold_lkb(lkb);
1843 }
1844
1845 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1846 {
1847         set_lvb_unlock(r, lkb);
1848         _remove_lock(r, lkb);
1849 }
1850
1851 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1852 {
1853         _remove_lock(r, lkb);
1854 }
1855
1856 /* returns: 0 did nothing
1857             1 moved lock to granted
1858            -1 removed lock */
1859
1860 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1861 {
1862         int rv = 0;
1863
1864         lkb->lkb_rqmode = DLM_LOCK_IV;
1865
1866         switch (lkb->lkb_status) {
1867         case DLM_LKSTS_GRANTED:
1868                 break;
1869         case DLM_LKSTS_CONVERT:
1870                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1871                 rv = 1;
1872                 break;
1873         case DLM_LKSTS_WAITING:
1874                 del_lkb(r, lkb);
1875                 lkb->lkb_grmode = DLM_LOCK_IV;
1876                 /* this unhold undoes the original ref from create_lkb()
1877                    so this leads to the lkb being freed */
1878                 unhold_lkb(lkb);
1879                 rv = -1;
1880                 break;
1881         default:
1882                 log_print("invalid status for revert %d", lkb->lkb_status);
1883         }
1884         return rv;
1885 }
1886
1887 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1888 {
1889         return revert_lock(r, lkb);
1890 }
1891
1892 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1893 {
1894         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1895                 lkb->lkb_grmode = lkb->lkb_rqmode;
1896                 if (lkb->lkb_status)
1897                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1898                 else
1899                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1900         }
1901
1902         lkb->lkb_rqmode = DLM_LOCK_IV;
1903         lkb->lkb_highbast = 0;
1904 }
1905
1906 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1907 {
1908         set_lvb_lock(r, lkb);
1909         _grant_lock(r, lkb);
1910 }
1911
1912 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1913                           struct dlm_message *ms)
1914 {
1915         set_lvb_lock_pc(r, lkb, ms);
1916         _grant_lock(r, lkb);
1917 }
1918
1919 /* called by grant_pending_locks() which means an async grant message must
1920    be sent to the requesting node in addition to granting the lock if the
1921    lkb belongs to a remote node. */
1922
1923 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1924 {
1925         grant_lock(r, lkb);
1926         if (is_master_copy(lkb))
1927                 send_grant(r, lkb);
1928         else
1929                 queue_cast(r, lkb, 0);
1930 }
1931
1932 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1933    change the granted/requested modes.  We're munging things accordingly in
1934    the process copy.
1935    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1936    conversion deadlock
1937    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1938    compatible with other granted locks */
1939
1940 static void munge_demoted(struct dlm_lkb *lkb)
1941 {
1942         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1943                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1944                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1945                 return;
1946         }
1947
1948         lkb->lkb_grmode = DLM_LOCK_NL;
1949 }
1950
1951 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1952 {
1953         if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
1954             ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
1955                 log_print("munge_altmode %x invalid reply type %d",
1956                           lkb->lkb_id, le32_to_cpu(ms->m_type));
1957                 return;
1958         }
1959
1960         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1961                 lkb->lkb_rqmode = DLM_LOCK_PR;
1962         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1963                 lkb->lkb_rqmode = DLM_LOCK_CW;
1964         else {
1965                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1966                 dlm_print_lkb(lkb);
1967         }
1968 }
1969
1970 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1971 {
1972         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1973                                            lkb_statequeue);
1974         if (lkb->lkb_id == first->lkb_id)
1975                 return 1;
1976
1977         return 0;
1978 }
1979
1980 /* Check if the given lkb conflicts with another lkb on the queue. */
1981
1982 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1983 {
1984         struct dlm_lkb *this;
1985
1986         list_for_each_entry(this, head, lkb_statequeue) {
1987                 if (this == lkb)
1988                         continue;
1989                 if (!modes_compat(this, lkb))
1990                         return 1;
1991         }
1992         return 0;
1993 }
1994
1995 /*
1996  * "A conversion deadlock arises with a pair of lock requests in the converting
1997  * queue for one resource.  The granted mode of each lock blocks the requested
1998  * mode of the other lock."
1999  *
2000  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2001  * convert queue from being granted, then deadlk/demote lkb.
2002  *
2003  * Example:
2004  * Granted Queue: empty
2005  * Convert Queue: NL->EX (first lock)
2006  *                PR->EX (second lock)
2007  *
2008  * The first lock can't be granted because of the granted mode of the second
2009  * lock and the second lock can't be granted because it's not first in the
2010  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2011  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2012  * flag set and return DEMOTED in the lksb flags.
2013  *
2014  * Originally, this function detected conv-deadlk in a more limited scope:
2015  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2016  * - if lkb1 was the first entry in the queue (not just earlier), and was
2017  *   blocked by the granted mode of lkb2, and there was nothing on the
2018  *   granted queue preventing lkb1 from being granted immediately, i.e.
2019  *   lkb2 was the only thing preventing lkb1 from being granted.
2020  *
2021  * That second condition meant we'd only say there was conv-deadlk if
2022  * resolving it (by demotion) would lead to the first lock on the convert
2023  * queue being granted right away.  It allowed conversion deadlocks to exist
2024  * between locks on the convert queue while they couldn't be granted anyway.
2025  *
2026  * Now, we detect and take action on conversion deadlocks immediately when
2027  * they're created, even if they may not be immediately consequential.  If
2028  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2029  * mode that would prevent lkb1's conversion from being granted, we do a
2030  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2031  * I think this means that the lkb_is_ahead condition below should always
2032  * be zero, i.e. there will never be conv-deadlk between two locks that are
2033  * both already on the convert queue.
2034  */
2035
2036 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2037 {
2038         struct dlm_lkb *lkb1;
2039         int lkb_is_ahead = 0;
2040
2041         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2042                 if (lkb1 == lkb2) {
2043                         lkb_is_ahead = 1;
2044                         continue;
2045                 }
2046
2047                 if (!lkb_is_ahead) {
2048                         if (!modes_compat(lkb2, lkb1))
2049                                 return 1;
2050                 } else {
2051                         if (!modes_compat(lkb2, lkb1) &&
2052                             !modes_compat(lkb1, lkb2))
2053                                 return 1;
2054                 }
2055         }
2056         return 0;
2057 }
2058
2059 /*
2060  * Return 1 if the lock can be granted, 0 otherwise.
2061  * Also detect and resolve conversion deadlocks.
2062  *
2063  * lkb is the lock to be granted
2064  *
2065  * now is 1 if the function is being called in the context of the
2066  * immediate request, it is 0 if called later, after the lock has been
2067  * queued.
2068  *
2069  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2070  * after recovery.
2071  *
2072  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2073  */
2074
2075 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2076                            int recover)
2077 {
2078         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2079
2080         /*
2081          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2082          * a new request for a NL mode lock being blocked.
2083          *
2084          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2085          * request, then it would be granted.  In essence, the use of this flag
2086          * tells the Lock Manager to expedite theis request by not considering
2087          * what may be in the CONVERTING or WAITING queues...  As of this
2088          * writing, the EXPEDITE flag can be used only with new requests for NL
2089          * mode locks.  This flag is not valid for conversion requests.
2090          *
2091          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2092          * conversion or used with a non-NL requested mode.  We also know an
2093          * EXPEDITE request is always granted immediately, so now must always
2094          * be 1.  The full condition to grant an expedite request: (now &&
2095          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2096          * therefore be shortened to just checking the flag.
2097          */
2098
2099         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2100                 return 1;
2101
2102         /*
2103          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2104          * added to the remaining conditions.
2105          */
2106
2107         if (queue_conflict(&r->res_grantqueue, lkb))
2108                 return 0;
2109
2110         /*
2111          * 6-3: By default, a conversion request is immediately granted if the
2112          * requested mode is compatible with the modes of all other granted
2113          * locks
2114          */
2115
2116         if (queue_conflict(&r->res_convertqueue, lkb))
2117                 return 0;
2118
2119         /*
2120          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2121          * locks for a recovered rsb, on which lkb's have been rebuilt.
2122          * The lkb's may have been rebuilt on the queues in a different
2123          * order than they were in on the previous master.  So, granting
2124          * queued conversions in order after recovery doesn't make sense
2125          * since the order hasn't been preserved anyway.  The new order
2126          * could also have created a new "in place" conversion deadlock.
2127          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2128          * After recovery, there would be no granted locks, and possibly
2129          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2130          * recovery, grant conversions without considering order.
2131          */
2132
2133         if (conv && recover)
2134                 return 1;
2135
2136         /*
2137          * 6-5: But the default algorithm for deciding whether to grant or
2138          * queue conversion requests does not by itself guarantee that such
2139          * requests are serviced on a "first come first serve" basis.  This, in
2140          * turn, can lead to a phenomenon known as "indefinate postponement".
2141          *
2142          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2143          * the system service employed to request a lock conversion.  This flag
2144          * forces certain conversion requests to be queued, even if they are
2145          * compatible with the granted modes of other locks on the same
2146          * resource.  Thus, the use of this flag results in conversion requests
2147          * being ordered on a "first come first servce" basis.
2148          *
2149          * DCT: This condition is all about new conversions being able to occur
2150          * "in place" while the lock remains on the granted queue (assuming
2151          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2152          * doesn't _have_ to go onto the convert queue where it's processed in
2153          * order.  The "now" variable is necessary to distinguish converts
2154          * being received and processed for the first time now, because once a
2155          * convert is moved to the conversion queue the condition below applies
2156          * requiring fifo granting.
2157          */
2158
2159         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2160                 return 1;
2161
2162         /*
2163          * Even if the convert is compat with all granted locks,
2164          * QUECVT forces it behind other locks on the convert queue.
2165          */
2166
2167         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2168                 if (list_empty(&r->res_convertqueue))
2169                         return 1;
2170                 else
2171                         return 0;
2172         }
2173
2174         /*
2175          * The NOORDER flag is set to avoid the standard vms rules on grant
2176          * order.
2177          */
2178
2179         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2180                 return 1;
2181
2182         /*
2183          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2184          * granted until all other conversion requests ahead of it are granted
2185          * and/or canceled.
2186          */
2187
2188         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2189                 return 1;
2190
2191         /*
2192          * 6-4: By default, a new request is immediately granted only if all
2193          * three of the following conditions are satisfied when the request is
2194          * issued:
2195          * - The queue of ungranted conversion requests for the resource is
2196          *   empty.
2197          * - The queue of ungranted new requests for the resource is empty.
2198          * - The mode of the new request is compatible with the most
2199          *   restrictive mode of all granted locks on the resource.
2200          */
2201
2202         if (now && !conv && list_empty(&r->res_convertqueue) &&
2203             list_empty(&r->res_waitqueue))
2204                 return 1;
2205
2206         /*
2207          * 6-4: Once a lock request is in the queue of ungranted new requests,
2208          * it cannot be granted until the queue of ungranted conversion
2209          * requests is empty, all ungranted new requests ahead of it are
2210          * granted and/or canceled, and it is compatible with the granted mode
2211          * of the most restrictive lock granted on the resource.
2212          */
2213
2214         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2215             first_in_list(lkb, &r->res_waitqueue))
2216                 return 1;
2217
2218         return 0;
2219 }
2220
2221 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2222                           int recover, int *err)
2223 {
2224         int rv;
2225         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2226         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2227
2228         if (err)
2229                 *err = 0;
2230
2231         rv = _can_be_granted(r, lkb, now, recover);
2232         if (rv)
2233                 goto out;
2234
2235         /*
2236          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2237          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2238          * cancels one of the locks.
2239          */
2240
2241         if (is_convert && can_be_queued(lkb) &&
2242             conversion_deadlock_detect(r, lkb)) {
2243                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2244                         lkb->lkb_grmode = DLM_LOCK_NL;
2245                         set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2246                 } else if (err) {
2247                         *err = -EDEADLK;
2248                 } else {
2249                         log_print("can_be_granted deadlock %x now %d",
2250                                   lkb->lkb_id, now);
2251                         dlm_dump_rsb(r);
2252                 }
2253                 goto out;
2254         }
2255
2256         /*
2257          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2258          * to grant a request in a mode other than the normal rqmode.  It's a
2259          * simple way to provide a big optimization to applications that can
2260          * use them.
2261          */
2262
2263         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2264                 alt = DLM_LOCK_PR;
2265         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2266                 alt = DLM_LOCK_CW;
2267
2268         if (alt) {
2269                 lkb->lkb_rqmode = alt;
2270                 rv = _can_be_granted(r, lkb, now, 0);
2271                 if (rv)
2272                         set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2273                 else
2274                         lkb->lkb_rqmode = rqmode;
2275         }
2276  out:
2277         return rv;
2278 }
2279
2280 /* Returns the highest requested mode of all blocked conversions; sets
2281    cw if there's a blocked conversion to DLM_LOCK_CW. */
2282
2283 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2284                                  unsigned int *count)
2285 {
2286         struct dlm_lkb *lkb, *s;
2287         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2288         int hi, demoted, quit, grant_restart, demote_restart;
2289         int deadlk;
2290
2291         quit = 0;
2292  restart:
2293         grant_restart = 0;
2294         demote_restart = 0;
2295         hi = DLM_LOCK_IV;
2296
2297         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2298                 demoted = is_demoted(lkb);
2299                 deadlk = 0;
2300
2301                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2302                         grant_lock_pending(r, lkb);
2303                         grant_restart = 1;
2304                         if (count)
2305                                 (*count)++;
2306                         continue;
2307                 }
2308
2309                 if (!demoted && is_demoted(lkb)) {
2310                         log_print("WARN: pending demoted %x node %d %s",
2311                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2312                         demote_restart = 1;
2313                         continue;
2314                 }
2315
2316                 if (deadlk) {
2317                         /*
2318                          * If DLM_LKB_NODLKWT flag is set and conversion
2319                          * deadlock is detected, we request blocking AST and
2320                          * down (or cancel) conversion.
2321                          */
2322                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2323                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2324                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2325                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2326                                 }
2327                         } else {
2328                                 log_print("WARN: pending deadlock %x node %d %s",
2329                                           lkb->lkb_id, lkb->lkb_nodeid,
2330                                           r->res_name);
2331                                 dlm_dump_rsb(r);
2332                         }
2333                         continue;
2334                 }
2335
2336                 hi = max_t(int, lkb->lkb_rqmode, hi);
2337
2338                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2339                         *cw = 1;
2340         }
2341
2342         if (grant_restart)
2343                 goto restart;
2344         if (demote_restart && !quit) {
2345                 quit = 1;
2346                 goto restart;
2347         }
2348
2349         return max_t(int, high, hi);
2350 }
2351
2352 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2353                               unsigned int *count)
2354 {
2355         struct dlm_lkb *lkb, *s;
2356
2357         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2358                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2359                         grant_lock_pending(r, lkb);
2360                         if (count)
2361                                 (*count)++;
2362                 } else {
2363                         high = max_t(int, lkb->lkb_rqmode, high);
2364                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2365                                 *cw = 1;
2366                 }
2367         }
2368
2369         return high;
2370 }
2371
2372 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2373    on either the convert or waiting queue.
2374    high is the largest rqmode of all locks blocked on the convert or
2375    waiting queue. */
2376
2377 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2378 {
2379         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2380                 if (gr->lkb_highbast < DLM_LOCK_EX)
2381                         return 1;
2382                 return 0;
2383         }
2384
2385         if (gr->lkb_highbast < high &&
2386             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2387                 return 1;
2388         return 0;
2389 }
2390
2391 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2392 {
2393         struct dlm_lkb *lkb, *s;
2394         int high = DLM_LOCK_IV;
2395         int cw = 0;
2396
2397         if (!is_master(r)) {
2398                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2399                 dlm_dump_rsb(r);
2400                 return;
2401         }
2402
2403         high = grant_pending_convert(r, high, &cw, count);
2404         high = grant_pending_wait(r, high, &cw, count);
2405
2406         if (high == DLM_LOCK_IV)
2407                 return;
2408
2409         /*
2410          * If there are locks left on the wait/convert queue then send blocking
2411          * ASTs to granted locks based on the largest requested mode (high)
2412          * found above.
2413          */
2414
2415         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2416                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2417                         if (cw && high == DLM_LOCK_PR &&
2418                             lkb->lkb_grmode == DLM_LOCK_PR)
2419                                 queue_bast(r, lkb, DLM_LOCK_CW);
2420                         else
2421                                 queue_bast(r, lkb, high);
2422                         lkb->lkb_highbast = high;
2423                 }
2424         }
2425 }
2426
2427 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2428 {
2429         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2430             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2431                 if (gr->lkb_highbast < DLM_LOCK_EX)
2432                         return 1;
2433                 return 0;
2434         }
2435
2436         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2437                 return 1;
2438         return 0;
2439 }
2440
2441 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2442                             struct dlm_lkb *lkb)
2443 {
2444         struct dlm_lkb *gr;
2445
2446         list_for_each_entry(gr, head, lkb_statequeue) {
2447                 /* skip self when sending basts to convertqueue */
2448                 if (gr == lkb)
2449                         continue;
2450                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2451                         queue_bast(r, gr, lkb->lkb_rqmode);
2452                         gr->lkb_highbast = lkb->lkb_rqmode;
2453                 }
2454         }
2455 }
2456
2457 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2458 {
2459         send_bast_queue(r, &r->res_grantqueue, lkb);
2460 }
2461
2462 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2463 {
2464         send_bast_queue(r, &r->res_grantqueue, lkb);
2465         send_bast_queue(r, &r->res_convertqueue, lkb);
2466 }
2467
2468 /* set_master(r, lkb) -- set the master nodeid of a resource
2469
2470    The purpose of this function is to set the nodeid field in the given
2471    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2472    known, it can just be copied to the lkb and the function will return
2473    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2474    before it can be copied to the lkb.
2475
2476    When the rsb nodeid is being looked up remotely, the initial lkb
2477    causing the lookup is kept on the ls_waiters list waiting for the
2478    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2479    on the rsb's res_lookup list until the master is verified.
2480
2481    Return values:
2482    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2483    1: the rsb master is not available and the lkb has been placed on
2484       a wait queue
2485 */
2486
2487 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2488 {
2489         int our_nodeid = dlm_our_nodeid();
2490
2491         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2492                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2493                 r->res_first_lkid = lkb->lkb_id;
2494                 lkb->lkb_nodeid = r->res_nodeid;
2495                 return 0;
2496         }
2497
2498         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2499                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2500                 return 1;
2501         }
2502
2503         if (r->res_master_nodeid == our_nodeid) {
2504                 lkb->lkb_nodeid = 0;
2505                 return 0;
2506         }
2507
2508         if (r->res_master_nodeid) {
2509                 lkb->lkb_nodeid = r->res_master_nodeid;
2510                 return 0;
2511         }
2512
2513         if (dlm_dir_nodeid(r) == our_nodeid) {
2514                 /* This is a somewhat unusual case; find_rsb will usually
2515                    have set res_master_nodeid when dir nodeid is local, but
2516                    there are cases where we become the dir node after we've
2517                    past find_rsb and go through _request_lock again.
2518                    confirm_master() or process_lookup_list() needs to be
2519                    called after this. */
2520                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2521                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2522                           r->res_name);
2523                 r->res_master_nodeid = our_nodeid;
2524                 r->res_nodeid = 0;
2525                 lkb->lkb_nodeid = 0;
2526                 return 0;
2527         }
2528
2529         r->res_first_lkid = lkb->lkb_id;
2530         send_lookup(r, lkb);
2531         return 1;
2532 }
2533
2534 static void process_lookup_list(struct dlm_rsb *r)
2535 {
2536         struct dlm_lkb *lkb, *safe;
2537
2538         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2539                 list_del_init(&lkb->lkb_rsb_lookup);
2540                 _request_lock(r, lkb);
2541                 schedule();
2542         }
2543 }
2544
2545 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2546
2547 static void confirm_master(struct dlm_rsb *r, int error)
2548 {
2549         struct dlm_lkb *lkb;
2550
2551         if (!r->res_first_lkid)
2552                 return;
2553
2554         switch (error) {
2555         case 0:
2556         case -EINPROGRESS:
2557                 r->res_first_lkid = 0;
2558                 process_lookup_list(r);
2559                 break;
2560
2561         case -EAGAIN:
2562         case -EBADR:
2563         case -ENOTBLK:
2564                 /* the remote request failed and won't be retried (it was
2565                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2566                    lkb the first_lkid */
2567
2568                 r->res_first_lkid = 0;
2569
2570                 if (!list_empty(&r->res_lookup)) {
2571                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2572                                          lkb_rsb_lookup);
2573                         list_del_init(&lkb->lkb_rsb_lookup);
2574                         r->res_first_lkid = lkb->lkb_id;
2575                         _request_lock(r, lkb);
2576                 }
2577                 break;
2578
2579         default:
2580                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2581         }
2582 }
2583
2584 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2585                          int namelen, void (*ast)(void *astparam),
2586                          void *astparam,
2587                          void (*bast)(void *astparam, int mode),
2588                          struct dlm_args *args)
2589 {
2590         int rv = -EINVAL;
2591
2592         /* check for invalid arg usage */
2593
2594         if (mode < 0 || mode > DLM_LOCK_EX)
2595                 goto out;
2596
2597         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2598                 goto out;
2599
2600         if (flags & DLM_LKF_CANCEL)
2601                 goto out;
2602
2603         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2604                 goto out;
2605
2606         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2607                 goto out;
2608
2609         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2610                 goto out;
2611
2612         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2613                 goto out;
2614
2615         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2616                 goto out;
2617
2618         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2619                 goto out;
2620
2621         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2622                 goto out;
2623
2624         if (!ast || !lksb)
2625                 goto out;
2626
2627         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2628                 goto out;
2629
2630         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2631                 goto out;
2632
2633         /* these args will be copied to the lkb in validate_lock_args,
2634            it cannot be done now because when converting locks, fields in
2635            an active lkb cannot be modified before locking the rsb */
2636
2637         args->flags = flags;
2638         args->astfn = ast;
2639         args->astparam = astparam;
2640         args->bastfn = bast;
2641         args->mode = mode;
2642         args->lksb = lksb;
2643         rv = 0;
2644  out:
2645         return rv;
2646 }
2647
2648 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2649 {
2650         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2651                       DLM_LKF_FORCEUNLOCK))
2652                 return -EINVAL;
2653
2654         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2655                 return -EINVAL;
2656
2657         args->flags = flags;
2658         args->astparam = astarg;
2659         return 0;
2660 }
2661
2662 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2663                               struct dlm_args *args)
2664 {
2665         int rv = -EBUSY;
2666
2667         if (args->flags & DLM_LKF_CONVERT) {
2668                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2669                         goto out;
2670
2671                 /* lock not allowed if there's any op in progress */
2672                 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2673                         goto out;
2674
2675                 if (is_overlap(lkb))
2676                         goto out;
2677
2678                 rv = -EINVAL;
2679                 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2680                         goto out;
2681
2682                 if (args->flags & DLM_LKF_QUECVT &&
2683                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2684                         goto out;
2685         }
2686
2687         lkb->lkb_exflags = args->flags;
2688         dlm_set_sbflags_val(lkb, 0);
2689         lkb->lkb_astfn = args->astfn;
2690         lkb->lkb_astparam = args->astparam;
2691         lkb->lkb_bastfn = args->bastfn;
2692         lkb->lkb_rqmode = args->mode;
2693         lkb->lkb_lksb = args->lksb;
2694         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2695         lkb->lkb_ownpid = (int) current->pid;
2696         rv = 0;
2697  out:
2698         switch (rv) {
2699         case 0:
2700                 break;
2701         case -EINVAL:
2702                 /* annoy the user because dlm usage is wrong */
2703                 WARN_ON(1);
2704                 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2705                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2706                           lkb->lkb_status, lkb->lkb_wait_type,
2707                           lkb->lkb_resource->res_name);
2708                 break;
2709         default:
2710                 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2711                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2712                           lkb->lkb_status, lkb->lkb_wait_type,
2713                           lkb->lkb_resource->res_name);
2714                 break;
2715         }
2716
2717         return rv;
2718 }
2719
2720 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2721    for success */
2722
2723 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2724    because there may be a lookup in progress and it's valid to do
2725    cancel/unlockf on it */
2726
2727 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2728 {
2729         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2730         int rv = -EBUSY;
2731
2732         /* normal unlock not allowed if there's any op in progress */
2733         if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2734             (lkb->lkb_wait_type || lkb->lkb_wait_count))
2735                 goto out;
2736
2737         /* an lkb may be waiting for an rsb lookup to complete where the
2738            lookup was initiated by another lock */
2739
2740         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2741                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2742                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2743                         list_del_init(&lkb->lkb_rsb_lookup);
2744                         queue_cast(lkb->lkb_resource, lkb,
2745                                    args->flags & DLM_LKF_CANCEL ?
2746                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2747                         unhold_lkb(lkb); /* undoes create_lkb() */
2748                 }
2749                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2750                 goto out;
2751         }
2752
2753         rv = -EINVAL;
2754         if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2755                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2756                 dlm_print_lkb(lkb);
2757                 goto out;
2758         }
2759
2760         /* an lkb may still exist even though the lock is EOL'ed due to a
2761          * cancel, unlock or failed noqueue request; an app can't use these
2762          * locks; return same error as if the lkid had not been found at all
2763          */
2764
2765         if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2766                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2767                 rv = -ENOENT;
2768                 goto out;
2769         }
2770
2771         /* cancel not allowed with another cancel/unlock in progress */
2772
2773         if (args->flags & DLM_LKF_CANCEL) {
2774                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2775                         goto out;
2776
2777                 if (is_overlap(lkb))
2778                         goto out;
2779
2780                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2781                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2782                         rv = -EBUSY;
2783                         goto out;
2784                 }
2785
2786                 /* there's nothing to cancel */
2787                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2788                     !lkb->lkb_wait_type) {
2789                         rv = -EBUSY;
2790                         goto out;
2791                 }
2792
2793                 switch (lkb->lkb_wait_type) {
2794                 case DLM_MSG_LOOKUP:
2795                 case DLM_MSG_REQUEST:
2796                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2797                         rv = -EBUSY;
2798                         goto out;
2799                 case DLM_MSG_UNLOCK:
2800                 case DLM_MSG_CANCEL:
2801                         goto out;
2802                 }
2803                 /* add_to_waiters() will set OVERLAP_CANCEL */
2804                 goto out_ok;
2805         }
2806
2807         /* do we need to allow a force-unlock if there's a normal unlock
2808            already in progress?  in what conditions could the normal unlock
2809            fail such that we'd want to send a force-unlock to be sure? */
2810
2811         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2812                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2813                         goto out;
2814
2815                 if (is_overlap_unlock(lkb))
2816                         goto out;
2817
2818                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2819                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2820                         rv = -EBUSY;
2821                         goto out;
2822                 }
2823
2824                 switch (lkb->lkb_wait_type) {
2825                 case DLM_MSG_LOOKUP:
2826                 case DLM_MSG_REQUEST:
2827                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2828                         rv = -EBUSY;
2829                         goto out;
2830                 case DLM_MSG_UNLOCK:
2831                         goto out;
2832                 }
2833                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2834         }
2835
2836  out_ok:
2837         /* an overlapping op shouldn't blow away exflags from other op */
2838         lkb->lkb_exflags |= args->flags;
2839         dlm_set_sbflags_val(lkb, 0);
2840         lkb->lkb_astparam = args->astparam;
2841         rv = 0;
2842  out:
2843         switch (rv) {
2844         case 0:
2845                 break;
2846         case -EINVAL:
2847                 /* annoy the user because dlm usage is wrong */
2848                 WARN_ON(1);
2849                 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2850                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2851                           args->flags, lkb->lkb_wait_type,
2852                           lkb->lkb_resource->res_name);
2853                 break;
2854         default:
2855                 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2856                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2857                           args->flags, lkb->lkb_wait_type,
2858                           lkb->lkb_resource->res_name);
2859                 break;
2860         }
2861
2862         return rv;
2863 }
2864
2865 /*
2866  * Four stage 4 varieties:
2867  * do_request(), do_convert(), do_unlock(), do_cancel()
2868  * These are called on the master node for the given lock and
2869  * from the central locking logic.
2870  */
2871
2872 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2873 {
2874         int error = 0;
2875
2876         if (can_be_granted(r, lkb, 1, 0, NULL)) {
2877                 grant_lock(r, lkb);
2878                 queue_cast(r, lkb, 0);
2879                 goto out;
2880         }
2881
2882         if (can_be_queued(lkb)) {
2883                 error = -EINPROGRESS;
2884                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2885                 goto out;
2886         }
2887
2888         error = -EAGAIN;
2889         queue_cast(r, lkb, -EAGAIN);
2890  out:
2891         return error;
2892 }
2893
2894 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2895                                int error)
2896 {
2897         switch (error) {
2898         case -EAGAIN:
2899                 if (force_blocking_asts(lkb))
2900                         send_blocking_asts_all(r, lkb);
2901                 break;
2902         case -EINPROGRESS:
2903                 send_blocking_asts(r, lkb);
2904                 break;
2905         }
2906 }
2907
2908 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2909 {
2910         int error = 0;
2911         int deadlk = 0;
2912
2913         /* changing an existing lock may allow others to be granted */
2914
2915         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2916                 grant_lock(r, lkb);
2917                 queue_cast(r, lkb, 0);
2918                 goto out;
2919         }
2920
2921         /* can_be_granted() detected that this lock would block in a conversion
2922            deadlock, so we leave it on the granted queue and return EDEADLK in
2923            the ast for the convert. */
2924
2925         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2926                 /* it's left on the granted queue */
2927                 revert_lock(r, lkb);
2928                 queue_cast(r, lkb, -EDEADLK);
2929                 error = -EDEADLK;
2930                 goto out;
2931         }
2932
2933         /* is_demoted() means the can_be_granted() above set the grmode
2934            to NL, and left us on the granted queue.  This auto-demotion
2935            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2936            now grantable.  We have to try to grant other converting locks
2937            before we try again to grant this one. */
2938
2939         if (is_demoted(lkb)) {
2940                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2941                 if (_can_be_granted(r, lkb, 1, 0)) {
2942                         grant_lock(r, lkb);
2943                         queue_cast(r, lkb, 0);
2944                         goto out;
2945                 }
2946                 /* else fall through and move to convert queue */
2947         }
2948
2949         if (can_be_queued(lkb)) {
2950                 error = -EINPROGRESS;
2951                 del_lkb(r, lkb);
2952                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2953                 goto out;
2954         }
2955
2956         error = -EAGAIN;
2957         queue_cast(r, lkb, -EAGAIN);
2958  out:
2959         return error;
2960 }
2961
2962 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2963                                int error)
2964 {
2965         switch (error) {
2966         case 0:
2967                 grant_pending_locks(r, NULL);
2968                 /* grant_pending_locks also sends basts */
2969                 break;
2970         case -EAGAIN:
2971                 if (force_blocking_asts(lkb))
2972                         send_blocking_asts_all(r, lkb);
2973                 break;
2974         case -EINPROGRESS:
2975                 send_blocking_asts(r, lkb);
2976                 break;
2977         }
2978 }
2979
2980 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2981 {
2982         remove_lock(r, lkb);
2983         queue_cast(r, lkb, -DLM_EUNLOCK);
2984         return -DLM_EUNLOCK;
2985 }
2986
2987 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2988                               int error)
2989 {
2990         grant_pending_locks(r, NULL);
2991 }
2992
2993 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2994
2995 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2996 {
2997         int error;
2998
2999         error = revert_lock(r, lkb);
3000         if (error) {
3001                 queue_cast(r, lkb, -DLM_ECANCEL);
3002                 return -DLM_ECANCEL;
3003         }
3004         return 0;
3005 }
3006
3007 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3008                               int error)
3009 {
3010         if (error)
3011                 grant_pending_locks(r, NULL);
3012 }
3013
3014 /*
3015  * Four stage 3 varieties:
3016  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3017  */
3018
3019 /* add a new lkb to a possibly new rsb, called by requesting process */
3020
3021 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3022 {
3023         int error;
3024
3025         /* set_master: sets lkb nodeid from r */
3026
3027         error = set_master(r, lkb);
3028         if (error < 0)
3029                 goto out;
3030         if (error) {
3031                 error = 0;
3032                 goto out;
3033         }
3034
3035         if (is_remote(r)) {
3036                 /* receive_request() calls do_request() on remote node */
3037                 error = send_request(r, lkb);
3038         } else {
3039                 error = do_request(r, lkb);
3040                 /* for remote locks the request_reply is sent
3041                    between do_request and do_request_effects */
3042                 do_request_effects(r, lkb, error);
3043         }
3044  out:
3045         return error;
3046 }
3047
3048 /* change some property of an existing lkb, e.g. mode */
3049
3050 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3051 {
3052         int error;
3053
3054         if (is_remote(r)) {
3055                 /* receive_convert() calls do_convert() on remote node */
3056                 error = send_convert(r, lkb);
3057         } else {
3058                 error = do_convert(r, lkb);
3059                 /* for remote locks the convert_reply is sent
3060                    between do_convert and do_convert_effects */
3061                 do_convert_effects(r, lkb, error);
3062         }
3063
3064         return error;
3065 }
3066
3067 /* remove an existing lkb from the granted queue */
3068
3069 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3070 {
3071         int error;
3072
3073         if (is_remote(r)) {
3074                 /* receive_unlock() calls do_unlock() on remote node */
3075                 error = send_unlock(r, lkb);
3076         } else {
3077                 error = do_unlock(r, lkb);
3078                 /* for remote locks the unlock_reply is sent
3079                    between do_unlock and do_unlock_effects */
3080                 do_unlock_effects(r, lkb, error);
3081         }
3082
3083         return error;
3084 }
3085
3086 /* remove an existing lkb from the convert or wait queue */
3087
3088 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3089 {
3090         int error;
3091
3092         if (is_remote(r)) {
3093                 /* receive_cancel() calls do_cancel() on remote node */
3094                 error = send_cancel(r, lkb);
3095         } else {
3096                 error = do_cancel(r, lkb);
3097                 /* for remote locks the cancel_reply is sent
3098                    between do_cancel and do_cancel_effects */
3099                 do_cancel_effects(r, lkb, error);
3100         }
3101
3102         return error;
3103 }
3104
3105 /*
3106  * Four stage 2 varieties:
3107  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3108  */
3109
3110 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3111                         const void *name, int len,
3112                         struct dlm_args *args)
3113 {
3114         struct dlm_rsb *r;
3115         int error;
3116
3117         error = validate_lock_args(ls, lkb, args);
3118         if (error)
3119                 return error;
3120
3121         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3122         if (error)
3123                 return error;
3124
3125         lock_rsb(r);
3126
3127         attach_lkb(r, lkb);
3128         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3129
3130         error = _request_lock(r, lkb);
3131
3132         unlock_rsb(r);
3133         put_rsb(r);
3134         return error;
3135 }
3136
3137 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3138                         struct dlm_args *args)
3139 {
3140         struct dlm_rsb *r;
3141         int error;
3142
3143         r = lkb->lkb_resource;
3144
3145         hold_rsb(r);
3146         lock_rsb(r);
3147
3148         error = validate_lock_args(ls, lkb, args);
3149         if (error)
3150                 goto out;
3151
3152         error = _convert_lock(r, lkb);
3153  out:
3154         unlock_rsb(r);
3155         put_rsb(r);
3156         return error;
3157 }
3158
3159 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3160                        struct dlm_args *args)
3161 {
3162         struct dlm_rsb *r;
3163         int error;
3164
3165         r = lkb->lkb_resource;
3166
3167         hold_rsb(r);
3168         lock_rsb(r);
3169
3170         error = validate_unlock_args(lkb, args);
3171         if (error)
3172                 goto out;
3173
3174         error = _unlock_lock(r, lkb);
3175  out:
3176         unlock_rsb(r);
3177         put_rsb(r);
3178         return error;
3179 }
3180
3181 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3182                        struct dlm_args *args)
3183 {
3184         struct dlm_rsb *r;
3185         int error;
3186
3187         r = lkb->lkb_resource;
3188
3189         hold_rsb(r);
3190         lock_rsb(r);
3191
3192         error = validate_unlock_args(lkb, args);
3193         if (error)
3194                 goto out;
3195
3196         error = _cancel_lock(r, lkb);
3197  out:
3198         unlock_rsb(r);
3199         put_rsb(r);
3200         return error;
3201 }
3202
3203 /*
3204  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3205  */
3206
3207 int dlm_lock(dlm_lockspace_t *lockspace,
3208              int mode,
3209              struct dlm_lksb *lksb,
3210              uint32_t flags,
3211              const void *name,
3212              unsigned int namelen,
3213              uint32_t parent_lkid,
3214              void (*ast) (void *astarg),
3215              void *astarg,
3216              void (*bast) (void *astarg, int mode))
3217 {
3218         struct dlm_ls *ls;
3219         struct dlm_lkb *lkb;
3220         struct dlm_args args;
3221         int error, convert = flags & DLM_LKF_CONVERT;
3222
3223         ls = dlm_find_lockspace_local(lockspace);
3224         if (!ls)
3225                 return -EINVAL;
3226
3227         dlm_lock_recovery(ls);
3228
3229         if (convert)
3230                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3231         else
3232                 error = create_lkb(ls, &lkb);
3233
3234         if (error)
3235                 goto out;
3236
3237         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3238
3239         error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3240                               &args);
3241         if (error)
3242                 goto out_put;
3243
3244         if (convert)
3245                 error = convert_lock(ls, lkb, &args);
3246         else
3247                 error = request_lock(ls, lkb, name, namelen, &args);
3248
3249         if (error == -EINPROGRESS)
3250                 error = 0;
3251  out_put:
3252         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3253
3254         if (convert || error)
3255                 __put_lkb(ls, lkb);
3256         if (error == -EAGAIN || error == -EDEADLK)
3257                 error = 0;
3258  out:
3259         dlm_unlock_recovery(ls);
3260         dlm_put_lockspace(ls);
3261         return error;
3262 }
3263
3264 int dlm_unlock(dlm_lockspace_t *lockspace,
3265                uint32_t lkid,
3266                uint32_t flags,
3267                struct dlm_lksb *lksb,
3268                void *astarg)
3269 {
3270         struct dlm_ls *ls;
3271         struct dlm_lkb *lkb;
3272         struct dlm_args args;
3273         int error;
3274
3275         ls = dlm_find_lockspace_local(lockspace);
3276         if (!ls)
3277                 return -EINVAL;
3278
3279         dlm_lock_recovery(ls);
3280
3281         error = find_lkb(ls, lkid, &lkb);
3282         if (error)
3283                 goto out;
3284
3285         trace_dlm_unlock_start(ls, lkb, flags);
3286
3287         error = set_unlock_args(flags, astarg, &args);
3288         if (error)
3289                 goto out_put;
3290
3291         if (flags & DLM_LKF_CANCEL)
3292                 error = cancel_lock(ls, lkb, &args);
3293         else
3294                 error = unlock_lock(ls, lkb, &args);
3295
3296         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3297                 error = 0;
3298         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3299                 error = 0;
3300  out_put:
3301         trace_dlm_unlock_end(ls, lkb, flags, error);
3302
3303         dlm_put_lkb(lkb);
3304  out:
3305         dlm_unlock_recovery(ls);
3306         dlm_put_lockspace(ls);
3307         return error;
3308 }
3309
3310 /*
3311  * send/receive routines for remote operations and replies
3312  *
3313  * send_args
3314  * send_common
3315  * send_request                 receive_request
3316  * send_convert                 receive_convert
3317  * send_unlock                  receive_unlock
3318  * send_cancel                  receive_cancel
3319  * send_grant                   receive_grant
3320  * send_bast                    receive_bast
3321  * send_lookup                  receive_lookup
3322  * send_remove                  receive_remove
3323  *
3324  *                              send_common_reply
3325  * receive_request_reply        send_request_reply
3326  * receive_convert_reply        send_convert_reply
3327  * receive_unlock_reply         send_unlock_reply
3328  * receive_cancel_reply         send_cancel_reply
3329  * receive_lookup_reply         send_lookup_reply
3330  */
3331
3332 static int _create_message(struct dlm_ls *ls, int mb_len,
3333                            int to_nodeid, int mstype,
3334                            struct dlm_message **ms_ret,
3335                            struct dlm_mhandle **mh_ret,
3336                            gfp_t allocation)
3337 {
3338         struct dlm_message *ms;
3339         struct dlm_mhandle *mh;
3340         char *mb;
3341
3342         /* get_buffer gives us a message handle (mh) that we need to
3343            pass into midcomms_commit and a message buffer (mb) that we
3344            write our data into */
3345
3346         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
3347         if (!mh)
3348                 return -ENOBUFS;
3349
3350         ms = (struct dlm_message *) mb;
3351
3352         ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3353         ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3354         ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3355         ms->m_header.h_length = cpu_to_le16(mb_len);
3356         ms->m_header.h_cmd = DLM_MSG;
3357
3358         ms->m_type = cpu_to_le32(mstype);
3359
3360         *mh_ret = mh;
3361         *ms_ret = ms;
3362         return 0;
3363 }
3364
3365 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3366                           int to_nodeid, int mstype,
3367                           struct dlm_message **ms_ret,
3368                           struct dlm_mhandle **mh_ret,
3369                           gfp_t allocation)
3370 {
3371         int mb_len = sizeof(struct dlm_message);
3372
3373         switch (mstype) {
3374         case DLM_MSG_REQUEST:
3375         case DLM_MSG_LOOKUP:
3376         case DLM_MSG_REMOVE:
3377                 mb_len += r->res_length;
3378                 break;
3379         case DLM_MSG_CONVERT:
3380         case DLM_MSG_UNLOCK:
3381         case DLM_MSG_REQUEST_REPLY:
3382         case DLM_MSG_CONVERT_REPLY:
3383         case DLM_MSG_GRANT:
3384                 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3385                         mb_len += r->res_ls->ls_lvblen;
3386                 break;
3387         }
3388
3389         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3390                                ms_ret, mh_ret, allocation);
3391 }
3392
3393 /* further lowcomms enhancements or alternate implementations may make
3394    the return value from this function useful at some point */
3395
3396 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3397                         const void *name, int namelen)
3398 {
3399         dlm_midcomms_commit_mhandle(mh, name, namelen);
3400         return 0;
3401 }
3402
3403 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3404                       struct dlm_message *ms)
3405 {
3406         ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3407         ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3408         ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3409         ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3410         ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3411         ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3412         ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3413         ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3414         ms->m_status   = cpu_to_le32(lkb->lkb_status);
3415         ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3416         ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3417         ms->m_hash     = cpu_to_le32(r->res_hash);
3418
3419         /* m_result and m_bastmode are set from function args,
3420            not from lkb fields */
3421
3422         if (lkb->lkb_bastfn)
3423                 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3424         if (lkb->lkb_astfn)
3425                 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3426
3427         /* compare with switch in create_message; send_remove() doesn't
3428            use send_args() */
3429
3430         switch (ms->m_type) {
3431         case cpu_to_le32(DLM_MSG_REQUEST):
3432         case cpu_to_le32(DLM_MSG_LOOKUP):
3433                 memcpy(ms->m_extra, r->res_name, r->res_length);
3434                 break;
3435         case cpu_to_le32(DLM_MSG_CONVERT):
3436         case cpu_to_le32(DLM_MSG_UNLOCK):
3437         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3438         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3439         case cpu_to_le32(DLM_MSG_GRANT):
3440                 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3441                         break;
3442                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3443                 break;
3444         }
3445 }
3446
3447 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3448 {
3449         struct dlm_message *ms;
3450         struct dlm_mhandle *mh;
3451         int to_nodeid, error;
3452
3453         to_nodeid = r->res_nodeid;
3454
3455         error = add_to_waiters(lkb, mstype, to_nodeid);
3456         if (error)
3457                 return error;
3458
3459         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3460         if (error)
3461                 goto fail;
3462
3463         send_args(r, lkb, ms);
3464
3465         error = send_message(mh, ms, r->res_name, r->res_length);
3466         if (error)
3467                 goto fail;
3468         return 0;
3469
3470  fail:
3471         remove_from_waiters(lkb, msg_reply_type(mstype));
3472         return error;
3473 }
3474
3475 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3476 {
3477         return send_common(r, lkb, DLM_MSG_REQUEST);
3478 }
3479
3480 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3481 {
3482         int error;
3483
3484         error = send_common(r, lkb, DLM_MSG_CONVERT);
3485
3486         /* down conversions go without a reply from the master */
3487         if (!error && down_conversion(lkb)) {
3488                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3489                 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3490                 r->res_ls->ls_local_ms.m_result = 0;
3491                 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3492         }
3493
3494         return error;
3495 }
3496
3497 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3498    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3499    that the master is still correct. */
3500
3501 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3502 {
3503         return send_common(r, lkb, DLM_MSG_UNLOCK);
3504 }
3505
3506 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3507 {
3508         return send_common(r, lkb, DLM_MSG_CANCEL);
3509 }
3510
3511 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3512 {
3513         struct dlm_message *ms;
3514         struct dlm_mhandle *mh;
3515         int to_nodeid, error;
3516
3517         to_nodeid = lkb->lkb_nodeid;
3518
3519         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
3520                                GFP_NOFS);
3521         if (error)
3522                 goto out;
3523
3524         send_args(r, lkb, ms);
3525
3526         ms->m_result = 0;
3527
3528         error = send_message(mh, ms, r->res_name, r->res_length);
3529  out:
3530         return error;
3531 }
3532
3533 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3534 {
3535         struct dlm_message *ms;
3536         struct dlm_mhandle *mh;
3537         int to_nodeid, error;
3538
3539         to_nodeid = lkb->lkb_nodeid;
3540
3541         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
3542                                GFP_NOFS);
3543         if (error)
3544                 goto out;
3545
3546         send_args(r, lkb, ms);
3547
3548         ms->m_bastmode = cpu_to_le32(mode);
3549
3550         error = send_message(mh, ms, r->res_name, r->res_length);
3551  out:
3552         return error;
3553 }
3554
3555 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3556 {
3557         struct dlm_message *ms;
3558         struct dlm_mhandle *mh;
3559         int to_nodeid, error;
3560
3561         to_nodeid = dlm_dir_nodeid(r);
3562
3563         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3564         if (error)
3565                 return error;
3566
3567         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
3568                                GFP_NOFS);
3569         if (error)
3570                 goto fail;
3571
3572         send_args(r, lkb, ms);
3573
3574         error = send_message(mh, ms, r->res_name, r->res_length);
3575         if (error)
3576                 goto fail;
3577         return 0;
3578
3579  fail:
3580         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3581         return error;
3582 }
3583
3584 static int send_remove(struct dlm_rsb *r)
3585 {
3586         struct dlm_message *ms;
3587         struct dlm_mhandle *mh;
3588         int to_nodeid, error;
3589
3590         to_nodeid = dlm_dir_nodeid(r);
3591
3592         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
3593                                GFP_ATOMIC);
3594         if (error)
3595                 goto out;
3596
3597         memcpy(ms->m_extra, r->res_name, r->res_length);
3598         ms->m_hash = cpu_to_le32(r->res_hash);
3599
3600         error = send_message(mh, ms, r->res_name, r->res_length);
3601  out:
3602         return error;
3603 }
3604
3605 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3606                              int mstype, int rv)
3607 {
3608         struct dlm_message *ms;
3609         struct dlm_mhandle *mh;
3610         int to_nodeid, error;
3611
3612         to_nodeid = lkb->lkb_nodeid;
3613
3614         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3615         if (error)
3616                 goto out;
3617
3618         send_args(r, lkb, ms);
3619
3620         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3621
3622         error = send_message(mh, ms, r->res_name, r->res_length);
3623  out:
3624         return error;
3625 }
3626
3627 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3628 {
3629         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3630 }
3631
3632 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3633 {
3634         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3635 }
3636
3637 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3638 {
3639         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3640 }
3641
3642 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3643 {
3644         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3645 }
3646
3647 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3648                              int ret_nodeid, int rv)
3649 {
3650         struct dlm_rsb *r = &ls->ls_local_rsb;
3651         struct dlm_message *ms;
3652         struct dlm_mhandle *mh;
3653         int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3654
3655         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
3656                                GFP_NOFS);
3657         if (error)
3658                 goto out;
3659
3660         ms->m_lkid = ms_in->m_lkid;
3661         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3662         ms->m_nodeid = cpu_to_le32(ret_nodeid);
3663
3664         error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3665  out:
3666         return error;
3667 }
3668
3669 /* which args we save from a received message depends heavily on the type
3670    of message, unlike the send side where we can safely send everything about
3671    the lkb for any type of message */
3672
3673 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3674 {
3675         lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3676         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3677         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3678 }
3679
3680 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
3681                                 bool local)
3682 {
3683         if (local)
3684                 return;
3685
3686         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3687         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3688 }
3689
3690 static int receive_extralen(struct dlm_message *ms)
3691 {
3692         return (le16_to_cpu(ms->m_header.h_length) -
3693                 sizeof(struct dlm_message));
3694 }
3695
3696 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3697                        struct dlm_message *ms)
3698 {
3699         int len;
3700
3701         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3702                 if (!lkb->lkb_lvbptr)
3703                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3704                 if (!lkb->lkb_lvbptr)
3705                         return -ENOMEM;
3706                 len = receive_extralen(ms);
3707                 if (len > ls->ls_lvblen)
3708                         len = ls->ls_lvblen;
3709                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3710         }
3711         return 0;
3712 }
3713
3714 static void fake_bastfn(void *astparam, int mode)
3715 {
3716         log_print("fake_bastfn should not be called");
3717 }
3718
3719 static void fake_astfn(void *astparam)
3720 {
3721         log_print("fake_astfn should not be called");
3722 }
3723
3724 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3725                                 struct dlm_message *ms)
3726 {
3727         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3728         lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3729         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3730         lkb->lkb_grmode = DLM_LOCK_IV;
3731         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3732
3733         lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3734         lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3735
3736         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3737                 /* lkb was just created so there won't be an lvb yet */
3738                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3739                 if (!lkb->lkb_lvbptr)
3740                         return -ENOMEM;
3741         }
3742
3743         return 0;
3744 }
3745
3746 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3747                                 struct dlm_message *ms)
3748 {
3749         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3750                 return -EBUSY;
3751
3752         if (receive_lvb(ls, lkb, ms))
3753                 return -ENOMEM;
3754
3755         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3756         lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3757
3758         return 0;
3759 }
3760
3761 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3762                                struct dlm_message *ms)
3763 {
3764         if (receive_lvb(ls, lkb, ms))
3765                 return -ENOMEM;
3766         return 0;
3767 }
3768
3769 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3770    uses to send a reply and that the remote end uses to process the reply. */
3771
3772 static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3773 {
3774         struct dlm_lkb *lkb = &ls->ls_local_lkb;
3775         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3776         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3777 }
3778
3779 /* This is called after the rsb is locked so that we can safely inspect
3780    fields in the lkb. */
3781
3782 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3783 {
3784         int from = le32_to_cpu(ms->m_header.h_nodeid);
3785         int error = 0;
3786
3787         /* currently mixing of user/kernel locks are not supported */
3788         if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3789             !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3790                 log_error(lkb->lkb_resource->res_ls,
3791                           "got user dlm message for a kernel lock");
3792                 error = -EINVAL;
3793                 goto out;
3794         }
3795
3796         switch (ms->m_type) {
3797         case cpu_to_le32(DLM_MSG_CONVERT):
3798         case cpu_to_le32(DLM_MSG_UNLOCK):
3799         case cpu_to_le32(DLM_MSG_CANCEL):
3800                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3801                         error = -EINVAL;
3802                 break;
3803
3804         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3805         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3806         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3807         case cpu_to_le32(DLM_MSG_GRANT):
3808         case cpu_to_le32(DLM_MSG_BAST):
3809                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3810                         error = -EINVAL;
3811                 break;
3812
3813         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3814                 if (!is_process_copy(lkb))
3815                         error = -EINVAL;
3816                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3817                         error = -EINVAL;
3818                 break;
3819
3820         default:
3821                 error = -EINVAL;
3822         }
3823
3824 out:
3825         if (error)
3826                 log_error(lkb->lkb_resource->res_ls,
3827                           "ignore invalid message %d from %d %x %x %x %d",
3828                           le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3829                           lkb->lkb_remid, dlm_iflags_val(lkb),
3830                           lkb->lkb_nodeid);
3831         return error;
3832 }
3833
3834 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3835 {
3836         struct dlm_lkb *lkb;
3837         struct dlm_rsb *r;
3838         int from_nodeid;
3839         int error, namelen = 0;
3840
3841         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3842
3843         error = create_lkb(ls, &lkb);
3844         if (error)
3845                 goto fail;
3846
3847         receive_flags(lkb, ms);
3848         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3849         error = receive_request_args(ls, lkb, ms);
3850         if (error) {
3851                 __put_lkb(ls, lkb);
3852                 goto fail;
3853         }
3854
3855         /* The dir node is the authority on whether we are the master
3856            for this rsb or not, so if the master sends us a request, we should
3857            recreate the rsb if we've destroyed it.   This race happens when we
3858            send a remove message to the dir node at the same time that the dir
3859            node sends us a request for the rsb. */
3860
3861         namelen = receive_extralen(ms);
3862
3863         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3864                          R_RECEIVE_REQUEST, &r);
3865         if (error) {
3866                 __put_lkb(ls, lkb);
3867                 goto fail;
3868         }
3869
3870         lock_rsb(r);
3871
3872         if (r->res_master_nodeid != dlm_our_nodeid()) {
3873                 error = validate_master_nodeid(ls, r, from_nodeid);
3874                 if (error) {
3875                         unlock_rsb(r);
3876                         put_rsb(r);
3877                         __put_lkb(ls, lkb);
3878                         goto fail;
3879                 }
3880         }
3881
3882         attach_lkb(r, lkb);
3883         error = do_request(r, lkb);
3884         send_request_reply(r, lkb, error);
3885         do_request_effects(r, lkb, error);
3886
3887         unlock_rsb(r);
3888         put_rsb(r);
3889
3890         if (error == -EINPROGRESS)
3891                 error = 0;
3892         if (error)
3893                 dlm_put_lkb(lkb);
3894         return 0;
3895
3896  fail:
3897         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3898            and do this receive_request again from process_lookup_list once
3899            we get the lookup reply.  This would avoid a many repeated
3900            ENOTBLK request failures when the lookup reply designating us
3901            as master is delayed. */
3902
3903         if (error != -ENOTBLK) {
3904                 log_limit(ls, "receive_request %x from %d %d",
3905                           le32_to_cpu(ms->m_lkid), from_nodeid, error);
3906         }
3907
3908         setup_local_lkb(ls, ms);
3909         send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3910         return error;
3911 }
3912
3913 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3914 {
3915         struct dlm_lkb *lkb;
3916         struct dlm_rsb *r;
3917         int error, reply = 1;
3918
3919         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3920         if (error)
3921                 goto fail;
3922
3923         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3924                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3925                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3926                           (unsigned long long)lkb->lkb_recover_seq,
3927                           le32_to_cpu(ms->m_header.h_nodeid),
3928                           le32_to_cpu(ms->m_lkid));
3929                 error = -ENOENT;
3930                 dlm_put_lkb(lkb);
3931                 goto fail;
3932         }
3933
3934         r = lkb->lkb_resource;
3935
3936         hold_rsb(r);
3937         lock_rsb(r);
3938
3939         error = validate_message(lkb, ms);
3940         if (error)
3941                 goto out;
3942
3943         receive_flags(lkb, ms);
3944
3945         error = receive_convert_args(ls, lkb, ms);
3946         if (error) {
3947                 send_convert_reply(r, lkb, error);
3948                 goto out;
3949         }
3950
3951         reply = !down_conversion(lkb);
3952
3953         error = do_convert(r, lkb);
3954         if (reply)
3955                 send_convert_reply(r, lkb, error);
3956         do_convert_effects(r, lkb, error);
3957  out:
3958         unlock_rsb(r);
3959         put_rsb(r);
3960         dlm_put_lkb(lkb);
3961         return 0;
3962
3963  fail:
3964         setup_local_lkb(ls, ms);
3965         send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3966         return error;
3967 }
3968
3969 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3970 {
3971         struct dlm_lkb *lkb;
3972         struct dlm_rsb *r;
3973         int error;
3974
3975         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3976         if (error)
3977                 goto fail;
3978
3979         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3980                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3981                           lkb->lkb_id, lkb->lkb_remid,
3982                           le32_to_cpu(ms->m_header.h_nodeid),
3983                           le32_to_cpu(ms->m_lkid));
3984                 error = -ENOENT;
3985                 dlm_put_lkb(lkb);
3986                 goto fail;
3987         }
3988
3989         r = lkb->lkb_resource;
3990
3991         hold_rsb(r);
3992         lock_rsb(r);
3993
3994         error = validate_message(lkb, ms);
3995         if (error)
3996                 goto out;
3997
3998         receive_flags(lkb, ms);
3999
4000         error = receive_unlock_args(ls, lkb, ms);
4001         if (error) {
4002                 send_unlock_reply(r, lkb, error);
4003                 goto out;
4004         }
4005
4006         error = do_unlock(r, lkb);
4007         send_unlock_reply(r, lkb, error);
4008         do_unlock_effects(r, lkb, error);
4009  out:
4010         unlock_rsb(r);
4011         put_rsb(r);
4012         dlm_put_lkb(lkb);
4013         return 0;
4014
4015  fail:
4016         setup_local_lkb(ls, ms);
4017         send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4018         return error;
4019 }
4020
4021 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4022 {
4023         struct dlm_lkb *lkb;
4024         struct dlm_rsb *r;
4025         int error;
4026
4027         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4028         if (error)
4029                 goto fail;
4030
4031         receive_flags(lkb, ms);
4032
4033         r = lkb->lkb_resource;
4034
4035         hold_rsb(r);
4036         lock_rsb(r);
4037
4038         error = validate_message(lkb, ms);
4039         if (error)
4040                 goto out;
4041
4042         error = do_cancel(r, lkb);
4043         send_cancel_reply(r, lkb, error);
4044         do_cancel_effects(r, lkb, error);
4045  out:
4046         unlock_rsb(r);
4047         put_rsb(r);
4048         dlm_put_lkb(lkb);
4049         return 0;
4050
4051  fail:
4052         setup_local_lkb(ls, ms);
4053         send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4054         return error;
4055 }
4056
4057 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4058 {
4059         struct dlm_lkb *lkb;
4060         struct dlm_rsb *r;
4061         int error;
4062
4063         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4064         if (error)
4065                 return error;
4066
4067         r = lkb->lkb_resource;
4068
4069         hold_rsb(r);
4070         lock_rsb(r);
4071
4072         error = validate_message(lkb, ms);
4073         if (error)
4074                 goto out;
4075
4076         receive_flags_reply(lkb, ms, false);
4077         if (is_altmode(lkb))
4078                 munge_altmode(lkb, ms);
4079         grant_lock_pc(r, lkb, ms);
4080         queue_cast(r, lkb, 0);
4081  out:
4082         unlock_rsb(r);
4083         put_rsb(r);
4084         dlm_put_lkb(lkb);
4085         return 0;
4086 }
4087
4088 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4089 {
4090         struct dlm_lkb *lkb;
4091         struct dlm_rsb *r;
4092         int error;
4093
4094         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4095         if (error)
4096                 return error;
4097
4098         r = lkb->lkb_resource;
4099
4100         hold_rsb(r);
4101         lock_rsb(r);
4102
4103         error = validate_message(lkb, ms);
4104         if (error)
4105                 goto out;
4106
4107         queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4108         lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4109  out:
4110         unlock_rsb(r);
4111         put_rsb(r);
4112         dlm_put_lkb(lkb);
4113         return 0;
4114 }
4115
4116 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4117 {
4118         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4119
4120         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4121         our_nodeid = dlm_our_nodeid();
4122
4123         len = receive_extralen(ms);
4124
4125         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4126                                   &ret_nodeid, NULL);
4127
4128         /* Optimization: we're master so treat lookup as a request */
4129         if (!error && ret_nodeid == our_nodeid) {
4130                 receive_request(ls, ms);
4131                 return;
4132         }
4133         send_lookup_reply(ls, ms, ret_nodeid, error);
4134 }
4135
4136 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4137 {
4138         char name[DLM_RESNAME_MAXLEN+1];
4139         struct dlm_rsb *r;
4140         uint32_t hash, b;
4141         int rv, len, dir_nodeid, from_nodeid;
4142
4143         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4144
4145         len = receive_extralen(ms);
4146
4147         if (len > DLM_RESNAME_MAXLEN) {
4148                 log_error(ls, "receive_remove from %d bad len %d",
4149                           from_nodeid, len);
4150                 return;
4151         }
4152
4153         dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4154         if (dir_nodeid != dlm_our_nodeid()) {
4155                 log_error(ls, "receive_remove from %d bad nodeid %d",
4156                           from_nodeid, dir_nodeid);
4157                 return;
4158         }
4159
4160         /* Look for name on rsbtbl.toss, if it's there, kill it.
4161            If it's on rsbtbl.keep, it's being used, and we should ignore this
4162            message.  This is an expected race between the dir node sending a
4163            request to the master node at the same time as the master node sends
4164            a remove to the dir node.  The resolution to that race is for the
4165            dir node to ignore the remove message, and the master node to
4166            recreate the master rsb when it gets a request from the dir node for
4167            an rsb it doesn't have. */
4168
4169         memset(name, 0, sizeof(name));
4170         memcpy(name, ms->m_extra, len);
4171
4172         hash = jhash(name, len, 0);
4173         b = hash & (ls->ls_rsbtbl_size - 1);
4174
4175         spin_lock(&ls->ls_rsbtbl[b].lock);
4176
4177         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4178         if (rv) {
4179                 /* verify the rsb is on keep list per comment above */
4180                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4181                 if (rv) {
4182                         /* should not happen */
4183                         log_error(ls, "receive_remove from %d not found %s",
4184                                   from_nodeid, name);
4185                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4186                         return;
4187                 }
4188                 if (r->res_master_nodeid != from_nodeid) {
4189                         /* should not happen */
4190                         log_error(ls, "receive_remove keep from %d master %d",
4191                                   from_nodeid, r->res_master_nodeid);
4192                         dlm_print_rsb(r);
4193                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4194                         return;
4195                 }
4196
4197                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4198                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4199                           name);
4200                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4201                 return;
4202         }
4203
4204         if (r->res_master_nodeid != from_nodeid) {
4205                 log_error(ls, "receive_remove toss from %d master %d",
4206                           from_nodeid, r->res_master_nodeid);
4207                 dlm_print_rsb(r);
4208                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4209                 return;
4210         }
4211
4212         if (kref_put(&r->res_ref, kill_rsb)) {
4213                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4214                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4215                 dlm_free_rsb(r);
4216         } else {
4217                 log_error(ls, "receive_remove from %d rsb ref error",
4218                           from_nodeid);
4219                 dlm_print_rsb(r);
4220                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4221         }
4222 }
4223
4224 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4225 {
4226         do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4227 }
4228
4229 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4230 {
4231         struct dlm_lkb *lkb;
4232         struct dlm_rsb *r;
4233         int error, mstype, result;
4234         int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4235
4236         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4237         if (error)
4238                 return error;
4239
4240         r = lkb->lkb_resource;
4241         hold_rsb(r);
4242         lock_rsb(r);
4243
4244         error = validate_message(lkb, ms);
4245         if (error)
4246                 goto out;
4247
4248         mstype = lkb->lkb_wait_type;
4249         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4250         if (error) {
4251                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4252                           lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4253                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4254                 dlm_dump_rsb(r);
4255                 goto out;
4256         }
4257
4258         /* Optimization: the dir node was also the master, so it took our
4259            lookup as a request and sent request reply instead of lookup reply */
4260         if (mstype == DLM_MSG_LOOKUP) {
4261                 r->res_master_nodeid = from_nodeid;
4262                 r->res_nodeid = from_nodeid;
4263                 lkb->lkb_nodeid = from_nodeid;
4264         }
4265
4266         /* this is the value returned from do_request() on the master */
4267         result = from_dlm_errno(le32_to_cpu(ms->m_result));
4268
4269         switch (result) {
4270         case -EAGAIN:
4271                 /* request would block (be queued) on remote master */
4272                 queue_cast(r, lkb, -EAGAIN);
4273                 confirm_master(r, -EAGAIN);
4274                 unhold_lkb(lkb); /* undoes create_lkb() */
4275                 break;
4276
4277         case -EINPROGRESS:
4278         case 0:
4279                 /* request was queued or granted on remote master */
4280                 receive_flags_reply(lkb, ms, false);
4281                 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4282                 if (is_altmode(lkb))
4283                         munge_altmode(lkb, ms);
4284                 if (result) {
4285                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4286                 } else {
4287                         grant_lock_pc(r, lkb, ms);
4288                         queue_cast(r, lkb, 0);
4289                 }
4290                 confirm_master(r, result);
4291                 break;
4292
4293         case -EBADR:
4294         case -ENOTBLK:
4295                 /* find_rsb failed to find rsb or rsb wasn't master */
4296                 log_limit(ls, "receive_request_reply %x from %d %d "
4297                           "master %d dir %d first %x %s", lkb->lkb_id,
4298                           from_nodeid, result, r->res_master_nodeid,
4299                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4300
4301                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4302                     r->res_master_nodeid != dlm_our_nodeid()) {
4303                         /* cause _request_lock->set_master->send_lookup */
4304                         r->res_master_nodeid = 0;
4305                         r->res_nodeid = -1;
4306                         lkb->lkb_nodeid = -1;
4307                 }
4308
4309                 if (is_overlap(lkb)) {
4310                         /* we'll ignore error in cancel/unlock reply */
4311                         queue_cast_overlap(r, lkb);
4312                         confirm_master(r, result);
4313                         unhold_lkb(lkb); /* undoes create_lkb() */
4314                 } else {
4315                         _request_lock(r, lkb);
4316
4317                         if (r->res_master_nodeid == dlm_our_nodeid())
4318                                 confirm_master(r, 0);
4319                 }
4320                 break;
4321
4322         default:
4323                 log_error(ls, "receive_request_reply %x error %d",
4324                           lkb->lkb_id, result);
4325         }
4326
4327         if ((result == 0 || result == -EINPROGRESS) &&
4328             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4329                 log_debug(ls, "receive_request_reply %x result %d unlock",
4330                           lkb->lkb_id, result);
4331                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4332                 send_unlock(r, lkb);
4333         } else if ((result == -EINPROGRESS) &&
4334                    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4335                                       &lkb->lkb_iflags)) {
4336                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4337                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4338                 send_cancel(r, lkb);
4339         } else {
4340                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4341                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4342         }
4343  out:
4344         unlock_rsb(r);
4345         put_rsb(r);
4346         dlm_put_lkb(lkb);
4347         return 0;
4348 }
4349
4350 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4351                                     struct dlm_message *ms, bool local)
4352 {
4353         /* this is the value returned from do_convert() on the master */
4354         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4355         case -EAGAIN:
4356                 /* convert would block (be queued) on remote master */
4357                 queue_cast(r, lkb, -EAGAIN);
4358                 break;
4359
4360         case -EDEADLK:
4361                 receive_flags_reply(lkb, ms, local);
4362                 revert_lock_pc(r, lkb);
4363                 queue_cast(r, lkb, -EDEADLK);
4364                 break;
4365
4366         case -EINPROGRESS:
4367                 /* convert was queued on remote master */
4368                 receive_flags_reply(lkb, ms, local);
4369                 if (is_demoted(lkb))
4370                         munge_demoted(lkb);
4371                 del_lkb(r, lkb);
4372                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4373                 break;
4374
4375         case 0:
4376                 /* convert was granted on remote master */
4377                 receive_flags_reply(lkb, ms, local);
4378                 if (is_demoted(lkb))
4379                         munge_demoted(lkb);
4380                 grant_lock_pc(r, lkb, ms);
4381                 queue_cast(r, lkb, 0);
4382                 break;
4383
4384         default:
4385                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4386                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4387                           le32_to_cpu(ms->m_lkid),
4388                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4389                 dlm_print_rsb(r);
4390                 dlm_print_lkb(lkb);
4391         }
4392 }
4393
4394 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4395                                    bool local)
4396 {
4397         struct dlm_rsb *r = lkb->lkb_resource;
4398         int error;
4399
4400         hold_rsb(r);
4401         lock_rsb(r);
4402
4403         error = validate_message(lkb, ms);
4404         if (error)
4405                 goto out;
4406
4407         /* local reply can happen with waiters_mutex held */
4408         error = remove_from_waiters_ms(lkb, ms, local);
4409         if (error)
4410                 goto out;
4411
4412         __receive_convert_reply(r, lkb, ms, local);
4413  out:
4414         unlock_rsb(r);
4415         put_rsb(r);
4416 }
4417
4418 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4419 {
4420         struct dlm_lkb *lkb;
4421         int error;
4422
4423         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4424         if (error)
4425                 return error;
4426
4427         _receive_convert_reply(lkb, ms, false);
4428         dlm_put_lkb(lkb);
4429         return 0;
4430 }
4431
4432 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4433                                   bool local)
4434 {
4435         struct dlm_rsb *r = lkb->lkb_resource;
4436         int error;
4437
4438         hold_rsb(r);
4439         lock_rsb(r);
4440
4441         error = validate_message(lkb, ms);
4442         if (error)
4443                 goto out;
4444
4445         /* local reply can happen with waiters_mutex held */
4446         error = remove_from_waiters_ms(lkb, ms, local);
4447         if (error)
4448                 goto out;
4449
4450         /* this is the value returned from do_unlock() on the master */
4451
4452         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4453         case -DLM_EUNLOCK:
4454                 receive_flags_reply(lkb, ms, local);
4455                 remove_lock_pc(r, lkb);
4456                 queue_cast(r, lkb, -DLM_EUNLOCK);
4457                 break;
4458         case -ENOENT:
4459                 break;
4460         default:
4461                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4462                           lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4463         }
4464  out:
4465         unlock_rsb(r);
4466         put_rsb(r);
4467 }
4468
4469 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4470 {
4471         struct dlm_lkb *lkb;
4472         int error;
4473
4474         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4475         if (error)
4476                 return error;
4477
4478         _receive_unlock_reply(lkb, ms, false);
4479         dlm_put_lkb(lkb);
4480         return 0;
4481 }
4482
4483 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
4484                                   bool local)
4485 {
4486         struct dlm_rsb *r = lkb->lkb_resource;
4487         int error;
4488
4489         hold_rsb(r);
4490         lock_rsb(r);
4491
4492         error = validate_message(lkb, ms);
4493         if (error)
4494                 goto out;
4495
4496         /* local reply can happen with waiters_mutex held */
4497         error = remove_from_waiters_ms(lkb, ms, local);
4498         if (error)
4499                 goto out;
4500
4501         /* this is the value returned from do_cancel() on the master */
4502
4503         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4504         case -DLM_ECANCEL:
4505                 receive_flags_reply(lkb, ms, local);
4506                 revert_lock_pc(r, lkb);
4507                 queue_cast(r, lkb, -DLM_ECANCEL);
4508                 break;
4509         case 0:
4510                 break;
4511         default:
4512                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4513                           lkb->lkb_id,
4514                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4515         }
4516  out:
4517         unlock_rsb(r);
4518         put_rsb(r);
4519 }
4520
4521 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4522 {
4523         struct dlm_lkb *lkb;
4524         int error;
4525
4526         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4527         if (error)
4528                 return error;
4529
4530         _receive_cancel_reply(lkb, ms, false);
4531         dlm_put_lkb(lkb);
4532         return 0;
4533 }
4534
4535 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4536 {
4537         struct dlm_lkb *lkb;
4538         struct dlm_rsb *r;
4539         int error, ret_nodeid;
4540         int do_lookup_list = 0;
4541
4542         error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4543         if (error) {
4544                 log_error(ls, "%s no lkid %x", __func__,
4545                           le32_to_cpu(ms->m_lkid));
4546                 return;
4547         }
4548
4549         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4550            FIXME: will a non-zero error ever be returned? */
4551
4552         r = lkb->lkb_resource;
4553         hold_rsb(r);
4554         lock_rsb(r);
4555
4556         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4557         if (error)
4558                 goto out;
4559
4560         ret_nodeid = le32_to_cpu(ms->m_nodeid);
4561
4562         /* We sometimes receive a request from the dir node for this
4563            rsb before we've received the dir node's loookup_reply for it.
4564            The request from the dir node implies we're the master, so we set
4565            ourself as master in receive_request_reply, and verify here that
4566            we are indeed the master. */
4567
4568         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4569                 /* This should never happen */
4570                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4571                           "master %d dir %d our %d first %x %s",
4572                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4573                           ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4574                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4575         }
4576
4577         if (ret_nodeid == dlm_our_nodeid()) {
4578                 r->res_master_nodeid = ret_nodeid;
4579                 r->res_nodeid = 0;
4580                 do_lookup_list = 1;
4581                 r->res_first_lkid = 0;
4582         } else if (ret_nodeid == -1) {
4583                 /* the remote node doesn't believe it's the dir node */
4584                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4585                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4586                 r->res_master_nodeid = 0;
4587                 r->res_nodeid = -1;
4588                 lkb->lkb_nodeid = -1;
4589         } else {
4590                 /* set_master() will set lkb_nodeid from r */
4591                 r->res_master_nodeid = ret_nodeid;
4592                 r->res_nodeid = ret_nodeid;
4593         }
4594
4595         if (is_overlap(lkb)) {
4596                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4597                           lkb->lkb_id, dlm_iflags_val(lkb));
4598                 queue_cast_overlap(r, lkb);
4599                 unhold_lkb(lkb); /* undoes create_lkb() */
4600                 goto out_list;
4601         }
4602
4603         _request_lock(r, lkb);
4604
4605  out_list:
4606         if (do_lookup_list)
4607                 process_lookup_list(r);
4608  out:
4609         unlock_rsb(r);
4610         put_rsb(r);
4611         dlm_put_lkb(lkb);
4612 }
4613
4614 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4615                              uint32_t saved_seq)
4616 {
4617         int error = 0, noent = 0;
4618
4619         if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
4620                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4621                           le32_to_cpu(ms->m_type),
4622                           le32_to_cpu(ms->m_header.h_nodeid),
4623                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4624                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4625                 return;
4626         }
4627
4628         switch (ms->m_type) {
4629
4630         /* messages sent to a master node */
4631
4632         case cpu_to_le32(DLM_MSG_REQUEST):
4633                 error = receive_request(ls, ms);
4634                 break;
4635
4636         case cpu_to_le32(DLM_MSG_CONVERT):
4637                 error = receive_convert(ls, ms);
4638                 break;
4639
4640         case cpu_to_le32(DLM_MSG_UNLOCK):
4641                 error = receive_unlock(ls, ms);
4642                 break;
4643
4644         case cpu_to_le32(DLM_MSG_CANCEL):
4645                 noent = 1;
4646                 error = receive_cancel(ls, ms);
4647                 break;
4648
4649         /* messages sent from a master node (replies to above) */
4650
4651         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4652                 error = receive_request_reply(ls, ms);
4653                 break;
4654
4655         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4656                 error = receive_convert_reply(ls, ms);
4657                 break;
4658
4659         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4660                 error = receive_unlock_reply(ls, ms);
4661                 break;
4662
4663         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4664                 error = receive_cancel_reply(ls, ms);
4665                 break;
4666
4667         /* messages sent from a master node (only two types of async msg) */
4668
4669         case cpu_to_le32(DLM_MSG_GRANT):
4670                 noent = 1;
4671                 error = receive_grant(ls, ms);
4672                 break;
4673
4674         case cpu_to_le32(DLM_MSG_BAST):
4675                 noent = 1;
4676                 error = receive_bast(ls, ms);
4677                 break;
4678
4679         /* messages sent to a dir node */
4680
4681         case cpu_to_le32(DLM_MSG_LOOKUP):
4682                 receive_lookup(ls, ms);
4683                 break;
4684
4685         case cpu_to_le32(DLM_MSG_REMOVE):
4686                 receive_remove(ls, ms);
4687                 break;
4688
4689         /* messages sent from a dir node (remove has no reply) */
4690
4691         case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4692                 receive_lookup_reply(ls, ms);
4693                 break;
4694
4695         /* other messages */
4696
4697         case cpu_to_le32(DLM_MSG_PURGE):
4698                 receive_purge(ls, ms);
4699                 break;
4700
4701         default:
4702                 log_error(ls, "unknown message type %d",
4703                           le32_to_cpu(ms->m_type));
4704         }
4705
4706         /*
4707          * When checking for ENOENT, we're checking the result of
4708          * find_lkb(m_remid):
4709          *
4710          * The lock id referenced in the message wasn't found.  This may
4711          * happen in normal usage for the async messages and cancel, so
4712          * only use log_debug for them.
4713          *
4714          * Some errors are expected and normal.
4715          */
4716
4717         if (error == -ENOENT && noent) {
4718                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4719                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4720                           le32_to_cpu(ms->m_header.h_nodeid),
4721                           le32_to_cpu(ms->m_lkid), saved_seq);
4722         } else if (error == -ENOENT) {
4723                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4724                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4725                           le32_to_cpu(ms->m_header.h_nodeid),
4726                           le32_to_cpu(ms->m_lkid), saved_seq);
4727
4728                 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4729                         dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4730         }
4731
4732         if (error == -EINVAL) {
4733                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4734                           "saved_seq %u",
4735                           le32_to_cpu(ms->m_type),
4736                           le32_to_cpu(ms->m_header.h_nodeid),
4737                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4738                           saved_seq);
4739         }
4740 }
4741
4742 /* If the lockspace is in recovery mode (locking stopped), then normal
4743    messages are saved on the requestqueue for processing after recovery is
4744    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4745    messages off the requestqueue before we process new ones. This occurs right
4746    after recovery completes when we transition from saving all messages on
4747    requestqueue, to processing all the saved messages, to processing new
4748    messages as they arrive. */
4749
4750 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4751                                 int nodeid)
4752 {
4753         if (dlm_locking_stopped(ls)) {
4754                 /* If we were a member of this lockspace, left, and rejoined,
4755                    other nodes may still be sending us messages from the
4756                    lockspace generation before we left. */
4757                 if (!ls->ls_generation) {
4758                         log_limit(ls, "receive %d from %d ignore old gen",
4759                                   le32_to_cpu(ms->m_type), nodeid);
4760                         return;
4761                 }
4762
4763                 dlm_add_requestqueue(ls, nodeid, ms);
4764         } else {
4765                 dlm_wait_requestqueue(ls);
4766                 _receive_message(ls, ms, 0);
4767         }
4768 }
4769
4770 /* This is called by dlm_recoverd to process messages that were saved on
4771    the requestqueue. */
4772
4773 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4774                                uint32_t saved_seq)
4775 {
4776         _receive_message(ls, ms, saved_seq);
4777 }
4778
4779 /* This is called by the midcomms layer when something is received for
4780    the lockspace.  It could be either a MSG (normal message sent as part of
4781    standard locking activity) or an RCOM (recovery message sent as part of
4782    lockspace recovery). */
4783
4784 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4785 {
4786         struct dlm_header *hd = &p->header;
4787         struct dlm_ls *ls;
4788         int type = 0;
4789
4790         switch (hd->h_cmd) {
4791         case DLM_MSG:
4792                 type = le32_to_cpu(p->message.m_type);
4793                 break;
4794         case DLM_RCOM:
4795                 type = le32_to_cpu(p->rcom.rc_type);
4796                 break;
4797         default:
4798                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4799                 return;
4800         }
4801
4802         if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4803                 log_print("invalid h_nodeid %d from %d lockspace %x",
4804                           le32_to_cpu(hd->h_nodeid), nodeid,
4805                           le32_to_cpu(hd->u.h_lockspace));
4806                 return;
4807         }
4808
4809         ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4810         if (!ls) {
4811                 if (dlm_config.ci_log_debug) {
4812                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4813                                 "%u from %d cmd %d type %d\n",
4814                                 le32_to_cpu(hd->u.h_lockspace), nodeid,
4815                                 hd->h_cmd, type);
4816                 }
4817
4818                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4819                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4820                 return;
4821         }
4822
4823         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4824            be inactive (in this ls) before transitioning to recovery mode */
4825
4826         down_read(&ls->ls_recv_active);
4827         if (hd->h_cmd == DLM_MSG)
4828                 dlm_receive_message(ls, &p->message, nodeid);
4829         else if (hd->h_cmd == DLM_RCOM)
4830                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4831         else
4832                 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4833                           hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4834         up_read(&ls->ls_recv_active);
4835
4836         dlm_put_lockspace(ls);
4837 }
4838
4839 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4840                                    struct dlm_message *ms_local)
4841 {
4842         if (middle_conversion(lkb)) {
4843                 hold_lkb(lkb);
4844                 memset(ms_local, 0, sizeof(struct dlm_message));
4845                 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
4846                 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
4847                 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4848                 _receive_convert_reply(lkb, ms_local, true);
4849
4850                 /* Same special case as in receive_rcom_lock_args() */
4851                 lkb->lkb_grmode = DLM_LOCK_IV;
4852                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4853                 unhold_lkb(lkb);
4854
4855         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4856                 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4857         }
4858
4859         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4860            conversions are async; there's no reply from the remote master */
4861 }
4862
4863 /* A waiting lkb needs recovery if the master node has failed, or
4864    the master node is changing (only when no directory is used) */
4865
4866 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4867                                  int dir_nodeid)
4868 {
4869         if (dlm_no_directory(ls))
4870                 return 1;
4871
4872         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4873                 return 1;
4874
4875         return 0;
4876 }
4877
4878 /* Recovery for locks that are waiting for replies from nodes that are now
4879    gone.  We can just complete unlocks and cancels by faking a reply from the
4880    dead node.  Requests and up-conversions we flag to be resent after
4881    recovery.  Down-conversions can just be completed with a fake reply like
4882    unlocks.  Conversions between PR and CW need special attention. */
4883
4884 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4885 {
4886         struct dlm_lkb *lkb, *safe;
4887         struct dlm_message *ms_local;
4888         int wait_type, local_unlock_result, local_cancel_result;
4889         int dir_nodeid;
4890
4891         ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
4892         if (!ms_local)
4893                 return;
4894
4895         mutex_lock(&ls->ls_waiters_mutex);
4896
4897         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4898
4899                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4900
4901                 /* exclude debug messages about unlocks because there can be so
4902                    many and they aren't very interesting */
4903
4904                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4905                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4906                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4907                                   lkb->lkb_id,
4908                                   lkb->lkb_remid,
4909                                   lkb->lkb_wait_type,
4910                                   lkb->lkb_resource->res_nodeid,
4911                                   lkb->lkb_nodeid,
4912                                   lkb->lkb_wait_nodeid,
4913                                   dir_nodeid);
4914                 }
4915
4916                 /* all outstanding lookups, regardless of destination  will be
4917                    resent after recovery is done */
4918
4919                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4920                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4921                         continue;
4922                 }
4923
4924                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4925                         continue;
4926
4927                 wait_type = lkb->lkb_wait_type;
4928                 local_unlock_result = -DLM_EUNLOCK;
4929                 local_cancel_result = -DLM_ECANCEL;
4930
4931                 /* Main reply may have been received leaving a zero wait_type,
4932                    but a reply for the overlapping op may not have been
4933                    received.  In that case we need to fake the appropriate
4934                    reply for the overlap op. */
4935
4936                 if (!wait_type) {
4937                         if (is_overlap_cancel(lkb)) {
4938                                 wait_type = DLM_MSG_CANCEL;
4939                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4940                                         local_cancel_result = 0;
4941                         }
4942                         if (is_overlap_unlock(lkb)) {
4943                                 wait_type = DLM_MSG_UNLOCK;
4944                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4945                                         local_unlock_result = -ENOENT;
4946                         }
4947
4948                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4949                                   lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
4950                                   local_cancel_result, local_unlock_result);
4951                 }
4952
4953                 switch (wait_type) {
4954
4955                 case DLM_MSG_REQUEST:
4956                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4957                         break;
4958
4959                 case DLM_MSG_CONVERT:
4960                         recover_convert_waiter(ls, lkb, ms_local);
4961                         break;
4962
4963                 case DLM_MSG_UNLOCK:
4964                         hold_lkb(lkb);
4965                         memset(ms_local, 0, sizeof(struct dlm_message));
4966                         ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
4967                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
4968                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4969                         _receive_unlock_reply(lkb, ms_local, true);
4970                         dlm_put_lkb(lkb);
4971                         break;
4972
4973                 case DLM_MSG_CANCEL:
4974                         hold_lkb(lkb);
4975                         memset(ms_local, 0, sizeof(struct dlm_message));
4976                         ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
4977                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
4978                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4979                         _receive_cancel_reply(lkb, ms_local, true);
4980                         dlm_put_lkb(lkb);
4981                         break;
4982
4983                 default:
4984                         log_error(ls, "invalid lkb wait_type %d %d",
4985                                   lkb->lkb_wait_type, wait_type);
4986                 }
4987                 schedule();
4988         }
4989         mutex_unlock(&ls->ls_waiters_mutex);
4990         kfree(ms_local);
4991 }
4992
4993 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4994 {
4995         struct dlm_lkb *lkb = NULL, *iter;
4996
4997         mutex_lock(&ls->ls_waiters_mutex);
4998         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
4999                 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5000                         hold_lkb(iter);
5001                         lkb = iter;
5002                         break;
5003                 }
5004         }
5005         mutex_unlock(&ls->ls_waiters_mutex);
5006
5007         return lkb;
5008 }
5009
5010 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5011    master or dir-node for r.  Processing the lkb may result in it being placed
5012    back on waiters. */
5013
5014 /* We do this after normal locking has been enabled and any saved messages
5015    (in requestqueue) have been processed.  We should be confident that at
5016    this point we won't get or process a reply to any of these waiting
5017    operations.  But, new ops may be coming in on the rsbs/locks here from
5018    userspace or remotely. */
5019
5020 /* there may have been an overlap unlock/cancel prior to recovery or after
5021    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5022    overlap flag would just have been set and nothing new sent.  we can be
5023    confident here than any replies to either the initial op or overlap ops
5024    prior to recovery have been received. */
5025
5026 int dlm_recover_waiters_post(struct dlm_ls *ls)
5027 {
5028         struct dlm_lkb *lkb;
5029         struct dlm_rsb *r;
5030         int error = 0, mstype, err, oc, ou;
5031
5032         while (1) {
5033                 if (dlm_locking_stopped(ls)) {
5034                         log_debug(ls, "recover_waiters_post aborted");
5035                         error = -EINTR;
5036                         break;
5037                 }
5038
5039                 lkb = find_resend_waiter(ls);
5040                 if (!lkb)
5041                         break;
5042
5043                 r = lkb->lkb_resource;
5044                 hold_rsb(r);
5045                 lock_rsb(r);
5046
5047                 mstype = lkb->lkb_wait_type;
5048                 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5049                                         &lkb->lkb_iflags);
5050                 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5051                                         &lkb->lkb_iflags);
5052                 err = 0;
5053
5054                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5055                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5056                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5057                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5058                           dlm_dir_nodeid(r), oc, ou);
5059
5060                 /* At this point we assume that we won't get a reply to any
5061                    previous op or overlap op on this lock.  First, do a big
5062                    remove_from_waiters() for all previous ops. */
5063
5064                 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5065                 lkb->lkb_wait_type = 0;
5066                 /* drop all wait_count references we still
5067                  * hold a reference for this iteration.
5068                  */
5069                 while (lkb->lkb_wait_count) {
5070                         lkb->lkb_wait_count--;
5071                         unhold_lkb(lkb);
5072                 }
5073                 mutex_lock(&ls->ls_waiters_mutex);
5074                 list_del_init(&lkb->lkb_wait_reply);
5075                 mutex_unlock(&ls->ls_waiters_mutex);
5076
5077                 if (oc || ou) {
5078                         /* do an unlock or cancel instead of resending */
5079                         switch (mstype) {
5080                         case DLM_MSG_LOOKUP:
5081                         case DLM_MSG_REQUEST:
5082                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5083                                                         -DLM_ECANCEL);
5084                                 unhold_lkb(lkb); /* undoes create_lkb() */
5085                                 break;
5086                         case DLM_MSG_CONVERT:
5087                                 if (oc) {
5088                                         queue_cast(r, lkb, -DLM_ECANCEL);
5089                                 } else {
5090                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5091                                         _unlock_lock(r, lkb);
5092                                 }
5093                                 break;
5094                         default:
5095                                 err = 1;
5096                         }
5097                 } else {
5098                         switch (mstype) {
5099                         case DLM_MSG_LOOKUP:
5100                         case DLM_MSG_REQUEST:
5101                                 _request_lock(r, lkb);
5102                                 if (is_master(r))
5103                                         confirm_master(r, 0);
5104                                 break;
5105                         case DLM_MSG_CONVERT:
5106                                 _convert_lock(r, lkb);
5107                                 break;
5108                         default:
5109                                 err = 1;
5110                         }
5111                 }
5112
5113                 if (err) {
5114                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5115                                   "dir_nodeid %d overlap %d %d",
5116                                   lkb->lkb_id, mstype, r->res_nodeid,
5117                                   dlm_dir_nodeid(r), oc, ou);
5118                 }
5119                 unlock_rsb(r);
5120                 put_rsb(r);
5121                 dlm_put_lkb(lkb);
5122         }
5123
5124         return error;
5125 }
5126
5127 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5128                               struct list_head *list)
5129 {
5130         struct dlm_lkb *lkb, *safe;
5131
5132         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5133                 if (!is_master_copy(lkb))
5134                         continue;
5135
5136                 /* don't purge lkbs we've added in recover_master_copy for
5137                    the current recovery seq */
5138
5139                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5140                         continue;
5141
5142                 del_lkb(r, lkb);
5143
5144                 /* this put should free the lkb */
5145                 if (!dlm_put_lkb(lkb))
5146                         log_error(ls, "purged mstcpy lkb not released");
5147         }
5148 }
5149
5150 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5151 {
5152         struct dlm_ls *ls = r->res_ls;
5153
5154         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5155         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5156         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5157 }
5158
5159 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5160                             struct list_head *list,
5161                             int nodeid_gone, unsigned int *count)
5162 {
5163         struct dlm_lkb *lkb, *safe;
5164
5165         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5166                 if (!is_master_copy(lkb))
5167                         continue;
5168
5169                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5170                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5171
5172                         /* tell recover_lvb to invalidate the lvb
5173                            because a node holding EX/PW failed */
5174                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5175                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5176                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5177                         }
5178
5179                         del_lkb(r, lkb);
5180
5181                         /* this put should free the lkb */
5182                         if (!dlm_put_lkb(lkb))
5183                                 log_error(ls, "purged dead lkb not released");
5184
5185                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5186
5187                         (*count)++;
5188                 }
5189         }
5190 }
5191
5192 /* Get rid of locks held by nodes that are gone. */
5193
5194 void dlm_recover_purge(struct dlm_ls *ls)
5195 {
5196         struct dlm_rsb *r;
5197         struct dlm_member *memb;
5198         int nodes_count = 0;
5199         int nodeid_gone = 0;
5200         unsigned int lkb_count = 0;
5201
5202         /* cache one removed nodeid to optimize the common
5203            case of a single node removed */
5204
5205         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5206                 nodes_count++;
5207                 nodeid_gone = memb->nodeid;
5208         }
5209
5210         if (!nodes_count)
5211                 return;
5212
5213         down_write(&ls->ls_root_sem);
5214         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5215                 hold_rsb(r);
5216                 lock_rsb(r);
5217                 if (is_master(r)) {
5218                         purge_dead_list(ls, r, &r->res_grantqueue,
5219                                         nodeid_gone, &lkb_count);
5220                         purge_dead_list(ls, r, &r->res_convertqueue,
5221                                         nodeid_gone, &lkb_count);
5222                         purge_dead_list(ls, r, &r->res_waitqueue,
5223                                         nodeid_gone, &lkb_count);
5224                 }
5225                 unlock_rsb(r);
5226                 unhold_rsb(r);
5227                 cond_resched();
5228         }
5229         up_write(&ls->ls_root_sem);
5230
5231         if (lkb_count)
5232                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5233                           lkb_count, nodes_count);
5234 }
5235
5236 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5237 {
5238         struct rb_node *n;
5239         struct dlm_rsb *r;
5240
5241         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5242         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5243                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5244
5245                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5246                         continue;
5247                 if (!is_master(r)) {
5248                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5249                         continue;
5250                 }
5251                 hold_rsb(r);
5252                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5253                 return r;
5254         }
5255         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5256         return NULL;
5257 }
5258
5259 /*
5260  * Attempt to grant locks on resources that we are the master of.
5261  * Locks may have become grantable during recovery because locks
5262  * from departed nodes have been purged (or not rebuilt), allowing
5263  * previously blocked locks to now be granted.  The subset of rsb's
5264  * we are interested in are those with lkb's on either the convert or
5265  * waiting queues.
5266  *
5267  * Simplest would be to go through each master rsb and check for non-empty
5268  * convert or waiting queues, and attempt to grant on those rsbs.
5269  * Checking the queues requires lock_rsb, though, for which we'd need
5270  * to release the rsbtbl lock.  This would make iterating through all
5271  * rsb's very inefficient.  So, we rely on earlier recovery routines
5272  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5273  * locks for.
5274  */
5275
5276 void dlm_recover_grant(struct dlm_ls *ls)
5277 {
5278         struct dlm_rsb *r;
5279         int bucket = 0;
5280         unsigned int count = 0;
5281         unsigned int rsb_count = 0;
5282         unsigned int lkb_count = 0;
5283
5284         while (1) {
5285                 r = find_grant_rsb(ls, bucket);
5286                 if (!r) {
5287                         if (bucket == ls->ls_rsbtbl_size - 1)
5288                                 break;
5289                         bucket++;
5290                         continue;
5291                 }
5292                 rsb_count++;
5293                 count = 0;
5294                 lock_rsb(r);
5295                 /* the RECOVER_GRANT flag is checked in the grant path */
5296                 grant_pending_locks(r, &count);
5297                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5298                 lkb_count += count;
5299                 confirm_master(r, 0);
5300                 unlock_rsb(r);
5301                 put_rsb(r);
5302                 cond_resched();
5303         }
5304
5305         if (lkb_count)
5306                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5307                           lkb_count, rsb_count);
5308 }
5309
5310 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5311                                          uint32_t remid)
5312 {
5313         struct dlm_lkb *lkb;
5314
5315         list_for_each_entry(lkb, head, lkb_statequeue) {
5316                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5317                         return lkb;
5318         }
5319         return NULL;
5320 }
5321
5322 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5323                                     uint32_t remid)
5324 {
5325         struct dlm_lkb *lkb;
5326
5327         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5328         if (lkb)
5329                 return lkb;
5330         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5331         if (lkb)
5332                 return lkb;
5333         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5334         if (lkb)
5335                 return lkb;
5336         return NULL;
5337 }
5338
5339 /* needs at least dlm_rcom + rcom_lock */
5340 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5341                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5342 {
5343         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5344
5345         lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5346         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5347         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5348         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5349         dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5350         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5351         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5352         lkb->lkb_rqmode = rl->rl_rqmode;
5353         lkb->lkb_grmode = rl->rl_grmode;
5354         /* don't set lkb_status because add_lkb wants to itself */
5355
5356         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5357         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5358
5359         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5360                 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5361                         sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5362                 if (lvblen > ls->ls_lvblen)
5363                         return -EINVAL;
5364                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5365                 if (!lkb->lkb_lvbptr)
5366                         return -ENOMEM;
5367                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5368         }
5369
5370         /* Conversions between PR and CW (middle modes) need special handling.
5371            The real granted mode of these converting locks cannot be determined
5372            until all locks have been rebuilt on the rsb (recover_conversion) */
5373
5374         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5375             middle_conversion(lkb)) {
5376                 rl->rl_status = DLM_LKSTS_CONVERT;
5377                 lkb->lkb_grmode = DLM_LOCK_IV;
5378                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5379         }
5380
5381         return 0;
5382 }
5383
5384 /* This lkb may have been recovered in a previous aborted recovery so we need
5385    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5386    If so we just send back a standard reply.  If not, we create a new lkb with
5387    the given values and send back our lkid.  We send back our lkid by sending
5388    back the rcom_lock struct we got but with the remid field filled in. */
5389
5390 /* needs at least dlm_rcom + rcom_lock */
5391 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5392 {
5393         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5394         struct dlm_rsb *r;
5395         struct dlm_lkb *lkb;
5396         uint32_t remid = 0;
5397         int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5398         int error;
5399
5400         if (rl->rl_parent_lkid) {
5401                 error = -EOPNOTSUPP;
5402                 goto out;
5403         }
5404
5405         remid = le32_to_cpu(rl->rl_lkid);
5406
5407         /* In general we expect the rsb returned to be R_MASTER, but we don't
5408            have to require it.  Recovery of masters on one node can overlap
5409            recovery of locks on another node, so one node can send us MSTCPY
5410            locks before we've made ourselves master of this rsb.  We can still
5411            add new MSTCPY locks that we receive here without any harm; when
5412            we make ourselves master, dlm_recover_masters() won't touch the
5413            MSTCPY locks we've received early. */
5414
5415         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5416                          from_nodeid, R_RECEIVE_RECOVER, &r);
5417         if (error)
5418                 goto out;
5419
5420         lock_rsb(r);
5421
5422         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5423                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5424                           from_nodeid, remid);
5425                 error = -EBADR;
5426                 goto out_unlock;
5427         }
5428
5429         lkb = search_remid(r, from_nodeid, remid);
5430         if (lkb) {
5431                 error = -EEXIST;
5432                 goto out_remid;
5433         }
5434
5435         error = create_lkb(ls, &lkb);
5436         if (error)
5437                 goto out_unlock;
5438
5439         error = receive_rcom_lock_args(ls, lkb, r, rc);
5440         if (error) {
5441                 __put_lkb(ls, lkb);
5442                 goto out_unlock;
5443         }
5444
5445         attach_lkb(r, lkb);
5446         add_lkb(r, lkb, rl->rl_status);
5447         ls->ls_recover_locks_in++;
5448
5449         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5450                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5451
5452  out_remid:
5453         /* this is the new value returned to the lock holder for
5454            saving in its process-copy lkb */
5455         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5456
5457         lkb->lkb_recover_seq = ls->ls_recover_seq;
5458
5459  out_unlock:
5460         unlock_rsb(r);
5461         put_rsb(r);
5462  out:
5463         if (error && error != -EEXIST)
5464                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5465                           from_nodeid, remid, error);
5466         rl->rl_result = cpu_to_le32(error);
5467         return error;
5468 }
5469
5470 /* needs at least dlm_rcom + rcom_lock */
5471 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5472 {
5473         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5474         struct dlm_rsb *r;
5475         struct dlm_lkb *lkb;
5476         uint32_t lkid, remid;
5477         int error, result;
5478
5479         lkid = le32_to_cpu(rl->rl_lkid);
5480         remid = le32_to_cpu(rl->rl_remid);
5481         result = le32_to_cpu(rl->rl_result);
5482
5483         error = find_lkb(ls, lkid, &lkb);
5484         if (error) {
5485                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5486                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5487                           result);
5488                 return error;
5489         }
5490
5491         r = lkb->lkb_resource;
5492         hold_rsb(r);
5493         lock_rsb(r);
5494
5495         if (!is_process_copy(lkb)) {
5496                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5497                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5498                           result);
5499                 dlm_dump_rsb(r);
5500                 unlock_rsb(r);
5501                 put_rsb(r);
5502                 dlm_put_lkb(lkb);
5503                 return -EINVAL;
5504         }
5505
5506         switch (result) {
5507         case -EBADR:
5508                 /* There's a chance the new master received our lock before
5509                    dlm_recover_master_reply(), this wouldn't happen if we did
5510                    a barrier between recover_masters and recover_locks. */
5511
5512                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5513                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5514                           result);
5515         
5516                 dlm_send_rcom_lock(r, lkb);
5517                 goto out;
5518         case -EEXIST:
5519         case 0:
5520                 lkb->lkb_remid = remid;
5521                 break;
5522         default:
5523                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5524                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5525                           result);
5526         }
5527
5528         /* an ack for dlm_recover_locks() which waits for replies from
5529            all the locks it sends to new masters */
5530         dlm_recovered_lock(r);
5531  out:
5532         unlock_rsb(r);
5533         put_rsb(r);
5534         dlm_put_lkb(lkb);
5535
5536         return 0;
5537 }
5538
5539 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5540                      int mode, uint32_t flags, void *name, unsigned int namelen)
5541 {
5542         struct dlm_lkb *lkb;
5543         struct dlm_args args;
5544         bool do_put = true;
5545         int error;
5546
5547         dlm_lock_recovery(ls);
5548
5549         error = create_lkb(ls, &lkb);
5550         if (error) {
5551                 kfree(ua);
5552                 goto out;
5553         }
5554
5555         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5556
5557         if (flags & DLM_LKF_VALBLK) {
5558                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5559                 if (!ua->lksb.sb_lvbptr) {
5560                         kfree(ua);
5561                         error = -ENOMEM;
5562                         goto out_put;
5563                 }
5564         }
5565         error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5566                               fake_bastfn, &args);
5567         if (error) {
5568                 kfree(ua->lksb.sb_lvbptr);
5569                 ua->lksb.sb_lvbptr = NULL;
5570                 kfree(ua);
5571                 goto out_put;
5572         }
5573
5574         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5575            When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5576            lock and that lkb_astparam is the dlm_user_args structure. */
5577         set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5578         error = request_lock(ls, lkb, name, namelen, &args);
5579
5580         switch (error) {
5581         case 0:
5582                 break;
5583         case -EINPROGRESS:
5584                 error = 0;
5585                 break;
5586         case -EAGAIN:
5587                 error = 0;
5588                 fallthrough;
5589         default:
5590                 goto out_put;
5591         }
5592
5593         /* add this new lkb to the per-process list of locks */
5594         spin_lock(&ua->proc->locks_spin);
5595         hold_lkb(lkb);
5596         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5597         spin_unlock(&ua->proc->locks_spin);
5598         do_put = false;
5599  out_put:
5600         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5601         if (do_put)
5602                 __put_lkb(ls, lkb);
5603  out:
5604         dlm_unlock_recovery(ls);
5605         return error;
5606 }
5607
5608 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5609                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5610 {
5611         struct dlm_lkb *lkb;
5612         struct dlm_args args;
5613         struct dlm_user_args *ua;
5614         int error;
5615
5616         dlm_lock_recovery(ls);
5617
5618         error = find_lkb(ls, lkid, &lkb);
5619         if (error)
5620                 goto out;
5621
5622         trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5623
5624         /* user can change the params on its lock when it converts it, or
5625            add an lvb that didn't exist before */
5626
5627         ua = lkb->lkb_ua;
5628
5629         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5630                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5631                 if (!ua->lksb.sb_lvbptr) {
5632                         error = -ENOMEM;
5633                         goto out_put;
5634                 }
5635         }
5636         if (lvb_in && ua->lksb.sb_lvbptr)
5637                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5638
5639         ua->xid = ua_tmp->xid;
5640         ua->castparam = ua_tmp->castparam;
5641         ua->castaddr = ua_tmp->castaddr;
5642         ua->bastparam = ua_tmp->bastparam;
5643         ua->bastaddr = ua_tmp->bastaddr;
5644         ua->user_lksb = ua_tmp->user_lksb;
5645
5646         error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5647                               fake_bastfn, &args);
5648         if (error)
5649                 goto out_put;
5650
5651         error = convert_lock(ls, lkb, &args);
5652
5653         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5654                 error = 0;
5655  out_put:
5656         trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5657         dlm_put_lkb(lkb);
5658  out:
5659         dlm_unlock_recovery(ls);
5660         kfree(ua_tmp);
5661         return error;
5662 }
5663
5664 /*
5665  * The caller asks for an orphan lock on a given resource with a given mode.
5666  * If a matching lock exists, it's moved to the owner's list of locks and
5667  * the lkid is returned.
5668  */
5669
5670 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5671                      int mode, uint32_t flags, void *name, unsigned int namelen,
5672                      uint32_t *lkid)
5673 {
5674         struct dlm_lkb *lkb = NULL, *iter;
5675         struct dlm_user_args *ua;
5676         int found_other_mode = 0;
5677         int rv = 0;
5678
5679         mutex_lock(&ls->ls_orphans_mutex);
5680         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5681                 if (iter->lkb_resource->res_length != namelen)
5682                         continue;
5683                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5684                         continue;
5685                 if (iter->lkb_grmode != mode) {
5686                         found_other_mode = 1;
5687                         continue;
5688                 }
5689
5690                 lkb = iter;
5691                 list_del_init(&iter->lkb_ownqueue);
5692                 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5693                 *lkid = iter->lkb_id;
5694                 break;
5695         }
5696         mutex_unlock(&ls->ls_orphans_mutex);
5697
5698         if (!lkb && found_other_mode) {
5699                 rv = -EAGAIN;
5700                 goto out;
5701         }
5702
5703         if (!lkb) {
5704                 rv = -ENOENT;
5705                 goto out;
5706         }
5707
5708         lkb->lkb_exflags = flags;
5709         lkb->lkb_ownpid = (int) current->pid;
5710
5711         ua = lkb->lkb_ua;
5712
5713         ua->proc = ua_tmp->proc;
5714         ua->xid = ua_tmp->xid;
5715         ua->castparam = ua_tmp->castparam;
5716         ua->castaddr = ua_tmp->castaddr;
5717         ua->bastparam = ua_tmp->bastparam;
5718         ua->bastaddr = ua_tmp->bastaddr;
5719         ua->user_lksb = ua_tmp->user_lksb;
5720
5721         /*
5722          * The lkb reference from the ls_orphans list was not
5723          * removed above, and is now considered the reference
5724          * for the proc locks list.
5725          */
5726
5727         spin_lock(&ua->proc->locks_spin);
5728         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5729         spin_unlock(&ua->proc->locks_spin);
5730  out:
5731         kfree(ua_tmp);
5732         return rv;
5733 }
5734
5735 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5736                     uint32_t flags, uint32_t lkid, char *lvb_in)
5737 {
5738         struct dlm_lkb *lkb;
5739         struct dlm_args args;
5740         struct dlm_user_args *ua;
5741         int error;
5742
5743         dlm_lock_recovery(ls);
5744
5745         error = find_lkb(ls, lkid, &lkb);
5746         if (error)
5747                 goto out;
5748
5749         trace_dlm_unlock_start(ls, lkb, flags);
5750
5751         ua = lkb->lkb_ua;
5752
5753         if (lvb_in && ua->lksb.sb_lvbptr)
5754                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5755         if (ua_tmp->castparam)
5756                 ua->castparam = ua_tmp->castparam;
5757         ua->user_lksb = ua_tmp->user_lksb;
5758
5759         error = set_unlock_args(flags, ua, &args);
5760         if (error)
5761                 goto out_put;
5762
5763         error = unlock_lock(ls, lkb, &args);
5764
5765         if (error == -DLM_EUNLOCK)
5766                 error = 0;
5767         /* from validate_unlock_args() */
5768         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5769                 error = 0;
5770         if (error)
5771                 goto out_put;
5772
5773         spin_lock(&ua->proc->locks_spin);
5774         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5775         if (!list_empty(&lkb->lkb_ownqueue))
5776                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5777         spin_unlock(&ua->proc->locks_spin);
5778  out_put:
5779         trace_dlm_unlock_end(ls, lkb, flags, error);
5780         dlm_put_lkb(lkb);
5781  out:
5782         dlm_unlock_recovery(ls);
5783         kfree(ua_tmp);
5784         return error;
5785 }
5786
5787 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5788                     uint32_t flags, uint32_t lkid)
5789 {
5790         struct dlm_lkb *lkb;
5791         struct dlm_args args;
5792         struct dlm_user_args *ua;
5793         int error;
5794
5795         dlm_lock_recovery(ls);
5796
5797         error = find_lkb(ls, lkid, &lkb);
5798         if (error)
5799                 goto out;
5800
5801         trace_dlm_unlock_start(ls, lkb, flags);
5802
5803         ua = lkb->lkb_ua;
5804         if (ua_tmp->castparam)
5805                 ua->castparam = ua_tmp->castparam;
5806         ua->user_lksb = ua_tmp->user_lksb;
5807
5808         error = set_unlock_args(flags, ua, &args);
5809         if (error)
5810                 goto out_put;
5811
5812         error = cancel_lock(ls, lkb, &args);
5813
5814         if (error == -DLM_ECANCEL)
5815                 error = 0;
5816         /* from validate_unlock_args() */
5817         if (error == -EBUSY)
5818                 error = 0;
5819  out_put:
5820         trace_dlm_unlock_end(ls, lkb, flags, error);
5821         dlm_put_lkb(lkb);
5822  out:
5823         dlm_unlock_recovery(ls);
5824         kfree(ua_tmp);
5825         return error;
5826 }
5827
5828 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5829 {
5830         struct dlm_lkb *lkb;
5831         struct dlm_args args;
5832         struct dlm_user_args *ua;
5833         struct dlm_rsb *r;
5834         int error;
5835
5836         dlm_lock_recovery(ls);
5837
5838         error = find_lkb(ls, lkid, &lkb);
5839         if (error)
5840                 goto out;
5841
5842         trace_dlm_unlock_start(ls, lkb, flags);
5843
5844         ua = lkb->lkb_ua;
5845
5846         error = set_unlock_args(flags, ua, &args);
5847         if (error)
5848                 goto out_put;
5849
5850         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5851
5852         r = lkb->lkb_resource;
5853         hold_rsb(r);
5854         lock_rsb(r);
5855
5856         error = validate_unlock_args(lkb, &args);
5857         if (error)
5858                 goto out_r;
5859         set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
5860
5861         error = _cancel_lock(r, lkb);
5862  out_r:
5863         unlock_rsb(r);
5864         put_rsb(r);
5865
5866         if (error == -DLM_ECANCEL)
5867                 error = 0;
5868         /* from validate_unlock_args() */
5869         if (error == -EBUSY)
5870                 error = 0;
5871  out_put:
5872         trace_dlm_unlock_end(ls, lkb, flags, error);
5873         dlm_put_lkb(lkb);
5874  out:
5875         dlm_unlock_recovery(ls);
5876         return error;
5877 }
5878
5879 /* lkb's that are removed from the waiters list by revert are just left on the
5880    orphans list with the granted orphan locks, to be freed by purge */
5881
5882 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5883 {
5884         struct dlm_args args;
5885         int error;
5886
5887         hold_lkb(lkb); /* reference for the ls_orphans list */
5888         mutex_lock(&ls->ls_orphans_mutex);
5889         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5890         mutex_unlock(&ls->ls_orphans_mutex);
5891
5892         set_unlock_args(0, lkb->lkb_ua, &args);
5893
5894         error = cancel_lock(ls, lkb, &args);
5895         if (error == -DLM_ECANCEL)
5896                 error = 0;
5897         return error;
5898 }
5899
5900 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
5901    granted.  Regardless of what rsb queue the lock is on, it's removed and
5902    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
5903    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
5904
5905 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5906 {
5907         struct dlm_args args;
5908         int error;
5909
5910         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
5911                         lkb->lkb_ua, &args);
5912
5913         error = unlock_lock(ls, lkb, &args);
5914         if (error == -DLM_EUNLOCK)
5915                 error = 0;
5916         return error;
5917 }
5918
5919 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5920    (which does lock_rsb) due to deadlock with receiving a message that does
5921    lock_rsb followed by dlm_user_add_cb() */
5922
5923 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5924                                      struct dlm_user_proc *proc)
5925 {
5926         struct dlm_lkb *lkb = NULL;
5927
5928         spin_lock(&ls->ls_clear_proc_locks);
5929         if (list_empty(&proc->locks))
5930                 goto out;
5931
5932         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5933         list_del_init(&lkb->lkb_ownqueue);
5934
5935         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5936                 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
5937         else
5938                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5939  out:
5940         spin_unlock(&ls->ls_clear_proc_locks);
5941         return lkb;
5942 }
5943
5944 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5945    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5946    which we clear here. */
5947
5948 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5949    list, and no more device_writes should add lkb's to proc->locks list; so we
5950    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5951    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5952    them ourself. */
5953
5954 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5955 {
5956         struct dlm_lkb *lkb, *safe;
5957
5958         dlm_lock_recovery(ls);
5959
5960         while (1) {
5961                 lkb = del_proc_lock(ls, proc);
5962                 if (!lkb)
5963                         break;
5964                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5965                         orphan_proc_lock(ls, lkb);
5966                 else
5967                         unlock_proc_lock(ls, lkb);
5968
5969                 /* this removes the reference for the proc->locks list
5970                    added by dlm_user_request, it may result in the lkb
5971                    being freed */
5972
5973                 dlm_put_lkb(lkb);
5974         }
5975
5976         spin_lock(&ls->ls_clear_proc_locks);
5977
5978         /* in-progress unlocks */
5979         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5980                 list_del_init(&lkb->lkb_ownqueue);
5981                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5982                 dlm_put_lkb(lkb);
5983         }
5984
5985         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5986                 dlm_purge_lkb_callbacks(lkb);
5987                 list_del_init(&lkb->lkb_cb_list);
5988                 dlm_put_lkb(lkb);
5989         }
5990
5991         spin_unlock(&ls->ls_clear_proc_locks);
5992         dlm_unlock_recovery(ls);
5993 }
5994
5995 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5996 {
5997         struct dlm_lkb *lkb, *safe;
5998
5999         while (1) {
6000                 lkb = NULL;
6001                 spin_lock(&proc->locks_spin);
6002                 if (!list_empty(&proc->locks)) {
6003                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6004                                          lkb_ownqueue);
6005                         list_del_init(&lkb->lkb_ownqueue);
6006                 }
6007                 spin_unlock(&proc->locks_spin);
6008
6009                 if (!lkb)
6010                         break;
6011
6012                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6013                 unlock_proc_lock(ls, lkb);
6014                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6015         }
6016
6017         spin_lock(&proc->locks_spin);
6018         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6019                 list_del_init(&lkb->lkb_ownqueue);
6020                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6021                 dlm_put_lkb(lkb);
6022         }
6023         spin_unlock(&proc->locks_spin);
6024
6025         spin_lock(&proc->asts_spin);
6026         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6027                 dlm_purge_lkb_callbacks(lkb);
6028                 list_del_init(&lkb->lkb_cb_list);
6029                 dlm_put_lkb(lkb);
6030         }
6031         spin_unlock(&proc->asts_spin);
6032 }
6033
6034 /* pid of 0 means purge all orphans */
6035
6036 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6037 {
6038         struct dlm_lkb *lkb, *safe;
6039
6040         mutex_lock(&ls->ls_orphans_mutex);
6041         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6042                 if (pid && lkb->lkb_ownpid != pid)
6043                         continue;
6044                 unlock_proc_lock(ls, lkb);
6045                 list_del_init(&lkb->lkb_ownqueue);
6046                 dlm_put_lkb(lkb);
6047         }
6048         mutex_unlock(&ls->ls_orphans_mutex);
6049 }
6050
6051 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6052 {
6053         struct dlm_message *ms;
6054         struct dlm_mhandle *mh;
6055         int error;
6056
6057         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6058                                 DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
6059         if (error)
6060                 return error;
6061         ms->m_nodeid = cpu_to_le32(nodeid);
6062         ms->m_pid = cpu_to_le32(pid);
6063
6064         return send_message(mh, ms, NULL, 0);
6065 }
6066
6067 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6068                    int nodeid, int pid)
6069 {
6070         int error = 0;
6071
6072         if (nodeid && (nodeid != dlm_our_nodeid())) {
6073                 error = send_purge(ls, nodeid, pid);
6074         } else {
6075                 dlm_lock_recovery(ls);
6076                 if (pid == current->pid)
6077                         purge_proc_locks(ls, proc);
6078                 else
6079                         do_purge(ls, nodeid, pid);
6080                 dlm_unlock_recovery(ls);
6081         }
6082         return error;
6083 }
6084
6085 /* debug functionality */
6086 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6087                       int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6088 {
6089         struct dlm_lksb *lksb;
6090         struct dlm_lkb *lkb;
6091         struct dlm_rsb *r;
6092         int error;
6093
6094         /* we currently can't set a valid user lock */
6095         if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6096                 return -EOPNOTSUPP;
6097
6098         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6099         if (!lksb)
6100                 return -ENOMEM;
6101
6102         error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6103         if (error) {
6104                 kfree(lksb);
6105                 return error;
6106         }
6107
6108         dlm_set_dflags_val(lkb, lkb_dflags);
6109         lkb->lkb_nodeid = lkb_nodeid;
6110         lkb->lkb_lksb = lksb;
6111         /* user specific pointer, just don't have it NULL for kernel locks */
6112         if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6113                 lkb->lkb_astparam = (void *)0xDEADBEEF;
6114
6115         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6116         if (error) {
6117                 kfree(lksb);
6118                 __put_lkb(ls, lkb);
6119                 return error;
6120         }
6121
6122         lock_rsb(r);
6123         attach_lkb(r, lkb);
6124         add_lkb(r, lkb, lkb_status);
6125         unlock_rsb(r);
6126         put_rsb(r);
6127
6128         return 0;
6129 }
6130
6131 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6132                                  int mstype, int to_nodeid)
6133 {
6134         struct dlm_lkb *lkb;
6135         int error;
6136
6137         error = find_lkb(ls, lkb_id, &lkb);
6138         if (error)
6139                 return error;
6140
6141         error = add_to_waiters(lkb, mstype, to_nodeid);
6142         dlm_put_lkb(lkb);
6143         return error;
6144 }
6145