Merge tag 'nfsd-6.6' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux
[platform/kernel/linux-rpi.git] / net / sunrpc / svcsock.c
index 8c9a8ee..9986874 100644 (file)
@@ -36,6 +36,8 @@
 #include <linux/skbuff.h>
 #include <linux/file.h>
 #include <linux/freezer.h>
+#include <linux/bvec.h>
+
 #include <net/sock.h>
 #include <net/checksum.h>
 #include <net/ip.h>
@@ -695,9 +697,10 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
                .msg_name       = &rqstp->rq_addr,
                .msg_namelen    = rqstp->rq_addrlen,
                .msg_control    = cmh,
+               .msg_flags      = MSG_SPLICE_PAGES,
                .msg_controllen = sizeof(buffer),
        };
-       unsigned int sent;
+       unsigned int count;
        int err;
 
        svc_udp_release_ctxt(xprt, rqstp->rq_xprt_ctxt);
@@ -710,22 +713,23 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
        if (svc_xprt_is_dead(xprt))
                goto out_notconn;
 
-       err = xdr_alloc_bvec(xdr, GFP_KERNEL);
-       if (err < 0)
-               goto out_unlock;
+       count = xdr_buf_to_bvec(rqstp->rq_bvec,
+                               ARRAY_SIZE(rqstp->rq_bvec), xdr);
 
-       err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
+       iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+                     count, 0);
+       err = sock_sendmsg(svsk->sk_sock, &msg);
        if (err == -ECONNREFUSED) {
                /* ICMP error on earlier request. */
-               err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
+               iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+                             count, 0);
+               err = sock_sendmsg(svsk->sk_sock, &msg);
        }
-       xdr_free_bvec(xdr);
+
        trace_svcsock_udp_send(xprt, err);
-out_unlock:
+
        mutex_unlock(&xprt->xpt_mutex);
-       if (err < 0)
-               return err;
-       return sent;
+       return err;
 
 out_notconn:
        mutex_unlock(&xprt->xpt_mutex);
@@ -1089,6 +1093,9 @@ static void svc_tcp_fragment_received(struct svc_sock *svsk)
        /* If we have more data, signal svc_xprt_enqueue() to try again */
        svsk->sk_tcplen = 0;
        svsk->sk_marker = xdr_zero;
+
+       smp_wmb();
+       tcp_set_rcvlowat(svsk->sk_sk, 1);
 }
 
 /**
@@ -1178,10 +1185,17 @@ err_incomplete:
                goto err_delete;
        if (len == want)
                svc_tcp_fragment_received(svsk);
-       else
+       else {
+               /* Avoid more ->sk_data_ready() calls until the rest
+                * of the message has arrived. This reduces service
+                * thread wake-ups on large incoming messages. */
+               tcp_set_rcvlowat(svsk->sk_sk,
+                                svc_sock_reclen(svsk) - svsk->sk_tcplen);
+
                trace_svcsock_tcp_recv_short(&svsk->sk_xprt,
                                svc_sock_reclen(svsk),
                                svsk->sk_tcplen - sizeof(rpc_fraghdr));
+       }
        goto err_noclose;
 error:
        if (len != -EAGAIN)
@@ -1198,75 +1212,51 @@ err_noclose:
        return 0;       /* record not complete */
 }
 
-static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
-                             int flags)
-{
-       struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
-
-       iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
-       return sock_sendmsg(sock, &msg);
-}
-
 /*
  * MSG_SPLICE_PAGES is used exclusively to reduce the number of
  * copy operations in this path. Therefore the caller must ensure
  * that the pages backing @xdr are unchanging.
  *
- * In addition, the logic assumes that * .bv_len is never larger
- * than PAGE_SIZE.
+ * Note that the send is non-blocking. The caller has incremented
+ * the reference count on each page backing the RPC message, and
+ * the network layer will "put" these pages when transmission is
+ * complete.
+ *
+ * This is safe for our RPC services because the memory backing
+ * the head and tail components is never kmalloc'd. These always
+ * come from pages in the svc_rqst::rq_pages array.
  */
