svcrdma: Introduce svc_rdma_send_ctxt
authorChuck Lever <chuck.lever@oracle.com>
Mon, 7 May 2018 19:28:04 +0000 (15:28 -0400)
committerJ. Bruce Fields <bfields@redhat.com>
Fri, 11 May 2018 19:48:57 +0000 (15:48 -0400)
svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt
free list. This eliminates the overhead of calling kmalloc / kfree,
both of which grab a globally shared lock that disables interrupts.
Introduce a replacement to svc_rdma_op_ctxt's that is built
especially for the svcrdma Send path.

Subsequent patches will take advantage of this new structure by
allocating real resources which are then cached in these objects.
The allocations are freed when the transport is torn down.

I've renamed the structure so that static type checking can be used
to ensure that uses of op_ctxt and send_ctxt are not confused. As an
additional clean up, structure fields are renamed to conform with
kernel coding conventions.

Additional clean ups:
- Handle svc_rdma_send_ctxt_get allocation failure at each call
  site, rather than pre-allocating and hoping we guessed correctly
- All send_ctxt_put call-sites request page freeing, so remove
  the @free_pages argument
- All send_ctxt_put call-sites unmap SGEs, so fold that into
  svc_rdma_send_ctxt_put

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
include/linux/sunrpc/svc_rdma.h
net/sunrpc/xprtrdma/svc_rdma_backchannel.c
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
net/sunrpc/xprtrdma/svc_rdma_sendto.c
net/sunrpc/xprtrdma/svc_rdma_transport.c

index 8827b4e36c3c45f318a62dded47783de75a9a9b6..d3e2bb3312647ab6aaec1c56043f3a3e7af341ae 100644 (file)
@@ -109,8 +109,8 @@ struct svcxprt_rdma {
 
        struct ib_pd         *sc_pd;
 
-       spinlock_t           sc_ctxt_lock;
-       struct list_head     sc_ctxts;
+       spinlock_t           sc_send_lock;
+       struct list_head     sc_send_ctxts;
        int                  sc_ctxt_used;
        spinlock_t           sc_rw_ctxt_lock;
        struct list_head     sc_rw_ctxts;
@@ -158,6 +158,19 @@ struct svc_rdma_recv_ctxt {
        struct page             *rc_pages[RPCSVC_MAXPAGES];
 };
 
+enum {
+       RPCRDMA_MAX_SGES        = 1 + (RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE),
+};
+
+struct svc_rdma_send_ctxt {
+       struct list_head        sc_list;
+       struct ib_send_wr       sc_send_wr;
+       struct ib_cqe           sc_cqe;
+       int                     sc_page_count;
+       struct page             *sc_pages[RPCSVC_MAXPAGES];
+       struct ib_sge           sc_sges[RPCRDMA_MAX_SGES];
+};
+
 /* svc_rdma_backchannel.c */
 extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
                                    __be32 *rdma_resp,
@@ -183,24 +196,22 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
                                     struct xdr_buf *xdr);
 
 /* svc_rdma_sendto.c */
+extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_send_ctxt *
+               svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma);
+extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+                                  struct svc_rdma_send_ctxt *ctxt);
+extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
 extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-                                 struct svc_rdma_op_ctxt *ctxt,
+                                 struct svc_rdma_send_ctxt *ctxt,
                                  __be32 *rdma_resp, unsigned int len);
 extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
-                                struct svc_rdma_op_ctxt *ctxt,
+                                struct svc_rdma_send_ctxt *ctxt,
                                 u32 inv_rkey);
 extern int svc_rdma_sendto(struct svc_rqst *);
 
 /* svc_rdma_transport.c */
-extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
-extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
-extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
-extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
-extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
 extern void svc_sq_reap(struct svcxprt_rdma *);
 extern void svc_rq_reap(struct svcxprt_rdma *);
 extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
index 0b9ba9f50a765d86beaa63bafbabdcea649c9ed0..95e33511cc6f3f2758c79bc08f8ef8ad3a93411a 100644 (file)
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 /*
- * Copyright (c) 2015 Oracle.  All rights reserved.
+ * Copyright (c) 2015-2018 Oracle.  All rights reserved.
  *
  * Support for backward direction RPCs on RPC/RDMA (server-side).
  */
