SUNRPC: Use poll() to fix up the socket requeue races
author: Trond Myklebust <trond.myklebust@hammerspace.com>
Wed, 30 Jan 2019 19:51:26 +0000 (14:51 -0500)
committer: Trond Myklebust <trond.myklebust@hammerspace.com>
Wed, 20 Feb 2019 22:33:54 +0000 (17:33 -0500)
Because we clear XPRT_SOCK_DATA_READY before reading, we can end up
with a situation where new data arrives, causing xs_data_ready() to
queue up a second receive worker job for the same socket, which then
immediately gets stuck waiting on the transport receive mutex.
The fix is to only clear XPRT_SOCK_DATA_READY once we're done reading,
and then to use poll() to check if we might need to queue up a new
job in order to deal with any new data.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
net/sunrpc/xprtsock.c

index f5d7dcd9e8d928ff4a508e3c2416e883f624a3db..da45bb1e931e7e11e4d6741a97c5a2538af7b298 100644 (file)
@@ -656,13 +656,34 @@ out_err:
        return ret != 0 ? ret : -ESHUTDOWN;
 }
 
+static __poll_t xs_poll_socket(struct sock_xprt *transport)
+{      /* Return the event mask reported by the socket's own ->poll() op. */
+       return transport->sock->ops->poll(NULL, transport->sock, NULL); /* dereferences transport->sock unchecked; NULLs presumably mean "no file, no wait table" - confirm */
+}
+
+static bool xs_poll_socket_readable(struct sock_xprt *transport)
+{      /* True if the socket polls readable (EPOLLIN or EPOLLRDNORM) and EPOLLRDHUP is not set. */
+       __poll_t events = xs_poll_socket(transport);
+
+       return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
+}
+
+static void xs_poll_check_readable(struct sock_xprt *transport)
+{      /* Called once reading is done: drop DATA_READY, then requeue the receive worker if the socket is still readable. */
+       /* Clear the flag before polling, so data arriving after the clear is either seen by the poll or re-flagged by its notifier. */
+       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+       if (!xs_poll_socket_readable(transport))
+               return;
+       if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) /* bit already set again => someone else (presumably xs_data_ready()) has queued the job */
+               queue_work(xprtiod_workqueue, &transport->recv_worker);
+}
+
 static void xs_stream_data_receive(struct sock_xprt *transport)
 {
        size_t read = 0;
        ssize_t ret = 0;
 
        mutex_lock(&transport->recv_mutex);
-       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
        if (transport->sock == NULL)
                goto out;
        for (;;) {
@@ -672,6 +693,7 @@ static void xs_stream_data_receive(struct sock_xprt *transport)
                read += ret;
                cond_resched();
        }
+       xs_poll_check_readable(transport);
 out:
        mutex_unlock(&transport->recv_mutex);
        trace_xs_stream_read_data(&transport->xprt, ret, read);
@@ -1362,7 +1384,6 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
        int err;
 
        mutex_lock(&transport->recv_mutex);
-       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
        sk = transport->inet;
        if (sk == NULL)
                goto out;
@@ -1374,6 +1395,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
                consume_skb(skb);
                cond_resched();
        }
+       xs_poll_check_readable(transport);
 out:
        mutex_unlock(&transport->recv_mutex);
 }