SUNRPC: Dequeue the request from the receive queue while we're re-encoding
authorTrond Myklebust <trondmy@gmail.com>
Tue, 10 Sep 2019 17:01:35 +0000 (13:01 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Tue, 17 Sep 2019 19:14:11 +0000 (15:14 -0400)
Ensure that we dequeue the request from the transport receive queue
while we're re-encoding to prevent issues like use-after-free when
we release the bvec.

Fixes: 7536908982047 ("SUNRPC: Ensure the bvecs are reset when we re-encode...")
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Cc: stable@vger.kernel.org # v4.20+
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
include/linux/sunrpc/xprt.h
net/sunrpc/clnt.c
net/sunrpc/xprt.c

index 13e108b..d783e15 100644 (file)
@@ -352,6 +352,7 @@ bool                        xprt_prepare_transmit(struct rpc_task *task);
 void                   xprt_request_enqueue_transmit(struct rpc_task *task);
 void                   xprt_request_enqueue_receive(struct rpc_task *task);
 void                   xprt_request_wait_receive(struct rpc_task *task);
+void                   xprt_request_dequeue_xprt(struct rpc_task *task);
 bool                   xprt_request_need_retransmit(struct rpc_task *task);
 void                   xprt_transmit(struct rpc_task *task);
 void                   xprt_end_transmit(struct rpc_task *task);
index d8679b6..0359466 100644 (file)
@@ -1862,6 +1862,7 @@ rpc_xdr_encode(struct rpc_task *task)
                     req->rq_rbuffer,
                     req->rq_rcvsize);
 
+       req->rq_reply_bytes_recvd = 0;
        req->rq_snd_buf.head[0].iov_len = 0;
        xdr_init_encode(&xdr, &req->rq_snd_buf,
                        req->rq_snd_buf.head[0].iov_base, req);
@@ -1881,6 +1882,8 @@ call_encode(struct rpc_task *task)
        if (!rpc_task_need_encode(task))
                goto out;
        dprint_status(task);
+       /* Dequeue task from the receive queue while we're encoding */
+       xprt_request_dequeue_xprt(task);
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        rpc_xdr_encode(task);
        /* Did the encode result in an error condition? */
@@ -2501,9 +2504,6 @@ call_decode(struct rpc_task *task)
                return;
        case -EAGAIN:
                task->tk_status = 0;
-               xdr_free_bvec(&req->rq_rcv_buf);
-               req->rq_reply_bytes_recvd = 0;
-               req->rq_rcv_buf.len = 0;
                if (task->tk_client->cl_discrtry)
                        xprt_conditional_disconnect(req->rq_xprt,
                                                    req->rq_connect_cookie);
index 783748d..02d5b21 100644 (file)
@@ -1324,6 +1324,36 @@ xprt_request_dequeue_transmit(struct rpc_task *task)
 }
 
 /**
+ * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmit and receive queues, and ensure that
+ * it is not pinned by the receive work item.
+ */
+void
+xprt_request_dequeue_xprt(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
+           test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
+           xprt_is_pinned_rqst(req)) {
+               spin_lock(&xprt->queue_lock);
+               xprt_request_dequeue_transmit_locked(task);
+               xprt_request_dequeue_receive_locked(task);
+               while (xprt_is_pinned_rqst(req)) {
+                       set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+                       spin_unlock(&xprt->queue_lock);
+                       xprt_wait_on_pinned_rqst(req);
+                       spin_lock(&xprt->queue_lock);
+                       clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+               }
+               spin_unlock(&xprt->queue_lock);
+       }
+}
+
+/**
  * xprt_request_prepare - prepare an encoded request for transport
  * @req: pointer to rpc_rqst
  *
@@ -1754,28 +1784,6 @@ void xprt_retry_reserve(struct rpc_task *task)
        xprt_do_reserve(xprt, task);
 }
 
-static void
-xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
-{
-       struct rpc_xprt *xprt = req->rq_xprt;
-
-       if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
-           test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
-           xprt_is_pinned_rqst(req)) {
-               spin_lock(&xprt->queue_lock);
-               xprt_request_dequeue_transmit_locked(task);
-               xprt_request_dequeue_receive_locked(task);
-               while (xprt_is_pinned_rqst(req)) {
-                       set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
-                       spin_unlock(&xprt->queue_lock);
-                       xprt_wait_on_pinned_rqst(req);
-                       spin_lock(&xprt->queue_lock);
-                       clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
-               }
-               spin_unlock(&xprt->queue_lock);
-       }
-}
-
 /**
  * xprt_release - release an RPC request slot
  * @task: task which is finished with the slot
@@ -1795,7 +1803,7 @@ void xprt_release(struct rpc_task *task)
        }
 
        xprt = req->rq_xprt;
-       xprt_request_dequeue_all(task, req);
+       xprt_request_dequeue_xprt(task);
        spin_lock(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)