dlm: fix waiter recovery
authorDavid Teigland <teigland@redhat.com>
Mon, 23 Apr 2012 17:18:18 +0000 (12:18 -0500)
committerDavid Teigland <teigland@redhat.com>
Thu, 26 Apr 2012 20:36:04 +0000 (15:36 -0500)
An outstanding remote operation (an lkb on the "waiter"
list) could sometimes miss being resent during recovery.
The decision was based on the lkb_nodeid field, which
could have changed during an earlier aborted recovery,
so it no longer represents the actual remote destination.
The lkb_wait_nodeid is always the actual remote node,
so it is the best value to use.

Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/lock.c

index 4c58d4a..3d35c59 100644 (file)
@@ -4187,15 +4187,19 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 /* A waiting lkb needs recovery if the master node has failed, or
    the master node is changing (only when no directory is used) */
 
-static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
+static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
+                                int dir_nodeid)
 {
-       if (dlm_is_removed(ls, lkb->lkb_nodeid))
+       if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
                return 1;
 
        if (!dlm_no_directory(ls))
                return 0;
 
-       if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
+       if (dir_nodeid == dlm_our_nodeid())
+               return 1;
+
+       if (dir_nodeid != lkb->lkb_wait_nodeid)
                return 1;
 
        return 0;
@@ -4212,6 +4216,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
        struct dlm_lkb *lkb, *safe;
        struct dlm_message *ms_stub;
        int wait_type, stub_unlock_result, stub_cancel_result;
+       int dir_nodeid;
 
        ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
        if (!ms_stub) {
@@ -4223,13 +4228,21 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 
        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
 
+               dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
+
                /* exclude debug messages about unlocks because there can be so
                   many and they aren't very interesting */
 
                if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
-                       log_debug(ls, "recover_waiter %x nodeid %d "
-                                 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
-                                 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+                       log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+                                 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+                                 lkb->lkb_id,
+                                 lkb->lkb_remid,
+                                 lkb->lkb_wait_type,
+                                 lkb->lkb_resource->res_nodeid,
+                                 lkb->lkb_nodeid,
+                                 lkb->lkb_wait_nodeid,
+                                 dir_nodeid);
                }
 
                /* all outstanding lookups, regardless of destination  will be
@@ -4240,7 +4253,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                        continue;
                }
 
-               if (!waiter_needs_recovery(ls, lkb))
+               if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
                        continue;
 
                wait_type = lkb->lkb_wait_type;
@@ -4373,8 +4386,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                ou = is_overlap_unlock(lkb);
                err = 0;
 
-               log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
-                         lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
+               log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+                         "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
+                         "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
+                         r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
+                         dlm_dir_nodeid(r), oc, ou);
 
                /* At this point we assume that we won't get a reply to any
                   previous op or overlap op on this lock.  First, do a big
@@ -4426,9 +4442,12 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                        }
                }
 
-               if (err)
-                       log_error(ls, "recover_waiters_post %x %d %x %d %d",
-                                 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+               if (err) {
+                       log_error(ls, "waiter %x msg %d r_nodeid %d "
+                                 "dir_nodeid %d overlap %d %d",
+                                 lkb->lkb_id, mstype, r->res_nodeid,
+                                 dlm_dir_nodeid(r), oc, ou);
+               }
                unlock_rsb(r);
                put_rsb(r);
                dlm_put_lkb(lkb);