mm: frontswap: remove casting from function calls through ops structure
[platform/adaptation/renesas_rcar/renesas_kernel.git] / fs / dlm / lock.c
index 4c58d4a..bdafb65 100644 (file)
@@ -160,11 +160,12 @@ static const int __quecvt_compat_matrix[8][8] = {
 
 void dlm_print_lkb(struct dlm_lkb *lkb)
 {
-       printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
-              "     status %d rqmode %d grmode %d wait_type %d\n",
+       printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
+              "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
-              lkb->lkb_grmode, lkb->lkb_wait_type);
+              lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
+              (unsigned long long)lkb->lkb_recover_seq);
 }
 
 static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
 
 static inline int is_master_copy(struct dlm_lkb *lkb)
 {
-       if (lkb->lkb_flags & DLM_IFL_MSTCPY)
-               DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 }
 
@@ -479,6 +478,9 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                kref_get(&r->res_ref);
                goto out;
        }
+       if (error == -ENOTBLK)
+               goto out;
+
        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;
@@ -586,6 +588,23 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
        return error;
 }
 
+static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
+{
+       struct rb_node *n;
+       struct dlm_rsb *r;
+       int i;
+
+       for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+               spin_lock(&ls->ls_rsbtbl[i].lock);
+               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       if (r->res_hash == hash)
+                               dlm_dump_rsb(r);
+               }
+               spin_unlock(&ls->ls_rsbtbl[i].lock);
+       }
+}
+
 /* This is only called to add a reference when the code already holds
    a valid reference to the rsb, so there's no need for locking. */
 
@@ -1064,8 +1083,9 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                goto out_del;
        }
 
-       log_error(ls, "remwait error %x reply %d flags %x no wait_type",
-                 lkb->lkb_id, mstype, lkb->lkb_flags);
+       log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
+                 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
+                 mstype, lkb->lkb_flags);
        return -1;
 
  out_del:
@@ -1498,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
        }
 
        lkb->lkb_rqmode = DLM_LOCK_IV;
+       lkb->lkb_highbast = 0;
 }
 
 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
-       lkb->lkb_highbast = 0;
 }
 
 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1866,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
 /* Returns the highest requested mode of all blocked conversions; sets
    cw if there's a blocked conversion to DLM_LOCK_CW. */
 
-static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
+                                unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
        int hi, demoted, quit, grant_restart, demote_restart;
@@ -1885,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
                if (can_be_granted(r, lkb, 0, &deadlk)) {
                        grant_lock_pending(r, lkb);
                        grant_restart = 1;
+                       if (count)
+                               (*count)++;
                        continue;
                }
 
@@ -1918,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
        return max_t(int, high, hi);
 }
 
-static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
+                             unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
 
        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
