Merge tag 'nfsd-5.8' of git://linux-nfs.org/~bfields/linux
[platform/kernel/linux-starfive.git] / net / sunrpc / svcsock.c
index e7a0037..5c4ec93 100644 (file)
@@ -45,7 +45,6 @@
 #include <net/tcp_states.h>
 #include <linux/uaccess.h>
 #include <asm/ioctls.h>
-#include <trace/events/skb.h>
 
 #include <linux/sunrpc/types.h>
 #include <linux/sunrpc/clnt.h>
@@ -55,6 +54,8 @@
 #include <linux/sunrpc/stats.h>
 #include <linux/sunrpc/xprt.h>
 
+#include <trace/events/sunrpc.h>
+
 #include "socklib.h"
 #include "sunrpc.h"
 
@@ -108,31 +109,35 @@ static void svc_reclassify_socket(struct socket *sock)
 }
 #endif
 
-/*
- * Release an skbuff after use
+/**
+ * svc_tcp_release_rqst - Release transport-related resources
+ * @rqstp: request structure with resources to be released
+ *
  */
-static void svc_release_skb(struct svc_rqst *rqstp)
+static void svc_tcp_release_rqst(struct svc_rqst *rqstp)
 {
        struct sk_buff *skb = rqstp->rq_xprt_ctxt;
 
        if (skb) {
                struct svc_sock *svsk =
                        container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
-               rqstp->rq_xprt_ctxt = NULL;
 
-               dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
+               rqstp->rq_xprt_ctxt = NULL;
                skb_free_datagram_locked(svsk->sk_sk, skb);
        }
 }
 
-static void svc_release_udp_skb(struct svc_rqst *rqstp)
+/**
+ * svc_udp_release_rqst - Release transport-related resources
+ * @rqstp: request structure with resources to be released
+ *
+ */
+static void svc_udp_release_rqst(struct svc_rqst *rqstp)
 {
        struct sk_buff *skb = rqstp->rq_xprt_ctxt;
 
        if (skb) {
                rqstp->rq_xprt_ctxt = NULL;
-
-               dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
                consume_skb(skb);
        }
 }
@@ -218,34 +223,68 @@ static int svc_one_sock_name(struct svc_sock *svsk, char *buf, int remaining)
        return len;
 }
 
+#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE
+static void svc_flush_bvec(const struct bio_vec *bvec, size_t size, size_t seek)
+{
+       struct bvec_iter bi = {
+               .bi_size        = size,
+       };
+       struct bio_vec bv;
+
+       bvec_iter_advance(bvec, &bi, seek & PAGE_MASK);
+       for_each_bvec(bv, bvec, bi, bi)
+               flush_dcache_page(bv.bv_page);
+}
+#else
+static inline void svc_flush_bvec(const struct bio_vec *bvec, size_t size,
+                                 size_t seek)
+{
+}
+#endif
+
 /*
- * Generic recvfrom routine.
+ * Read from @rqstp's transport socket. The incoming message fills whole
+ * pages in @rqstp's rq_pages array until the last page of the message
+ * has been received into a partial page.
  */
