Merge tag 'phy-for-6.6' of git://git.kernel.org/pub/scm/linux/kernel/git/phy/linux-phy
[platform/kernel/linux-starfive.git] / fs / dlm / lock.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13    dlm_lock()
14    dlm_unlock()
15
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void toss_rsb(struct kref *kref);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174                "rlc %d name %s\n",
175                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177                r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182         struct dlm_lkb *lkb;
183
184         dlm_print_rsb(r);
185
186         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188         printk(KERN_ERR "rsb lookup list\n");
189         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb grant queue:\n");
192         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb convert queue:\n");
195         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197         printk(KERN_ERR "rsb wait queue:\n");
198         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199                 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 static inline void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206         down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211         up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216         return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231         return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236         return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247         return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252         return lkb->lkb_nodeid &&
253                !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287                test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         if (rv == -DLM_ECANCEL &&
298             test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299                 rv = -EDEADLK;
300
301         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306         queue_cast(r, lkb,
307                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312         if (is_master_copy(lkb)) {
313                 send_bast(r, lkb, rqmode);
314         } else {
315                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316         }
317 }
318
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322
323 /* This is only called to add a reference when the code already holds
324    a valid reference to the rsb, so there's no need for locking. */
325
326 static inline void hold_rsb(struct dlm_rsb *r)
327 {
328         kref_get(&r->res_ref);
329 }
330
331 void dlm_hold_rsb(struct dlm_rsb *r)
332 {
333         hold_rsb(r);
334 }
335
336 /* When all references to the rsb are gone it's transferred to
337    the tossed list for later disposal. */
338
339 static void put_rsb(struct dlm_rsb *r)
340 {
341         struct dlm_ls *ls = r->res_ls;
342         uint32_t bucket = r->res_bucket;
343         int rv;
344
345         rv = kref_put_lock(&r->res_ref, toss_rsb,
346                            &ls->ls_rsbtbl[bucket].lock);
347         if (rv)
348                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
349 }
350
351 void dlm_put_rsb(struct dlm_rsb *r)
352 {
353         put_rsb(r);
354 }
355
356 static int pre_rsb_struct(struct dlm_ls *ls)
357 {
358         struct dlm_rsb *r1, *r2;
359         int count = 0;
360
361         spin_lock(&ls->ls_new_rsb_spin);
362         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
363                 spin_unlock(&ls->ls_new_rsb_spin);
364                 return 0;
365         }
366         spin_unlock(&ls->ls_new_rsb_spin);
367
368         r1 = dlm_allocate_rsb(ls);
369         r2 = dlm_allocate_rsb(ls);
370
371         spin_lock(&ls->ls_new_rsb_spin);
372         if (r1) {
373                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
374                 ls->ls_new_rsb_count++;
375         }
376         if (r2) {
377                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
378                 ls->ls_new_rsb_count++;
379         }
380         count = ls->ls_new_rsb_count;
381         spin_unlock(&ls->ls_new_rsb_spin);
382
383         if (!count)
384                 return -ENOMEM;
385         return 0;
386 }
387
388 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
389    unlock any spinlocks, go back and call pre_rsb_struct again.
390    Otherwise, take an rsb off the list and return it. */
391
392 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
393                           struct dlm_rsb **r_ret)
394 {
395         struct dlm_rsb *r;
396         int count;
397
398         spin_lock(&ls->ls_new_rsb_spin);
399         if (list_empty(&ls->ls_new_rsb)) {
400                 count = ls->ls_new_rsb_count;
401                 spin_unlock(&ls->ls_new_rsb_spin);
402                 log_debug(ls, "find_rsb retry %d %d %s",
403                           count, dlm_config.ci_new_rsb_count,
404                           (const char *)name);
405                 return -EAGAIN;
406         }
407
408         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
409         list_del(&r->res_hashchain);
410         /* Convert the empty list_head to a NULL rb_node for tree usage: */
411         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
412         ls->ls_new_rsb_count--;
413         spin_unlock(&ls->ls_new_rsb_spin);
414
415         r->res_ls = ls;
416         r->res_length = len;
417         memcpy(r->res_name, name, len);
418         mutex_init(&r->res_mutex);
419
420         INIT_LIST_HEAD(&r->res_lookup);
421         INIT_LIST_HEAD(&r->res_grantqueue);
422         INIT_LIST_HEAD(&r->res_convertqueue);
423         INIT_LIST_HEAD(&r->res_waitqueue);
424         INIT_LIST_HEAD(&r->res_root_list);
425         INIT_LIST_HEAD(&r->res_recover_list);
426
427         *r_ret = r;
428         return 0;
429 }
430
431 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
432 {
433         char maxname[DLM_RESNAME_MAXLEN];
434
435         memset(maxname, 0, DLM_RESNAME_MAXLEN);
436         memcpy(maxname, name, nlen);
437         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
438 }
439
440 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
441                         struct dlm_rsb **r_ret)
442 {
443         struct rb_node *node = tree->rb_node;
444         struct dlm_rsb *r;
445         int rc;
446
447         while (node) {
448                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
449                 rc = rsb_cmp(r, name, len);
450                 if (rc < 0)
451                         node = node->rb_left;
452                 else if (rc > 0)
453                         node = node->rb_right;
454                 else
455                         goto found;
456         }
457         *r_ret = NULL;
458         return -EBADR;
459
460  found:
461         *r_ret = r;
462         return 0;
463 }
464
465 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
466 {
467         struct rb_node **newn = &tree->rb_node;
468         struct rb_node *parent = NULL;
469         int rc;
470
471         while (*newn) {
472                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
473                                                res_hashnode);
474
475                 parent = *newn;
476                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
477                 if (rc < 0)
478                         newn = &parent->rb_left;
479                 else if (rc > 0)
480                         newn = &parent->rb_right;
481                 else {
482                         log_print("rsb_insert match");
483                         dlm_dump_rsb(rsb);
484                         dlm_dump_rsb(cur);
485                         return -EEXIST;
486                 }
487         }
488
489         rb_link_node(&rsb->res_hashnode, parent, newn);
490         rb_insert_color(&rsb->res_hashnode, tree);
491         return 0;
492 }
493
494 /*
495  * Find rsb in rsbtbl and potentially create/add one
496  *
497  * Delaying the release of rsb's has a similar benefit to applications keeping
498  * NL locks on an rsb, but without the guarantee that the cached master value
499  * will still be valid when the rsb is reused.  Apps aren't always smart enough
500  * to keep NL locks on an rsb that they may lock again shortly; this can lead
501  * to excessive master lookups and removals if we don't delay the release.
502  *
503  * Searching for an rsb means looking through both the normal list and toss
504  * list.  When found on the toss list the rsb is moved to the normal list with
505  * ref count of 1; when found on normal list the ref count is incremented.
506  *
507  * rsb's on the keep list are being used locally and refcounted.
508  * rsb's on the toss list are not being used locally, and are not refcounted.
509  *
510  * The toss list rsb's were either
511  * - previously used locally but not any more (were on keep list, then
512  *   moved to toss list when last refcount dropped)
513  * - created and put on toss list as a directory record for a lookup
514  *   (we are the dir node for the res, but are not using the res right now,
515  *   but some other node is)
516  *
517  * The purpose of find_rsb() is to return a refcounted rsb for local use.
518  * So, if the given rsb is on the toss list, it is moved to the keep list
519  * before being returned.
520  *
521  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
522  * more refcounts exist, so the rsb is moved from the keep list to the
523  * toss list.
524  *
525  * rsb's on both keep and toss lists are used for doing a name to master
526  * lookups.  rsb's that are in use locally (and being refcounted) are on
527  * the keep list, rsb's that are not in use locally (not refcounted) and
528  * only exist for name/master lookups are on the toss list.
529  *
530  * rsb's on the toss list who's dir_nodeid is not local can have stale
531  * name/master mappings.  So, remote requests on such rsb's can potentially
532  * return with an error, which means the mapping is stale and needs to
533  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
534  * first_lkid is to keep only a single outstanding request on an rsb
535  * while that rsb has a potentially stale master.)
536  */
537
538 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
539                         uint32_t hash, uint32_t b,
540                         int dir_nodeid, int from_nodeid,
541                         unsigned int flags, struct dlm_rsb **r_ret)
542 {
543         struct dlm_rsb *r = NULL;
544         int our_nodeid = dlm_our_nodeid();
545         int from_local = 0;
546         int from_other = 0;
547         int from_dir = 0;
548         int create = 0;
549         int error;
550
551         if (flags & R_RECEIVE_REQUEST) {
552                 if (from_nodeid == dir_nodeid)
553                         from_dir = 1;
554                 else
555                         from_other = 1;
556         } else if (flags & R_REQUEST) {
557                 from_local = 1;
558         }
559
560         /*
561          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
562          * from_nodeid has sent us a lock in dlm_recover_locks, believing
563          * we're the new master.  Our local recovery may not have set
564          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
565          * create the rsb; dlm_recover_process_copy() will handle EBADR
566          * by resending.
567          *
568          * If someone sends us a request, we are the dir node, and we do
569          * not find the rsb anywhere, then recreate it.  This happens if
570          * someone sends us a request after we have removed/freed an rsb
571          * from our toss list.  (They sent a request instead of lookup
572          * because they are using an rsb from their toss list.)
573          */
574
575         if (from_local || from_dir ||
576             (from_other && (dir_nodeid == our_nodeid))) {
577                 create = 1;
578         }
579
580  retry:
581         if (create) {
582                 error = pre_rsb_struct(ls);
583                 if (error < 0)
584                         goto out;
585         }
586
587         spin_lock(&ls->ls_rsbtbl[b].lock);
588
589         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
590         if (error)
591                 goto do_toss;
592         
593         /*
594          * rsb is active, so we can't check master_nodeid without lock_rsb.
595          */
596
597         kref_get(&r->res_ref);
598         goto out_unlock;
599
600
601  do_toss:
602         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
603         if (error)
604                 goto do_new;
605
606         /*
607          * rsb found inactive (master_nodeid may be out of date unless
608          * we are the dir_nodeid or were the master)  No other thread
609          * is using this rsb because it's on the toss list, so we can
610          * look at or update res_master_nodeid without lock_rsb.
611          */
612
613         if ((r->res_master_nodeid != our_nodeid) && from_other) {
614                 /* our rsb was not master, and another node (not the dir node)
615                    has sent us a request */
616                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
617                           from_nodeid, r->res_master_nodeid, dir_nodeid,
618                           r->res_name);
619                 error = -ENOTBLK;
620                 goto out_unlock;
621         }
622
623         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
624                 /* don't think this should ever happen */
625                 log_error(ls, "find_rsb toss from_dir %d master %d",
626                           from_nodeid, r->res_master_nodeid);
627                 dlm_print_rsb(r);
628                 /* fix it and go on */
629                 r->res_master_nodeid = our_nodeid;
630                 r->res_nodeid = 0;
631                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
632                 r->res_first_lkid = 0;
633         }
634
635         if (from_local && (r->res_master_nodeid != our_nodeid)) {
636                 /* Because we have held no locks on this rsb,
637                    res_master_nodeid could have become stale. */
638                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
639                 r->res_first_lkid = 0;
640         }
641
642         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
643         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
644         goto out_unlock;
645
646
647  do_new:
648         /*
649          * rsb not found
650          */
651
652         if (error == -EBADR && !create)
653                 goto out_unlock;
654
655         error = get_rsb_struct(ls, name, len, &r);
656         if (error == -EAGAIN) {
657                 spin_unlock(&ls->ls_rsbtbl[b].lock);
658                 goto retry;
659         }
660         if (error)
661                 goto out_unlock;
662
663         r->res_hash = hash;
664         r->res_bucket = b;
665         r->res_dir_nodeid = dir_nodeid;
666         kref_init(&r->res_ref);
667
668         if (from_dir) {
669                 /* want to see how often this happens */
670                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
671                           from_nodeid, r->res_name);
672                 r->res_master_nodeid = our_nodeid;
673                 r->res_nodeid = 0;
674                 goto out_add;
675         }
676
677         if (from_other && (dir_nodeid != our_nodeid)) {
678                 /* should never happen */
679                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
680                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
681                 dlm_free_rsb(r);
682                 r = NULL;
683                 error = -ENOTBLK;
684                 goto out_unlock;
685         }
686
687         if (from_other) {
688                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
689                           from_nodeid, dir_nodeid, r->res_name);
690         }
691
692         if (dir_nodeid == our_nodeid) {
693                 /* When we are the dir nodeid, we can set the master
694                    node immediately */
695                 r->res_master_nodeid = our_nodeid;
696                 r->res_nodeid = 0;
697         } else {
698                 /* set_master will send_lookup to dir_nodeid */
699                 r->res_master_nodeid = 0;
700                 r->res_nodeid = -1;
701         }
702
703  out_add:
704         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
705  out_unlock:
706         spin_unlock(&ls->ls_rsbtbl[b].lock);
707  out:
708         *r_ret = r;
709         return error;
710 }
711
712 /* During recovery, other nodes can send us new MSTCPY locks (from
713    dlm_recover_locks) before we've made ourself master (in
714    dlm_recover_masters). */
715
716 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
717                           uint32_t hash, uint32_t b,
718                           int dir_nodeid, int from_nodeid,
719                           unsigned int flags, struct dlm_rsb **r_ret)
720 {
721         struct dlm_rsb *r = NULL;
722         int our_nodeid = dlm_our_nodeid();
723         int recover = (flags & R_RECEIVE_RECOVER);
724         int error;
725
726  retry:
727         error = pre_rsb_struct(ls);
728         if (error < 0)
729                 goto out;
730
731         spin_lock(&ls->ls_rsbtbl[b].lock);
732
733         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
734         if (error)
735                 goto do_toss;
736
737         /*
738          * rsb is active, so we can't check master_nodeid without lock_rsb.
739          */
740
741         kref_get(&r->res_ref);
742         goto out_unlock;
743
744
745  do_toss:
746         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
747         if (error)
748                 goto do_new;
749
750         /*
751          * rsb found inactive. No other thread is using this rsb because
752          * it's on the toss list, so we can look at or update
753          * res_master_nodeid without lock_rsb.
754          */
755
756         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
757                 /* our rsb is not master, and another node has sent us a
758                    request; this should never happen */
759                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
760                           from_nodeid, r->res_master_nodeid, dir_nodeid);
761                 dlm_print_rsb(r);
762                 error = -ENOTBLK;
763                 goto out_unlock;
764         }
765
766         if (!recover && (r->res_master_nodeid != our_nodeid) &&
767             (dir_nodeid == our_nodeid)) {
768                 /* our rsb is not master, and we are dir; may as well fix it;
769                    this should never happen */
770                 log_error(ls, "find_rsb toss our %d master %d dir %d",
771                           our_nodeid, r->res_master_nodeid, dir_nodeid);
772                 dlm_print_rsb(r);
773                 r->res_master_nodeid = our_nodeid;
774                 r->res_nodeid = 0;
775         }
776
777         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
778         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
779         goto out_unlock;
780
781
782  do_new:
783         /*
784          * rsb not found
785          */
786
787         error = get_rsb_struct(ls, name, len, &r);
788         if (error == -EAGAIN) {
789                 spin_unlock(&ls->ls_rsbtbl[b].lock);
790                 goto retry;
791         }
792         if (error)
793                 goto out_unlock;
794
795         r->res_hash = hash;
796         r->res_bucket = b;
797         r->res_dir_nodeid = dir_nodeid;
798         r->res_master_nodeid = dir_nodeid;
799         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
800         kref_init(&r->res_ref);
801
802         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
803  out_unlock:
804         spin_unlock(&ls->ls_rsbtbl[b].lock);
805  out:
806         *r_ret = r;
807         return error;
808 }
809
810 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
811                     int from_nodeid, unsigned int flags,
812                     struct dlm_rsb **r_ret)
813 {
814         uint32_t hash, b;
815         int dir_nodeid;
816
817         if (len > DLM_RESNAME_MAXLEN)
818                 return -EINVAL;
819
820         hash = jhash(name, len, 0);
821         b = hash & (ls->ls_rsbtbl_size - 1);
822
823         dir_nodeid = dlm_hash2nodeid(ls, hash);
824
825         if (dlm_no_directory(ls))
826                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
827                                       from_nodeid, flags, r_ret);
828         else
829                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
830                                       from_nodeid, flags, r_ret);
831 }
832
833 /* we have received a request and found that res_master_nodeid != our_nodeid,
834    so we need to return an error or make ourself the master */
835
836 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
837                                   int from_nodeid)
838 {
839         if (dlm_no_directory(ls)) {
840                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
841                           from_nodeid, r->res_master_nodeid,
842                           r->res_dir_nodeid);
843                 dlm_print_rsb(r);
844                 return -ENOTBLK;
845         }
846
847         if (from_nodeid != r->res_dir_nodeid) {
848                 /* our rsb is not master, and another node (not the dir node)
849                    has sent us a request.  this is much more common when our
850                    master_nodeid is zero, so limit debug to non-zero.  */
851
852                 if (r->res_master_nodeid) {
853                         log_debug(ls, "validate master from_other %d master %d "
854                                   "dir %d first %x %s", from_nodeid,
855                                   r->res_master_nodeid, r->res_dir_nodeid,
856                                   r->res_first_lkid, r->res_name);
857                 }
858                 return -ENOTBLK;
859         } else {
860                 /* our rsb is not master, but the dir nodeid has sent us a
861                    request; this could happen with master 0 / res_nodeid -1 */
862
863                 if (r->res_master_nodeid) {
864                         log_error(ls, "validate master from_dir %d master %d "
865                                   "first %x %s",
866                                   from_nodeid, r->res_master_nodeid,
867                                   r->res_first_lkid, r->res_name);
868                 }
869
870                 r->res_master_nodeid = dlm_our_nodeid();
871                 r->res_nodeid = 0;
872                 return 0;
873         }
874 }
875
876 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
877                                 int from_nodeid, bool toss_list, unsigned int flags,
878                                 int *r_nodeid, int *result)
879 {
880         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
881         int from_master = (flags & DLM_LU_RECOVER_DIR);
882
883         if (r->res_dir_nodeid != our_nodeid) {
884                 /* should not happen, but may as well fix it and carry on */
885                 log_error(ls, "%s res_dir %d our %d %s", __func__,
886                           r->res_dir_nodeid, our_nodeid, r->res_name);
887                 r->res_dir_nodeid = our_nodeid;
888         }
889
890         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
891                 /* Recovery uses this function to set a new master when
892                  * the previous master failed.  Setting NEW_MASTER will
893                  * force dlm_recover_masters to call recover_master on this
894                  * rsb even though the res_nodeid is no longer removed.
895                  */
896
897                 r->res_master_nodeid = from_nodeid;
898                 r->res_nodeid = from_nodeid;
899                 rsb_set_flag(r, RSB_NEW_MASTER);
900
901                 if (toss_list) {
902                         /* I don't think we should ever find it on toss list. */
903                         log_error(ls, "%s fix_master on toss", __func__);
904                         dlm_dump_rsb(r);
905                 }
906         }
907
908         if (from_master && (r->res_master_nodeid != from_nodeid)) {
909                 /* this will happen if from_nodeid became master during
910                  * a previous recovery cycle, and we aborted the previous
911                  * cycle before recovering this master value
912                  */
913
914                 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
915                           __func__, from_nodeid, r->res_master_nodeid,
916                           r->res_nodeid, r->res_first_lkid, r->res_name);
917
918                 if (r->res_master_nodeid == our_nodeid) {
919                         log_error(ls, "from_master %d our_master", from_nodeid);
920                         dlm_dump_rsb(r);
921                         goto ret_assign;
922                 }
923
924                 r->res_master_nodeid = from_nodeid;
925                 r->res_nodeid = from_nodeid;
926                 rsb_set_flag(r, RSB_NEW_MASTER);
927         }
928
929         if (!r->res_master_nodeid) {
930                 /* this will happen if recovery happens while we're looking
931                  * up the master for this rsb
932                  */
933
934                 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
935                           from_nodeid, r->res_first_lkid, r->res_name);
936                 r->res_master_nodeid = from_nodeid;
937                 r->res_nodeid = from_nodeid;
938         }
939
940         if (!from_master && !fix_master &&
941             (r->res_master_nodeid == from_nodeid)) {
942                 /* this can happen when the master sends remove, the dir node
943                  * finds the rsb on the keep list and ignores the remove,
944                  * and the former master sends a lookup
945                  */
946
947                 log_limit(ls, "%s from master %d flags %x first %x %s",
948                           __func__, from_nodeid, flags, r->res_first_lkid,
949                           r->res_name);
950         }
951
952  ret_assign:
953         *r_nodeid = r->res_master_nodeid;
954         if (result)
955                 *result = DLM_LU_MATCH;
956 }
957
958 /*
959  * We're the dir node for this res and another node wants to know the
960  * master nodeid.  During normal operation (non recovery) this is only
961  * called from receive_lookup(); master lookups when the local node is
962  * the dir node are done by find_rsb().
963  *
964  * normal operation, we are the dir node for a resource
965  * . _request_lock
966  * . set_master
967  * . send_lookup
968  * . receive_lookup
969  * . dlm_master_lookup flags 0
970  *
971  * recover directory, we are rebuilding dir for all resources
972  * . dlm_recover_directory
973  * . dlm_rcom_names
974  *   remote node sends back the rsb names it is master of and we are dir of
975  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
976  *   we either create new rsb setting remote node as master, or find existing
977  *   rsb and set master to be the remote node.
978  *
979  * recover masters, we are finding the new master for resources
980  * . dlm_recover_masters
981  * . recover_master
982  * . dlm_send_rcom_lookup
983  * . receive_rcom_lookup
984  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
985  */
986
987 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
988                       int len, unsigned int flags, int *r_nodeid, int *result)
989 {
990         struct dlm_rsb *r = NULL;
991         uint32_t hash, b;
992         int our_nodeid = dlm_our_nodeid();
993         int dir_nodeid, error;
994
995         if (len > DLM_RESNAME_MAXLEN)
996                 return -EINVAL;
997
998         if (from_nodeid == our_nodeid) {
999                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1000                           our_nodeid, flags);
1001                 return -EINVAL;
1002         }
1003
1004         hash = jhash(name, len, 0);
1005         b = hash & (ls->ls_rsbtbl_size - 1);
1006
1007         dir_nodeid = dlm_hash2nodeid(ls, hash);
1008         if (dir_nodeid != our_nodeid) {
1009                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1010                           from_nodeid, dir_nodeid, our_nodeid, hash,
1011                           ls->ls_num_nodes);
1012                 *r_nodeid = -1;
1013                 return -EINVAL;
1014         }
1015
1016  retry:
1017         error = pre_rsb_struct(ls);
1018         if (error < 0)
1019                 return error;
1020
1021         spin_lock(&ls->ls_rsbtbl[b].lock);
1022         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1023         if (!error) {
1024                 /* because the rsb is active, we need to lock_rsb before
1025                  * checking/changing re_master_nodeid
1026                  */
1027
1028                 hold_rsb(r);
1029                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1030                 lock_rsb(r);
1031
1032                 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1033                                     flags, r_nodeid, result);
1034
1035                 /* the rsb was active */
1036                 unlock_rsb(r);
1037                 put_rsb(r);
1038
1039                 return 0;
1040         }
1041
1042         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1043         if (error)
1044                 goto not_found;
1045
1046         /* because the rsb is inactive (on toss list), it's not refcounted
1047          * and lock_rsb is not used, but is protected by the rsbtbl lock
1048          */
1049
1050         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1051                             r_nodeid, result);
1052
1053         r->res_toss_time = jiffies;
1054         /* the rsb was inactive (on toss list) */
1055         spin_unlock(&ls->ls_rsbtbl[b].lock);
1056
1057         return 0;
1058
1059  not_found:
1060         error = get_rsb_struct(ls, name, len, &r);
1061         if (error == -EAGAIN) {
1062                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1063                 goto retry;
1064         }
1065         if (error)
1066                 goto out_unlock;
1067
1068         r->res_hash = hash;
1069         r->res_bucket = b;
1070         r->res_dir_nodeid = our_nodeid;
1071         r->res_master_nodeid = from_nodeid;
1072         r->res_nodeid = from_nodeid;
1073         kref_init(&r->res_ref);
1074         r->res_toss_time = jiffies;
1075
1076         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1077         if (error) {
1078                 /* should never happen */
1079                 dlm_free_rsb(r);
1080                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1081                 goto retry;
1082         }
1083
1084         if (result)
1085                 *result = DLM_LU_ADD;
1086         *r_nodeid = from_nodeid;
1087  out_unlock:
1088         spin_unlock(&ls->ls_rsbtbl[b].lock);
1089         return error;
1090 }
1091
1092 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1093 {
1094         struct rb_node *n;
1095         struct dlm_rsb *r;
1096         int i;
1097
1098         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1099                 spin_lock(&ls->ls_rsbtbl[i].lock);
1100                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1101                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1102                         if (r->res_hash == hash)
1103                                 dlm_dump_rsb(r);
1104                 }
1105                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1106         }
1107 }
1108
1109 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1110 {
1111         struct dlm_rsb *r = NULL;
1112         uint32_t hash, b;
1113         int error;
1114
1115         hash = jhash(name, len, 0);
1116         b = hash & (ls->ls_rsbtbl_size - 1);
1117
1118         spin_lock(&ls->ls_rsbtbl[b].lock);
1119         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1120         if (!error)
1121                 goto out_dump;
1122
1123         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1124         if (error)
1125                 goto out;
1126  out_dump:
1127         dlm_dump_rsb(r);
1128  out:
1129         spin_unlock(&ls->ls_rsbtbl[b].lock);
1130 }
1131
1132 static void toss_rsb(struct kref *kref)
1133 {
1134         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1135         struct dlm_ls *ls = r->res_ls;
1136
1137         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1138         kref_init(&r->res_ref);
1139         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1140         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1141         r->res_toss_time = jiffies;
1142         set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
1143         if (r->res_lvbptr) {
1144                 dlm_free_lvb(r->res_lvbptr);
1145                 r->res_lvbptr = NULL;
1146         }
1147 }
1148
1149 /* See comment for unhold_lkb */
1150
1151 static void unhold_rsb(struct dlm_rsb *r)
1152 {
1153         int rv;
1154         rv = kref_put(&r->res_ref, toss_rsb);
1155         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1156 }
1157
1158 static void kill_rsb(struct kref *kref)
1159 {
1160         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1161
1162         /* All work is done after the return from kref_put() so we
1163            can release the write_lock before the remove and free. */
1164
1165         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1166         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1167         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1168         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1169         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1170         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1171 }
1172
1173 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1174    The rsb must exist as long as any lkb's for it do. */
1175
1176 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1177 {
1178         hold_rsb(r);
1179         lkb->lkb_resource = r;
1180 }
1181
1182 static void detach_lkb(struct dlm_lkb *lkb)
1183 {
1184         if (lkb->lkb_resource) {
1185                 put_rsb(lkb->lkb_resource);
1186                 lkb->lkb_resource = NULL;
1187         }
1188 }
1189
1190 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1191                        int start, int end)
1192 {
1193         struct dlm_lkb *lkb;
1194         int rv;
1195
1196         lkb = dlm_allocate_lkb(ls);
1197         if (!lkb)
1198                 return -ENOMEM;
1199
1200         lkb->lkb_last_bast_mode = -1;
1201         lkb->lkb_nodeid = -1;
1202         lkb->lkb_grmode = DLM_LOCK_IV;
1203         kref_init(&lkb->lkb_ref);
1204         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1205         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1206         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1207         INIT_LIST_HEAD(&lkb->lkb_callbacks);
1208         spin_lock_init(&lkb->lkb_cb_lock);
1209         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1210
1211         idr_preload(GFP_NOFS);
1212         spin_lock(&ls->ls_lkbidr_spin);
1213         rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1214         if (rv >= 0)
1215                 lkb->lkb_id = rv;
1216         spin_unlock(&ls->ls_lkbidr_spin);
1217         idr_preload_end();
1218
1219         if (rv < 0) {
1220                 log_error(ls, "create_lkb idr error %d", rv);
1221                 dlm_free_lkb(lkb);
1222                 return rv;
1223         }
1224
1225         *lkb_ret = lkb;
1226         return 0;
1227 }
1228
1229 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1230 {
1231         return _create_lkb(ls, lkb_ret, 1, 0);
1232 }
1233
1234 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1235 {
1236         struct dlm_lkb *lkb;
1237
1238         spin_lock(&ls->ls_lkbidr_spin);
1239         lkb = idr_find(&ls->ls_lkbidr, lkid);
1240         if (lkb)
1241                 kref_get(&lkb->lkb_ref);
1242         spin_unlock(&ls->ls_lkbidr_spin);
1243
1244         *lkb_ret = lkb;
1245         return lkb ? 0 : -ENOENT;
1246 }
1247
1248 static void kill_lkb(struct kref *kref)
1249 {
1250         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1251
1252         /* All work is done after the return from kref_put() so we
1253            can release the write_lock before the detach_lkb */
1254
1255         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1256 }
1257
1258 /* __put_lkb() is used when an lkb may not have an rsb attached to
1259    it so we need to provide the lockspace explicitly */
1260
1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1262 {
1263         uint32_t lkid = lkb->lkb_id;
1264         int rv;
1265
1266         rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1267                            &ls->ls_lkbidr_spin);
1268         if (rv) {
1269                 idr_remove(&ls->ls_lkbidr, lkid);
1270                 spin_unlock(&ls->ls_lkbidr_spin);
1271
1272                 detach_lkb(lkb);
1273
1274                 /* for local/process lkbs, lvbptr points to caller's lksb */
1275                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1276                         dlm_free_lvb(lkb->lkb_lvbptr);
1277                 dlm_free_lkb(lkb);
1278         }
1279
1280         return rv;
1281 }
1282
1283 int dlm_put_lkb(struct dlm_lkb *lkb)
1284 {
1285         struct dlm_ls *ls;
1286
1287         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1288         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1289
1290         ls = lkb->lkb_resource->res_ls;
1291         return __put_lkb(ls, lkb);
1292 }
1293
1294 /* This is only called to add a reference when the code already holds
1295    a valid reference to the lkb, so there's no need for locking. */
1296
1297 static inline void hold_lkb(struct dlm_lkb *lkb)
1298 {
1299         kref_get(&lkb->lkb_ref);
1300 }
1301
1302 static void unhold_lkb_assert(struct kref *kref)
1303 {
1304         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1305
1306         DLM_ASSERT(false, dlm_print_lkb(lkb););
1307 }
1308
1309 /* This is called when we need to remove a reference and are certain
1310    it's not the last ref.  e.g. del_lkb is always called between a
1311    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1312    put_lkb would work fine, but would involve unnecessary locking */
1313
1314 static inline void unhold_lkb(struct dlm_lkb *lkb)
1315 {
1316         kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1317 }
1318
1319 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1320                             int mode)
1321 {
1322         struct dlm_lkb *lkb = NULL, *iter;
1323
1324         list_for_each_entry(iter, head, lkb_statequeue)
1325                 if (iter->lkb_rqmode < mode) {
1326                         lkb = iter;
1327                         list_add_tail(new, &iter->lkb_statequeue);
1328                         break;
1329                 }
1330
1331         if (!lkb)
1332                 list_add_tail(new, head);
1333 }
1334
1335 /* add/remove lkb to rsb's grant/convert/wait queue */
1336
1337 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1338 {
1339         kref_get(&lkb->lkb_ref);
1340
1341         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1342
1343         lkb->lkb_timestamp = ktime_get();
1344
1345         lkb->lkb_status = status;
1346
1347         switch (status) {
1348         case DLM_LKSTS_WAITING:
1349                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1350                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1351                 else
1352                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1353                 break;
1354         case DLM_LKSTS_GRANTED:
1355                 /* convention says granted locks kept in order of grmode */
1356                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1357                                 lkb->lkb_grmode);
1358                 break;
1359         case DLM_LKSTS_CONVERT:
1360                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1362                 else
1363                         list_add_tail(&lkb->lkb_statequeue,
1364                                       &r->res_convertqueue);
1365                 break;
1366         default:
1367                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1368         }
1369 }
1370
1371 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1372 {
1373         lkb->lkb_status = 0;
1374         list_del(&lkb->lkb_statequeue);
1375         unhold_lkb(lkb);
1376 }
1377
1378 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1379 {
1380         hold_lkb(lkb);
1381         del_lkb(r, lkb);
1382         add_lkb(r, lkb, sts);
1383         unhold_lkb(lkb);
1384 }
1385
1386 static int msg_reply_type(int mstype)
1387 {
1388         switch (mstype) {
1389         case DLM_MSG_REQUEST:
1390                 return DLM_MSG_REQUEST_REPLY;
1391         case DLM_MSG_CONVERT:
1392                 return DLM_MSG_CONVERT_REPLY;
1393         case DLM_MSG_UNLOCK:
1394                 return DLM_MSG_UNLOCK_REPLY;
1395         case DLM_MSG_CANCEL:
1396                 return DLM_MSG_CANCEL_REPLY;
1397         case DLM_MSG_LOOKUP:
1398                 return DLM_MSG_LOOKUP_REPLY;
1399         }
1400         return -1;
1401 }
1402
1403 /* add/remove lkb from global waiters list of lkb's waiting for
1404    a reply from a remote node */
1405
1406 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1407 {
1408         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1409         int error = 0;
1410         int wc;
1411
1412         mutex_lock(&ls->ls_waiters_mutex);
1413
1414         if (is_overlap_unlock(lkb) ||
1415             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1416                 error = -EINVAL;
1417                 goto out;
1418         }
1419
1420         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1421                 switch (mstype) {
1422                 case DLM_MSG_UNLOCK:
1423                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1424                         break;
1425                 case DLM_MSG_CANCEL:
1426                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1427                         break;
1428                 default:
1429                         error = -EBUSY;
1430                         goto out;
1431                 }
1432                 wc = atomic_inc_return(&lkb->lkb_wait_count);
1433                 hold_lkb(lkb);
1434
1435                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1436                           lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
1437                           dlm_iflags_val(lkb));
1438                 goto out;
1439         }
1440
1441         wc = atomic_fetch_inc(&lkb->lkb_wait_count);
1442         DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
1443         lkb->lkb_wait_type = mstype;
1444         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1445         hold_lkb(lkb);
1446         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1447  out:
1448         if (error)
1449                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1450                           lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1451                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1452         mutex_unlock(&ls->ls_waiters_mutex);
1453         return error;
1454 }
1455
1456 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1457    list as part of process_requestqueue (e.g. a lookup that has an optimized
1458    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1459    set RESEND and dlm_recover_waiters_post() */
1460
1461 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1462                                 const struct dlm_message *ms)
1463 {
1464         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1465         int overlap_done = 0;
1466
1467         if (mstype == DLM_MSG_UNLOCK_REPLY &&
1468             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1469                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1470                 overlap_done = 1;
1471                 goto out_del;
1472         }
1473
1474         if (mstype == DLM_MSG_CANCEL_REPLY &&
1475             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1476                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1477                 overlap_done = 1;
1478                 goto out_del;
1479         }
1480
1481         /* Cancel state was preemptively cleared by a successful convert,
1482            see next comment, nothing to do. */
1483
1484         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1485             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1486                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1487                           lkb->lkb_id, lkb->lkb_wait_type);
1488                 return -1;
1489         }
1490
1491         /* Remove for the convert reply, and premptively remove for the
1492            cancel reply.  A convert has been granted while there's still
1493            an outstanding cancel on it (the cancel is moot and the result
1494            in the cancel reply should be 0).  We preempt the cancel reply
1495            because the app gets the convert result and then can follow up
1496            with another op, like convert.  This subsequent op would see the
1497            lingering state of the cancel and fail with -EBUSY. */
1498
1499         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1500             (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1501             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1502                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1503                           lkb->lkb_id);
1504                 lkb->lkb_wait_type = 0;
1505                 atomic_dec(&lkb->lkb_wait_count);
1506                 unhold_lkb(lkb);
1507                 goto out_del;
1508         }
1509
1510         /* N.B. type of reply may not always correspond to type of original
1511            msg due to lookup->request optimization, verify others? */
1512
1513         if (lkb->lkb_wait_type) {
1514                 lkb->lkb_wait_type = 0;
1515                 goto out_del;
1516         }
1517
1518         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1519                   lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1520                   lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1521         return -1;
1522
1523  out_del:
1524         /* the force-unlock/cancel has completed and we haven't recvd a reply
1525            to the op that was in progress prior to the unlock/cancel; we
1526            give up on any reply to the earlier op.  FIXME: not sure when/how
1527            this would happen */
1528
1529         if (overlap_done && lkb->lkb_wait_type) {
1530                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1531                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1532                 atomic_dec(&lkb->lkb_wait_count);
1533                 unhold_lkb(lkb);
1534                 lkb->lkb_wait_type = 0;
1535         }
1536
1537         DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
1538
1539         clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1540         if (atomic_dec_and_test(&lkb->lkb_wait_count))
1541                 list_del_init(&lkb->lkb_wait_reply);
1542         unhold_lkb(lkb);
1543         return 0;
1544 }
1545
1546 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1547 {
1548         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1549         int error;
1550
1551         mutex_lock(&ls->ls_waiters_mutex);
1552         error = _remove_from_waiters(lkb, mstype, NULL);
1553         mutex_unlock(&ls->ls_waiters_mutex);
1554         return error;
1555 }
1556
1557 /* Handles situations where we might be processing a "fake" or "local" reply in
1558    which we can't try to take waiters_mutex again. */
1559
1560 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1561                                   const struct dlm_message *ms, bool local)
1562 {
1563         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1564         int error;
1565
1566         if (!local)
1567                 mutex_lock(&ls->ls_waiters_mutex);
1568         error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1569         if (!local)
1570                 mutex_unlock(&ls->ls_waiters_mutex);
1571         return error;
1572 }
1573
1574 static void shrink_bucket(struct dlm_ls *ls, int b)
1575 {
1576         struct rb_node *n, *next;
1577         struct dlm_rsb *r;
1578         char *name;
1579         int our_nodeid = dlm_our_nodeid();
1580         int remote_count = 0;
1581         int need_shrink = 0;
1582         int i, len, rv;
1583
1584         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1585
1586         spin_lock(&ls->ls_rsbtbl[b].lock);
1587
1588         if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
1589                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1590                 return;
1591         }
1592
1593         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1594                 next = rb_next(n);
1595                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1596
1597                 /* If we're the directory record for this rsb, and
1598                    we're not the master of it, then we need to wait
1599                    for the master node to send us a dir remove for
1600                    before removing the dir record. */
1601
1602                 if (!dlm_no_directory(ls) &&
1603                     (r->res_master_nodeid != our_nodeid) &&
1604                     (dlm_dir_nodeid(r) == our_nodeid)) {
1605                         continue;
1606                 }
1607
1608                 need_shrink = 1;
1609
1610                 if (!time_after_eq(jiffies, r->res_toss_time +
1611                                    dlm_config.ci_toss_secs * HZ)) {
1612                         continue;
1613                 }
1614
1615                 if (!dlm_no_directory(ls) &&
1616                     (r->res_master_nodeid == our_nodeid) &&
1617                     (dlm_dir_nodeid(r) != our_nodeid)) {
1618
1619                         /* We're the master of this rsb but we're not
1620                            the directory record, so we need to tell the
1621                            dir node to remove the dir record. */
1622
1623                         ls->ls_remove_lens[remote_count] = r->res_length;
1624                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1625                                DLM_RESNAME_MAXLEN);
1626                         remote_count++;
1627
1628                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1629                                 break;
1630                         continue;
1631                 }
1632
1633                 if (!kref_put(&r->res_ref, kill_rsb)) {
1634                         log_error(ls, "tossed rsb in use %s", r->res_name);
1635                         continue;
1636                 }
1637
1638                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1639                 dlm_free_rsb(r);
1640         }
1641
1642         if (need_shrink)
1643                 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1644         else
1645                 clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1646         spin_unlock(&ls->ls_rsbtbl[b].lock);
1647
1648         /*
1649          * While searching for rsb's to free, we found some that require
1650          * remote removal.  We leave them in place and find them again here
1651          * so there is a very small gap between removing them from the toss
1652          * list and sending the removal.  Keeping this gap small is
1653          * important to keep us (the master node) from being out of sync
1654          * with the remote dir node for very long.
1655          */
1656
1657         for (i = 0; i < remote_count; i++) {
1658                 name = ls->ls_remove_names[i];
1659                 len = ls->ls_remove_lens[i];
1660
1661                 spin_lock(&ls->ls_rsbtbl[b].lock);
1662                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1663                 if (rv) {
1664                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1665                         log_debug(ls, "remove_name not toss %s", name);
1666                         continue;
1667                 }
1668
1669                 if (r->res_master_nodeid != our_nodeid) {
1670                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1671                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1672                                   r->res_master_nodeid, r->res_dir_nodeid,
1673                                   our_nodeid, name);
1674                         continue;
1675                 }
1676
1677                 if (r->res_dir_nodeid == our_nodeid) {
1678                         /* should never happen */
1679                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1680                         log_error(ls, "remove_name dir %d master %d our %d %s",
1681                                   r->res_dir_nodeid, r->res_master_nodeid,
1682                                   our_nodeid, name);
1683                         continue;
1684                 }
1685
1686                 if (!time_after_eq(jiffies, r->res_toss_time +
1687                                    dlm_config.ci_toss_secs * HZ)) {
1688                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1689                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1690                                   r->res_toss_time, jiffies, name);
1691                         continue;
1692                 }
1693
1694                 if (!kref_put(&r->res_ref, kill_rsb)) {
1695                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1696                         log_error(ls, "remove_name in use %s", name);
1697                         continue;
1698                 }
1699
1700                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1701                 send_remove(r);
1702                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1703
1704                 dlm_free_rsb(r);
1705         }
1706 }
1707
1708 void dlm_scan_rsbs(struct dlm_ls *ls)
1709 {
1710         int i;
1711
1712         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1713                 shrink_bucket(ls, i);
1714                 if (dlm_locking_stopped(ls))
1715                         break;
1716                 cond_resched();
1717         }
1718 }
1719
1720 /* lkb is master or local copy */
1721
1722 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723 {
1724         int b, len = r->res_ls->ls_lvblen;
1725
1726         /* b=1 lvb returned to caller
1727            b=0 lvb written to rsb or invalidated
1728            b=-1 do nothing */
1729
1730         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1731
1732         if (b == 1) {
1733                 if (!lkb->lkb_lvbptr)
1734                         return;
1735
1736                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1737                         return;
1738
1739                 if (!r->res_lvbptr)
1740                         return;
1741
1742                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1743                 lkb->lkb_lvbseq = r->res_lvbseq;
1744
1745         } else if (b == 0) {
1746                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1747                         rsb_set_flag(r, RSB_VALNOTVALID);
1748                         return;
1749                 }
1750
1751                 if (!lkb->lkb_lvbptr)
1752                         return;
1753
1754                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1755                         return;
1756
1757                 if (!r->res_lvbptr)
1758                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1759
1760                 if (!r->res_lvbptr)
1761                         return;
1762
1763                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1764                 r->res_lvbseq++;
1765                 lkb->lkb_lvbseq = r->res_lvbseq;
1766                 rsb_clear_flag(r, RSB_VALNOTVALID);
1767         }
1768
1769         if (rsb_flag(r, RSB_VALNOTVALID))
1770                 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1771 }
1772
1773 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1774 {
1775         if (lkb->lkb_grmode < DLM_LOCK_PW)
1776                 return;
1777
1778         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1779                 rsb_set_flag(r, RSB_VALNOTVALID);
1780                 return;
1781         }
1782
1783         if (!lkb->lkb_lvbptr)
1784                 return;
1785
1786         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1787                 return;
1788
1789         if (!r->res_lvbptr)
1790                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1791
1792         if (!r->res_lvbptr)
1793                 return;
1794
1795         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1796         r->res_lvbseq++;
1797         rsb_clear_flag(r, RSB_VALNOTVALID);
1798 }
1799
1800 /* lkb is process copy (pc) */
1801
1802 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1803                             const struct dlm_message *ms)
1804 {
1805         int b;
1806
1807         if (!lkb->lkb_lvbptr)
1808                 return;
1809
1810         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1811                 return;
1812
1813         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1814         if (b == 1) {
1815                 int len = receive_extralen(ms);
1816                 if (len > r->res_ls->ls_lvblen)
1817                         len = r->res_ls->ls_lvblen;
1818                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1819                 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1820         }
1821 }
1822
1823 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1824    remove_lock -- used for unlock, removes lkb from granted
1825    revert_lock -- used for cancel, moves lkb from convert to granted
1826    grant_lock  -- used for request and convert, adds lkb to granted or
1827                   moves lkb from convert or waiting to granted
1828
1829    Each of these is used for master or local copy lkb's.  There is
1830    also a _pc() variation used to make the corresponding change on
1831    a process copy (pc) lkb. */
1832
1833 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1834 {
1835         del_lkb(r, lkb);
1836         lkb->lkb_grmode = DLM_LOCK_IV;
1837         /* this unhold undoes the original ref from create_lkb()
1838            so this leads to the lkb being freed */
1839         unhold_lkb(lkb);
1840 }
1841
1842 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1843 {
1844         set_lvb_unlock(r, lkb);
1845         _remove_lock(r, lkb);
1846 }
1847
1848 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1849 {
1850         _remove_lock(r, lkb);
1851 }
1852
1853 /* returns: 0 did nothing
1854             1 moved lock to granted
1855            -1 removed lock */
1856
1857 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1858 {
1859         int rv = 0;
1860
1861         lkb->lkb_rqmode = DLM_LOCK_IV;
1862
1863         switch (lkb->lkb_status) {
1864         case DLM_LKSTS_GRANTED:
1865                 break;
1866         case DLM_LKSTS_CONVERT:
1867                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1868                 rv = 1;
1869                 break;
1870         case DLM_LKSTS_WAITING:
1871                 del_lkb(r, lkb);
1872                 lkb->lkb_grmode = DLM_LOCK_IV;
1873                 /* this unhold undoes the original ref from create_lkb()
1874                    so this leads to the lkb being freed */
1875                 unhold_lkb(lkb);
1876                 rv = -1;
1877                 break;
1878         default:
1879                 log_print("invalid status for revert %d", lkb->lkb_status);
1880         }
1881         return rv;
1882 }
1883
1884 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1885 {
1886         return revert_lock(r, lkb);
1887 }
1888
1889 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1890 {
1891         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1892                 lkb->lkb_grmode = lkb->lkb_rqmode;
1893                 if (lkb->lkb_status)
1894                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1895                 else
1896                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1897         }
1898
1899         lkb->lkb_rqmode = DLM_LOCK_IV;
1900         lkb->lkb_highbast = 0;
1901 }
1902
1903 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1904 {
1905         set_lvb_lock(r, lkb);
1906         _grant_lock(r, lkb);
1907 }
1908
1909 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1910                           const struct dlm_message *ms)
1911 {
1912         set_lvb_lock_pc(r, lkb, ms);
1913         _grant_lock(r, lkb);
1914 }
1915
1916 /* called by grant_pending_locks() which means an async grant message must
1917    be sent to the requesting node in addition to granting the lock if the
1918    lkb belongs to a remote node. */
1919
1920 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1921 {
1922         grant_lock(r, lkb);
1923         if (is_master_copy(lkb))
1924                 send_grant(r, lkb);
1925         else
1926                 queue_cast(r, lkb, 0);
1927 }
1928
1929 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1930    change the granted/requested modes.  We're munging things accordingly in
1931    the process copy.
1932    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1933    conversion deadlock
1934    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1935    compatible with other granted locks */
1936
1937 static void munge_demoted(struct dlm_lkb *lkb)
1938 {
1939         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1940                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1941                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1942                 return;
1943         }
1944
1945         lkb->lkb_grmode = DLM_LOCK_NL;
1946 }
1947
1948 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
1949 {
1950         if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
1951             ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
1952                 log_print("munge_altmode %x invalid reply type %d",
1953                           lkb->lkb_id, le32_to_cpu(ms->m_type));
1954                 return;
1955         }
1956
1957         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1958                 lkb->lkb_rqmode = DLM_LOCK_PR;
1959         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1960                 lkb->lkb_rqmode = DLM_LOCK_CW;
1961         else {
1962                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1963                 dlm_print_lkb(lkb);
1964         }
1965 }
1966
1967 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1968 {
1969         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1970                                            lkb_statequeue);
1971         if (lkb->lkb_id == first->lkb_id)
1972                 return 1;
1973
1974         return 0;
1975 }
1976
1977 /* Check if the given lkb conflicts with another lkb on the queue. */
1978
1979 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1980 {
1981         struct dlm_lkb *this;
1982
1983         list_for_each_entry(this, head, lkb_statequeue) {
1984                 if (this == lkb)
1985                         continue;
1986                 if (!modes_compat(this, lkb))
1987                         return 1;
1988         }
1989         return 0;
1990 }
1991
1992 /*
1993  * "A conversion deadlock arises with a pair of lock requests in the converting
1994  * queue for one resource.  The granted mode of each lock blocks the requested
1995  * mode of the other lock."
1996  *
1997  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1998  * convert queue from being granted, then deadlk/demote lkb.
1999  *
2000  * Example:
2001  * Granted Queue: empty
2002  * Convert Queue: NL->EX (first lock)
2003  *                PR->EX (second lock)
2004  *
2005  * The first lock can't be granted because of the granted mode of the second
2006  * lock and the second lock can't be granted because it's not first in the
2007  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2008  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2009  * flag set and return DEMOTED in the lksb flags.
2010  *
2011  * Originally, this function detected conv-deadlk in a more limited scope:
2012  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2013  * - if lkb1 was the first entry in the queue (not just earlier), and was
2014  *   blocked by the granted mode of lkb2, and there was nothing on the
2015  *   granted queue preventing lkb1 from being granted immediately, i.e.
2016  *   lkb2 was the only thing preventing lkb1 from being granted.
2017  *
2018  * That second condition meant we'd only say there was conv-deadlk if
2019  * resolving it (by demotion) would lead to the first lock on the convert
2020  * queue being granted right away.  It allowed conversion deadlocks to exist
2021  * between locks on the convert queue while they couldn't be granted anyway.
2022  *
2023  * Now, we detect and take action on conversion deadlocks immediately when
2024  * they're created, even if they may not be immediately consequential.  If
2025  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2026  * mode that would prevent lkb1's conversion from being granted, we do a
2027  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2028  * I think this means that the lkb_is_ahead condition below should always
2029  * be zero, i.e. there will never be conv-deadlk between two locks that are
2030  * both already on the convert queue.
2031  */
2032
2033 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2034 {
2035         struct dlm_lkb *lkb1;
2036         int lkb_is_ahead = 0;
2037
2038         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2039                 if (lkb1 == lkb2) {
2040                         lkb_is_ahead = 1;
2041                         continue;
2042                 }
2043
2044                 if (!lkb_is_ahead) {
2045                         if (!modes_compat(lkb2, lkb1))
2046                                 return 1;
2047                 } else {
2048                         if (!modes_compat(lkb2, lkb1) &&
2049                             !modes_compat(lkb1, lkb2))
2050                                 return 1;
2051                 }
2052         }
2053         return 0;
2054 }
2055
2056 /*
2057  * Return 1 if the lock can be granted, 0 otherwise.
2058  * Also detect and resolve conversion deadlocks.
2059  *
2060  * lkb is the lock to be granted
2061  *
2062  * now is 1 if the function is being called in the context of the
2063  * immediate request, it is 0 if called later, after the lock has been
2064  * queued.
2065  *
2066  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2067  * after recovery.
2068  *
2069  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2070  */
2071
2072 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2073                            int recover)
2074 {
2075         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2076
2077         /*
2078          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2079          * a new request for a NL mode lock being blocked.
2080          *
2081          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2082          * request, then it would be granted.  In essence, the use of this flag
2083          * tells the Lock Manager to expedite theis request by not considering
2084          * what may be in the CONVERTING or WAITING queues...  As of this
2085          * writing, the EXPEDITE flag can be used only with new requests for NL
2086          * mode locks.  This flag is not valid for conversion requests.
2087          *
2088          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2089          * conversion or used with a non-NL requested mode.  We also know an
2090          * EXPEDITE request is always granted immediately, so now must always
2091          * be 1.  The full condition to grant an expedite request: (now &&
2092          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2093          * therefore be shortened to just checking the flag.
2094          */
2095
2096         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2097                 return 1;
2098
2099         /*
2100          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2101          * added to the remaining conditions.
2102          */
2103
2104         if (queue_conflict(&r->res_grantqueue, lkb))
2105                 return 0;
2106
2107         /*
2108          * 6-3: By default, a conversion request is immediately granted if the
2109          * requested mode is compatible with the modes of all other granted
2110          * locks
2111          */
2112
2113         if (queue_conflict(&r->res_convertqueue, lkb))
2114                 return 0;
2115
2116         /*
2117          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2118          * locks for a recovered rsb, on which lkb's have been rebuilt.
2119          * The lkb's may have been rebuilt on the queues in a different
2120          * order than they were in on the previous master.  So, granting
2121          * queued conversions in order after recovery doesn't make sense
2122          * since the order hasn't been preserved anyway.  The new order
2123          * could also have created a new "in place" conversion deadlock.
2124          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2125          * After recovery, there would be no granted locks, and possibly
2126          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2127          * recovery, grant conversions without considering order.
2128          */
2129
2130         if (conv && recover)
2131                 return 1;
2132
2133         /*
2134          * 6-5: But the default algorithm for deciding whether to grant or
2135          * queue conversion requests does not by itself guarantee that such
2136          * requests are serviced on a "first come first serve" basis.  This, in
2137          * turn, can lead to a phenomenon known as "indefinate postponement".
2138          *
2139          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2140          * the system service employed to request a lock conversion.  This flag
2141          * forces certain conversion requests to be queued, even if they are
2142          * compatible with the granted modes of other locks on the same
2143          * resource.  Thus, the use of this flag results in conversion requests
2144          * being ordered on a "first come first servce" basis.
2145          *
2146          * DCT: This condition is all about new conversions being able to occur
2147          * "in place" while the lock remains on the granted queue (assuming
2148          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2149          * doesn't _have_ to go onto the convert queue where it's processed in
2150          * order.  The "now" variable is necessary to distinguish converts
2151          * being received and processed for the first time now, because once a
2152          * convert is moved to the conversion queue the condition below applies
2153          * requiring fifo granting.
2154          */
2155
2156         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2157                 return 1;
2158
2159         /*
2160          * Even if the convert is compat with all granted locks,
2161          * QUECVT forces it behind other locks on the convert queue.
2162          */
2163
2164         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2165                 if (list_empty(&r->res_convertqueue))
2166                         return 1;
2167                 else
2168                         return 0;
2169         }
2170
2171         /*
2172          * The NOORDER flag is set to avoid the standard vms rules on grant
2173          * order.
2174          */
2175
2176         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2177                 return 1;
2178
2179         /*
2180          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2181          * granted until all other conversion requests ahead of it are granted
2182          * and/or canceled.
2183          */
2184
2185         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2186                 return 1;
2187
2188         /*
2189          * 6-4: By default, a new request is immediately granted only if all
2190          * three of the following conditions are satisfied when the request is
2191          * issued:
2192          * - The queue of ungranted conversion requests for the resource is
2193          *   empty.
2194          * - The queue of ungranted new requests for the resource is empty.
2195          * - The mode of the new request is compatible with the most
2196          *   restrictive mode of all granted locks on the resource.
2197          */
2198
2199         if (now && !conv && list_empty(&r->res_convertqueue) &&
2200             list_empty(&r->res_waitqueue))
2201                 return 1;
2202
2203         /*
2204          * 6-4: Once a lock request is in the queue of ungranted new requests,
2205          * it cannot be granted until the queue of ungranted conversion
2206          * requests is empty, all ungranted new requests ahead of it are
2207          * granted and/or canceled, and it is compatible with the granted mode
2208          * of the most restrictive lock granted on the resource.
2209          */
2210
2211         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2212             first_in_list(lkb, &r->res_waitqueue))
2213                 return 1;
2214
2215         return 0;
2216 }
2217
2218 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2219                           int recover, int *err)
2220 {
2221         int rv;
2222         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2223         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2224
2225         if (err)
2226                 *err = 0;
2227
2228         rv = _can_be_granted(r, lkb, now, recover);
2229         if (rv)
2230                 goto out;
2231
2232         /*
2233          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2234          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2235          * cancels one of the locks.
2236          */
2237
2238         if (is_convert && can_be_queued(lkb) &&
2239             conversion_deadlock_detect(r, lkb)) {
2240                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2241                         lkb->lkb_grmode = DLM_LOCK_NL;
2242                         set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2243                 } else if (err) {
2244                         *err = -EDEADLK;
2245                 } else {
2246                         log_print("can_be_granted deadlock %x now %d",
2247                                   lkb->lkb_id, now);
2248                         dlm_dump_rsb(r);
2249                 }
2250                 goto out;
2251         }
2252
2253         /*
2254          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2255          * to grant a request in a mode other than the normal rqmode.  It's a
2256          * simple way to provide a big optimization to applications that can
2257          * use them.
2258          */
2259
2260         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2261                 alt = DLM_LOCK_PR;
2262         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2263                 alt = DLM_LOCK_CW;
2264
2265         if (alt) {
2266                 lkb->lkb_rqmode = alt;
2267                 rv = _can_be_granted(r, lkb, now, 0);
2268                 if (rv)
2269                         set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2270                 else
2271                         lkb->lkb_rqmode = rqmode;
2272         }
2273  out:
2274         return rv;
2275 }
2276
2277 /* Returns the highest requested mode of all blocked conversions; sets
2278    cw if there's a blocked conversion to DLM_LOCK_CW. */
2279
2280 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2281                                  unsigned int *count)
2282 {
2283         struct dlm_lkb *lkb, *s;
2284         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2285         int hi, demoted, quit, grant_restart, demote_restart;
2286         int deadlk;
2287
2288         quit = 0;
2289  restart:
2290         grant_restart = 0;
2291         demote_restart = 0;
2292         hi = DLM_LOCK_IV;
2293
2294         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2295                 demoted = is_demoted(lkb);
2296                 deadlk = 0;
2297
2298                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2299                         grant_lock_pending(r, lkb);
2300                         grant_restart = 1;
2301                         if (count)
2302                                 (*count)++;
2303                         continue;
2304                 }
2305
2306                 if (!demoted && is_demoted(lkb)) {
2307                         log_print("WARN: pending demoted %x node %d %s",
2308                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2309                         demote_restart = 1;
2310                         continue;
2311                 }
2312
2313                 if (deadlk) {
2314                         /*
2315                          * If DLM_LKB_NODLKWT flag is set and conversion
2316                          * deadlock is detected, we request blocking AST and
2317                          * down (or cancel) conversion.
2318                          */
2319                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2320                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2321                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2322                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2323                                 }
2324                         } else {
2325                                 log_print("WARN: pending deadlock %x node %d %s",
2326                                           lkb->lkb_id, lkb->lkb_nodeid,
2327                                           r->res_name);
2328                                 dlm_dump_rsb(r);
2329                         }
2330                         continue;
2331                 }
2332
2333                 hi = max_t(int, lkb->lkb_rqmode, hi);
2334
2335                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2336                         *cw = 1;
2337         }
2338
2339         if (grant_restart)
2340                 goto restart;
2341         if (demote_restart && !quit) {
2342                 quit = 1;
2343                 goto restart;
2344         }
2345
2346         return max_t(int, high, hi);
2347 }
2348
2349 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2350                               unsigned int *count)
2351 {
2352         struct dlm_lkb *lkb, *s;
2353
2354         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2355                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2356                         grant_lock_pending(r, lkb);
2357                         if (count)
2358                                 (*count)++;
2359                 } else {
2360                         high = max_t(int, lkb->lkb_rqmode, high);
2361                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2362                                 *cw = 1;
2363                 }
2364         }
2365
2366         return high;
2367 }
2368
2369 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2370    on either the convert or waiting queue.
2371    high is the largest rqmode of all locks blocked on the convert or
2372    waiting queue. */
2373
2374 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2375 {
2376         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2377                 if (gr->lkb_highbast < DLM_LOCK_EX)
2378                         return 1;
2379                 return 0;
2380         }
2381
2382         if (gr->lkb_highbast < high &&
2383             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2384                 return 1;
2385         return 0;
2386 }
2387
2388 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2389 {
2390         struct dlm_lkb *lkb, *s;
2391         int high = DLM_LOCK_IV;
2392         int cw = 0;
2393
2394         if (!is_master(r)) {
2395                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2396                 dlm_dump_rsb(r);
2397                 return;
2398         }
2399
2400         high = grant_pending_convert(r, high, &cw, count);
2401         high = grant_pending_wait(r, high, &cw, count);
2402
2403         if (high == DLM_LOCK_IV)
2404                 return;
2405
2406         /*
2407          * If there are locks left on the wait/convert queue then send blocking
2408          * ASTs to granted locks based on the largest requested mode (high)
2409          * found above.
2410          */
2411
2412         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2413                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2414                         if (cw && high == DLM_LOCK_PR &&
2415                             lkb->lkb_grmode == DLM_LOCK_PR)
2416                                 queue_bast(r, lkb, DLM_LOCK_CW);
2417                         else
2418                                 queue_bast(r, lkb, high);
2419                         lkb->lkb_highbast = high;
2420                 }
2421         }
2422 }
2423
2424 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2425 {
2426         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2427             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2428                 if (gr->lkb_highbast < DLM_LOCK_EX)
2429                         return 1;
2430                 return 0;
2431         }
2432
2433         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2434                 return 1;
2435         return 0;
2436 }
2437
2438 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2439                             struct dlm_lkb *lkb)
2440 {
2441         struct dlm_lkb *gr;
2442
2443         list_for_each_entry(gr, head, lkb_statequeue) {
2444                 /* skip self when sending basts to convertqueue */
2445                 if (gr == lkb)
2446                         continue;
2447                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2448                         queue_bast(r, gr, lkb->lkb_rqmode);
2449                         gr->lkb_highbast = lkb->lkb_rqmode;
2450                 }
2451         }
2452 }
2453
2454 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2455 {
2456         send_bast_queue(r, &r->res_grantqueue, lkb);
2457 }
2458
2459 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2460 {
2461         send_bast_queue(r, &r->res_grantqueue, lkb);
2462         send_bast_queue(r, &r->res_convertqueue, lkb);
2463 }
2464
2465 /* set_master(r, lkb) -- set the master nodeid of a resource
2466
2467    The purpose of this function is to set the nodeid field in the given
2468    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2469    known, it can just be copied to the lkb and the function will return
2470    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2471    before it can be copied to the lkb.
2472
2473    When the rsb nodeid is being looked up remotely, the initial lkb
2474    causing the lookup is kept on the ls_waiters list waiting for the
2475    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2476    on the rsb's res_lookup list until the master is verified.
2477
2478    Return values:
2479    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2480    1: the rsb master is not available and the lkb has been placed on
2481       a wait queue
2482 */
2483
2484 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2485 {
2486         int our_nodeid = dlm_our_nodeid();
2487
2488         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2489                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2490                 r->res_first_lkid = lkb->lkb_id;
2491                 lkb->lkb_nodeid = r->res_nodeid;
2492                 return 0;
2493         }
2494
2495         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2496                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2497                 return 1;
2498         }
2499
2500         if (r->res_master_nodeid == our_nodeid) {
2501                 lkb->lkb_nodeid = 0;
2502                 return 0;
2503         }
2504
2505         if (r->res_master_nodeid) {
2506                 lkb->lkb_nodeid = r->res_master_nodeid;
2507                 return 0;
2508         }
2509
2510         if (dlm_dir_nodeid(r) == our_nodeid) {
2511                 /* This is a somewhat unusual case; find_rsb will usually
2512                    have set res_master_nodeid when dir nodeid is local, but
2513                    there are cases where we become the dir node after we've
2514                    past find_rsb and go through _request_lock again.
2515                    confirm_master() or process_lookup_list() needs to be
2516                    called after this. */
2517                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2518                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2519                           r->res_name);
2520                 r->res_master_nodeid = our_nodeid;
2521                 r->res_nodeid = 0;
2522                 lkb->lkb_nodeid = 0;
2523                 return 0;
2524         }
2525
2526         r->res_first_lkid = lkb->lkb_id;
2527         send_lookup(r, lkb);
2528         return 1;
2529 }
2530
2531 static void process_lookup_list(struct dlm_rsb *r)
2532 {
2533         struct dlm_lkb *lkb, *safe;
2534
2535         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2536                 list_del_init(&lkb->lkb_rsb_lookup);
2537                 _request_lock(r, lkb);
2538                 schedule();
2539         }
2540 }
2541
2542 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2543
2544 static void confirm_master(struct dlm_rsb *r, int error)
2545 {
2546         struct dlm_lkb *lkb;
2547
2548         if (!r->res_first_lkid)
2549                 return;
2550
2551         switch (error) {
2552         case 0:
2553         case -EINPROGRESS:
2554                 r->res_first_lkid = 0;
2555                 process_lookup_list(r);
2556                 break;
2557
2558         case -EAGAIN:
2559         case -EBADR:
2560         case -ENOTBLK:
2561                 /* the remote request failed and won't be retried (it was
2562                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2563                    lkb the first_lkid */
2564
2565                 r->res_first_lkid = 0;
2566
2567                 if (!list_empty(&r->res_lookup)) {
2568                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2569                                          lkb_rsb_lookup);
2570                         list_del_init(&lkb->lkb_rsb_lookup);
2571                         r->res_first_lkid = lkb->lkb_id;
2572                         _request_lock(r, lkb);
2573                 }
2574                 break;
2575
2576         default:
2577                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2578         }
2579 }
2580
2581 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2582                          int namelen, void (*ast)(void *astparam),
2583                          void *astparam,
2584                          void (*bast)(void *astparam, int mode),
2585                          struct dlm_args *args)
2586 {
2587         int rv = -EINVAL;
2588
2589         /* check for invalid arg usage */
2590
2591         if (mode < 0 || mode > DLM_LOCK_EX)
2592                 goto out;
2593
2594         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2595                 goto out;
2596
2597         if (flags & DLM_LKF_CANCEL)
2598                 goto out;
2599
2600         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2601                 goto out;
2602
2603         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2604                 goto out;
2605
2606         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2607                 goto out;
2608
2609         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2610                 goto out;
2611
2612         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2613                 goto out;
2614
2615         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2616                 goto out;
2617
2618         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2619                 goto out;
2620
2621         if (!ast || !lksb)
2622                 goto out;
2623
2624         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2625                 goto out;
2626
2627         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2628                 goto out;
2629
2630         /* these args will be copied to the lkb in validate_lock_args,
2631            it cannot be done now because when converting locks, fields in
2632            an active lkb cannot be modified before locking the rsb */
2633
2634         args->flags = flags;
2635         args->astfn = ast;
2636         args->astparam = astparam;
2637         args->bastfn = bast;
2638         args->mode = mode;
2639         args->lksb = lksb;
2640         rv = 0;
2641  out:
2642         return rv;
2643 }
2644
2645 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2646 {
2647         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2648                       DLM_LKF_FORCEUNLOCK))
2649                 return -EINVAL;
2650
2651         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2652                 return -EINVAL;
2653
2654         args->flags = flags;
2655         args->astparam = astarg;
2656         return 0;
2657 }
2658
2659 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2660                               struct dlm_args *args)
2661 {
2662         int rv = -EBUSY;
2663
2664         if (args->flags & DLM_LKF_CONVERT) {
2665                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2666                         goto out;
2667
2668                 /* lock not allowed if there's any op in progress */
2669                 if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
2670                         goto out;
2671
2672                 if (is_overlap(lkb))
2673                         goto out;
2674
2675                 rv = -EINVAL;
2676                 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2677                         goto out;
2678
2679                 if (args->flags & DLM_LKF_QUECVT &&
2680                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2681                         goto out;
2682         }
2683
2684         lkb->lkb_exflags = args->flags;
2685         dlm_set_sbflags_val(lkb, 0);
2686         lkb->lkb_astfn = args->astfn;
2687         lkb->lkb_astparam = args->astparam;
2688         lkb->lkb_bastfn = args->bastfn;
2689         lkb->lkb_rqmode = args->mode;
2690         lkb->lkb_lksb = args->lksb;
2691         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2692         lkb->lkb_ownpid = (int) current->pid;
2693         rv = 0;
2694  out:
2695         switch (rv) {
2696         case 0:
2697                 break;
2698         case -EINVAL:
2699                 /* annoy the user because dlm usage is wrong */
2700                 WARN_ON(1);
2701                 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2702                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2703                           lkb->lkb_status, lkb->lkb_wait_type,
2704                           lkb->lkb_resource->res_name);
2705                 break;
2706         default:
2707                 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2708                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2709                           lkb->lkb_status, lkb->lkb_wait_type,
2710                           lkb->lkb_resource->res_name);
2711                 break;
2712         }
2713
2714         return rv;
2715 }
2716
2717 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2718    for success */
2719
2720 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2721    because there may be a lookup in progress and it's valid to do
2722    cancel/unlockf on it */
2723
2724 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2725 {
2726         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2727         int rv = -EBUSY;
2728
2729         /* normal unlock not allowed if there's any op in progress */
2730         if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2731             (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
2732                 goto out;
2733
2734         /* an lkb may be waiting for an rsb lookup to complete where the
2735            lookup was initiated by another lock */
2736
2737         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2738                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2739                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2740                         list_del_init(&lkb->lkb_rsb_lookup);
2741                         queue_cast(lkb->lkb_resource, lkb,
2742                                    args->flags & DLM_LKF_CANCEL ?
2743                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2744                         unhold_lkb(lkb); /* undoes create_lkb() */
2745                 }
2746                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2747                 goto out;
2748         }
2749
2750         rv = -EINVAL;
2751         if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2752                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2753                 dlm_print_lkb(lkb);
2754                 goto out;
2755         }
2756
2757         /* an lkb may still exist even though the lock is EOL'ed due to a
2758          * cancel, unlock or failed noqueue request; an app can't use these
2759          * locks; return same error as if the lkid had not been found at all
2760          */
2761
2762         if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2763                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2764                 rv = -ENOENT;
2765                 goto out;
2766         }
2767
2768         /* cancel not allowed with another cancel/unlock in progress */
2769
2770         if (args->flags & DLM_LKF_CANCEL) {
2771                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2772                         goto out;
2773
2774                 if (is_overlap(lkb))
2775                         goto out;
2776
2777                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2778                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2779                         rv = -EBUSY;
2780                         goto out;
2781                 }
2782
2783                 /* there's nothing to cancel */
2784                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2785                     !lkb->lkb_wait_type) {
2786                         rv = -EBUSY;
2787                         goto out;
2788                 }
2789
2790                 switch (lkb->lkb_wait_type) {
2791                 case DLM_MSG_LOOKUP:
2792                 case DLM_MSG_REQUEST:
2793                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2794                         rv = -EBUSY;
2795                         goto out;
2796                 case DLM_MSG_UNLOCK:
2797                 case DLM_MSG_CANCEL:
2798                         goto out;
2799                 }
2800                 /* add_to_waiters() will set OVERLAP_CANCEL */
2801                 goto out_ok;
2802         }
2803
2804         /* do we need to allow a force-unlock if there's a normal unlock
2805            already in progress?  in what conditions could the normal unlock
2806            fail such that we'd want to send a force-unlock to be sure? */
2807
2808         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2809                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2810                         goto out;
2811
2812                 if (is_overlap_unlock(lkb))
2813                         goto out;
2814
2815                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2816                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2817                         rv = -EBUSY;
2818                         goto out;
2819                 }
2820
2821                 switch (lkb->lkb_wait_type) {
2822                 case DLM_MSG_LOOKUP:
2823                 case DLM_MSG_REQUEST:
2824                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2825                         rv = -EBUSY;
2826                         goto out;
2827                 case DLM_MSG_UNLOCK:
2828                         goto out;
2829                 }
2830                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2831         }
2832
2833  out_ok:
2834         /* an overlapping op shouldn't blow away exflags from other op */
2835         lkb->lkb_exflags |= args->flags;
2836         dlm_set_sbflags_val(lkb, 0);
2837         lkb->lkb_astparam = args->astparam;
2838         rv = 0;
2839  out:
2840         switch (rv) {
2841         case 0:
2842                 break;
2843         case -EINVAL:
2844                 /* annoy the user because dlm usage is wrong */
2845                 WARN_ON(1);
2846                 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2847                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2848                           args->flags, lkb->lkb_wait_type,
2849                           lkb->lkb_resource->res_name);
2850                 break;
2851         default:
2852                 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2853                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2854                           args->flags, lkb->lkb_wait_type,
2855                           lkb->lkb_resource->res_name);
2856                 break;
2857         }
2858
2859         return rv;
2860 }
2861
2862 /*
2863  * Four stage 4 varieties:
2864  * do_request(), do_convert(), do_unlock(), do_cancel()
2865  * These are called on the master node for the given lock and
2866  * from the central locking logic.
2867  */
2868
2869 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2870 {
2871         int error = 0;
2872
2873         if (can_be_granted(r, lkb, 1, 0, NULL)) {
2874                 grant_lock(r, lkb);
2875                 queue_cast(r, lkb, 0);
2876                 goto out;
2877         }
2878
2879         if (can_be_queued(lkb)) {
2880                 error = -EINPROGRESS;
2881                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2882                 goto out;
2883         }
2884
2885         error = -EAGAIN;
2886         queue_cast(r, lkb, -EAGAIN);
2887  out:
2888         return error;
2889 }
2890
2891 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2892                                int error)
2893 {
2894         switch (error) {
2895         case -EAGAIN:
2896                 if (force_blocking_asts(lkb))
2897                         send_blocking_asts_all(r, lkb);
2898                 break;
2899         case -EINPROGRESS:
2900                 send_blocking_asts(r, lkb);
2901                 break;
2902         }
2903 }
2904
2905 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2906 {
2907         int error = 0;
2908         int deadlk = 0;
2909
2910         /* changing an existing lock may allow others to be granted */
2911
2912         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2913                 grant_lock(r, lkb);
2914                 queue_cast(r, lkb, 0);
2915                 goto out;
2916         }
2917
2918         /* can_be_granted() detected that this lock would block in a conversion
2919            deadlock, so we leave it on the granted queue and return EDEADLK in
2920            the ast for the convert. */
2921
2922         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2923                 /* it's left on the granted queue */
2924                 revert_lock(r, lkb);
2925                 queue_cast(r, lkb, -EDEADLK);
2926                 error = -EDEADLK;
2927                 goto out;
2928         }
2929
2930         /* is_demoted() means the can_be_granted() above set the grmode
2931            to NL, and left us on the granted queue.  This auto-demotion
2932            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2933            now grantable.  We have to try to grant other converting locks
2934            before we try again to grant this one. */
2935
2936         if (is_demoted(lkb)) {
2937                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2938                 if (_can_be_granted(r, lkb, 1, 0)) {
2939                         grant_lock(r, lkb);
2940                         queue_cast(r, lkb, 0);
2941                         goto out;
2942                 }
2943                 /* else fall through and move to convert queue */
2944         }
2945
2946         if (can_be_queued(lkb)) {
2947                 error = -EINPROGRESS;
2948                 del_lkb(r, lkb);
2949                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2950                 goto out;
2951         }
2952
2953         error = -EAGAIN;
2954         queue_cast(r, lkb, -EAGAIN);
2955  out:
2956         return error;
2957 }
2958
2959 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2960                                int error)
2961 {
2962         switch (error) {
2963         case 0:
2964                 grant_pending_locks(r, NULL);
2965                 /* grant_pending_locks also sends basts */
2966                 break;
2967         case -EAGAIN:
2968                 if (force_blocking_asts(lkb))
2969                         send_blocking_asts_all(r, lkb);
2970                 break;
2971         case -EINPROGRESS:
2972                 send_blocking_asts(r, lkb);
2973                 break;
2974         }
2975 }
2976
2977 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2978 {
2979         remove_lock(r, lkb);
2980         queue_cast(r, lkb, -DLM_EUNLOCK);
2981         return -DLM_EUNLOCK;
2982 }
2983
2984 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2985                               int error)
2986 {
2987         grant_pending_locks(r, NULL);
2988 }
2989
2990 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2991
2992 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2993 {
2994         int error;
2995
2996         error = revert_lock(r, lkb);
2997         if (error) {
2998                 queue_cast(r, lkb, -DLM_ECANCEL);
2999                 return -DLM_ECANCEL;
3000         }
3001         return 0;
3002 }
3003
3004 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3005                               int error)
3006 {
3007         if (error)
3008                 grant_pending_locks(r, NULL);
3009 }
3010
3011 /*
3012  * Four stage 3 varieties:
3013  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3014  */
3015
3016 /* add a new lkb to a possibly new rsb, called by requesting process */
3017
3018 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3019 {
3020         int error;
3021
3022         /* set_master: sets lkb nodeid from r */
3023
3024         error = set_master(r, lkb);
3025         if (error < 0)
3026                 goto out;
3027         if (error) {
3028                 error = 0;
3029                 goto out;
3030         }
3031
3032         if (is_remote(r)) {
3033                 /* receive_request() calls do_request() on remote node */
3034                 error = send_request(r, lkb);
3035         } else {
3036                 error = do_request(r, lkb);
3037                 /* for remote locks the request_reply is sent
3038                    between do_request and do_request_effects */
3039                 do_request_effects(r, lkb, error);
3040         }
3041  out:
3042         return error;
3043 }
3044
3045 /* change some property of an existing lkb, e.g. mode */
3046
3047 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048 {
3049         int error;
3050
3051         if (is_remote(r)) {
3052                 /* receive_convert() calls do_convert() on remote node */
3053                 error = send_convert(r, lkb);
3054         } else {
3055                 error = do_convert(r, lkb);
3056                 /* for remote locks the convert_reply is sent
3057                    between do_convert and do_convert_effects */
3058                 do_convert_effects(r, lkb, error);
3059         }
3060
3061         return error;
3062 }
3063
3064 /* remove an existing lkb from the granted queue */
3065
3066 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3067 {
3068         int error;
3069
3070         if (is_remote(r)) {
3071                 /* receive_unlock() calls do_unlock() on remote node */
3072                 error = send_unlock(r, lkb);
3073         } else {
3074                 error = do_unlock(r, lkb);
3075                 /* for remote locks the unlock_reply is sent
3076                    between do_unlock and do_unlock_effects */
3077                 do_unlock_effects(r, lkb, error);
3078         }
3079
3080         return error;
3081 }
3082
3083 /* remove an existing lkb from the convert or wait queue */
3084
3085 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3086 {
3087         int error;
3088
3089         if (is_remote(r)) {
3090                 /* receive_cancel() calls do_cancel() on remote node */
3091                 error = send_cancel(r, lkb);
3092         } else {
3093                 error = do_cancel(r, lkb);
3094                 /* for remote locks the cancel_reply is sent
3095                    between do_cancel and do_cancel_effects */
3096                 do_cancel_effects(r, lkb, error);
3097         }
3098
3099         return error;
3100 }
3101
3102 /*
3103  * Four stage 2 varieties:
3104  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3105  */
3106
3107 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3108                         const void *name, int len,
3109                         struct dlm_args *args)
3110 {
3111         struct dlm_rsb *r;
3112         int error;
3113
3114         error = validate_lock_args(ls, lkb, args);
3115         if (error)
3116                 return error;
3117
3118         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3119         if (error)
3120                 return error;
3121
3122         lock_rsb(r);
3123
3124         attach_lkb(r, lkb);
3125         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3126
3127         error = _request_lock(r, lkb);
3128
3129         unlock_rsb(r);
3130         put_rsb(r);
3131         return error;
3132 }
3133
3134 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3135                         struct dlm_args *args)
3136 {
3137         struct dlm_rsb *r;
3138         int error;
3139
3140         r = lkb->lkb_resource;
3141
3142         hold_rsb(r);
3143         lock_rsb(r);
3144
3145         error = validate_lock_args(ls, lkb, args);
3146         if (error)
3147                 goto out;
3148
3149         error = _convert_lock(r, lkb);
3150  out:
3151         unlock_rsb(r);
3152         put_rsb(r);
3153         return error;
3154 }
3155
3156 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3157                        struct dlm_args *args)
3158 {
3159         struct dlm_rsb *r;
3160         int error;
3161
3162         r = lkb->lkb_resource;
3163
3164         hold_rsb(r);
3165         lock_rsb(r);
3166
3167         error = validate_unlock_args(lkb, args);
3168         if (error)
3169                 goto out;
3170
3171         error = _unlock_lock(r, lkb);
3172  out:
3173         unlock_rsb(r);
3174         put_rsb(r);
3175         return error;
3176 }
3177
3178 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3179                        struct dlm_args *args)
3180 {
3181         struct dlm_rsb *r;
3182         int error;
3183
3184         r = lkb->lkb_resource;
3185
3186         hold_rsb(r);
3187         lock_rsb(r);
3188
3189         error = validate_unlock_args(lkb, args);
3190         if (error)
3191                 goto out;
3192
3193         error = _cancel_lock(r, lkb);
3194  out:
3195         unlock_rsb(r);
3196         put_rsb(r);
3197         return error;
3198 }
3199
3200 /*
3201  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3202  */
3203
3204 int dlm_lock(dlm_lockspace_t *lockspace,
3205              int mode,
3206              struct dlm_lksb *lksb,
3207              uint32_t flags,
3208              const void *name,
3209              unsigned int namelen,
3210              uint32_t parent_lkid,
3211              void (*ast) (void *astarg),
3212              void *astarg,
3213              void (*bast) (void *astarg, int mode))
3214 {
3215         struct dlm_ls *ls;
3216         struct dlm_lkb *lkb;
3217         struct dlm_args args;
3218         int error, convert = flags & DLM_LKF_CONVERT;
3219
3220         ls = dlm_find_lockspace_local(lockspace);
3221         if (!ls)
3222                 return -EINVAL;
3223
3224         dlm_lock_recovery(ls);
3225
3226         if (convert)
3227                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3228         else
3229                 error = create_lkb(ls, &lkb);
3230
3231         if (error)
3232                 goto out;
3233
3234         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3235
3236         error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3237                               &args);
3238         if (error)
3239                 goto out_put;
3240
3241         if (convert)
3242                 error = convert_lock(ls, lkb, &args);
3243         else
3244                 error = request_lock(ls, lkb, name, namelen, &args);
3245
3246         if (error == -EINPROGRESS)
3247                 error = 0;
3248  out_put:
3249         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3250
3251         if (convert || error)
3252                 __put_lkb(ls, lkb);
3253         if (error == -EAGAIN || error == -EDEADLK)
3254                 error = 0;
3255  out:
3256         dlm_unlock_recovery(ls);
3257         dlm_put_lockspace(ls);
3258         return error;
3259 }
3260
3261 int dlm_unlock(dlm_lockspace_t *lockspace,
3262                uint32_t lkid,
3263                uint32_t flags,
3264                struct dlm_lksb *lksb,
3265                void *astarg)
3266 {
3267         struct dlm_ls *ls;
3268         struct dlm_lkb *lkb;
3269         struct dlm_args args;
3270         int error;
3271
3272         ls = dlm_find_lockspace_local(lockspace);
3273         if (!ls)
3274                 return -EINVAL;
3275
3276         dlm_lock_recovery(ls);
3277
3278         error = find_lkb(ls, lkid, &lkb);
3279         if (error)
3280                 goto out;
3281
3282         trace_dlm_unlock_start(ls, lkb, flags);
3283
3284         error = set_unlock_args(flags, astarg, &args);
3285         if (error)
3286                 goto out_put;
3287
3288         if (flags & DLM_LKF_CANCEL)
3289                 error = cancel_lock(ls, lkb, &args);
3290         else
3291                 error = unlock_lock(ls, lkb, &args);
3292
3293         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3294                 error = 0;
3295         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3296                 error = 0;
3297  out_put:
3298         trace_dlm_unlock_end(ls, lkb, flags, error);
3299
3300         dlm_put_lkb(lkb);
3301  out:
3302         dlm_unlock_recovery(ls);
3303         dlm_put_lockspace(ls);
3304         return error;
3305 }
3306
3307 /*
3308  * send/receive routines for remote operations and replies
3309  *
3310  * send_args
3311  * send_common
3312  * send_request                 receive_request
3313  * send_convert                 receive_convert
3314  * send_unlock                  receive_unlock
3315  * send_cancel                  receive_cancel
3316  * send_grant                   receive_grant
3317  * send_bast                    receive_bast
3318  * send_lookup                  receive_lookup
3319  * send_remove                  receive_remove
3320  *
3321  *                              send_common_reply
3322  * receive_request_reply        send_request_reply
3323  * receive_convert_reply        send_convert_reply
3324  * receive_unlock_reply         send_unlock_reply
3325  * receive_cancel_reply         send_cancel_reply
3326  * receive_lookup_reply         send_lookup_reply
3327  */
3328
3329 static int _create_message(struct dlm_ls *ls, int mb_len,
3330                            int to_nodeid, int mstype,
3331                            struct dlm_message **ms_ret,
3332                            struct dlm_mhandle **mh_ret,
3333                            gfp_t allocation)
3334 {
3335         struct dlm_message *ms;
3336         struct dlm_mhandle *mh;
3337         char *mb;
3338
3339         /* get_buffer gives us a message handle (mh) that we need to
3340            pass into midcomms_commit and a message buffer (mb) that we
3341            write our data into */
3342
3343         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
3344         if (!mh)
3345                 return -ENOBUFS;
3346
3347         ms = (struct dlm_message *) mb;
3348
3349         ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3350         ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3351         ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3352         ms->m_header.h_length = cpu_to_le16(mb_len);
3353         ms->m_header.h_cmd = DLM_MSG;
3354
3355         ms->m_type = cpu_to_le32(mstype);
3356
3357         *mh_ret = mh;
3358         *ms_ret = ms;
3359         return 0;
3360 }
3361
3362 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3363                           int to_nodeid, int mstype,
3364                           struct dlm_message **ms_ret,
3365                           struct dlm_mhandle **mh_ret,
3366                           gfp_t allocation)
3367 {
3368         int mb_len = sizeof(struct dlm_message);
3369
3370         switch (mstype) {
3371         case DLM_MSG_REQUEST:
3372         case DLM_MSG_LOOKUP:
3373         case DLM_MSG_REMOVE:
3374                 mb_len += r->res_length;
3375                 break;
3376         case DLM_MSG_CONVERT:
3377         case DLM_MSG_UNLOCK:
3378         case DLM_MSG_REQUEST_REPLY:
3379         case DLM_MSG_CONVERT_REPLY:
3380         case DLM_MSG_GRANT:
3381                 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3382                         mb_len += r->res_ls->ls_lvblen;
3383                 break;
3384         }
3385
3386         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3387                                ms_ret, mh_ret, allocation);
3388 }
3389
3390 /* further lowcomms enhancements or alternate implementations may make
3391    the return value from this function useful at some point */
3392
3393 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3394                         const void *name, int namelen)
3395 {
3396         dlm_midcomms_commit_mhandle(mh, name, namelen);
3397         return 0;
3398 }
3399
3400 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3401                       struct dlm_message *ms)
3402 {
3403         ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3404         ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3405         ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3406         ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3407         ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3408         ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3409         ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3410         ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3411         ms->m_status   = cpu_to_le32(lkb->lkb_status);
3412         ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3413         ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3414         ms->m_hash     = cpu_to_le32(r->res_hash);
3415
3416         /* m_result and m_bastmode are set from function args,
3417            not from lkb fields */
3418
3419         if (lkb->lkb_bastfn)
3420                 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3421         if (lkb->lkb_astfn)
3422                 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3423
3424         /* compare with switch in create_message; send_remove() doesn't
3425            use send_args() */
3426
3427         switch (ms->m_type) {
3428         case cpu_to_le32(DLM_MSG_REQUEST):
3429         case cpu_to_le32(DLM_MSG_LOOKUP):
3430                 memcpy(ms->m_extra, r->res_name, r->res_length);
3431                 break;
3432         case cpu_to_le32(DLM_MSG_CONVERT):
3433         case cpu_to_le32(DLM_MSG_UNLOCK):
3434         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3435         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3436         case cpu_to_le32(DLM_MSG_GRANT):
3437                 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3438                         break;
3439                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3440                 break;
3441         }
3442 }
3443
3444 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3445 {
3446         struct dlm_message *ms;
3447         struct dlm_mhandle *mh;
3448         int to_nodeid, error;
3449
3450         to_nodeid = r->res_nodeid;
3451
3452         error = add_to_waiters(lkb, mstype, to_nodeid);
3453         if (error)
3454                 return error;
3455
3456         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3457         if (error)
3458                 goto fail;
3459
3460         send_args(r, lkb, ms);
3461
3462         error = send_message(mh, ms, r->res_name, r->res_length);
3463         if (error)
3464                 goto fail;
3465         return 0;
3466
3467  fail:
3468         remove_from_waiters(lkb, msg_reply_type(mstype));
3469         return error;
3470 }
3471
3472 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3473 {
3474         return send_common(r, lkb, DLM_MSG_REQUEST);
3475 }
3476
3477 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3478 {
3479         int error;
3480
3481         error = send_common(r, lkb, DLM_MSG_CONVERT);
3482
3483         /* down conversions go without a reply from the master */
3484         if (!error && down_conversion(lkb)) {
3485                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3486                 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3487                 r->res_ls->ls_local_ms.m_result = 0;
3488                 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3489         }
3490
3491         return error;
3492 }
3493
3494 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3495    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3496    that the master is still correct. */
3497
3498 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3499 {
3500         return send_common(r, lkb, DLM_MSG_UNLOCK);
3501 }
3502
3503 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3504 {
3505         return send_common(r, lkb, DLM_MSG_CANCEL);
3506 }
3507
3508 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3509 {
3510         struct dlm_message *ms;
3511         struct dlm_mhandle *mh;
3512         int to_nodeid, error;
3513
3514         to_nodeid = lkb->lkb_nodeid;
3515
3516         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
3517                                GFP_NOFS);
3518         if (error)
3519                 goto out;
3520
3521         send_args(r, lkb, ms);
3522
3523         ms->m_result = 0;
3524
3525         error = send_message(mh, ms, r->res_name, r->res_length);
3526  out:
3527         return error;
3528 }
3529
3530 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3531 {
3532         struct dlm_message *ms;
3533         struct dlm_mhandle *mh;
3534         int to_nodeid, error;
3535
3536         to_nodeid = lkb->lkb_nodeid;
3537
3538         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
3539                                GFP_NOFS);
3540         if (error)
3541                 goto out;
3542
3543         send_args(r, lkb, ms);
3544
3545         ms->m_bastmode = cpu_to_le32(mode);
3546
3547         error = send_message(mh, ms, r->res_name, r->res_length);
3548  out:
3549         return error;
3550 }
3551
3552 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3553 {
3554         struct dlm_message *ms;
3555         struct dlm_mhandle *mh;
3556         int to_nodeid, error;
3557
3558         to_nodeid = dlm_dir_nodeid(r);
3559
3560         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3561         if (error)
3562                 return error;
3563
3564         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
3565                                GFP_NOFS);
3566         if (error)
3567                 goto fail;
3568
3569         send_args(r, lkb, ms);
3570
3571         error = send_message(mh, ms, r->res_name, r->res_length);
3572         if (error)
3573                 goto fail;
3574         return 0;
3575
3576  fail:
3577         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3578         return error;
3579 }
3580
3581 static int send_remove(struct dlm_rsb *r)
3582 {
3583         struct dlm_message *ms;
3584         struct dlm_mhandle *mh;
3585         int to_nodeid, error;
3586
3587         to_nodeid = dlm_dir_nodeid(r);
3588
3589         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
3590                                GFP_ATOMIC);
3591         if (error)
3592                 goto out;
3593
3594         memcpy(ms->m_extra, r->res_name, r->res_length);
3595         ms->m_hash = cpu_to_le32(r->res_hash);
3596
3597         error = send_message(mh, ms, r->res_name, r->res_length);
3598  out:
3599         return error;
3600 }
3601
3602 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3603                              int mstype, int rv)
3604 {
3605         struct dlm_message *ms;
3606         struct dlm_mhandle *mh;
3607         int to_nodeid, error;
3608
3609         to_nodeid = lkb->lkb_nodeid;
3610
3611         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3612         if (error)
3613                 goto out;
3614
3615         send_args(r, lkb, ms);
3616
3617         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3618
3619         error = send_message(mh, ms, r->res_name, r->res_length);
3620  out:
3621         return error;
3622 }
3623
3624 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3625 {
3626         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3627 }
3628
3629 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3630 {
3631         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3632 }
3633
3634 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3635 {
3636         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3637 }
3638
3639 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3640 {
3641         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3642 }
3643
3644 static int send_lookup_reply(struct dlm_ls *ls,
3645                              const struct dlm_message *ms_in, int ret_nodeid,
3646                              int rv)
3647 {
3648         struct dlm_rsb *r = &ls->ls_local_rsb;
3649         struct dlm_message *ms;
3650         struct dlm_mhandle *mh;
3651         int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3652
3653         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
3654                                GFP_NOFS);
3655         if (error)
3656                 goto out;
3657
3658         ms->m_lkid = ms_in->m_lkid;
3659         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3660         ms->m_nodeid = cpu_to_le32(ret_nodeid);
3661
3662         error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3663  out:
3664         return error;
3665 }
3666
3667 /* which args we save from a received message depends heavily on the type
3668    of message, unlike the send side where we can safely send everything about
3669    the lkb for any type of message */
3670
3671 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3672 {
3673         lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3674         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3675         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3676 }
3677
3678 static void receive_flags_reply(struct dlm_lkb *lkb,
3679                                 const struct dlm_message *ms,
3680                                 bool local)
3681 {
3682         if (local)
3683                 return;
3684
3685         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3686         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3687 }
3688
3689 static int receive_extralen(const struct dlm_message *ms)
3690 {
3691         return (le16_to_cpu(ms->m_header.h_length) -
3692                 sizeof(struct dlm_message));
3693 }
3694
3695 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3696                        const struct dlm_message *ms)
3697 {
3698         int len;
3699
3700         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3701                 if (!lkb->lkb_lvbptr)
3702                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3703                 if (!lkb->lkb_lvbptr)
3704                         return -ENOMEM;
3705                 len = receive_extralen(ms);
3706                 if (len > ls->ls_lvblen)
3707                         len = ls->ls_lvblen;
3708                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3709         }
3710         return 0;
3711 }
3712
3713 static void fake_bastfn(void *astparam, int mode)
3714 {
3715         log_print("fake_bastfn should not be called");
3716 }
3717
3718 static void fake_astfn(void *astparam)
3719 {
3720         log_print("fake_astfn should not be called");
3721 }
3722
3723 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3724                                 const struct dlm_message *ms)
3725 {
3726         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3727         lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3728         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3729         lkb->lkb_grmode = DLM_LOCK_IV;
3730         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3731
3732         lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3733         lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3734
3735         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3736                 /* lkb was just created so there won't be an lvb yet */
3737                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3738                 if (!lkb->lkb_lvbptr)
3739                         return -ENOMEM;
3740         }
3741
3742         return 0;
3743 }
3744
3745 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3746                                 const struct dlm_message *ms)
3747 {
3748         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3749                 return -EBUSY;
3750
3751         if (receive_lvb(ls, lkb, ms))
3752                 return -ENOMEM;
3753
3754         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3755         lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3756
3757         return 0;
3758 }
3759
3760 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3761                                const struct dlm_message *ms)
3762 {
3763         if (receive_lvb(ls, lkb, ms))
3764                 return -ENOMEM;
3765         return 0;
3766 }
3767
3768 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3769    uses to send a reply and that the remote end uses to process the reply. */
3770
3771 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3772 {
3773         struct dlm_lkb *lkb = &ls->ls_local_lkb;
3774         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3775         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3776 }
3777
3778 /* This is called after the rsb is locked so that we can safely inspect
3779    fields in the lkb. */
3780
3781 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3782 {
3783         int from = le32_to_cpu(ms->m_header.h_nodeid);
3784         int error = 0;
3785
3786         /* currently mixing of user/kernel locks are not supported */
3787         if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3788             !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3789                 log_error(lkb->lkb_resource->res_ls,
3790                           "got user dlm message for a kernel lock");
3791                 error = -EINVAL;
3792                 goto out;
3793         }
3794
3795         switch (ms->m_type) {
3796         case cpu_to_le32(DLM_MSG_CONVERT):
3797         case cpu_to_le32(DLM_MSG_UNLOCK):
3798         case cpu_to_le32(DLM_MSG_CANCEL):
3799                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3800                         error = -EINVAL;
3801                 break;
3802
3803         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3804         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3805         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3806         case cpu_to_le32(DLM_MSG_GRANT):
3807         case cpu_to_le32(DLM_MSG_BAST):
3808                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3809                         error = -EINVAL;
3810                 break;
3811
3812         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3813                 if (!is_process_copy(lkb))
3814                         error = -EINVAL;
3815                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3816                         error = -EINVAL;
3817                 break;
3818
3819         default:
3820                 error = -EINVAL;
3821         }
3822
3823 out:
3824         if (error)
3825                 log_error(lkb->lkb_resource->res_ls,
3826                           "ignore invalid message %d from %d %x %x %x %d",
3827                           le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3828                           lkb->lkb_remid, dlm_iflags_val(lkb),
3829                           lkb->lkb_nodeid);
3830         return error;
3831 }
3832
3833 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3834 {
3835         struct dlm_lkb *lkb;
3836         struct dlm_rsb *r;
3837         int from_nodeid;
3838         int error, namelen = 0;
3839
3840         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3841
3842         error = create_lkb(ls, &lkb);
3843         if (error)
3844                 goto fail;
3845
3846         receive_flags(lkb, ms);
3847         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3848         error = receive_request_args(ls, lkb, ms);
3849         if (error) {
3850                 __put_lkb(ls, lkb);
3851                 goto fail;
3852         }
3853
3854         /* The dir node is the authority on whether we are the master
3855            for this rsb or not, so if the master sends us a request, we should
3856            recreate the rsb if we've destroyed it.   This race happens when we
3857            send a remove message to the dir node at the same time that the dir
3858            node sends us a request for the rsb. */
3859
3860         namelen = receive_extralen(ms);
3861
3862         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3863                          R_RECEIVE_REQUEST, &r);
3864         if (error) {
3865                 __put_lkb(ls, lkb);
3866                 goto fail;
3867         }
3868
3869         lock_rsb(r);
3870
3871         if (r->res_master_nodeid != dlm_our_nodeid()) {
3872                 error = validate_master_nodeid(ls, r, from_nodeid);
3873                 if (error) {
3874                         unlock_rsb(r);
3875                         put_rsb(r);
3876                         __put_lkb(ls, lkb);
3877                         goto fail;
3878                 }
3879         }
3880
3881         attach_lkb(r, lkb);
3882         error = do_request(r, lkb);
3883         send_request_reply(r, lkb, error);
3884         do_request_effects(r, lkb, error);
3885
3886         unlock_rsb(r);
3887         put_rsb(r);
3888
3889         if (error == -EINPROGRESS)
3890                 error = 0;
3891         if (error)
3892                 dlm_put_lkb(lkb);
3893         return 0;
3894
3895  fail:
3896         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3897            and do this receive_request again from process_lookup_list once
3898            we get the lookup reply.  This would avoid a many repeated
3899            ENOTBLK request failures when the lookup reply designating us
3900            as master is delayed. */
3901
3902         if (error != -ENOTBLK) {
3903                 log_limit(ls, "receive_request %x from %d %d",
3904                           le32_to_cpu(ms->m_lkid), from_nodeid, error);
3905         }
3906
3907         setup_local_lkb(ls, ms);
3908         send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3909         return error;
3910 }
3911
3912 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
3913 {
3914         struct dlm_lkb *lkb;
3915         struct dlm_rsb *r;
3916         int error, reply = 1;
3917
3918         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3919         if (error)
3920                 goto fail;
3921
3922         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3923                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3924                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3925                           (unsigned long long)lkb->lkb_recover_seq,
3926                           le32_to_cpu(ms->m_header.h_nodeid),
3927                           le32_to_cpu(ms->m_lkid));
3928                 error = -ENOENT;
3929                 dlm_put_lkb(lkb);
3930                 goto fail;
3931         }
3932
3933         r = lkb->lkb_resource;
3934
3935         hold_rsb(r);
3936         lock_rsb(r);
3937
3938         error = validate_message(lkb, ms);
3939         if (error)
3940                 goto out;
3941
3942         receive_flags(lkb, ms);
3943
3944         error = receive_convert_args(ls, lkb, ms);
3945         if (error) {
3946                 send_convert_reply(r, lkb, error);
3947                 goto out;
3948         }
3949
3950         reply = !down_conversion(lkb);
3951
3952         error = do_convert(r, lkb);
3953         if (reply)
3954                 send_convert_reply(r, lkb, error);
3955         do_convert_effects(r, lkb, error);
3956  out:
3957         unlock_rsb(r);
3958         put_rsb(r);
3959         dlm_put_lkb(lkb);
3960         return 0;
3961
3962  fail:
3963         setup_local_lkb(ls, ms);
3964         send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3965         return error;
3966 }
3967
3968 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
3969 {
3970         struct dlm_lkb *lkb;
3971         struct dlm_rsb *r;
3972         int error;
3973
3974         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3975         if (error)
3976                 goto fail;
3977
3978         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3979                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3980                           lkb->lkb_id, lkb->lkb_remid,
3981                           le32_to_cpu(ms->m_header.h_nodeid),
3982                           le32_to_cpu(ms->m_lkid));
3983                 error = -ENOENT;
3984                 dlm_put_lkb(lkb);
3985                 goto fail;
3986         }
3987
3988         r = lkb->lkb_resource;
3989
3990         hold_rsb(r);
3991         lock_rsb(r);
3992
3993         error = validate_message(lkb, ms);
3994         if (error)
3995                 goto out;
3996
3997         receive_flags(lkb, ms);
3998
3999         error = receive_unlock_args(ls, lkb, ms);
4000         if (error) {
4001                 send_unlock_reply(r, lkb, error);
4002                 goto out;
4003         }
4004
4005         error = do_unlock(r, lkb);
4006         send_unlock_reply(r, lkb, error);
4007         do_unlock_effects(r, lkb, error);
4008  out:
4009         unlock_rsb(r);
4010         put_rsb(r);
4011         dlm_put_lkb(lkb);
4012         return 0;
4013
4014  fail:
4015         setup_local_lkb(ls, ms);
4016         send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4017         return error;
4018 }
4019
4020 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4021 {
4022         struct dlm_lkb *lkb;
4023         struct dlm_rsb *r;
4024         int error;
4025
4026         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4027         if (error)
4028                 goto fail;
4029
4030         receive_flags(lkb, ms);
4031
4032         r = lkb->lkb_resource;
4033
4034         hold_rsb(r);
4035         lock_rsb(r);
4036
4037         error = validate_message(lkb, ms);
4038         if (error)
4039                 goto out;
4040
4041         error = do_cancel(r, lkb);
4042         send_cancel_reply(r, lkb, error);
4043         do_cancel_effects(r, lkb, error);
4044  out:
4045         unlock_rsb(r);
4046         put_rsb(r);
4047         dlm_put_lkb(lkb);
4048         return 0;
4049
4050  fail:
4051         setup_local_lkb(ls, ms);
4052         send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4053         return error;
4054 }
4055
4056 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4057 {
4058         struct dlm_lkb *lkb;
4059         struct dlm_rsb *r;
4060         int error;
4061
4062         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4063         if (error)
4064                 return error;
4065
4066         r = lkb->lkb_resource;
4067
4068         hold_rsb(r);
4069         lock_rsb(r);
4070
4071         error = validate_message(lkb, ms);
4072         if (error)
4073                 goto out;
4074
4075         receive_flags_reply(lkb, ms, false);
4076         if (is_altmode(lkb))
4077                 munge_altmode(lkb, ms);
4078         grant_lock_pc(r, lkb, ms);
4079         queue_cast(r, lkb, 0);
4080  out:
4081         unlock_rsb(r);
4082         put_rsb(r);
4083         dlm_put_lkb(lkb);
4084         return 0;
4085 }
4086
4087 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4088 {
4089         struct dlm_lkb *lkb;
4090         struct dlm_rsb *r;
4091         int error;
4092
4093         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4094         if (error)
4095                 return error;
4096
4097         r = lkb->lkb_resource;
4098
4099         hold_rsb(r);
4100         lock_rsb(r);
4101
4102         error = validate_message(lkb, ms);
4103         if (error)
4104                 goto out;
4105
4106         queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4107         lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4108  out:
4109         unlock_rsb(r);
4110         put_rsb(r);
4111         dlm_put_lkb(lkb);
4112         return 0;
4113 }
4114
4115 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4116 {
4117         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4118
4119         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4120         our_nodeid = dlm_our_nodeid();
4121
4122         len = receive_extralen(ms);
4123
4124         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4125                                   &ret_nodeid, NULL);
4126
4127         /* Optimization: we're master so treat lookup as a request */
4128         if (!error && ret_nodeid == our_nodeid) {
4129                 receive_request(ls, ms);
4130                 return;
4131         }
4132         send_lookup_reply(ls, ms, ret_nodeid, error);
4133 }
4134
4135 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4136 {
4137         char name[DLM_RESNAME_MAXLEN+1];
4138         struct dlm_rsb *r;
4139         uint32_t hash, b;
4140         int rv, len, dir_nodeid, from_nodeid;
4141
4142         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4143
4144         len = receive_extralen(ms);
4145
4146         if (len > DLM_RESNAME_MAXLEN) {
4147                 log_error(ls, "receive_remove from %d bad len %d",
4148                           from_nodeid, len);
4149                 return;
4150         }
4151
4152         dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4153         if (dir_nodeid != dlm_our_nodeid()) {
4154                 log_error(ls, "receive_remove from %d bad nodeid %d",
4155                           from_nodeid, dir_nodeid);
4156                 return;
4157         }
4158
4159         /* Look for name on rsbtbl.toss, if it's there, kill it.
4160            If it's on rsbtbl.keep, it's being used, and we should ignore this
4161            message.  This is an expected race between the dir node sending a
4162            request to the master node at the same time as the master node sends
4163            a remove to the dir node.  The resolution to that race is for the
4164            dir node to ignore the remove message, and the master node to
4165            recreate the master rsb when it gets a request from the dir node for
4166            an rsb it doesn't have. */
4167
4168         memset(name, 0, sizeof(name));
4169         memcpy(name, ms->m_extra, len);
4170
4171         hash = jhash(name, len, 0);
4172         b = hash & (ls->ls_rsbtbl_size - 1);
4173
4174         spin_lock(&ls->ls_rsbtbl[b].lock);
4175
4176         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4177         if (rv) {
4178                 /* verify the rsb is on keep list per comment above */
4179                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4180                 if (rv) {
4181                         /* should not happen */
4182                         log_error(ls, "receive_remove from %d not found %s",
4183                                   from_nodeid, name);
4184                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4185                         return;
4186                 }
4187                 if (r->res_master_nodeid != from_nodeid) {
4188                         /* should not happen */
4189                         log_error(ls, "receive_remove keep from %d master %d",
4190                                   from_nodeid, r->res_master_nodeid);
4191                         dlm_print_rsb(r);
4192                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4193                         return;
4194                 }
4195
4196                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4197                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4198                           name);
4199                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4200                 return;
4201         }
4202
4203         if (r->res_master_nodeid != from_nodeid) {
4204                 log_error(ls, "receive_remove toss from %d master %d",
4205                           from_nodeid, r->res_master_nodeid);
4206                 dlm_print_rsb(r);
4207                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4208                 return;
4209         }
4210
4211         if (kref_put(&r->res_ref, kill_rsb)) {
4212                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4213                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4214                 dlm_free_rsb(r);
4215         } else {
4216                 log_error(ls, "receive_remove from %d rsb ref error",
4217                           from_nodeid);
4218                 dlm_print_rsb(r);
4219                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4220         }
4221 }
4222
4223 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4224 {
4225         do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4226 }
4227
4228 static int receive_request_reply(struct dlm_ls *ls,
4229                                  const struct dlm_message *ms)
4230 {
4231         struct dlm_lkb *lkb;
4232         struct dlm_rsb *r;
4233         int error, mstype, result;
4234         int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4235
4236         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4237         if (error)
4238                 return error;
4239
4240         r = lkb->lkb_resource;
4241         hold_rsb(r);
4242         lock_rsb(r);
4243
4244         error = validate_message(lkb, ms);
4245         if (error)
4246                 goto out;
4247
4248         mstype = lkb->lkb_wait_type;
4249         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4250         if (error) {
4251                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4252                           lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4253                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4254                 dlm_dump_rsb(r);
4255                 goto out;
4256         }
4257
4258         /* Optimization: the dir node was also the master, so it took our
4259            lookup as a request and sent request reply instead of lookup reply */
4260         if (mstype == DLM_MSG_LOOKUP) {
4261                 r->res_master_nodeid = from_nodeid;
4262                 r->res_nodeid = from_nodeid;
4263                 lkb->lkb_nodeid = from_nodeid;
4264         }
4265
4266         /* this is the value returned from do_request() on the master */
4267         result = from_dlm_errno(le32_to_cpu(ms->m_result));
4268
4269         switch (result) {
4270         case -EAGAIN:
4271                 /* request would block (be queued) on remote master */
4272                 queue_cast(r, lkb, -EAGAIN);
4273                 confirm_master(r, -EAGAIN);
4274                 unhold_lkb(lkb); /* undoes create_lkb() */
4275                 break;
4276
4277         case -EINPROGRESS:
4278         case 0:
4279                 /* request was queued or granted on remote master */
4280                 receive_flags_reply(lkb, ms, false);
4281                 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4282                 if (is_altmode(lkb))
4283                         munge_altmode(lkb, ms);
4284                 if (result) {
4285                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4286                 } else {
4287                         grant_lock_pc(r, lkb, ms);
4288                         queue_cast(r, lkb, 0);
4289                 }
4290                 confirm_master(r, result);
4291                 break;
4292
4293         case -EBADR:
4294         case -ENOTBLK:
4295                 /* find_rsb failed to find rsb or rsb wasn't master */
4296                 log_limit(ls, "receive_request_reply %x from %d %d "
4297                           "master %d dir %d first %x %s", lkb->lkb_id,
4298                           from_nodeid, result, r->res_master_nodeid,
4299                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4300
4301                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4302                     r->res_master_nodeid != dlm_our_nodeid()) {
4303                         /* cause _request_lock->set_master->send_lookup */
4304                         r->res_master_nodeid = 0;
4305                         r->res_nodeid = -1;
4306                         lkb->lkb_nodeid = -1;
4307                 }
4308
4309                 if (is_overlap(lkb)) {
4310                         /* we'll ignore error in cancel/unlock reply */
4311                         queue_cast_overlap(r, lkb);
4312                         confirm_master(r, result);
4313                         unhold_lkb(lkb); /* undoes create_lkb() */
4314                 } else {
4315                         _request_lock(r, lkb);
4316
4317                         if (r->res_master_nodeid == dlm_our_nodeid())
4318                                 confirm_master(r, 0);
4319                 }
4320                 break;
4321
4322         default:
4323                 log_error(ls, "receive_request_reply %x error %d",
4324                           lkb->lkb_id, result);
4325         }
4326
4327         if ((result == 0 || result == -EINPROGRESS) &&
4328             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4329                 log_debug(ls, "receive_request_reply %x result %d unlock",
4330                           lkb->lkb_id, result);
4331                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4332                 send_unlock(r, lkb);
4333         } else if ((result == -EINPROGRESS) &&
4334                    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4335                                       &lkb->lkb_iflags)) {
4336                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4337                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4338                 send_cancel(r, lkb);
4339         } else {
4340                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4341                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4342         }
4343  out:
4344         unlock_rsb(r);
4345         put_rsb(r);
4346         dlm_put_lkb(lkb);
4347         return 0;
4348 }
4349
4350 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4351                                     const struct dlm_message *ms, bool local)
4352 {
4353         /* this is the value returned from do_convert() on the master */
4354         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4355         case -EAGAIN:
4356                 /* convert would block (be queued) on remote master */
4357                 queue_cast(r, lkb, -EAGAIN);
4358                 break;
4359
4360         case -EDEADLK:
4361                 receive_flags_reply(lkb, ms, local);
4362                 revert_lock_pc(r, lkb);
4363                 queue_cast(r, lkb, -EDEADLK);
4364                 break;
4365
4366         case -EINPROGRESS:
4367                 /* convert was queued on remote master */
4368                 receive_flags_reply(lkb, ms, local);
4369                 if (is_demoted(lkb))
4370                         munge_demoted(lkb);
4371                 del_lkb(r, lkb);
4372                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4373                 break;
4374
4375         case 0:
4376                 /* convert was granted on remote master */
4377                 receive_flags_reply(lkb, ms, local);
4378                 if (is_demoted(lkb))
4379                         munge_demoted(lkb);
4380                 grant_lock_pc(r, lkb, ms);
4381                 queue_cast(r, lkb, 0);
4382                 break;
4383
4384         default:
4385                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4386                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4387                           le32_to_cpu(ms->m_lkid),
4388                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4389                 dlm_print_rsb(r);
4390                 dlm_print_lkb(lkb);
4391         }
4392 }
4393
4394 static void _receive_convert_reply(struct dlm_lkb *lkb,
4395                                    const struct dlm_message *ms, bool local)
4396 {
4397         struct dlm_rsb *r = lkb->lkb_resource;
4398         int error;
4399
4400         hold_rsb(r);
4401         lock_rsb(r);
4402
4403         error = validate_message(lkb, ms);
4404         if (error)
4405                 goto out;
4406
4407         /* local reply can happen with waiters_mutex held */
4408         error = remove_from_waiters_ms(lkb, ms, local);
4409         if (error)
4410                 goto out;
4411
4412         __receive_convert_reply(r, lkb, ms, local);
4413  out:
4414         unlock_rsb(r);
4415         put_rsb(r);
4416 }
4417
4418 static int receive_convert_reply(struct dlm_ls *ls,
4419                                  const struct dlm_message *ms)
4420 {
4421         struct dlm_lkb *lkb;
4422         int error;
4423
4424         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4425         if (error)
4426                 return error;
4427
4428         _receive_convert_reply(lkb, ms, false);
4429         dlm_put_lkb(lkb);
4430         return 0;
4431 }
4432
4433 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4434                                   const struct dlm_message *ms, bool local)
4435 {
4436         struct dlm_rsb *r = lkb->lkb_resource;
4437         int error;
4438
4439         hold_rsb(r);
4440         lock_rsb(r);
4441
4442         error = validate_message(lkb, ms);
4443         if (error)
4444                 goto out;
4445
4446         /* local reply can happen with waiters_mutex held */
4447         error = remove_from_waiters_ms(lkb, ms, local);
4448         if (error)
4449                 goto out;
4450
4451         /* this is the value returned from do_unlock() on the master */
4452
4453         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4454         case -DLM_EUNLOCK:
4455                 receive_flags_reply(lkb, ms, local);
4456                 remove_lock_pc(r, lkb);
4457                 queue_cast(r, lkb, -DLM_EUNLOCK);
4458                 break;
4459         case -ENOENT:
4460                 break;
4461         default:
4462                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4463                           lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4464         }
4465  out:
4466         unlock_rsb(r);
4467         put_rsb(r);
4468 }
4469
4470 static int receive_unlock_reply(struct dlm_ls *ls,
4471                                 const struct dlm_message *ms)
4472 {
4473         struct dlm_lkb *lkb;
4474         int error;
4475
4476         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4477         if (error)
4478                 return error;
4479
4480         _receive_unlock_reply(lkb, ms, false);
4481         dlm_put_lkb(lkb);
4482         return 0;
4483 }
4484
4485 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4486                                   const struct dlm_message *ms, bool local)
4487 {
4488         struct dlm_rsb *r = lkb->lkb_resource;
4489         int error;
4490
4491         hold_rsb(r);
4492         lock_rsb(r);
4493
4494         error = validate_message(lkb, ms);
4495         if (error)
4496                 goto out;
4497
4498         /* local reply can happen with waiters_mutex held */
4499         error = remove_from_waiters_ms(lkb, ms, local);
4500         if (error)
4501                 goto out;
4502
4503         /* this is the value returned from do_cancel() on the master */
4504
4505         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4506         case -DLM_ECANCEL:
4507                 receive_flags_reply(lkb, ms, local);
4508                 revert_lock_pc(r, lkb);
4509                 queue_cast(r, lkb, -DLM_ECANCEL);
4510                 break;
4511         case 0:
4512                 break;
4513         default:
4514                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4515                           lkb->lkb_id,
4516                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4517         }
4518  out:
4519         unlock_rsb(r);
4520         put_rsb(r);
4521 }
4522
4523 static int receive_cancel_reply(struct dlm_ls *ls,
4524                                 const struct dlm_message *ms)
4525 {
4526         struct dlm_lkb *lkb;
4527         int error;
4528
4529         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4530         if (error)
4531                 return error;
4532
4533         _receive_cancel_reply(lkb, ms, false);
4534         dlm_put_lkb(lkb);
4535         return 0;
4536 }
4537
4538 static void receive_lookup_reply(struct dlm_ls *ls,
4539                                  const struct dlm_message *ms)
4540 {
4541         struct dlm_lkb *lkb;
4542         struct dlm_rsb *r;
4543         int error, ret_nodeid;
4544         int do_lookup_list = 0;
4545
4546         error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4547         if (error) {
4548                 log_error(ls, "%s no lkid %x", __func__,
4549                           le32_to_cpu(ms->m_lkid));
4550                 return;
4551         }
4552
4553         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4554            FIXME: will a non-zero error ever be returned? */
4555
4556         r = lkb->lkb_resource;
4557         hold_rsb(r);
4558         lock_rsb(r);
4559
4560         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4561         if (error)
4562                 goto out;
4563
4564         ret_nodeid = le32_to_cpu(ms->m_nodeid);
4565
4566         /* We sometimes receive a request from the dir node for this
4567            rsb before we've received the dir node's loookup_reply for it.
4568            The request from the dir node implies we're the master, so we set
4569            ourself as master in receive_request_reply, and verify here that
4570            we are indeed the master. */
4571
4572         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4573                 /* This should never happen */
4574                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4575                           "master %d dir %d our %d first %x %s",
4576                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4577                           ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4578                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4579         }
4580
4581         if (ret_nodeid == dlm_our_nodeid()) {
4582                 r->res_master_nodeid = ret_nodeid;
4583                 r->res_nodeid = 0;
4584                 do_lookup_list = 1;
4585                 r->res_first_lkid = 0;
4586         } else if (ret_nodeid == -1) {
4587                 /* the remote node doesn't believe it's the dir node */
4588                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4589                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4590                 r->res_master_nodeid = 0;
4591                 r->res_nodeid = -1;
4592                 lkb->lkb_nodeid = -1;
4593         } else {
4594                 /* set_master() will set lkb_nodeid from r */
4595                 r->res_master_nodeid = ret_nodeid;
4596                 r->res_nodeid = ret_nodeid;
4597         }
4598
4599         if (is_overlap(lkb)) {
4600                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4601                           lkb->lkb_id, dlm_iflags_val(lkb));
4602                 queue_cast_overlap(r, lkb);
4603                 unhold_lkb(lkb); /* undoes create_lkb() */
4604                 goto out_list;
4605         }
4606
4607         _request_lock(r, lkb);
4608
4609  out_list:
4610         if (do_lookup_list)
4611                 process_lookup_list(r);
4612  out:
4613         unlock_rsb(r);
4614         put_rsb(r);
4615         dlm_put_lkb(lkb);
4616 }
4617
4618 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4619                              uint32_t saved_seq)
4620 {
4621         int error = 0, noent = 0;
4622
4623         if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4624                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4625                           le32_to_cpu(ms->m_type),
4626                           le32_to_cpu(ms->m_header.h_nodeid),
4627                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4628                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4629                 return;
4630         }
4631
4632         switch (ms->m_type) {
4633
4634         /* messages sent to a master node */
4635
4636         case cpu_to_le32(DLM_MSG_REQUEST):
4637                 error = receive_request(ls, ms);
4638                 break;
4639
4640         case cpu_to_le32(DLM_MSG_CONVERT):
4641                 error = receive_convert(ls, ms);
4642                 break;
4643
4644         case cpu_to_le32(DLM_MSG_UNLOCK):
4645                 error = receive_unlock(ls, ms);
4646                 break;
4647
4648         case cpu_to_le32(DLM_MSG_CANCEL):
4649                 noent = 1;
4650                 error = receive_cancel(ls, ms);
4651                 break;
4652
4653         /* messages sent from a master node (replies to above) */
4654
4655         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4656                 error = receive_request_reply(ls, ms);
4657                 break;
4658
4659         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4660                 error = receive_convert_reply(ls, ms);
4661                 break;
4662
4663         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4664                 error = receive_unlock_reply(ls, ms);
4665                 break;
4666
4667         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4668                 error = receive_cancel_reply(ls, ms);
4669                 break;
4670
4671         /* messages sent from a master node (only two types of async msg) */
4672
4673         case cpu_to_le32(DLM_MSG_GRANT):
4674                 noent = 1;
4675                 error = receive_grant(ls, ms);
4676                 break;
4677
4678         case cpu_to_le32(DLM_MSG_BAST):
4679                 noent = 1;
4680                 error = receive_bast(ls, ms);
4681                 break;
4682
4683         /* messages sent to a dir node */
4684
4685         case cpu_to_le32(DLM_MSG_LOOKUP):
4686                 receive_lookup(ls, ms);
4687                 break;
4688
4689         case cpu_to_le32(DLM_MSG_REMOVE):
4690                 receive_remove(ls, ms);
4691                 break;
4692
4693         /* messages sent from a dir node (remove has no reply) */
4694
4695         case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4696                 receive_lookup_reply(ls, ms);
4697                 break;
4698
4699         /* other messages */
4700
4701         case cpu_to_le32(DLM_MSG_PURGE):
4702                 receive_purge(ls, ms);
4703                 break;
4704
4705         default:
4706                 log_error(ls, "unknown message type %d",
4707                           le32_to_cpu(ms->m_type));
4708         }
4709
4710         /*
4711          * When checking for ENOENT, we're checking the result of
4712          * find_lkb(m_remid):
4713          *
4714          * The lock id referenced in the message wasn't found.  This may
4715          * happen in normal usage for the async messages and cancel, so
4716          * only use log_debug for them.
4717          *
4718          * Some errors are expected and normal.
4719          */
4720
4721         if (error == -ENOENT && noent) {
4722                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4723                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4724                           le32_to_cpu(ms->m_header.h_nodeid),
4725                           le32_to_cpu(ms->m_lkid), saved_seq);
4726         } else if (error == -ENOENT) {
4727                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4728                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4729                           le32_to_cpu(ms->m_header.h_nodeid),
4730                           le32_to_cpu(ms->m_lkid), saved_seq);
4731
4732                 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4733                         dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4734         }
4735
4736         if (error == -EINVAL) {
4737                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4738                           "saved_seq %u",
4739                           le32_to_cpu(ms->m_type),
4740                           le32_to_cpu(ms->m_header.h_nodeid),
4741                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4742                           saved_seq);
4743         }
4744 }
4745
4746 /* If the lockspace is in recovery mode (locking stopped), then normal
4747    messages are saved on the requestqueue for processing after recovery is
4748    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4749    messages off the requestqueue before we process new ones. This occurs right
4750    after recovery completes when we transition from saving all messages on
4751    requestqueue, to processing all the saved messages, to processing new
4752    messages as they arrive. */
4753
4754 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4755                                 int nodeid)
4756 {
4757         if (dlm_locking_stopped(ls)) {
4758                 /* If we were a member of this lockspace, left, and rejoined,
4759                    other nodes may still be sending us messages from the
4760                    lockspace generation before we left. */
4761                 if (WARN_ON_ONCE(!ls->ls_generation)) {
4762                         log_limit(ls, "receive %d from %d ignore old gen",
4763                                   le32_to_cpu(ms->m_type), nodeid);
4764                         return;
4765                 }
4766
4767                 dlm_add_requestqueue(ls, nodeid, ms);
4768         } else {
4769                 dlm_wait_requestqueue(ls);
4770                 _receive_message(ls, ms, 0);
4771         }
4772 }
4773
4774 /* This is called by dlm_recoverd to process messages that were saved on
4775    the requestqueue. */
4776
4777 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4778                                uint32_t saved_seq)
4779 {
4780         _receive_message(ls, ms, saved_seq);
4781 }
4782
4783 /* This is called by the midcomms layer when something is received for
4784    the lockspace.  It could be either a MSG (normal message sent as part of
4785    standard locking activity) or an RCOM (recovery message sent as part of
4786    lockspace recovery). */
4787
4788 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4789 {
4790         const struct dlm_header *hd = &p->header;
4791         struct dlm_ls *ls;
4792         int type = 0;
4793
4794         switch (hd->h_cmd) {
4795         case DLM_MSG:
4796                 type = le32_to_cpu(p->message.m_type);
4797                 break;
4798         case DLM_RCOM:
4799                 type = le32_to_cpu(p->rcom.rc_type);
4800                 break;
4801         default:
4802                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4803                 return;
4804         }
4805
4806         if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4807                 log_print("invalid h_nodeid %d from %d lockspace %x",
4808                           le32_to_cpu(hd->h_nodeid), nodeid,
4809                           le32_to_cpu(hd->u.h_lockspace));
4810                 return;
4811         }
4812
4813         ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4814         if (!ls) {
4815                 if (dlm_config.ci_log_debug) {
4816                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4817                                 "%u from %d cmd %d type %d\n",
4818                                 le32_to_cpu(hd->u.h_lockspace), nodeid,
4819                                 hd->h_cmd, type);
4820                 }
4821
4822                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4823                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4824                 return;
4825         }
4826
4827         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4828            be inactive (in this ls) before transitioning to recovery mode */
4829
4830         down_read(&ls->ls_recv_active);
4831         if (hd->h_cmd == DLM_MSG)
4832                 dlm_receive_message(ls, &p->message, nodeid);
4833         else if (hd->h_cmd == DLM_RCOM)
4834                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4835         else
4836                 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4837                           hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4838         up_read(&ls->ls_recv_active);
4839
4840         dlm_put_lockspace(ls);
4841 }
4842
4843 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4844                                    struct dlm_message *ms_local)
4845 {
4846         if (middle_conversion(lkb)) {
4847                 hold_lkb(lkb);
4848                 memset(ms_local, 0, sizeof(struct dlm_message));
4849                 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
4850                 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
4851                 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4852                 _receive_convert_reply(lkb, ms_local, true);
4853
4854                 /* Same special case as in receive_rcom_lock_args() */
4855                 lkb->lkb_grmode = DLM_LOCK_IV;
4856                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4857                 unhold_lkb(lkb);
4858
4859         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4860                 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4861         }
4862
4863         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4864            conversions are async; there's no reply from the remote master */
4865 }
4866
4867 /* A waiting lkb needs recovery if the master node has failed, or
4868    the master node is changing (only when no directory is used) */
4869
4870 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4871                                  int dir_nodeid)
4872 {
4873         if (dlm_no_directory(ls))
4874                 return 1;
4875
4876         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4877                 return 1;
4878
4879         return 0;
4880 }
4881
4882 /* Recovery for locks that are waiting for replies from nodes that are now
4883    gone.  We can just complete unlocks and cancels by faking a reply from the
4884    dead node.  Requests and up-conversions we flag to be resent after
4885    recovery.  Down-conversions can just be completed with a fake reply like
4886    unlocks.  Conversions between PR and CW need special attention. */
4887
4888 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4889 {
4890         struct dlm_lkb *lkb, *safe;
4891         struct dlm_message *ms_local;
4892         int wait_type, local_unlock_result, local_cancel_result;
4893         int dir_nodeid;
4894
4895         ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
4896         if (!ms_local)
4897                 return;
4898
4899         mutex_lock(&ls->ls_waiters_mutex);
4900
4901         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4902
4903                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4904
4905                 /* exclude debug messages about unlocks because there can be so
4906                    many and they aren't very interesting */
4907
4908                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4909                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4910                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4911                                   lkb->lkb_id,
4912                                   lkb->lkb_remid,
4913                                   lkb->lkb_wait_type,
4914                                   lkb->lkb_resource->res_nodeid,
4915                                   lkb->lkb_nodeid,
4916                                   lkb->lkb_wait_nodeid,
4917                                   dir_nodeid);
4918                 }
4919
4920                 /* all outstanding lookups, regardless of destination  will be
4921                    resent after recovery is done */
4922
4923                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4924                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4925                         continue;
4926                 }
4927
4928                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4929                         continue;
4930
4931                 wait_type = lkb->lkb_wait_type;
4932                 local_unlock_result = -DLM_EUNLOCK;
4933                 local_cancel_result = -DLM_ECANCEL;
4934
4935                 /* Main reply may have been received leaving a zero wait_type,
4936                    but a reply for the overlapping op may not have been
4937                    received.  In that case we need to fake the appropriate
4938                    reply for the overlap op. */
4939
4940                 if (!wait_type) {
4941                         if (is_overlap_cancel(lkb)) {
4942                                 wait_type = DLM_MSG_CANCEL;
4943                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4944                                         local_cancel_result = 0;
4945                         }
4946                         if (is_overlap_unlock(lkb)) {
4947                                 wait_type = DLM_MSG_UNLOCK;
4948                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4949                                         local_unlock_result = -ENOENT;
4950                         }
4951
4952                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4953                                   lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
4954                                   local_cancel_result, local_unlock_result);
4955                 }
4956
4957                 switch (wait_type) {
4958
4959                 case DLM_MSG_REQUEST:
4960                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4961                         break;
4962
4963                 case DLM_MSG_CONVERT:
4964                         recover_convert_waiter(ls, lkb, ms_local);
4965                         break;
4966
4967                 case DLM_MSG_UNLOCK:
4968                         hold_lkb(lkb);
4969                         memset(ms_local, 0, sizeof(struct dlm_message));
4970                         ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
4971                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
4972                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4973                         _receive_unlock_reply(lkb, ms_local, true);
4974                         dlm_put_lkb(lkb);
4975                         break;
4976
4977                 case DLM_MSG_CANCEL:
4978                         hold_lkb(lkb);
4979                         memset(ms_local, 0, sizeof(struct dlm_message));
4980                         ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
4981                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
4982                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4983                         _receive_cancel_reply(lkb, ms_local, true);
4984                         dlm_put_lkb(lkb);
4985                         break;
4986
4987                 default:
4988                         log_error(ls, "invalid lkb wait_type %d %d",
4989                                   lkb->lkb_wait_type, wait_type);
4990                 }
4991                 schedule();
4992         }
4993         mutex_unlock(&ls->ls_waiters_mutex);
4994         kfree(ms_local);
4995 }
4996
4997 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4998 {
4999         struct dlm_lkb *lkb = NULL, *iter;
5000
5001         mutex_lock(&ls->ls_waiters_mutex);
5002         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5003                 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5004                         hold_lkb(iter);
5005                         lkb = iter;
5006                         break;
5007                 }
5008         }
5009         mutex_unlock(&ls->ls_waiters_mutex);
5010
5011         return lkb;
5012 }
5013
5014 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5015    master or dir-node for r.  Processing the lkb may result in it being placed
5016    back on waiters. */
5017
5018 /* We do this after normal locking has been enabled and any saved messages
5019    (in requestqueue) have been processed.  We should be confident that at
5020    this point we won't get or process a reply to any of these waiting
5021    operations.  But, new ops may be coming in on the rsbs/locks here from
5022    userspace or remotely. */
5023
5024 /* there may have been an overlap unlock/cancel prior to recovery or after
5025    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5026    overlap flag would just have been set and nothing new sent.  we can be
5027    confident here than any replies to either the initial op or overlap ops
5028    prior to recovery have been received. */
5029
5030 int dlm_recover_waiters_post(struct dlm_ls *ls)
5031 {
5032         struct dlm_lkb *lkb;
5033         struct dlm_rsb *r;
5034         int error = 0, mstype, err, oc, ou;
5035
5036         while (1) {
5037                 if (dlm_locking_stopped(ls)) {
5038                         log_debug(ls, "recover_waiters_post aborted");
5039                         error = -EINTR;
5040                         break;
5041                 }
5042
5043                 lkb = find_resend_waiter(ls);
5044                 if (!lkb)
5045                         break;
5046
5047                 r = lkb->lkb_resource;
5048                 hold_rsb(r);
5049                 lock_rsb(r);
5050
5051                 mstype = lkb->lkb_wait_type;
5052                 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5053                                         &lkb->lkb_iflags);
5054                 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5055                                         &lkb->lkb_iflags);
5056                 err = 0;
5057
5058                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5059                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5060                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5061                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5062                           dlm_dir_nodeid(r), oc, ou);
5063
5064                 /* At this point we assume that we won't get a reply to any
5065                    previous op or overlap op on this lock.  First, do a big
5066                    remove_from_waiters() for all previous ops. */
5067
5068                 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5069                 lkb->lkb_wait_type = 0;
5070                 /* drop all wait_count references we still
5071                  * hold a reference for this iteration.
5072                  */
5073                 while (!atomic_dec_and_test(&lkb->lkb_wait_count))
5074                         unhold_lkb(lkb);
5075
5076                 mutex_lock(&ls->ls_waiters_mutex);
5077                 list_del_init(&lkb->lkb_wait_reply);
5078                 mutex_unlock(&ls->ls_waiters_mutex);
5079
5080                 if (oc || ou) {
5081                         /* do an unlock or cancel instead of resending */
5082                         switch (mstype) {
5083                         case DLM_MSG_LOOKUP:
5084                         case DLM_MSG_REQUEST:
5085                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5086                                                         -DLM_ECANCEL);
5087                                 unhold_lkb(lkb); /* undoes create_lkb() */
5088                                 break;
5089                         case DLM_MSG_CONVERT:
5090                                 if (oc) {
5091                                         queue_cast(r, lkb, -DLM_ECANCEL);
5092                                 } else {
5093                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5094                                         _unlock_lock(r, lkb);
5095                                 }
5096                                 break;
5097                         default:
5098                                 err = 1;
5099                         }
5100                 } else {
5101                         switch (mstype) {
5102                         case DLM_MSG_LOOKUP:
5103                         case DLM_MSG_REQUEST:
5104                                 _request_lock(r, lkb);
5105                                 if (is_master(r))
5106                                         confirm_master(r, 0);
5107                                 break;
5108                         case DLM_MSG_CONVERT:
5109                                 _convert_lock(r, lkb);
5110                                 break;
5111                         default:
5112                                 err = 1;
5113                         }
5114                 }
5115
5116                 if (err) {
5117                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5118                                   "dir_nodeid %d overlap %d %d",
5119                                   lkb->lkb_id, mstype, r->res_nodeid,
5120                                   dlm_dir_nodeid(r), oc, ou);
5121                 }
5122                 unlock_rsb(r);
5123                 put_rsb(r);
5124                 dlm_put_lkb(lkb);
5125         }
5126
5127         return error;
5128 }
5129
5130 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5131                               struct list_head *list)
5132 {
5133         struct dlm_lkb *lkb, *safe;
5134
5135         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5136                 if (!is_master_copy(lkb))
5137                         continue;
5138
5139                 /* don't purge lkbs we've added in recover_master_copy for
5140                    the current recovery seq */
5141
5142                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5143                         continue;
5144
5145                 del_lkb(r, lkb);
5146
5147                 /* this put should free the lkb */
5148                 if (!dlm_put_lkb(lkb))
5149                         log_error(ls, "purged mstcpy lkb not released");
5150         }
5151 }
5152
5153 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5154 {
5155         struct dlm_ls *ls = r->res_ls;
5156
5157         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5158         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5159         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5160 }
5161
5162 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5163                             struct list_head *list,
5164                             int nodeid_gone, unsigned int *count)
5165 {
5166         struct dlm_lkb *lkb, *safe;
5167
5168         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5169                 if (!is_master_copy(lkb))
5170                         continue;
5171
5172                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5173                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5174
5175                         /* tell recover_lvb to invalidate the lvb
5176                            because a node holding EX/PW failed */
5177                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5178                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5179                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5180                         }
5181
5182                         del_lkb(r, lkb);
5183
5184                         /* this put should free the lkb */
5185                         if (!dlm_put_lkb(lkb))
5186                                 log_error(ls, "purged dead lkb not released");
5187
5188                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5189
5190                         (*count)++;
5191                 }
5192         }
5193 }
5194
5195 /* Get rid of locks held by nodes that are gone. */
5196
5197 void dlm_recover_purge(struct dlm_ls *ls)
5198 {
5199         struct dlm_rsb *r;
5200         struct dlm_member *memb;
5201         int nodes_count = 0;
5202         int nodeid_gone = 0;
5203         unsigned int lkb_count = 0;
5204
5205         /* cache one removed nodeid to optimize the common
5206            case of a single node removed */
5207
5208         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5209                 nodes_count++;
5210                 nodeid_gone = memb->nodeid;
5211         }
5212
5213         if (!nodes_count)
5214                 return;
5215
5216         down_write(&ls->ls_root_sem);
5217         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5218                 hold_rsb(r);
5219                 lock_rsb(r);
5220                 if (is_master(r)) {
5221                         purge_dead_list(ls, r, &r->res_grantqueue,
5222                                         nodeid_gone, &lkb_count);
5223                         purge_dead_list(ls, r, &r->res_convertqueue,
5224                                         nodeid_gone, &lkb_count);
5225                         purge_dead_list(ls, r, &r->res_waitqueue,
5226                                         nodeid_gone, &lkb_count);
5227                 }
5228                 unlock_rsb(r);
5229                 unhold_rsb(r);
5230                 cond_resched();
5231         }
5232         up_write(&ls->ls_root_sem);
5233
5234         if (lkb_count)
5235                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5236                           lkb_count, nodes_count);
5237 }
5238
5239 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5240 {
5241         struct rb_node *n;
5242         struct dlm_rsb *r;
5243
5244         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5245         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5246                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5247
5248                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5249                         continue;
5250                 if (!is_master(r)) {
5251                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5252                         continue;
5253                 }
5254                 hold_rsb(r);
5255                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5256                 return r;
5257         }
5258         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5259         return NULL;
5260 }
5261
5262 /*
5263  * Attempt to grant locks on resources that we are the master of.
5264  * Locks may have become grantable during recovery because locks
5265  * from departed nodes have been purged (or not rebuilt), allowing
5266  * previously blocked locks to now be granted.  The subset of rsb's
5267  * we are interested in are those with lkb's on either the convert or
5268  * waiting queues.
5269  *
5270  * Simplest would be to go through each master rsb and check for non-empty
5271  * convert or waiting queues, and attempt to grant on those rsbs.
5272  * Checking the queues requires lock_rsb, though, for which we'd need
5273  * to release the rsbtbl lock.  This would make iterating through all
5274  * rsb's very inefficient.  So, we rely on earlier recovery routines
5275  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5276  * locks for.
5277  */
5278
5279 void dlm_recover_grant(struct dlm_ls *ls)
5280 {
5281         struct dlm_rsb *r;
5282         int bucket = 0;
5283         unsigned int count = 0;
5284         unsigned int rsb_count = 0;
5285         unsigned int lkb_count = 0;
5286
5287         while (1) {
5288                 r = find_grant_rsb(ls, bucket);
5289                 if (!r) {
5290                         if (bucket == ls->ls_rsbtbl_size - 1)
5291                                 break;
5292                         bucket++;
5293                         continue;
5294                 }
5295                 rsb_count++;
5296                 count = 0;
5297                 lock_rsb(r);
5298                 /* the RECOVER_GRANT flag is checked in the grant path */
5299                 grant_pending_locks(r, &count);
5300                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5301                 lkb_count += count;
5302                 confirm_master(r, 0);
5303                 unlock_rsb(r);
5304                 put_rsb(r);
5305                 cond_resched();
5306         }
5307
5308         if (lkb_count)
5309                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5310                           lkb_count, rsb_count);
5311 }
5312
5313 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5314                                          uint32_t remid)
5315 {
5316         struct dlm_lkb *lkb;
5317
5318         list_for_each_entry(lkb, head, lkb_statequeue) {
5319                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5320                         return lkb;
5321         }
5322         return NULL;
5323 }
5324
5325 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5326                                     uint32_t remid)
5327 {
5328         struct dlm_lkb *lkb;
5329
5330         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5331         if (lkb)
5332                 return lkb;
5333         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5334         if (lkb)
5335                 return lkb;
5336         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5337         if (lkb)
5338                 return lkb;
5339         return NULL;
5340 }
5341
5342 /* needs at least dlm_rcom + rcom_lock */
5343 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5344                                   struct dlm_rsb *r, const struct dlm_rcom *rc)
5345 {
5346         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5347
5348         lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5349         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5350         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5351         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5352         dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5353         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5354         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5355         lkb->lkb_rqmode = rl->rl_rqmode;
5356         lkb->lkb_grmode = rl->rl_grmode;
5357         /* don't set lkb_status because add_lkb wants to itself */
5358
5359         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5360         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5361
5362         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5363                 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5364                         sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5365                 if (lvblen > ls->ls_lvblen)
5366                         return -EINVAL;
5367                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5368                 if (!lkb->lkb_lvbptr)
5369                         return -ENOMEM;
5370                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5371         }
5372
5373         /* Conversions between PR and CW (middle modes) need special handling.
5374            The real granted mode of these converting locks cannot be determined
5375            until all locks have been rebuilt on the rsb (recover_conversion) */
5376
5377         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5378             middle_conversion(lkb)) {
5379                 rl->rl_status = DLM_LKSTS_CONVERT;
5380                 lkb->lkb_grmode = DLM_LOCK_IV;
5381                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5382         }
5383
5384         return 0;
5385 }
5386
5387 /* This lkb may have been recovered in a previous aborted recovery so we need
5388    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5389    If so we just send back a standard reply.  If not, we create a new lkb with
5390    the given values and send back our lkid.  We send back our lkid by sending
5391    back the rcom_lock struct we got but with the remid field filled in. */
5392
5393 /* needs at least dlm_rcom + rcom_lock */
5394 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5395                             __le32 *rl_remid, __le32 *rl_result)
5396 {
5397         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5398         struct dlm_rsb *r;
5399         struct dlm_lkb *lkb;
5400         uint32_t remid = 0;
5401         int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5402         int error;
5403
5404         /* init rl_remid with rcom lock rl_remid */
5405         *rl_remid = rl->rl_remid;
5406
5407         if (rl->rl_parent_lkid) {
5408                 error = -EOPNOTSUPP;
5409                 goto out;
5410         }
5411
5412         remid = le32_to_cpu(rl->rl_lkid);
5413
5414         /* In general we expect the rsb returned to be R_MASTER, but we don't
5415            have to require it.  Recovery of masters on one node can overlap
5416            recovery of locks on another node, so one node can send us MSTCPY
5417            locks before we've made ourselves master of this rsb.  We can still
5418            add new MSTCPY locks that we receive here without any harm; when
5419            we make ourselves master, dlm_recover_masters() won't touch the
5420            MSTCPY locks we've received early. */
5421
5422         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5423                          from_nodeid, R_RECEIVE_RECOVER, &r);
5424         if (error)
5425                 goto out;
5426
5427         lock_rsb(r);
5428
5429         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5430                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5431                           from_nodeid, remid);
5432                 error = -EBADR;
5433                 goto out_unlock;
5434         }
5435
5436         lkb = search_remid(r, from_nodeid, remid);
5437         if (lkb) {
5438                 error = -EEXIST;
5439                 goto out_remid;
5440         }
5441
5442         error = create_lkb(ls, &lkb);
5443         if (error)
5444                 goto out_unlock;
5445
5446         error = receive_rcom_lock_args(ls, lkb, r, rc);
5447         if (error) {
5448                 __put_lkb(ls, lkb);
5449                 goto out_unlock;
5450         }
5451
5452         attach_lkb(r, lkb);
5453         add_lkb(r, lkb, rl->rl_status);
5454         ls->ls_recover_locks_in++;
5455
5456         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5457                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5458
5459  out_remid:
5460         /* this is the new value returned to the lock holder for
5461            saving in its process-copy lkb */
5462         *rl_remid = cpu_to_le32(lkb->lkb_id);
5463
5464         lkb->lkb_recover_seq = ls->ls_recover_seq;
5465
5466  out_unlock:
5467         unlock_rsb(r);
5468         put_rsb(r);
5469  out:
5470         if (error && error != -EEXIST)
5471                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5472                           from_nodeid, remid, error);
5473         *rl_result = cpu_to_le32(error);
5474         return error;
5475 }
5476
5477 /* needs at least dlm_rcom + rcom_lock */
5478 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5479                              uint64_t seq)
5480 {
5481         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5482         struct dlm_rsb *r;
5483         struct dlm_lkb *lkb;
5484         uint32_t lkid, remid;
5485         int error, result;
5486
5487         lkid = le32_to_cpu(rl->rl_lkid);
5488         remid = le32_to_cpu(rl->rl_remid);
5489         result = le32_to_cpu(rl->rl_result);
5490
5491         error = find_lkb(ls, lkid, &lkb);
5492         if (error) {
5493                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5494                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5495                           result);
5496                 return error;
5497         }
5498
5499         r = lkb->lkb_resource;
5500         hold_rsb(r);
5501         lock_rsb(r);
5502
5503         if (!is_process_copy(lkb)) {
5504                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5505                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5506                           result);
5507                 dlm_dump_rsb(r);
5508                 unlock_rsb(r);
5509                 put_rsb(r);
5510                 dlm_put_lkb(lkb);
5511                 return -EINVAL;
5512         }
5513
5514         switch (result) {
5515         case -EBADR:
5516                 /* There's a chance the new master received our lock before
5517                    dlm_recover_master_reply(), this wouldn't happen if we did
5518                    a barrier between recover_masters and recover_locks. */
5519
5520                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5521                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5522                           result);
5523         
5524                 dlm_send_rcom_lock(r, lkb, seq);
5525                 goto out;
5526         case -EEXIST:
5527         case 0:
5528                 lkb->lkb_remid = remid;
5529                 break;
5530         default:
5531                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5532                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5533                           result);
5534         }
5535
5536         /* an ack for dlm_recover_locks() which waits for replies from
5537            all the locks it sends to new masters */
5538         dlm_recovered_lock(r);
5539  out:
5540         unlock_rsb(r);
5541         put_rsb(r);
5542         dlm_put_lkb(lkb);
5543
5544         return 0;
5545 }
5546
5547 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5548                      int mode, uint32_t flags, void *name, unsigned int namelen)
5549 {
5550         struct dlm_lkb *lkb;
5551         struct dlm_args args;
5552         bool do_put = true;
5553         int error;
5554
5555         dlm_lock_recovery(ls);
5556
5557         error = create_lkb(ls, &lkb);
5558         if (error) {
5559                 kfree(ua);
5560                 goto out;
5561         }
5562
5563         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5564
5565         if (flags & DLM_LKF_VALBLK) {
5566                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5567                 if (!ua->lksb.sb_lvbptr) {
5568                         kfree(ua);
5569                         error = -ENOMEM;
5570                         goto out_put;
5571                 }
5572         }
5573         error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5574                               fake_bastfn, &args);
5575         if (error) {
5576                 kfree(ua->lksb.sb_lvbptr);
5577                 ua->lksb.sb_lvbptr = NULL;
5578                 kfree(ua);
5579                 goto out_put;
5580         }
5581
5582         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5583            When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5584            lock and that lkb_astparam is the dlm_user_args structure. */
5585         set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5586         error = request_lock(ls, lkb, name, namelen, &args);
5587
5588         switch (error) {
5589         case 0:
5590                 break;
5591         case -EINPROGRESS:
5592                 error = 0;
5593                 break;
5594         case -EAGAIN:
5595                 error = 0;
5596                 fallthrough;
5597         default:
5598                 goto out_put;
5599         }
5600
5601         /* add this new lkb to the per-process list of locks */
5602         spin_lock(&ua->proc->locks_spin);
5603         hold_lkb(lkb);
5604         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5605         spin_unlock(&ua->proc->locks_spin);
5606         do_put = false;
5607  out_put:
5608         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5609         if (do_put)
5610                 __put_lkb(ls, lkb);
5611  out:
5612         dlm_unlock_recovery(ls);
5613         return error;
5614 }
5615
5616 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5617                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5618 {
5619         struct dlm_lkb *lkb;
5620         struct dlm_args args;
5621         struct dlm_user_args *ua;
5622         int error;
5623
5624         dlm_lock_recovery(ls);
5625
5626         error = find_lkb(ls, lkid, &lkb);
5627         if (error)
5628                 goto out;
5629
5630         trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5631
5632         /* user can change the params on its lock when it converts it, or
5633            add an lvb that didn't exist before */
5634
5635         ua = lkb->lkb_ua;
5636
5637         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5638                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5639                 if (!ua->lksb.sb_lvbptr) {
5640                         error = -ENOMEM;
5641                         goto out_put;
5642                 }
5643         }
5644         if (lvb_in && ua->lksb.sb_lvbptr)
5645                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5646
5647         ua->xid = ua_tmp->xid;
5648         ua->castparam = ua_tmp->castparam;
5649         ua->castaddr = ua_tmp->castaddr;
5650         ua->bastparam = ua_tmp->bastparam;
5651         ua->bastaddr = ua_tmp->bastaddr;
5652         ua->user_lksb = ua_tmp->user_lksb;
5653
5654         error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5655                               fake_bastfn, &args);
5656         if (error)
5657                 goto out_put;
5658
5659         error = convert_lock(ls, lkb, &args);
5660
5661         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5662                 error = 0;
5663  out_put:
5664         trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5665         dlm_put_lkb(lkb);
5666  out:
5667         dlm_unlock_recovery(ls);
5668         kfree(ua_tmp);
5669         return error;
5670 }
5671
5672 /*
5673  * The caller asks for an orphan lock on a given resource with a given mode.
5674  * If a matching lock exists, it's moved to the owner's list of locks and
5675  * the lkid is returned.
5676  */
5677
5678 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5679                      int mode, uint32_t flags, void *name, unsigned int namelen,
5680                      uint32_t *lkid)
5681 {
5682         struct dlm_lkb *lkb = NULL, *iter;
5683         struct dlm_user_args *ua;
5684         int found_other_mode = 0;
5685         int rv = 0;
5686
5687         mutex_lock(&ls->ls_orphans_mutex);
5688         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5689                 if (iter->lkb_resource->res_length != namelen)
5690                         continue;
5691                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5692                         continue;
5693                 if (iter->lkb_grmode != mode) {
5694                         found_other_mode = 1;
5695                         continue;
5696                 }
5697
5698                 lkb = iter;
5699                 list_del_init(&iter->lkb_ownqueue);
5700                 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5701                 *lkid = iter->lkb_id;
5702                 break;
5703         }
5704         mutex_unlock(&ls->ls_orphans_mutex);
5705
5706         if (!lkb && found_other_mode) {
5707                 rv = -EAGAIN;
5708                 goto out;
5709         }
5710
5711         if (!lkb) {
5712                 rv = -ENOENT;
5713                 goto out;
5714         }
5715
5716         lkb->lkb_exflags = flags;
5717         lkb->lkb_ownpid = (int) current->pid;
5718
5719         ua = lkb->lkb_ua;
5720
5721         ua->proc = ua_tmp->proc;
5722         ua->xid = ua_tmp->xid;
5723         ua->castparam = ua_tmp->castparam;
5724         ua->castaddr = ua_tmp->castaddr;
5725         ua->bastparam = ua_tmp->bastparam;
5726         ua->bastaddr = ua_tmp->bastaddr;
5727         ua->user_lksb = ua_tmp->user_lksb;
5728
5729         /*
5730          * The lkb reference from the ls_orphans list was not
5731          * removed above, and is now considered the reference
5732          * for the proc locks list.
5733          */
5734
5735         spin_lock(&ua->proc->locks_spin);
5736         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5737         spin_unlock(&ua->proc->locks_spin);
5738  out:
5739         kfree(ua_tmp);
5740         return rv;
5741 }
5742
5743 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5744                     uint32_t flags, uint32_t lkid, char *lvb_in)
5745 {
5746         struct dlm_lkb *lkb;
5747         struct dlm_args args;
5748         struct dlm_user_args *ua;
5749         int error;
5750
5751         dlm_lock_recovery(ls);
5752
5753         error = find_lkb(ls, lkid, &lkb);
5754         if (error)
5755                 goto out;
5756
5757         trace_dlm_unlock_start(ls, lkb, flags);
5758
5759         ua = lkb->lkb_ua;
5760
5761         if (lvb_in && ua->lksb.sb_lvbptr)
5762                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5763         if (ua_tmp->castparam)
5764                 ua->castparam = ua_tmp->castparam;
5765         ua->user_lksb = ua_tmp->user_lksb;
5766
5767         error = set_unlock_args(flags, ua, &args);
5768         if (error)
5769                 goto out_put;
5770
5771         error = unlock_lock(ls, lkb, &args);
5772
5773         if (error == -DLM_EUNLOCK)
5774                 error = 0;
5775         /* from validate_unlock_args() */
5776         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5777                 error = 0;
5778         if (error)
5779                 goto out_put;
5780
5781         spin_lock(&ua->proc->locks_spin);
5782         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5783         if (!list_empty(&lkb->lkb_ownqueue))
5784                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5785         spin_unlock(&ua->proc->locks_spin);
5786  out_put:
5787         trace_dlm_unlock_end(ls, lkb, flags, error);
5788         dlm_put_lkb(lkb);
5789  out:
5790         dlm_unlock_recovery(ls);
5791         kfree(ua_tmp);
5792         return error;
5793 }
5794
5795 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5796                     uint32_t flags, uint32_t lkid)
5797 {
5798         struct dlm_lkb *lkb;
5799         struct dlm_args args;
5800         struct dlm_user_args *ua;
5801         int error;
5802
5803         dlm_lock_recovery(ls);
5804
5805         error = find_lkb(ls, lkid, &lkb);
5806         if (error)
5807                 goto out;
5808
5809         trace_dlm_unlock_start(ls, lkb, flags);
5810
5811         ua = lkb->lkb_ua;
5812         if (ua_tmp->castparam)
5813                 ua->castparam = ua_tmp->castparam;
5814         ua->user_lksb = ua_tmp->user_lksb;
5815
5816         error = set_unlock_args(flags, ua, &args);
5817         if (error)
5818                 goto out_put;
5819
5820         error = cancel_lock(ls, lkb, &args);
5821
5822         if (error == -DLM_ECANCEL)
5823                 error = 0;
5824         /* from validate_unlock_args() */
5825         if (error == -EBUSY)
5826                 error = 0;
5827  out_put:
5828         trace_dlm_unlock_end(ls, lkb, flags, error);
5829         dlm_put_lkb(lkb);
5830  out:
5831         dlm_unlock_recovery(ls);
5832         kfree(ua_tmp);
5833         return error;
5834 }
5835
5836 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5837 {
5838         struct dlm_lkb *lkb;
5839         struct dlm_args args;
5840         struct dlm_user_args *ua;
5841         struct dlm_rsb *r;
5842         int error;
5843
5844         dlm_lock_recovery(ls);
5845
5846         error = find_lkb(ls, lkid, &lkb);
5847         if (error)
5848                 goto out;
5849
5850         trace_dlm_unlock_start(ls, lkb, flags);
5851
5852         ua = lkb->lkb_ua;
5853
5854         error = set_unlock_args(flags, ua, &args);
5855         if (error)
5856                 goto out_put;
5857
5858         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5859
5860         r = lkb->lkb_resource;
5861         hold_rsb(r);
5862         lock_rsb(r);
5863
5864         error = validate_unlock_args(lkb, &args);
5865         if (error)
5866                 goto out_r;
5867         set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
5868
5869         error = _cancel_lock(r, lkb);
5870  out_r:
5871         unlock_rsb(r);
5872         put_rsb(r);
5873
5874         if (error == -DLM_ECANCEL)
5875                 error = 0;
5876         /* from validate_unlock_args() */
5877         if (error == -EBUSY)
5878                 error = 0;
5879  out_put:
5880         trace_dlm_unlock_end(ls, lkb, flags, error);
5881         dlm_put_lkb(lkb);
5882  out:
5883         dlm_unlock_recovery(ls);
5884         return error;
5885 }
5886
5887 /* lkb's that are removed from the waiters list by revert are just left on the
5888    orphans list with the granted orphan locks, to be freed by purge */
5889
5890 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5891 {
5892         struct dlm_args args;
5893         int error;
5894
5895         hold_lkb(lkb); /* reference for the ls_orphans list */
5896         mutex_lock(&ls->ls_orphans_mutex);
5897         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5898         mutex_unlock(&ls->ls_orphans_mutex);
5899
5900         set_unlock_args(0, lkb->lkb_ua, &args);
5901
5902         error = cancel_lock(ls, lkb, &args);
5903         if (error == -DLM_ECANCEL)
5904                 error = 0;
5905         return error;
5906 }
5907
5908 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
5909    granted.  Regardless of what rsb queue the lock is on, it's removed and
5910    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
5911    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
5912
5913 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5914 {
5915         struct dlm_args args;
5916         int error;
5917
5918         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
5919                         lkb->lkb_ua, &args);
5920
5921         error = unlock_lock(ls, lkb, &args);
5922         if (error == -DLM_EUNLOCK)
5923                 error = 0;
5924         return error;
5925 }
5926
5927 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5928    (which does lock_rsb) due to deadlock with receiving a message that does
5929    lock_rsb followed by dlm_user_add_cb() */
5930
5931 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5932                                      struct dlm_user_proc *proc)
5933 {
5934         struct dlm_lkb *lkb = NULL;
5935
5936         spin_lock(&ls->ls_clear_proc_locks);
5937         if (list_empty(&proc->locks))
5938                 goto out;
5939
5940         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5941         list_del_init(&lkb->lkb_ownqueue);
5942
5943         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5944                 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
5945         else
5946                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5947  out:
5948         spin_unlock(&ls->ls_clear_proc_locks);
5949         return lkb;
5950 }
5951
5952 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5953    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5954    which we clear here. */
5955
5956 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5957    list, and no more device_writes should add lkb's to proc->locks list; so we
5958    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5959    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5960    them ourself. */
5961
5962 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5963 {
5964         struct dlm_lkb *lkb, *safe;
5965
5966         dlm_lock_recovery(ls);
5967
5968         while (1) {
5969                 lkb = del_proc_lock(ls, proc);
5970                 if (!lkb)
5971                         break;
5972                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5973                         orphan_proc_lock(ls, lkb);
5974                 else
5975                         unlock_proc_lock(ls, lkb);
5976
5977                 /* this removes the reference for the proc->locks list
5978                    added by dlm_user_request, it may result in the lkb
5979                    being freed */
5980
5981                 dlm_put_lkb(lkb);
5982         }
5983
5984         spin_lock(&ls->ls_clear_proc_locks);
5985
5986         /* in-progress unlocks */
5987         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5988                 list_del_init(&lkb->lkb_ownqueue);
5989                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5990                 dlm_put_lkb(lkb);
5991         }
5992
5993         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5994                 dlm_purge_lkb_callbacks(lkb);
5995                 list_del_init(&lkb->lkb_cb_list);
5996                 dlm_put_lkb(lkb);
5997         }
5998
5999         spin_unlock(&ls->ls_clear_proc_locks);
6000         dlm_unlock_recovery(ls);
6001 }
6002
6003 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6004 {
6005         struct dlm_lkb *lkb, *safe;
6006
6007         while (1) {
6008                 lkb = NULL;
6009                 spin_lock(&proc->locks_spin);
6010                 if (!list_empty(&proc->locks)) {
6011                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6012                                          lkb_ownqueue);
6013                         list_del_init(&lkb->lkb_ownqueue);
6014                 }
6015                 spin_unlock(&proc->locks_spin);
6016
6017                 if (!lkb)
6018                         break;
6019
6020                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6021                 unlock_proc_lock(ls, lkb);
6022                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6023         }
6024
6025         spin_lock(&proc->locks_spin);
6026         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6027                 list_del_init(&lkb->lkb_ownqueue);
6028                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6029                 dlm_put_lkb(lkb);
6030         }
6031         spin_unlock(&proc->locks_spin);
6032
6033         spin_lock(&proc->asts_spin);
6034         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6035                 dlm_purge_lkb_callbacks(lkb);
6036                 list_del_init(&lkb->lkb_cb_list);
6037                 dlm_put_lkb(lkb);
6038         }
6039         spin_unlock(&proc->asts_spin);
6040 }
6041
6042 /* pid of 0 means purge all orphans */
6043
6044 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6045 {
6046         struct dlm_lkb *lkb, *safe;
6047
6048         mutex_lock(&ls->ls_orphans_mutex);
6049         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6050                 if (pid && lkb->lkb_ownpid != pid)
6051                         continue;
6052                 unlock_proc_lock(ls, lkb);
6053                 list_del_init(&lkb->lkb_ownqueue);
6054                 dlm_put_lkb(lkb);
6055         }
6056         mutex_unlock(&ls->ls_orphans_mutex);
6057 }
6058
6059 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6060 {
6061         struct dlm_message *ms;
6062         struct dlm_mhandle *mh;
6063         int error;
6064
6065         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6066                                 DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
6067         if (error)
6068                 return error;
6069         ms->m_nodeid = cpu_to_le32(nodeid);
6070         ms->m_pid = cpu_to_le32(pid);
6071
6072         return send_message(mh, ms, NULL, 0);
6073 }
6074
6075 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6076                    int nodeid, int pid)
6077 {
6078         int error = 0;
6079
6080         if (nodeid && (nodeid != dlm_our_nodeid())) {
6081                 error = send_purge(ls, nodeid, pid);
6082         } else {
6083                 dlm_lock_recovery(ls);
6084                 if (pid == current->pid)
6085                         purge_proc_locks(ls, proc);
6086                 else
6087                         do_purge(ls, nodeid, pid);
6088                 dlm_unlock_recovery(ls);
6089         }
6090         return error;
6091 }
6092
6093 /* debug functionality */
6094 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6095                       int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6096 {
6097         struct dlm_lksb *lksb;
6098         struct dlm_lkb *lkb;
6099         struct dlm_rsb *r;
6100         int error;
6101
6102         /* we currently can't set a valid user lock */
6103         if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6104                 return -EOPNOTSUPP;
6105
6106         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6107         if (!lksb)
6108                 return -ENOMEM;
6109
6110         error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6111         if (error) {
6112                 kfree(lksb);
6113                 return error;
6114         }
6115
6116         dlm_set_dflags_val(lkb, lkb_dflags);
6117         lkb->lkb_nodeid = lkb_nodeid;
6118         lkb->lkb_lksb = lksb;
6119         /* user specific pointer, just don't have it NULL for kernel locks */
6120         if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6121                 lkb->lkb_astparam = (void *)0xDEADBEEF;
6122
6123         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6124         if (error) {
6125                 kfree(lksb);
6126                 __put_lkb(ls, lkb);
6127                 return error;
6128         }
6129
6130         lock_rsb(r);
6131         attach_lkb(r, lkb);
6132         add_lkb(r, lkb, lkb_status);
6133         unlock_rsb(r);
6134         put_rsb(r);
6135
6136         return 0;
6137 }
6138
6139 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6140                                  int mstype, int to_nodeid)
6141 {
6142         struct dlm_lkb *lkb;
6143         int error;
6144
6145         error = find_lkb(ls, lkb_id, &lkb);
6146         if (error)
6147                 return error;
6148
6149         error = add_to_waiters(lkb, mstype, to_nodeid);
6150         dlm_put_lkb(lkb);
6151         return error;
6152 }
6153