Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm
authorLinus Torvalds <torvalds@linux-foundation.org>
Tue, 24 May 2011 22:04:00 +0000 (15:04 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Tue, 24 May 2011 22:04:00 +0000 (15:04 -0700)
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm:
  dlm: make plock operation killable
  dlm: remove shared message stub for recovery
  dlm: delayed reply message warning
  dlm: Remove superfluous call to recalc_sigpending()

1  2 
fs/dlm/lock.c

diff --combined fs/dlm/lock.c
@@@ -519,7 -519,7 +519,7 @@@ static void toss_rsb(struct kref *kref
        }
  }
  
 -/* When all references to the rsb are gone it's transfered to
 +/* When all references to the rsb are gone it's transferred to
     the tossed list for later disposal. */
  
  static void put_rsb(struct dlm_rsb *r)
@@@ -799,10 -799,84 +799,84 @@@ static int msg_reply_type(int mstype
        return -1;
  }
  
+ static int nodeid_warned(int nodeid, int num_nodes, int *warned)
+ {
+       int i;
+       for (i = 0; i < num_nodes; i++) {
+               if (!warned[i]) {
+                       warned[i] = nodeid;
+                       return 0;
+               }
+               if (warned[i] == nodeid)
+                       return 1;
+       }
+       return 0;
+ }
+ void dlm_scan_waiters(struct dlm_ls *ls)
+ {
+       struct dlm_lkb *lkb;
+       ktime_t zero = ktime_set(0, 0);
+       s64 us;
+       s64 debug_maxus = 0;
+       u32 debug_scanned = 0;
+       u32 debug_expired = 0;
+       int num_nodes = 0;
+       int *warned = NULL;
+       if (!dlm_config.ci_waitwarn_us)
+               return;
+       mutex_lock(&ls->ls_waiters_mutex);
+       list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+               if (ktime_equal(lkb->lkb_wait_time, zero))
+                       continue;
+               debug_scanned++;
+               us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
+               if (us < dlm_config.ci_waitwarn_us)
+                       continue;
+               lkb->lkb_wait_time = zero;
+               debug_expired++;
+               if (us > debug_maxus)
+                       debug_maxus = us;
+               if (!num_nodes) {
+                       num_nodes = ls->ls_num_nodes;
+                       warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
+                       if (warned)
+                               memset(warned, 0, num_nodes * sizeof(int));
+               }
+               if (!warned)
+                       continue;
+               if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
+                       continue;
+               log_error(ls, "waitwarn %x %lld %d us check connection to "
+                         "node %d", lkb->lkb_id, (long long)us,
+                         dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
+       }
+       mutex_unlock(&ls->ls_waiters_mutex);
+       if (warned)
+               kfree(warned);
+       if (debug_expired)
+               log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
+                         debug_scanned, debug_expired,
+                         dlm_config.ci_waitwarn_us, (long long)debug_maxus);
+ }
  /* add/remove lkb from global waiters list of lkb's waiting for
     a reply from a remote node */
  
- static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
+ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
  {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;
  
        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
+       lkb->lkb_wait_time = ktime_get();
+       lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
   out:
@@@ -961,10 -1037,10 +1037,10 @@@ static int remove_from_waiters_ms(struc
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;
  
-       if (ms != &ls->ls_stub_ms)
+       if (ms->m_flags != DLM_IFL_STUB_MS)
                mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, ms->m_type, ms);
-       if (ms != &ls->ls_stub_ms)
+       if (ms->m_flags != DLM_IFL_STUB_MS)
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
  }
@@@ -1157,6 -1233,16 +1233,16 @@@ void dlm_adjust_timeouts(struct dlm_ls 
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);
+       if (!dlm_config.ci_waitwarn_us)
+               return;
+       mutex_lock(&ls->ls_waiters_mutex);
+       list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+               if (ktime_to_us(lkb->lkb_wait_time))
+                       lkb->lkb_wait_time = ktime_get();
+       }
+       mutex_unlock(&ls->ls_waiters_mutex);
  }
  
  /* lkb is master or local copy */
