xprtrdma: Make FRWR send queue entry accounting more accurate
authorChuck Lever <chuck.lever@oracle.com>
Tue, 29 Nov 2016 15:52:16 +0000 (10:52 -0500)
committerGreg Kroah-Hartman <gregkh@linuxfoundation.org>
Thu, 26 Jan 2017 07:24:43 +0000 (08:24 +0100)
commit 8d38de65644d900199f035277aa5f3da4aa9fc17 upstream.

Verbs providers may perform house-keeping on the Send Queue during
each signaled send completion. It is necessary therefore for a verbs
consumer (like xprtrdma) to occasionally force a signaled send
completion if it runs unsignaled most of the time.

xprtrdma does not require signaled completions for Send or FastReg
Work Requests, but does signal some LocalInv Work Requests. To
ensure that Send Queue house-keeping can run before the Send Queue
is more than half-consumed, xprtrdma forces a signaled completion
on occasion by counting the number of Send Queue Entries it
consumes. It currently does this by counting each ib_post_send as
one Entry.

Commit c9918ff56dfb ("xprtrdma: Add ro_unmap_sync method for FRWR")
introduced the ability for frwr_op_unmap_sync to post more than one
Work Request with a single post_send. Thus the underlying assumption
of one Send Queue Entry per ib_post_send is no longer true.

Also, FastReg Work Requests are currently never signaled. They
should be signaled once in a while, just as Send is, to keep the
accounting of consumed SQEs accurate.

While we're here, convert the CQCOUNT macros to the currently
preferred kernel coding style, which is inline functions.

Fixes: c9918ff56dfb ("xprtrdma: Add ro_unmap_sync method for FRWR")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 26b26be..adbf52c 100644 (file)
@@ -421,7 +421,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                         IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
                         IB_ACCESS_REMOTE_READ;
 
-       DECR_CQCOUNT(&r_xprt->rx_ep);
+       rpcrdma_set_signaled(&r_xprt->rx_ep, &reg_wr->wr);
        rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
        if (rc)
                goto out_senderr;
@@ -486,7 +486,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mw *mw, *tmp;
        struct rpcrdma_frmr *f;
-       int rc;
+       int count, rc;
 
        dprintk("RPC:       %s: req %p\n", __func__, req);
 
@@ -496,6 +496,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
         * a single ib_post_send() call.
         */
        f = NULL;
+       count = 0;
        invalidate_wrs = pos = prev = NULL;
        list_for_each_entry(mw, &req->rl_registered, mw_list) {
                if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
@@ -505,6 +506,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
                }
 
                pos = __frwr_prepare_linv_wr(mw);
+               count++;
 
                if (!invalidate_wrs)
                        invalidate_wrs = pos;
@@ -523,7 +525,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        f->fr_invwr.send_flags = IB_SEND_SIGNALED;
        f->fr_cqe.done = frwr_wc_localinv_wake;
        reinit_completion(&f->fr_linv_done);
-       INIT_CQCOUNT(&r_xprt->rx_ep);
+
+       /* Initialize CQ count, since there is always a signaled
+        * WR being posted here.  The new cqcount depends on how
+        * many SQEs are about to be consumed.
+        */
+       rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
 
        /* Transport disconnect drains the receive CQ before it
         * replaces the QP. The RPC reply handler won't call us
index ec74289..451f5f2 100644 (file)
@@ -532,7 +532,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
        if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;     /* always signal? */
-       INIT_CQCOUNT(ep);
+       rpcrdma_init_cqcount(ep, 0);
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
@@ -1311,13 +1311,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
        dprintk("RPC:       %s: posting %d s/g entries\n",
                __func__, send_wr->num_sge);
 
-       if (DECR_CQCOUNT(ep) > 0)
-               send_wr->send_flags = 0;
-       else { /* Provider must take a send completion every now and then */
-               INIT_CQCOUNT(ep);
-               send_wr->send_flags = IB_SEND_SIGNALED;
-       }
-
+       rpcrdma_set_signaled(ep, send_wr);
        rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
        if (rc)
                goto out_postsend_err;
index 6e1bba3..f6ae1b2 100644 (file)
@@ -95,8 +95,24 @@ struct rpcrdma_ep {
        struct delayed_work     rep_connect_worker;
 };
 
-#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
-#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+static inline void
+rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count)
+{
+       atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);
+}
+
+/* To update send queue accounting, provider must take a
+ * send completion every now and then.
+ */
+static inline void
+rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr)
+{
+       send_wr->send_flags = 0;
+       if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) {
+               rpcrdma_init_cqcount(ep, 0);
+               send_wr->send_flags = IB_SEND_SIGNALED;
+       }
+}
 
 /* Pre-allocate extra Work Requests for handling backward receives
  * and sends. This is a fixed value because the Work Queues are