@@ -117,10 +117,14 @@ out_notfound:
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
                              struct rpc_rqst *rqst)
 {
-       struct svc_rdma_op_ctxt *ctxt;
+       struct svc_rdma_send_ctxt *ctxt;
        int ret;
 
-       ctxt = svc_rdma_get_context(rdma);
+       ctxt = svc_rdma_send_ctxt_get(rdma);
+       if (!ctxt) {
+               ret = -ENOMEM;
+               goto out_err;
+       }
 
        /* rpcrdma_bc_send_request builds the transport header and
         * the backchannel RPC message in the same buffer. Thus only
@@ -144,8 +148,7 @@ out_err:
        return ret;
 
 out_unmap:
-       svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_context(ctxt, 1);
+       svc_rdma_send_ctxt_put(rdma, ctxt);
        ret = -EIO;
        goto out_err;
 }
index af6d2f3b32420fd10f451d0be73c3e449cc89c76..2d1e0db4c8697327fdc9408c598f384db2c111d6 100644 (file)
@@ -601,7 +601,7 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
 static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
                                __be32 *rdma_argp, int status)
 {
-       struct svc_rdma_op_ctxt *ctxt;
+       struct svc_rdma_send_ctxt *ctxt;
        __be32 *p, *err_msgp;
        unsigned int length;
        struct page *page;
@@ -631,7 +631,10 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
        length = (unsigned long)p - (unsigned long)err_msgp;
 
        /* Map transport header; no RPC message payload */
-       ctxt = svc_rdma_get_context(xprt);
+       ctxt = svc_rdma_send_ctxt_get(xprt);
+       if (!ctxt)
+               return;
+
        ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
        if (ret) {
                dprintk("svcrdma: Error %d mapping send for protocol error\n",
@@ -640,10 +643,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
        }
 
        ret = svc_rdma_post_send_wr(xprt, ctxt, 0);
-       if (ret) {
-               svc_rdma_unmap_dma(ctxt);
-               svc_rdma_put_context(ctxt, 1);
-       }
+       if (ret)
+               svc_rdma_send_ctxt_put(xprt, ctxt);
 }
 
 /* By convention, backchannel calls arrive via rdma_msg type
index 4591017adc1e0795e561deb93cf2a75248877bb3..b286d6a6e4294513dc2024d3dd4390c1efa77b42 100644 (file)
  * DMA-unmap the pages under I/O for that Write segment. The Write
  * completion handler does not release any pages.
  *
- * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
  * The ownership of all of the Reply's pages are transferred into that
  * ctxt, the Send WR is posted, and sendto returns.
  *
- * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * The svc_rdma_send_ctxt is presented when the Send WR completes. The
  * Send completion handler finally releases the Reply's pages.
  *
  * This mechanism also assumes that completions on the transport's Send
 
 #define RPCDBG_FACILITY        RPCDBG_SVCXPRT
 
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
+
+static inline struct svc_rdma_send_ctxt *
+svc_rdma_next_send_ctxt(struct list_head *list)
+{
+       return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
+                                       sc_list);
+}
+
+static struct svc_rdma_send_ctxt *
+svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
+{
+       struct svc_rdma_send_ctxt *ctxt;
+       int i;
+
+       ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+       if (!ctxt)
+               return NULL;
+
+       ctxt->sc_cqe.done = svc_rdma_wc_send;
+       ctxt->sc_send_wr.next = NULL;
+       ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
+       ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
+       ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
+       for (i = 0; i < ARRAY_SIZE(ctxt->sc_sges); i++)
+               ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
+       return ctxt;
+}
+
+/**
+ * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
+ * @rdma: svcxprt_rdma being torn down
+ *
+ */
+void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
+{
+       struct svc_rdma_send_ctxt *ctxt;
+
+       while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
+               list_del(&ctxt->sc_list);
+               kfree(ctxt);
+       }
+}
+
+/**
+ * svc_rdma_send_ctxt_get - Get a free send_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a ready-to-use send_ctxt, or NULL if none are
+ * available and a fresh one cannot be allocated.
+ */
+struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
+{
+       struct svc_rdma_send_ctxt *ctxt;
+
+       spin_lock(&rdma->sc_send_lock);
+       ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
+       if (!ctxt)
+               goto out_empty;
+       list_del(&ctxt->sc_list);
+       spin_unlock(&rdma->sc_send_lock);
+
+out:
+       ctxt->sc_send_wr.num_sge = 0;
+       ctxt->sc_page_count = 0;
+       return ctxt;
+
+out_empty:
+       spin_unlock(&rdma->sc_send_lock);
+       ctxt = svc_rdma_send_ctxt_alloc(rdma);
+       if (!ctxt)
+               return NULL;
+       goto out;
+}
+
+/**
+ * svc_rdma_send_ctxt_put - Return send_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ *
+ * Pages left in sc_pages are DMA unmapped and released.
+ */
+void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+                           struct svc_rdma_send_ctxt *ctxt)
+{
+       struct ib_device *device = rdma->sc_cm_id->device;
+       unsigned int i;
+
+       for (i = 0; i < ctxt->sc_send_wr.num_sge; i++)
+               ib_dma_unmap_page(device,
+                                 ctxt->sc_sges[i].addr,
+                                 ctxt->sc_sges[i].length,
+                                 DMA_TO_DEVICE);
+
+       for (i = 0; i < ctxt->sc_page_count; ++i)
+               put_page(ctxt->sc_pages[i]);
+
+       spin_lock(&rdma->sc_send_lock);
+       list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
+       spin_unlock(&rdma->sc_send_lock);
+}
+
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: Completion Queue context
+ * @wc: Work Completion object
+ *
+ * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
+ * the Send completion handler could be running.
+ */
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+{
+       struct svcxprt_rdma *rdma = cq->cq_context;
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct svc_rdma_send_ctxt *ctxt;
+
+       trace_svcrdma_wc_send(wc);
+
+       atomic_inc(&rdma->sc_sq_avail);
+       wake_up(&rdma->sc_send_wait);
+
+       ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
+       svc_rdma_send_ctxt_put(rdma, ctxt);
+
+       if (unlikely(wc->status != IB_WC_SUCCESS)) {
+               set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+               svc_xprt_enqueue(&rdma->sc_xprt);
+               if (wc->status != IB_WC_WR_FLUSH_ERR)
+                       pr_err("svcrdma: Send: %s (%u/0x%x)\n",
+                              ib_wc_status_msg(wc->status),
+                              wc->status, wc->vendor_err);
+       }
+
+       svc_xprt_put(&rdma->sc_xprt);
+}
+
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+{
+       struct ib_send_wr *bad_wr, *n_wr;
+       int wr_count;
+       int i;
+       int ret;
+
+       wr_count = 1;
+       for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
+               wr_count++;
+
+       /* If the SQ is full, wait until an SQ entry is available */
+       while (1) {
+               if ((atomic_sub_return(wr_count, &rdma->sc_sq_avail) < 0)) {
+                       atomic_inc(&rdma_stat_sq_starve);
+                       trace_svcrdma_sq_full(rdma);
+                       atomic_add(wr_count, &rdma->sc_sq_avail);
+                       wait_event(rdma->sc_send_wait,
+                                  atomic_read(&rdma->sc_sq_avail) > wr_count);
+                       if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+                               return -ENOTCONN;
+                       trace_svcrdma_sq_retry(rdma);
+                       continue;
+               }
+               /* Take a transport ref for each WR posted */
+               for (i = 0; i < wr_count; i++)
+                       svc_xprt_get(&rdma->sc_xprt);
+
+               /* Bump used SQ WR count and post */
+               ret = ib_post_send(rdma->sc_qp, wr, &bad_wr);
+               trace_svcrdma_post_send(wr, ret);
+               if (ret) {
+                       set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+                       for (i = 0; i < wr_count; i++)
+                               svc_xprt_put(&rdma->sc_xprt);
+                       wake_up(&rdma->sc_send_wait);
+               }
+               break;
+       }
+       return ret;
+}
+
 static u32 xdr_padsize(u32 len)
 {
        return (len & 3) ? (4 - (len & 3)) : 0;
@@ -303,7 +481,7 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
-                                struct svc_rdma_op_ctxt *ctxt,
+                                struct svc_rdma_send_ctxt *ctxt,
                                 unsigned int sge_no,
                                 struct page *page,
                                 unsigned long offset,
@@ -316,10 +494,9 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
        if (ib_dma_mapping_error(dev, dma_addr))
                goto out_maperr;
 
-       ctxt->sge[sge_no].addr = dma_addr;
-       ctxt->sge[sge_no].length = len;
-       ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
-       ctxt->mapped_sges++;
+       ctxt->sc_sges[sge_no].addr = dma_addr;
+       ctxt->sc_sges[sge_no].length = len;
+       ctxt->sc_send_wr.num_sge++;
        return 0;
 
 out_maperr:
@@ -331,7 +508,7 @@ out_maperr:
  * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
  */
 static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
-                               struct svc_rdma_op_ctxt *ctxt,
+                               struct svc_rdma_send_ctxt *ctxt,
                                unsigned int sge_no,
                                unsigned char *base,
                                unsigned int len)
