svcrdma: Remove BH-disabled spin locking in svc_rdma_send()
authorChuck Lever <chuck.lever@oracle.com>
Tue, 29 Nov 2016 16:04:50 +0000 (11:04 -0500)
committerJ. Bruce Fields <bfields@redhat.com>
Wed, 30 Nov 2016 22:31:13 +0000 (17:31 -0500)
svcrdma's current SQ accounting algorithm takes sc_lock and disables
bottom-halves while posting all RDMA Read, Write, and Send WRs.

This is relatively heavyweight serialization. And note that Write and
Send are already fully serialized by the xpt_mutex.

Using a single atomic_t should be all that is necessary to guarantee
that ib_post_send() is called only when there is enough space on the
send queue. This is what the other RDMA-enabled storage targets do.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
include/linux/sunrpc/svc_rdma.h
net/sunrpc/xprtrdma/svc_rdma_sendto.c
net/sunrpc/xprtrdma/svc_rdma_transport.c

index 6aef63b..601cb07 100644 (file)
@@ -139,7 +139,7 @@ struct svcxprt_rdma {
        int                  sc_max_sge_rd;     /* max sge for read target */
        bool                 sc_snd_w_inv;      /* OK to use Send With Invalidate */
 
-       atomic_t             sc_sq_count;       /* Number of SQ WR on queue */
+       atomic_t             sc_sq_avail;       /* SQEs ready to be consumed */
        unsigned int         sc_sq_depth;       /* Depth of SQ */
        unsigned int         sc_rq_depth;       /* Depth of RQ */
        u32                  sc_max_requests;   /* Forward credits */
index 0a58d40..30eeab5 100644 (file)
@@ -594,7 +594,12 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
                goto err0;
        inline_bytes = rqstp->rq_res.len;
 
-       /* Create the RDMA response header */
+       /* Create the RDMA response header. xprt->xpt_mutex,
+        * acquired in svc_send(), serializes RPC replies. The
+        * code path below that inserts the credit grant value
+        * into each transport header runs only inside this
+        * critical section.
+        */
        ret = -ENOMEM;
        res_page = alloc_page(GFP_KERNEL);
        if (!res_page)
index 6864fb9..da990d7 100644 (file)
@@ -434,7 +434,7 @@ static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
                goto err;
 
 out:
-       atomic_dec(&xprt->sc_sq_count);
+       atomic_inc(&xprt->sc_sq_avail);
        wake_up(&xprt->sc_send_wait);
        return;
 
@@ -1008,6 +1008,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        newxprt->sc_rq_depth = newxprt->sc_max_requests +
                               newxprt->sc_max_bc_requests;
        newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
+       atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
        if (!svc_rdma_prealloc_ctxts(newxprt))
                goto errout;
@@ -1333,15 +1334,13 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 
        /* If the SQ is full, wait until an SQ entry is available */
        while (1) {
-               spin_lock_bh(&xprt->sc_lock);
-               if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
-                       spin_unlock_bh(&xprt->sc_lock);
+               if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
                        atomic_inc(&rdma_stat_sq_starve);
 
                        /* Wait until SQ WR available if SQ still full */
+                       atomic_add(wr_count, &xprt->sc_sq_avail);
                        wait_event(xprt->sc_send_wait,
-                                  atomic_read(&xprt->sc_sq_count) <
-                                  xprt->sc_sq_depth);
+                                  atomic_read(&xprt->sc_sq_avail) > wr_count);
                        if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
                                return -ENOTCONN;
                        continue;
@@ -1351,21 +1350,17 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
                        svc_xprt_get(&xprt->sc_xprt);
 
                /* Bump used SQ WR count and post */
-               atomic_add(wr_count, &xprt->sc_sq_count);
                ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
                if (ret) {
                        set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-                       atomic_sub(wr_count, &xprt->sc_sq_count);
                        for (i = 0; i < wr_count; i ++)
                                svc_xprt_put(&xprt->sc_xprt);
-                       dprintk("svcrdma: failed to post SQ WR rc=%d, "
-                              "sc_sq_count=%d, sc_sq_depth=%d\n",
-                              ret, atomic_read(&xprt->sc_sq_count),
-                              xprt->sc_sq_depth);
-               }
-               spin_unlock_bh(&xprt->sc_lock);
-               if (ret)
+                       dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
+                       dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
+                               atomic_read(&xprt->sc_sq_avail),
+                               xprt->sc_sq_depth);
                        wake_up(&xprt->sc_send_wait);
+               }
                break;
        }
        return ret;