SUNRPC: Convert svc_tcp_sendmsg to use bio_vecs directly
authorChuck Lever <chuck.lever@oracle.com>
Wed, 19 Jul 2023 18:31:03 +0000 (14:31 -0400)
committerChuck Lever <chuck.lever@oracle.com>
Tue, 29 Aug 2023 21:45:22 +0000 (17:45 -0400)
Add a helper to convert a whole xdr_buf directly into an array of
bio_vecs, then send this array instead of iterating piecemeal over
the xdr_buf containing the outbound RPC message.

Reviewed-by: David Howells <dhowells@redhat.com>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
include/linux/sunrpc/xdr.h
net/sunrpc/svcsock.c
net/sunrpc/xdr.c

index f89ec4b..42f9d7e 100644 (file)
@@ -139,6 +139,8 @@ void        xdr_terminate_string(const struct xdr_buf *, const u32);
 size_t xdr_buf_pagecount(const struct xdr_buf *buf);
 int    xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
 void   xdr_free_bvec(struct xdr_buf *buf);
+unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
+                            const struct xdr_buf *xdr);
 
 static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len)
 {
index 589020e..90b1ab9 100644 (file)
@@ -36,6 +36,8 @@
 #include <linux/skbuff.h>
 #include <linux/file.h>
 #include <linux/freezer.h>
+#include <linux/bvec.h>
+
 #include <net/sock.h>
 #include <net/checksum.h>
 #include <net/ip.h>
@@ -1194,77 +1196,52 @@ err_noclose:
        return 0;       /* record not complete */
 }
 
-static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
-                             int flags)
-{
-       struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
-
-       iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
-       return sock_sendmsg(sock, &msg);
-}
-
 /*
  * MSG_SPLICE_PAGES is used exclusively to reduce the number of
  * copy operations in this path. Therefore the caller must ensure
  * that the pages backing @xdr are unchanging.
  *
- * In addition, the logic assumes that * .bv_len is never larger
- * than PAGE_SIZE.
+ * Note that the send is non-blocking. The caller has incremented
+ * the reference count on each page backing the RPC message, and
+ * the network layer will "put" these pages when transmission is
+ * complete.
+ *
+ * This is safe for our RPC services because the memory backing
+ * the head and tail components is never kmalloc'd. These always
+ * come from pages in the svc_rqst::rq_pages array.
  */
-static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
+static int svc_tcp_sendmsg(struct svc_sock *svsk, struct svc_rqst *rqstp,
                           rpc_fraghdr marker, unsigned int *sentp)
 {
-       const struct kvec *head = xdr->head;
-       const struct kvec *tail = xdr->tail;
        struct kvec rm = {
                .iov_base       = &marker,
                .iov_len        = sizeof(marker),
        };
        struct msghdr msg = {
-               .msg_flags      = 0,
+               .msg_flags      = MSG_MORE,
        };
+       unsigned int count;
        int ret;
 
        *sentp = 0;
-       ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
-       if (ret < 0)
-               return ret;
 
-       ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
+       ret = kernel_sendmsg(svsk->sk_sock, &msg, &rm, 1, rm.iov_len);
        if (ret < 0)
                return ret;
        *sentp += ret;
        if (ret != rm.iov_len)
                return -EAGAIN;
 
-       ret = svc_tcp_send_kvec(sock, head, 0);
-       if (ret < 0)
-               return ret;
-       *sentp += ret;
-       if (ret != head->iov_len)
-               goto out;
-
-       if (xdr_buf_pagecount(xdr)) {
-               xdr->bvec[0].bv_offset = offset_in_page(xdr->page_base);
-               xdr->bvec[0].bv_len -= offset_in_page(xdr->page_base);
-       }
+       count = xdr_buf_to_bvec(rqstp->rq_bvec, ARRAY_SIZE(rqstp->rq_bvec),
+                               &rqstp->rq_res);
 
        msg.msg_flags = MSG_SPLICE_PAGES;
-       iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
-                     xdr_buf_pagecount(xdr), xdr->page_len);
-       ret = sock_sendmsg(sock, &msg);
+       iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+                     count, rqstp->rq_res.len);
+       ret = sock_sendmsg(svsk->sk_sock, &msg);
        if (ret < 0)
                return ret;
        *sentp += ret;
-
-       if (tail->iov_len) {
-               ret = svc_tcp_send_kvec(sock, tail, 0);
-               if (ret < 0)
-                       return ret;
-               *sentp += ret;
-       }
-
-out:
        return 0;
 }
 
@@ -1295,8 +1272,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
        if (svc_xprt_is_dead(xprt))
                goto out_notconn;
        tcp_sock_set_cork(svsk->sk_sk, true);
-       err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
-       xdr_free_bvec(xdr);
+       err = svc_tcp_sendmsg(svsk, rqstp, marker, &sent);
        trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
        if (err < 0 || sent != (xdr->len + sizeof(marker)))
                goto out_close;
index 2a22e78..358e6de 100644 (file)
@@ -165,6 +165,56 @@ xdr_free_bvec(struct xdr_buf *buf)
 }
 
 /**
+ * xdr_buf_to_bvec - Copy components of an xdr_buf into a bio_vec array
+ * @bvec: bio_vec array to populate
+ * @bvec_size: element count of @bio_vec
+ * @xdr: xdr_buf to be copied
+ *
+ * Returns the number of entries consumed in @bvec.
+ */
+unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
+                            const struct xdr_buf *xdr)
+{
+       const struct kvec *head = xdr->head;
+       const struct kvec *tail = xdr->tail;
+       unsigned int count = 0;
+
+       if (head->iov_len) {
+               bvec_set_virt(bvec++, head->iov_base, head->iov_len);
+               ++count;
+       }
+
+       if (xdr->page_len) {
+               unsigned int offset, len, remaining;
+               struct page **pages = xdr->pages;
+
+               offset = offset_in_page(xdr->page_base);
+               remaining = xdr->page_len;
+               while (remaining > 0) {
+                       len = min_t(unsigned int, remaining,
+                                   PAGE_SIZE - offset);
+                       bvec_set_page(bvec++, *pages++, len, offset);
+                       remaining -= len;
+                       offset = 0;
+                       if (unlikely(++count > bvec_size))
+                               goto bvec_overflow;
+               }
+       }
+
+       if (tail->iov_len) {
+               bvec_set_virt(bvec, tail->iov_base, tail->iov_len);
+               if (unlikely(++count > bvec_size))
+                       goto bvec_overflow;
+       }
+
+       return count;
+
+bvec_overflow:
+       pr_warn_once("%s: bio_vec array overflow\n", __func__);
+       return count - 1;
+}
+
+/**
  * xdr_inline_pages - Prepare receive buffer for a large reply
  * @xdr: xdr_buf into which reply will be placed
  * @offset: expected offset where data payload will start, in bytes