@@ -352,14 +529,13 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
  *     %-EIO if DMA mapping failed.
  */
 int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-                          struct svc_rdma_op_ctxt *ctxt,
+                          struct svc_rdma_send_ctxt *ctxt,
                           __be32 *rdma_resp,
                           unsigned int len)
 {
-       ctxt->direction = DMA_TO_DEVICE;
-       ctxt->pages[0] = virt_to_page(rdma_resp);
-       ctxt->count = 1;
-       return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
+       ctxt->sc_pages[0] = virt_to_page(rdma_resp);
+       ctxt->sc_page_count++;
+       return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->sc_pages[0], 0, len);
 }
 
 /* Load the xdr_buf into the ctxt's sge array, and DMA map each
@@ -368,7 +544,7 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
  * Returns zero on success, or a negative errno on failure.
  */
 static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
-                                 struct svc_rdma_op_ctxt *ctxt,
+                                 struct svc_rdma_send_ctxt *ctxt,
                                  struct xdr_buf *xdr, __be32 *wr_lst)
 {
        unsigned int len, sge_no, remaining;
@@ -436,13 +612,13 @@ tail:
  * so they are released by the Send completion handler.
  */
 static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
-                                  struct svc_rdma_op_ctxt *ctxt)
+                                  struct svc_rdma_send_ctxt *ctxt)
 {
        int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
 
-       ctxt->count += pages;
+       ctxt->sc_page_count += pages;
        for (i = 0; i < pages; i++) {
-               ctxt->pages[i + 1] = rqstp->rq_respages[i];
+               ctxt->sc_pages[i + 1] = rqstp->rq_respages[i];
                rqstp->rq_respages[i] = NULL;
        }
        rqstp->rq_next_page = rqstp->rq_respages + 1;
@@ -461,37 +637,29 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
  *     %-ENOMEM if ib_post_send failed.
  */
 int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
-                         struct svc_rdma_op_ctxt *ctxt,
+                         struct svc_rdma_send_ctxt *ctxt,
                          u32 inv_rkey)
 {
-       struct ib_send_wr *send_wr = &ctxt->send_wr;
-
        dprintk("svcrdma: posting Send WR with %u sge(s)\n",
-               ctxt->mapped_sges);
-
-       send_wr->next = NULL;
-       ctxt->cqe.done = svc_rdma_wc_send;
-       send_wr->wr_cqe = &ctxt->cqe;
-       send_wr->sg_list = ctxt->sge;
-       send_wr->num_sge = ctxt->mapped_sges;
-       send_wr->send_flags = IB_SEND_SIGNALED;
+               ctxt->sc_send_wr.num_sge);
+
        if (inv_rkey) {
-               send_wr->opcode = IB_WR_SEND_WITH_INV;
-               send_wr->ex.invalidate_rkey = inv_rkey;
+               ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+               ctxt->sc_send_wr.ex.invalidate_rkey = inv_rkey;
        } else {
-               send_wr->opcode = IB_WR_SEND;
+               ctxt->sc_send_wr.opcode = IB_WR_SEND;
        }
 