-static ssize_t svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov,
-                           unsigned int nr, size_t buflen, unsigned int base)
+static ssize_t svc_tcp_read_msg(struct svc_rqst *rqstp, size_t buflen,
+                               size_t seek)
 {
        struct svc_sock *svsk =
                container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+       struct bio_vec *bvec = rqstp->rq_bvec;
        struct msghdr msg = { NULL };
+       unsigned int i;
        ssize_t len;
+       size_t t;
 
        rqstp->rq_xprt_hlen = 0;
 
        clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
-       iov_iter_kvec(&msg.msg_iter, READ, iov, nr, buflen);
-       if (base != 0) {
-               iov_iter_advance(&msg.msg_iter, base);
-               buflen -= base;
+
+       for (i = 0, t = 0; t < buflen; i++, t += PAGE_SIZE) {
+               bvec[i].bv_page = rqstp->rq_pages[i];
+               bvec[i].bv_len = PAGE_SIZE;
+               bvec[i].bv_offset = 0;
+       }
+       rqstp->rq_respages = &rqstp->rq_pages[i];
+       rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+       iov_iter_bvec(&msg.msg_iter, READ, bvec, i, buflen);
+       if (seek) {
+               iov_iter_advance(&msg.msg_iter, seek);
+               buflen -= seek;
        }
        len = sock_recvmsg(svsk->sk_sock, &msg, MSG_DONTWAIT);
+       if (len > 0)
+               svc_flush_bvec(bvec, len, seek);
+
        /* If we read a full record, then assume there may be more
         * data to read (stream based sockets only!)
         */
        if (len == buflen)
                set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
-       dprintk("svc: socket %p recvfrom(%p, %zu) = %zd\n",
-               svsk, iov[0].iov_base, iov[0].iov_len, len);
        return len;
 }
 
@@ -282,13 +321,10 @@ static void svc_data_ready(struct sock *sk)
        struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
 
        if (svsk) {
-               dprintk("svc: socket %p(inet %p), busy=%d\n",
-                       svsk, sk,
-                       test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
-
                /* Refer to svc_setup_socket() for details. */
                rmb();
                svsk->sk_odata(sk);
+               trace_svcsock_data_ready(&svsk->sk_xprt, 0);
                if (!test_and_set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags))
                        svc_xprt_enqueue(&svsk->sk_xprt);
        }
@@ -302,11 +338,9 @@ static void svc_write_space(struct sock *sk)
        struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);
 
        if (svsk) {
-               dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
-                       svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
-
                /* Refer to svc_setup_socket() for details. */
                rmb();
+               trace_svcsock_write_space(&svsk->sk_xprt, 0);
                svsk->sk_owspace(sk);
                svc_xprt_enqueue(&svsk->sk_xprt);
        }
@@ -383,8 +417,15 @@ static int svc_udp_get_dest_address(struct svc_rqst *rqstp,
        return 0;
 }
 
-/*
- * Receive a datagram from a UDP socket.
+/**
+ * svc_udp_recvfrom - Receive a datagram from a UDP socket.
+ * @rqstp: request structure into which to receive an RPC Call
+ *
+ * Called in a loop when XPT_DATA has been set.
+ *
+ * Returns:
+ *   On success, the number of bytes in a received RPC Call, or
+ *   %0 if a complete RPC Call message was not ready to return
  */
 static int svc_udp_recvfrom(struct svc_rqst *rqstp)
 {
@@ -418,20 +459,14 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
            svc_sock_setbufsize(svsk, serv->sv_nrthreads + 3);
 
        clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
-       skb = NULL;
        err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
                             0, 0, MSG_PEEK | MSG_DONTWAIT);