-               if (can_be_granted(r, lkb, 0, NULL))
+               if (can_be_granted(r, lkb, 0, NULL)) {
                        grant_lock_pending(r, lkb);
-                else {
+                       if (count)
+                               (*count)++;
+               } else {
                        high = max_t(int, lkb->lkb_rqmode, high);
                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
                                *cw = 1;
@@ -1954,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
        return 0;
 }
 
-static void grant_pending_locks(struct dlm_rsb *r)
+static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
        int high = DLM_LOCK_IV;
        int cw = 0;
 
-       DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
+       if (!is_master(r)) {
+               log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+               dlm_dump_rsb(r);
+               return;
+       }
 
-       high = grant_pending_convert(r, high, &cw);
-       high = grant_pending_wait(r, high, &cw);
+       high = grant_pending_convert(r, high, &cw, count);
+       high = grant_pending_wait(r, high, &cw, count);
 
        if (high == DLM_LOCK_IV)
                return;
@@ -2499,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
           before we try again to grant this one. */
 
        if (is_demoted(lkb)) {
-               grant_pending_convert(r, DLM_LOCK_IV, NULL);
+               grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
                if (_can_be_granted(r, lkb, 1)) {
                        grant_lock(r, lkb);
                        queue_cast(r, lkb, 0);
@@ -2527,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
 {
        switch (error) {
        case 0:
-               grant_pending_locks(r);
+               grant_pending_locks(r, NULL);
                /* grant_pending_locks also sends basts */
                break;
        case -EAGAIN:
@@ -2550,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
                              int error)
 {
-       grant_pending_locks(r);
+       grant_pending_locks(r, NULL);
 }
 
 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2571,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
                              int error)
 {
        if (error)
-               grant_pending_locks(r);
+               grant_pending_locks(r, NULL);
 }
 
 /*
@@ -3372,7 +3402,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
        return error;
 }
 
-static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3412,14 +3442,15 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
                error = 0;
        if (error)
                dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3429,6 +3460,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
        if (error)
                goto fail;
 
+       if (lkb->lkb_remid != ms->m_lkid) {
+               log_error(ls, "receive_convert %x remid %x recover_seq %llu "
+                         "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
+                         (unsigned long long)lkb->lkb_recover_seq,
+                         ms->m_header.h_nodeid, ms->m_lkid);
+               error = -ENOENT;
+               goto fail;
+       }
+
        r = lkb->lkb_resource;
 
        hold_rsb(r);
@@ -3456,14 +3496,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3473,6 +3514,14 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
        if (error)
                goto fail;
 
+       if (lkb->lkb_remid != ms->m_lkid) {
+               log_error(ls, "receive_unlock %x remid %x remote %d %x",
+                         lkb->lkb_id, lkb->lkb_remid,
+                         ms->m_header.h_nodeid, ms->m_lkid);
+               error = -ENOENT;
+               goto fail;
+       }
+
        r = lkb->lkb_resource;
 
        hold_rsb(r);
@@ -3497,14 +3546,15 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3532,25 +3582,23 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_grant from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        r = lkb->lkb_resource;
 
@@ -3570,20 +3618,18 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
-static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_bast from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        r = lkb->lkb_resource;
 
@@ -3595,10 +3641,12 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
                goto out;
 
        queue_bast(r, lkb, ms->m_bastmode);
+       lkb->lkb_highbast = ms->m_bastmode;
  out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3653,18 +3701,15 @@ static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
        do_purge(ls, ms->m_nodeid, ms->m_pid);
 }
 
-static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error, mstype, result;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_request_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        r = lkb->lkb_resource;
        hold_rsb(r);
@@ -3676,8 +3721,13 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        mstype = lkb->lkb_wait_type;
        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
-       if (error)
+       if (error) {
+               log_error(ls, "receive_request_reply %x remote %d %x result %d",
+                         lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_result);
+               dlm_dump_rsb(r);
                goto out;
+       }
 
        /* Optimization: the dir node was also the master, so it took our
           lookup as a request and sent request reply instead of lookup reply */
@@ -3755,6 +3805,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3793,8 +3844,11 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                break;
 
        default:
-               log_error(r->res_ls, "receive_convert_reply %x error %d",
-                         lkb->lkb_id, ms->m_result);
+               log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
+                         lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_result);
+               dlm_print_rsb(r);
+               dlm_print_lkb(lkb);
        }
 }
 
@@ -3821,20 +3875,18 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        put_rsb(r);
 }
 
-static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_convert_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        _receive_convert_reply(lkb, ms);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3873,20 +3925,18 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        put_rsb(r);
 }
 
-static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_unlock_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        _receive_unlock_reply(lkb, ms);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3925,20 +3975,18 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        put_rsb(r);
 }
 
-static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_cancel_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        _receive_cancel_reply(lkb, ms);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3949,7 +3997,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_lkid, &lkb);
        if (error) {
-               log_error(ls, "receive_lookup_reply no lkb");
+               log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
                return;
        }
 
@@ -3993,8 +4041,11 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
        dlm_put_lkb(lkb);
 }
 