-       return svc_rdma_send(rdma, send_wr);
+       return svc_rdma_send(rdma, &ctxt->sc_send_wr);
 }
 
 /* Prepare the portion of the RPC Reply that will be transmitted
  * via RDMA Send. The RPC-over-RDMA transport header is prepared
- * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
  *
  * Depending on whether a Write list or Reply chunk is present,
  * the server may send all, a portion of, or none of the xdr_buf.
- * In the latter case, only the transport header (sge[0]) is
+ * In the latter case, only the transport header (sc_sges[0]) is
  * transmitted.
  *
  * RDMA Send is the last step of transmitting an RPC reply. Pages
@@ -508,11 +676,13 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
                                   struct svc_rqst *rqstp,
                                   __be32 *wr_lst, __be32 *rp_ch)
 {
-       struct svc_rdma_op_ctxt *ctxt;
+       struct svc_rdma_send_ctxt *ctxt;
        u32 inv_rkey;
        int ret;
 
-       ctxt = svc_rdma_get_context(rdma);
+       ctxt = svc_rdma_send_ctxt_get(rdma);
+       if (!ctxt)
+               return -ENOMEM;
 
        ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
                                     svc_rdma_reply_hdr_len(rdma_resp));
@@ -538,8 +708,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
        return 0;
 
 err:
-       svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_context(ctxt, 1);
+       svc_rdma_send_ctxt_put(rdma, ctxt);
        return ret;
 }
 
@@ -553,11 +722,13 @@ err:
 static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
                                   __be32 *rdma_resp, struct svc_rqst *rqstp)
 {
-       struct svc_rdma_op_ctxt *ctxt;
+       struct svc_rdma_send_ctxt *ctxt;
        __be32 *p;
        int ret;
 
-       ctxt = svc_rdma_get_context(rdma);
+       ctxt = svc_rdma_send_ctxt_get(rdma);
+       if (!ctxt)
+               return -ENOMEM;
 
        /* Replace the original transport header with an
         * RDMA_ERROR response. XID etc are preserved.
@@ -580,8 +751,7 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
        return 0;
 
 err:
-       svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_context(ctxt, 1);
+       svc_rdma_send_ctxt_put(rdma, ctxt);
        return ret;
 }
 
index baeecbb2f763a32c42b7cb04ef3a49d576599b9a..3de81735a6ccfa812928650343e34ffd6dafd662 100644 (file)
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
 /*
+ * Copyright (c) 2015-2018 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
  *
@@ -157,114 +158,6 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
-                                          gfp_t flags)
-{
-       struct svc_rdma_op_ctxt *ctxt;
-
-       ctxt = kmalloc(sizeof(*ctxt), flags);
-       if (ctxt) {
-               ctxt->xprt = xprt;
-               INIT_LIST_HEAD(&ctxt->list);
-       }
-       return ctxt;
-}
-
-static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
-{
-       unsigned int i;
-
-       i = xprt->sc_sq_depth;
-       while (i--) {
-               struct svc_rdma_op_ctxt *ctxt;
-
-               ctxt = alloc_ctxt(xprt, GFP_KERNEL);
-               if (!ctxt) {
-                       dprintk("svcrdma: No memory for RDMA ctxt\n");
-                       return false;
-               }
-               list_add(&ctxt->list, &xprt->sc_ctxts);
-       }
-       return true;
-}
-
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
-{
-       struct svc_rdma_op_ctxt *ctxt = NULL;
-
-       spin_lock(&xprt->sc_ctxt_lock);
-       xprt->sc_ctxt_used++;
-       if (list_empty(&xprt->sc_ctxts))
-               goto out_empty;
-
-       ctxt = list_first_entry(&xprt->sc_ctxts,
-                               struct svc_rdma_op_ctxt, list);
-       list_del(&ctxt->list);
-       spin_unlock(&xprt->sc_ctxt_lock);
-
-out:
-       ctxt->count = 0;
-       ctxt->mapped_sges = 0;
-       return ctxt;
-
-out_empty:
-       /* Either pre-allocation missed the mark, or send
-        * queue accounting is broken.
-        */
-       spin_unlock(&xprt->sc_ctxt_lock);
-
-       ctxt = alloc_ctxt(xprt, GFP_NOIO);
-       if (ctxt)
-               goto out;
-
-       spin_lock(&xprt->sc_ctxt_lock);
-       xprt->sc_ctxt_used--;
-       spin_unlock(&xprt->sc_ctxt_lock);
-       WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
-       return NULL;
-}
-
-void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
-{
-       struct svcxprt_rdma *xprt = ctxt->xprt;
-       struct ib_device *device = xprt->sc_cm_id->device;
-       unsigned int i;
-
-       for (i = 0; i < ctxt->mapped_sges; i++)
-               ib_dma_unmap_page(device,
-                                 ctxt->sge[i].addr,
-                                 ctxt->sge[i].length,
-                                 ctxt->direction);
-       ctxt->mapped_sges = 0;
-}
-
-void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
-{
-       struct svcxprt_rdma *xprt = ctxt->xprt;
-       int i;
-
-       if (free_pages)
-               for (i = 0; i < ctxt->count; i++)
-                       put_page(ctxt->pages[i]);
-
-       spin_lock(&xprt->sc_ctxt_lock);
-       xprt->sc_ctxt_used--;
-       list_add(&ctxt->list, &xprt->sc_ctxts);
-       spin_unlock(&xprt->sc_ctxt_lock);
-}
-
-static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
-{
-       while (!list_empty(&xprt->sc_ctxts)) {
-               struct svc_rdma_op_ctxt *ctxt;
-
-               ctxt = list_first_entry(&xprt->sc_ctxts,
-                                       struct svc_rdma_op_ctxt, list);
-               list_del(&ctxt->list);
-               kfree(ctxt);
-       }
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -292,39 +185,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
        }
 }
 
