rxrpc: Only disconnect calls in the I/O thread
authorDavid Howells <dhowells@redhat.com>
Wed, 12 Oct 2022 16:01:25 +0000 (17:01 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:31 +0000 (09:43 +0000)
Only perform call disconnection in the I/O thread to reduce the locking
requirement.

This is the first part of a fix for a race that exists between call
connection and call disconnection whereby the data transmission code adds
the call to the peer error distribution list after the call has been
disconnected (say by the rxrpc socket getting closed).

The fix is to complete the process of moving call connection, data
transmission and call disconnection into the I/O thread and thus forcibly
serialising them.

Note that the issue may predate the overhaul to an I/O thread model that
were included in the merge window for v6.2, but the timing is very much
changed by the change given below.

Fixes: cf37b5987508 ("rxrpc: Move DATA transmission into call processor work item")
Reported-by: syzbot+c22650d2844392afdcfd@syzkaller.appspotmail.com
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/call_event.c
net/rxrpc/call_object.c
net/rxrpc/input.c
net/rxrpc/recvmsg.c

index c44cc01..eac5136 100644 (file)
@@ -18,6 +18,7 @@
  */
 #define rxrpc_call_poke_traces \
        EM(rxrpc_call_poke_abort,               "Abort")        \
+       EM(rxrpc_call_poke_complete,            "Compl")        \
        EM(rxrpc_call_poke_error,               "Error")        \
        EM(rxrpc_call_poke_idle,                "Idle")         \
        EM(rxrpc_call_poke_start,               "Start")        \
index b7efecf..b2fc3fa 100644 (file)
@@ -484,8 +484,13 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
        }
 
 out:
-       if (call->state == RXRPC_CALL_COMPLETE)
+       if (call->state == RXRPC_CALL_COMPLETE) {
                del_timer_sync(&call->timer);
+               if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
+                       rxrpc_disconnect_call(call);
+               if (call->security)
+                       call->security->free_call_crypto(call);
+       }
        if (call->acks_hard_ack != call->tx_bottom)
                rxrpc_shrink_call_tx_buffer(call);
        _leave("");
index 298b7c4..13aac3c 100644 (file)
@@ -50,7 +50,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
        struct rxrpc_local *local = call->local;
        bool busy;
 
-       if (call->state < RXRPC_CALL_COMPLETE) {
+       if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) {
                spin_lock_bh(&local->lock);
                busy = !list_empty(&call->attend_link);
                trace_rxrpc_poke_call(call, busy, what);
@@ -533,13 +533,10 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
        trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
                         call->flags, rxrpc_call_see_release);
 
-       ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
-
        if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
                BUG();
 
        rxrpc_put_call_slot(call);
-       del_timer_sync(&call->timer);
 
        /* Make sure we don't get any more notifications */
        write_lock(&rx->recvmsg_lock);
@@ -572,10 +569,6 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 
        _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
 
-       if (conn && !test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
-               rxrpc_disconnect_call(call);
-       if (call->security)
-               call->security->free_call_crypto(call);
        _leave("");
 }
 
index 1f03a28..bb4beb4 100644 (file)
@@ -997,8 +997,6 @@ void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
  */
 void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
 {
-       struct rxrpc_connection *conn = call->conn;
-
        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_SERVER_AWAIT_ACK:
                rxrpc_call_completed(call);
@@ -1012,8 +1010,4 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
        }
 
        rxrpc_input_call_event(call, skb);
-
-       spin_lock(&conn->bundle->channel_lock);
-       __rxrpc_disconnect_call(conn, call);
-       spin_unlock(&conn->bundle->channel_lock);
 }
index a4ccdc0..8d5fe65 100644 (file)
@@ -201,6 +201,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
        case RXRPC_CALL_CLIENT_RECV_REPLY:
                __rxrpc_call_completed(call);
                write_unlock(&call->state_lock);
+               rxrpc_poke_call(call, rxrpc_call_poke_complete);
                break;
 
        case RXRPC_CALL_SERVER_RECV_REQUEST: