xprtrdma: Pass only the list of registered MRs to ro_unmap_sync
authorChuck Lever <chuck.lever@oracle.com>
Thu, 8 Jun 2017 15:52:04 +0000 (11:52 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Thu, 13 Jul 2017 20:00:10 +0000 (16:00 -0400)
There are rare cases where an rpcrdma_req can be re-used (via
rpcrdma_buffer_put) while the RPC reply handler is still running.
This is due to a signal firing at just the wrong instant.

Since commit 9d6b04097882 ("xprtrdma: Place registered MWs on a
per-req list"), rpcrdma_mws are self-contained; ie., they fully
describe an MR and scatterlist, and no part of that information is
stored in struct rpcrdma_req.

As part of closing the above race window, pass only the req's list
of registered MRs to ro_unmap_sync, rather than the rpcrdma_req
itself.

Some extra transport header sanity checking is removed. Since the
client depends on its own recollection of what memory had been
registered, there doesn't seem to be a way to abuse this change.

And, the check was not terribly effective. If the client had sent
Read chunks, the "list_empty" test is negative in both of the
removed cases, which are actually looking for Write or Reply
chunks.

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=305
Fixes: 68791649a725 ('xprtrdma: Invalidate in the RPC reply ... ')
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/fmr_ops.c
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 21f3cd5..5556ed9 100644 (file)
@@ -255,24 +255,26 @@ out_maperr:
  * Sleeps until it is safe for the host CPU to access the
  * previously mapped memory regions.
  *
- * Caller ensures that req->rl_registered is not empty.
+ * Caller ensures that @mws is not empty before the call. This
+ * function empties the list.
  */
 static void
-fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
 {
        struct rpcrdma_mw *mw, *tmp;
        LIST_HEAD(unmap_list);
        int rc;
 
-       dprintk("RPC:       %s: req %p\n", __func__, req);
-
        /* ORDER: Invalidate all of the req's MRs first
         *
         * ib_unmap_fmr() is slow, so use a single call instead
         * of one call per mapped FMR.
         */
-       list_for_each_entry(mw, &req->rl_registered, mw_list)
+       list_for_each_entry(mw, mws, mw_list) {
+               dprintk("RPC:       %s: unmapping fmr %p\n",
+                       __func__, &mw->fmr);
                list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
+       }
        r_xprt->rx_stats.local_inv_needed++;
        rc = ib_unmap_fmr(&unmap_list);
        if (rc)
@@ -281,7 +283,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        /* ORDER: Now DMA unmap all of the req's MRs, and return
         * them to the free MW list.
         */
-       list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+       list_for_each_entry_safe(mw, tmp, mws, mw_list) {
                list_del_init(&mw->mw_list);
                list_del_init(&mw->fmr.fm_mr->list);
                ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
@@ -294,7 +296,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 out_reset:
        pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
 
-       list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+       list_for_each_entry_safe(mw, tmp, mws, mw_list) {
                list_del_init(&mw->mw_list);
                list_del_init(&mw->fmr.fm_mr->list);
                fmr_op_recover_mr(mw);
index 31290cb..97f9f85 100644 (file)
@@ -458,10 +458,11 @@ out_senderr:
  * Sleeps until it is safe for the host CPU to access the
  * previously mapped memory regions.
  *
- * Caller ensures that req->rl_registered is not empty.
+ * Caller ensures that @mws is not empty before the call. This
+ * function empties the list.
  */
 static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
 {
        struct ib_send_wr *first, **prev, *last, *bad_wr;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
@@ -469,9 +470,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        struct rpcrdma_mw *mw;
        int count, rc;
 
-       dprintk("RPC:       %s: req %p\n", __func__, req);
-
-       /* ORDER: Invalidate all of the req's MRs first
+       /* ORDER: Invalidate all of the MRs first
         *
         * Chain the LOCAL_INV Work Requests and post them with
         * a single ib_post_send() call.
@@ -479,7 +478,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        f = NULL;
        count = 0;
        prev = &first;
-       list_for_each_entry(mw, &req->rl_registered, mw_list) {
+       list_for_each_entry(mw, mws, mw_list) {
                mw->frmr.fr_state = FRMR_IS_INVALID;
 
                if (mw->mw_flags & RPCRDMA_MW_F_RI)
@@ -528,12 +527,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 
        wait_for_completion(&f->fr_linv_done);
 
-       /* ORDER: Now DMA unmap all of the req's MRs, and return
+       /* ORDER: Now DMA unmap all of the MRs, and return
         * them to the free MW list.
         */
 unmap:
-       while (!list_empty(&req->rl_registered)) {
-               mw = rpcrdma_pop_mw(&req->rl_registered);
+       while (!list_empty(mws)) {
+               mw = rpcrdma_pop_mw(mws);
                dprintk("RPC:       %s: DMA unmapping frmr %p\n",
                        __func__, &mw->frmr);
                ib_dma_unmap_sg(ia->ri_device,
@@ -549,7 +548,7 @@ reset_mrs:
        /* Find and reset the MRs in the LOCAL_INV WRs that did not
         * get posted. This is synchronous, and slow.
         */
-       list_for_each_entry(mw, &req->rl_registered, mw_list) {
+       list_for_each_entry(mw, mws, mw_list) {
                f = &mw->frmr;
                if (mw->mw_handle == bad_wr->ex.invalidate_rkey) {
                        __frwr_reset_mr(ia, mw);
index 2356a63..c88132d 100644 (file)
@@ -995,6 +995,7 @@ rpcrdma_reply_handler(struct work_struct *work)
        __be32 *iptr;
        int rdmalen, status, rmerr;
        unsigned long cwnd;
+       struct list_head mws;
 
        dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
@@ -1024,7 +1025,8 @@ rpcrdma_reply_handler(struct work_struct *work)
        /* Sanity checking has passed. We are now committed
         * to complete this transaction.
         */
-       rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
+       list_replace_init(&req->rl_registered, &mws);
+       rpcrdma_mark_remote_invalidation(&mws, rep);
        list_del_init(&rqst->rq_list);
        req->rl_reply = rep;
        spin_unlock_bh(&xprt->transport_lock);
@@ -1042,12 +1044,9 @@ rpcrdma_reply_handler(struct work_struct *work)
        case rdma_msg:
                /* never expect read chunks */
                /* never expect reply chunks (two ways to check) */
-               /* never expect write chunks without having offered RDMA */
                if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
                    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
-                    headerp->rm_body.rm_chunks[2] != xdr_zero) ||
-                   (headerp->rm_body.rm_chunks[1] != xdr_zero &&
-                    list_empty(&req->rl_registered)))
+                    headerp->rm_body.rm_chunks[2] != xdr_zero))
                        goto badheader;
                if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
                        /* count any expected write chunks in read reply */
@@ -1084,8 +1083,7 @@ rpcrdma_reply_handler(struct work_struct *work)
                /* never expect read or write chunks, always reply chunks */
                if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
                    headerp->rm_body.rm_chunks[1] != xdr_zero ||
-                   headerp->rm_body.rm_chunks[2] != xdr_one ||
-                   list_empty(&req->rl_registered))
+                   headerp->rm_body.rm_chunks[2] != xdr_one)
                        goto badheader;
                iptr = (__be32 *)((unsigned char *)headerp +
                                                        RPCRDMA_HDRLEN_MIN);
@@ -1118,8 +1116,8 @@ out:
         * control: waking the next RPC waits until this RPC has
         * relinquished all its Send Queue entries.
         */
-       if (!list_empty(&req->rl_registered))
-               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
+       if (!list_empty(&mws))
+               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
        spin_lock_bh(&xprt->transport_lock);
        cwnd = xprt->cwnd;
index 2e02733..1c23117 100644 (file)
@@ -467,7 +467,7 @@ struct rpcrdma_memreg_ops {
                                  struct rpcrdma_mr_seg *, int, bool,
                                  struct rpcrdma_mw **);
        void            (*ro_unmap_sync)(struct rpcrdma_xprt *,
-                                        struct rpcrdma_req *);
+                                        struct list_head *);
        void            (*ro_unmap_safe)(struct rpcrdma_xprt *,
                                         struct rpcrdma_req *, bool);
        void            (*ro_recover_mr)(struct rpcrdma_mw *);