-static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+                            uint32_t saved_seq)
 {
+       int error = 0, noent = 0;
+
        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
                log_debug(ls, "ignore non-member message %d from %d %x %x %d",
                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
@@ -4007,47 +4058,50 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
        /* messages sent to a master node */
 
        case DLM_MSG_REQUEST:
-               receive_request(ls, ms);
+               error = receive_request(ls, ms);
                break;
 
        case DLM_MSG_CONVERT:
-               receive_convert(ls, ms);
+               error = receive_convert(ls, ms);
                break;
 
        case DLM_MSG_UNLOCK:
-               receive_unlock(ls, ms);
+               error = receive_unlock(ls, ms);
                break;
 
        case DLM_MSG_CANCEL:
-               receive_cancel(ls, ms);
+               noent = 1;
+               error = receive_cancel(ls, ms);
                break;
 
        /* messages sent from a master node (replies to above) */
 
        case DLM_MSG_REQUEST_REPLY:
-               receive_request_reply(ls, ms);
+               error = receive_request_reply(ls, ms);
                break;
 
        case DLM_MSG_CONVERT_REPLY:
-               receive_convert_reply(ls, ms);
+               error = receive_convert_reply(ls, ms);
                break;
 
        case DLM_MSG_UNLOCK_REPLY:
-               receive_unlock_reply(ls, ms);
+               error = receive_unlock_reply(ls, ms);
                break;
 
        case DLM_MSG_CANCEL_REPLY:
-               receive_cancel_reply(ls, ms);
+               error = receive_cancel_reply(ls, ms);
                break;
 
        /* messages sent from a master node (only two types of async msg) */
 
        case DLM_MSG_GRANT:
-               receive_grant(ls, ms);
+               noent = 1;
+               error = receive_grant(ls, ms);
                break;
 
        case DLM_MSG_BAST:
-               receive_bast(ls, ms);
+               noent = 1;
+               error = receive_bast(ls, ms);
                break;
 
        /* messages sent to a dir node */
@@ -4075,6 +4129,37 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
        default:
                log_error(ls, "unknown message type %d", ms->m_type);
        }
+
+       /*
+        * When checking for ENOENT, we're checking the result of
+        * find_lkb(m_remid):
+        *
+        * The lock id referenced in the message wasn't found.  This may
+        * happen in normal usage for the async messages and cancel, so
+        * only use log_debug for them.
+        *
+        * Other errors are expected and normal, so they are not logged here.
+        */
+
+       if (error == -ENOENT && noent) {
+               log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
+                         ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+                         ms->m_lkid, saved_seq);
+       } else if (error == -ENOENT) {
+               log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
+                         ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+                         ms->m_lkid, saved_seq);
+
+               if (ms->m_type == DLM_MSG_CONVERT)
+                       dlm_dump_rsb_hash(ls, ms->m_hash);
+       }
+
+       if (error == -EINVAL) {
+               log_error(ls, "receive %d inval from %d lkid %x remid %x "
+                         "saved_seq %u",
+                         ms->m_type, ms->m_header.h_nodeid,
+                         ms->m_lkid, ms->m_remid, saved_seq);
+       }
 }
 
 /* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4092,16 +4177,17 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
                dlm_add_requestqueue(ls, nodeid, ms);
        } else {
                dlm_wait_requestqueue(ls);
-               _receive_message(ls, ms);
+               _receive_message(ls, ms, 0);
        }
 }
 
 /* This is called by dlm_recoverd to process messages that were saved on
    the requestqueue. */
 
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+                              uint32_t saved_seq)
 {
-       _receive_message(ls, ms);
+       _receive_message(ls, ms, saved_seq);
 }
 
 /* This is called by the midcomms layer when something is received for
@@ -4137,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
 
        ls = dlm_find_lockspace_global(hd->h_lockspace);
        if (!ls) {
-               if (dlm_config.ci_log_debug)
-                       log_print("invalid lockspace %x from %d cmd %d type %d",
-                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
+               if (dlm_config.ci_log_debug) {
+                       printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
+                               "%u from %d cmd %d type %d\n",
+                               hd->h_lockspace, nodeid, hd->h_cmd, type);
+               }
 
                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
                        dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4187,15 +4275,13 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 /* A waiting lkb needs recovery if the master node has failed, or
    the master node is changing (only when no directory is used) */
 
