Merge tag 'isci-for-3.5' into misc
[platform/adaptation/renesas_rcar/renesas_kernel.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
164                "     status %d rqmode %d grmode %d wait_type %d\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type);
168 }
169
170 static void dlm_print_rsb(struct dlm_rsb *r)
171 {
172         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
173                r->res_nodeid, r->res_flags, r->res_first_lkid,
174                r->res_recover_locks_count, r->res_name);
175 }
176
177 void dlm_dump_rsb(struct dlm_rsb *r)
178 {
179         struct dlm_lkb *lkb;
180
181         dlm_print_rsb(r);
182
183         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
184                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
185         printk(KERN_ERR "rsb lookup list\n");
186         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
187                 dlm_print_lkb(lkb);
188         printk(KERN_ERR "rsb grant queue:\n");
189         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb convert queue:\n");
192         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb wait queue:\n");
195         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197 }
198
199 /* Threads cannot use the lockspace while it's being recovered */
200
201 static inline void dlm_lock_recovery(struct dlm_ls *ls)
202 {
203         down_read(&ls->ls_in_recovery);
204 }
205
206 void dlm_unlock_recovery(struct dlm_ls *ls)
207 {
208         up_read(&ls->ls_in_recovery);
209 }
210
211 int dlm_lock_recovery_try(struct dlm_ls *ls)
212 {
213         return down_read_trylock(&ls->ls_in_recovery);
214 }
215
216 static inline int can_be_queued(struct dlm_lkb *lkb)
217 {
218         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
219 }
220
221 static inline int force_blocking_asts(struct dlm_lkb *lkb)
222 {
223         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
224 }
225
226 static inline int is_demoted(struct dlm_lkb *lkb)
227 {
228         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
229 }
230
231 static inline int is_altmode(struct dlm_lkb *lkb)
232 {
233         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
234 }
235
236 static inline int is_granted(struct dlm_lkb *lkb)
237 {
238         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
239 }
240
241 static inline int is_remote(struct dlm_rsb *r)
242 {
243         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
244         return !!r->res_nodeid;
245 }
246
247 static inline int is_process_copy(struct dlm_lkb *lkb)
248 {
249         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
250 }
251
252 static inline int is_master_copy(struct dlm_lkb *lkb)
253 {
254         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
255                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
256         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257 }
258
259 static inline int middle_conversion(struct dlm_lkb *lkb)
260 {
261         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
262             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
263                 return 1;
264         return 0;
265 }
266
267 static inline int down_conversion(struct dlm_lkb *lkb)
268 {
269         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
270 }
271
272 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
273 {
274         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
275 }
276
277 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
278 {
279         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
280 }
281
282 static inline int is_overlap(struct dlm_lkb *lkb)
283 {
284         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
285                                   DLM_IFL_OVERLAP_CANCEL));
286 }
287
288 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
289 {
290         if (is_master_copy(lkb))
291                 return;
292
293         del_timeout(lkb);
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
298            timeout caused the cancel then return -ETIMEDOUT */
299         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
300                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
301                 rv = -ETIMEDOUT;
302         }
303
304         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
305                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
306                 rv = -EDEADLK;
307         }
308
309         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
310 }
311
312 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
313 {
314         queue_cast(r, lkb,
315                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
316 }
317
318 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
319 {
320         if (is_master_copy(lkb)) {
321                 send_bast(r, lkb, rqmode);
322         } else {
323                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
324         }
325 }
326
327 /*
328  * Basic operations on rsb's and lkb's
329  */
330
331 static int pre_rsb_struct(struct dlm_ls *ls)
332 {
333         struct dlm_rsb *r1, *r2;
334         int count = 0;
335
336         spin_lock(&ls->ls_new_rsb_spin);
337         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
338                 spin_unlock(&ls->ls_new_rsb_spin);
339                 return 0;
340         }
341         spin_unlock(&ls->ls_new_rsb_spin);
342
343         r1 = dlm_allocate_rsb(ls);
344         r2 = dlm_allocate_rsb(ls);
345
346         spin_lock(&ls->ls_new_rsb_spin);
347         if (r1) {
348                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
349                 ls->ls_new_rsb_count++;
350         }
351         if (r2) {
352                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
353                 ls->ls_new_rsb_count++;
354         }
355         count = ls->ls_new_rsb_count;
356         spin_unlock(&ls->ls_new_rsb_spin);
357
358         if (!count)
359                 return -ENOMEM;
360         return 0;
361 }
362
363 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
364    unlock any spinlocks, go back and call pre_rsb_struct again.
365    Otherwise, take an rsb off the list and return it. */
366
367 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
368                           struct dlm_rsb **r_ret)
369 {
370         struct dlm_rsb *r;
371         int count;
372
373         spin_lock(&ls->ls_new_rsb_spin);
374         if (list_empty(&ls->ls_new_rsb)) {
375                 count = ls->ls_new_rsb_count;
376                 spin_unlock(&ls->ls_new_rsb_spin);
377                 log_debug(ls, "find_rsb retry %d %d %s",
378                           count, dlm_config.ci_new_rsb_count, name);
379                 return -EAGAIN;
380         }
381
382         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
383         list_del(&r->res_hashchain);
384         /* Convert the empty list_head to a NULL rb_node for tree usage: */
385         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
386         ls->ls_new_rsb_count--;
387         spin_unlock(&ls->ls_new_rsb_spin);
388
389         r->res_ls = ls;
390         r->res_length = len;
391         memcpy(r->res_name, name, len);
392         mutex_init(&r->res_mutex);
393
394         INIT_LIST_HEAD(&r->res_lookup);
395         INIT_LIST_HEAD(&r->res_grantqueue);
396         INIT_LIST_HEAD(&r->res_convertqueue);
397         INIT_LIST_HEAD(&r->res_waitqueue);
398         INIT_LIST_HEAD(&r->res_root_list);
399         INIT_LIST_HEAD(&r->res_recover_list);
400
401         *r_ret = r;
402         return 0;
403 }
404
405 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
406 {
407         char maxname[DLM_RESNAME_MAXLEN];
408
409         memset(maxname, 0, DLM_RESNAME_MAXLEN);
410         memcpy(maxname, name, nlen);
411         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
412 }
413
414 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
415                         unsigned int flags, struct dlm_rsb **r_ret)
416 {
417         struct rb_node *node = tree->rb_node;
418         struct dlm_rsb *r;
419         int error = 0;
420         int rc;
421
422         while (node) {
423                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
424                 rc = rsb_cmp(r, name, len);
425                 if (rc < 0)
426                         node = node->rb_left;
427                 else if (rc > 0)
428                         node = node->rb_right;
429                 else
430                         goto found;
431         }
432         *r_ret = NULL;
433         return -EBADR;
434
435  found:
436         if (r->res_nodeid && (flags & R_MASTER))
437                 error = -ENOTBLK;
438         *r_ret = r;
439         return error;
440 }
441
442 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
443 {
444         struct rb_node **newn = &tree->rb_node;
445         struct rb_node *parent = NULL;
446         int rc;
447
448         while (*newn) {
449                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
450                                                res_hashnode);
451
452                 parent = *newn;
453                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
454                 if (rc < 0)
455                         newn = &parent->rb_left;
456                 else if (rc > 0)
457                         newn = &parent->rb_right;
458                 else {
459                         log_print("rsb_insert match");
460                         dlm_dump_rsb(rsb);
461                         dlm_dump_rsb(cur);
462                         return -EEXIST;
463                 }
464         }
465
466         rb_link_node(&rsb->res_hashnode, parent, newn);
467         rb_insert_color(&rsb->res_hashnode, tree);
468         return 0;
469 }
470
471 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
472                        unsigned int flags, struct dlm_rsb **r_ret)
473 {
474         struct dlm_rsb *r;
475         int error;
476
477         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
478         if (!error) {
479                 kref_get(&r->res_ref);
480                 goto out;
481         }
482         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
483         if (error)
484                 goto out;
485
486         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
487         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
488         if (error)
489                 return error;
490
491         if (dlm_no_directory(ls))
492                 goto out;
493
494         if (r->res_nodeid == -1) {
495                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
496                 r->res_first_lkid = 0;
497         } else if (r->res_nodeid > 0) {
498                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
499                 r->res_first_lkid = 0;
500         } else {
501                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
502                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
503         }
504  out:
505         *r_ret = r;
506         return error;
507 }
508
509 /*
510  * Find rsb in rsbtbl and potentially create/add one
511  *
512  * Delaying the release of rsb's has a similar benefit to applications keeping
513  * NL locks on an rsb, but without the guarantee that the cached master value
514  * will still be valid when the rsb is reused.  Apps aren't always smart enough
515  * to keep NL locks on an rsb that they may lock again shortly; this can lead
516  * to excessive master lookups and removals if we don't delay the release.
517  *
518  * Searching for an rsb means looking through both the normal list and toss
519  * list.  When found on the toss list the rsb is moved to the normal list with
520  * ref count of 1; when found on normal list the ref count is incremented.
521  */
522
523 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
524                     unsigned int flags, struct dlm_rsb **r_ret)
525 {
526         struct dlm_rsb *r = NULL;
527         uint32_t hash, bucket;
528         int error;
529
530         if (namelen > DLM_RESNAME_MAXLEN) {
531                 error = -EINVAL;
532                 goto out;
533         }
534
535         if (dlm_no_directory(ls))
536                 flags |= R_CREATE;
537
538         hash = jhash(name, namelen, 0);
539         bucket = hash & (ls->ls_rsbtbl_size - 1);
540
541  retry:
542         if (flags & R_CREATE) {
543                 error = pre_rsb_struct(ls);
544                 if (error < 0)
545                         goto out;
546         }
547
548         spin_lock(&ls->ls_rsbtbl[bucket].lock);
549
550         error = _search_rsb(ls, name, namelen, bucket, flags, &r);
551         if (!error)
552                 goto out_unlock;
553
554         if (error == -EBADR && !(flags & R_CREATE))
555                 goto out_unlock;
556
557         /* the rsb was found but wasn't a master copy */
558         if (error == -ENOTBLK)
559                 goto out_unlock;
560
561         error = get_rsb_struct(ls, name, namelen, &r);
562         if (error == -EAGAIN) {
563                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
564                 goto retry;
565         }
566         if (error)
567                 goto out_unlock;
568
569         r->res_hash = hash;
570         r->res_bucket = bucket;
571         r->res_nodeid = -1;
572         kref_init(&r->res_ref);
573
574         /* With no directory, the master can be set immediately */
575         if (dlm_no_directory(ls)) {
576                 int nodeid = dlm_dir_nodeid(r);
577                 if (nodeid == dlm_our_nodeid())
578                         nodeid = 0;
579                 r->res_nodeid = nodeid;
580         }
581         error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
582  out_unlock:
583         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
584  out:
585         *r_ret = r;
586         return error;
587 }
588
589 /* This is only called to add a reference when the code already holds
590    a valid reference to the rsb, so there's no need for locking. */
591
592 static inline void hold_rsb(struct dlm_rsb *r)
593 {
594         kref_get(&r->res_ref);
595 }
596
597 void dlm_hold_rsb(struct dlm_rsb *r)
598 {
599         hold_rsb(r);
600 }
601
602 static void toss_rsb(struct kref *kref)
603 {
604         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
605         struct dlm_ls *ls = r->res_ls;
606
607         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
608         kref_init(&r->res_ref);
609         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
610         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
611         r->res_toss_time = jiffies;
612         if (r->res_lvbptr) {
613                 dlm_free_lvb(r->res_lvbptr);
614                 r->res_lvbptr = NULL;
615         }
616 }
617
618 /* When all references to the rsb are gone it's transferred to
619    the tossed list for later disposal. */
620
621 static void put_rsb(struct dlm_rsb *r)
622 {
623         struct dlm_ls *ls = r->res_ls;
624         uint32_t bucket = r->res_bucket;
625
626         spin_lock(&ls->ls_rsbtbl[bucket].lock);
627         kref_put(&r->res_ref, toss_rsb);
628         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
629 }
630
631 void dlm_put_rsb(struct dlm_rsb *r)
632 {
633         put_rsb(r);
634 }
635
636 /* See comment for unhold_lkb */
637
638 static void unhold_rsb(struct dlm_rsb *r)
639 {
640         int rv;
641         rv = kref_put(&r->res_ref, toss_rsb);
642         DLM_ASSERT(!rv, dlm_dump_rsb(r););
643 }
644
645 static void kill_rsb(struct kref *kref)
646 {
647         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
648
649         /* All work is done after the return from kref_put() so we
650            can release the write_lock before the remove and free. */
651
652         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
653         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
654         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
655         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
656         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
657         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
658 }
659
660 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
661    The rsb must exist as long as any lkb's for it do. */
662
663 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
664 {
665         hold_rsb(r);
666         lkb->lkb_resource = r;
667 }
668
669 static void detach_lkb(struct dlm_lkb *lkb)
670 {
671         if (lkb->lkb_resource) {
672                 put_rsb(lkb->lkb_resource);
673                 lkb->lkb_resource = NULL;
674         }
675 }
676
677 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
678 {
679         struct dlm_lkb *lkb;
680         int rv, id;
681
682         lkb = dlm_allocate_lkb(ls);
683         if (!lkb)
684                 return -ENOMEM;
685
686         lkb->lkb_nodeid = -1;
687         lkb->lkb_grmode = DLM_LOCK_IV;
688         kref_init(&lkb->lkb_ref);
689         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
690         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
691         INIT_LIST_HEAD(&lkb->lkb_time_list);
692         INIT_LIST_HEAD(&lkb->lkb_cb_list);
693         mutex_init(&lkb->lkb_cb_mutex);
694         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
695
696  retry:
697         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
698         if (!rv)
699                 return -ENOMEM;
700
701         spin_lock(&ls->ls_lkbidr_spin);
702         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
703         if (!rv)
704                 lkb->lkb_id = id;
705         spin_unlock(&ls->ls_lkbidr_spin);
706
707         if (rv == -EAGAIN)
708                 goto retry;
709
710         if (rv < 0) {
711                 log_error(ls, "create_lkb idr error %d", rv);
712                 return rv;
713         }
714
715         *lkb_ret = lkb;
716         return 0;
717 }
718
719 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
720 {
721         struct dlm_lkb *lkb;
722
723         spin_lock(&ls->ls_lkbidr_spin);
724         lkb = idr_find(&ls->ls_lkbidr, lkid);
725         if (lkb)
726                 kref_get(&lkb->lkb_ref);
727         spin_unlock(&ls->ls_lkbidr_spin);
728
729         *lkb_ret = lkb;
730         return lkb ? 0 : -ENOENT;
731 }
732
733 static void kill_lkb(struct kref *kref)
734 {
735         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
736
737         /* All work is done after the return from kref_put() so we
738            can release the write_lock before the detach_lkb */
739
740         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
741 }
742
743 /* __put_lkb() is used when an lkb may not have an rsb attached to
744    it so we need to provide the lockspace explicitly */
745
746 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
747 {
748         uint32_t lkid = lkb->lkb_id;
749
750         spin_lock(&ls->ls_lkbidr_spin);
751         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
752                 idr_remove(&ls->ls_lkbidr, lkid);
753                 spin_unlock(&ls->ls_lkbidr_spin);
754
755                 detach_lkb(lkb);
756
757                 /* for local/process lkbs, lvbptr points to caller's lksb */
758                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
759                         dlm_free_lvb(lkb->lkb_lvbptr);
760                 dlm_free_lkb(lkb);
761                 return 1;
762         } else {
763                 spin_unlock(&ls->ls_lkbidr_spin);
764                 return 0;
765         }
766 }
767
768 int dlm_put_lkb(struct dlm_lkb *lkb)
769 {
770         struct dlm_ls *ls;
771
772         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
773         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
774
775         ls = lkb->lkb_resource->res_ls;
776         return __put_lkb(ls, lkb);
777 }
778
779 /* This is only called to add a reference when the code already holds
780    a valid reference to the lkb, so there's no need for locking. */
781
782 static inline void hold_lkb(struct dlm_lkb *lkb)
783 {
784         kref_get(&lkb->lkb_ref);
785 }
786
787 /* This is called when we need to remove a reference and are certain
788    it's not the last ref.  e.g. del_lkb is always called between a
789    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
790    put_lkb would work fine, but would involve unnecessary locking */
791
792 static inline void unhold_lkb(struct dlm_lkb *lkb)
793 {
794         int rv;
795         rv = kref_put(&lkb->lkb_ref, kill_lkb);
796         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
797 }
798
799 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
800                             int mode)
801 {
802         struct dlm_lkb *lkb = NULL;
803
804         list_for_each_entry(lkb, head, lkb_statequeue)
805                 if (lkb->lkb_rqmode < mode)
806                         break;
807
808         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
809 }
810
811 /* add/remove lkb to rsb's grant/convert/wait queue */
812
813 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
814 {
815         kref_get(&lkb->lkb_ref);
816
817         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
818
819         lkb->lkb_timestamp = ktime_get();
820
821         lkb->lkb_status = status;
822
823         switch (status) {
824         case DLM_LKSTS_WAITING:
825                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
826                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
827                 else
828                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
829                 break;
830         case DLM_LKSTS_GRANTED:
831                 /* convention says granted locks kept in order of grmode */
832                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
833                                 lkb->lkb_grmode);
834                 break;
835         case DLM_LKSTS_CONVERT:
836                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
837                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
838                 else
839                         list_add_tail(&lkb->lkb_statequeue,
840                                       &r->res_convertqueue);
841                 break;
842         default:
843                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
844         }
845 }
846
847 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
848 {
849         lkb->lkb_status = 0;
850         list_del(&lkb->lkb_statequeue);
851         unhold_lkb(lkb);
852 }
853
854 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
855 {
856         hold_lkb(lkb);
857         del_lkb(r, lkb);
858         add_lkb(r, lkb, sts);
859         unhold_lkb(lkb);
860 }
861
862 static int msg_reply_type(int mstype)
863 {
864         switch (mstype) {
865         case DLM_MSG_REQUEST:
866                 return DLM_MSG_REQUEST_REPLY;
867         case DLM_MSG_CONVERT:
868                 return DLM_MSG_CONVERT_REPLY;
869         case DLM_MSG_UNLOCK:
870                 return DLM_MSG_UNLOCK_REPLY;
871         case DLM_MSG_CANCEL:
872                 return DLM_MSG_CANCEL_REPLY;
873         case DLM_MSG_LOOKUP:
874                 return DLM_MSG_LOOKUP_REPLY;
875         }
876         return -1;
877 }
878
879 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
880 {
881         int i;
882
883         for (i = 0; i < num_nodes; i++) {
884                 if (!warned[i]) {
885                         warned[i] = nodeid;
886                         return 0;
887                 }
888                 if (warned[i] == nodeid)
889                         return 1;
890         }
891         return 0;
892 }
893
894 void dlm_scan_waiters(struct dlm_ls *ls)
895 {
896         struct dlm_lkb *lkb;
897         ktime_t zero = ktime_set(0, 0);
898         s64 us;
899         s64 debug_maxus = 0;
900         u32 debug_scanned = 0;
901         u32 debug_expired = 0;
902         int num_nodes = 0;
903         int *warned = NULL;
904
905         if (!dlm_config.ci_waitwarn_us)
906                 return;
907
908         mutex_lock(&ls->ls_waiters_mutex);
909
910         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
911                 if (ktime_equal(lkb->lkb_wait_time, zero))
912                         continue;
913
914                 debug_scanned++;
915
916                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
917
918                 if (us < dlm_config.ci_waitwarn_us)
919                         continue;
920
921                 lkb->lkb_wait_time = zero;
922
923                 debug_expired++;
924                 if (us > debug_maxus)
925                         debug_maxus = us;
926
927                 if (!num_nodes) {
928                         num_nodes = ls->ls_num_nodes;
929                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
930                 }
931                 if (!warned)
932                         continue;
933                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
934                         continue;
935
936                 log_error(ls, "waitwarn %x %lld %d us check connection to "
937                           "node %d", lkb->lkb_id, (long long)us,
938                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
939         }
940         mutex_unlock(&ls->ls_waiters_mutex);
941         kfree(warned);
942
943         if (debug_expired)
944                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
945                           debug_scanned, debug_expired,
946                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
947 }
948
949 /* add/remove lkb from global waiters list of lkb's waiting for
950    a reply from a remote node */
951
952 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
953 {
954         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
955         int error = 0;
956
957         mutex_lock(&ls->ls_waiters_mutex);
958
959         if (is_overlap_unlock(lkb) ||
960             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
961                 error = -EINVAL;
962                 goto out;
963         }
964
965         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
966                 switch (mstype) {
967                 case DLM_MSG_UNLOCK:
968                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
969                         break;
970                 case DLM_MSG_CANCEL:
971                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
972                         break;
973                 default:
974                         error = -EBUSY;
975                         goto out;
976                 }
977                 lkb->lkb_wait_count++;
978                 hold_lkb(lkb);
979
980                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
981                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
982                           lkb->lkb_wait_count, lkb->lkb_flags);
983                 goto out;
984         }
985
986         DLM_ASSERT(!lkb->lkb_wait_count,
987                    dlm_print_lkb(lkb);
988                    printk("wait_count %d\n", lkb->lkb_wait_count););
989
990         lkb->lkb_wait_count++;
991         lkb->lkb_wait_type = mstype;
992         lkb->lkb_wait_time = ktime_get();
993         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
994         hold_lkb(lkb);
995         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
996  out:
997         if (error)
998                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
999                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1000                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1001         mutex_unlock(&ls->ls_waiters_mutex);
1002         return error;
1003 }
1004
1005 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1006    list as part of process_requestqueue (e.g. a lookup that has an optimized
1007    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1008    set RESEND and dlm_recover_waiters_post() */
1009
1010 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1011                                 struct dlm_message *ms)
1012 {
1013         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1014         int overlap_done = 0;
1015
1016         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1017                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1018                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1019                 overlap_done = 1;
1020                 goto out_del;
1021         }
1022
1023         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1024                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1025                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1026                 overlap_done = 1;
1027                 goto out_del;
1028         }
1029
1030         /* Cancel state was preemptively cleared by a successful convert,
1031            see next comment, nothing to do. */
1032
1033         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1034             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1035                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1036                           lkb->lkb_id, lkb->lkb_wait_type);
1037                 return -1;
1038         }
1039
1040         /* Remove for the convert reply, and premptively remove for the
1041            cancel reply.  A convert has been granted while there's still
1042            an outstanding cancel on it (the cancel is moot and the result
1043            in the cancel reply should be 0).  We preempt the cancel reply
1044            because the app gets the convert result and then can follow up
1045            with another op, like convert.  This subsequent op would see the
1046            lingering state of the cancel and fail with -EBUSY. */
1047
1048         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1049             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1050             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1051                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1052                           lkb->lkb_id);
1053                 lkb->lkb_wait_type = 0;
1054                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1055                 lkb->lkb_wait_count--;
1056                 goto out_del;
1057         }
1058
1059         /* N.B. type of reply may not always correspond to type of original
1060            msg due to lookup->request optimization, verify others? */
1061
1062         if (lkb->lkb_wait_type) {
1063                 lkb->lkb_wait_type = 0;
1064                 goto out_del;
1065         }
1066
1067         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
1068                   lkb->lkb_id, mstype, lkb->lkb_flags);
1069         return -1;
1070
1071  out_del:
1072         /* the force-unlock/cancel has completed and we haven't recvd a reply
1073            to the op that was in progress prior to the unlock/cancel; we
1074            give up on any reply to the earlier op.  FIXME: not sure when/how
1075            this would happen */
1076
1077         if (overlap_done && lkb->lkb_wait_type) {
1078                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1079                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1080                 lkb->lkb_wait_count--;
1081                 lkb->lkb_wait_type = 0;
1082         }
1083
1084         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1085
1086         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1087         lkb->lkb_wait_count--;
1088         if (!lkb->lkb_wait_count)
1089                 list_del_init(&lkb->lkb_wait_reply);
1090         unhold_lkb(lkb);
1091         return 0;
1092 }
1093
1094 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1095 {
1096         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1097         int error;
1098
1099         mutex_lock(&ls->ls_waiters_mutex);
1100         error = _remove_from_waiters(lkb, mstype, NULL);
1101         mutex_unlock(&ls->ls_waiters_mutex);
1102         return error;
1103 }
1104
1105 /* Handles situations where we might be processing a "fake" or "stub" reply in
1106    which we can't try to take waiters_mutex again. */
1107
1108 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1109 {
1110         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1111         int error;
1112
1113         if (ms->m_flags != DLM_IFL_STUB_MS)
1114                 mutex_lock(&ls->ls_waiters_mutex);
1115         error = _remove_from_waiters(lkb, ms->m_type, ms);
1116         if (ms->m_flags != DLM_IFL_STUB_MS)
1117                 mutex_unlock(&ls->ls_waiters_mutex);
1118         return error;
1119 }
1120
1121 static void dir_remove(struct dlm_rsb *r)
1122 {
1123         int to_nodeid;
1124
1125         if (dlm_no_directory(r->res_ls))
1126                 return;
1127
1128         to_nodeid = dlm_dir_nodeid(r);
1129         if (to_nodeid != dlm_our_nodeid())
1130                 send_remove(r);
1131         else
1132                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1133                                      r->res_name, r->res_length);
1134 }
1135
1136 /* FIXME: make this more efficient */
1137
1138 static int shrink_bucket(struct dlm_ls *ls, int b)
1139 {
1140         struct rb_node *n;
1141         struct dlm_rsb *r;
1142         int count = 0, found;
1143
1144         for (;;) {
1145                 found = 0;
1146                 spin_lock(&ls->ls_rsbtbl[b].lock);
1147                 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1148                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1149                         if (!time_after_eq(jiffies, r->res_toss_time +
1150                                            dlm_config.ci_toss_secs * HZ))
1151                                 continue;
1152                         found = 1;
1153                         break;
1154                 }
1155
1156                 if (!found) {
1157                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1158                         break;
1159                 }
1160
1161                 if (kref_put(&r->res_ref, kill_rsb)) {
1162                         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1163                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1164
1165                         if (is_master(r))
1166                                 dir_remove(r);
1167                         dlm_free_rsb(r);
1168                         count++;
1169                 } else {
1170                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1171                         log_error(ls, "tossed rsb in use %s", r->res_name);
1172                 }
1173         }
1174
1175         return count;
1176 }
1177
1178 void dlm_scan_rsbs(struct dlm_ls *ls)
1179 {
1180         int i;
1181
1182         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1183                 shrink_bucket(ls, i);
1184                 if (dlm_locking_stopped(ls))
1185                         break;
1186                 cond_resched();
1187         }
1188 }
1189
1190 static void add_timeout(struct dlm_lkb *lkb)
1191 {
1192         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1193
1194         if (is_master_copy(lkb))
1195                 return;
1196
1197         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1198             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1199                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1200                 goto add_it;
1201         }
1202         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1203                 goto add_it;
1204         return;
1205
1206  add_it:
1207         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1208         mutex_lock(&ls->ls_timeout_mutex);
1209         hold_lkb(lkb);
1210         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1211         mutex_unlock(&ls->ls_timeout_mutex);
1212 }
1213
1214 static void del_timeout(struct dlm_lkb *lkb)
1215 {
1216         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1217
1218         mutex_lock(&ls->ls_timeout_mutex);
1219         if (!list_empty(&lkb->lkb_time_list)) {
1220                 list_del_init(&lkb->lkb_time_list);
1221                 unhold_lkb(lkb);
1222         }
1223         mutex_unlock(&ls->ls_timeout_mutex);
1224 }
1225
1226 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1227    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1228    and then lock rsb because of lock ordering in add_timeout.  We may need
1229    to specify some special timeout-related bits in the lkb that are just to
1230    be accessed under the timeout_mutex. */
1231
1232 void dlm_scan_timeout(struct dlm_ls *ls)
1233 {
1234         struct dlm_rsb *r;
1235         struct dlm_lkb *lkb;
1236         int do_cancel, do_warn;
1237         s64 wait_us;
1238
1239         for (;;) {
1240                 if (dlm_locking_stopped(ls))
1241                         break;
1242
1243                 do_cancel = 0;
1244                 do_warn = 0;
1245                 mutex_lock(&ls->ls_timeout_mutex);
1246                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1247
1248                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1249                                                         lkb->lkb_timestamp));
1250
1251                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1252                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1253                                 do_cancel = 1;
1254
1255                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1256                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1257                                 do_warn = 1;
1258
1259                         if (!do_cancel && !do_warn)
1260                                 continue;
1261                         hold_lkb(lkb);
1262                         break;
1263                 }
1264                 mutex_unlock(&ls->ls_timeout_mutex);
1265
1266                 if (!do_cancel && !do_warn)
1267                         break;
1268
1269                 r = lkb->lkb_resource;
1270                 hold_rsb(r);
1271                 lock_rsb(r);
1272
1273                 if (do_warn) {
1274                         /* clear flag so we only warn once */
1275                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1276                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1277                                 del_timeout(lkb);
1278                         dlm_timeout_warn(lkb);
1279                 }
1280
1281                 if (do_cancel) {
1282                         log_debug(ls, "timeout cancel %x node %d %s",
1283                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1284                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1285                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1286                         del_timeout(lkb);
1287                         _cancel_lock(r, lkb);
1288                 }
1289
1290                 unlock_rsb(r);
1291                 unhold_rsb(r);
1292                 dlm_put_lkb(lkb);
1293         }
1294 }
1295
1296 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1297    dlm_recoverd before checking/setting ls_recover_begin. */
1298
1299 void dlm_adjust_timeouts(struct dlm_ls *ls)
1300 {
1301         struct dlm_lkb *lkb;
1302         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1303
1304         ls->ls_recover_begin = 0;
1305         mutex_lock(&ls->ls_timeout_mutex);
1306         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1307                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1308         mutex_unlock(&ls->ls_timeout_mutex);
1309
1310         if (!dlm_config.ci_waitwarn_us)
1311                 return;
1312
1313         mutex_lock(&ls->ls_waiters_mutex);
1314         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1315                 if (ktime_to_us(lkb->lkb_wait_time))
1316                         lkb->lkb_wait_time = ktime_get();
1317         }
1318         mutex_unlock(&ls->ls_waiters_mutex);
1319 }
1320
1321 /* lkb is master or local copy */
1322
1323 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1324 {
1325         int b, len = r->res_ls->ls_lvblen;
1326
1327         /* b=1 lvb returned to caller
1328            b=0 lvb written to rsb or invalidated
1329            b=-1 do nothing */
1330
1331         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1332
1333         if (b == 1) {
1334                 if (!lkb->lkb_lvbptr)
1335                         return;
1336
1337                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1338                         return;
1339
1340                 if (!r->res_lvbptr)
1341                         return;
1342
1343                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1344                 lkb->lkb_lvbseq = r->res_lvbseq;
1345
1346         } else if (b == 0) {
1347                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1348                         rsb_set_flag(r, RSB_VALNOTVALID);
1349                         return;
1350                 }
1351
1352                 if (!lkb->lkb_lvbptr)
1353                         return;
1354
1355                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1356                         return;
1357
1358                 if (!r->res_lvbptr)
1359                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1360
1361                 if (!r->res_lvbptr)
1362                         return;
1363
1364                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1365                 r->res_lvbseq++;
1366                 lkb->lkb_lvbseq = r->res_lvbseq;
1367                 rsb_clear_flag(r, RSB_VALNOTVALID);
1368         }
1369
1370         if (rsb_flag(r, RSB_VALNOTVALID))
1371                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1372 }
1373
1374 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1375 {
1376         if (lkb->lkb_grmode < DLM_LOCK_PW)
1377                 return;
1378
1379         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1380                 rsb_set_flag(r, RSB_VALNOTVALID);
1381                 return;
1382         }
1383
1384         if (!lkb->lkb_lvbptr)
1385                 return;
1386
1387         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1388                 return;
1389
1390         if (!r->res_lvbptr)
1391                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1392
1393         if (!r->res_lvbptr)
1394                 return;
1395
1396         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1397         r->res_lvbseq++;
1398         rsb_clear_flag(r, RSB_VALNOTVALID);
1399 }
1400
1401 /* lkb is process copy (pc) */
1402
1403 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1404                             struct dlm_message *ms)
1405 {
1406         int b;
1407
1408         if (!lkb->lkb_lvbptr)
1409                 return;
1410
1411         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1412                 return;
1413
1414         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1415         if (b == 1) {
1416                 int len = receive_extralen(ms);
1417                 if (len > DLM_RESNAME_MAXLEN)
1418                         len = DLM_RESNAME_MAXLEN;
1419                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1420                 lkb->lkb_lvbseq = ms->m_lvbseq;
1421         }
1422 }
1423
1424 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1425    remove_lock -- used for unlock, removes lkb from granted
1426    revert_lock -- used for cancel, moves lkb from convert to granted
1427    grant_lock  -- used for request and convert, adds lkb to granted or
1428                   moves lkb from convert or waiting to granted
1429
1430    Each of these is used for master or local copy lkb's.  There is
1431    also a _pc() variation used to make the corresponding change on
1432    a process copy (pc) lkb. */
1433
1434 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1435 {
1436         del_lkb(r, lkb);
1437         lkb->lkb_grmode = DLM_LOCK_IV;
1438         /* this unhold undoes the original ref from create_lkb()
1439            so this leads to the lkb being freed */
1440         unhold_lkb(lkb);
1441 }
1442
1443 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1444 {
1445         set_lvb_unlock(r, lkb);
1446         _remove_lock(r, lkb);
1447 }
1448
1449 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1450 {
1451         _remove_lock(r, lkb);
1452 }
1453
1454 /* returns: 0 did nothing
1455             1 moved lock to granted
1456            -1 removed lock */
1457
1458 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1459 {
1460         int rv = 0;
1461
1462         lkb->lkb_rqmode = DLM_LOCK_IV;
1463
1464         switch (lkb->lkb_status) {
1465         case DLM_LKSTS_GRANTED:
1466                 break;
1467         case DLM_LKSTS_CONVERT:
1468                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1469                 rv = 1;
1470                 break;
1471         case DLM_LKSTS_WAITING:
1472                 del_lkb(r, lkb);
1473                 lkb->lkb_grmode = DLM_LOCK_IV;
1474                 /* this unhold undoes the original ref from create_lkb()
1475                    so this leads to the lkb being freed */
1476                 unhold_lkb(lkb);
1477                 rv = -1;
1478                 break;
1479         default:
1480                 log_print("invalid status for revert %d", lkb->lkb_status);
1481         }
1482         return rv;
1483 }
1484
1485 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1486 {
1487         return revert_lock(r, lkb);
1488 }
1489
1490 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1491 {
1492         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1493                 lkb->lkb_grmode = lkb->lkb_rqmode;
1494                 if (lkb->lkb_status)
1495                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1496                 else
1497                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1498         }
1499
1500         lkb->lkb_rqmode = DLM_LOCK_IV;
1501 }
1502
1503 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1504 {
1505         set_lvb_lock(r, lkb);
1506         _grant_lock(r, lkb);
1507         lkb->lkb_highbast = 0;
1508 }
1509
1510 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1511                           struct dlm_message *ms)
1512 {
1513         set_lvb_lock_pc(r, lkb, ms);
1514         _grant_lock(r, lkb);
1515 }
1516
1517 /* called by grant_pending_locks() which means an async grant message must
1518    be sent to the requesting node in addition to granting the lock if the
1519    lkb belongs to a remote node. */
1520
1521 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1522 {
1523         grant_lock(r, lkb);
1524         if (is_master_copy(lkb))
1525                 send_grant(r, lkb);
1526         else
1527                 queue_cast(r, lkb, 0);
1528 }
1529
1530 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1531    change the granted/requested modes.  We're munging things accordingly in
1532    the process copy.
1533    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1534    conversion deadlock
1535    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1536    compatible with other granted locks */
1537
1538 static void munge_demoted(struct dlm_lkb *lkb)
1539 {
1540         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1541                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1542                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1543                 return;
1544         }
1545
1546         lkb->lkb_grmode = DLM_LOCK_NL;
1547 }
1548
1549 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1550 {
1551         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1552             ms->m_type != DLM_MSG_GRANT) {
1553                 log_print("munge_altmode %x invalid reply type %d",
1554                           lkb->lkb_id, ms->m_type);
1555                 return;
1556         }
1557
1558         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1559                 lkb->lkb_rqmode = DLM_LOCK_PR;
1560         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1561                 lkb->lkb_rqmode = DLM_LOCK_CW;
1562         else {
1563                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1564                 dlm_print_lkb(lkb);
1565         }
1566 }
1567
1568 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1569 {
1570         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1571                                            lkb_statequeue);
1572         if (lkb->lkb_id == first->lkb_id)
1573                 return 1;
1574
1575         return 0;
1576 }
1577
1578 /* Check if the given lkb conflicts with another lkb on the queue. */
1579
1580 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1581 {
1582         struct dlm_lkb *this;
1583
1584         list_for_each_entry(this, head, lkb_statequeue) {
1585                 if (this == lkb)
1586                         continue;
1587                 if (!modes_compat(this, lkb))
1588                         return 1;
1589         }
1590         return 0;
1591 }
1592
1593 /*
1594  * "A conversion deadlock arises with a pair of lock requests in the converting
1595  * queue for one resource.  The granted mode of each lock blocks the requested
1596  * mode of the other lock."
1597  *
1598  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1599  * convert queue from being granted, then deadlk/demote lkb.
1600  *
1601  * Example:
1602  * Granted Queue: empty
1603  * Convert Queue: NL->EX (first lock)
1604  *                PR->EX (second lock)
1605  *
1606  * The first lock can't be granted because of the granted mode of the second
1607  * lock and the second lock can't be granted because it's not first in the
1608  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1609  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1610  * flag set and return DEMOTED in the lksb flags.
1611  *
1612  * Originally, this function detected conv-deadlk in a more limited scope:
1613  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1614  * - if lkb1 was the first entry in the queue (not just earlier), and was
1615  *   blocked by the granted mode of lkb2, and there was nothing on the
1616  *   granted queue preventing lkb1 from being granted immediately, i.e.
1617  *   lkb2 was the only thing preventing lkb1 from being granted.
1618  *
1619  * That second condition meant we'd only say there was conv-deadlk if
1620  * resolving it (by demotion) would lead to the first lock on the convert
1621  * queue being granted right away.  It allowed conversion deadlocks to exist
1622  * between locks on the convert queue while they couldn't be granted anyway.
1623  *
1624  * Now, we detect and take action on conversion deadlocks immediately when
1625  * they're created, even if they may not be immediately consequential.  If
1626  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1627  * mode that would prevent lkb1's conversion from being granted, we do a
1628  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1629  * I think this means that the lkb_is_ahead condition below should always
1630  * be zero, i.e. there will never be conv-deadlk between two locks that are
1631  * both already on the convert queue.
1632  */
1633
1634 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1635 {
1636         struct dlm_lkb *lkb1;
1637         int lkb_is_ahead = 0;
1638
1639         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1640                 if (lkb1 == lkb2) {
1641                         lkb_is_ahead = 1;
1642                         continue;
1643                 }
1644
1645                 if (!lkb_is_ahead) {
1646                         if (!modes_compat(lkb2, lkb1))
1647                                 return 1;
1648                 } else {
1649                         if (!modes_compat(lkb2, lkb1) &&
1650                             !modes_compat(lkb1, lkb2))
1651                                 return 1;
1652                 }
1653         }
1654         return 0;
1655 }
1656
1657 /*
1658  * Return 1 if the lock can be granted, 0 otherwise.
1659  * Also detect and resolve conversion deadlocks.
1660  *
1661  * lkb is the lock to be granted
1662  *
1663  * now is 1 if the function is being called in the context of the
1664  * immediate request, it is 0 if called later, after the lock has been
1665  * queued.
1666  *
1667  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1668  */
1669
1670 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1671 {
1672         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1673
1674         /*
1675          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1676          * a new request for a NL mode lock being blocked.
1677          *
1678          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1679          * request, then it would be granted.  In essence, the use of this flag
1680          * tells the Lock Manager to expedite theis request by not considering
1681          * what may be in the CONVERTING or WAITING queues...  As of this
1682          * writing, the EXPEDITE flag can be used only with new requests for NL
1683          * mode locks.  This flag is not valid for conversion requests.
1684          *
1685          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1686          * conversion or used with a non-NL requested mode.  We also know an
1687          * EXPEDITE request is always granted immediately, so now must always
1688          * be 1.  The full condition to grant an expedite request: (now &&
1689          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1690          * therefore be shortened to just checking the flag.
1691          */
1692
1693         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1694                 return 1;
1695
1696         /*
1697          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1698          * added to the remaining conditions.
1699          */
1700
1701         if (queue_conflict(&r->res_grantqueue, lkb))
1702                 goto out;
1703
1704         /*
1705          * 6-3: By default, a conversion request is immediately granted if the
1706          * requested mode is compatible with the modes of all other granted
1707          * locks
1708          */
1709
1710         if (queue_conflict(&r->res_convertqueue, lkb))
1711                 goto out;
1712
1713         /*
1714          * 6-5: But the default algorithm for deciding whether to grant or
1715          * queue conversion requests does not by itself guarantee that such
1716          * requests are serviced on a "first come first serve" basis.  This, in
1717          * turn, can lead to a phenomenon known as "indefinate postponement".
1718          *
1719          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1720          * the system service employed to request a lock conversion.  This flag
1721          * forces certain conversion requests to be queued, even if they are
1722          * compatible with the granted modes of other locks on the same
1723          * resource.  Thus, the use of this flag results in conversion requests
1724          * being ordered on a "first come first servce" basis.
1725          *
1726          * DCT: This condition is all about new conversions being able to occur
1727          * "in place" while the lock remains on the granted queue (assuming
1728          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1729          * doesn't _have_ to go onto the convert queue where it's processed in
1730          * order.  The "now" variable is necessary to distinguish converts
1731          * being received and processed for the first time now, because once a
1732          * convert is moved to the conversion queue the condition below applies
1733          * requiring fifo granting.
1734          */
1735
1736         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1737                 return 1;
1738
1739         /*
1740          * Even if the convert is compat with all granted locks,
1741          * QUECVT forces it behind other locks on the convert queue.
1742          */
1743
1744         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
1745                 if (list_empty(&r->res_convertqueue))
1746                         return 1;
1747                 else
1748                         goto out;
1749         }
1750
1751         /*
1752          * The NOORDER flag is set to avoid the standard vms rules on grant
1753          * order.
1754          */
1755
1756         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1757                 return 1;
1758
1759         /*
1760          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1761          * granted until all other conversion requests ahead of it are granted
1762          * and/or canceled.
1763          */
1764
1765         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1766                 return 1;
1767
1768         /*
1769          * 6-4: By default, a new request is immediately granted only if all
1770          * three of the following conditions are satisfied when the request is
1771          * issued:
1772          * - The queue of ungranted conversion requests for the resource is
1773          *   empty.
1774          * - The queue of ungranted new requests for the resource is empty.
1775          * - The mode of the new request is compatible with the most
1776          *   restrictive mode of all granted locks on the resource.
1777          */
1778
1779         if (now && !conv && list_empty(&r->res_convertqueue) &&
1780             list_empty(&r->res_waitqueue))
1781                 return 1;
1782
1783         /*
1784          * 6-4: Once a lock request is in the queue of ungranted new requests,
1785          * it cannot be granted until the queue of ungranted conversion
1786          * requests is empty, all ungranted new requests ahead of it are
1787          * granted and/or canceled, and it is compatible with the granted mode
1788          * of the most restrictive lock granted on the resource.
1789          */
1790
1791         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1792             first_in_list(lkb, &r->res_waitqueue))
1793                 return 1;
1794  out:
1795         return 0;
1796 }
1797
1798 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1799                           int *err)
1800 {
1801         int rv;
1802         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1803         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1804
1805         if (err)
1806                 *err = 0;
1807
1808         rv = _can_be_granted(r, lkb, now);
1809         if (rv)
1810                 goto out;
1811
1812         /*
1813          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1814          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1815          * cancels one of the locks.
1816          */
1817
1818         if (is_convert && can_be_queued(lkb) &&
1819             conversion_deadlock_detect(r, lkb)) {
1820                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1821                         lkb->lkb_grmode = DLM_LOCK_NL;
1822                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1823                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1824                         if (err)
1825                                 *err = -EDEADLK;
1826                         else {
1827                                 log_print("can_be_granted deadlock %x now %d",
1828                                           lkb->lkb_id, now);
1829                                 dlm_dump_rsb(r);
1830                         }
1831                 }
1832                 goto out;
1833         }
1834
1835         /*
1836          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1837          * to grant a request in a mode other than the normal rqmode.  It's a
1838          * simple way to provide a big optimization to applications that can
1839          * use them.
1840          */
1841
1842         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1843                 alt = DLM_LOCK_PR;
1844         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1845                 alt = DLM_LOCK_CW;
1846
1847         if (alt) {
1848                 lkb->lkb_rqmode = alt;
1849                 rv = _can_be_granted(r, lkb, now);
1850                 if (rv)
1851                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1852                 else
1853                         lkb->lkb_rqmode = rqmode;
1854         }
1855  out:
1856         return rv;
1857 }
1858
1859 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1860    for locks pending on the convert list.  Once verified (watch for these
1861    log_prints), we should be able to just call _can_be_granted() and not
1862    bother with the demote/deadlk cases here (and there's no easy way to deal
1863    with a deadlk here, we'd have to generate something like grant_lock with
1864    the deadlk error.) */
1865
1866 /* Returns the highest requested mode of all blocked conversions; sets
1867    cw if there's a blocked conversion to DLM_LOCK_CW. */
1868
1869 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1870 {
1871         struct dlm_lkb *lkb, *s;
1872         int hi, demoted, quit, grant_restart, demote_restart;
1873         int deadlk;
1874
1875         quit = 0;
1876  restart:
1877         grant_restart = 0;
1878         demote_restart = 0;
1879         hi = DLM_LOCK_IV;
1880
1881         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1882                 demoted = is_demoted(lkb);
1883                 deadlk = 0;
1884
1885                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1886                         grant_lock_pending(r, lkb);
1887                         grant_restart = 1;
1888                         continue;
1889                 }
1890
1891                 if (!demoted && is_demoted(lkb)) {
1892                         log_print("WARN: pending demoted %x node %d %s",
1893                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1894                         demote_restart = 1;
1895                         continue;
1896                 }
1897
1898                 if (deadlk) {
1899                         log_print("WARN: pending deadlock %x node %d %s",
1900                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1901                         dlm_dump_rsb(r);
1902                         continue;
1903                 }
1904
1905                 hi = max_t(int, lkb->lkb_rqmode, hi);
1906
1907                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1908                         *cw = 1;
1909         }
1910
1911         if (grant_restart)
1912                 goto restart;
1913         if (demote_restart && !quit) {
1914                 quit = 1;
1915                 goto restart;
1916         }
1917
1918         return max_t(int, high, hi);
1919 }
1920
1921 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1922 {
1923         struct dlm_lkb *lkb, *s;
1924
1925         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1926                 if (can_be_granted(r, lkb, 0, NULL))
1927                         grant_lock_pending(r, lkb);
1928                 else {
1929                         high = max_t(int, lkb->lkb_rqmode, high);
1930                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1931                                 *cw = 1;
1932                 }
1933         }
1934
1935         return high;
1936 }
1937
1938 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1939    on either the convert or waiting queue.
1940    high is the largest rqmode of all locks blocked on the convert or
1941    waiting queue. */
1942
1943 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1944 {
1945         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1946                 if (gr->lkb_highbast < DLM_LOCK_EX)
1947                         return 1;
1948                 return 0;
1949         }
1950
1951         if (gr->lkb_highbast < high &&
1952             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1953                 return 1;
1954         return 0;
1955 }
1956
1957 static void grant_pending_locks(struct dlm_rsb *r)
1958 {
1959         struct dlm_lkb *lkb, *s;
1960         int high = DLM_LOCK_IV;
1961         int cw = 0;
1962
1963         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1964
1965         high = grant_pending_convert(r, high, &cw);
1966         high = grant_pending_wait(r, high, &cw);
1967
1968         if (high == DLM_LOCK_IV)
1969                 return;
1970
1971         /*
1972          * If there are locks left on the wait/convert queue then send blocking
1973          * ASTs to granted locks based on the largest requested mode (high)
1974          * found above.
1975          */
1976
1977         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1978                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1979                         if (cw && high == DLM_LOCK_PR &&
1980                             lkb->lkb_grmode == DLM_LOCK_PR)
1981                                 queue_bast(r, lkb, DLM_LOCK_CW);
1982                         else
1983                                 queue_bast(r, lkb, high);
1984                         lkb->lkb_highbast = high;
1985                 }
1986         }
1987 }
1988
1989 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1990 {
1991         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1992             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1993                 if (gr->lkb_highbast < DLM_LOCK_EX)
1994                         return 1;
1995                 return 0;
1996         }
1997
1998         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1999                 return 1;
2000         return 0;
2001 }
2002
2003 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2004                             struct dlm_lkb *lkb)
2005 {
2006         struct dlm_lkb *gr;
2007
2008         list_for_each_entry(gr, head, lkb_statequeue) {
2009                 /* skip self when sending basts to convertqueue */
2010                 if (gr == lkb)
2011                         continue;
2012                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2013                         queue_bast(r, gr, lkb->lkb_rqmode);
2014                         gr->lkb_highbast = lkb->lkb_rqmode;
2015                 }
2016         }
2017 }
2018
2019 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020 {
2021         send_bast_queue(r, &r->res_grantqueue, lkb);
2022 }
2023
2024 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2025 {
2026         send_bast_queue(r, &r->res_grantqueue, lkb);
2027         send_bast_queue(r, &r->res_convertqueue, lkb);
2028 }
2029
2030 /* set_master(r, lkb) -- set the master nodeid of a resource
2031
2032    The purpose of this function is to set the nodeid field in the given
2033    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2034    known, it can just be copied to the lkb and the function will return
2035    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2036    before it can be copied to the lkb.
2037
2038    When the rsb nodeid is being looked up remotely, the initial lkb
2039    causing the lookup is kept on the ls_waiters list waiting for the
2040    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2041    on the rsb's res_lookup list until the master is verified.
2042
2043    Return values:
2044    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2045    1: the rsb master is not available and the lkb has been placed on
2046       a wait queue
2047 */
2048
2049 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051         struct dlm_ls *ls = r->res_ls;
2052         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2053
2054         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2055                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2056                 r->res_first_lkid = lkb->lkb_id;
2057                 lkb->lkb_nodeid = r->res_nodeid;
2058                 return 0;
2059         }
2060
2061         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2062                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2063                 return 1;
2064         }
2065
2066         if (r->res_nodeid == 0) {
2067                 lkb->lkb_nodeid = 0;
2068                 return 0;
2069         }
2070
2071         if (r->res_nodeid > 0) {
2072                 lkb->lkb_nodeid = r->res_nodeid;
2073                 return 0;
2074         }
2075
2076         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2077
2078         dir_nodeid = dlm_dir_nodeid(r);
2079
2080         if (dir_nodeid != our_nodeid) {
2081                 r->res_first_lkid = lkb->lkb_id;
2082                 send_lookup(r, lkb);
2083                 return 1;
2084         }
2085
2086         for (i = 0; i < 2; i++) {
2087                 /* It's possible for dlm_scand to remove an old rsb for
2088                    this same resource from the toss list, us to create
2089                    a new one, look up the master locally, and find it
2090                    already exists just before dlm_scand does the
2091                    dir_remove() on the previous rsb. */
2092
2093                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2094                                        r->res_length, &ret_nodeid);
2095                 if (!error)
2096                         break;
2097                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2098                 schedule();
2099         }
2100         if (error && error != -EEXIST)
2101                 return error;
2102
2103         if (ret_nodeid == our_nodeid) {
2104                 r->res_first_lkid = 0;
2105                 r->res_nodeid = 0;
2106                 lkb->lkb_nodeid = 0;
2107         } else {
2108                 r->res_first_lkid = lkb->lkb_id;
2109                 r->res_nodeid = ret_nodeid;
2110                 lkb->lkb_nodeid = ret_nodeid;
2111         }
2112         return 0;
2113 }
2114
2115 static void process_lookup_list(struct dlm_rsb *r)
2116 {
2117         struct dlm_lkb *lkb, *safe;
2118
2119         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2120                 list_del_init(&lkb->lkb_rsb_lookup);
2121                 _request_lock(r, lkb);
2122                 schedule();
2123         }
2124 }
2125
2126 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2127
2128 static void confirm_master(struct dlm_rsb *r, int error)
2129 {
2130         struct dlm_lkb *lkb;
2131
2132         if (!r->res_first_lkid)
2133                 return;
2134
2135         switch (error) {
2136         case 0:
2137         case -EINPROGRESS:
2138                 r->res_first_lkid = 0;
2139                 process_lookup_list(r);
2140                 break;
2141
2142         case -EAGAIN:
2143         case -EBADR:
2144         case -ENOTBLK:
2145                 /* the remote request failed and won't be retried (it was
2146                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2147                    lkb the first_lkid */
2148
2149                 r->res_first_lkid = 0;
2150
2151                 if (!list_empty(&r->res_lookup)) {
2152                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2153                                          lkb_rsb_lookup);
2154                         list_del_init(&lkb->lkb_rsb_lookup);
2155                         r->res_first_lkid = lkb->lkb_id;
2156                         _request_lock(r, lkb);
2157                 }
2158                 break;
2159
2160         default:
2161                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2162         }
2163 }
2164
2165 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2166                          int namelen, unsigned long timeout_cs,
2167                          void (*ast) (void *astparam),
2168                          void *astparam,
2169                          void (*bast) (void *astparam, int mode),
2170                          struct dlm_args *args)
2171 {
2172         int rv = -EINVAL;
2173
2174         /* check for invalid arg usage */
2175
2176         if (mode < 0 || mode > DLM_LOCK_EX)
2177                 goto out;
2178
2179         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2180                 goto out;
2181
2182         if (flags & DLM_LKF_CANCEL)
2183                 goto out;
2184
2185         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2186                 goto out;
2187
2188         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2189                 goto out;
2190
2191         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2192                 goto out;
2193
2194         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2195                 goto out;
2196
2197         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2198                 goto out;
2199
2200         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2201                 goto out;
2202
2203         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2204                 goto out;
2205
2206         if (!ast || !lksb)
2207                 goto out;
2208
2209         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2210                 goto out;
2211
2212         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2213                 goto out;
2214
2215         /* these args will be copied to the lkb in validate_lock_args,
2216            it cannot be done now because when converting locks, fields in
2217            an active lkb cannot be modified before locking the rsb */
2218
2219         args->flags = flags;
2220         args->astfn = ast;
2221         args->astparam = astparam;
2222         args->bastfn = bast;
2223         args->timeout = timeout_cs;
2224         args->mode = mode;
2225         args->lksb = lksb;
2226         rv = 0;
2227  out:
2228         return rv;
2229 }
2230
2231 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2232 {
2233         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2234                       DLM_LKF_FORCEUNLOCK))
2235                 return -EINVAL;
2236
2237         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2238                 return -EINVAL;
2239
2240         args->flags = flags;
2241         args->astparam = astarg;
2242         return 0;
2243 }
2244
2245 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2246                               struct dlm_args *args)
2247 {
2248         int rv = -EINVAL;
2249
2250         if (args->flags & DLM_LKF_CONVERT) {
2251                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2252                         goto out;
2253
2254                 if (args->flags & DLM_LKF_QUECVT &&
2255                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2256                         goto out;
2257
2258                 rv = -EBUSY;
2259                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2260                         goto out;
2261
2262                 if (lkb->lkb_wait_type)
2263                         goto out;
2264
2265                 if (is_overlap(lkb))
2266                         goto out;
2267         }
2268
2269         lkb->lkb_exflags = args->flags;
2270         lkb->lkb_sbflags = 0;
2271         lkb->lkb_astfn = args->astfn;
2272         lkb->lkb_astparam = args->astparam;
2273         lkb->lkb_bastfn = args->bastfn;
2274         lkb->lkb_rqmode = args->mode;
2275         lkb->lkb_lksb = args->lksb;
2276         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2277         lkb->lkb_ownpid = (int) current->pid;
2278         lkb->lkb_timeout_cs = args->timeout;
2279         rv = 0;
2280  out:
2281         if (rv)
2282                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2283                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2284                           lkb->lkb_status, lkb->lkb_wait_type,
2285                           lkb->lkb_resource->res_name);
2286         return rv;
2287 }
2288
2289 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2290    for success */
2291
2292 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2293    because there may be a lookup in progress and it's valid to do
2294    cancel/unlockf on it */
2295
2296 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2297 {
2298         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2299         int rv = -EINVAL;
2300
2301         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2302                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2303                 dlm_print_lkb(lkb);
2304                 goto out;
2305         }
2306
2307         /* an lkb may still exist even though the lock is EOL'ed due to a
2308            cancel, unlock or failed noqueue request; an app can't use these
2309            locks; return same error as if the lkid had not been found at all */
2310
2311         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2312                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2313                 rv = -ENOENT;
2314                 goto out;
2315         }
2316
2317         /* an lkb may be waiting for an rsb lookup to complete where the
2318            lookup was initiated by another lock */
2319
2320         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2321                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2322                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2323                         list_del_init(&lkb->lkb_rsb_lookup);
2324                         queue_cast(lkb->lkb_resource, lkb,
2325                                    args->flags & DLM_LKF_CANCEL ?
2326                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2327                         unhold_lkb(lkb); /* undoes create_lkb() */
2328                 }
2329                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2330                 rv = -EBUSY;
2331                 goto out;
2332         }
2333
2334         /* cancel not allowed with another cancel/unlock in progress */
2335
2336         if (args->flags & DLM_LKF_CANCEL) {
2337                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2338                         goto out;
2339
2340                 if (is_overlap(lkb))
2341                         goto out;
2342
2343                 /* don't let scand try to do a cancel */
2344                 del_timeout(lkb);
2345
2346                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2347                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2348                         rv = -EBUSY;
2349                         goto out;
2350                 }
2351
2352                 /* there's nothing to cancel */
2353                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2354                     !lkb->lkb_wait_type) {
2355                         rv = -EBUSY;
2356                         goto out;
2357                 }
2358
2359                 switch (lkb->lkb_wait_type) {
2360                 case DLM_MSG_LOOKUP:
2361                 case DLM_MSG_REQUEST:
2362                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2363                         rv = -EBUSY;
2364                         goto out;
2365                 case DLM_MSG_UNLOCK:
2366                 case DLM_MSG_CANCEL:
2367                         goto out;
2368                 }
2369                 /* add_to_waiters() will set OVERLAP_CANCEL */
2370                 goto out_ok;
2371         }
2372
2373         /* do we need to allow a force-unlock if there's a normal unlock
2374            already in progress?  in what conditions could the normal unlock
2375            fail such that we'd want to send a force-unlock to be sure? */
2376
2377         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2378                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2379                         goto out;
2380
2381                 if (is_overlap_unlock(lkb))
2382                         goto out;
2383
2384                 /* don't let scand try to do a cancel */
2385                 del_timeout(lkb);
2386
2387                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2388                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2389                         rv = -EBUSY;
2390                         goto out;
2391                 }
2392
2393                 switch (lkb->lkb_wait_type) {
2394                 case DLM_MSG_LOOKUP:
2395                 case DLM_MSG_REQUEST:
2396                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2397                         rv = -EBUSY;
2398                         goto out;
2399                 case DLM_MSG_UNLOCK:
2400                         goto out;
2401                 }
2402                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2403                 goto out_ok;
2404         }
2405
2406         /* normal unlock not allowed if there's any op in progress */
2407         rv = -EBUSY;
2408         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2409                 goto out;
2410
2411  out_ok:
2412         /* an overlapping op shouldn't blow away exflags from other op */
2413         lkb->lkb_exflags |= args->flags;
2414         lkb->lkb_sbflags = 0;
2415         lkb->lkb_astparam = args->astparam;
2416         rv = 0;
2417  out:
2418         if (rv)
2419                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2420                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2421                           args->flags, lkb->lkb_wait_type,
2422                           lkb->lkb_resource->res_name);
2423         return rv;
2424 }
2425
2426 /*
2427  * Four stage 4 varieties:
2428  * do_request(), do_convert(), do_unlock(), do_cancel()
2429  * These are called on the master node for the given lock and
2430  * from the central locking logic.
2431  */
2432
2433 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2434 {
2435         int error = 0;
2436
2437         if (can_be_granted(r, lkb, 1, NULL)) {
2438                 grant_lock(r, lkb);
2439                 queue_cast(r, lkb, 0);
2440                 goto out;
2441         }
2442
2443         if (can_be_queued(lkb)) {
2444                 error = -EINPROGRESS;
2445                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2446                 add_timeout(lkb);
2447                 goto out;
2448         }
2449
2450         error = -EAGAIN;
2451         queue_cast(r, lkb, -EAGAIN);
2452  out:
2453         return error;
2454 }
2455
2456 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2457                                int error)
2458 {
2459         switch (error) {
2460         case -EAGAIN:
2461                 if (force_blocking_asts(lkb))
2462                         send_blocking_asts_all(r, lkb);
2463                 break;
2464         case -EINPROGRESS:
2465                 send_blocking_asts(r, lkb);
2466                 break;
2467         }
2468 }
2469
2470 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2471 {
2472         int error = 0;
2473         int deadlk = 0;
2474
2475         /* changing an existing lock may allow others to be granted */
2476
2477         if (can_be_granted(r, lkb, 1, &deadlk)) {
2478                 grant_lock(r, lkb);
2479                 queue_cast(r, lkb, 0);
2480                 goto out;
2481         }
2482
2483         /* can_be_granted() detected that this lock would block in a conversion
2484            deadlock, so we leave it on the granted queue and return EDEADLK in
2485            the ast for the convert. */
2486
2487         if (deadlk) {
2488                 /* it's left on the granted queue */
2489                 revert_lock(r, lkb);
2490                 queue_cast(r, lkb, -EDEADLK);
2491                 error = -EDEADLK;
2492                 goto out;
2493         }
2494
2495         /* is_demoted() means the can_be_granted() above set the grmode
2496            to NL, and left us on the granted queue.  This auto-demotion
2497            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2498            now grantable.  We have to try to grant other converting locks
2499            before we try again to grant this one. */
2500
2501         if (is_demoted(lkb)) {
2502                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2503                 if (_can_be_granted(r, lkb, 1)) {
2504                         grant_lock(r, lkb);
2505                         queue_cast(r, lkb, 0);
2506                         goto out;
2507                 }
2508                 /* else fall through and move to convert queue */
2509         }
2510
2511         if (can_be_queued(lkb)) {
2512                 error = -EINPROGRESS;
2513                 del_lkb(r, lkb);
2514                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2515                 add_timeout(lkb);
2516                 goto out;
2517         }
2518
2519         error = -EAGAIN;
2520         queue_cast(r, lkb, -EAGAIN);
2521  out:
2522         return error;
2523 }
2524
2525 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2526                                int error)
2527 {
2528         switch (error) {
2529         case 0:
2530                 grant_pending_locks(r);
2531                 /* grant_pending_locks also sends basts */
2532                 break;
2533         case -EAGAIN:
2534                 if (force_blocking_asts(lkb))
2535                         send_blocking_asts_all(r, lkb);
2536                 break;
2537         case -EINPROGRESS:
2538                 send_blocking_asts(r, lkb);
2539                 break;
2540         }
2541 }
2542
2543 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2544 {
2545         remove_lock(r, lkb);
2546         queue_cast(r, lkb, -DLM_EUNLOCK);
2547         return -DLM_EUNLOCK;
2548 }
2549
2550 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2551                               int error)
2552 {
2553         grant_pending_locks(r);
2554 }
2555
2556 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2557  
2558 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2559 {
2560         int error;
2561
2562         error = revert_lock(r, lkb);
2563         if (error) {
2564                 queue_cast(r, lkb, -DLM_ECANCEL);
2565                 return -DLM_ECANCEL;
2566         }
2567         return 0;
2568 }
2569
2570 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2571                               int error)
2572 {
2573         if (error)
2574                 grant_pending_locks(r);
2575 }
2576
2577 /*
2578  * Four stage 3 varieties:
2579  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2580  */
2581
2582 /* add a new lkb to a possibly new rsb, called by requesting process */
2583
2584 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2585 {
2586         int error;
2587
2588         /* set_master: sets lkb nodeid from r */
2589
2590         error = set_master(r, lkb);
2591         if (error < 0)
2592                 goto out;
2593         if (error) {
2594                 error = 0;
2595                 goto out;
2596         }
2597
2598         if (is_remote(r)) {
2599                 /* receive_request() calls do_request() on remote node */
2600                 error = send_request(r, lkb);
2601         } else {
2602                 error = do_request(r, lkb);
2603                 /* for remote locks the request_reply is sent
2604                    between do_request and do_request_effects */
2605                 do_request_effects(r, lkb, error);
2606         }
2607  out:
2608         return error;
2609 }
2610
2611 /* change some property of an existing lkb, e.g. mode */
2612
2613 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2614 {
2615         int error;
2616
2617         if (is_remote(r)) {
2618                 /* receive_convert() calls do_convert() on remote node */
2619                 error = send_convert(r, lkb);
2620         } else {
2621                 error = do_convert(r, lkb);
2622                 /* for remote locks the convert_reply is sent
2623                    between do_convert and do_convert_effects */
2624                 do_convert_effects(r, lkb, error);
2625         }
2626
2627         return error;
2628 }
2629
2630 /* remove an existing lkb from the granted queue */
2631
2632 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2633 {
2634         int error;
2635
2636         if (is_remote(r)) {
2637                 /* receive_unlock() calls do_unlock() on remote node */
2638                 error = send_unlock(r, lkb);
2639         } else {
2640                 error = do_unlock(r, lkb);
2641                 /* for remote locks the unlock_reply is sent
2642                    between do_unlock and do_unlock_effects */
2643                 do_unlock_effects(r, lkb, error);
2644         }
2645
2646         return error;
2647 }
2648
2649 /* remove an existing lkb from the convert or wait queue */
2650
2651 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2652 {
2653         int error;
2654
2655         if (is_remote(r)) {
2656                 /* receive_cancel() calls do_cancel() on remote node */
2657                 error = send_cancel(r, lkb);
2658         } else {
2659                 error = do_cancel(r, lkb);
2660                 /* for remote locks the cancel_reply is sent
2661                    between do_cancel and do_cancel_effects */
2662                 do_cancel_effects(r, lkb, error);
2663         }
2664
2665         return error;
2666 }
2667
2668 /*
2669  * Four stage 2 varieties:
2670  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2671  */
2672
2673 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2674                         int len, struct dlm_args *args)
2675 {
2676         struct dlm_rsb *r;
2677         int error;
2678
2679         error = validate_lock_args(ls, lkb, args);
2680         if (error)
2681                 goto out;
2682
2683         error = find_rsb(ls, name, len, R_CREATE, &r);
2684         if (error)
2685                 goto out;
2686
2687         lock_rsb(r);
2688
2689         attach_lkb(r, lkb);
2690         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2691
2692         error = _request_lock(r, lkb);
2693
2694         unlock_rsb(r);
2695         put_rsb(r);
2696
2697  out:
2698         return error;
2699 }
2700
2701 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2702                         struct dlm_args *args)
2703 {
2704         struct dlm_rsb *r;
2705         int error;
2706
2707         r = lkb->lkb_resource;
2708
2709         hold_rsb(r);
2710         lock_rsb(r);
2711
2712         error = validate_lock_args(ls, lkb, args);
2713         if (error)
2714                 goto out;
2715
2716         error = _convert_lock(r, lkb);
2717  out:
2718         unlock_rsb(r);
2719         put_rsb(r);
2720         return error;
2721 }
2722
2723 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2724                        struct dlm_args *args)
2725 {
2726         struct dlm_rsb *r;
2727         int error;
2728
2729         r = lkb->lkb_resource;
2730
2731         hold_rsb(r);
2732         lock_rsb(r);
2733
2734         error = validate_unlock_args(lkb, args);
2735         if (error)
2736                 goto out;
2737
2738         error = _unlock_lock(r, lkb);
2739  out:
2740         unlock_rsb(r);
2741         put_rsb(r);
2742         return error;
2743 }
2744
2745 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2746                        struct dlm_args *args)
2747 {
2748         struct dlm_rsb *r;
2749         int error;
2750
2751         r = lkb->lkb_resource;
2752
2753         hold_rsb(r);
2754         lock_rsb(r);
2755
2756         error = validate_unlock_args(lkb, args);
2757         if (error)
2758                 goto out;
2759
2760         error = _cancel_lock(r, lkb);
2761  out:
2762         unlock_rsb(r);
2763         put_rsb(r);
2764         return error;
2765 }
2766
2767 /*
2768  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2769  */
2770
2771 int dlm_lock(dlm_lockspace_t *lockspace,
2772              int mode,
2773              struct dlm_lksb *lksb,
2774              uint32_t flags,
2775              void *name,
2776              unsigned int namelen,
2777              uint32_t parent_lkid,
2778              void (*ast) (void *astarg),
2779              void *astarg,
2780              void (*bast) (void *astarg, int mode))
2781 {
2782         struct dlm_ls *ls;
2783         struct dlm_lkb *lkb;
2784         struct dlm_args args;
2785         int error, convert = flags & DLM_LKF_CONVERT;
2786
2787         ls = dlm_find_lockspace_local(lockspace);
2788         if (!ls)
2789                 return -EINVAL;
2790
2791         dlm_lock_recovery(ls);
2792
2793         if (convert)
2794                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2795         else
2796                 error = create_lkb(ls, &lkb);
2797
2798         if (error)
2799                 goto out;
2800
2801         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2802                               astarg, bast, &args);
2803         if (error)
2804                 goto out_put;
2805
2806         if (convert)
2807                 error = convert_lock(ls, lkb, &args);
2808         else
2809                 error = request_lock(ls, lkb, name, namelen, &args);
2810
2811         if (error == -EINPROGRESS)
2812                 error = 0;
2813  out_put:
2814         if (convert || error)
2815                 __put_lkb(ls, lkb);
2816         if (error == -EAGAIN || error == -EDEADLK)
2817                 error = 0;
2818  out:
2819         dlm_unlock_recovery(ls);
2820         dlm_put_lockspace(ls);
2821         return error;
2822 }
2823
2824 int dlm_unlock(dlm_lockspace_t *lockspace,
2825                uint32_t lkid,
2826                uint32_t flags,
2827                struct dlm_lksb *lksb,
2828                void *astarg)
2829 {
2830         struct dlm_ls *ls;
2831         struct dlm_lkb *lkb;
2832         struct dlm_args args;
2833         int error;
2834
2835         ls = dlm_find_lockspace_local(lockspace);
2836         if (!ls)
2837                 return -EINVAL;
2838
2839         dlm_lock_recovery(ls);
2840
2841         error = find_lkb(ls, lkid, &lkb);
2842         if (error)
2843                 goto out;
2844
2845         error = set_unlock_args(flags, astarg, &args);
2846         if (error)
2847                 goto out_put;
2848
2849         if (flags & DLM_LKF_CANCEL)
2850                 error = cancel_lock(ls, lkb, &args);
2851         else
2852                 error = unlock_lock(ls, lkb, &args);
2853
2854         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2855                 error = 0;
2856         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2857                 error = 0;
2858  out_put:
2859         dlm_put_lkb(lkb);
2860  out:
2861         dlm_unlock_recovery(ls);
2862         dlm_put_lockspace(ls);
2863         return error;
2864 }
2865
2866 /*
2867  * send/receive routines for remote operations and replies
2868  *
2869  * send_args
2870  * send_common
2871  * send_request                 receive_request
2872  * send_convert                 receive_convert
2873  * send_unlock                  receive_unlock
2874  * send_cancel                  receive_cancel
2875  * send_grant                   receive_grant
2876  * send_bast                    receive_bast
2877  * send_lookup                  receive_lookup
2878  * send_remove                  receive_remove
2879  *
2880  *                              send_common_reply
2881  * receive_request_reply        send_request_reply
2882  * receive_convert_reply        send_convert_reply
2883  * receive_unlock_reply         send_unlock_reply
2884  * receive_cancel_reply         send_cancel_reply
2885  * receive_lookup_reply         send_lookup_reply
2886  */
2887
2888 static int _create_message(struct dlm_ls *ls, int mb_len,
2889                            int to_nodeid, int mstype,
2890                            struct dlm_message **ms_ret,
2891                            struct dlm_mhandle **mh_ret)
2892 {
2893         struct dlm_message *ms;
2894         struct dlm_mhandle *mh;
2895         char *mb;
2896
2897         /* get_buffer gives us a message handle (mh) that we need to
2898            pass into lowcomms_commit and a message buffer (mb) that we
2899            write our data into */
2900
2901         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2902         if (!mh)
2903                 return -ENOBUFS;
2904
2905         memset(mb, 0, mb_len);
2906
2907         ms = (struct dlm_message *) mb;
2908
2909         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2910         ms->m_header.h_lockspace = ls->ls_global_id;
2911         ms->m_header.h_nodeid = dlm_our_nodeid();
2912         ms->m_header.h_length = mb_len;
2913         ms->m_header.h_cmd = DLM_MSG;
2914
2915         ms->m_type = mstype;
2916
2917         *mh_ret = mh;
2918         *ms_ret = ms;
2919         return 0;
2920 }
2921
2922 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2923                           int to_nodeid, int mstype,
2924                           struct dlm_message **ms_ret,
2925                           struct dlm_mhandle **mh_ret)
2926 {
2927         int mb_len = sizeof(struct dlm_message);
2928
2929         switch (mstype) {
2930         case DLM_MSG_REQUEST:
2931         case DLM_MSG_LOOKUP:
2932         case DLM_MSG_REMOVE:
2933                 mb_len += r->res_length;
2934                 break;
2935         case DLM_MSG_CONVERT:
2936         case DLM_MSG_UNLOCK:
2937         case DLM_MSG_REQUEST_REPLY:
2938         case DLM_MSG_CONVERT_REPLY:
2939         case DLM_MSG_GRANT:
2940                 if (lkb && lkb->lkb_lvbptr)
2941                         mb_len += r->res_ls->ls_lvblen;
2942                 break;
2943         }
2944
2945         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2946                                ms_ret, mh_ret);
2947 }
2948
2949 /* further lowcomms enhancements or alternate implementations may make
2950    the return value from this function useful at some point */
2951
2952 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2953 {
2954         dlm_message_out(ms);
2955         dlm_lowcomms_commit_buffer(mh);
2956         return 0;
2957 }
2958
2959 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2960                       struct dlm_message *ms)
2961 {
2962         ms->m_nodeid   = lkb->lkb_nodeid;
2963         ms->m_pid      = lkb->lkb_ownpid;
2964         ms->m_lkid     = lkb->lkb_id;
2965         ms->m_remid    = lkb->lkb_remid;
2966         ms->m_exflags  = lkb->lkb_exflags;
2967         ms->m_sbflags  = lkb->lkb_sbflags;
2968         ms->m_flags    = lkb->lkb_flags;
2969         ms->m_lvbseq   = lkb->lkb_lvbseq;
2970         ms->m_status   = lkb->lkb_status;
2971         ms->m_grmode   = lkb->lkb_grmode;
2972         ms->m_rqmode   = lkb->lkb_rqmode;
2973         ms->m_hash     = r->res_hash;
2974
2975         /* m_result and m_bastmode are set from function args,
2976            not from lkb fields */
2977
2978         if (lkb->lkb_bastfn)
2979                 ms->m_asts |= DLM_CB_BAST;
2980         if (lkb->lkb_astfn)
2981                 ms->m_asts |= DLM_CB_CAST;
2982
2983         /* compare with switch in create_message; send_remove() doesn't
2984            use send_args() */
2985
2986         switch (ms->m_type) {
2987         case DLM_MSG_REQUEST:
2988         case DLM_MSG_LOOKUP:
2989                 memcpy(ms->m_extra, r->res_name, r->res_length);
2990                 break;
2991         case DLM_MSG_CONVERT:
2992         case DLM_MSG_UNLOCK:
2993         case DLM_MSG_REQUEST_REPLY:
2994         case DLM_MSG_CONVERT_REPLY:
2995         case DLM_MSG_GRANT:
2996                 if (!lkb->lkb_lvbptr)
2997                         break;
2998                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2999                 break;
3000         }
3001 }
3002
3003 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3004 {
3005         struct dlm_message *ms;
3006         struct dlm_mhandle *mh;
3007         int to_nodeid, error;
3008
3009         to_nodeid = r->res_nodeid;
3010
3011         error = add_to_waiters(lkb, mstype, to_nodeid);
3012         if (error)
3013                 return error;
3014
3015         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3016         if (error)
3017                 goto fail;
3018
3019         send_args(r, lkb, ms);
3020
3021         error = send_message(mh, ms);
3022         if (error)
3023                 goto fail;
3024         return 0;
3025
3026  fail:
3027         remove_from_waiters(lkb, msg_reply_type(mstype));
3028         return error;
3029 }
3030
3031 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3032 {
3033         return send_common(r, lkb, DLM_MSG_REQUEST);
3034 }
3035
3036 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3037 {
3038         int error;
3039
3040         error = send_common(r, lkb, DLM_MSG_CONVERT);
3041
3042         /* down conversions go without a reply from the master */
3043         if (!error && down_conversion(lkb)) {
3044                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3045                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3046                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3047                 r->res_ls->ls_stub_ms.m_result = 0;
3048                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3049         }
3050
3051         return error;
3052 }
3053
3054 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3055    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3056    that the master is still correct. */
3057
3058 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3059 {
3060         return send_common(r, lkb, DLM_MSG_UNLOCK);
3061 }
3062
3063 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3064 {
3065         return send_common(r, lkb, DLM_MSG_CANCEL);
3066 }
3067
3068 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3069 {
3070         struct dlm_message *ms;
3071         struct dlm_mhandle *mh;
3072         int to_nodeid, error;
3073
3074         to_nodeid = lkb->lkb_nodeid;
3075
3076         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3077         if (error)
3078                 goto out;
3079
3080         send_args(r, lkb, ms);
3081
3082         ms->m_result = 0;
3083
3084         error = send_message(mh, ms);
3085  out:
3086         return error;
3087 }
3088
3089 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3090 {
3091         struct dlm_message *ms;
3092         struct dlm_mhandle *mh;
3093         int to_nodeid, error;
3094
3095         to_nodeid = lkb->lkb_nodeid;
3096
3097         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3098         if (error)
3099                 goto out;
3100
3101         send_args(r, lkb, ms);
3102
3103         ms->m_bastmode = mode;
3104
3105         error = send_message(mh, ms);
3106  out:
3107         return error;
3108 }
3109
3110 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3111 {
3112         struct dlm_message *ms;
3113         struct dlm_mhandle *mh;
3114         int to_nodeid, error;
3115
3116         to_nodeid = dlm_dir_nodeid(r);
3117
3118         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3119         if (error)
3120                 return error;
3121
3122         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3123         if (error)
3124                 goto fail;
3125
3126         send_args(r, lkb, ms);
3127
3128         error = send_message(mh, ms);
3129         if (error)
3130                 goto fail;
3131         return 0;
3132
3133  fail:
3134         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3135         return error;
3136 }
3137
3138 static int send_remove(struct dlm_rsb *r)
3139 {
3140         struct dlm_message *ms;
3141         struct dlm_mhandle *mh;
3142         int to_nodeid, error;
3143
3144         to_nodeid = dlm_dir_nodeid(r);
3145
3146         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3147         if (error)
3148                 goto out;
3149
3150         memcpy(ms->m_extra, r->res_name, r->res_length);
3151         ms->m_hash = r->res_hash;
3152
3153         error = send_message(mh, ms);
3154  out:
3155         return error;
3156 }
3157
3158 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3159                              int mstype, int rv)
3160 {
3161         struct dlm_message *ms;
3162         struct dlm_mhandle *mh;
3163         int to_nodeid, error;
3164
3165         to_nodeid = lkb->lkb_nodeid;
3166
3167         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3168         if (error)
3169                 goto out;
3170
3171         send_args(r, lkb, ms);
3172
3173         ms->m_result = rv;
3174
3175         error = send_message(mh, ms);
3176  out:
3177         return error;
3178 }
3179
3180 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3181 {
3182         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3183 }
3184
3185 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3186 {
3187         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3188 }
3189
3190 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3191 {
3192         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3193 }
3194
3195 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3196 {
3197         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3198 }
3199
3200 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3201                              int ret_nodeid, int rv)
3202 {
3203         struct dlm_rsb *r = &ls->ls_stub_rsb;
3204         struct dlm_message *ms;
3205         struct dlm_mhandle *mh;
3206         int error, nodeid = ms_in->m_header.h_nodeid;
3207
3208         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3209         if (error)
3210                 goto out;
3211
3212         ms->m_lkid = ms_in->m_lkid;
3213         ms->m_result = rv;
3214         ms->m_nodeid = ret_nodeid;
3215
3216         error = send_message(mh, ms);
3217  out:
3218         return error;
3219 }
3220
3221 /* which args we save from a received message depends heavily on the type
3222    of message, unlike the send side where we can safely send everything about
3223    the lkb for any type of message */
3224
3225 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3226 {
3227         lkb->lkb_exflags = ms->m_exflags;
3228         lkb->lkb_sbflags = ms->m_sbflags;
3229         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3230                          (ms->m_flags & 0x0000FFFF);
3231 }
3232
3233 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3234 {
3235         if (ms->m_flags == DLM_IFL_STUB_MS)
3236                 return;
3237
3238         lkb->lkb_sbflags = ms->m_sbflags;
3239         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3240                          (ms->m_flags & 0x0000FFFF);
3241 }
3242
3243 static int receive_extralen(struct dlm_message *ms)
3244 {
3245         return (ms->m_header.h_length - sizeof(struct dlm_message));
3246 }
3247
3248 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3249                        struct dlm_message *ms)
3250 {
3251         int len;
3252
3253         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3254                 if (!lkb->lkb_lvbptr)
3255                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3256                 if (!lkb->lkb_lvbptr)
3257                         return -ENOMEM;
3258                 len = receive_extralen(ms);
3259                 if (len > DLM_RESNAME_MAXLEN)
3260                         len = DLM_RESNAME_MAXLEN;
3261                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3262         }
3263         return 0;
3264 }
3265
3266 static void fake_bastfn(void *astparam, int mode)
3267 {
3268         log_print("fake_bastfn should not be called");
3269 }
3270
3271 static void fake_astfn(void *astparam)
3272 {
3273         log_print("fake_astfn should not be called");
3274 }
3275
3276 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3277                                 struct dlm_message *ms)
3278 {
3279         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3280         lkb->lkb_ownpid = ms->m_pid;
3281         lkb->lkb_remid = ms->m_lkid;
3282         lkb->lkb_grmode = DLM_LOCK_IV;
3283         lkb->lkb_rqmode = ms->m_rqmode;
3284
3285         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3286         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3287
3288         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3289                 /* lkb was just created so there won't be an lvb yet */
3290                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3291                 if (!lkb->lkb_lvbptr)
3292                         return -ENOMEM;
3293         }
3294
3295         return 0;
3296 }
3297
3298 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3299                                 struct dlm_message *ms)
3300 {
3301         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3302                 return -EBUSY;
3303
3304         if (receive_lvb(ls, lkb, ms))
3305                 return -ENOMEM;
3306
3307         lkb->lkb_rqmode = ms->m_rqmode;
3308         lkb->lkb_lvbseq = ms->m_lvbseq;
3309
3310         return 0;
3311 }
3312
3313 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3314                                struct dlm_message *ms)
3315 {
3316         if (receive_lvb(ls, lkb, ms))
3317                 return -ENOMEM;
3318         return 0;
3319 }
3320
3321 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3322    uses to send a reply and that the remote end uses to process the reply. */
3323
3324 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3325 {
3326         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3327         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3328         lkb->lkb_remid = ms->m_lkid;
3329 }
3330
3331 /* This is called after the rsb is locked so that we can safely inspect
3332    fields in the lkb. */
3333
3334 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3335 {
3336         int from = ms->m_header.h_nodeid;
3337         int error = 0;
3338
3339         switch (ms->m_type) {
3340         case DLM_MSG_CONVERT:
3341         case DLM_MSG_UNLOCK:
3342         case DLM_MSG_CANCEL:
3343                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3344                         error = -EINVAL;
3345                 break;
3346
3347         case DLM_MSG_CONVERT_REPLY:
3348         case DLM_MSG_UNLOCK_REPLY:
3349         case DLM_MSG_CANCEL_REPLY:
3350         case DLM_MSG_GRANT:
3351         case DLM_MSG_BAST:
3352                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3353                         error = -EINVAL;
3354                 break;
3355
3356         case DLM_MSG_REQUEST_REPLY:
3357                 if (!is_process_copy(lkb))
3358                         error = -EINVAL;
3359                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3360                         error = -EINVAL;
3361                 break;
3362
3363         default:
3364                 error = -EINVAL;
3365         }
3366
3367         if (error)
3368                 log_error(lkb->lkb_resource->res_ls,
3369                           "ignore invalid message %d from %d %x %x %x %d",
3370                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3371                           lkb->lkb_flags, lkb->lkb_nodeid);
3372         return error;
3373 }
3374
3375 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3376 {
3377         struct dlm_lkb *lkb;
3378         struct dlm_rsb *r;
3379         int error, namelen;
3380
3381         error = create_lkb(ls, &lkb);
3382         if (error)
3383                 goto fail;
3384
3385         receive_flags(lkb, ms);
3386         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3387         error = receive_request_args(ls, lkb, ms);
3388         if (error) {
3389                 __put_lkb(ls, lkb);
3390                 goto fail;
3391         }
3392
3393         namelen = receive_extralen(ms);
3394
3395         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3396         if (error) {
3397                 __put_lkb(ls, lkb);
3398                 goto fail;
3399         }
3400
3401         lock_rsb(r);
3402
3403         attach_lkb(r, lkb);
3404         error = do_request(r, lkb);
3405         send_request_reply(r, lkb, error);
3406         do_request_effects(r, lkb, error);
3407
3408         unlock_rsb(r);
3409         put_rsb(r);
3410
3411         if (error == -EINPROGRESS)
3412                 error = 0;
3413         if (error)
3414                 dlm_put_lkb(lkb);
3415         return;
3416
3417  fail:
3418         setup_stub_lkb(ls, ms);
3419         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3420 }
3421
3422 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3423 {
3424         struct dlm_lkb *lkb;
3425         struct dlm_rsb *r;
3426         int error, reply = 1;
3427
3428         error = find_lkb(ls, ms->m_remid, &lkb);
3429         if (error)
3430                 goto fail;
3431
3432         r = lkb->lkb_resource;
3433
3434         hold_rsb(r);
3435         lock_rsb(r);
3436
3437         error = validate_message(lkb, ms);
3438         if (error)
3439                 goto out;
3440
3441         receive_flags(lkb, ms);
3442
3443         error = receive_convert_args(ls, lkb, ms);
3444         if (error) {
3445                 send_convert_reply(r, lkb, error);
3446                 goto out;
3447         }
3448
3449         reply = !down_conversion(lkb);
3450
3451         error = do_convert(r, lkb);
3452         if (reply)
3453                 send_convert_reply(r, lkb, error);
3454         do_convert_effects(r, lkb, error);
3455  out:
3456         unlock_rsb(r);
3457         put_rsb(r);
3458         dlm_put_lkb(lkb);
3459         return;
3460
3461  fail:
3462         setup_stub_lkb(ls, ms);
3463         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3464 }
3465
3466 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3467 {
3468         struct dlm_lkb *lkb;
3469         struct dlm_rsb *r;
3470         int error;
3471
3472         error = find_lkb(ls, ms->m_remid, &lkb);
3473         if (error)
3474                 goto fail;
3475
3476         r = lkb->lkb_resource;
3477
3478         hold_rsb(r);
3479         lock_rsb(r);
3480
3481         error = validate_message(lkb, ms);
3482         if (error)
3483                 goto out;
3484
3485         receive_flags(lkb, ms);
3486
3487         error = receive_unlock_args(ls, lkb, ms);
3488         if (error) {
3489                 send_unlock_reply(r, lkb, error);
3490                 goto out;
3491         }
3492
3493         error = do_unlock(r, lkb);
3494         send_unlock_reply(r, lkb, error);
3495         do_unlock_effects(r, lkb, error);
3496  out:
3497         unlock_rsb(r);
3498         put_rsb(r);
3499         dlm_put_lkb(lkb);
3500         return;
3501
3502  fail:
3503         setup_stub_lkb(ls, ms);
3504         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3505 }
3506
3507 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3508 {
3509         struct dlm_lkb *lkb;
3510         struct dlm_rsb *r;
3511         int error;
3512
3513         error = find_lkb(ls, ms->m_remid, &lkb);
3514         if (error)
3515                 goto fail;
3516
3517         receive_flags(lkb, ms);
3518
3519         r = lkb->lkb_resource;
3520
3521         hold_rsb(r);
3522         lock_rsb(r);
3523
3524         error = validate_message(lkb, ms);
3525         if (error)
3526                 goto out;
3527
3528         error = do_cancel(r, lkb);
3529         send_cancel_reply(r, lkb, error);
3530         do_cancel_effects(r, lkb, error);
3531  out:
3532         unlock_rsb(r);
3533         put_rsb(r);
3534         dlm_put_lkb(lkb);
3535         return;
3536
3537  fail:
3538         setup_stub_lkb(ls, ms);
3539         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3540 }
3541
3542 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3543 {
3544         struct dlm_lkb *lkb;
3545         struct dlm_rsb *r;
3546         int error;
3547
3548         error = find_lkb(ls, ms->m_remid, &lkb);
3549         if (error) {
3550                 log_debug(ls, "receive_grant from %d no lkb %x",
3551                           ms->m_header.h_nodeid, ms->m_remid);
3552                 return;
3553         }
3554
3555         r = lkb->lkb_resource;
3556
3557         hold_rsb(r);
3558         lock_rsb(r);
3559
3560         error = validate_message(lkb, ms);
3561         if (error)
3562                 goto out;
3563
3564         receive_flags_reply(lkb, ms);
3565         if (is_altmode(lkb))
3566                 munge_altmode(lkb, ms);
3567         grant_lock_pc(r, lkb, ms);
3568         queue_cast(r, lkb, 0);
3569  out:
3570         unlock_rsb(r);
3571         put_rsb(r);
3572         dlm_put_lkb(lkb);
3573 }
3574
3575 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3576 {
3577         struct dlm_lkb *lkb;
3578         struct dlm_rsb *r;
3579         int error;
3580
3581         error = find_lkb(ls, ms->m_remid, &lkb);
3582         if (error) {
3583                 log_debug(ls, "receive_bast from %d no lkb %x",
3584                           ms->m_header.h_nodeid, ms->m_remid);
3585                 return;
3586         }
3587
3588         r = lkb->lkb_resource;
3589
3590         hold_rsb(r);
3591         lock_rsb(r);
3592
3593         error = validate_message(lkb, ms);
3594         if (error)
3595                 goto out;
3596
3597         queue_bast(r, lkb, ms->m_bastmode);
3598  out:
3599         unlock_rsb(r);
3600         put_rsb(r);
3601         dlm_put_lkb(lkb);
3602 }
3603
3604 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3605 {
3606         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3607
3608         from_nodeid = ms->m_header.h_nodeid;
3609         our_nodeid = dlm_our_nodeid();
3610
3611         len = receive_extralen(ms);
3612
3613         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3614         if (dir_nodeid != our_nodeid) {
3615                 log_error(ls, "lookup dir_nodeid %d from %d",
3616                           dir_nodeid, from_nodeid);
3617                 error = -EINVAL;
3618                 ret_nodeid = -1;
3619                 goto out;
3620         }
3621
3622         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3623
3624         /* Optimization: we're master so treat lookup as a request */
3625         if (!error && ret_nodeid == our_nodeid) {
3626                 receive_request(ls, ms);
3627                 return;
3628         }
3629  out:
3630         send_lookup_reply(ls, ms, ret_nodeid, error);
3631 }
3632
3633 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3634 {
3635         int len, dir_nodeid, from_nodeid;
3636
3637         from_nodeid = ms->m_header.h_nodeid;
3638
3639         len = receive_extralen(ms);
3640
3641         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3642         if (dir_nodeid != dlm_our_nodeid()) {
3643                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3644                           dir_nodeid, from_nodeid);
3645                 return;
3646         }
3647
3648         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3649 }
3650
3651 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3652 {
3653         do_purge(ls, ms->m_nodeid, ms->m_pid);
3654 }
3655
3656 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3657 {
3658         struct dlm_lkb *lkb;
3659         struct dlm_rsb *r;
3660         int error, mstype, result;
3661
3662         error = find_lkb(ls, ms->m_remid, &lkb);
3663         if (error) {
3664                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3665                           ms->m_header.h_nodeid, ms->m_remid);
3666                 return;
3667         }
3668
3669         r = lkb->lkb_resource;
3670         hold_rsb(r);
3671         lock_rsb(r);
3672
3673         error = validate_message(lkb, ms);
3674         if (error)
3675                 goto out;
3676
3677         mstype = lkb->lkb_wait_type;
3678         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3679         if (error)
3680                 goto out;
3681
3682         /* Optimization: the dir node was also the master, so it took our
3683            lookup as a request and sent request reply instead of lookup reply */
3684         if (mstype == DLM_MSG_LOOKUP) {
3685                 r->res_nodeid = ms->m_header.h_nodeid;
3686                 lkb->lkb_nodeid = r->res_nodeid;
3687         }
3688
3689         /* this is the value returned from do_request() on the master */
3690         result = ms->m_result;
3691
3692         switch (result) {
3693         case -EAGAIN:
3694                 /* request would block (be queued) on remote master */
3695                 queue_cast(r, lkb, -EAGAIN);
3696                 confirm_master(r, -EAGAIN);
3697                 unhold_lkb(lkb); /* undoes create_lkb() */
3698                 break;
3699
3700         case -EINPROGRESS:
3701         case 0:
3702                 /* request was queued or granted on remote master */
3703                 receive_flags_reply(lkb, ms);
3704                 lkb->lkb_remid = ms->m_lkid;
3705                 if (is_altmode(lkb))
3706                         munge_altmode(lkb, ms);
3707                 if (result) {
3708                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3709                         add_timeout(lkb);
3710                 } else {
3711                         grant_lock_pc(r, lkb, ms);
3712                         queue_cast(r, lkb, 0);
3713                 }
3714                 confirm_master(r, result);
3715                 break;
3716
3717         case -EBADR:
3718         case -ENOTBLK:
3719                 /* find_rsb failed to find rsb or rsb wasn't master */
3720                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3721                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3722                 r->res_nodeid = -1;
3723                 lkb->lkb_nodeid = -1;
3724
3725                 if (is_overlap(lkb)) {
3726                         /* we'll ignore error in cancel/unlock reply */
3727                         queue_cast_overlap(r, lkb);
3728                         confirm_master(r, result);
3729                         unhold_lkb(lkb); /* undoes create_lkb() */
3730                 } else
3731                         _request_lock(r, lkb);
3732                 break;
3733
3734         default:
3735                 log_error(ls, "receive_request_reply %x error %d",
3736                           lkb->lkb_id, result);
3737         }
3738
3739         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3740                 log_debug(ls, "receive_request_reply %x result %d unlock",
3741                           lkb->lkb_id, result);
3742                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3743                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3744                 send_unlock(r, lkb);
3745         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3746                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3747                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3748                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3749                 send_cancel(r, lkb);
3750         } else {
3751                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3752                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3753         }
3754  out:
3755         unlock_rsb(r);
3756         put_rsb(r);
3757         dlm_put_lkb(lkb);
3758 }
3759
3760 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3761                                     struct dlm_message *ms)
3762 {
3763         /* this is the value returned from do_convert() on the master */
3764         switch (ms->m_result) {
3765         case -EAGAIN:
3766                 /* convert would block (be queued) on remote master */
3767                 queue_cast(r, lkb, -EAGAIN);
3768                 break;
3769
3770         case -EDEADLK:
3771                 receive_flags_reply(lkb, ms);
3772                 revert_lock_pc(r, lkb);
3773                 queue_cast(r, lkb, -EDEADLK);
3774                 break;
3775
3776         case -EINPROGRESS:
3777                 /* convert was queued on remote master */
3778                 receive_flags_reply(lkb, ms);
3779                 if (is_demoted(lkb))
3780                         munge_demoted(lkb);
3781                 del_lkb(r, lkb);
3782                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3783                 add_timeout(lkb);
3784                 break;
3785
3786         case 0:
3787                 /* convert was granted on remote master */
3788                 receive_flags_reply(lkb, ms);
3789                 if (is_demoted(lkb))
3790                         munge_demoted(lkb);
3791                 grant_lock_pc(r, lkb, ms);
3792                 queue_cast(r, lkb, 0);
3793                 break;
3794
3795         default:
3796                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3797                           lkb->lkb_id, ms->m_result);
3798         }
3799 }
3800
3801 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3802 {
3803         struct dlm_rsb *r = lkb->lkb_resource;
3804         int error;
3805
3806         hold_rsb(r);
3807         lock_rsb(r);
3808
3809         error = validate_message(lkb, ms);
3810         if (error)
3811                 goto out;
3812
3813         /* stub reply can happen with waiters_mutex held */
3814         error = remove_from_waiters_ms(lkb, ms);
3815         if (error)
3816                 goto out;
3817
3818         __receive_convert_reply(r, lkb, ms);
3819  out:
3820         unlock_rsb(r);
3821         put_rsb(r);
3822 }
3823
3824 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3825 {
3826         struct dlm_lkb *lkb;
3827         int error;
3828
3829         error = find_lkb(ls, ms->m_remid, &lkb);
3830         if (error) {
3831                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3832                           ms->m_header.h_nodeid, ms->m_remid);
3833                 return;
3834         }
3835
3836         _receive_convert_reply(lkb, ms);
3837         dlm_put_lkb(lkb);
3838 }
3839
3840 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3841 {
3842         struct dlm_rsb *r = lkb->lkb_resource;
3843         int error;
3844
3845         hold_rsb(r);
3846         lock_rsb(r);
3847
3848         error = validate_message(lkb, ms);
3849         if (error)
3850                 goto out;
3851
3852         /* stub reply can happen with waiters_mutex held */
3853         error = remove_from_waiters_ms(lkb, ms);
3854         if (error)
3855                 goto out;
3856
3857         /* this is the value returned from do_unlock() on the master */
3858
3859         switch (ms->m_result) {
3860         case -DLM_EUNLOCK:
3861                 receive_flags_reply(lkb, ms);
3862                 remove_lock_pc(r, lkb);
3863                 queue_cast(r, lkb, -DLM_EUNLOCK);
3864                 break;
3865         case -ENOENT:
3866                 break;
3867         default:
3868                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3869                           lkb->lkb_id, ms->m_result);
3870         }
3871  out:
3872         unlock_rsb(r);
3873         put_rsb(r);
3874 }
3875
3876 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3877 {
3878         struct dlm_lkb *lkb;
3879         int error;
3880
3881         error = find_lkb(ls, ms->m_remid, &lkb);
3882         if (error) {
3883                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3884                           ms->m_header.h_nodeid, ms->m_remid);
3885                 return;
3886         }
3887
3888         _receive_unlock_reply(lkb, ms);
3889         dlm_put_lkb(lkb);
3890 }
3891
3892 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3893 {
3894         struct dlm_rsb *r = lkb->lkb_resource;
3895         int error;
3896
3897         hold_rsb(r);
3898         lock_rsb(r);
3899
3900         error = validate_message(lkb, ms);
3901         if (error)
3902                 goto out;
3903
3904         /* stub reply can happen with waiters_mutex held */
3905         error = remove_from_waiters_ms(lkb, ms);
3906         if (error)
3907                 goto out;
3908
3909         /* this is the value returned from do_cancel() on the master */
3910
3911         switch (ms->m_result) {
3912         case -DLM_ECANCEL:
3913                 receive_flags_reply(lkb, ms);
3914                 revert_lock_pc(r, lkb);
3915                 queue_cast(r, lkb, -DLM_ECANCEL);
3916                 break;
3917         case 0:
3918                 break;
3919         default:
3920                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3921                           lkb->lkb_id, ms->m_result);
3922         }
3923  out:
3924         unlock_rsb(r);
3925         put_rsb(r);
3926 }
3927
3928 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3929 {
3930         struct dlm_lkb *lkb;
3931         int error;
3932
3933         error = find_lkb(ls, ms->m_remid, &lkb);
3934         if (error) {
3935                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3936                           ms->m_header.h_nodeid, ms->m_remid);
3937                 return;
3938         }
3939
3940         _receive_cancel_reply(lkb, ms);
3941         dlm_put_lkb(lkb);
3942 }
3943
3944 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3945 {
3946         struct dlm_lkb *lkb;
3947         struct dlm_rsb *r;
3948         int error, ret_nodeid;
3949
3950         error = find_lkb(ls, ms->m_lkid, &lkb);
3951         if (error) {
3952                 log_error(ls, "receive_lookup_reply no lkb");
3953                 return;
3954         }
3955
3956         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3957            FIXME: will a non-zero error ever be returned? */
3958
3959         r = lkb->lkb_resource;
3960         hold_rsb(r);
3961         lock_rsb(r);
3962
3963         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3964         if (error)
3965                 goto out;
3966
3967         ret_nodeid = ms->m_nodeid;
3968         if (ret_nodeid == dlm_our_nodeid()) {
3969                 r->res_nodeid = 0;
3970                 ret_nodeid = 0;
3971                 r->res_first_lkid = 0;
3972         } else {
3973                 /* set_master() will copy res_nodeid to lkb_nodeid */
3974                 r->res_nodeid = ret_nodeid;
3975         }
3976
3977         if (is_overlap(lkb)) {
3978                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3979                           lkb->lkb_id, lkb->lkb_flags);
3980                 queue_cast_overlap(r, lkb);
3981                 unhold_lkb(lkb); /* undoes create_lkb() */
3982                 goto out_list;
3983         }
3984
3985         _request_lock(r, lkb);
3986
3987  out_list:
3988         if (!ret_nodeid)
3989                 process_lookup_list(r);
3990  out:
3991         unlock_rsb(r);
3992         put_rsb(r);
3993         dlm_put_lkb(lkb);
3994 }
3995
3996 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3997 {
3998         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3999                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4000                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4001                           ms->m_remid, ms->m_result);
4002                 return;
4003         }
4004
4005         switch (ms->m_type) {
4006
4007         /* messages sent to a master node */
4008
4009         case DLM_MSG_REQUEST:
4010                 receive_request(ls, ms);
4011                 break;
4012
4013         case DLM_MSG_CONVERT:
4014                 receive_convert(ls, ms);
4015                 break;
4016
4017         case DLM_MSG_UNLOCK:
4018                 receive_unlock(ls, ms);
4019                 break;
4020
4021         case DLM_MSG_CANCEL:
4022                 receive_cancel(ls, ms);
4023                 break;
4024
4025         /* messages sent from a master node (replies to above) */
4026
4027         case DLM_MSG_REQUEST_REPLY:
4028                 receive_request_reply(ls, ms);
4029                 break;
4030
4031         case DLM_MSG_CONVERT_REPLY:
4032                 receive_convert_reply(ls, ms);
4033                 break;
4034
4035         case DLM_MSG_UNLOCK_REPLY:
4036                 receive_unlock_reply(ls, ms);
4037                 break;
4038
4039         case DLM_MSG_CANCEL_REPLY:
4040                 receive_cancel_reply(ls, ms);
4041                 break;
4042
4043         /* messages sent from a master node (only two types of async msg) */
4044
4045         case DLM_MSG_GRANT:
4046                 receive_grant(ls, ms);
4047                 break;
4048
4049         case DLM_MSG_BAST:
4050                 receive_bast(ls, ms);
4051                 break;
4052
4053         /* messages sent to a dir node */
4054
4055         case DLM_MSG_LOOKUP:
4056                 receive_lookup(ls, ms);
4057                 break;
4058
4059         case DLM_MSG_REMOVE:
4060                 receive_remove(ls, ms);
4061                 break;
4062
4063         /* messages sent from a dir node (remove has no reply) */
4064
4065         case DLM_MSG_LOOKUP_REPLY:
4066                 receive_lookup_reply(ls, ms);
4067                 break;
4068
4069         /* other messages */
4070
4071         case DLM_MSG_PURGE:
4072                 receive_purge(ls, ms);
4073                 break;
4074
4075         default:
4076                 log_error(ls, "unknown message type %d", ms->m_type);
4077         }
4078 }
4079
4080 /* If the lockspace is in recovery mode (locking stopped), then normal
4081    messages are saved on the requestqueue for processing after recovery is
4082    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4083    messages off the requestqueue before we process new ones. This occurs right
4084    after recovery completes when we transition from saving all messages on
4085    requestqueue, to processing all the saved messages, to processing new
4086    messages as they arrive. */
4087
4088 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4089                                 int nodeid)
4090 {
4091         if (dlm_locking_stopped(ls)) {
4092                 dlm_add_requestqueue(ls, nodeid, ms);
4093         } else {
4094                 dlm_wait_requestqueue(ls);
4095                 _receive_message(ls, ms);
4096         }
4097 }
4098
4099 /* This is called by dlm_recoverd to process messages that were saved on
4100    the requestqueue. */
4101
4102 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4103 {
4104         _receive_message(ls, ms);
4105 }
4106
4107 /* This is called by the midcomms layer when something is received for
4108    the lockspace.  It could be either a MSG (normal message sent as part of
4109    standard locking activity) or an RCOM (recovery message sent as part of
4110    lockspace recovery). */
4111
4112 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4113 {
4114         struct dlm_header *hd = &p->header;
4115         struct dlm_ls *ls;
4116         int type = 0;
4117
4118         switch (hd->h_cmd) {
4119         case DLM_MSG:
4120                 dlm_message_in(&p->message);
4121                 type = p->message.m_type;
4122                 break;
4123         case DLM_RCOM:
4124                 dlm_rcom_in(&p->rcom);
4125                 type = p->rcom.rc_type;
4126                 break;
4127         default:
4128                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4129                 return;
4130         }
4131
4132         if (hd->h_nodeid != nodeid) {
4133                 log_print("invalid h_nodeid %d from %d lockspace %x",
4134                           hd->h_nodeid, nodeid, hd->h_lockspace);
4135                 return;
4136         }
4137
4138         ls = dlm_find_lockspace_global(hd->h_lockspace);
4139         if (!ls) {
4140                 if (dlm_config.ci_log_debug)
4141                         log_print("invalid lockspace %x from %d cmd %d type %d",
4142                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
4143
4144                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4145                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4146                 return;
4147         }
4148
4149         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4150            be inactive (in this ls) before transitioning to recovery mode */
4151
4152         down_read(&ls->ls_recv_active);
4153         if (hd->h_cmd == DLM_MSG)
4154                 dlm_receive_message(ls, &p->message, nodeid);
4155         else
4156                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4157         up_read(&ls->ls_recv_active);
4158
4159         dlm_put_lockspace(ls);
4160 }
4161
4162 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4163                                    struct dlm_message *ms_stub)
4164 {
4165         if (middle_conversion(lkb)) {
4166                 hold_lkb(lkb);
4167                 memset(ms_stub, 0, sizeof(struct dlm_message));
4168                 ms_stub->m_flags = DLM_IFL_STUB_MS;
4169                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4170                 ms_stub->m_result = -EINPROGRESS;
4171                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4172                 _receive_convert_reply(lkb, ms_stub);
4173
4174                 /* Same special case as in receive_rcom_lock_args() */
4175                 lkb->lkb_grmode = DLM_LOCK_IV;
4176                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4177                 unhold_lkb(lkb);
4178
4179         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4180                 lkb->lkb_flags |= DLM_IFL_RESEND;
4181         }
4182
4183         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4184            conversions are async; there's no reply from the remote master */
4185 }
4186
4187 /* A waiting lkb needs recovery if the master node has failed, or
4188    the master node is changing (only when no directory is used) */
4189
4190 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4191 {
4192         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4193                 return 1;
4194
4195         if (!dlm_no_directory(ls))
4196                 return 0;
4197
4198         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4199                 return 1;
4200
4201         return 0;
4202 }
4203
4204 /* Recovery for locks that are waiting for replies from nodes that are now
4205    gone.  We can just complete unlocks and cancels by faking a reply from the
4206    dead node.  Requests and up-conversions we flag to be resent after
4207    recovery.  Down-conversions can just be completed with a fake reply like
4208    unlocks.  Conversions between PR and CW need special attention. */
4209
4210 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4211 {
4212         struct dlm_lkb *lkb, *safe;
4213         struct dlm_message *ms_stub;
4214         int wait_type, stub_unlock_result, stub_cancel_result;
4215
4216         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4217         if (!ms_stub) {
4218                 log_error(ls, "dlm_recover_waiters_pre no mem");
4219                 return;
4220         }
4221
4222         mutex_lock(&ls->ls_waiters_mutex);
4223
4224         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4225
4226                 /* exclude debug messages about unlocks because there can be so
4227                    many and they aren't very interesting */
4228
4229                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4230                         log_debug(ls, "recover_waiter %x nodeid %d "
4231                                   "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4232                                   lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4233                 }
4234
4235                 /* all outstanding lookups, regardless of destination  will be
4236                    resent after recovery is done */
4237
4238                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4239                         lkb->lkb_flags |= DLM_IFL_RESEND;
4240                         continue;
4241                 }
4242
4243                 if (!waiter_needs_recovery(ls, lkb))
4244                         continue;
4245
4246                 wait_type = lkb->lkb_wait_type;
4247                 stub_unlock_result = -DLM_EUNLOCK;
4248                 stub_cancel_result = -DLM_ECANCEL;
4249
4250                 /* Main reply may have been received leaving a zero wait_type,
4251                    but a reply for the overlapping op may not have been
4252                    received.  In that case we need to fake the appropriate
4253                    reply for the overlap op. */
4254
4255                 if (!wait_type) {
4256                         if (is_overlap_cancel(lkb)) {
4257                                 wait_type = DLM_MSG_CANCEL;
4258                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4259                                         stub_cancel_result = 0;
4260                         }
4261                         if (is_overlap_unlock(lkb)) {
4262                                 wait_type = DLM_MSG_UNLOCK;
4263                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4264                                         stub_unlock_result = -ENOENT;
4265                         }
4266
4267                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4268                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4269                                   stub_cancel_result, stub_unlock_result);
4270                 }
4271
4272                 switch (wait_type) {
4273
4274                 case DLM_MSG_REQUEST:
4275                         lkb->lkb_flags |= DLM_IFL_RESEND;
4276                         break;
4277
4278                 case DLM_MSG_CONVERT:
4279                         recover_convert_waiter(ls, lkb, ms_stub);
4280                         break;
4281
4282                 case DLM_MSG_UNLOCK:
4283                         hold_lkb(lkb);
4284                         memset(ms_stub, 0, sizeof(struct dlm_message));
4285                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4286                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4287                         ms_stub->m_result = stub_unlock_result;
4288                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4289                         _receive_unlock_reply(lkb, ms_stub);
4290                         dlm_put_lkb(lkb);
4291                         break;
4292
4293                 case DLM_MSG_CANCEL:
4294                         hold_lkb(lkb);
4295                         memset(ms_stub, 0, sizeof(struct dlm_message));
4296                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4297                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4298                         ms_stub->m_result = stub_cancel_result;
4299                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4300                         _receive_cancel_reply(lkb, ms_stub);
4301                         dlm_put_lkb(lkb);
4302                         break;
4303
4304                 default:
4305                         log_error(ls, "invalid lkb wait_type %d %d",
4306                                   lkb->lkb_wait_type, wait_type);
4307                 }
4308                 schedule();
4309         }
4310         mutex_unlock(&ls->ls_waiters_mutex);
4311         kfree(ms_stub);
4312 }
4313
4314 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4315 {
4316         struct dlm_lkb *lkb;
4317         int found = 0;
4318
4319         mutex_lock(&ls->ls_waiters_mutex);
4320         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4321                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4322                         hold_lkb(lkb);
4323                         found = 1;
4324                         break;
4325                 }
4326         }
4327         mutex_unlock(&ls->ls_waiters_mutex);
4328
4329         if (!found)
4330                 lkb = NULL;
4331         return lkb;
4332 }
4333
4334 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4335    master or dir-node for r.  Processing the lkb may result in it being placed
4336    back on waiters. */
4337
4338 /* We do this after normal locking has been enabled and any saved messages
4339    (in requestqueue) have been processed.  We should be confident that at
4340    this point we won't get or process a reply to any of these waiting
4341    operations.  But, new ops may be coming in on the rsbs/locks here from
4342    userspace or remotely. */
4343
4344 /* there may have been an overlap unlock/cancel prior to recovery or after
4345    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4346    overlap flag would just have been set and nothing new sent.  we can be
4347    confident here than any replies to either the initial op or overlap ops
4348    prior to recovery have been received. */
4349
4350 int dlm_recover_waiters_post(struct dlm_ls *ls)
4351 {
4352         struct dlm_lkb *lkb;
4353         struct dlm_rsb *r;
4354         int error = 0, mstype, err, oc, ou;
4355
4356         while (1) {
4357                 if (dlm_locking_stopped(ls)) {
4358                         log_debug(ls, "recover_waiters_post aborted");
4359                         error = -EINTR;
4360                         break;
4361                 }
4362
4363                 lkb = find_resend_waiter(ls);
4364                 if (!lkb)
4365                         break;
4366
4367                 r = lkb->lkb_resource;
4368                 hold_rsb(r);
4369                 lock_rsb(r);
4370
4371                 mstype = lkb->lkb_wait_type;
4372                 oc = is_overlap_cancel(lkb);
4373                 ou = is_overlap_unlock(lkb);
4374                 err = 0;
4375
4376                 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4377                           lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4378
4379                 /* At this point we assume that we won't get a reply to any
4380                    previous op or overlap op on this lock.  First, do a big
4381                    remove_from_waiters() for all previous ops. */
4382
4383                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4384                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4385                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4386                 lkb->lkb_wait_type = 0;
4387                 lkb->lkb_wait_count = 0;
4388                 mutex_lock(&ls->ls_waiters_mutex);
4389                 list_del_init(&lkb->lkb_wait_reply);
4390                 mutex_unlock(&ls->ls_waiters_mutex);
4391                 unhold_lkb(lkb); /* for waiters list */
4392
4393                 if (oc || ou) {
4394                         /* do an unlock or cancel instead of resending */
4395                         switch (mstype) {
4396                         case DLM_MSG_LOOKUP:
4397                         case DLM_MSG_REQUEST:
4398                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4399                                                         -DLM_ECANCEL);
4400                                 unhold_lkb(lkb); /* undoes create_lkb() */
4401                                 break;
4402                         case DLM_MSG_CONVERT:
4403                                 if (oc) {
4404                                         queue_cast(r, lkb, -DLM_ECANCEL);
4405                                 } else {
4406                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4407                                         _unlock_lock(r, lkb);
4408                                 }
4409                                 break;
4410                         default:
4411                                 err = 1;
4412                         }
4413                 } else {
4414                         switch (mstype) {
4415                         case DLM_MSG_LOOKUP:
4416                         case DLM_MSG_REQUEST:
4417                                 _request_lock(r, lkb);
4418                                 if (is_master(r))
4419                                         confirm_master(r, 0);
4420                                 break;
4421                         case DLM_MSG_CONVERT:
4422                                 _convert_lock(r, lkb);
4423                                 break;
4424                         default:
4425                                 err = 1;
4426                         }
4427                 }
4428
4429                 if (err)
4430                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4431                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4432                 unlock_rsb(r);
4433                 put_rsb(r);
4434                 dlm_put_lkb(lkb);
4435         }
4436
4437         return error;
4438 }
4439
4440 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4441                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4442 {
4443         struct dlm_ls *ls = r->res_ls;
4444         struct dlm_lkb *lkb, *safe;
4445
4446         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4447                 if (test(ls, lkb)) {
4448                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4449                         del_lkb(r, lkb);
4450                         /* this put should free the lkb */
4451                         if (!dlm_put_lkb(lkb))
4452                                 log_error(ls, "purged lkb not released");
4453                 }
4454         }
4455 }
4456
4457 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4458 {
4459         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4460 }
4461
4462 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4463 {
4464         return is_master_copy(lkb);
4465 }
4466
4467 static void purge_dead_locks(struct dlm_rsb *r)
4468 {
4469         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4470         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4471         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4472 }
4473
4474 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4475 {
4476         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4477         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4478         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4479 }
4480
4481 /* Get rid of locks held by nodes that are gone. */
4482
4483 int dlm_purge_locks(struct dlm_ls *ls)
4484 {
4485         struct dlm_rsb *r;
4486
4487         log_debug(ls, "dlm_purge_locks");
4488
4489         down_write(&ls->ls_root_sem);
4490         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4491                 hold_rsb(r);
4492                 lock_rsb(r);
4493                 if (is_master(r))
4494                         purge_dead_locks(r);
4495                 unlock_rsb(r);
4496                 unhold_rsb(r);
4497
4498                 schedule();
4499         }
4500         up_write(&ls->ls_root_sem);
4501
4502         return 0;
4503 }
4504
4505 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4506 {
4507         struct rb_node *n;
4508         struct dlm_rsb *r, *r_ret = NULL;
4509
4510         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4511         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4512                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4513                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4514                         continue;
4515                 hold_rsb(r);
4516                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4517                 r_ret = r;
4518                 break;
4519         }
4520         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4521         return r_ret;
4522 }
4523
4524 void dlm_grant_after_purge(struct dlm_ls *ls)
4525 {
4526         struct dlm_rsb *r;
4527         int bucket = 0;
4528
4529         while (1) {
4530                 r = find_purged_rsb(ls, bucket);
4531                 if (!r) {
4532                         if (bucket == ls->ls_rsbtbl_size - 1)
4533                                 break;
4534                         bucket++;
4535                         continue;
4536                 }
4537                 lock_rsb(r);
4538                 if (is_master(r)) {
4539                         grant_pending_locks(r);
4540                         confirm_master(r, 0);
4541                 }
4542                 unlock_rsb(r);
4543                 put_rsb(r);
4544                 schedule();
4545         }
4546 }
4547
4548 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4549                                          uint32_t remid)
4550 {
4551         struct dlm_lkb *lkb;
4552
4553         list_for_each_entry(lkb, head, lkb_statequeue) {
4554                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4555                         return lkb;
4556         }
4557         return NULL;
4558 }
4559
4560 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4561                                     uint32_t remid)
4562 {
4563         struct dlm_lkb *lkb;
4564
4565         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4566         if (lkb)
4567                 return lkb;
4568         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4569         if (lkb)
4570                 return lkb;
4571         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4572         if (lkb)
4573                 return lkb;
4574         return NULL;
4575 }
4576
4577 /* needs at least dlm_rcom + rcom_lock */
4578 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4579                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4580 {
4581         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4582
4583         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4584         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4585         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4586         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4587         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4588         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4589         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4590         lkb->lkb_rqmode = rl->rl_rqmode;
4591         lkb->lkb_grmode = rl->rl_grmode;
4592         /* don't set lkb_status because add_lkb wants to itself */
4593
4594         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4595         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4596
4597         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4598                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4599                          sizeof(struct rcom_lock);
4600                 if (lvblen > ls->ls_lvblen)
4601                         return -EINVAL;
4602                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4603                 if (!lkb->lkb_lvbptr)
4604                         return -ENOMEM;
4605                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4606         }
4607
4608         /* Conversions between PR and CW (middle modes) need special handling.
4609            The real granted mode of these converting locks cannot be determined
4610            until all locks have been rebuilt on the rsb (recover_conversion) */
4611
4612         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4613             middle_conversion(lkb)) {
4614                 rl->rl_status = DLM_LKSTS_CONVERT;
4615                 lkb->lkb_grmode = DLM_LOCK_IV;
4616                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4617         }
4618
4619         return 0;
4620 }
4621
4622 /* This lkb may have been recovered in a previous aborted recovery so we need
4623    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4624    If so we just send back a standard reply.  If not, we create a new lkb with
4625    the given values and send back our lkid.  We send back our lkid by sending
4626    back the rcom_lock struct we got but with the remid field filled in. */
4627
4628 /* needs at least dlm_rcom + rcom_lock */
4629 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4630 {
4631         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4632         struct dlm_rsb *r;
4633         struct dlm_lkb *lkb;
4634         int error;
4635
4636         if (rl->rl_parent_lkid) {
4637                 error = -EOPNOTSUPP;
4638                 goto out;
4639         }
4640
4641         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4642                          R_MASTER, &r);
4643         if (error)
4644                 goto out;
4645
4646         lock_rsb(r);
4647
4648         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4649         if (lkb) {
4650                 error = -EEXIST;
4651                 goto out_remid;
4652         }
4653
4654         error = create_lkb(ls, &lkb);
4655         if (error)
4656                 goto out_unlock;
4657
4658         error = receive_rcom_lock_args(ls, lkb, r, rc);
4659         if (error) {
4660                 __put_lkb(ls, lkb);
4661                 goto out_unlock;
4662         }
4663
4664         attach_lkb(r, lkb);
4665         add_lkb(r, lkb, rl->rl_status);
4666         error = 0;
4667
4668  out_remid:
4669         /* this is the new value returned to the lock holder for
4670            saving in its process-copy lkb */
4671         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4672
4673  out_unlock:
4674         unlock_rsb(r);
4675         put_rsb(r);
4676  out:
4677         if (error)
4678                 log_debug(ls, "recover_master_copy %d %x", error,
4679                           le32_to_cpu(rl->rl_lkid));
4680         rl->rl_result = cpu_to_le32(error);
4681         return error;
4682 }
4683
4684 /* needs at least dlm_rcom + rcom_lock */
4685 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4686 {
4687         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4688         struct dlm_rsb *r;
4689         struct dlm_lkb *lkb;
4690         int error;
4691
4692         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4693         if (error) {
4694                 log_error(ls, "recover_process_copy no lkid %x",
4695                                 le32_to_cpu(rl->rl_lkid));
4696                 return error;
4697         }
4698
4699         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4700
4701         error = le32_to_cpu(rl->rl_result);
4702
4703         r = lkb->lkb_resource;
4704         hold_rsb(r);
4705         lock_rsb(r);
4706
4707         switch (error) {
4708         case -EBADR:
4709                 /* There's a chance the new master received our lock before
4710                    dlm_recover_master_reply(), this wouldn't happen if we did
4711                    a barrier between recover_masters and recover_locks. */
4712                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4713                           (unsigned long)r, r->res_name);
4714                 dlm_send_rcom_lock(r, lkb);
4715                 goto out;
4716         case -EEXIST:
4717                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4718                 /* fall through */
4719         case 0:
4720                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4721                 break;
4722         default:
4723                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4724                           error, lkb->lkb_id);
4725         }
4726
4727         /* an ack for dlm_recover_locks() which waits for replies from
4728            all the locks it sends to new masters */
4729         dlm_recovered_lock(r);
4730  out:
4731         unlock_rsb(r);
4732         put_rsb(r);
4733         dlm_put_lkb(lkb);
4734
4735         return 0;
4736 }
4737
4738 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4739                      int mode, uint32_t flags, void *name, unsigned int namelen,
4740                      unsigned long timeout_cs)
4741 {
4742         struct dlm_lkb *lkb;
4743         struct dlm_args args;
4744         int error;
4745
4746         dlm_lock_recovery(ls);
4747
4748         error = create_lkb(ls, &lkb);
4749         if (error) {
4750                 kfree(ua);
4751                 goto out;
4752         }
4753
4754         if (flags & DLM_LKF_VALBLK) {
4755                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4756                 if (!ua->lksb.sb_lvbptr) {
4757                         kfree(ua);
4758                         __put_lkb(ls, lkb);
4759                         error = -ENOMEM;
4760                         goto out;
4761                 }
4762         }
4763
4764         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4765            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4766            lock and that lkb_astparam is the dlm_user_args structure. */
4767
4768         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4769                               fake_astfn, ua, fake_bastfn, &args);
4770         lkb->lkb_flags |= DLM_IFL_USER;
4771
4772         if (error) {
4773                 __put_lkb(ls, lkb);
4774                 goto out;
4775         }
4776
4777         error = request_lock(ls, lkb, name, namelen, &args);
4778
4779         switch (error) {
4780         case 0:
4781                 break;
4782         case -EINPROGRESS:
4783                 error = 0;
4784                 break;
4785         case -EAGAIN:
4786                 error = 0;
4787                 /* fall through */
4788         default:
4789                 __put_lkb(ls, lkb);
4790                 goto out;
4791         }
4792
4793         /* add this new lkb to the per-process list of locks */
4794         spin_lock(&ua->proc->locks_spin);
4795         hold_lkb(lkb);
4796         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4797         spin_unlock(&ua->proc->locks_spin);
4798  out:
4799         dlm_unlock_recovery(ls);
4800         return error;
4801 }
4802
4803 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4804                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4805                      unsigned long timeout_cs)
4806 {
4807         struct dlm_lkb *lkb;
4808         struct dlm_args args;
4809         struct dlm_user_args *ua;
4810         int error;
4811
4812         dlm_lock_recovery(ls);
4813
4814         error = find_lkb(ls, lkid, &lkb);
4815         if (error)
4816                 goto out;
4817
4818         /* user can change the params on its lock when it converts it, or
4819            add an lvb that didn't exist before */
4820
4821         ua = lkb->lkb_ua;
4822
4823         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4824                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4825                 if (!ua->lksb.sb_lvbptr) {
4826                         error = -ENOMEM;
4827                         goto out_put;
4828                 }
4829         }
4830         if (lvb_in && ua->lksb.sb_lvbptr)
4831                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4832
4833         ua->xid = ua_tmp->xid;
4834         ua->castparam = ua_tmp->castparam;
4835         ua->castaddr = ua_tmp->castaddr;
4836         ua->bastparam = ua_tmp->bastparam;
4837         ua->bastaddr = ua_tmp->bastaddr;
4838         ua->user_lksb = ua_tmp->user_lksb;
4839
4840         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4841                               fake_astfn, ua, fake_bastfn, &args);
4842         if (error)
4843                 goto out_put;
4844
4845         error = convert_lock(ls, lkb, &args);
4846
4847         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4848                 error = 0;
4849  out_put:
4850         dlm_put_lkb(lkb);
4851  out:
4852         dlm_unlock_recovery(ls);
4853         kfree(ua_tmp);
4854         return error;
4855 }
4856
4857 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4858                     uint32_t flags, uint32_t lkid, char *lvb_in)
4859 {
4860         struct dlm_lkb *lkb;
4861         struct dlm_args args;
4862         struct dlm_user_args *ua;
4863         int error;
4864
4865         dlm_lock_recovery(ls);
4866
4867         error = find_lkb(ls, lkid, &lkb);
4868         if (error)
4869                 goto out;
4870
4871         ua = lkb->lkb_ua;
4872
4873         if (lvb_in && ua->lksb.sb_lvbptr)
4874                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4875         if (ua_tmp->castparam)
4876                 ua->castparam = ua_tmp->castparam;
4877         ua->user_lksb = ua_tmp->user_lksb;
4878
4879         error = set_unlock_args(flags, ua, &args);
4880         if (error)
4881                 goto out_put;
4882
4883         error = unlock_lock(ls, lkb, &args);
4884
4885         if (error == -DLM_EUNLOCK)
4886                 error = 0;
4887         /* from validate_unlock_args() */
4888         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4889                 error = 0;
4890         if (error)
4891                 goto out_put;
4892
4893         spin_lock(&ua->proc->locks_spin);
4894         /* dlm_user_add_cb() may have already taken lkb off the proc list */
4895         if (!list_empty(&lkb->lkb_ownqueue))
4896                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4897         spin_unlock(&ua->proc->locks_spin);
4898  out_put:
4899         dlm_put_lkb(lkb);
4900  out:
4901         dlm_unlock_recovery(ls);
4902         kfree(ua_tmp);
4903         return error;
4904 }
4905
4906 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4907                     uint32_t flags, uint32_t lkid)
4908 {
4909         struct dlm_lkb *lkb;
4910         struct dlm_args args;
4911         struct dlm_user_args *ua;
4912         int error;
4913
4914         dlm_lock_recovery(ls);
4915
4916         error = find_lkb(ls, lkid, &lkb);
4917         if (error)
4918                 goto out;
4919
4920         ua = lkb->lkb_ua;
4921         if (ua_tmp->castparam)
4922                 ua->castparam = ua_tmp->castparam;
4923         ua->user_lksb = ua_tmp->user_lksb;
4924
4925         error = set_unlock_args(flags, ua, &args);
4926         if (error)
4927                 goto out_put;
4928
4929         error = cancel_lock(ls, lkb, &args);
4930
4931         if (error == -DLM_ECANCEL)
4932                 error = 0;
4933         /* from validate_unlock_args() */
4934         if (error == -EBUSY)
4935                 error = 0;
4936  out_put:
4937         dlm_put_lkb(lkb);
4938  out:
4939         dlm_unlock_recovery(ls);
4940         kfree(ua_tmp);
4941         return error;
4942 }
4943
4944 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4945 {
4946         struct dlm_lkb *lkb;
4947         struct dlm_args args;
4948         struct dlm_user_args *ua;
4949         struct dlm_rsb *r;
4950         int error;
4951
4952         dlm_lock_recovery(ls);
4953
4954         error = find_lkb(ls, lkid, &lkb);
4955         if (error)
4956                 goto out;
4957
4958         ua = lkb->lkb_ua;
4959
4960         error = set_unlock_args(flags, ua, &args);
4961         if (error)
4962                 goto out_put;
4963
4964         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4965
4966         r = lkb->lkb_resource;
4967         hold_rsb(r);
4968         lock_rsb(r);
4969
4970         error = validate_unlock_args(lkb, &args);
4971         if (error)
4972                 goto out_r;
4973         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4974
4975         error = _cancel_lock(r, lkb);
4976  out_r:
4977         unlock_rsb(r);
4978         put_rsb(r);
4979
4980         if (error == -DLM_ECANCEL)
4981                 error = 0;
4982         /* from validate_unlock_args() */
4983         if (error == -EBUSY)
4984                 error = 0;
4985  out_put:
4986         dlm_put_lkb(lkb);
4987  out:
4988         dlm_unlock_recovery(ls);
4989         return error;
4990 }
4991
4992 /* lkb's that are removed from the waiters list by revert are just left on the
4993    orphans list with the granted orphan locks, to be freed by purge */
4994
4995 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4996 {
4997         struct dlm_args args;
4998         int error;
4999
5000         hold_lkb(lkb);
5001         mutex_lock(&ls->ls_orphans_mutex);
5002         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5003         mutex_unlock(&ls->ls_orphans_mutex);
5004
5005         set_unlock_args(0, lkb->lkb_ua, &args);
5006
5007         error = cancel_lock(ls, lkb, &args);
5008         if (error == -DLM_ECANCEL)
5009                 error = 0;
5010         return error;
5011 }
5012
5013 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5014    Regardless of what rsb queue the lock is on, it's removed and freed. */
5015
5016 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5017 {
5018         struct dlm_args args;
5019         int error;
5020
5021         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5022
5023         error = unlock_lock(ls, lkb, &args);
5024         if (error == -DLM_EUNLOCK)
5025                 error = 0;
5026         return error;
5027 }
5028
5029 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5030    (which does lock_rsb) due to deadlock with receiving a message that does
5031    lock_rsb followed by dlm_user_add_cb() */
5032
5033 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5034                                      struct dlm_user_proc *proc)
5035 {
5036         struct dlm_lkb *lkb = NULL;
5037
5038         mutex_lock(&ls->ls_clear_proc_locks);
5039         if (list_empty(&proc->locks))
5040                 goto out;
5041
5042         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5043         list_del_init(&lkb->lkb_ownqueue);
5044
5045         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5046                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
5047         else
5048                 lkb->lkb_flags |= DLM_IFL_DEAD;
5049  out:
5050         mutex_unlock(&ls->ls_clear_proc_locks);
5051         return lkb;
5052 }
5053
5054 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5055    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5056    which we clear here. */
5057
5058 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5059    list, and no more device_writes should add lkb's to proc->locks list; so we
5060    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5061    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5062    them ourself. */
5063
5064 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5065 {
5066         struct dlm_lkb *lkb, *safe;
5067
5068         dlm_lock_recovery(ls);
5069
5070         while (1) {
5071                 lkb = del_proc_lock(ls, proc);
5072                 if (!lkb)
5073                         break;
5074                 del_timeout(lkb);
5075                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5076                         orphan_proc_lock(ls, lkb);
5077                 else
5078                         unlock_proc_lock(ls, lkb);
5079
5080                 /* this removes the reference for the proc->locks list
5081                    added by dlm_user_request, it may result in the lkb
5082                    being freed */
5083
5084                 dlm_put_lkb(lkb);
5085         }
5086
5087         mutex_lock(&ls->ls_clear_proc_locks);
5088
5089         /* in-progress unlocks */
5090         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5091                 list_del_init(&lkb->lkb_ownqueue);
5092                 lkb->lkb_flags |= DLM_IFL_DEAD;
5093                 dlm_put_lkb(lkb);
5094         }
5095
5096         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5097                 memset(&lkb->lkb_callbacks, 0,
5098                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5099                 list_del_init(&lkb->lkb_cb_list);
5100                 dlm_put_lkb(lkb);
5101         }
5102
5103         mutex_unlock(&ls->ls_clear_proc_locks);
5104         dlm_unlock_recovery(ls);
5105 }
5106
5107 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5108 {
5109         struct dlm_lkb *lkb, *safe;
5110
5111         while (1) {
5112                 lkb = NULL;
5113                 spin_lock(&proc->locks_spin);
5114                 if (!list_empty(&proc->locks)) {
5115                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
5116                                          lkb_ownqueue);
5117                         list_del_init(&lkb->lkb_ownqueue);
5118                 }
5119                 spin_unlock(&proc->locks_spin);
5120
5121                 if (!lkb)
5122                         break;
5123
5124                 lkb->lkb_flags |= DLM_IFL_DEAD;
5125                 unlock_proc_lock(ls, lkb);
5126                 dlm_put_lkb(lkb); /* ref from proc->locks list */
5127         }
5128
5129         spin_lock(&proc->locks_spin);
5130         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5131                 list_del_init(&lkb->lkb_ownqueue);
5132                 lkb->lkb_flags |= DLM_IFL_DEAD;
5133                 dlm_put_lkb(lkb);
5134         }
5135         spin_unlock(&proc->locks_spin);
5136
5137         spin_lock(&proc->asts_spin);
5138         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5139                 memset(&lkb->lkb_callbacks, 0,
5140                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5141                 list_del_init(&lkb->lkb_cb_list);
5142                 dlm_put_lkb(lkb);
5143         }
5144         spin_unlock(&proc->asts_spin);
5145 }
5146
5147 /* pid of 0 means purge all orphans */
5148
5149 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5150 {
5151         struct dlm_lkb *lkb, *safe;
5152
5153         mutex_lock(&ls->ls_orphans_mutex);
5154         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5155                 if (pid && lkb->lkb_ownpid != pid)
5156                         continue;
5157                 unlock_proc_lock(ls, lkb);
5158                 list_del_init(&lkb->lkb_ownqueue);
5159                 dlm_put_lkb(lkb);
5160         }
5161         mutex_unlock(&ls->ls_orphans_mutex);
5162 }
5163
5164 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5165 {
5166         struct dlm_message *ms;
5167         struct dlm_mhandle *mh;
5168         int error;
5169
5170         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5171                                 DLM_MSG_PURGE, &ms, &mh);
5172         if (error)
5173                 return error;
5174         ms->m_nodeid = nodeid;
5175         ms->m_pid = pid;
5176
5177         return send_message(mh, ms);
5178 }
5179
5180 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5181                    int nodeid, int pid)
5182 {
5183         int error = 0;
5184
5185         if (nodeid != dlm_our_nodeid()) {
5186                 error = send_purge(ls, nodeid, pid);
5187         } else {
5188                 dlm_lock_recovery(ls);
5189                 if (pid == current->pid)
5190                         purge_proc_locks(ls, proc);
5191                 else
5192                         do_purge(ls, nodeid, pid);
5193                 dlm_unlock_recovery(ls);
5194         }
5195         return error;
5196 }
5197