[DLM] block scand during recovery [1/6]
[platform/adaptation/renesas_rcar/renesas_kernel.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
89
90 /*
91  * Lock compatibilty matrix - thanks Steve
92  * UN = Unlocked state. Not really a state, used as a flag
93  * PD = Padding. Used to make the matrix a nice power of two in size
94  * Other states are the same as the VMS DLM.
95  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
96  */
97
98 static const int __dlm_compat_matrix[8][8] = {
99       /* UN NL CR CW PR PW EX PD */
100         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
101         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
102         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
103         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
104         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
105         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
106         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
107         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
108 };
109
110 /*
111  * This defines the direction of transfer of LVB data.
112  * Granted mode is the row; requested mode is the column.
113  * Usage: matrix[grmode+1][rqmode+1]
114  * 1 = LVB is returned to the caller
115  * 0 = LVB is written to the resource
116  * -1 = nothing happens to the LVB
117  */
118
119 const int dlm_lvb_operations[8][8] = {
120         /* UN   NL  CR  CW  PR  PW  EX  PD*/
121         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
122         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
123         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
124         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
125         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
126         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
127         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
128         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
129 };
130
131 #define modes_compat(gr, rq) \
132         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
133
134 int dlm_modes_compat(int mode1, int mode2)
135 {
136         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
137 }
138
139 /*
140  * Compatibility matrix for conversions with QUECVT set.
141  * Granted mode is the row; requested mode is the column.
142  * Usage: matrix[grmode+1][rqmode+1]
143  */
144
145 static const int __quecvt_compat_matrix[8][8] = {
146       /* UN NL CR CW PR PW EX PD */
147         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
148         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
149         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
150         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
151         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
152         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
153         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
154         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
155 };
156
157 void dlm_print_lkb(struct dlm_lkb *lkb)
158 {
159         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
160                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
161                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
162                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
163                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
164 }
165
166 void dlm_print_rsb(struct dlm_rsb *r)
167 {
168         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
169                r->res_nodeid, r->res_flags, r->res_first_lkid,
170                r->res_recover_locks_count, r->res_name);
171 }
172
173 void dlm_dump_rsb(struct dlm_rsb *r)
174 {
175         struct dlm_lkb *lkb;
176
177         dlm_print_rsb(r);
178
179         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
180                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
181         printk(KERN_ERR "rsb lookup list\n");
182         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
183                 dlm_print_lkb(lkb);
184         printk(KERN_ERR "rsb grant queue:\n");
185         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb convert queue:\n");
188         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb wait queue:\n");
191         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193 }
194
195 /* Threads cannot use the lockspace while it's being recovered */
196
197 static inline void dlm_lock_recovery(struct dlm_ls *ls)
198 {
199         down_read(&ls->ls_in_recovery);
200 }
201
202 void dlm_unlock_recovery(struct dlm_ls *ls)
203 {
204         up_read(&ls->ls_in_recovery);
205 }
206
207 int dlm_lock_recovery_try(struct dlm_ls *ls)
208 {
209         return down_read_trylock(&ls->ls_in_recovery);
210 }
211
212 static inline int can_be_queued(struct dlm_lkb *lkb)
213 {
214         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
215 }
216
217 static inline int force_blocking_asts(struct dlm_lkb *lkb)
218 {
219         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
220 }
221
222 static inline int is_demoted(struct dlm_lkb *lkb)
223 {
224         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
225 }
226
227 static inline int is_altmode(struct dlm_lkb *lkb)
228 {
229         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
230 }
231
232 static inline int is_granted(struct dlm_lkb *lkb)
233 {
234         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
235 }
236
237 static inline int is_remote(struct dlm_rsb *r)
238 {
239         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
240         return !!r->res_nodeid;
241 }
242
243 static inline int is_process_copy(struct dlm_lkb *lkb)
244 {
245         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
246 }
247
248 static inline int is_master_copy(struct dlm_lkb *lkb)
249 {
250         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
251                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
252         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
253 }
254
255 static inline int middle_conversion(struct dlm_lkb *lkb)
256 {
257         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
258             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
259                 return 1;
260         return 0;
261 }
262
263 static inline int down_conversion(struct dlm_lkb *lkb)
264 {
265         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
266 }
267
268 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
269 {
270         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
271 }
272
273 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
274 {
275         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
276 }
277
278 static inline int is_overlap(struct dlm_lkb *lkb)
279 {
280         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
281                                   DLM_IFL_OVERLAP_CANCEL));
282 }
283
284 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
285 {
286         if (is_master_copy(lkb))
287                 return;
288
289         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
290
291         lkb->lkb_lksb->sb_status = rv;
292         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
293
294         dlm_add_ast(lkb, AST_COMP);
295 }
296
297 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
298 {
299         queue_cast(r, lkb,
300                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
301 }
302
303 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
304 {
305         if (is_master_copy(lkb))
306                 send_bast(r, lkb, rqmode);
307         else {
308                 lkb->lkb_bastmode = rqmode;
309                 dlm_add_ast(lkb, AST_BAST);
310         }
311 }
312
313 /*
314  * Basic operations on rsb's and lkb's
315  */
316
317 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
318 {
319         struct dlm_rsb *r;
320
321         r = allocate_rsb(ls, len);
322         if (!r)
323                 return NULL;
324
325         r->res_ls = ls;
326         r->res_length = len;
327         memcpy(r->res_name, name, len);
328         mutex_init(&r->res_mutex);
329
330         INIT_LIST_HEAD(&r->res_lookup);
331         INIT_LIST_HEAD(&r->res_grantqueue);
332         INIT_LIST_HEAD(&r->res_convertqueue);
333         INIT_LIST_HEAD(&r->res_waitqueue);
334         INIT_LIST_HEAD(&r->res_root_list);
335         INIT_LIST_HEAD(&r->res_recover_list);
336
337         return r;
338 }
339
340 static int search_rsb_list(struct list_head *head, char *name, int len,
341                            unsigned int flags, struct dlm_rsb **r_ret)
342 {
343         struct dlm_rsb *r;
344         int error = 0;
345
346         list_for_each_entry(r, head, res_hashchain) {
347                 if (len == r->res_length && !memcmp(name, r->res_name, len))
348                         goto found;
349         }
350         return -EBADR;
351
352  found:
353         if (r->res_nodeid && (flags & R_MASTER))
354                 error = -ENOTBLK;
355         *r_ret = r;
356         return error;
357 }
358
359 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
360                        unsigned int flags, struct dlm_rsb **r_ret)
361 {
362         struct dlm_rsb *r;
363         int error;
364
365         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
366         if (!error) {
367                 kref_get(&r->res_ref);
368                 goto out;
369         }
370         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
371         if (error)
372                 goto out;
373
374         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
375
376         if (dlm_no_directory(ls))
377                 goto out;
378
379         if (r->res_nodeid == -1) {
380                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
381                 r->res_first_lkid = 0;
382         } else if (r->res_nodeid > 0) {
383                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
384                 r->res_first_lkid = 0;
385         } else {
386                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
387                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
388         }
389  out:
390         *r_ret = r;
391         return error;
392 }
393
394 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
395                       unsigned int flags, struct dlm_rsb **r_ret)
396 {
397         int error;
398         write_lock(&ls->ls_rsbtbl[b].lock);
399         error = _search_rsb(ls, name, len, b, flags, r_ret);
400         write_unlock(&ls->ls_rsbtbl[b].lock);
401         return error;
402 }
403
404 /*
405  * Find rsb in rsbtbl and potentially create/add one
406  *
407  * Delaying the release of rsb's has a similar benefit to applications keeping
408  * NL locks on an rsb, but without the guarantee that the cached master value
409  * will still be valid when the rsb is reused.  Apps aren't always smart enough
410  * to keep NL locks on an rsb that they may lock again shortly; this can lead
411  * to excessive master lookups and removals if we don't delay the release.
412  *
413  * Searching for an rsb means looking through both the normal list and toss
414  * list.  When found on the toss list the rsb is moved to the normal list with
415  * ref count of 1; when found on normal list the ref count is incremented.
416  */
417
418 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
419                     unsigned int flags, struct dlm_rsb **r_ret)
420 {
421         struct dlm_rsb *r, *tmp;
422         uint32_t hash, bucket;
423         int error = 0;
424
425         if (dlm_no_directory(ls))
426                 flags |= R_CREATE;
427
428         hash = jhash(name, namelen, 0);
429         bucket = hash & (ls->ls_rsbtbl_size - 1);
430
431         error = search_rsb(ls, name, namelen, bucket, flags, &r);
432         if (!error)
433                 goto out;
434
435         if (error == -EBADR && !(flags & R_CREATE))
436                 goto out;
437
438         /* the rsb was found but wasn't a master copy */
439         if (error == -ENOTBLK)
440                 goto out;
441
442         error = -ENOMEM;
443         r = create_rsb(ls, name, namelen);
444         if (!r)
445                 goto out;
446
447         r->res_hash = hash;
448         r->res_bucket = bucket;
449         r->res_nodeid = -1;
450         kref_init(&r->res_ref);
451
452         /* With no directory, the master can be set immediately */
453         if (dlm_no_directory(ls)) {
454                 int nodeid = dlm_dir_nodeid(r);
455                 if (nodeid == dlm_our_nodeid())
456                         nodeid = 0;
457                 r->res_nodeid = nodeid;
458         }
459
460         write_lock(&ls->ls_rsbtbl[bucket].lock);
461         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
462         if (!error) {
463                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
464                 free_rsb(r);
465                 r = tmp;
466                 goto out;
467         }
468         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
469         write_unlock(&ls->ls_rsbtbl[bucket].lock);
470         error = 0;
471  out:
472         *r_ret = r;
473         return error;
474 }
475
476 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
477                  unsigned int flags, struct dlm_rsb **r_ret)
478 {
479         return find_rsb(ls, name, namelen, flags, r_ret);
480 }
481
482 /* This is only called to add a reference when the code already holds
483    a valid reference to the rsb, so there's no need for locking. */
484
485 static inline void hold_rsb(struct dlm_rsb *r)
486 {
487         kref_get(&r->res_ref);
488 }
489
490 void dlm_hold_rsb(struct dlm_rsb *r)
491 {
492         hold_rsb(r);
493 }
494
495 static void toss_rsb(struct kref *kref)
496 {
497         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
498         struct dlm_ls *ls = r->res_ls;
499
500         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
501         kref_init(&r->res_ref);
502         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
503         r->res_toss_time = jiffies;
504         if (r->res_lvbptr) {
505                 free_lvb(r->res_lvbptr);
506                 r->res_lvbptr = NULL;
507         }
508 }
509
510 /* When all references to the rsb are gone it's transfered to
511    the tossed list for later disposal. */
512
513 static void put_rsb(struct dlm_rsb *r)
514 {
515         struct dlm_ls *ls = r->res_ls;
516         uint32_t bucket = r->res_bucket;
517
518         write_lock(&ls->ls_rsbtbl[bucket].lock);
519         kref_put(&r->res_ref, toss_rsb);
520         write_unlock(&ls->ls_rsbtbl[bucket].lock);
521 }
522
523 void dlm_put_rsb(struct dlm_rsb *r)
524 {
525         put_rsb(r);
526 }
527
528 /* See comment for unhold_lkb */
529
530 static void unhold_rsb(struct dlm_rsb *r)
531 {
532         int rv;
533         rv = kref_put(&r->res_ref, toss_rsb);
534         DLM_ASSERT(!rv, dlm_dump_rsb(r););
535 }
536
537 static void kill_rsb(struct kref *kref)
538 {
539         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
540
541         /* All work is done after the return from kref_put() so we
542            can release the write_lock before the remove and free. */
543
544         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
545         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
546         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
547         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
548         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
549         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
550 }
551
552 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
553    The rsb must exist as long as any lkb's for it do. */
554
555 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
556 {
557         hold_rsb(r);
558         lkb->lkb_resource = r;
559 }
560
561 static void detach_lkb(struct dlm_lkb *lkb)
562 {
563         if (lkb->lkb_resource) {
564                 put_rsb(lkb->lkb_resource);
565                 lkb->lkb_resource = NULL;
566         }
567 }
568
569 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
570 {
571         struct dlm_lkb *lkb, *tmp;
572         uint32_t lkid = 0;
573         uint16_t bucket;
574
575         lkb = allocate_lkb(ls);
576         if (!lkb)
577                 return -ENOMEM;
578
579         lkb->lkb_nodeid = -1;
580         lkb->lkb_grmode = DLM_LOCK_IV;
581         kref_init(&lkb->lkb_ref);
582         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
584
585         get_random_bytes(&bucket, sizeof(bucket));
586         bucket &= (ls->ls_lkbtbl_size - 1);
587
588         write_lock(&ls->ls_lkbtbl[bucket].lock);
589
590         /* counter can roll over so we must verify lkid is not in use */
591
592         while (lkid == 0) {
593                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
594
595                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
596                                     lkb_idtbl_list) {
597                         if (tmp->lkb_id != lkid)
598                                 continue;
599                         lkid = 0;
600                         break;
601                 }
602         }
603
604         lkb->lkb_id = lkid;
605         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
606         write_unlock(&ls->ls_lkbtbl[bucket].lock);
607
608         *lkb_ret = lkb;
609         return 0;
610 }
611
612 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
613 {
614         struct dlm_lkb *lkb;
615         uint16_t bucket = (lkid >> 16);
616
617         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
618                 if (lkb->lkb_id == lkid)
619                         return lkb;
620         }
621         return NULL;
622 }
623
624 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
625 {
626         struct dlm_lkb *lkb;
627         uint16_t bucket = (lkid >> 16);
628
629         if (bucket >= ls->ls_lkbtbl_size)
630                 return -EBADSLT;
631
632         read_lock(&ls->ls_lkbtbl[bucket].lock);
633         lkb = __find_lkb(ls, lkid);
634         if (lkb)
635                 kref_get(&lkb->lkb_ref);
636         read_unlock(&ls->ls_lkbtbl[bucket].lock);
637
638         *lkb_ret = lkb;
639         return lkb ? 0 : -ENOENT;
640 }
641
642 static void kill_lkb(struct kref *kref)
643 {
644         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
645
646         /* All work is done after the return from kref_put() so we
647            can release the write_lock before the detach_lkb */
648
649         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
650 }
651
652 /* __put_lkb() is used when an lkb may not have an rsb attached to
653    it so we need to provide the lockspace explicitly */
654
655 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
656 {
657         uint16_t bucket = (lkb->lkb_id >> 16);
658
659         write_lock(&ls->ls_lkbtbl[bucket].lock);
660         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
661                 list_del(&lkb->lkb_idtbl_list);
662                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
663
664                 detach_lkb(lkb);
665
666                 /* for local/process lkbs, lvbptr points to caller's lksb */
667                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
668                         free_lvb(lkb->lkb_lvbptr);
669                 free_lkb(lkb);
670                 return 1;
671         } else {
672                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
673                 return 0;
674         }
675 }
676
677 int dlm_put_lkb(struct dlm_lkb *lkb)
678 {
679         struct dlm_ls *ls;
680
681         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
682         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
683
684         ls = lkb->lkb_resource->res_ls;
685         return __put_lkb(ls, lkb);
686 }
687
688 /* This is only called to add a reference when the code already holds
689    a valid reference to the lkb, so there's no need for locking. */
690
691 static inline void hold_lkb(struct dlm_lkb *lkb)
692 {
693         kref_get(&lkb->lkb_ref);
694 }
695
696 /* This is called when we need to remove a reference and are certain
697    it's not the last ref.  e.g. del_lkb is always called between a
698    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
699    put_lkb would work fine, but would involve unnecessary locking */
700
701 static inline void unhold_lkb(struct dlm_lkb *lkb)
702 {
703         int rv;
704         rv = kref_put(&lkb->lkb_ref, kill_lkb);
705         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
706 }
707
708 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
709                             int mode)
710 {
711         struct dlm_lkb *lkb = NULL;
712
713         list_for_each_entry(lkb, head, lkb_statequeue)
714                 if (lkb->lkb_rqmode < mode)
715                         break;
716
717         if (!lkb)
718                 list_add_tail(new, head);
719         else
720                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
721 }
722
723 /* add/remove lkb to rsb's grant/convert/wait queue */
724
725 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
726 {
727         kref_get(&lkb->lkb_ref);
728
729         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
730
731         lkb->lkb_status = status;
732
733         switch (status) {
734         case DLM_LKSTS_WAITING:
735                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
736                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
737                 else
738                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
739                 break;
740         case DLM_LKSTS_GRANTED:
741                 /* convention says granted locks kept in order of grmode */
742                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
743                                 lkb->lkb_grmode);
744                 break;
745         case DLM_LKSTS_CONVERT:
746                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
747                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
748                 else
749                         list_add_tail(&lkb->lkb_statequeue,
750                                       &r->res_convertqueue);
751                 break;
752         default:
753                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
754         }
755 }
756
757 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
758 {
759         lkb->lkb_status = 0;
760         list_del(&lkb->lkb_statequeue);
761         unhold_lkb(lkb);
762 }
763
764 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
765 {
766         hold_lkb(lkb);
767         del_lkb(r, lkb);
768         add_lkb(r, lkb, sts);
769         unhold_lkb(lkb);
770 }
771
772 static int msg_reply_type(int mstype)
773 {
774         switch (mstype) {
775         case DLM_MSG_REQUEST:
776                 return DLM_MSG_REQUEST_REPLY;
777         case DLM_MSG_CONVERT:
778                 return DLM_MSG_CONVERT_REPLY;
779         case DLM_MSG_UNLOCK:
780                 return DLM_MSG_UNLOCK_REPLY;
781         case DLM_MSG_CANCEL:
782                 return DLM_MSG_CANCEL_REPLY;
783         case DLM_MSG_LOOKUP:
784                 return DLM_MSG_LOOKUP_REPLY;
785         }
786         return -1;
787 }
788
789 /* add/remove lkb from global waiters list of lkb's waiting for
790    a reply from a remote node */
791
792 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
793 {
794         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
795         int error = 0;
796
797         mutex_lock(&ls->ls_waiters_mutex);
798
799         if (is_overlap_unlock(lkb) ||
800             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
801                 error = -EINVAL;
802                 goto out;
803         }
804
805         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
806                 switch (mstype) {
807                 case DLM_MSG_UNLOCK:
808                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
809                         break;
810                 case DLM_MSG_CANCEL:
811                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
812                         break;
813                 default:
814                         error = -EBUSY;
815                         goto out;
816                 }
817                 lkb->lkb_wait_count++;
818                 hold_lkb(lkb);
819
820                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
821                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
822                           lkb->lkb_wait_count, lkb->lkb_flags);
823                 goto out;
824         }
825
826         DLM_ASSERT(!lkb->lkb_wait_count,
827                    dlm_print_lkb(lkb);
828                    printk("wait_count %d\n", lkb->lkb_wait_count););
829
830         lkb->lkb_wait_count++;
831         lkb->lkb_wait_type = mstype;
832         hold_lkb(lkb);
833         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
834  out:
835         if (error)
836                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
837                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
838                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
839         mutex_unlock(&ls->ls_waiters_mutex);
840         return error;
841 }
842
843 /* We clear the RESEND flag because we might be taking an lkb off the waiters
844    list as part of process_requestqueue (e.g. a lookup that has an optimized
845    request reply on the requestqueue) between dlm_recover_waiters_pre() which
846    set RESEND and dlm_recover_waiters_post() */
847
848 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
849 {
850         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
851         int overlap_done = 0;
852
853         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
854                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
855                 overlap_done = 1;
856                 goto out_del;
857         }
858
859         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
861                 overlap_done = 1;
862                 goto out_del;
863         }
864
865         /* N.B. type of reply may not always correspond to type of original
866            msg due to lookup->request optimization, verify others? */
867
868         if (lkb->lkb_wait_type) {
869                 lkb->lkb_wait_type = 0;
870                 goto out_del;
871         }
872
873         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
874                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
875         return -1;
876
877  out_del:
878         /* the force-unlock/cancel has completed and we haven't recvd a reply
879            to the op that was in progress prior to the unlock/cancel; we
880            give up on any reply to the earlier op.  FIXME: not sure when/how
881            this would happen */
882
883         if (overlap_done && lkb->lkb_wait_type) {
884                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
885                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
886                 lkb->lkb_wait_count--;
887                 lkb->lkb_wait_type = 0;
888         }
889
890         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
891
892         lkb->lkb_flags &= ~DLM_IFL_RESEND;
893         lkb->lkb_wait_count--;
894         if (!lkb->lkb_wait_count)
895                 list_del_init(&lkb->lkb_wait_reply);
896         unhold_lkb(lkb);
897         return 0;
898 }
899
900 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
901 {
902         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
903         int error;
904
905         mutex_lock(&ls->ls_waiters_mutex);
906         error = _remove_from_waiters(lkb, mstype);
907         mutex_unlock(&ls->ls_waiters_mutex);
908         return error;
909 }
910
911 /* Handles situations where we might be processing a "fake" or "stub" reply in
912    which we can't try to take waiters_mutex again. */
913
914 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
915 {
916         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
917         int error;
918
919         if (ms != &ls->ls_stub_ms)
920                 mutex_lock(&ls->ls_waiters_mutex);
921         error = _remove_from_waiters(lkb, ms->m_type);
922         if (ms != &ls->ls_stub_ms)
923                 mutex_unlock(&ls->ls_waiters_mutex);
924         return error;
925 }
926
927 static void dir_remove(struct dlm_rsb *r)
928 {
929         int to_nodeid;
930
931         if (dlm_no_directory(r->res_ls))
932                 return;
933
934         to_nodeid = dlm_dir_nodeid(r);
935         if (to_nodeid != dlm_our_nodeid())
936                 send_remove(r);
937         else
938                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
939                                      r->res_name, r->res_length);
940 }
941
942 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
943    found since they are in order of newest to oldest? */
944
945 static int shrink_bucket(struct dlm_ls *ls, int b)
946 {
947         struct dlm_rsb *r;
948         int count = 0, found;
949
950         for (;;) {
951                 found = 0;
952                 write_lock(&ls->ls_rsbtbl[b].lock);
953                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
954                                             res_hashchain) {
955                         if (!time_after_eq(jiffies, r->res_toss_time +
956                                            dlm_config.ci_toss_secs * HZ))
957                                 continue;
958                         found = 1;
959                         break;
960                 }
961
962                 if (!found) {
963                         write_unlock(&ls->ls_rsbtbl[b].lock);
964                         break;
965                 }
966
967                 if (kref_put(&r->res_ref, kill_rsb)) {
968                         list_del(&r->res_hashchain);
969                         write_unlock(&ls->ls_rsbtbl[b].lock);
970
971                         if (is_master(r))
972                                 dir_remove(r);
973                         free_rsb(r);
974                         count++;
975                 } else {
976                         write_unlock(&ls->ls_rsbtbl[b].lock);
977                         log_error(ls, "tossed rsb in use %s", r->res_name);
978                 }
979         }
980
981         return count;
982 }
983
984 void dlm_scan_rsbs(struct dlm_ls *ls)
985 {
986         int i;
987
988         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
989                 shrink_bucket(ls, i);
990                 if (dlm_locking_stopped(ls))
991                         break;
992                 cond_resched();
993         }
994 }
995
996 /* lkb is master or local copy */
997
998 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
999 {
1000         int b, len = r->res_ls->ls_lvblen;
1001
1002         /* b=1 lvb returned to caller
1003            b=0 lvb written to rsb or invalidated
1004            b=-1 do nothing */
1005
1006         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1007
1008         if (b == 1) {
1009                 if (!lkb->lkb_lvbptr)
1010                         return;
1011
1012                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1013                         return;
1014
1015                 if (!r->res_lvbptr)
1016                         return;
1017
1018                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1019                 lkb->lkb_lvbseq = r->res_lvbseq;
1020
1021         } else if (b == 0) {
1022                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1023                         rsb_set_flag(r, RSB_VALNOTVALID);
1024                         return;
1025                 }
1026
1027                 if (!lkb->lkb_lvbptr)
1028                         return;
1029
1030                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1031                         return;
1032
1033                 if (!r->res_lvbptr)
1034                         r->res_lvbptr = allocate_lvb(r->res_ls);
1035
1036                 if (!r->res_lvbptr)
1037                         return;
1038
1039                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1040                 r->res_lvbseq++;
1041                 lkb->lkb_lvbseq = r->res_lvbseq;
1042                 rsb_clear_flag(r, RSB_VALNOTVALID);
1043         }
1044
1045         if (rsb_flag(r, RSB_VALNOTVALID))
1046                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1047 }
1048
1049 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1050 {
1051         if (lkb->lkb_grmode < DLM_LOCK_PW)
1052                 return;
1053
1054         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1055                 rsb_set_flag(r, RSB_VALNOTVALID);
1056                 return;
1057         }
1058
1059         if (!lkb->lkb_lvbptr)
1060                 return;
1061
1062         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1063                 return;
1064
1065         if (!r->res_lvbptr)
1066                 r->res_lvbptr = allocate_lvb(r->res_ls);
1067
1068         if (!r->res_lvbptr)
1069                 return;
1070
1071         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1072         r->res_lvbseq++;
1073         rsb_clear_flag(r, RSB_VALNOTVALID);
1074 }
1075
1076 /* lkb is process copy (pc) */
1077
1078 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1079                             struct dlm_message *ms)
1080 {
1081         int b;
1082
1083         if (!lkb->lkb_lvbptr)
1084                 return;
1085
1086         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1087                 return;
1088
1089         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1090         if (b == 1) {
1091                 int len = receive_extralen(ms);
1092                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1093                 lkb->lkb_lvbseq = ms->m_lvbseq;
1094         }
1095 }
1096
1097 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1098    remove_lock -- used for unlock, removes lkb from granted
1099    revert_lock -- used for cancel, moves lkb from convert to granted
1100    grant_lock  -- used for request and convert, adds lkb to granted or
1101                   moves lkb from convert or waiting to granted
1102
1103    Each of these is used for master or local copy lkb's.  There is
1104    also a _pc() variation used to make the corresponding change on
1105    a process copy (pc) lkb. */
1106
1107 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1108 {
1109         del_lkb(r, lkb);
1110         lkb->lkb_grmode = DLM_LOCK_IV;
1111         /* this unhold undoes the original ref from create_lkb()
1112            so this leads to the lkb being freed */
1113         unhold_lkb(lkb);
1114 }
1115
1116 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1117 {
1118         set_lvb_unlock(r, lkb);
1119         _remove_lock(r, lkb);
1120 }
1121
1122 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1123 {
1124         _remove_lock(r, lkb);
1125 }
1126
1127 /* returns: 0 did nothing
1128             1 moved lock to granted
1129            -1 removed lock */
1130
1131 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1132 {
1133         int rv = 0;
1134
1135         lkb->lkb_rqmode = DLM_LOCK_IV;
1136
1137         switch (lkb->lkb_status) {
1138         case DLM_LKSTS_GRANTED:
1139                 break;
1140         case DLM_LKSTS_CONVERT:
1141                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1142                 rv = 1;
1143                 break;
1144         case DLM_LKSTS_WAITING:
1145                 del_lkb(r, lkb);
1146                 lkb->lkb_grmode = DLM_LOCK_IV;
1147                 /* this unhold undoes the original ref from create_lkb()
1148                    so this leads to the lkb being freed */
1149                 unhold_lkb(lkb);
1150                 rv = -1;
1151                 break;
1152         default:
1153                 log_print("invalid status for revert %d", lkb->lkb_status);
1154         }
1155         return rv;
1156 }
1157
1158 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1159 {
1160         return revert_lock(r, lkb);
1161 }
1162
1163 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1164 {
1165         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1166                 lkb->lkb_grmode = lkb->lkb_rqmode;
1167                 if (lkb->lkb_status)
1168                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1169                 else
1170                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1171         }
1172
1173         lkb->lkb_rqmode = DLM_LOCK_IV;
1174 }
1175
1176 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1177 {
1178         set_lvb_lock(r, lkb);
1179         _grant_lock(r, lkb);
1180         lkb->lkb_highbast = 0;
1181 }
1182
1183 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1184                           struct dlm_message *ms)
1185 {
1186         set_lvb_lock_pc(r, lkb, ms);
1187         _grant_lock(r, lkb);
1188 }
1189
1190 /* called by grant_pending_locks() which means an async grant message must
1191    be sent to the requesting node in addition to granting the lock if the
1192    lkb belongs to a remote node. */
1193
1194 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1195 {
1196         grant_lock(r, lkb);
1197         if (is_master_copy(lkb))
1198                 send_grant(r, lkb);
1199         else
1200                 queue_cast(r, lkb, 0);
1201 }
1202
1203 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1204    change the granted/requested modes.  We're munging things accordingly in
1205    the process copy.
1206    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1207    conversion deadlock
1208    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1209    compatible with other granted locks */
1210
1211 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1212 {
1213         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1214                 log_print("munge_demoted %x invalid reply type %d",
1215                           lkb->lkb_id, ms->m_type);
1216                 return;
1217         }
1218
1219         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1220                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1221                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1222                 return;
1223         }
1224
1225         lkb->lkb_grmode = DLM_LOCK_NL;
1226 }
1227
1228 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1229 {
1230         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1231             ms->m_type != DLM_MSG_GRANT) {
1232                 log_print("munge_altmode %x invalid reply type %d",
1233                           lkb->lkb_id, ms->m_type);
1234                 return;
1235         }
1236
1237         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1238                 lkb->lkb_rqmode = DLM_LOCK_PR;
1239         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1240                 lkb->lkb_rqmode = DLM_LOCK_CW;
1241         else {
1242                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1243                 dlm_print_lkb(lkb);
1244         }
1245 }
1246
1247 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1248 {
1249         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1250                                            lkb_statequeue);
1251         if (lkb->lkb_id == first->lkb_id)
1252                 return 1;
1253
1254         return 0;
1255 }
1256
1257 /* Check if the given lkb conflicts with another lkb on the queue. */
1258
1259 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1260 {
1261         struct dlm_lkb *this;
1262
1263         list_for_each_entry(this, head, lkb_statequeue) {
1264                 if (this == lkb)
1265                         continue;
1266                 if (!modes_compat(this, lkb))
1267                         return 1;
1268         }
1269         return 0;
1270 }
1271
1272 /*
1273  * "A conversion deadlock arises with a pair of lock requests in the converting
1274  * queue for one resource.  The granted mode of each lock blocks the requested
1275  * mode of the other lock."
1276  *
1277  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1278  * convert queue from being granted, then demote lkb (set grmode to NL).
1279  * This second form requires that we check for conv-deadlk even when
1280  * now == 0 in _can_be_granted().
1281  *
1282  * Example:
1283  * Granted Queue: empty
1284  * Convert Queue: NL->EX (first lock)
1285  *                PR->EX (second lock)
1286  *
1287  * The first lock can't be granted because of the granted mode of the second
1288  * lock and the second lock can't be granted because it's not first in the
1289  * list.  We demote the granted mode of the second lock (the lkb passed to this
1290  * function).
1291  *
1292  * After the resolution, the "grant pending" function needs to go back and try
1293  * to grant locks on the convert queue again since the first lock can now be
1294  * granted.
1295  */
1296
1297 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1298 {
1299         struct dlm_lkb *this, *first = NULL, *self = NULL;
1300
1301         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1302                 if (!first)
1303                         first = this;
1304                 if (this == lkb) {
1305                         self = lkb;
1306                         continue;
1307                 }
1308
1309                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1310                         return 1;
1311         }
1312
1313         /* if lkb is on the convert queue and is preventing the first
1314            from being granted, then there's deadlock and we demote lkb.
1315            multiple converting locks may need to do this before the first
1316            converting lock can be granted. */
1317
1318         if (self && self != first) {
1319                 if (!modes_compat(lkb, first) &&
1320                     !queue_conflict(&rsb->res_grantqueue, first))
1321                         return 1;
1322         }
1323
1324         return 0;
1325 }
1326
1327 /*
1328  * Return 1 if the lock can be granted, 0 otherwise.
1329  * Also detect and resolve conversion deadlocks.
1330  *
1331  * lkb is the lock to be granted
1332  *
1333  * now is 1 if the function is being called in the context of the
1334  * immediate request, it is 0 if called later, after the lock has been
1335  * queued.
1336  *
1337  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1338  */
1339
1340 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1341 {
1342         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1343
1344         /*
1345          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1346          * a new request for a NL mode lock being blocked.
1347          *
1348          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1349          * request, then it would be granted.  In essence, the use of this flag
1350          * tells the Lock Manager to expedite theis request by not considering
1351          * what may be in the CONVERTING or WAITING queues...  As of this
1352          * writing, the EXPEDITE flag can be used only with new requests for NL
1353          * mode locks.  This flag is not valid for conversion requests.
1354          *
1355          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1356          * conversion or used with a non-NL requested mode.  We also know an
1357          * EXPEDITE request is always granted immediately, so now must always
1358          * be 1.  The full condition to grant an expedite request: (now &&
1359          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1360          * therefore be shortened to just checking the flag.
1361          */
1362
1363         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1364                 return 1;
1365
1366         /*
1367          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1368          * added to the remaining conditions.
1369          */
1370
1371         if (queue_conflict(&r->res_grantqueue, lkb))
1372                 goto out;
1373
1374         /*
1375          * 6-3: By default, a conversion request is immediately granted if the
1376          * requested mode is compatible with the modes of all other granted
1377          * locks
1378          */
1379
1380         if (queue_conflict(&r->res_convertqueue, lkb))
1381                 goto out;
1382
1383         /*
1384          * 6-5: But the default algorithm for deciding whether to grant or
1385          * queue conversion requests does not by itself guarantee that such
1386          * requests are serviced on a "first come first serve" basis.  This, in
1387          * turn, can lead to a phenomenon known as "indefinate postponement".
1388          *
1389          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1390          * the system service employed to request a lock conversion.  This flag
1391          * forces certain conversion requests to be queued, even if they are
1392          * compatible with the granted modes of other locks on the same
1393          * resource.  Thus, the use of this flag results in conversion requests
1394          * being ordered on a "first come first servce" basis.
1395          *
1396          * DCT: This condition is all about new conversions being able to occur
1397          * "in place" while the lock remains on the granted queue (assuming
1398          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1399          * doesn't _have_ to go onto the convert queue where it's processed in
1400          * order.  The "now" variable is necessary to distinguish converts
1401          * being received and processed for the first time now, because once a
1402          * convert is moved to the conversion queue the condition below applies
1403          * requiring fifo granting.
1404          */
1405
1406         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1407                 return 1;
1408
1409         /*
1410          * The NOORDER flag is set to avoid the standard vms rules on grant
1411          * order.
1412          */
1413
1414         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1415                 return 1;
1416
1417         /*
1418          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1419          * granted until all other conversion requests ahead of it are granted
1420          * and/or canceled.
1421          */
1422
1423         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1424                 return 1;
1425
1426         /*
1427          * 6-4: By default, a new request is immediately granted only if all
1428          * three of the following conditions are satisfied when the request is
1429          * issued:
1430          * - The queue of ungranted conversion requests for the resource is
1431          *   empty.
1432          * - The queue of ungranted new requests for the resource is empty.
1433          * - The mode of the new request is compatible with the most
1434          *   restrictive mode of all granted locks on the resource.
1435          */
1436
1437         if (now && !conv && list_empty(&r->res_convertqueue) &&
1438             list_empty(&r->res_waitqueue))
1439                 return 1;
1440
1441         /*
1442          * 6-4: Once a lock request is in the queue of ungranted new requests,
1443          * it cannot be granted until the queue of ungranted conversion
1444          * requests is empty, all ungranted new requests ahead of it are
1445          * granted and/or canceled, and it is compatible with the granted mode
1446          * of the most restrictive lock granted on the resource.
1447          */
1448
1449         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1450             first_in_list(lkb, &r->res_waitqueue))
1451                 return 1;
1452
1453  out:
1454         /*
1455          * The following, enabled by CONVDEADLK, departs from VMS.
1456          */
1457
1458         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1459             conversion_deadlock_detect(r, lkb)) {
1460                 lkb->lkb_grmode = DLM_LOCK_NL;
1461                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1462         }
1463
1464         return 0;
1465 }
1466
1467 /*
1468  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1469  * simple way to provide a big optimization to applications that can use them.
1470  */
1471
1472 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1473 {
1474         uint32_t flags = lkb->lkb_exflags;
1475         int rv;
1476         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1477
1478         rv = _can_be_granted(r, lkb, now);
1479         if (rv)
1480                 goto out;
1481
1482         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1483                 goto out;
1484
1485         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1486                 alt = DLM_LOCK_PR;
1487         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1488                 alt = DLM_LOCK_CW;
1489
1490         if (alt) {
1491                 lkb->lkb_rqmode = alt;
1492                 rv = _can_be_granted(r, lkb, now);
1493                 if (rv)
1494                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1495                 else
1496                         lkb->lkb_rqmode = rqmode;
1497         }
1498  out:
1499         return rv;
1500 }
1501
1502 static int grant_pending_convert(struct dlm_rsb *r, int high)
1503 {
1504         struct dlm_lkb *lkb, *s;
1505         int hi, demoted, quit, grant_restart, demote_restart;
1506
1507         quit = 0;
1508  restart:
1509         grant_restart = 0;
1510         demote_restart = 0;
1511         hi = DLM_LOCK_IV;
1512
1513         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1514                 demoted = is_demoted(lkb);
1515                 if (can_be_granted(r, lkb, 0)) {
1516                         grant_lock_pending(r, lkb);
1517                         grant_restart = 1;
1518                 } else {
1519                         hi = max_t(int, lkb->lkb_rqmode, hi);
1520                         if (!demoted && is_demoted(lkb))
1521                                 demote_restart = 1;
1522                 }
1523         }
1524
1525         if (grant_restart)
1526                 goto restart;
1527         if (demote_restart && !quit) {
1528                 quit = 1;
1529                 goto restart;
1530         }
1531
1532         return max_t(int, high, hi);
1533 }
1534
1535 static int grant_pending_wait(struct dlm_rsb *r, int high)
1536 {
1537         struct dlm_lkb *lkb, *s;
1538
1539         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1540                 if (can_be_granted(r, lkb, 0))
1541                         grant_lock_pending(r, lkb);
1542                 else
1543                         high = max_t(int, lkb->lkb_rqmode, high);
1544         }
1545
1546         return high;
1547 }
1548
1549 static void grant_pending_locks(struct dlm_rsb *r)
1550 {
1551         struct dlm_lkb *lkb, *s;
1552         int high = DLM_LOCK_IV;
1553
1554         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1555
1556         high = grant_pending_convert(r, high);
1557         high = grant_pending_wait(r, high);
1558
1559         if (high == DLM_LOCK_IV)
1560                 return;
1561
1562         /*
1563          * If there are locks left on the wait/convert queue then send blocking
1564          * ASTs to granted locks based on the largest requested mode (high)
1565          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1566          */
1567
1568         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1569                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1570                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1571                         queue_bast(r, lkb, high);
1572                         lkb->lkb_highbast = high;
1573                 }
1574         }
1575 }
1576
1577 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1578                             struct dlm_lkb *lkb)
1579 {
1580         struct dlm_lkb *gr;
1581
1582         list_for_each_entry(gr, head, lkb_statequeue) {
1583                 if (gr->lkb_bastaddr &&
1584                     gr->lkb_highbast < lkb->lkb_rqmode &&
1585                     !modes_compat(gr, lkb)) {
1586                         queue_bast(r, gr, lkb->lkb_rqmode);
1587                         gr->lkb_highbast = lkb->lkb_rqmode;
1588                 }
1589         }
1590 }
1591
1592 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1593 {
1594         send_bast_queue(r, &r->res_grantqueue, lkb);
1595 }
1596
1597 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1598 {
1599         send_bast_queue(r, &r->res_grantqueue, lkb);
1600         send_bast_queue(r, &r->res_convertqueue, lkb);
1601 }
1602
1603 /* set_master(r, lkb) -- set the master nodeid of a resource
1604
1605    The purpose of this function is to set the nodeid field in the given
1606    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1607    known, it can just be copied to the lkb and the function will return
1608    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1609    before it can be copied to the lkb.
1610
1611    When the rsb nodeid is being looked up remotely, the initial lkb
1612    causing the lookup is kept on the ls_waiters list waiting for the
1613    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1614    on the rsb's res_lookup list until the master is verified.
1615
1616    Return values:
1617    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1618    1: the rsb master is not available and the lkb has been placed on
1619       a wait queue
1620 */
1621
1622 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1623 {
1624         struct dlm_ls *ls = r->res_ls;
1625         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1626
1627         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1628                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1629                 r->res_first_lkid = lkb->lkb_id;
1630                 lkb->lkb_nodeid = r->res_nodeid;
1631                 return 0;
1632         }
1633
1634         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1635                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1636                 return 1;
1637         }
1638
1639         if (r->res_nodeid == 0) {
1640                 lkb->lkb_nodeid = 0;
1641                 return 0;
1642         }
1643
1644         if (r->res_nodeid > 0) {
1645                 lkb->lkb_nodeid = r->res_nodeid;
1646                 return 0;
1647         }
1648
1649         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1650
1651         dir_nodeid = dlm_dir_nodeid(r);
1652
1653         if (dir_nodeid != our_nodeid) {
1654                 r->res_first_lkid = lkb->lkb_id;
1655                 send_lookup(r, lkb);
1656                 return 1;
1657         }
1658
1659         for (;;) {
1660                 /* It's possible for dlm_scand to remove an old rsb for
1661                    this same resource from the toss list, us to create
1662                    a new one, look up the master locally, and find it
1663                    already exists just before dlm_scand does the
1664                    dir_remove() on the previous rsb. */
1665
1666                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1667                                        r->res_length, &ret_nodeid);
1668                 if (!error)
1669                         break;
1670                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1671                 schedule();
1672         }
1673
1674         if (ret_nodeid == our_nodeid) {
1675                 r->res_first_lkid = 0;
1676                 r->res_nodeid = 0;
1677                 lkb->lkb_nodeid = 0;
1678         } else {
1679                 r->res_first_lkid = lkb->lkb_id;
1680                 r->res_nodeid = ret_nodeid;
1681                 lkb->lkb_nodeid = ret_nodeid;
1682         }
1683         return 0;
1684 }
1685
1686 static void process_lookup_list(struct dlm_rsb *r)
1687 {
1688         struct dlm_lkb *lkb, *safe;
1689
1690         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1691                 list_del_init(&lkb->lkb_rsb_lookup);
1692                 _request_lock(r, lkb);
1693                 schedule();
1694         }
1695 }
1696
1697 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1698
1699 static void confirm_master(struct dlm_rsb *r, int error)
1700 {
1701         struct dlm_lkb *lkb;
1702
1703         if (!r->res_first_lkid)
1704                 return;
1705
1706         switch (error) {
1707         case 0:
1708         case -EINPROGRESS:
1709                 r->res_first_lkid = 0;
1710                 process_lookup_list(r);
1711                 break;
1712
1713         case -EAGAIN:
1714                 /* the remote master didn't queue our NOQUEUE request;
1715                    make a waiting lkb the first_lkid */
1716
1717                 r->res_first_lkid = 0;
1718
1719                 if (!list_empty(&r->res_lookup)) {
1720                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1721                                          lkb_rsb_lookup);
1722                         list_del_init(&lkb->lkb_rsb_lookup);
1723                         r->res_first_lkid = lkb->lkb_id;
1724                         _request_lock(r, lkb);
1725                 } else
1726                         r->res_nodeid = -1;
1727                 break;
1728
1729         default:
1730                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1731         }
1732 }
1733
1734 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1735                          int namelen, uint32_t parent_lkid, void *ast,
1736                          void *astarg, void *bast, struct dlm_args *args)
1737 {
1738         int rv = -EINVAL;
1739
1740         /* check for invalid arg usage */
1741
1742         if (mode < 0 || mode > DLM_LOCK_EX)
1743                 goto out;
1744
1745         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1746                 goto out;
1747
1748         if (flags & DLM_LKF_CANCEL)
1749                 goto out;
1750
1751         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1752                 goto out;
1753
1754         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1755                 goto out;
1756
1757         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1758                 goto out;
1759
1760         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1761                 goto out;
1762
1763         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1764                 goto out;
1765
1766         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1767                 goto out;
1768
1769         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1770                 goto out;
1771
1772         if (!ast || !lksb)
1773                 goto out;
1774
1775         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1776                 goto out;
1777
1778         /* parent/child locks not yet supported */
1779         if (parent_lkid)
1780                 goto out;
1781
1782         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1783                 goto out;
1784
1785         /* these args will be copied to the lkb in validate_lock_args,
1786            it cannot be done now because when converting locks, fields in
1787            an active lkb cannot be modified before locking the rsb */
1788
1789         args->flags = flags;
1790         args->astaddr = ast;
1791         args->astparam = (long) astarg;
1792         args->bastaddr = bast;
1793         args->mode = mode;
1794         args->lksb = lksb;
1795         rv = 0;
1796  out:
1797         return rv;
1798 }
1799
1800 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1801 {
1802         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1803                       DLM_LKF_FORCEUNLOCK))
1804                 return -EINVAL;
1805
1806         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1807                 return -EINVAL;
1808
1809         args->flags = flags;
1810         args->astparam = (long) astarg;
1811         return 0;
1812 }
1813
1814 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1815                               struct dlm_args *args)
1816 {
1817         int rv = -EINVAL;
1818
1819         if (args->flags & DLM_LKF_CONVERT) {
1820                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1821                         goto out;
1822
1823                 if (args->flags & DLM_LKF_QUECVT &&
1824                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1825                         goto out;
1826
1827                 rv = -EBUSY;
1828                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1829                         goto out;
1830
1831                 if (lkb->lkb_wait_type)
1832                         goto out;
1833
1834                 if (is_overlap(lkb))
1835                         goto out;
1836         }
1837
1838         lkb->lkb_exflags = args->flags;
1839         lkb->lkb_sbflags = 0;
1840         lkb->lkb_astaddr = args->astaddr;
1841         lkb->lkb_astparam = args->astparam;
1842         lkb->lkb_bastaddr = args->bastaddr;
1843         lkb->lkb_rqmode = args->mode;
1844         lkb->lkb_lksb = args->lksb;
1845         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1846         lkb->lkb_ownpid = (int) current->pid;
1847         rv = 0;
1848  out:
1849         return rv;
1850 }
1851
1852 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1853    for success */
1854
1855 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1856    because there may be a lookup in progress and it's valid to do
1857    cancel/unlockf on it */
1858
1859 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1860 {
1861         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1862         int rv = -EINVAL;
1863
1864         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1865                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1866                 dlm_print_lkb(lkb);
1867                 goto out;
1868         }
1869
1870         /* an lkb may still exist even though the lock is EOL'ed due to a
1871            cancel, unlock or failed noqueue request; an app can't use these
1872            locks; return same error as if the lkid had not been found at all */
1873
1874         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1875                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1876                 rv = -ENOENT;
1877                 goto out;
1878         }
1879
1880         /* an lkb may be waiting for an rsb lookup to complete where the
1881            lookup was initiated by another lock */
1882
1883         if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1884                 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1885                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1886                         list_del_init(&lkb->lkb_rsb_lookup);
1887                         queue_cast(lkb->lkb_resource, lkb,
1888                                    args->flags & DLM_LKF_CANCEL ?
1889                                    -DLM_ECANCEL : -DLM_EUNLOCK);
1890                         unhold_lkb(lkb); /* undoes create_lkb() */
1891                         rv = -EBUSY;
1892                         goto out;
1893                 }
1894         }
1895
1896         /* cancel not allowed with another cancel/unlock in progress */
1897
1898         if (args->flags & DLM_LKF_CANCEL) {
1899                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1900                         goto out;
1901
1902                 if (is_overlap(lkb))
1903                         goto out;
1904
1905                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1906                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1907                         rv = -EBUSY;
1908                         goto out;
1909                 }
1910
1911                 switch (lkb->lkb_wait_type) {
1912                 case DLM_MSG_LOOKUP:
1913                 case DLM_MSG_REQUEST:
1914                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1915                         rv = -EBUSY;
1916                         goto out;
1917                 case DLM_MSG_UNLOCK:
1918                 case DLM_MSG_CANCEL:
1919                         goto out;
1920                 }
1921                 /* add_to_waiters() will set OVERLAP_CANCEL */
1922                 goto out_ok;
1923         }
1924
1925         /* do we need to allow a force-unlock if there's a normal unlock
1926            already in progress?  in what conditions could the normal unlock
1927            fail such that we'd want to send a force-unlock to be sure? */
1928
1929         if (args->flags & DLM_LKF_FORCEUNLOCK) {
1930                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1931                         goto out;
1932
1933                 if (is_overlap_unlock(lkb))
1934                         goto out;
1935
1936                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1937                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1938                         rv = -EBUSY;
1939                         goto out;
1940                 }
1941
1942                 switch (lkb->lkb_wait_type) {
1943                 case DLM_MSG_LOOKUP:
1944                 case DLM_MSG_REQUEST:
1945                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1946                         rv = -EBUSY;
1947                         goto out;
1948                 case DLM_MSG_UNLOCK:
1949                         goto out;
1950                 }
1951                 /* add_to_waiters() will set OVERLAP_UNLOCK */
1952                 goto out_ok;
1953         }
1954
1955         /* normal unlock not allowed if there's any op in progress */
1956         rv = -EBUSY;
1957         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1958                 goto out;
1959
1960  out_ok:
1961         /* an overlapping op shouldn't blow away exflags from other op */
1962         lkb->lkb_exflags |= args->flags;
1963         lkb->lkb_sbflags = 0;
1964         lkb->lkb_astparam = args->astparam;
1965         rv = 0;
1966  out:
1967         if (rv)
1968                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1969                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1970                           args->flags, lkb->lkb_wait_type,
1971                           lkb->lkb_resource->res_name);
1972         return rv;
1973 }
1974
1975 /*
1976  * Four stage 4 varieties:
1977  * do_request(), do_convert(), do_unlock(), do_cancel()
1978  * These are called on the master node for the given lock and
1979  * from the central locking logic.
1980  */
1981
1982 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1983 {
1984         int error = 0;
1985
1986         if (can_be_granted(r, lkb, 1)) {
1987                 grant_lock(r, lkb);
1988                 queue_cast(r, lkb, 0);
1989                 goto out;
1990         }
1991
1992         if (can_be_queued(lkb)) {
1993                 error = -EINPROGRESS;
1994                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1995                 send_blocking_asts(r, lkb);
1996                 goto out;
1997         }
1998
1999         error = -EAGAIN;
2000         if (force_blocking_asts(lkb))
2001                 send_blocking_asts_all(r, lkb);
2002         queue_cast(r, lkb, -EAGAIN);
2003
2004  out:
2005         return error;
2006 }
2007
2008 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2009 {
2010         int error = 0;
2011
2012         /* changing an existing lock may allow others to be granted */
2013
2014         if (can_be_granted(r, lkb, 1)) {
2015                 grant_lock(r, lkb);
2016                 queue_cast(r, lkb, 0);
2017                 grant_pending_locks(r);
2018                 goto out;
2019         }
2020
2021         /* is_demoted() means the can_be_granted() above set the grmode
2022            to NL, and left us on the granted queue.  This auto-demotion
2023            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2024            now grantable.  We have to try to grant other converting locks
2025            before we try again to grant this one. */
2026
2027         if (is_demoted(lkb)) {
2028                 grant_pending_convert(r, DLM_LOCK_IV);
2029                 if (_can_be_granted(r, lkb, 1)) {
2030                         grant_lock(r, lkb);
2031                         queue_cast(r, lkb, 0);
2032                         grant_pending_locks(r);
2033                         goto out;
2034                 }
2035                 /* else fall through and move to convert queue */
2036         }
2037
2038         if (can_be_queued(lkb)) {
2039                 error = -EINPROGRESS;
2040                 del_lkb(r, lkb);
2041                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2042                 send_blocking_asts(r, lkb);
2043                 goto out;
2044         }
2045
2046         error = -EAGAIN;
2047         if (force_blocking_asts(lkb))
2048                 send_blocking_asts_all(r, lkb);
2049         queue_cast(r, lkb, -EAGAIN);
2050
2051  out:
2052         return error;
2053 }
2054
2055 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2056 {
2057         remove_lock(r, lkb);
2058         queue_cast(r, lkb, -DLM_EUNLOCK);
2059         grant_pending_locks(r);
2060         return -DLM_EUNLOCK;
2061 }
2062
2063 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2064  
2065 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066 {
2067         int error;
2068
2069         error = revert_lock(r, lkb);
2070         if (error) {
2071                 queue_cast(r, lkb, -DLM_ECANCEL);
2072                 grant_pending_locks(r);
2073                 return -DLM_ECANCEL;
2074         }
2075         return 0;
2076 }
2077
2078 /*
2079  * Four stage 3 varieties:
2080  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2081  */
2082
2083 /* add a new lkb to a possibly new rsb, called by requesting process */
2084
2085 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2086 {
2087         int error;
2088
2089         /* set_master: sets lkb nodeid from r */
2090
2091         error = set_master(r, lkb);
2092         if (error < 0)
2093                 goto out;
2094         if (error) {
2095                 error = 0;
2096                 goto out;
2097         }
2098
2099         if (is_remote(r))
2100                 /* receive_request() calls do_request() on remote node */
2101                 error = send_request(r, lkb);
2102         else
2103                 error = do_request(r, lkb);
2104  out:
2105         return error;
2106 }
2107
2108 /* change some property of an existing lkb, e.g. mode */
2109
2110 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2111 {
2112         int error;
2113
2114         if (is_remote(r))
2115                 /* receive_convert() calls do_convert() on remote node */
2116                 error = send_convert(r, lkb);
2117         else
2118                 error = do_convert(r, lkb);
2119
2120         return error;
2121 }
2122
2123 /* remove an existing lkb from the granted queue */
2124
2125 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2126 {
2127         int error;
2128
2129         if (is_remote(r))
2130                 /* receive_unlock() calls do_unlock() on remote node */
2131                 error = send_unlock(r, lkb);
2132         else
2133                 error = do_unlock(r, lkb);
2134
2135         return error;
2136 }
2137
2138 /* remove an existing lkb from the convert or wait queue */
2139
2140 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2141 {
2142         int error;
2143
2144         if (is_remote(r))
2145                 /* receive_cancel() calls do_cancel() on remote node */
2146                 error = send_cancel(r, lkb);
2147         else
2148                 error = do_cancel(r, lkb);
2149
2150         return error;
2151 }
2152
2153 /*
2154  * Four stage 2 varieties:
2155  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2156  */
2157
2158 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2159                         int len, struct dlm_args *args)
2160 {
2161         struct dlm_rsb *r;
2162         int error;
2163
2164         error = validate_lock_args(ls, lkb, args);
2165         if (error)
2166                 goto out;
2167
2168         error = find_rsb(ls, name, len, R_CREATE, &r);
2169         if (error)
2170                 goto out;
2171
2172         lock_rsb(r);
2173
2174         attach_lkb(r, lkb);
2175         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2176
2177         error = _request_lock(r, lkb);
2178
2179         unlock_rsb(r);
2180         put_rsb(r);
2181
2182  out:
2183         return error;
2184 }
2185
2186 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2187                         struct dlm_args *args)
2188 {
2189         struct dlm_rsb *r;
2190         int error;
2191
2192         r = lkb->lkb_resource;
2193
2194         hold_rsb(r);
2195         lock_rsb(r);
2196
2197         error = validate_lock_args(ls, lkb, args);
2198         if (error)
2199                 goto out;
2200
2201         error = _convert_lock(r, lkb);
2202  out:
2203         unlock_rsb(r);
2204         put_rsb(r);
2205         return error;
2206 }
2207
2208 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2209                        struct dlm_args *args)
2210 {
2211         struct dlm_rsb *r;
2212         int error;
2213
2214         r = lkb->lkb_resource;
2215
2216         hold_rsb(r);
2217         lock_rsb(r);
2218
2219         error = validate_unlock_args(lkb, args);
2220         if (error)
2221                 goto out;
2222
2223         error = _unlock_lock(r, lkb);
2224  out:
2225         unlock_rsb(r);
2226         put_rsb(r);
2227         return error;
2228 }
2229
2230 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2231                        struct dlm_args *args)
2232 {
2233         struct dlm_rsb *r;
2234         int error;
2235
2236         r = lkb->lkb_resource;
2237
2238         hold_rsb(r);
2239         lock_rsb(r);
2240
2241         error = validate_unlock_args(lkb, args);
2242         if (error)
2243                 goto out;
2244
2245         error = _cancel_lock(r, lkb);
2246  out:
2247         unlock_rsb(r);
2248         put_rsb(r);
2249         return error;
2250 }
2251
2252 /*
2253  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2254  */
2255
2256 int dlm_lock(dlm_lockspace_t *lockspace,
2257              int mode,
2258              struct dlm_lksb *lksb,
2259              uint32_t flags,
2260              void *name,
2261              unsigned int namelen,
2262              uint32_t parent_lkid,
2263              void (*ast) (void *astarg),
2264              void *astarg,
2265              void (*bast) (void *astarg, int mode))
2266 {
2267         struct dlm_ls *ls;
2268         struct dlm_lkb *lkb;
2269         struct dlm_args args;
2270         int error, convert = flags & DLM_LKF_CONVERT;
2271
2272         ls = dlm_find_lockspace_local(lockspace);
2273         if (!ls)
2274                 return -EINVAL;
2275
2276         dlm_lock_recovery(ls);
2277
2278         if (convert)
2279                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2280         else
2281                 error = create_lkb(ls, &lkb);
2282
2283         if (error)
2284                 goto out;
2285
2286         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2287                               astarg, bast, &args);
2288         if (error)
2289                 goto out_put;
2290
2291         if (convert)
2292                 error = convert_lock(ls, lkb, &args);
2293         else
2294                 error = request_lock(ls, lkb, name, namelen, &args);
2295
2296         if (error == -EINPROGRESS)
2297                 error = 0;
2298  out_put:
2299         if (convert || error)
2300                 __put_lkb(ls, lkb);
2301         if (error == -EAGAIN)
2302                 error = 0;
2303  out:
2304         dlm_unlock_recovery(ls);
2305         dlm_put_lockspace(ls);
2306         return error;
2307 }
2308
2309 int dlm_unlock(dlm_lockspace_t *lockspace,
2310                uint32_t lkid,
2311                uint32_t flags,
2312                struct dlm_lksb *lksb,
2313                void *astarg)
2314 {
2315         struct dlm_ls *ls;
2316         struct dlm_lkb *lkb;
2317         struct dlm_args args;
2318         int error;
2319
2320         ls = dlm_find_lockspace_local(lockspace);
2321         if (!ls)
2322                 return -EINVAL;
2323
2324         dlm_lock_recovery(ls);
2325
2326         error = find_lkb(ls, lkid, &lkb);
2327         if (error)
2328                 goto out;
2329
2330         error = set_unlock_args(flags, astarg, &args);
2331         if (error)
2332                 goto out_put;
2333
2334         if (flags & DLM_LKF_CANCEL)
2335                 error = cancel_lock(ls, lkb, &args);
2336         else
2337                 error = unlock_lock(ls, lkb, &args);
2338
2339         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2340                 error = 0;
2341         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2342                 error = 0;
2343  out_put:
2344         dlm_put_lkb(lkb);
2345  out:
2346         dlm_unlock_recovery(ls);
2347         dlm_put_lockspace(ls);
2348         return error;
2349 }
2350
2351 /*
2352  * send/receive routines for remote operations and replies
2353  *
2354  * send_args
2355  * send_common
2356  * send_request                 receive_request
2357  * send_convert                 receive_convert
2358  * send_unlock                  receive_unlock
2359  * send_cancel                  receive_cancel
2360  * send_grant                   receive_grant
2361  * send_bast                    receive_bast
2362  * send_lookup                  receive_lookup
2363  * send_remove                  receive_remove
2364  *
2365  *                              send_common_reply
2366  * receive_request_reply        send_request_reply
2367  * receive_convert_reply        send_convert_reply
2368  * receive_unlock_reply         send_unlock_reply
2369  * receive_cancel_reply         send_cancel_reply
2370  * receive_lookup_reply         send_lookup_reply
2371  */
2372
2373 static int _create_message(struct dlm_ls *ls, int mb_len,
2374                            int to_nodeid, int mstype,
2375                            struct dlm_message **ms_ret,
2376                            struct dlm_mhandle **mh_ret)
2377 {
2378         struct dlm_message *ms;
2379         struct dlm_mhandle *mh;
2380         char *mb;
2381
2382         /* get_buffer gives us a message handle (mh) that we need to
2383            pass into lowcomms_commit and a message buffer (mb) that we
2384            write our data into */
2385
2386         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2387         if (!mh)
2388                 return -ENOBUFS;
2389
2390         memset(mb, 0, mb_len);
2391
2392         ms = (struct dlm_message *) mb;
2393
2394         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2395         ms->m_header.h_lockspace = ls->ls_global_id;
2396         ms->m_header.h_nodeid = dlm_our_nodeid();
2397         ms->m_header.h_length = mb_len;
2398         ms->m_header.h_cmd = DLM_MSG;
2399
2400         ms->m_type = mstype;
2401
2402         *mh_ret = mh;
2403         *ms_ret = ms;
2404         return 0;
2405 }
2406
2407 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2408                           int to_nodeid, int mstype,
2409                           struct dlm_message **ms_ret,
2410                           struct dlm_mhandle **mh_ret)
2411 {
2412         int mb_len = sizeof(struct dlm_message);
2413
2414         switch (mstype) {
2415         case DLM_MSG_REQUEST:
2416         case DLM_MSG_LOOKUP:
2417         case DLM_MSG_REMOVE:
2418                 mb_len += r->res_length;
2419                 break;
2420         case DLM_MSG_CONVERT:
2421         case DLM_MSG_UNLOCK:
2422         case DLM_MSG_REQUEST_REPLY:
2423         case DLM_MSG_CONVERT_REPLY:
2424         case DLM_MSG_GRANT:
2425                 if (lkb && lkb->lkb_lvbptr)
2426                         mb_len += r->res_ls->ls_lvblen;
2427                 break;
2428         }
2429
2430         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2431                                ms_ret, mh_ret);
2432 }
2433
2434 /* further lowcomms enhancements or alternate implementations may make
2435    the return value from this function useful at some point */
2436
2437 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2438 {
2439         dlm_message_out(ms);
2440         dlm_lowcomms_commit_buffer(mh);
2441         return 0;
2442 }
2443
2444 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2445                       struct dlm_message *ms)
2446 {
2447         ms->m_nodeid   = lkb->lkb_nodeid;
2448         ms->m_pid      = lkb->lkb_ownpid;
2449         ms->m_lkid     = lkb->lkb_id;
2450         ms->m_remid    = lkb->lkb_remid;
2451         ms->m_exflags  = lkb->lkb_exflags;
2452         ms->m_sbflags  = lkb->lkb_sbflags;
2453         ms->m_flags    = lkb->lkb_flags;
2454         ms->m_lvbseq   = lkb->lkb_lvbseq;
2455         ms->m_status   = lkb->lkb_status;
2456         ms->m_grmode   = lkb->lkb_grmode;
2457         ms->m_rqmode   = lkb->lkb_rqmode;
2458         ms->m_hash     = r->res_hash;
2459
2460         /* m_result and m_bastmode are set from function args,
2461            not from lkb fields */
2462
2463         if (lkb->lkb_bastaddr)
2464                 ms->m_asts |= AST_BAST;
2465         if (lkb->lkb_astaddr)
2466                 ms->m_asts |= AST_COMP;
2467
2468         /* compare with switch in create_message; send_remove() doesn't
2469            use send_args() */
2470
2471         switch (ms->m_type) {
2472         case DLM_MSG_REQUEST:
2473         case DLM_MSG_LOOKUP:
2474                 memcpy(ms->m_extra, r->res_name, r->res_length);
2475                 break;
2476         case DLM_MSG_CONVERT:
2477         case DLM_MSG_UNLOCK:
2478         case DLM_MSG_REQUEST_REPLY:
2479         case DLM_MSG_CONVERT_REPLY:
2480         case DLM_MSG_GRANT:
2481                 if (!lkb->lkb_lvbptr)
2482                         break;
2483                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2484                 break;
2485         }
2486 }
2487
2488 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2489 {
2490         struct dlm_message *ms;
2491         struct dlm_mhandle *mh;
2492         int to_nodeid, error;
2493
2494         error = add_to_waiters(lkb, mstype);
2495         if (error)
2496                 return error;
2497
2498         to_nodeid = r->res_nodeid;
2499
2500         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2501         if (error)
2502                 goto fail;
2503
2504         send_args(r, lkb, ms);
2505
2506         error = send_message(mh, ms);
2507         if (error)
2508                 goto fail;
2509         return 0;
2510
2511  fail:
2512         remove_from_waiters(lkb, msg_reply_type(mstype));
2513         return error;
2514 }
2515
2516 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2517 {
2518         return send_common(r, lkb, DLM_MSG_REQUEST);
2519 }
2520
2521 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2522 {
2523         int error;
2524
2525         error = send_common(r, lkb, DLM_MSG_CONVERT);
2526
2527         /* down conversions go without a reply from the master */
2528         if (!error && down_conversion(lkb)) {
2529                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2530                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2531                 r->res_ls->ls_stub_ms.m_result = 0;
2532                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2533                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2534         }
2535
2536         return error;
2537 }
2538
2539 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2540    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2541    that the master is still correct. */
2542
2543 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2544 {
2545         return send_common(r, lkb, DLM_MSG_UNLOCK);
2546 }
2547
2548 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2549 {
2550         return send_common(r, lkb, DLM_MSG_CANCEL);
2551 }
2552
2553 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2554 {
2555         struct dlm_message *ms;
2556         struct dlm_mhandle *mh;
2557         int to_nodeid, error;
2558
2559         to_nodeid = lkb->lkb_nodeid;
2560
2561         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2562         if (error)
2563                 goto out;
2564
2565         send_args(r, lkb, ms);
2566
2567         ms->m_result = 0;
2568
2569         error = send_message(mh, ms);
2570  out:
2571         return error;
2572 }
2573
2574 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2575 {
2576         struct dlm_message *ms;
2577         struct dlm_mhandle *mh;
2578         int to_nodeid, error;
2579
2580         to_nodeid = lkb->lkb_nodeid;
2581
2582         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2583         if (error)
2584                 goto out;
2585
2586         send_args(r, lkb, ms);
2587
2588         ms->m_bastmode = mode;
2589
2590         error = send_message(mh, ms);
2591  out:
2592         return error;
2593 }
2594
2595 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2596 {
2597         struct dlm_message *ms;
2598         struct dlm_mhandle *mh;
2599         int to_nodeid, error;
2600
2601         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2602         if (error)
2603                 return error;
2604
2605         to_nodeid = dlm_dir_nodeid(r);
2606
2607         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2608         if (error)
2609                 goto fail;
2610
2611         send_args(r, lkb, ms);
2612
2613         error = send_message(mh, ms);
2614         if (error)
2615                 goto fail;
2616         return 0;
2617
2618  fail:
2619         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2620         return error;
2621 }
2622
2623 static int send_remove(struct dlm_rsb *r)
2624 {
2625         struct dlm_message *ms;
2626         struct dlm_mhandle *mh;
2627         int to_nodeid, error;
2628
2629         to_nodeid = dlm_dir_nodeid(r);
2630
2631         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2632         if (error)
2633                 goto out;
2634
2635         memcpy(ms->m_extra, r->res_name, r->res_length);
2636         ms->m_hash = r->res_hash;
2637
2638         error = send_message(mh, ms);
2639  out:
2640         return error;
2641 }
2642
2643 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2644                              int mstype, int rv)
2645 {
2646         struct dlm_message *ms;
2647         struct dlm_mhandle *mh;
2648         int to_nodeid, error;
2649
2650         to_nodeid = lkb->lkb_nodeid;
2651
2652         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2653         if (error)
2654                 goto out;
2655
2656         send_args(r, lkb, ms);
2657
2658         ms->m_result = rv;
2659
2660         error = send_message(mh, ms);
2661  out:
2662         return error;
2663 }
2664
2665 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2666 {
2667         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2668 }
2669
2670 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2671 {
2672         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2673 }
2674
2675 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2676 {
2677         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2678 }
2679
2680 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2681 {
2682         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2683 }
2684
2685 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2686                              int ret_nodeid, int rv)
2687 {
2688         struct dlm_rsb *r = &ls->ls_stub_rsb;
2689         struct dlm_message *ms;
2690         struct dlm_mhandle *mh;
2691         int error, nodeid = ms_in->m_header.h_nodeid;
2692
2693         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2694         if (error)
2695                 goto out;
2696
2697         ms->m_lkid = ms_in->m_lkid;
2698         ms->m_result = rv;
2699         ms->m_nodeid = ret_nodeid;
2700
2701         error = send_message(mh, ms);
2702  out:
2703         return error;
2704 }
2705
2706 /* which args we save from a received message depends heavily on the type
2707    of message, unlike the send side where we can safely send everything about
2708    the lkb for any type of message */
2709
2710 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2711 {
2712         lkb->lkb_exflags = ms->m_exflags;
2713         lkb->lkb_sbflags = ms->m_sbflags;
2714         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2715                          (ms->m_flags & 0x0000FFFF);
2716 }
2717
2718 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2719 {
2720         lkb->lkb_sbflags = ms->m_sbflags;
2721         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2722                          (ms->m_flags & 0x0000FFFF);
2723 }
2724
2725 static int receive_extralen(struct dlm_message *ms)
2726 {
2727         return (ms->m_header.h_length - sizeof(struct dlm_message));
2728 }
2729
2730 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2731                        struct dlm_message *ms)
2732 {
2733         int len;
2734
2735         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2736                 if (!lkb->lkb_lvbptr)
2737                         lkb->lkb_lvbptr = allocate_lvb(ls);
2738                 if (!lkb->lkb_lvbptr)
2739                         return -ENOMEM;
2740                 len = receive_extralen(ms);
2741                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2742         }
2743         return 0;
2744 }
2745
2746 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2747                                 struct dlm_message *ms)
2748 {
2749         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2750         lkb->lkb_ownpid = ms->m_pid;
2751         lkb->lkb_remid = ms->m_lkid;
2752         lkb->lkb_grmode = DLM_LOCK_IV;
2753         lkb->lkb_rqmode = ms->m_rqmode;
2754         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2755         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2756
2757         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2758
2759         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2760                 /* lkb was just created so there won't be an lvb yet */
2761                 lkb->lkb_lvbptr = allocate_lvb(ls);
2762                 if (!lkb->lkb_lvbptr)
2763                         return -ENOMEM;
2764         }
2765
2766         return 0;
2767 }
2768
2769 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2770                                 struct dlm_message *ms)
2771 {
2772         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2773                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2774                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2775                           lkb->lkb_id, lkb->lkb_remid);
2776                 return -EINVAL;
2777         }
2778
2779         if (!is_master_copy(lkb))
2780                 return -EINVAL;
2781
2782         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2783                 return -EBUSY;
2784
2785         if (receive_lvb(ls, lkb, ms))
2786                 return -ENOMEM;
2787
2788         lkb->lkb_rqmode = ms->m_rqmode;
2789         lkb->lkb_lvbseq = ms->m_lvbseq;
2790
2791         return 0;
2792 }
2793
2794 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2795                                struct dlm_message *ms)
2796 {
2797         if (!is_master_copy(lkb))
2798                 return -EINVAL;
2799         if (receive_lvb(ls, lkb, ms))
2800                 return -ENOMEM;
2801         return 0;
2802 }
2803
2804 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2805    uses to send a reply and that the remote end uses to process the reply. */
2806
2807 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2808 {
2809         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2810         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2811         lkb->lkb_remid = ms->m_lkid;
2812 }
2813
2814 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2815 {
2816         struct dlm_lkb *lkb;
2817         struct dlm_rsb *r;
2818         int error, namelen;
2819
2820         error = create_lkb(ls, &lkb);
2821         if (error)
2822                 goto fail;
2823
2824         receive_flags(lkb, ms);
2825         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2826         error = receive_request_args(ls, lkb, ms);
2827         if (error) {
2828                 __put_lkb(ls, lkb);
2829                 goto fail;
2830         }
2831
2832         namelen = receive_extralen(ms);
2833
2834         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2835         if (error) {
2836                 __put_lkb(ls, lkb);
2837                 goto fail;
2838         }
2839
2840         lock_rsb(r);
2841
2842         attach_lkb(r, lkb);
2843         error = do_request(r, lkb);
2844         send_request_reply(r, lkb, error);
2845
2846         unlock_rsb(r);
2847         put_rsb(r);
2848
2849         if (error == -EINPROGRESS)
2850                 error = 0;
2851         if (error)
2852                 dlm_put_lkb(lkb);
2853         return;
2854
2855  fail:
2856         setup_stub_lkb(ls, ms);
2857         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2858 }
2859
2860 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2861 {
2862         struct dlm_lkb *lkb;
2863         struct dlm_rsb *r;
2864         int error, reply = 1;
2865
2866         error = find_lkb(ls, ms->m_remid, &lkb);
2867         if (error)
2868                 goto fail;
2869
2870         r = lkb->lkb_resource;
2871
2872         hold_rsb(r);
2873         lock_rsb(r);
2874
2875         receive_flags(lkb, ms);
2876         error = receive_convert_args(ls, lkb, ms);
2877         if (error)
2878                 goto out;
2879         reply = !down_conversion(lkb);
2880
2881         error = do_convert(r, lkb);
2882  out:
2883         if (reply)
2884                 send_convert_reply(r, lkb, error);
2885
2886         unlock_rsb(r);
2887         put_rsb(r);
2888         dlm_put_lkb(lkb);
2889         return;
2890
2891  fail:
2892         setup_stub_lkb(ls, ms);
2893         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2894 }
2895
2896 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2897 {
2898         struct dlm_lkb *lkb;
2899         struct dlm_rsb *r;
2900         int error;
2901
2902         error = find_lkb(ls, ms->m_remid, &lkb);
2903         if (error)
2904                 goto fail;
2905
2906         r = lkb->lkb_resource;
2907
2908         hold_rsb(r);
2909         lock_rsb(r);
2910
2911         receive_flags(lkb, ms);
2912         error = receive_unlock_args(ls, lkb, ms);
2913         if (error)
2914                 goto out;
2915
2916         error = do_unlock(r, lkb);
2917  out:
2918         send_unlock_reply(r, lkb, error);
2919
2920         unlock_rsb(r);
2921         put_rsb(r);
2922         dlm_put_lkb(lkb);
2923         return;
2924
2925  fail:
2926         setup_stub_lkb(ls, ms);
2927         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2928 }
2929
2930 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2931 {
2932         struct dlm_lkb *lkb;
2933         struct dlm_rsb *r;
2934         int error;
2935
2936         error = find_lkb(ls, ms->m_remid, &lkb);
2937         if (error)
2938                 goto fail;
2939
2940         receive_flags(lkb, ms);
2941
2942         r = lkb->lkb_resource;
2943
2944         hold_rsb(r);
2945         lock_rsb(r);
2946
2947         error = do_cancel(r, lkb);
2948         send_cancel_reply(r, lkb, error);
2949
2950         unlock_rsb(r);
2951         put_rsb(r);
2952         dlm_put_lkb(lkb);
2953         return;
2954
2955  fail:
2956         setup_stub_lkb(ls, ms);
2957         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2958 }
2959
2960 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2961 {
2962         struct dlm_lkb *lkb;
2963         struct dlm_rsb *r;
2964         int error;
2965
2966         error = find_lkb(ls, ms->m_remid, &lkb);
2967         if (error) {
2968                 log_error(ls, "receive_grant no lkb");
2969                 return;
2970         }
2971         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2972
2973         r = lkb->lkb_resource;
2974
2975         hold_rsb(r);
2976         lock_rsb(r);
2977
2978         receive_flags_reply(lkb, ms);
2979         if (is_altmode(lkb))
2980                 munge_altmode(lkb, ms);
2981         grant_lock_pc(r, lkb, ms);
2982         queue_cast(r, lkb, 0);
2983
2984         unlock_rsb(r);
2985         put_rsb(r);
2986         dlm_put_lkb(lkb);
2987 }
2988
2989 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2990 {
2991         struct dlm_lkb *lkb;
2992         struct dlm_rsb *r;
2993         int error;
2994
2995         error = find_lkb(ls, ms->m_remid, &lkb);
2996         if (error) {
2997                 log_error(ls, "receive_bast no lkb");
2998                 return;
2999         }
3000         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3001
3002         r = lkb->lkb_resource;
3003
3004         hold_rsb(r);
3005         lock_rsb(r);
3006
3007         queue_bast(r, lkb, ms->m_bastmode);
3008
3009         unlock_rsb(r);
3010         put_rsb(r);
3011         dlm_put_lkb(lkb);
3012 }
3013
3014 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3015 {
3016         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3017
3018         from_nodeid = ms->m_header.h_nodeid;
3019         our_nodeid = dlm_our_nodeid();
3020
3021         len = receive_extralen(ms);
3022
3023         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3024         if (dir_nodeid != our_nodeid) {
3025                 log_error(ls, "lookup dir_nodeid %d from %d",
3026                           dir_nodeid, from_nodeid);
3027                 error = -EINVAL;
3028                 ret_nodeid = -1;
3029                 goto out;
3030         }
3031
3032         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3033
3034         /* Optimization: we're master so treat lookup as a request */
3035         if (!error && ret_nodeid == our_nodeid) {
3036                 receive_request(ls, ms);
3037                 return;
3038         }
3039  out:
3040         send_lookup_reply(ls, ms, ret_nodeid, error);
3041 }
3042
3043 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3044 {
3045         int len, dir_nodeid, from_nodeid;
3046
3047         from_nodeid = ms->m_header.h_nodeid;
3048
3049         len = receive_extralen(ms);
3050
3051         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3052         if (dir_nodeid != dlm_our_nodeid()) {
3053                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3054                           dir_nodeid, from_nodeid);
3055                 return;
3056         }
3057
3058         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3059 }
3060
3061 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3062 {
3063         do_purge(ls, ms->m_nodeid, ms->m_pid);
3064 }
3065
3066 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3067 {
3068         struct dlm_lkb *lkb;
3069         struct dlm_rsb *r;
3070         int error, mstype, result;
3071
3072         error = find_lkb(ls, ms->m_remid, &lkb);
3073         if (error) {
3074                 log_error(ls, "receive_request_reply no lkb");
3075                 return;
3076         }
3077         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3078
3079         r = lkb->lkb_resource;
3080         hold_rsb(r);
3081         lock_rsb(r);
3082
3083         mstype = lkb->lkb_wait_type;
3084         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3085         if (error)
3086                 goto out;
3087
3088         /* Optimization: the dir node was also the master, so it took our
3089            lookup as a request and sent request reply instead of lookup reply */
3090         if (mstype == DLM_MSG_LOOKUP) {
3091                 r->res_nodeid = ms->m_header.h_nodeid;
3092                 lkb->lkb_nodeid = r->res_nodeid;
3093         }
3094
3095         /* this is the value returned from do_request() on the master */
3096         result = ms->m_result;
3097
3098         switch (result) {
3099         case -EAGAIN:
3100                 /* request would block (be queued) on remote master */
3101                 queue_cast(r, lkb, -EAGAIN);
3102                 confirm_master(r, -EAGAIN);
3103                 unhold_lkb(lkb); /* undoes create_lkb() */
3104                 break;
3105
3106         case -EINPROGRESS:
3107         case 0:
3108                 /* request was queued or granted on remote master */
3109                 receive_flags_reply(lkb, ms);
3110                 lkb->lkb_remid = ms->m_lkid;
3111                 if (is_altmode(lkb))
3112                         munge_altmode(lkb, ms);
3113                 if (result)
3114                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3115                 else {
3116                         grant_lock_pc(r, lkb, ms);
3117                         queue_cast(r, lkb, 0);
3118                 }
3119                 confirm_master(r, result);
3120                 break;
3121
3122         case -EBADR:
3123         case -ENOTBLK:
3124                 /* find_rsb failed to find rsb or rsb wasn't master */
3125                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3126                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3127                 r->res_nodeid = -1;
3128                 lkb->lkb_nodeid = -1;
3129
3130                 if (is_overlap(lkb)) {
3131                         /* we'll ignore error in cancel/unlock reply */
3132                         queue_cast_overlap(r, lkb);
3133                         unhold_lkb(lkb); /* undoes create_lkb() */
3134                 } else
3135                         _request_lock(r, lkb);
3136                 break;
3137
3138         default:
3139                 log_error(ls, "receive_request_reply %x error %d",
3140                           lkb->lkb_id, result);
3141         }
3142
3143         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3144                 log_debug(ls, "receive_request_reply %x result %d unlock",
3145                           lkb->lkb_id, result);
3146                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3147                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3148                 send_unlock(r, lkb);
3149         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3150                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3151                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3152                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3153                 send_cancel(r, lkb);
3154         } else {
3155                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3156                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3157         }
3158  out:
3159         unlock_rsb(r);
3160         put_rsb(r);
3161         dlm_put_lkb(lkb);
3162 }
3163
3164 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3165                                     struct dlm_message *ms)
3166 {
3167         /* this is the value returned from do_convert() on the master */
3168         switch (ms->m_result) {
3169         case -EAGAIN:
3170                 /* convert would block (be queued) on remote master */
3171                 queue_cast(r, lkb, -EAGAIN);
3172                 break;
3173
3174         case -EINPROGRESS:
3175                 /* convert was queued on remote master */
3176                 receive_flags_reply(lkb, ms);
3177                 if (is_demoted(lkb))
3178                         munge_demoted(lkb, ms);
3179                 del_lkb(r, lkb);
3180                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3181                 break;
3182
3183         case 0:
3184                 /* convert was granted on remote master */
3185                 receive_flags_reply(lkb, ms);
3186                 if (is_demoted(lkb))
3187                         munge_demoted(lkb, ms);
3188                 grant_lock_pc(r, lkb, ms);
3189                 queue_cast(r, lkb, 0);
3190                 break;
3191
3192         default:
3193                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3194                           lkb->lkb_id, ms->m_result);
3195         }
3196 }
3197
3198 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3199 {
3200         struct dlm_rsb *r = lkb->lkb_resource;
3201         int error;
3202
3203         hold_rsb(r);
3204         lock_rsb(r);
3205
3206         /* stub reply can happen with waiters_mutex held */
3207         error = remove_from_waiters_ms(lkb, ms);
3208         if (error)
3209                 goto out;
3210
3211         __receive_convert_reply(r, lkb, ms);
3212  out:
3213         unlock_rsb(r);
3214         put_rsb(r);
3215 }
3216
3217 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3218 {
3219         struct dlm_lkb *lkb;
3220         int error;
3221
3222         error = find_lkb(ls, ms->m_remid, &lkb);
3223         if (error) {
3224                 log_error(ls, "receive_convert_reply no lkb");
3225                 return;
3226         }
3227         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3228
3229         _receive_convert_reply(lkb, ms);
3230         dlm_put_lkb(lkb);
3231 }
3232
3233 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3234 {
3235         struct dlm_rsb *r = lkb->lkb_resource;
3236         int error;
3237
3238         hold_rsb(r);
3239         lock_rsb(r);
3240
3241         /* stub reply can happen with waiters_mutex held */
3242         error = remove_from_waiters_ms(lkb, ms);
3243         if (error)
3244                 goto out;
3245
3246         /* this is the value returned from do_unlock() on the master */
3247
3248         switch (ms->m_result) {
3249         case -DLM_EUNLOCK:
3250                 receive_flags_reply(lkb, ms);
3251                 remove_lock_pc(r, lkb);
3252                 queue_cast(r, lkb, -DLM_EUNLOCK);
3253                 break;
3254         case -ENOENT:
3255                 break;
3256         default:
3257                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3258                           lkb->lkb_id, ms->m_result);
3259         }
3260  out:
3261         unlock_rsb(r);
3262         put_rsb(r);
3263 }
3264
3265 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3266 {
3267         struct dlm_lkb *lkb;
3268         int error;
3269
3270         error = find_lkb(ls, ms->m_remid, &lkb);
3271         if (error) {
3272                 log_error(ls, "receive_unlock_reply no lkb");
3273                 return;
3274         }
3275         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3276
3277         _receive_unlock_reply(lkb, ms);
3278         dlm_put_lkb(lkb);
3279 }
3280
3281 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3282 {
3283         struct dlm_rsb *r = lkb->lkb_resource;
3284         int error;
3285
3286         hold_rsb(r);
3287         lock_rsb(r);
3288
3289         /* stub reply can happen with waiters_mutex held */
3290         error = remove_from_waiters_ms(lkb, ms);
3291         if (error)
3292                 goto out;
3293
3294         /* this is the value returned from do_cancel() on the master */
3295
3296         switch (ms->m_result) {
3297         case -DLM_ECANCEL:
3298                 receive_flags_reply(lkb, ms);
3299                 revert_lock_pc(r, lkb);
3300                 if (ms->m_result)
3301                         queue_cast(r, lkb, -DLM_ECANCEL);
3302                 break;
3303         case 0:
3304                 break;
3305         default:
3306                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3307                           lkb->lkb_id, ms->m_result);
3308         }
3309  out:
3310         unlock_rsb(r);
3311         put_rsb(r);
3312 }
3313
3314 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3315 {
3316         struct dlm_lkb *lkb;
3317         int error;
3318
3319         error = find_lkb(ls, ms->m_remid, &lkb);
3320         if (error) {
3321                 log_error(ls, "receive_cancel_reply no lkb");
3322                 return;
3323         }
3324         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3325
3326         _receive_cancel_reply(lkb, ms);
3327         dlm_put_lkb(lkb);
3328 }
3329
3330 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3331 {
3332         struct dlm_lkb *lkb;
3333         struct dlm_rsb *r;
3334         int error, ret_nodeid;
3335
3336         error = find_lkb(ls, ms->m_lkid, &lkb);
3337         if (error) {
3338                 log_error(ls, "receive_lookup_reply no lkb");
3339                 return;
3340         }
3341
3342         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3343            FIXME: will a non-zero error ever be returned? */
3344
3345         r = lkb->lkb_resource;
3346         hold_rsb(r);
3347         lock_rsb(r);
3348
3349         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3350         if (error)
3351                 goto out;
3352
3353         ret_nodeid = ms->m_nodeid;
3354         if (ret_nodeid == dlm_our_nodeid()) {
3355                 r->res_nodeid = 0;
3356                 ret_nodeid = 0;
3357                 r->res_first_lkid = 0;
3358         } else {
3359                 /* set_master() will copy res_nodeid to lkb_nodeid */
3360                 r->res_nodeid = ret_nodeid;
3361         }
3362
3363         if (is_overlap(lkb)) {
3364                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3365                           lkb->lkb_id, lkb->lkb_flags);
3366                 queue_cast_overlap(r, lkb);
3367                 unhold_lkb(lkb); /* undoes create_lkb() */
3368                 goto out_list;
3369         }
3370
3371         _request_lock(r, lkb);
3372
3373  out_list:
3374         if (!ret_nodeid)
3375                 process_lookup_list(r);
3376  out:
3377         unlock_rsb(r);
3378         put_rsb(r);
3379         dlm_put_lkb(lkb);
3380 }
3381
3382 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3383 {
3384         struct dlm_message *ms = (struct dlm_message *) hd;
3385         struct dlm_ls *ls;
3386         int error = 0;
3387
3388         if (!recovery)
3389                 dlm_message_in(ms);
3390
3391         ls = dlm_find_lockspace_global(hd->h_lockspace);
3392         if (!ls) {
3393                 log_print("drop message %d from %d for unknown lockspace %d",
3394                           ms->m_type, nodeid, hd->h_lockspace);
3395                 return -EINVAL;
3396         }
3397
3398         /* recovery may have just ended leaving a bunch of backed-up requests
3399            in the requestqueue; wait while dlm_recoverd clears them */
3400
3401         if (!recovery)
3402                 dlm_wait_requestqueue(ls);
3403
3404         /* recovery may have just started while there were a bunch of
3405            in-flight requests -- save them in requestqueue to be processed
3406            after recovery.  we can't let dlm_recvd block on the recovery
3407            lock.  if dlm_recoverd is calling this function to clear the
3408            requestqueue, it needs to be interrupted (-EINTR) if another
3409            recovery operation is starting. */
3410
3411         while (1) {
3412                 if (dlm_locking_stopped(ls)) {
3413                         if (recovery) {
3414                                 error = -EINTR;
3415                                 goto out;
3416                         }
3417                         error = dlm_add_requestqueue(ls, nodeid, hd);
3418                         if (error == -EAGAIN)
3419                                 continue;
3420                         else {
3421                                 error = -EINTR;
3422                                 goto out;
3423                         }
3424                 }
3425
3426                 if (dlm_lock_recovery_try(ls))
3427                         break;
3428                 schedule();
3429         }
3430
3431         switch (ms->m_type) {
3432
3433         /* messages sent to a master node */
3434
3435         case DLM_MSG_REQUEST:
3436                 receive_request(ls, ms);
3437                 break;
3438
3439         case DLM_MSG_CONVERT:
3440                 receive_convert(ls, ms);
3441                 break;
3442
3443         case DLM_MSG_UNLOCK:
3444                 receive_unlock(ls, ms);
3445                 break;
3446
3447         case DLM_MSG_CANCEL:
3448                 receive_cancel(ls, ms);
3449                 break;
3450
3451         /* messages sent from a master node (replies to above) */
3452
3453         case DLM_MSG_REQUEST_REPLY:
3454                 receive_request_reply(ls, ms);
3455                 break;
3456
3457         case DLM_MSG_CONVERT_REPLY:
3458                 receive_convert_reply(ls, ms);
3459                 break;
3460
3461         case DLM_MSG_UNLOCK_REPLY:
3462                 receive_unlock_reply(ls, ms);
3463                 break;
3464
3465         case DLM_MSG_CANCEL_REPLY:
3466                 receive_cancel_reply(ls, ms);
3467                 break;
3468
3469         /* messages sent from a master node (only two types of async msg) */
3470
3471         case DLM_MSG_GRANT:
3472                 receive_grant(ls, ms);
3473                 break;
3474
3475         case DLM_MSG_BAST:
3476                 receive_bast(ls, ms);
3477                 break;
3478
3479         /* messages sent to a dir node */
3480
3481         case DLM_MSG_LOOKUP:
3482                 receive_lookup(ls, ms);
3483                 break;
3484
3485         case DLM_MSG_REMOVE:
3486                 receive_remove(ls, ms);
3487                 break;
3488
3489         /* messages sent from a dir node (remove has no reply) */
3490
3491         case DLM_MSG_LOOKUP_REPLY:
3492                 receive_lookup_reply(ls, ms);
3493                 break;
3494
3495         /* other messages */
3496
3497         case DLM_MSG_PURGE:
3498                 receive_purge(ls, ms);
3499                 break;
3500
3501         default:
3502                 log_error(ls, "unknown message type %d", ms->m_type);
3503         }
3504
3505         dlm_unlock_recovery(ls);
3506  out:
3507         dlm_put_lockspace(ls);
3508         dlm_astd_wake();
3509         return error;
3510 }
3511
3512
3513 /*
3514  * Recovery related
3515  */
3516
3517 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3518 {
3519         if (middle_conversion(lkb)) {
3520                 hold_lkb(lkb);
3521                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3522                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3523                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3524                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3525
3526                 /* Same special case as in receive_rcom_lock_args() */
3527                 lkb->lkb_grmode = DLM_LOCK_IV;
3528                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3529                 unhold_lkb(lkb);
3530
3531         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3532                 lkb->lkb_flags |= DLM_IFL_RESEND;
3533         }
3534
3535         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3536            conversions are async; there's no reply from the remote master */
3537 }
3538
3539 /* A waiting lkb needs recovery if the master node has failed, or
3540    the master node is changing (only when no directory is used) */
3541
3542 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3543 {
3544         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3545                 return 1;
3546
3547         if (!dlm_no_directory(ls))
3548                 return 0;
3549
3550         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3551                 return 1;
3552
3553         return 0;
3554 }
3555
3556 /* Recovery for locks that are waiting for replies from nodes that are now
3557    gone.  We can just complete unlocks and cancels by faking a reply from the
3558    dead node.  Requests and up-conversions we flag to be resent after
3559    recovery.  Down-conversions can just be completed with a fake reply like
3560    unlocks.  Conversions between PR and CW need special attention. */
3561
3562 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3563 {
3564         struct dlm_lkb *lkb, *safe;
3565
3566         mutex_lock(&ls->ls_waiters_mutex);
3567
3568         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3569                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3570                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3571
3572                 /* all outstanding lookups, regardless of destination  will be
3573                    resent after recovery is done */
3574
3575                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3576                         lkb->lkb_flags |= DLM_IFL_RESEND;
3577                         continue;
3578                 }
3579
3580                 if (!waiter_needs_recovery(ls, lkb))
3581                         continue;
3582
3583                 switch (lkb->lkb_wait_type) {
3584
3585                 case DLM_MSG_REQUEST:
3586                         lkb->lkb_flags |= DLM_IFL_RESEND;
3587                         break;
3588
3589                 case DLM_MSG_CONVERT:
3590                         recover_convert_waiter(ls, lkb);
3591                         break;
3592
3593                 case DLM_MSG_UNLOCK:
3594                         hold_lkb(lkb);
3595                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3596                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3597                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3598                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3599                         dlm_put_lkb(lkb);
3600                         break;
3601
3602                 case DLM_MSG_CANCEL:
3603                         hold_lkb(lkb);
3604                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3605                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3606                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3607                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3608                         dlm_put_lkb(lkb);
3609                         break;
3610
3611                 default:
3612                         log_error(ls, "invalid lkb wait_type %d",
3613                                   lkb->lkb_wait_type);
3614                 }
3615                 schedule();
3616         }
3617         mutex_unlock(&ls->ls_waiters_mutex);
3618 }
3619
3620 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3621 {
3622         struct dlm_lkb *lkb;
3623         int found = 0;
3624
3625         mutex_lock(&ls->ls_waiters_mutex);
3626         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3627                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3628                         hold_lkb(lkb);
3629                         found = 1;
3630                         break;
3631                 }
3632         }
3633         mutex_unlock(&ls->ls_waiters_mutex);
3634
3635         if (!found)
3636                 lkb = NULL;
3637         return lkb;
3638 }
3639
3640 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3641    master or dir-node for r.  Processing the lkb may result in it being placed
3642    back on waiters. */
3643
3644 /* We do this after normal locking has been enabled and any saved messages
3645    (in requestqueue) have been processed.  We should be confident that at
3646    this point we won't get or process a reply to any of these waiting
3647    operations.  But, new ops may be coming in on the rsbs/locks here from
3648    userspace or remotely. */
3649
3650 /* there may have been an overlap unlock/cancel prior to recovery or after
3651    recovery.  if before, the lkb may still have a pos wait_count; if after, the
3652    overlap flag would just have been set and nothing new sent.  we can be
3653    confident here than any replies to either the initial op or overlap ops
3654    prior to recovery have been received. */
3655
3656 int dlm_recover_waiters_post(struct dlm_ls *ls)
3657 {
3658         struct dlm_lkb *lkb;
3659         struct dlm_rsb *r;
3660         int error = 0, mstype, err, oc, ou;
3661
3662         while (1) {
3663                 if (dlm_locking_stopped(ls)) {
3664                         log_debug(ls, "recover_waiters_post aborted");
3665                         error = -EINTR;
3666                         break;
3667                 }
3668
3669                 lkb = find_resend_waiter(ls);
3670                 if (!lkb)
3671                         break;
3672
3673                 r = lkb->lkb_resource;
3674                 hold_rsb(r);
3675                 lock_rsb(r);
3676
3677                 mstype = lkb->lkb_wait_type;
3678                 oc = is_overlap_cancel(lkb);
3679                 ou = is_overlap_unlock(lkb);
3680                 err = 0;
3681
3682                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3683                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3684
3685                 /* At this point we assume that we won't get a reply to any
3686                    previous op or overlap op on this lock.  First, do a big
3687                    remove_from_waiters() for all previous ops. */
3688
3689                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3690                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3691                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3692                 lkb->lkb_wait_type = 0;
3693                 lkb->lkb_wait_count = 0;
3694                 mutex_lock(&ls->ls_waiters_mutex);
3695                 list_del_init(&lkb->lkb_wait_reply);
3696                 mutex_unlock(&ls->ls_waiters_mutex);
3697                 unhold_lkb(lkb); /* for waiters list */
3698
3699                 if (oc || ou) {
3700                         /* do an unlock or cancel instead of resending */
3701                         switch (mstype) {
3702                         case DLM_MSG_LOOKUP:
3703                         case DLM_MSG_REQUEST:
3704                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3705                                                         -DLM_ECANCEL);
3706                                 unhold_lkb(lkb); /* undoes create_lkb() */
3707                                 break;
3708                         case DLM_MSG_CONVERT:
3709                                 if (oc) {
3710                                         queue_cast(r, lkb, -DLM_ECANCEL);
3711                                 } else {
3712                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3713                                         _unlock_lock(r, lkb);
3714                                 }
3715                                 break;
3716                         default:
3717                                 err = 1;
3718                         }
3719                 } else {
3720                         switch (mstype) {
3721                         case DLM_MSG_LOOKUP:
3722                         case DLM_MSG_REQUEST:
3723                                 _request_lock(r, lkb);
3724                                 if (is_master(r))
3725                                         confirm_master(r, 0);
3726                                 break;
3727                         case DLM_MSG_CONVERT:
3728                                 _convert_lock(r, lkb);
3729                                 break;
3730                         default:
3731                                 err = 1;
3732                         }
3733                 }
3734
3735                 if (err)
3736                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
3737                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3738                 unlock_rsb(r);
3739                 put_rsb(r);
3740                 dlm_put_lkb(lkb);
3741         }
3742
3743         return error;
3744 }
3745
3746 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3747                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3748 {
3749         struct dlm_ls *ls = r->res_ls;
3750         struct dlm_lkb *lkb, *safe;
3751
3752         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3753                 if (test(ls, lkb)) {
3754                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3755                         del_lkb(r, lkb);
3756                         /* this put should free the lkb */
3757                         if (!dlm_put_lkb(lkb))
3758                                 log_error(ls, "purged lkb not released");
3759                 }
3760         }
3761 }
3762
3763 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3764 {
3765         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3766 }
3767
3768 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3769 {
3770         return is_master_copy(lkb);
3771 }
3772
3773 static void purge_dead_locks(struct dlm_rsb *r)
3774 {
3775         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3776         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3777         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3778 }
3779
3780 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3781 {
3782         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3783         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3784         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3785 }
3786
3787 /* Get rid of locks held by nodes that are gone. */
3788
3789 int dlm_purge_locks(struct dlm_ls *ls)
3790 {
3791         struct dlm_rsb *r;
3792
3793         log_debug(ls, "dlm_purge_locks");
3794
3795         down_write(&ls->ls_root_sem);
3796         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3797                 hold_rsb(r);
3798                 lock_rsb(r);
3799                 if (is_master(r))
3800                         purge_dead_locks(r);
3801                 unlock_rsb(r);
3802                 unhold_rsb(r);
3803
3804                 schedule();
3805         }
3806         up_write(&ls->ls_root_sem);
3807
3808         return 0;
3809 }
3810
3811 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3812 {
3813         struct dlm_rsb *r, *r_ret = NULL;
3814
3815         read_lock(&ls->ls_rsbtbl[bucket].lock);
3816         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3817                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3818                         continue;
3819                 hold_rsb(r);
3820                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3821                 r_ret = r;
3822                 break;
3823         }
3824         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3825         return r_ret;
3826 }
3827
3828 void dlm_grant_after_purge(struct dlm_ls *ls)
3829 {
3830         struct dlm_rsb *r;
3831         int bucket = 0;
3832
3833         while (1) {
3834                 r = find_purged_rsb(ls, bucket);
3835                 if (!r) {
3836                         if (bucket == ls->ls_rsbtbl_size - 1)
3837                                 break;
3838                         bucket++;
3839                         continue;
3840                 }
3841                 lock_rsb(r);
3842                 if (is_master(r)) {
3843                         grant_pending_locks(r);
3844                         confirm_master(r, 0);
3845                 }
3846                 unlock_rsb(r);
3847                 put_rsb(r);
3848                 schedule();
3849         }
3850 }
3851
3852 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3853                                          uint32_t remid)
3854 {
3855         struct dlm_lkb *lkb;
3856
3857         list_for_each_entry(lkb, head, lkb_statequeue) {
3858                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3859                         return lkb;
3860         }
3861         return NULL;
3862 }
3863
3864 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3865                                     uint32_t remid)
3866 {
3867         struct dlm_lkb *lkb;
3868
3869         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3870         if (lkb)
3871                 return lkb;
3872         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3873         if (lkb)
3874                 return lkb;
3875         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3876         if (lkb)
3877                 return lkb;
3878         return NULL;
3879 }
3880
3881 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3882                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3883 {
3884         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3885         int lvblen;
3886
3887         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3888         lkb->lkb_ownpid = rl->rl_ownpid;
3889         lkb->lkb_remid = rl->rl_lkid;
3890         lkb->lkb_exflags = rl->rl_exflags;
3891         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3892         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3893         lkb->lkb_lvbseq = rl->rl_lvbseq;
3894         lkb->lkb_rqmode = rl->rl_rqmode;
3895         lkb->lkb_grmode = rl->rl_grmode;
3896         /* don't set lkb_status because add_lkb wants to itself */
3897
3898         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3899         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3900
3901         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3902                 lkb->lkb_lvbptr = allocate_lvb(ls);
3903                 if (!lkb->lkb_lvbptr)
3904                         return -ENOMEM;
3905                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3906                          sizeof(struct rcom_lock);
3907                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3908         }
3909
3910         /* Conversions between PR and CW (middle modes) need special handling.
3911            The real granted mode of these converting locks cannot be determined
3912            until all locks have been rebuilt on the rsb (recover_conversion) */
3913
3914         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3915                 rl->rl_status = DLM_LKSTS_CONVERT;
3916                 lkb->lkb_grmode = DLM_LOCK_IV;
3917                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3918         }
3919
3920         return 0;
3921 }
3922
3923 /* This lkb may have been recovered in a previous aborted recovery so we need
3924    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3925    If so we just send back a standard reply.  If not, we create a new lkb with
3926    the given values and send back our lkid.  We send back our lkid by sending
3927    back the rcom_lock struct we got but with the remid field filled in. */
3928
3929 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3930 {
3931         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3932         struct dlm_rsb *r;
3933         struct dlm_lkb *lkb;
3934         int error;
3935
3936         if (rl->rl_parent_lkid) {
3937                 error = -EOPNOTSUPP;
3938                 goto out;
3939         }
3940
3941         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3942         if (error)
3943                 goto out;
3944
3945         lock_rsb(r);
3946
3947         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3948         if (lkb) {
3949                 error = -EEXIST;
3950                 goto out_remid;
3951         }
3952
3953         error = create_lkb(ls, &lkb);
3954         if (error)
3955                 goto out_unlock;
3956
3957         error = receive_rcom_lock_args(ls, lkb, r, rc);
3958         if (error) {
3959                 __put_lkb(ls, lkb);
3960                 goto out_unlock;
3961         }
3962
3963         attach_lkb(r, lkb);
3964         add_lkb(r, lkb, rl->rl_status);
3965         error = 0;
3966
3967  out_remid:
3968         /* this is the new value returned to the lock holder for
3969            saving in its process-copy lkb */
3970         rl->rl_remid = lkb->lkb_id;
3971
3972  out_unlock:
3973         unlock_rsb(r);
3974         put_rsb(r);
3975  out:
3976         if (error)
3977                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3978         rl->rl_result = error;
3979         return error;
3980 }
3981
3982 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3983 {
3984         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3985         struct dlm_rsb *r;
3986         struct dlm_lkb *lkb;
3987         int error;
3988
3989         error = find_lkb(ls, rl->rl_lkid, &lkb);
3990         if (error) {
3991                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3992                 return error;
3993         }
3994
3995         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3996
3997         error = rl->rl_result;
3998
3999         r = lkb->lkb_resource;
4000         hold_rsb(r);
4001         lock_rsb(r);
4002
4003         switch (error) {
4004         case -EBADR:
4005                 /* There's a chance the new master received our lock before
4006                    dlm_recover_master_reply(), this wouldn't happen if we did
4007                    a barrier between recover_masters and recover_locks. */
4008                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4009                           (unsigned long)r, r->res_name);
4010                 dlm_send_rcom_lock(r, lkb);
4011                 goto out;
4012         case -EEXIST:
4013                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4014                 /* fall through */
4015         case 0:
4016                 lkb->lkb_remid = rl->rl_remid;
4017                 break;
4018         default:
4019                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4020                           error, lkb->lkb_id);
4021         }
4022
4023         /* an ack for dlm_recover_locks() which waits for replies from
4024            all the locks it sends to new masters */
4025         dlm_recovered_lock(r);
4026  out:
4027         unlock_rsb(r);
4028         put_rsb(r);
4029         dlm_put_lkb(lkb);
4030
4031         return 0;
4032 }
4033
4034 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4035                      int mode, uint32_t flags, void *name, unsigned int namelen,
4036                      uint32_t parent_lkid)
4037 {
4038         struct dlm_lkb *lkb;
4039         struct dlm_args args;
4040         int error;
4041
4042         dlm_lock_recovery(ls);
4043
4044         error = create_lkb(ls, &lkb);
4045         if (error) {
4046                 kfree(ua);
4047                 goto out;
4048         }
4049
4050         if (flags & DLM_LKF_VALBLK) {
4051                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4052                 if (!ua->lksb.sb_lvbptr) {
4053                         kfree(ua);
4054                         __put_lkb(ls, lkb);
4055                         error = -ENOMEM;
4056                         goto out;
4057                 }
4058         }
4059
4060         /* After ua is attached to lkb it will be freed by free_lkb().
4061            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4062            lock and that lkb_astparam is the dlm_user_args structure. */
4063
4064         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
4065                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4066         lkb->lkb_flags |= DLM_IFL_USER;
4067         ua->old_mode = DLM_LOCK_IV;
4068
4069         if (error) {
4070                 __put_lkb(ls, lkb);
4071                 goto out;
4072         }
4073
4074         error = request_lock(ls, lkb, name, namelen, &args);
4075
4076         switch (error) {
4077         case 0:
4078                 break;
4079         case -EINPROGRESS:
4080                 error = 0;
4081                 break;
4082         case -EAGAIN:
4083                 error = 0;
4084                 /* fall through */
4085         default:
4086                 __put_lkb(ls, lkb);
4087                 goto out;
4088         }
4089
4090         /* add this new lkb to the per-process list of locks */
4091         spin_lock(&ua->proc->locks_spin);
4092         hold_lkb(lkb);
4093         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4094         spin_unlock(&ua->proc->locks_spin);
4095  out:
4096         dlm_unlock_recovery(ls);
4097         return error;
4098 }
4099
4100 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4101                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4102 {
4103         struct dlm_lkb *lkb;
4104         struct dlm_args args;
4105         struct dlm_user_args *ua;
4106         int error;
4107
4108         dlm_lock_recovery(ls);
4109
4110         error = find_lkb(ls, lkid, &lkb);
4111         if (error)
4112                 goto out;
4113
4114         /* user can change the params on its lock when it converts it, or
4115            add an lvb that didn't exist before */
4116
4117         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4118
4119         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4120                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4121                 if (!ua->lksb.sb_lvbptr) {
4122                         error = -ENOMEM;
4123                         goto out_put;
4124                 }
4125         }
4126         if (lvb_in && ua->lksb.sb_lvbptr)
4127                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4128
4129         ua->castparam = ua_tmp->castparam;
4130         ua->castaddr = ua_tmp->castaddr;
4131         ua->bastparam = ua_tmp->bastparam;
4132         ua->bastaddr = ua_tmp->bastaddr;
4133         ua->user_lksb = ua_tmp->user_lksb;
4134         ua->old_mode = lkb->lkb_grmode;
4135
4136         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4137                               ua, DLM_FAKE_USER_AST, &args);
4138         if (error)
4139                 goto out_put;
4140
4141         error = convert_lock(ls, lkb, &args);
4142
4143         if (error == -EINPROGRESS || error == -EAGAIN)
4144                 error = 0;
4145  out_put:
4146         dlm_put_lkb(lkb);
4147  out:
4148         dlm_unlock_recovery(ls);
4149         kfree(ua_tmp);
4150         return error;
4151 }
4152
4153 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4154                     uint32_t flags, uint32_t lkid, char *lvb_in)
4155 {
4156         struct dlm_lkb *lkb;
4157         struct dlm_args args;
4158         struct dlm_user_args *ua;
4159         int error;
4160
4161         dlm_lock_recovery(ls);
4162
4163         error = find_lkb(ls, lkid, &lkb);
4164         if (error)
4165                 goto out;
4166
4167         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4168
4169         if (lvb_in && ua->lksb.sb_lvbptr)
4170                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4171         ua->castparam = ua_tmp->castparam;
4172         ua->user_lksb = ua_tmp->user_lksb;
4173
4174         error = set_unlock_args(flags, ua, &args);
4175         if (error)
4176                 goto out_put;
4177
4178         error = unlock_lock(ls, lkb, &args);
4179
4180         if (error == -DLM_EUNLOCK)
4181                 error = 0;
4182         /* from validate_unlock_args() */
4183         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4184                 error = 0;
4185         if (error)
4186                 goto out_put;
4187
4188         spin_lock(&ua->proc->locks_spin);
4189         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4190         if (!list_empty(&lkb->lkb_ownqueue))
4191                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4192         spin_unlock(&ua->proc->locks_spin);
4193  out_put:
4194         dlm_put_lkb(lkb);
4195  out:
4196         dlm_unlock_recovery(ls);
4197         kfree(ua_tmp);
4198         return error;
4199 }
4200
4201 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4202                     uint32_t flags, uint32_t lkid)
4203 {
4204         struct dlm_lkb *lkb;
4205         struct dlm_args args;
4206         struct dlm_user_args *ua;
4207         int error;
4208
4209         dlm_lock_recovery(ls);
4210
4211         error = find_lkb(ls, lkid, &lkb);
4212         if (error)
4213                 goto out;
4214
4215         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4216         ua->castparam = ua_tmp->castparam;
4217         ua->user_lksb = ua_tmp->user_lksb;
4218
4219         error = set_unlock_args(flags, ua, &args);
4220         if (error)
4221                 goto out_put;
4222
4223         error = cancel_lock(ls, lkb, &args);
4224
4225         if (error == -DLM_ECANCEL)
4226                 error = 0;
4227         /* from validate_unlock_args() */
4228         if (error == -EBUSY)
4229                 error = 0;
4230  out_put:
4231         dlm_put_lkb(lkb);
4232  out:
4233         dlm_unlock_recovery(ls);
4234         kfree(ua_tmp);
4235         return error;
4236 }
4237
4238 /* lkb's that are removed from the waiters list by revert are just left on the
4239    orphans list with the granted orphan locks, to be freed by purge */
4240
4241 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4242 {
4243         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4244         struct dlm_args args;
4245         int error;
4246
4247         hold_lkb(lkb);
4248         mutex_lock(&ls->ls_orphans_mutex);
4249         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4250         mutex_unlock(&ls->ls_orphans_mutex);
4251
4252         set_unlock_args(0, ua, &args);
4253
4254         error = cancel_lock(ls, lkb, &args);
4255         if (error == -DLM_ECANCEL)
4256                 error = 0;
4257         return error;
4258 }
4259
4260 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4261    Regardless of what rsb queue the lock is on, it's removed and freed. */
4262
4263 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4264 {
4265         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4266         struct dlm_args args;
4267         int error;
4268
4269         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4270
4271         error = unlock_lock(ls, lkb, &args);
4272         if (error == -DLM_EUNLOCK)
4273                 error = 0;
4274         return error;
4275 }
4276
4277 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4278    (which does lock_rsb) due to deadlock with receiving a message that does
4279    lock_rsb followed by dlm_user_add_ast() */
4280
4281 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4282                                      struct dlm_user_proc *proc)
4283 {
4284         struct dlm_lkb *lkb = NULL;
4285
4286         mutex_lock(&ls->ls_clear_proc_locks);
4287         if (list_empty(&proc->locks))
4288                 goto out;
4289
4290         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4291         list_del_init(&lkb->lkb_ownqueue);
4292
4293         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4294                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4295         else
4296                 lkb->lkb_flags |= DLM_IFL_DEAD;
4297  out:
4298         mutex_unlock(&ls->ls_clear_proc_locks);
4299         return lkb;
4300 }
4301
4302 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4303    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4304    which we clear here. */
4305
4306 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4307    list, and no more device_writes should add lkb's to proc->locks list; so we
4308    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4309    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4310    them ourself. */
4311
4312 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4313 {
4314         struct dlm_lkb *lkb, *safe;
4315
4316         dlm_lock_recovery(ls);
4317
4318         while (1) {
4319                 lkb = del_proc_lock(ls, proc);
4320                 if (!lkb)
4321                         break;
4322                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4323                         orphan_proc_lock(ls, lkb);
4324                 else
4325                         unlock_proc_lock(ls, lkb);
4326
4327                 /* this removes the reference for the proc->locks list
4328                    added by dlm_user_request, it may result in the lkb
4329                    being freed */
4330
4331                 dlm_put_lkb(lkb);
4332         }
4333
4334         mutex_lock(&ls->ls_clear_proc_locks);
4335
4336         /* in-progress unlocks */
4337         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4338                 list_del_init(&lkb->lkb_ownqueue);
4339                 lkb->lkb_flags |= DLM_IFL_DEAD;
4340                 dlm_put_lkb(lkb);
4341         }
4342
4343         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4344                 list_del(&lkb->lkb_astqueue);
4345                 dlm_put_lkb(lkb);
4346         }
4347
4348         mutex_unlock(&ls->ls_clear_proc_locks);
4349         dlm_unlock_recovery(ls);
4350 }
4351
4352 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4353 {
4354         struct dlm_lkb *lkb, *safe;
4355
4356         while (1) {
4357                 lkb = NULL;
4358                 spin_lock(&proc->locks_spin);
4359                 if (!list_empty(&proc->locks)) {
4360                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4361                                          lkb_ownqueue);
4362                         list_del_init(&lkb->lkb_ownqueue);
4363                 }
4364                 spin_unlock(&proc->locks_spin);
4365
4366                 if (!lkb)
4367                         break;
4368
4369                 lkb->lkb_flags |= DLM_IFL_DEAD;
4370                 unlock_proc_lock(ls, lkb);
4371                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4372         }
4373
4374         spin_lock(&proc->locks_spin);
4375         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4376                 list_del_init(&lkb->lkb_ownqueue);
4377                 lkb->lkb_flags |= DLM_IFL_DEAD;
4378                 dlm_put_lkb(lkb);
4379         }
4380         spin_unlock(&proc->locks_spin);
4381
4382         spin_lock(&proc->asts_spin);
4383         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4384                 list_del(&lkb->lkb_astqueue);
4385                 dlm_put_lkb(lkb);
4386         }
4387         spin_unlock(&proc->asts_spin);
4388 }
4389
4390 /* pid of 0 means purge all orphans */
4391
4392 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4393 {
4394         struct dlm_lkb *lkb, *safe;
4395
4396         mutex_lock(&ls->ls_orphans_mutex);
4397         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4398                 if (pid && lkb->lkb_ownpid != pid)
4399                         continue;
4400                 unlock_proc_lock(ls, lkb);
4401                 list_del_init(&lkb->lkb_ownqueue);
4402                 dlm_put_lkb(lkb);
4403         }
4404         mutex_unlock(&ls->ls_orphans_mutex);
4405 }
4406
4407 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4408 {
4409         struct dlm_message *ms;
4410         struct dlm_mhandle *mh;
4411         int error;
4412
4413         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4414                                 DLM_MSG_PURGE, &ms, &mh);
4415         if (error)
4416                 return error;
4417         ms->m_nodeid = nodeid;
4418         ms->m_pid = pid;
4419
4420         return send_message(mh, ms);
4421 }
4422
4423 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4424                    int nodeid, int pid)
4425 {
4426         int error = 0;
4427
4428         if (nodeid != dlm_our_nodeid()) {
4429                 error = send_purge(ls, nodeid, pid);
4430         } else {
4431                 dlm_lock_recovery(ls);
4432                 if (pid == current->pid)
4433                         purge_proc_locks(ls, proc);
4434                 else
4435                         do_purge(ls, nodeid, pid);
4436                 dlm_unlock_recovery(ls);
4437         }
4438         return error;
4439 }
4440