@@@ -1376,14 -1462,8 +1462,8 @@@ static void grant_lock_pending(struct d
     ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
     compatible with other granted locks */
  
- static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
+ static void munge_demoted(struct dlm_lkb *lkb)
  {
-       if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
-               log_print("munge_demoted %x invalid reply type %d",
-                         lkb->lkb_id, ms->m_type);
-               return;
-       }
        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
                log_print("munge_demoted %x invalid modes gr %d rq %d",
                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
@@@ -2844,12 -2924,12 +2924,12 @@@ static int send_common(struct dlm_rsb *
        struct dlm_mhandle *mh;
        int to_nodeid, error;
  
-       error = add_to_waiters(lkb, mstype);
+       to_nodeid = r->res_nodeid;
+       error = add_to_waiters(lkb, mstype, to_nodeid);
        if (error)
                return error;
  
-       to_nodeid = r->res_nodeid;
        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
        if (error)
                goto fail;
@@@ -2880,9 -2960,9 +2960,9 @@@ static int send_convert(struct dlm_rsb 
        /* down conversions go without a reply from the master */
        if (!error && down_conversion(lkb)) {
                remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
+               r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
                r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
                r->res_ls->ls_stub_ms.m_result = 0;
-               r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
        }
  
@@@ -2951,12 -3031,12 +3031,12 @@@ static int send_lookup(struct dlm_rsb *
        struct dlm_mhandle *mh;
        int to_nodeid, error;
  
-       error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
+       to_nodeid = dlm_dir_nodeid(r);
+       error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
        if (error)
                return error;
  
-       to_nodeid = dlm_dir_nodeid(r);
        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
        if (error)
                goto fail;
@@@ -3070,6 -3150,9 +3150,9 @@@ static void receive_flags(struct dlm_lk
  
  static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
  {
+       if (ms->m_flags == DLM_IFL_STUB_MS)
+               return;
        lkb->lkb_sbflags = ms->m_sbflags;
        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
                         (ms->m_flags & 0x0000FFFF);
@@@ -3612,7 -3695,7 +3695,7 @@@ static void __receive_convert_reply(str
                /* convert was queued on remote master */
                receive_flags_reply(lkb, ms);
                if (is_demoted(lkb))
-                       munge_demoted(lkb, ms);
+                       munge_demoted(lkb);
                del_lkb(r, lkb);
                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
                add_timeout(lkb);
                /* convert was granted on remote master */
                receive_flags_reply(lkb, ms);
                if (is_demoted(lkb))
-                       munge_demoted(lkb, ms);
+                       munge_demoted(lkb);
                grant_lock_pc(r, lkb, ms);
                queue_cast(r, lkb, 0);
                break;
@@@ -3996,15 -4079,17 +4079,17 @@@ void dlm_receive_buffer(union dlm_packe
        dlm_put_lockspace(ls);
  }
  
- static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
+ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
+                                  struct dlm_message *ms_stub)
  {
        if (middle_conversion(lkb)) {
                hold_lkb(lkb);
-               ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
-               ls->ls_stub_ms.m_result = -EINPROGRESS;
-               ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-               ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
-               _receive_convert_reply(lkb, &ls->ls_stub_ms);
+               memset(ms_stub, 0, sizeof(struct dlm_message));
+               ms_stub->m_flags = DLM_IFL_STUB_MS;
+               ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
+               ms_stub->m_result = -EINPROGRESS;
+               ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
+               _receive_convert_reply(lkb, ms_stub);
  
                /* Same special case as in receive_rcom_lock_args() */
                lkb->lkb_grmode = DLM_LOCK_IV;
@@@ -4045,13 -4130,27 +4130,27 @@@ static int waiter_needs_recovery(struc
  void dlm_recover_waiters_pre(struct dlm_ls *ls)
  {
        struct dlm_lkb *lkb, *safe;
+       struct dlm_message *ms_stub;
        int wait_type, stub_unlock_result, stub_cancel_result;
  
+       ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
+       if (!ms_stub) {
+               log_error(ls, "dlm_recover_waiters_pre no mem");
+               return;
+       }
        mutex_lock(&ls->ls_waiters_mutex);
  
        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
-               log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
-                         lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
+               /* exclude debug messages about unlocks because there can be so
+                  many and they aren't very interesting */
+               if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
+                       log_debug(ls, "recover_waiter %x nodeid %d "
+                                 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
+                                 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+               }
  
                /* all outstanding lookups, regardless of destination  will be
                   resent after recovery is done */
                        break;
  
                case DLM_MSG_CONVERT:
-                       recover_convert_waiter(ls, lkb);
+                       recover_convert_waiter(ls, lkb, ms_stub);
                        break;
  
                case DLM_MSG_UNLOCK:
                        hold_lkb(lkb);
-                       ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
-                       ls->ls_stub_ms.m_result = stub_unlock_result;
-                       ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-                       ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
-                       _receive_unlock_reply(lkb, &ls->ls_stub_ms);
+                       memset(ms_stub, 0, sizeof(struct dlm_message));
+                       ms_stub->m_flags = DLM_IFL_STUB_MS;
+                       ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
+                       ms_stub->m_result = stub_unlock_result;
+                       ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
+                       _receive_unlock_reply(lkb, ms_stub);
                        dlm_put_lkb(lkb);
                        break;
  
                case DLM_MSG_CANCEL:
                        hold_lkb(lkb);
-                       ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
-                       ls->ls_stub_ms.m_result = stub_cancel_result;
-                       ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-                       ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
-                       _receive_cancel_reply(lkb, &ls->ls_stub_ms);
+                       memset(ms_stub, 0, sizeof(struct dlm_message));
+                       ms_stub->m_flags = DLM_IFL_STUB_MS;
+                       ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
+                       ms_stub->m_result = stub_cancel_result;
+                       ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
+                       _receive_cancel_reply(lkb, ms_stub);
                        dlm_put_lkb(lkb);
                        break;
  
                schedule();
        }
        mutex_unlock(&ls->ls_waiters_mutex);
+       kfree(ms_stub);
  }
  
  static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
@@@ -4191,8 -4293,8 +4293,8 @@@ int dlm_recover_waiters_post(struct dlm
                ou = is_overlap_unlock(lkb);
                err = 0;
  
-               log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
-                         lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
+               log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
+                         lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
  
                /* At this point we assume that we won't get a reply to any
                   previous op or overlap op on this lock.  First, do a big