-static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
+static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
+                                int dir_nodeid)
 {
-       if (dlm_is_removed(ls, lkb->lkb_nodeid))
+       if (dlm_no_directory(ls))
                return 1;
 
-       if (!dlm_no_directory(ls))
-               return 0;
-
-       if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
+       if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
                return 1;
 
        return 0;
@@ -4212,6 +4298,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
        struct dlm_lkb *lkb, *safe;
        struct dlm_message *ms_stub;
        int wait_type, stub_unlock_result, stub_cancel_result;
+       int dir_nodeid;
 
        ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
        if (!ms_stub) {
@@ -4223,13 +4310,21 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 
        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
 
+               dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
+
                /* exclude debug messages about unlocks because there can be so
                   many and they aren't very interesting */
 
                if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
-                       log_debug(ls, "recover_waiter %x nodeid %d "
-                                 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
-                                 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+                       log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+                                 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+                                 lkb->lkb_id,
+                                 lkb->lkb_remid,
+                                 lkb->lkb_wait_type,
+                                 lkb->lkb_resource->res_nodeid,
+                                 lkb->lkb_nodeid,
+                                 lkb->lkb_wait_nodeid,
+                                 dir_nodeid);
                }
 
                /* all outstanding lookups, regardless of destination  will be
@@ -4240,7 +4335,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                        continue;
                }
 
-               if (!waiter_needs_recovery(ls, lkb))
+               if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
                        continue;
 
                wait_type = lkb->lkb_wait_type;
@@ -4373,8 +4468,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                ou = is_overlap_unlock(lkb);
                err = 0;
 
-               log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
-                         lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
+               log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+                         "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
+                         "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
+                         r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
+                         dlm_dir_nodeid(r), oc, ou);
 
                /* At this point we assume that we won't get a reply to any
                   previous op or overlap op on this lock.  First, do a big
@@ -4426,9 +4524,12 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                        }
                }
 
-               if (err)
-                       log_error(ls, "recover_waiters_post %x %d %x %d %d",
-                                 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+               if (err) {
+                       log_error(ls, "waiter %x msg %d r_nodeid %d "
+                                 "dir_nodeid %d overlap %d %d",
+                                 lkb->lkb_id, mstype, r->res_nodeid,
+                                 dlm_dir_nodeid(r), oc, ou);
+               }
                unlock_rsb(r);
                put_rsb(r);
                dlm_put_lkb(lkb);
@@ -4437,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
        return error;
 }
 
-static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
-                       int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
+static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
+                             struct list_head *list)
 {
-       struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb, *safe;
 
-       list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
-               if (test(ls, lkb)) {
-                       rsb_set_flag(r, RSB_LOCKS_PURGED);
-                       del_lkb(r, lkb);
-                       /* this put should free the lkb */
-                       if (!dlm_put_lkb(lkb))
-                               log_error(ls, "purged lkb not released");
-               }
+       list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+               if (!is_master_copy(lkb))
+                       continue;
+
+               /* don't purge lkbs we've added in recover_master_copy for
+                  the current recovery seq */
+
+               if (lkb->lkb_recover_seq == ls->ls_recover_seq)
+                       continue;
+
+               del_lkb(r, lkb);
+
+               /* this put should free the lkb */
+               if (!dlm_put_lkb(lkb))
+                       log_error(ls, "purged mstcpy lkb not released");
        }
 }
 
-static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
+void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
 {
-       return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
-}
+       struct dlm_ls *ls = r->res_ls;
 
-static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
-       return is_master_copy(lkb);
+       purge_mstcpy_list(ls, r, &r->res_grantqueue);
+       purge_mstcpy_list(ls, r, &r->res_convertqueue);
+       purge_mstcpy_list(ls, r, &r->res_waitqueue);
 }
 
-static void purge_dead_locks(struct dlm_rsb *r)
+static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
+                           struct list_head *list,
+                           int nodeid_gone, unsigned int *count)
 {
-       purge_queue(r, &r->res_grantqueue, &purge_dead_test);
-       purge_queue(r, &r->res_convertqueue, &purge_dead_test);
-       purge_queue(r, &r->res_waitqueue, &purge_dead_test);
-}
+       struct dlm_lkb *lkb, *safe;
 