-/**
- * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq:        completion queue
- * @wc:        completed WR
- *
- */
-void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
-{
-       struct svcxprt_rdma *xprt = cq->cq_context;
-       struct ib_cqe *cqe = wc->wr_cqe;
-       struct svc_rdma_op_ctxt *ctxt;
-
-       trace_svcrdma_wc_send(wc);
-
-       atomic_inc(&xprt->sc_sq_avail);
-       wake_up(&xprt->sc_send_wait);
-
-       ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
-       svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_context(ctxt, 1);
-
-       if (unlikely(wc->status != IB_WC_SUCCESS)) {
-               set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-               svc_xprt_enqueue(&xprt->sc_xprt);
-               if (wc->status != IB_WC_WR_FLUSH_ERR)
-                       pr_err("svcrdma: Send: %s (%u/0x%x)\n",
-                              ib_wc_status_msg(wc->status),
-                              wc->status, wc->vendor_err);
-       }
-
-       svc_xprt_put(&xprt->sc_xprt);
-}
-
 static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
                                                 struct net *net)
 {
@@ -338,14 +198,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
        INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
        INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
        INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
-       INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+       INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
        INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
        INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
        init_waitqueue_head(&cma_xprt->sc_send_wait);
 
        spin_lock_init(&cma_xprt->sc_lock);
        spin_lock_init(&cma_xprt->sc_rq_dto_lock);
-       spin_lock_init(&cma_xprt->sc_ctxt_lock);
+       spin_lock_init(&cma_xprt->sc_send_lock);
        spin_lock_init(&cma_xprt->sc_recv_lock);
        spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
@@ -640,9 +500,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        }
        atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
