svcrdma: Reduce Receive doorbell rate
authorChuck Lever <chuck.lever@oracle.com>
Tue, 8 Dec 2020 18:14:15 +0000 (13:14 -0500)
committerChuck Lever <chuck.lever@oracle.com>
Mon, 25 Jan 2021 14:36:28 +0000 (09:36 -0500)
This is similar to commit e340c2d6ef2a ("xprtrdma: Reduce the
doorbell rate (Receive)") which added Receive batching to the
client.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
include/linux/sunrpc/svc_rdma.h
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c

index 1e76ed6..7c693b3 100644 (file)
@@ -104,6 +104,7 @@ struct svcxprt_rdma {
 
        wait_queue_head_t    sc_send_wait;      /* SQ exhaustion waitlist */
        unsigned long        sc_flags;
+       u32                  sc_pending_recvs;
        struct list_head     sc_read_complete_q;
        struct work_struct   sc_work;
 
index 7d14a74..ab0b7e9 100644 (file)
@@ -266,33 +266,46 @@ void svc_rdma_release_rqst(struct svc_rqst *rqstp)
                svc_rdma_recv_ctxt_put(rdma, ctxt);
 }
 
-static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma,
-                               struct svc_rdma_recv_ctxt *ctxt)
+static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
+                                  unsigned int wanted, bool temp)
 {
+       const struct ib_recv_wr *bad_wr = NULL;
+       struct svc_rdma_recv_ctxt *ctxt;
+       struct ib_recv_wr *recv_chain;
        int ret;
 
-       trace_svcrdma_post_recv(ctxt);
-       ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, NULL);
+       recv_chain = NULL;
+       while (wanted--) {
+               ctxt = svc_rdma_recv_ctxt_get(rdma);
+               if (!ctxt)
+                       break;
+
+               trace_svcrdma_post_recv(ctxt);
+               ctxt->rc_temp = temp;
+               ctxt->rc_recv_wr.next = recv_chain;
+               recv_chain = &ctxt->rc_recv_wr;
+               rdma->sc_pending_recvs++;
+       }
+       if (!recv_chain)
+               return false;
+
+       ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
        if (ret)
                goto err_post;
-       return 0;
+       return true;
 
 err_post:
-       trace_svcrdma_rq_post_err(rdma, ret);
-       svc_rdma_recv_ctxt_put(rdma, ctxt);
-       return ret;
-}
-
-static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
-{
-       struct svc_rdma_recv_ctxt *ctxt;
+       while (bad_wr) {
+               ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
+                                   rc_recv_wr);
+               bad_wr = bad_wr->next;
+               svc_rdma_recv_ctxt_put(rdma, ctxt);
+       }
 
-       if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
-               return 0;
-       ctxt = svc_rdma_recv_ctxt_get(rdma);
-       if (!ctxt)
-               return -ENOMEM;
-       return __svc_rdma_post_recv(rdma, ctxt);
+       trace_svcrdma_rq_post_err(rdma, ret);
+       /* Since we're destroying the xprt, no need to reset
+        * sc_pending_recvs. */
+       return false;
 }
 
 /**
@@ -303,20 +316,7 @@ static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
  */
 bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
 {
-       struct svc_rdma_recv_ctxt *ctxt;
-       unsigned int i;
-       int ret;
-
-       for (i = 0; i < rdma->sc_max_requests; i++) {
-               ctxt = svc_rdma_recv_ctxt_get(rdma);
-               if (!ctxt)
-                       return false;
-               ctxt->rc_temp = true;
-               ret = __svc_rdma_post_recv(rdma, ctxt);
-               if (ret)
-                       return false;
-       }
-       return true;
+       return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests, true);
 }
 
 /**
@@ -324,8 +324,6 @@ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
  * @cq: Completion Queue context
  * @wc: Work Completion object
  *
- * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
- * the Receive completion handler could be running.
  */
 static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
@@ -333,6 +331,8 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_recv_ctxt *ctxt;
 
+       rdma->sc_pending_recvs--;
+
        /* WARNING: Only wc->wr_cqe and wc->status are reliable */
        ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
 
@@ -340,9 +340,6 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
        if (wc->status != IB_WC_SUCCESS)
                goto flushed;
 
-       if (svc_rdma_post_recv(rdma))
-               goto post_err;
-
        /* All wc fields are now known to be valid */
        ctxt->rc_byte_len = wc->byte_len;
        ib_dma_sync_single_for_cpu(rdma->sc_pd->device,
@@ -356,11 +353,18 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
        spin_unlock(&rdma->sc_rq_dto_lock);
        if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
                svc_xprt_enqueue(&rdma->sc_xprt);
+
+       if (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) &&
+           rdma->sc_pending_recvs < rdma->sc_max_requests)
+               if (!svc_rdma_refresh_recvs(rdma, RPCRDMA_MAX_RECV_BATCH,
+                                           false))
+                       goto post_err;
+
        return;
 
 flushed:
-post_err:
        svc_rdma_recv_ctxt_put(rdma, ctxt);
+post_err:
        set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
        svc_xprt_enqueue(&rdma->sc_xprt);
 }