-void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
-{
-       purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
-       purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
-       purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
+       list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+               if (!is_master_copy(lkb))
+                       continue;
+
+               if ((lkb->lkb_nodeid == nodeid_gone) ||
+                   dlm_is_removed(ls, lkb->lkb_nodeid)) {
+
+                       del_lkb(r, lkb);
+
+                       /* this put should free the lkb */
+                       if (!dlm_put_lkb(lkb))
+                               log_error(ls, "purged dead lkb not released");
+
+                       rsb_set_flag(r, RSB_RECOVER_GRANT);
+
+                       (*count)++;
+               }
+       }
 }
 
 /* Get rid of locks held by nodes that are gone. */
 
-int dlm_purge_locks(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
+       struct dlm_member *memb;
+       int nodes_count = 0;
+       int nodeid_gone = 0;
+       unsigned int lkb_count = 0;
 
-       log_debug(ls, "dlm_purge_locks");
+       /* cache one removed nodeid to optimize the common
+          case of a single node removed */
+
+       list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+               nodes_count++;
+               nodeid_gone = memb->nodeid;
+       }
+
+       if (!nodes_count)
+               return;
 
        down_write(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                hold_rsb(r);
                lock_rsb(r);
-               if (is_master(r))
-                       purge_dead_locks(r);
+               if (is_master(r)) {
+                       purge_dead_list(ls, r, &r->res_grantqueue,
+                                       nodeid_gone, &lkb_count);
+                       purge_dead_list(ls, r, &r->res_convertqueue,
+                                       nodeid_gone, &lkb_count);
+                       purge_dead_list(ls, r, &r->res_waitqueue,
+                                       nodeid_gone, &lkb_count);
+               }
                unlock_rsb(r);
                unhold_rsb(r);
-
-               schedule();
+               cond_resched();
        }
        up_write(&ls->ls_root_sem);
 
-       return 0;
+       if (lkb_count)
+               log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
+                         lkb_count, nodes_count);
 }
 
-static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 {
        struct rb_node *n;
-       struct dlm_rsb *r, *r_ret = NULL;
+       struct dlm_rsb *r;
 
        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
-               if (!rsb_flag(r, RSB_LOCKS_PURGED))
+
+               if (!rsb_flag(r, RSB_RECOVER_GRANT))
+                       continue;
+               rsb_clear_flag(r, RSB_RECOVER_GRANT);
+               if (!is_master(r))
                        continue;
                hold_rsb(r);
-               rsb_clear_flag(r, RSB_LOCKS_PURGED);
-               r_ret = r;
-               break;
+               spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+               return r;
        }
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-       return r_ret;
+       return NULL;
 }
 
-void dlm_grant_after_purge(struct dlm_ls *ls)
+/*
+ * Attempt to grant locks on resources that we are the master of.
+ * Locks may have become grantable during recovery because locks
+ * from departed nodes have been purged (or not rebuilt), allowing
+ * previously blocked locks to now be granted.  The rsb's we are
+ * interested in are those with lkb's on either the convert or
+ * waiting queues.
+ *
+ * Simplest would be to go through each master rsb and check for non-empty
+ * convert or waiting queues, and attempt to grant on those rsbs.
+ * Checking the queues requires lock_rsb, though, for which we'd need
+ * to release the rsbtbl lock.  This would make iterating through all
+ * rsb's very inefficient.  So, we rely on earlier recovery routines
+ * to set RECOVER_GRANT on any rsb's that we should attempt to grant
+ * locks for.
+ */
+
+void dlm_recover_grant(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
        int bucket = 0;
+       unsigned int count = 0;
+       unsigned int rsb_count = 0;
+       unsigned int lkb_count = 0;
 
        while (1) {
-               r = find_purged_rsb(ls, bucket);
+               r = find_grant_rsb(ls, bucket);
                if (!r) {
                        if (bucket == ls->ls_rsbtbl_size - 1)
                                break;
                        bucket++;
                        continue;
                }
+               rsb_count++;
+               count = 0;
                lock_rsb(r);
-               if (is_master(r)) {
-                       grant_pending_locks(r);
-                       confirm_master(r, 0);
-               }
+               grant_pending_locks(r, &count);
+               lkb_count += count;
+               confirm_master(r, 0);
                unlock_rsb(r);
                put_rsb(r);
-               schedule();
+               cond_resched();
        }
+
+       if (lkb_count)
+               log_debug(ls, "dlm_recover_grant %u locks on %u resources",
+                         lkb_count, rsb_count);
 }
 
 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4631,6 +4797,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