-       if (err >= 0)
-               skb = skb_recv_udp(svsk->sk_sk, 0, 1, &err);
-
-       if (skb == NULL) {
-               if (err != -EAGAIN) {
-                       /* possibly an icmp error */
-                       dprintk("svc: recvfrom returned error %d\n", -err);
-                       set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
-               }
-               return 0;
-       }
+       if (err < 0)
+               goto out_recv_err;
+       skb = skb_recv_udp(svsk->sk_sk, 0, 1, &err);
+       if (!skb)
+               goto out_recv_err;
+
        len = svc_addr_len(svc_addr(rqstp));
        rqstp->rq_addrlen = len;
        if (skb->tstamp == 0) {
@@ -442,26 +477,21 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
        sock_write_timestamp(svsk->sk_sk, skb->tstamp);
        set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* there may be more data... */
 
-       len  = skb->len;
+       len = skb->len;
        rqstp->rq_arg.len = len;
+       trace_svcsock_udp_recv(&svsk->sk_xprt, len);
 
        rqstp->rq_prot = IPPROTO_UDP;
 
-       if (!svc_udp_get_dest_address(rqstp, cmh)) {
-               net_warn_ratelimited("svc: received unknown control message %d/%d; dropping RPC reply datagram\n",
-                                    cmh->cmsg_level, cmh->cmsg_type);
-               goto out_free;
-       }
+       if (!svc_udp_get_dest_address(rqstp, cmh))
+               goto out_cmsg_err;
        rqstp->rq_daddrlen = svc_addr_len(svc_daddr(rqstp));
 
        if (skb_is_nonlinear(skb)) {
                /* we have to copy */
                local_bh_disable();
-               if (csum_partial_copy_to_xdr(&rqstp->rq_arg, skb)) {
-                       local_bh_enable();
-                       /* checksum error */
-                       goto out_free;
-               }
+               if (csum_partial_copy_to_xdr(&rqstp->rq_arg, skb))
+                       goto out_bh_enable;
                local_bh_enable();
                consume_skb(skb);
        } else {
@@ -489,6 +519,20 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
                serv->sv_stats->netudpcnt++;
 
        return len;
+
+out_recv_err:
+       if (err != -EAGAIN) {
+               /* possibly an icmp error */
+               set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+       }
+       trace_svcsock_udp_recv_err(&svsk->sk_xprt, err);
+       return 0;
+out_cmsg_err:
+       net_warn_ratelimited("svc: received unknown control message %d/%d; dropping RPC reply datagram\n",
+                            cmh->cmsg_level, cmh->cmsg_type);
+       goto out_free;
+out_bh_enable:
+       local_bh_enable();
 out_free:
        kfree_skb(skb);
        return 0;
@@ -498,6 +542,9 @@ out_free:
  * svc_udp_sendto - Send out a reply on a UDP socket
  * @rqstp: completed svc_rqst
  *
+ * xpt_mutex ensures @rqstp's whole message is written to the socket
+ * without interruption.
+ *
  * Returns the number of bytes sent, or a negative errno.
  */
 static int svc_udp_sendto(struct svc_rqst *rqstp)
@@ -519,10 +566,15 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
        unsigned int uninitialized_var(sent);
        int err;
 
-       svc_release_udp_skb(rqstp);
+       svc_udp_release_rqst(rqstp);
 
        svc_set_cmsg_data(rqstp, cmh);
 
+       mutex_lock(&xprt->xpt_mutex);
+
+       if (svc_xprt_is_dead(xprt))
+               goto out_notconn;
+
        err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
        xdr_free_bvec(xdr);
        if (err == -ECONNREFUSED) {
@@ -530,9 +582,16 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
                err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
                xdr_free_bvec(xdr);
        }
+       trace_svcsock_udp_send(xprt, err);
+
+       mutex_unlock(&xprt->xpt_mutex);
        if (err < 0)
                return err;
        return sent;
+
+out_notconn:
+       mutex_unlock(&xprt->xpt_mutex);
+       return -ENOTCONN;
 }
 
 static int svc_udp_has_wspace(struct svc_xprt *xprt)
@@ -576,7 +635,7 @@ static const struct svc_xprt_ops svc_udp_ops = {
        .xpo_recvfrom = svc_udp_recvfrom,
        .xpo_sendto = svc_udp_sendto,
        .xpo_read_payload = svc_sock_read_payload,
-       .xpo_release_rqst = svc_release_udp_skb,
+       .xpo_release_rqst = svc_udp_release_rqst,
        .xpo_detach = svc_sock_detach,
        .xpo_free = svc_sock_free,
        .xpo_has_wspace = svc_udp_has_wspace,
@@ -632,9 +691,6 @@ static void svc_tcp_listen_data_ready(struct sock *sk)
 {
        struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
 
-       dprintk("svc: socket %p TCP (listen) state change %d\n",
-               sk, sk->sk_state);
-
        if (svsk) {
                /* Refer to svc_setup_socket() for details. */
                rmb();
@@ -655,8 +711,7 @@ static void svc_tcp_listen_data_ready(struct sock *sk)
                if (svsk) {
                        set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
                        svc_xprt_enqueue(&svsk->sk_xprt);
-               } else
-                       printk("svc: socket %p: no user data\n", sk);
+               }
        }
 }
 
@@ -667,15 +722,11 @@ static void svc_tcp_state_change(struct sock *sk)
 {
        struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
 
-       dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
-               sk, sk->sk_state, sk->sk_user_data);
-
-       if (!svsk)
-               printk("svc: socket %p: no user data\n", sk);
-       else {
+       if (svsk) {
                /* Refer to svc_setup_socket() for details. */
                rmb();
                svsk->sk_ostate(sk);
+               trace_svcsock_tcp_state(&svsk->sk_xprt, svsk->sk_sock);
                if (sk->sk_state != TCP_ESTABLISHED) {
                        set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
                        svc_xprt_enqueue(&svsk->sk_xprt);
@@ -696,9 +747,7 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
        struct socket   *newsock;
        struct svc_sock *newsvsk;
        int             err, slen;
-       RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
 
-       dprintk("svc: tcp_accept %p sock %p\n", svsk, sock);
        if (!sock)
                return NULL;
 
@@ -711,30 +760,18 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
                else if (err != -EAGAIN)
                        net_warn_ratelimited("%s: accept failed (err %d)!\n",
                                             serv->sv_name, -err);
+               trace_svcsock_accept_err(xprt, serv->sv_name, err);
                return NULL;
        }
        set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
 
        err = kernel_getpeername(newsock, sin);
        if (err < 0) {
-               net_warn_ratelimited("%s: peername failed (err %d)!\n",
-                                    serv->sv_name, -err);
+               trace_svcsock_getpeername_err(xprt, serv->sv_name, err);
                goto failed;            /* aborted connection or whatever */
        }
        slen = err;
 
-       /* Ideally, we would want to reject connections from unauthorized
-        * hosts here, but when we get encryption, the IP of the host won't
-        * tell us anything.  For now just warn about unpriv connections.
-        */
-       if (!svc_port_is_privileged(sin)) {
-               dprintk("%s: connect from unprivileged port: %s\n",
-                       serv->sv_name,
-                       __svc_print_addr(sin, buf, sizeof(buf)));
-       }
-       dprintk("%s: connect from %s\n", serv->sv_name,
-               __svc_print_addr(sin, buf, sizeof(buf)));
-
        /* Reset the inherited callbacks before calling svc_setup_socket */
        newsock->sk->sk_state_change = svsk->sk_ostate;
        newsock->sk->sk_data_ready = svsk->sk_odata;
@@ -752,10 +789,8 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
        svc_xprt_set_remote(&newsvsk->sk_xprt, sin, slen);
        err = kernel_getsockname(newsock, sin);
        slen = err;
-       if (unlikely(err < 0)) {
-               dprintk("svc_tcp_accept: kernel_getsockname error %d\n", -err);
+       if (unlikely(err < 0))
                slen = offsetof(struct sockaddr, sa_data);
-       }
        svc_xprt_set_local(&newsvsk->sk_xprt, sin, slen);
 
        if (sock_is_loopback(newsock->sk))
@@ -772,13 +807,14 @@ failed:
        return NULL;
 }
 
-static unsigned int svc_tcp_restore_pages(struct svc_sock *svsk, struct svc_rqst *rqstp)
+static size_t svc_tcp_restore_pages(struct svc_sock *svsk,
+                                   struct svc_rqst *rqstp)
 {
-       unsigned int i, len, npages;
+       size_t len = svsk->sk_datalen;
+       unsigned int i, npages;
 
-       if (svsk->sk_datalen == 0)
+       if (!len)
                return 0;
-       len = svsk->sk_datalen;
        npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
        for (i = 0; i < npages; i++) {
                if (rqstp->rq_pages[i] != NULL)
@@ -827,47 +863,45 @@ out:
 }
 
 /*
- * Receive fragment record header.
- * If we haven't gotten the record length yet, get the next four bytes.
+ * Receive fragment record header into sk_marker.
  */
-static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
+static ssize_t svc_tcp_read_marker(struct svc_sock *svsk,
+                                  struct svc_rqst *rqstp)
 {
-       struct svc_serv *serv = svsk->sk_xprt.xpt_server;
-       unsigned int want;
-       int len;
+       ssize_t want, len;
 
+       /* If we haven't gotten the record length yet,
+        * get the next four bytes.
+        */
        if (svsk->sk_tcplen < sizeof(rpc_fraghdr)) {
+               struct msghdr   msg = { NULL };
                struct kvec     iov;
 
                want = sizeof(rpc_fraghdr) - svsk->sk_tcplen;
-               iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen;
+               iov.iov_base = ((char *)&svsk->sk_marker) + svsk->sk_tcplen;
                iov.iov_len  = want;
-               len = svc_recvfrom(rqstp, &iov, 1, want, 0);
+               iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, want);
+               len = sock_recvmsg(svsk->sk_sock, &msg, MSG_DONTWAIT);
                if (len < 0)
-                       goto error;
+                       return len;
                svsk->sk_tcplen += len;
-
                if (len < want) {
-                       dprintk("svc: short recvfrom while reading record "
-                               "length (%d of %d)\n", len, want);
-                       return -EAGAIN;
+                       /* call again to read the remaining bytes */
+                       goto err_short;
                }
-
-               dprintk("svc: TCP record, %d bytes\n", svc_sock_reclen(svsk));
+               trace_svcsock_marker(&svsk->sk_xprt, svsk->sk_marker);
                if (svc_sock_reclen(svsk) + svsk->sk_datalen >
-                                                       serv->sv_max_mesg) {
-                       net_notice_ratelimited("RPC: fragment too large: %d\n",
-                                       svc_sock_reclen(svsk));
-                       goto err_delete;
-               }
+                   svsk->sk_xprt.xpt_server->sv_max_mesg)
+                       goto err_too_large;
        }
-
        return svc_sock_reclen(svsk);
-error:
-       dprintk("RPC: TCP recv_record got %d\n", len);
-       return len;
-err_delete:
+
+err_too_large:
+       net_notice_ratelimited("svc: %s %s RPC fragment too large: %d\n",
+                              __func__, svsk->sk_xprt.xpt_server->sv_name,
+                              svc_sock_reclen(svsk));
        set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
+err_short:
        return -EAGAIN;
 }
 
@@ -916,87 +950,58 @@ unlock_eagain:
        return -EAGAIN;
 }
 
-static int copy_pages_to_kvecs(struct kvec *vec, struct page **pages, int len)
-{
-       int i = 0;
-       int t = 0;
-
-       while (t < len) {
-               vec[i].iov_base = page_address(pages[i]);
-               vec[i].iov_len = PAGE_SIZE;
-               i++;
-               t += PAGE_SIZE;
-       }
-       return i;
-}
-
 static void svc_tcp_fragment_received(struct svc_sock *svsk)
 {
        /* If we have more data, signal svc_xprt_enqueue() to try again */
-       dprintk("svc: TCP %s record (%d bytes)\n",
-               svc_sock_final_rec(svsk) ? "final" : "nonfinal",
-               svc_sock_reclen(svsk));
        svsk->sk_tcplen = 0;
-       svsk->sk_reclen = 0;
+       svsk->sk_marker = xdr_zero;
 }
 
-/*
- * Receive data from a TCP socket.
+/**
+ * svc_tcp_recvfrom - Receive data from a TCP socket
+ * @rqstp: request structure into which to receive an RPC Call
+ *
+ * Called in a loop when XPT_DATA has been set.
+ *
+ * Read the 4-byte stream record marker, then use the record length
+ * in that marker to set up exactly the resources needed to receive
+ * the next RPC message into @rqstp.
+ *
+ * Returns:
+ *   On success, the number of bytes in a received RPC Call, or
+ *   %0 if a complete RPC Call message was not ready to return
+ *
+ * The zero return case handles partial receives and callback Replies.
+ * The state of a partial receive is preserved in the svc_sock for
+ * the next call to svc_tcp_recvfrom.
  */
 static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 {
        struct svc_sock *svsk =
                container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
        struct svc_serv *serv = svsk->sk_xprt.xpt_server;
-       int             len;
-       struct kvec *vec;
-       unsigned int want, base;
+       size_t want, base;
+       ssize_t len;
        __be32 *p;
        __be32 calldir;
-       int pnum;
-
-       dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
-               svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
-               test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
-               test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
 
-       len = svc_tcp_recv_record(svsk, rqstp);
+       clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+       len = svc_tcp_read_marker(svsk, rqstp);
        if (len < 0)
                goto error;
 
        base = svc_tcp_restore_pages(svsk, rqstp);
-       want = svc_sock_reclen(svsk) - (svsk->sk_tcplen - sizeof(rpc_fraghdr));
-
-       vec = rqstp->rq_vec;
-
-       pnum = copy_pages_to_kvecs(&vec[0], &rqstp->rq_pages[0], base + want);
-
-       rqstp->rq_respages = &rqstp->rq_pages[pnum];
-       rqstp->rq_next_page = rqstp->rq_respages + 1;
-
-       /* Now receive data */
-       len = svc_recvfrom(rqstp, vec, pnum, base + want, base);
+       want = len - (svsk->sk_tcplen - sizeof(rpc_fraghdr));
+       len = svc_tcp_read_msg(rqstp, base + want, base);
        if (len >= 0) {
+               trace_svcsock_tcp_recv(&svsk->sk_xprt, len);
                svsk->sk_tcplen += len;
                svsk->sk_datalen += len;
        }
-       if (len != want || !svc_sock_final_rec(svsk)) {
-               svc_tcp_save_pages(svsk, rqstp);
-               if (len < 0 && len != -EAGAIN)
-                       goto err_delete;
-               if (len == want)
-                       svc_tcp_fragment_received(svsk);
-               else
-                       dprintk("svc: incomplete TCP record (%d of %d)\n",
-                               (int)(svsk->sk_tcplen - sizeof(rpc_fraghdr)),
-                               svc_sock_reclen(svsk));
-               goto err_noclose;
-       }
-
-       if (svsk->sk_datalen < 8) {
-               svsk->sk_datalen = 0;
-               goto err_delete; /* client is nuts. */
-       }
+       if (len != want || !svc_sock_final_rec(svsk))
+               goto err_incomplete;
+       if (svsk->sk_datalen < 8)
+               goto err_nuts;
 
        rqstp->rq_arg.len = svsk->sk_datalen;
        rqstp->rq_arg.page_base = 0;
@@ -1031,14 +1036,26 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 
        return rqstp->rq_arg.len;
 
+err_incomplete:
+       svc_tcp_save_pages(svsk, rqstp);
+       if (len < 0 && len != -EAGAIN)
+               goto err_delete;
+       if (len == want)
+               svc_tcp_fragment_received(svsk);
+       else
+               trace_svcsock_tcp_recv_short(&svsk->sk_xprt,
+                               svc_sock_reclen(svsk),
+                               svsk->sk_tcplen - sizeof(rpc_fraghdr));
+       goto err_noclose;
 error:
        if (len != -EAGAIN)
                goto err_delete;
-       dprintk("RPC: TCP recvfrom got EAGAIN\n");
+       trace_svcsock_tcp_recv_eagain(&svsk->sk_xprt, 0);
        return 0;
+err_nuts:
+       svsk->sk_datalen = 0;
 err_delete:
-       printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
-              svsk->sk_xprt.xpt_server->sv_name, -len);
+       trace_svcsock_tcp_recv_err(&svsk->sk_xprt, len);
        set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
 err_noclose:
        return 0;       /* record not complete */
@@ -1048,6 +1065,9 @@ err_noclose:
  * svc_tcp_sendto - Send out a reply on a TCP socket
  * @rqstp: completed svc_rqst
  *
+ * xpt_mutex ensures @rqstp's whole message is written to the socket
+ * without interruption.
+ *
  * Returns the number of bytes sent, or a negative errno.
  */
 static int svc_tcp_sendto(struct svc_rqst *rqstp)
@@ -1063,14 +1083,22 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
        unsigned int uninitialized_var(sent);
        int err;
 
-       svc_release_skb(rqstp);
+       svc_tcp_release_rqst(rqstp);
 
+       mutex_lock(&xprt->xpt_mutex);
+       if (svc_xprt_is_dead(xprt))
+               goto out_notconn;
        err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, marker, &sent);
        xdr_free_bvec(xdr);
+       trace_svcsock_tcp_send(xprt, err < 0 ? err : sent);
        if (err < 0 || sent != (xdr->len + sizeof(marker)))
                goto out_close;
+       mutex_unlock(&xprt->xpt_mutex);
        return sent;
 
+out_notconn:
+       mutex_unlock(&xprt->xpt_mutex);
+       return -ENOTCONN;
 out_close:
        pr_notice("rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
                  xprt->xpt_server->sv_name,
@@ -1078,6 +1106,7 @@ out_close:
                  (err < 0) ? err : sent, xdr->len);
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
        svc_xprt_enqueue(xprt);
+       mutex_unlock(&xprt->xpt_mutex);
        return -EAGAIN;
 }
 
@@ -1094,7 +1123,7 @@ static const struct svc_xprt_ops svc_tcp_ops = {
        .xpo_recvfrom = svc_tcp_recvfrom,
        .xpo_sendto = svc_tcp_sendto,
        .xpo_read_payload = svc_sock_read_payload,
-       .xpo_release_rqst = svc_release_skb,
+       .xpo_release_rqst = svc_tcp_release_rqst,
        .xpo_detach = svc_tcp_sock_detach,
        .xpo_free = svc_sock_free,
        .xpo_has_wspace = svc_tcp_has_wspace,
@@ -1132,18 +1161,16 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
        set_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
        set_bit(XPT_CONG_CTRL, &svsk->sk_xprt.xpt_flags);
        if (sk->sk_state == TCP_LISTEN) {
-               dprintk("setting up TCP socket for listening\n");
                strcpy(svsk->sk_xprt.xpt_remotebuf, "listener");
                set_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags);
                sk->sk_data_ready = svc_tcp_listen_data_ready;
                set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
        } else {
-               dprintk("setting up TCP socket for reading\n");
                sk->sk_state_change = svc_tcp_state_change;
                sk->sk_data_ready = svc_data_ready;
                sk->sk_write_space = svc_write_space;
 
-               svsk->sk_reclen = 0;
+               svsk->sk_marker = xdr_zero;
                svsk->sk_tcplen = 0;
                svsk->sk_datalen = 0;
                memset(&svsk->sk_pages[0], 0, sizeof(svsk->sk_pages));
@@ -1188,7 +1215,6 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
        int             pmap_register = !(flags & SVC_SOCK_ANONYMOUS);
        int             err = 0;
 
-       dprintk("svc: svc_setup_socket %p\n", sock);
        svsk = kzalloc(sizeof(*svsk), GFP_KERNEL);
        if (!svsk)
                return ERR_PTR(-ENOMEM);
@@ -1225,12 +1251,7 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
        else
                svc_tcp_init(svsk, serv);
 
-       dprintk("svc: svc_setup_socket created %p (inet %p), "
-                       "listen %d close %d\n",
-                       svsk, svsk->sk_sk,
-                       test_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags),
-                       test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
-
+       trace_svcsock_new_socket(sock);
        return svsk;
 }
 
@@ -1322,11 +1343,6 @@ static struct svc_xprt *svc_create_socket(struct svc_serv *serv,
        struct sockaddr *newsin = (struct sockaddr *)&addr;
        int             newlen;
        int             family;
-       RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
-
-       dprintk("svc: svc_create_socket(%s, %d, %s)\n",
-                       serv->sv_program->pg_name, protocol,
-                       __svc_print_addr(sin, buf, sizeof(buf)));
 
        if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
                printk(KERN_WARNING "svc: only UDP and TCP "
@@ -1383,7 +1399,6 @@ static struct svc_xprt *svc_create_socket(struct svc_serv *serv,
        svc_xprt_set_local(&svsk->sk_xprt, newsin, newlen);
        return (struct svc_xprt *)svsk;
 bummer:
-       dprintk("svc: svc_create_socket error = %d\n", -error);
        sock_release(sock);
        return ERR_PTR(error);
 }
@@ -1397,8 +1412,6 @@ static void svc_sock_detach(struct svc_xprt *xprt)
        struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
        struct sock *sk = svsk->sk_sk;
 
-       dprintk("svc: svc_sock_detach(%p)\n", svsk);
-
        /* put back the old socket callbacks */
        lock_sock(sk);
        sk->sk_state_change = svsk->sk_ostate;
@@ -1415,8 +1428,6 @@ static void svc_tcp_sock_detach(struct svc_xprt *xprt)
 {
        struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
 
-       dprintk("svc: svc_tcp_sock_detach(%p)\n", svsk);
-
        svc_sock_detach(xprt);
 
        if (!test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
@@ -1431,7 +1442,6 @@ static void svc_tcp_sock_detach(struct svc_xprt *xprt)
 static void svc_sock_free(struct svc_xprt *xprt)
 {
        struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
-       dprintk("svc: svc_sock_free(%p)\n", svsk);
 
        if (svsk->sk_sock->file)
                sockfd_put(svsk->sk_sock);