dlm: delayed reply message warning
authorDavid Teigland <teigland@redhat.com>
Mon, 28 Mar 2011 19:17:26 +0000 (14:17 -0500)
committerDavid Teigland <teigland@redhat.com>
Fri, 1 Apr 2011 19:19:06 +0000 (14:19 -0500)
Add an option (disabled by default) to print a warning message
when a lock has been waiting a configurable amount of time for
a reply message from another node.  This is mainly for debugging.

Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/config.c
fs/dlm/config.h
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lock.h
fs/dlm/lockspace.c

index 0d329ff..9b026ea 100644 (file)
@@ -100,6 +100,7 @@ struct dlm_cluster {
        unsigned int cl_log_debug;
        unsigned int cl_protocol;
        unsigned int cl_timewarn_cs;
+       unsigned int cl_waitwarn_us;
 };
 
 enum {
@@ -114,6 +115,7 @@ enum {
        CLUSTER_ATTR_LOG_DEBUG,
        CLUSTER_ATTR_PROTOCOL,
        CLUSTER_ATTR_TIMEWARN_CS,
+       CLUSTER_ATTR_WAITWARN_US,
 };
 
 struct cluster_attribute {
@@ -166,6 +168,7 @@ CLUSTER_ATTR(scan_secs, 1);
 CLUSTER_ATTR(log_debug, 0);
 CLUSTER_ATTR(protocol, 0);
 CLUSTER_ATTR(timewarn_cs, 1);
+CLUSTER_ATTR(waitwarn_us, 0);
 
 static struct configfs_attribute *cluster_attrs[] = {
        [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@@ -179,6 +182,7 @@ static struct configfs_attribute *cluster_attrs[] = {
        [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
        [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
        [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
+       [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
        NULL,
 };
 
@@ -439,6 +443,7 @@ static struct config_group *make_cluster(struct config_group *g,
        cl->cl_log_debug = dlm_config.ci_log_debug;
        cl->cl_protocol = dlm_config.ci_protocol;
        cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
+       cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
 
        space_list = &sps->ss_group;
        comm_list = &cms->cs_group;
@@ -986,6 +991,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
 #define DEFAULT_LOG_DEBUG          0
 #define DEFAULT_PROTOCOL           0
 #define DEFAULT_TIMEWARN_CS      500 /* 5 sec = 500 centiseconds */
+#define DEFAULT_WAITWARN_US       0
 
 struct dlm_config_info dlm_config = {
        .ci_tcp_port = DEFAULT_TCP_PORT,
@@ -998,6 +1004,7 @@ struct dlm_config_info dlm_config = {
        .ci_scan_secs = DEFAULT_SCAN_SECS,
        .ci_log_debug = DEFAULT_LOG_DEBUG,
        .ci_protocol = DEFAULT_PROTOCOL,
-       .ci_timewarn_cs = DEFAULT_TIMEWARN_CS
+       .ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
+       .ci_waitwarn_us = DEFAULT_WAITWARN_US
 };
 
index 4f1d6fc..dd0ce24 100644 (file)
@@ -28,6 +28,7 @@ struct dlm_config_info {
        int ci_log_debug;
        int ci_protocol;
        int ci_timewarn_cs;
+       int ci_waitwarn_us;
 };
 
 extern struct dlm_config_info dlm_config;
index b942049..6a92478 100644 (file)
@@ -245,6 +245,7 @@ struct dlm_lkb {
 
        int8_t                  lkb_wait_type;  /* type of reply waiting for */
        int8_t                  lkb_wait_count;
+       int                     lkb_wait_nodeid; /* for debugging */
 
        struct list_head        lkb_idtbl_list; /* lockspace lkbtbl */
        struct list_head        lkb_statequeue; /* rsb g/c/w list */
@@ -254,6 +255,7 @@ struct dlm_lkb {
        struct list_head        lkb_ownqueue;   /* list of locks for a process */
        struct list_head        lkb_time_list;
        ktime_t                 lkb_timestamp;
+       ktime_t                 lkb_wait_time;
        unsigned long           lkb_timeout_cs;
 
        struct dlm_callback     lkb_callbacks[DLM_CALLBACKS_SIZE];
index 04b8c44..e3c8641 100644 (file)
@@ -799,10 +799,84 @@ static int msg_reply_type(int mstype)
        return -1;
 }
 
+static int nodeid_warned(int nodeid, int num_nodes, int *warned)
+{
+       int i;
+
+       for (i = 0; i < num_nodes; i++) {
+               if (!warned[i]) {
+                       warned[i] = nodeid;
+                       return 0;
+               }
+               if (warned[i] == nodeid)
+                       return 1;
+       }
+       return 0;
+}
+
+void dlm_scan_waiters(struct dlm_ls *ls)
+{
+       struct dlm_lkb *lkb;
+       ktime_t zero = ktime_set(0, 0);
+       s64 us;
+       s64 debug_maxus = 0;
+       u32 debug_scanned = 0;
+       u32 debug_expired = 0;
+       int num_nodes = 0;
+       int *warned = NULL;
+
+       if (!dlm_config.ci_waitwarn_us)
+               return;
+
+       mutex_lock(&ls->ls_waiters_mutex);
+
+       list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+               if (ktime_equal(lkb->lkb_wait_time, zero))
+                       continue;
+
+               debug_scanned++;
+
+               us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
+
+               if (us < dlm_config.ci_waitwarn_us)
+                       continue;
+
+               lkb->lkb_wait_time = zero;
+
+               debug_expired++;
+               if (us > debug_maxus)
+                       debug_maxus = us;
+
+               if (!num_nodes) {
+                       num_nodes = ls->ls_num_nodes;
+                       warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
+                       if (warned)
+                               memset(warned, 0, num_nodes * sizeof(int));
+               }
+               if (!warned)
+                       continue;
+               if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
+                       continue;
+
+               log_error(ls, "waitwarn %x %lld %d us check connection to "
+                         "node %d", lkb->lkb_id, (long long)us,
+                         dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
+       }
+       mutex_unlock(&ls->ls_waiters_mutex);
+
+       if (warned)
+               kfree(warned);
+
+       if (debug_expired)
+               log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
+                         debug_scanned, debug_expired,
+                         dlm_config.ci_waitwarn_us, (long long)debug_maxus);
+}
+
 /* add/remove lkb from global waiters list of lkb's waiting for
    a reply from a remote node */
 
-static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
+static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;
@@ -842,6 +916,8 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 
        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
+       lkb->lkb_wait_time = ktime_get();
+       lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
@@ -1157,6 +1233,16 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);
+
+       if (!dlm_config.ci_waitwarn_us)
+               return;
+
+       mutex_lock(&ls->ls_waiters_mutex);
+       list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+               if (ktime_to_us(lkb->lkb_wait_time))
+                       lkb->lkb_wait_time = ktime_get();
+       }
+       mutex_unlock(&ls->ls_waiters_mutex);
 }
 
 /* lkb is master or local copy */
@@ -2844,12 +2930,12 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
        struct dlm_mhandle *mh;
        int to_nodeid, error;
 
-       error = add_to_waiters(lkb, mstype);
+       to_nodeid = r->res_nodeid;
+
+       error = add_to_waiters(lkb, mstype, to_nodeid);
        if (error)
                return error;
 
-       to_nodeid = r->res_nodeid;
-
        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
        if (error)
                goto fail;
@@ -2951,12 +3037,12 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
        struct dlm_mhandle *mh;
        int to_nodeid, error;
 
-       error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
+       to_nodeid = dlm_dir_nodeid(r);
+
+       error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
        if (error)
                return error;
 
-       to_nodeid = dlm_dir_nodeid(r);
-
        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
        if (error)
                goto fail;
index 88e93c8..265017a 100644 (file)
@@ -24,6 +24,7 @@ int dlm_put_lkb(struct dlm_lkb *lkb);
 void dlm_scan_rsbs(struct dlm_ls *ls);
 int dlm_lock_recovery_try(struct dlm_ls *ls);
 void dlm_unlock_recovery(struct dlm_ls *ls);
+void dlm_scan_waiters(struct dlm_ls *ls);
 void dlm_scan_timeout(struct dlm_ls *ls);
 void dlm_adjust_timeouts(struct dlm_ls *ls);
 
index f994a7d..14cbf40 100644 (file)
@@ -243,7 +243,6 @@ static struct dlm_ls *find_ls_to_scan(void)
 static int dlm_scand(void *data)
 {
        struct dlm_ls *ls;
-       int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
 
        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
@@ -252,13 +251,14 @@ static int dlm_scand(void *data)
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
+                               dlm_scan_waiters(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
-               } else {
-                       schedule_timeout_interruptible(timeout_jiffies);
+                       continue;
                }
+               schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
 }