struct ib_pd *sc_pd;
- spinlock_t sc_ctxt_lock;
- struct list_head sc_ctxts;
+ spinlock_t sc_send_lock;
+ struct list_head sc_send_ctxts;
int sc_ctxt_used;
spinlock_t sc_rw_ctxt_lock;
struct list_head sc_rw_ctxts;
struct page *rc_pages[RPCSVC_MAXPAGES];
};
+enum {
+ RPCRDMA_MAX_SGES = 1 + (RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE),
+};
+
+struct svc_rdma_send_ctxt {
+ struct list_head sc_list;
+ struct ib_send_wr sc_send_wr;
+ struct ib_cqe sc_cqe;
+ int sc_page_count;
+ struct page *sc_pages[RPCSVC_MAXPAGES];
+ struct ib_sge sc_sges[RPCRDMA_MAX_SGES];
+};
+
/* svc_rdma_backchannel.c */
extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
__be32 *rdma_resp,
struct xdr_buf *xdr);
/* svc_rdma_sendto.c */
+extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_send_ctxt *
+ svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma);
+extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt);
+extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
__be32 *rdma_resp, unsigned int len);
extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
u32 inv_rkey);
extern int svc_rdma_sendto(struct svc_rqst *);
/* svc_rdma_transport.c */
-extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
-extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
-extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
-extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
-extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
extern void svc_sq_reap(struct svcxprt_rdma *);
extern void svc_rq_reap(struct svcxprt_rdma *);
extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
* DMA-unmap the pages under I/O for that Write segment. The Write
* completion handler does not release any pages.
*
- * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
* The ownership of all of the Reply's pages are transferred into that
* ctxt, the Send WR is posted, and sendto returns.
*
- * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * The svc_rdma_send_ctxt is presented when the Send WR completes. The
* Send completion handler finally releases the Reply's pages.
*
* This mechanism also assumes that completions on the transport's Send
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
+
+static inline struct svc_rdma_send_ctxt *
+svc_rdma_next_send_ctxt(struct list_head *list)
+{
+ return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
+ sc_list);
+}
+
+static struct svc_rdma_send_ctxt *
+svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_send_ctxt *ctxt;
+ int i;
+
+ ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+ if (!ctxt)
+ return NULL;
+
+ ctxt->sc_cqe.done = svc_rdma_wc_send;
+ ctxt->sc_send_wr.next = NULL;
+ ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
+ ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
+ ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
+ for (i = 0; i < ARRAY_SIZE(ctxt->sc_sges); i++)
+ ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
+ return ctxt;
+}
+
+/**
+ * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
+ * @rdma: svcxprt_rdma being torn down
+ *
+ */
+void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_send_ctxt *ctxt;
+
+ while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
+ list_del(&ctxt->sc_list);
+ kfree(ctxt);
+ }
+}
+
+/**
+ * svc_rdma_send_ctxt_get - Get a free send_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a ready-to-use send_ctxt, or NULL if none are
+ * available and a fresh one cannot be allocated.
+ */
+struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_send_ctxt *ctxt;
+
+ spin_lock(&rdma->sc_send_lock);
+ ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
+ if (!ctxt)
+ goto out_empty;
+ list_del(&ctxt->sc_list);
+ spin_unlock(&rdma->sc_send_lock);
+
+out:
+ ctxt->sc_send_wr.num_sge = 0;
+ ctxt->sc_page_count = 0;
+ return ctxt;
+
+out_empty:
+ spin_unlock(&rdma->sc_send_lock);
+ ctxt = svc_rdma_send_ctxt_alloc(rdma);
+ if (!ctxt)
+ return NULL;
+ goto out;
+}
+
+/**
+ * svc_rdma_send_ctxt_put - Return send_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ *
+ * Pages left in sc_pages are DMA unmapped and released.
+ */
+void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ struct ib_device *device = rdma->sc_cm_id->device;
+ unsigned int i;
+
+ for (i = 0; i < ctxt->sc_send_wr.num_sge; i++)
+ ib_dma_unmap_page(device,
+ ctxt->sc_sges[i].addr,
+ ctxt->sc_sges[i].length,
+ DMA_TO_DEVICE);
+
+ for (i = 0; i < ctxt->sc_page_count; ++i)
+ put_page(ctxt->sc_pages[i]);
+
+ spin_lock(&rdma->sc_send_lock);
+ list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
+ spin_unlock(&rdma->sc_send_lock);
+}
+
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: Completion Queue context
+ * @wc: Work Completion object
+ *
+ * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
+ * the Send completion handler could be running.
+ */
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct svcxprt_rdma *rdma = cq->cq_context;
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_send_ctxt *ctxt;
+
+ trace_svcrdma_wc_send(wc);
+
+ atomic_inc(&rdma->sc_sq_avail);
+ wake_up(&rdma->sc_send_wait);
+
+ ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
+
+ if (unlikely(wc->status != IB_WC_SUCCESS)) {
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("svcrdma: Send: %s (%u/0x%x)\n",
+ ib_wc_status_msg(wc->status),
+ wc->status, wc->vendor_err);
+ }
+
+ svc_xprt_put(&rdma->sc_xprt);
+}
+
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+{
+ struct ib_send_wr *bad_wr, *n_wr;
+ int wr_count;
+ int i;
+ int ret;
+
+ wr_count = 1;
+ for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
+ wr_count++;
+
+ /* If the SQ is full, wait until an SQ entry is available */
+ while (1) {
+ if ((atomic_sub_return(wr_count, &rdma->sc_sq_avail) < 0)) {
+ atomic_inc(&rdma_stat_sq_starve);
+ trace_svcrdma_sq_full(rdma);
+ atomic_add(wr_count, &rdma->sc_sq_avail);
+ wait_event(rdma->sc_send_wait,
+ atomic_read(&rdma->sc_sq_avail) > wr_count);
+ if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+ return -ENOTCONN;
+ trace_svcrdma_sq_retry(rdma);
+ continue;
+ }
+ /* Take a transport ref for each WR posted */
+ for (i = 0; i < wr_count; i++)
+ svc_xprt_get(&rdma->sc_xprt);
+
+ /* Bump used SQ WR count and post */
+ ret = ib_post_send(rdma->sc_qp, wr, &bad_wr);
+ trace_svcrdma_post_send(wr, ret);
+ if (ret) {
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ for (i = 0; i < wr_count; i++)
+ svc_xprt_put(&rdma->sc_xprt);
+ wake_up(&rdma->sc_send_wait);
+ }
+ break;
+ }
+ return ret;
+}
+
static u32 xdr_padsize(u32 len)
{
return (len & 3) ? (4 - (len & 3)) : 0;
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
unsigned int sge_no,
struct page *page,
unsigned long offset,
if (ib_dma_mapping_error(dev, dma_addr))
goto out_maperr;
- ctxt->sge[sge_no].addr = dma_addr;
- ctxt->sge[sge_no].length = len;
- ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
- ctxt->mapped_sges++;
+ ctxt->sc_sges[sge_no].addr = dma_addr;
+ ctxt->sc_sges[sge_no].length = len;
+ ctxt->sc_send_wr.num_sge++;
return 0;
out_maperr:
* handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
*/
static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
unsigned int sge_no,
unsigned char *base,
unsigned int len)
* %-EIO if DMA mapping failed.
*/
int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
__be32 *rdma_resp,
unsigned int len)
{
- ctxt->direction = DMA_TO_DEVICE;
- ctxt->pages[0] = virt_to_page(rdma_resp);
- ctxt->count = 1;
- return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
+ ctxt->sc_pages[0] = virt_to_page(rdma_resp);
+ ctxt->sc_page_count++;
+ return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->sc_pages[0], 0, len);
}
/* Load the xdr_buf into the ctxt's sge array, and DMA map each
* Returns zero on success, or a negative errno on failure.
*/
static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
struct xdr_buf *xdr, __be32 *wr_lst)
{
unsigned int len, sge_no, remaining;
* so they are released by the Send completion handler.
*/
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *ctxt)
+ struct svc_rdma_send_ctxt *ctxt)
{
int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
- ctxt->count += pages;
+ ctxt->sc_page_count += pages;
for (i = 0; i < pages; i++) {
- ctxt->pages[i + 1] = rqstp->rq_respages[i];
+ ctxt->sc_pages[i + 1] = rqstp->rq_respages[i];
rqstp->rq_respages[i] = NULL;
}
rqstp->rq_next_page = rqstp->rq_respages + 1;
* %-ENOMEM if ib_post_send failed.
*/
int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
- struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *ctxt,
u32 inv_rkey)
{
- struct ib_send_wr *send_wr = &ctxt->send_wr;
-
dprintk("svcrdma: posting Send WR with %u sge(s)\n",
- ctxt->mapped_sges);
-
- send_wr->next = NULL;
- ctxt->cqe.done = svc_rdma_wc_send;
- send_wr->wr_cqe = &ctxt->cqe;
- send_wr->sg_list = ctxt->sge;
- send_wr->num_sge = ctxt->mapped_sges;
- send_wr->send_flags = IB_SEND_SIGNALED;
+ ctxt->sc_send_wr.num_sge);
+
if (inv_rkey) {
- send_wr->opcode = IB_WR_SEND_WITH_INV;
- send_wr->ex.invalidate_rkey = inv_rkey;
+ ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+ ctxt->sc_send_wr.ex.invalidate_rkey = inv_rkey;
} else {
- send_wr->opcode = IB_WR_SEND;
+ ctxt->sc_send_wr.opcode = IB_WR_SEND;
}
- return svc_rdma_send(rdma, send_wr);
+ return svc_rdma_send(rdma, &ctxt->sc_send_wr);
}
/* Prepare the portion of the RPC Reply that will be transmitted
* via RDMA Send. The RPC-over-RDMA transport header is prepared
- * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
*
* Depending on whether a Write list or Reply chunk is present,
* the server may send all, a portion of, or none of the xdr_buf.
- * In the latter case, only the transport header (sge[0]) is
+ * In the latter case, only the transport header (sc_sges[0]) is
* transmitted.
*
* RDMA Send is the last step of transmitting an RPC reply. Pages
struct svc_rqst *rqstp,
__be32 *wr_lst, __be32 *rp_ch)
{
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt;
u32 inv_rkey;
int ret;
- ctxt = svc_rdma_get_context(rdma);
+ ctxt = svc_rdma_send_ctxt_get(rdma);
+ if (!ctxt)
+ return -ENOMEM;
ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
svc_rdma_reply_hdr_len(rdma_resp));
return 0;
err:
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
return ret;
}
static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
__be32 *rdma_resp, struct svc_rqst *rqstp)
{
- struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt;
__be32 *p;
int ret;
- ctxt = svc_rdma_get_context(rdma);
+ ctxt = svc_rdma_send_ctxt_get(rdma);
+ if (!ctxt)
+ return -ENOMEM;
/* Replace the original transport header with an
* RDMA_ERROR response. XID etc are preserved.
return 0;
err:
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
+ svc_rdma_send_ctxt_put(rdma, ctxt);
return ret;
}
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
+ * Copyright (c) 2015-2018 Oracle. All rights reserved.
* Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
*
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
-static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
- gfp_t flags)
-{
- struct svc_rdma_op_ctxt *ctxt;
-
- ctxt = kmalloc(sizeof(*ctxt), flags);
- if (ctxt) {
- ctxt->xprt = xprt;
- INIT_LIST_HEAD(&ctxt->list);
- }
- return ctxt;
-}
-
-static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
-{
- unsigned int i;
-
- i = xprt->sc_sq_depth;
- while (i--) {
- struct svc_rdma_op_ctxt *ctxt;
-
- ctxt = alloc_ctxt(xprt, GFP_KERNEL);
- if (!ctxt) {
- dprintk("svcrdma: No memory for RDMA ctxt\n");
- return false;
- }
- list_add(&ctxt->list, &xprt->sc_ctxts);
- }
- return true;
-}
-
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
-{
- struct svc_rdma_op_ctxt *ctxt = NULL;
-
- spin_lock(&xprt->sc_ctxt_lock);
- xprt->sc_ctxt_used++;
- if (list_empty(&xprt->sc_ctxts))
- goto out_empty;
-
- ctxt = list_first_entry(&xprt->sc_ctxts,
- struct svc_rdma_op_ctxt, list);
- list_del(&ctxt->list);
- spin_unlock(&xprt->sc_ctxt_lock);
-
-out:
- ctxt->count = 0;
- ctxt->mapped_sges = 0;
- return ctxt;
-
-out_empty:
- /* Either pre-allocation missed the mark, or send
- * queue accounting is broken.
- */
- spin_unlock(&xprt->sc_ctxt_lock);
-
- ctxt = alloc_ctxt(xprt, GFP_NOIO);
- if (ctxt)
- goto out;
-
- spin_lock(&xprt->sc_ctxt_lock);
- xprt->sc_ctxt_used--;
- spin_unlock(&xprt->sc_ctxt_lock);
- WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
- return NULL;
-}
-
-void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
-{
- struct svcxprt_rdma *xprt = ctxt->xprt;
- struct ib_device *device = xprt->sc_cm_id->device;
- unsigned int i;
-
- for (i = 0; i < ctxt->mapped_sges; i++)
- ib_dma_unmap_page(device,
- ctxt->sge[i].addr,
- ctxt->sge[i].length,
- ctxt->direction);
- ctxt->mapped_sges = 0;
-}
-
-void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
-{
- struct svcxprt_rdma *xprt = ctxt->xprt;
- int i;
-
- if (free_pages)
- for (i = 0; i < ctxt->count; i++)
- put_page(ctxt->pages[i]);
-
- spin_lock(&xprt->sc_ctxt_lock);
- xprt->sc_ctxt_used--;
- list_add(&ctxt->list, &xprt->sc_ctxts);
- spin_unlock(&xprt->sc_ctxt_lock);
-}
-
-static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
-{
- while (!list_empty(&xprt->sc_ctxts)) {
- struct svc_rdma_op_ctxt *ctxt;
-
- ctxt = list_first_entry(&xprt->sc_ctxts,
- struct svc_rdma_op_ctxt, list);
- list_del(&ctxt->list);
- kfree(ctxt);
- }
-}
-
/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
}
}
-/**
- * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct svcxprt_rdma *xprt = cq->cq_context;
- struct ib_cqe *cqe = wc->wr_cqe;
- struct svc_rdma_op_ctxt *ctxt;
-
- trace_svcrdma_wc_send(wc);
-
- atomic_inc(&xprt->sc_sq_avail);
- wake_up(&xprt->sc_send_wait);
-
- ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 1);
-
- if (unlikely(wc->status != IB_WC_SUCCESS)) {
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- svc_xprt_enqueue(&xprt->sc_xprt);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
- }
-
- svc_xprt_put(&xprt->sc_xprt);
-}
-
static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
struct net *net)
{
INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
- INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+ INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);
spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
- spin_lock_init(&cma_xprt->sc_ctxt_lock);
+ spin_lock_init(&cma_xprt->sc_send_lock);
spin_lock_init(&cma_xprt->sc_recv_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
}
atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
- if (!svc_rdma_prealloc_ctxts(newxprt))
- goto errout;
-
newxprt->sc_pd = ib_alloc_pd(dev, 0);
if (IS_ERR(newxprt->sc_pd)) {
dprintk("svcrdma: error creating PD for connect request\n");
svc_rdma_flush_recv_queues(rdma);
- /* Warn if we leaked a resource or under-referenced */
- if (rdma->sc_ctxt_used != 0)
- pr_err("svcrdma: ctxt still in use? (%d)\n",
- rdma->sc_ctxt_used);
-
/* Final put of backchannel client transport */
if (xprt->xpt_bc_xprt) {
xprt_put(xprt->xpt_bc_xprt);
}
svc_rdma_destroy_rw_ctxts(rdma);
- svc_rdma_destroy_ctxts(rdma);
+ svc_rdma_send_ctxts_destroy(rdma);
svc_rdma_recv_ctxts_destroy(rdma);
/* Destroy the QP if present (not a listener) */
static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}
-
-int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
-{
- struct ib_send_wr *bad_wr, *n_wr;
- int wr_count;
- int i;
- int ret;
-
- if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
- return -ENOTCONN;
-
- wr_count = 1;
- for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
- wr_count++;
-
- /* If the SQ is full, wait until an SQ entry is available */
- while (1) {
- if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
- atomic_inc(&rdma_stat_sq_starve);
- trace_svcrdma_sq_full(xprt);
- atomic_add(wr_count, &xprt->sc_sq_avail);
- wait_event(xprt->sc_send_wait,
- atomic_read(&xprt->sc_sq_avail) > wr_count);
- if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
- return -ENOTCONN;
- trace_svcrdma_sq_retry(xprt);
- continue;
- }
- /* Take a transport ref for each WR posted */
- for (i = 0; i < wr_count; i++)
- svc_xprt_get(&xprt->sc_xprt);
-
- /* Bump used SQ WR count and post */
- ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
- trace_svcrdma_post_send(wr, ret);
- if (ret) {
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- for (i = 0; i < wr_count; i ++)
- svc_xprt_put(&xprt->sc_xprt);
- dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
- dprintk(" sc_sq_avail=%d, sc_sq_depth=%d\n",
- atomic_read(&xprt->sc_sq_avail),
- xprt->sc_sq_depth);
- wake_up(&xprt->sc_send_wait);
- }
- break;
- }
- return ret;
-}