-static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
+static int svc_tcp_sendmsg(struct svc_sock *svsk, struct svc_rqst *rqstp,
                           rpc_fraghdr marker, unsigned int *sentp)
 {
-       const struct kvec *head = xdr->head;
-       const struct kvec *tail = xdr->tail;
-       struct kvec rm = {
-               .iov_base       = &marker,
-               .iov_len        = sizeof(marker),
-       };
        struct msghdr msg = {
-               .msg_flags      = 0,
+               .msg_flags      = MSG_SPLICE_PAGES,
        };
+       unsigned int count;
+       void *buf;
        int ret;
 
        *sentp = 0;
-       ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
-       if (ret < 0)
-               return ret;
-
-       ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
-       if (ret < 0)
-               return ret;
-       *sentp += ret;
-       if (ret != rm.iov_len)
-               return -EAGAIN;
-
-       ret = svc_tcp_send_kvec(sock, head, 0);
-       if (ret < 0)
-               return ret;
-       *sentp += ret;
-       if (ret != head->iov_len)
-               goto out;
 
-       if (xdr_buf_pagecount(xdr))
-               xdr->bvec[0].bv_offset = offset_in_page(xdr->page_base);
-
-       msg.msg_flags = MSG_SPLICE_PAGES;
-       iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
-                     xdr_buf_pagecount(xdr), xdr->page_len);
-       ret = sock_sendmsg(sock, &msg);
+       /* The stream record marker is copied into a temporary page
+        * fragment buffer so that it can be included in rq_bvec.
+        */
+       buf = page_frag_alloc(&svsk->sk_frag_cache, sizeof(marker),
+                             GFP_KERNEL);
+       if (!buf)
+               return -ENOMEM;
+       memcpy(buf, &marker, sizeof(marker));
+       bvec_set_virt(rqstp->rq_bvec, buf, sizeof(marker));
+
+       count = xdr_buf_to_bvec(rqstp->rq_bvec + 1,
+                               ARRAY_SIZE(rqstp->rq_bvec) - 1, &rqstp->rq_res);
+
+       iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
+                     1 + count, sizeof(marker) + rqstp->rq_res.len);
+       ret = sock_sendmsg(svsk->sk_sock, &msg);
        if (ret < 0)
                return ret;
        *sentp += ret;
-
-       if (tail->iov_len) {
-               ret = svc_tcp_send_kvec(sock, tail, 0);
-               if (ret < 0)
-                       return ret;
-               *sentp += ret;
-       }
-
-out:
        return 0;
 }
 
@@ -1292,23 +1282,17 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
        svc_tcp_release_ctxt(xprt, rqstp->rq_xprt_ctxt);
        rqstp->rq_xprt_ctxt = NULL;
 
-       atomic_inc(&svsk->sk_sendqlen);
        mutex_lock(&xprt->xpt_mutex);
        if (svc_xprt_is_dead(xprt))
                goto out_notconn;
-       tcp_sock_set_cork(svsk->sk_sk, true);
-       err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
-       xdr_free_bvec(xdr);
+       err = svc_tcp_sendmsg(svsk, rqstp, marker, &sent);
        trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
        if (err < 0 || sent != (xdr->len + sizeof(marker)))
                goto out_close;
-       if (atomic_dec_and_test(&svsk->sk_sendqlen))
-               tcp_sock_set_cork(svsk->sk_sk, false);
        mutex_unlock(&xprt->xpt_mutex);
        return sent;
 
 out_notconn:
-       atomic_dec(&svsk->sk_sendqlen);
        mutex_unlock(&xprt->xpt_mutex);
        return -ENOTCONN;
 out_close:
@@ -1317,7 +1301,6 @@ out_close:
                  (err < 0) ? "got error" : "sent",
                  (err < 0) ? err : sent, xdr->len);
        svc_xprt_deferred_close(xprt);
-       atomic_dec(&svsk->sk_sendqlen);
        mutex_unlock(&xprt->xpt_mutex);
        return -EAGAIN;
 }
@@ -1644,6 +1627,7 @@ static void svc_tcp_sock_detach(struct svc_xprt *xprt)
 static void svc_sock_free(struct svc_xprt *xprt)
 {
        struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+       struct page_frag_cache *pfc = &svsk->sk_frag_cache;
        struct socket *sock = svsk->sk_sock;
 
        trace_svcsock_free(svsk, sock);
@@ -1653,5 +1637,8 @@ static void svc_sock_free(struct svc_xprt *xprt)
                sockfd_put(sock);
        else
                sock_release(sock);
+       if (pfc->va)
+               __page_frag_cache_drain(virt_to_head_page(pfc->va),
+                                       pfc->pagecnt_bias);
        kfree(svsk);
 }