-       if (!svc_rdma_prealloc_ctxts(newxprt))
-               goto errout;
-
        newxprt->sc_pd = ib_alloc_pd(dev, 0);
        if (IS_ERR(newxprt->sc_pd)) {
                dprintk("svcrdma: error creating PD for connect request\n");
@@ -794,11 +651,6 @@ static void __svc_rdma_free(struct work_struct *work)
 
        svc_rdma_flush_recv_queues(rdma);
 
-       /* Warn if we leaked a resource or under-referenced */
-       if (rdma->sc_ctxt_used != 0)
-               pr_err("svcrdma: ctxt still in use? (%d)\n",
-                      rdma->sc_ctxt_used);
-
        /* Final put of backchannel client transport */
        if (xprt->xpt_bc_xprt) {
                xprt_put(xprt->xpt_bc_xprt);
@@ -806,7 +658,7 @@ static void __svc_rdma_free(struct work_struct *work)
        }
 
        svc_rdma_destroy_rw_ctxts(rdma);
-       svc_rdma_destroy_ctxts(rdma);
+       svc_rdma_send_ctxts_destroy(rdma);
        svc_rdma_recv_ctxts_destroy(rdma);
 
        /* Destroy the QP if present (not a listener) */
@@ -860,52 +712,3 @@ static void svc_rdma_secure_port(struct svc_rqst *rqstp)
 static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
 {
 }
-
-int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
-{
-       struct ib_send_wr *bad_wr, *n_wr;
-       int wr_count;
-       int i;
-       int ret;
-
-       if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-               return -ENOTCONN;
-
-       wr_count = 1;
-       for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
-               wr_count++;
-
-       /* If the SQ is full, wait until an SQ entry is available */
-       while (1) {
-               if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
-                       atomic_inc(&rdma_stat_sq_starve);
-                       trace_svcrdma_sq_full(xprt);
-                       atomic_add(wr_count, &xprt->sc_sq_avail);
-                       wait_event(xprt->sc_send_wait,
-                                  atomic_read(&xprt->sc_sq_avail) > wr_count);
-                       if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-                               return -ENOTCONN;
-                       trace_svcrdma_sq_retry(xprt);
-                       continue;
-               }
-               /* Take a transport ref for each WR posted */
-               for (i = 0; i < wr_count; i++)
-                       svc_xprt_get(&xprt->sc_xprt);
-
-               /* Bump used SQ WR count and post */
-               ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
-               trace_svcrdma_post_send(wr, ret);
-               if (ret) {
-                       set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-                       for (i = 0; i < wr_count; i ++)
-                               svc_xprt_put(&xprt->sc_xprt);
-                       dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
-                       dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
-                               atomic_read(&xprt->sc_sq_avail),
-                               xprt->sc_sq_depth);
-                       wake_up(&xprt->sc_send_wait);
-               }
-               break;
-       }
-       return ret;
-}