xprtrdma: Make FRWR send queue entry accounting more accurate
authorChuck Lever <chuck.lever@oracle.com>
Tue, 29 Nov 2016 15:52:16 +0000 (10:52 -0500)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Tue, 29 Nov 2016 21:45:44 +0000 (16:45 -0500)
Verbs providers may perform house-keeping on the Send Queue during
each signaled send completion. It is necessary therefore for a verbs
consumer (like xprtrdma) to occasionally force a signaled send
completion if it runs unsignaled most of the time.

xprtrdma does not require signaled completions for Send or FastReg
Work Requests, but does signal some LocalInv Work Requests. To
ensure that Send Queue house-keeping can run before the Send Queue
is more than half-consumed, xprtrdma forces a signaled completion
on occasion by counting the number of Send Queue Entries it
consumes. It currently does this by counting each ib_post_send as
one Entry.

Commit c9918ff56dfb ("xprtrdma: Add ro_unmap_sync method for FRWR")
introduced the ability for frwr_op_unmap_sync to post more than one
Work Request with a single post_send. Thus the underlying assumption
of one Send Queue Entry per ib_post_send is no longer true.

Also, FastReg Work Requests are currently never signaled. They
should be signaled once in a while, just as Send is, to keep the
accounting of consumed SQEs accurate.

While we're here, convert the CQCOUNT macros to the currently
preferred kernel coding style, which is inline functions.

Fixes: c9918ff56dfb ("xprtrdma: Add ro_unmap_sync method for FRWR")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 26b26beef2d4a6dd7ef9d31f09de7fe51504d841..adbf52c6df833b01548e1e08a5d1565c0ec9c5af 100644 (file)
@@ -421,7 +421,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                         IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
                         IB_ACCESS_REMOTE_READ;
 
-       DECR_CQCOUNT(&r_xprt->rx_ep);
+       rpcrdma_set_signaled(&r_xprt->rx_ep, &reg_wr->wr);
        rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
        if (rc)
                goto out_senderr;
@@ -486,7 +486,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mw *mw, *tmp;
        struct rpcrdma_frmr *f;
-       int rc;
+       int count, rc;
 
        dprintk("RPC:       %s: req %p\n", __func__, req);
 
@@ -496,6 +496,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
         * a single ib_post_send() call.
         */
        f = NULL;
+       count = 0;
        invalidate_wrs = pos = prev = NULL;
        list_for_each_entry(mw, &req->rl_registered, mw_list) {
                if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
@@ -505,6 +506,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
                }
 
                pos = __frwr_prepare_linv_wr(mw);
+               count++;
 
                if (!invalidate_wrs)
                        invalidate_wrs = pos;
@@ -523,7 +525,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        f->fr_invwr.send_flags = IB_SEND_SIGNALED;
        f->fr_cqe.done = frwr_wc_localinv_wake;
        reinit_completion(&f->fr_linv_done);
-       INIT_CQCOUNT(&r_xprt->rx_ep);
+
+       /* Initialize CQ count, since there is always a signaled
+        * WR being posted here.  The new cqcount depends on how
+        * many SQEs are about to be consumed.
+        */
+       rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
 
        /* Transport disconnect drains the receive CQ before it
         * replaces the QP. The RPC reply handler won't call us
index ec74289af7ec90b7f33a2ec2e2157c588ed0eeb3..451f5f27d8af07887f38b8b6368b7cba7c7b52ee 100644 (file)
@@ -532,7 +532,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
        if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;     /* always signal? */
-       INIT_CQCOUNT(ep);
+       rpcrdma_init_cqcount(ep, 0);
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
@@ -1311,13 +1311,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
        dprintk("RPC:       %s: posting %d s/g entries\n",
                __func__, send_wr->num_sge);
 
-       if (DECR_CQCOUNT(ep) > 0)
-               send_wr->send_flags = 0;
-       else { /* Provider must take a send completion every now and then */
-               INIT_CQCOUNT(ep);
-               send_wr->send_flags = IB_SEND_SIGNALED;
-       }
-
+       rpcrdma_set_signaled(ep, send_wr);
        rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
        if (rc)
                goto out_postsend_err;
index 6e1bba358203694e79cbc9074554b12053fa45c7..f6ae1b22da476b48d3f9736e7eeaf0625a913b5d 100644 (file)
@@ -95,8 +95,24 @@ struct rpcrdma_ep {
        struct delayed_work     rep_connect_worker;
 };
 
-#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
-#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+static inline void
+rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count)
+{
+       atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);
+}
+
+/* To update send queue accounting, provider must take a
+ * send completion every now and then.
+ */
+static inline void
+rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr)
+{
+       send_wr->send_flags = 0;
+       if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) {
+               rpcrdma_init_cqcount(ep, 0);
+               send_wr->send_flags = IB_SEND_SIGNALED;
+       }
+}
 
 /* Pre-allocate extra Work Requests for handling backward receives
  * and sends. This is a fixed value because the Work Queues are