+       uint32_t remid = 0;
        int error;
 
        if (rl->rl_parent_lkid) {
@@ -4638,14 +4805,31 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
                goto out;
        }
 
-       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
-                        R_MASTER, &r);
+       remid = le32_to_cpu(rl->rl_lkid);
+
+       /* In general we expect the rsb returned to be R_MASTER, but we don't
+          have to require it.  Recovery of masters on one node can overlap
+          recovery of locks on another node, so one node can send us MSTCPY
+          locks before we've made ourselves master of this rsb.  We can still
+          add new MSTCPY locks that we receive here without any harm; when
+          we make ourselves master, dlm_recover_masters() won't touch the
+          MSTCPY locks we've received early. */
+
+       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
        if (error)
                goto out;
 
+       if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
+               log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
+                         rc->rc_header.h_nodeid, remid);
+               error = -EBADR;
+               put_rsb(r);
+               goto out;
+       }
+
        lock_rsb(r);
 
-       lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
+       lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
        if (lkb) {
                error = -EEXIST;
                goto out_remid;
@@ -4664,19 +4848,25 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        attach_lkb(r, lkb);
        add_lkb(r, lkb, rl->rl_status);
        error = 0;
+       ls->ls_recover_locks_in++;
+
+       if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+               rsb_set_flag(r, RSB_RECOVER_GRANT);
 
  out_remid:
        /* this is the new value returned to the lock holder for
           saving in its process-copy lkb */
        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
 
+       lkb->lkb_recover_seq = ls->ls_recover_seq;
+
  out_unlock:
        unlock_rsb(r);
        put_rsb(r);
  out:
-       if (error)
-               log_debug(ls, "recover_master_copy %d %x", error,
-                         le32_to_cpu(rl->rl_lkid));
+       if (error && error != -EEXIST)
+               log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
+                         rc->rc_header.h_nodeid, remid, error);
        rl->rl_result = cpu_to_le32(error);
        return error;
 }
@@ -4687,41 +4877,52 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
-       int error;
+       uint32_t lkid, remid;
+       int error, result;
+
+       lkid = le32_to_cpu(rl->rl_lkid);
+       remid = le32_to_cpu(rl->rl_remid);
+       result = le32_to_cpu(rl->rl_result);
 
-       error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
+       error = find_lkb(ls, lkid, &lkb);
        if (error) {
-               log_error(ls, "recover_process_copy no lkid %x",
-                               le32_to_cpu(rl->rl_lkid));
+               log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
                return error;
        }
 
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
-
-       error = le32_to_cpu(rl->rl_result);
-
        r = lkb->lkb_resource;
        hold_rsb(r);
        lock_rsb(r);
 
-       switch (error) {
+       if (!is_process_copy(lkb)) {
+               log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
+               dlm_dump_rsb(r);
+               unlock_rsb(r);
+               put_rsb(r);
+               dlm_put_lkb(lkb);
+               return -EINVAL;
+       }
+
+       switch (result) {
        case -EBADR:
                /* There's a chance the new master received our lock before
                   dlm_recover_master_reply(), this wouldn't happen if we did
                   a barrier between recover_masters and recover_locks. */
-               log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
-                         (unsigned long)r, r->res_name);
+
+               log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
+
                dlm_send_rcom_lock(r, lkb);
                goto out;
        case -EEXIST:
-               log_debug(ls, "master copy exists %x", lkb->lkb_id);
-               /* fall through */
        case 0:
-               lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
+               lkb->lkb_remid = remid;
                break;
        default:
-               log_error(ls, "dlm_recover_process_copy unknown error %d %x",
-                         error, lkb->lkb_id);
+               log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
        }
 
        /* an ack for dlm_recover_locks() which waits for replies from