Only perform call disconnection in the I/O thread to reduce the locking
requirement.
This is the first part of a fix for a race that exists between call
connection and call disconnection whereby the data transmission code adds
the call to the peer error distribution list after the call has been
disconnected (say by the rxrpc socket getting closed).
The fix is to complete the process of moving call connection, data
transmission and call disconnection into the I/O thread and thus forcibly
serialising them.
Note that the issue may predate the overhaul to an I/O thread model that
were included in the merge window for v6.2, but the timing is very much
changed by the change given below.
Fixes:
cf37b5987508 ("rxrpc: Move DATA transmission into call processor work item")
Reported-by: syzbot+c22650d2844392afdcfd@syzkaller.appspotmail.com
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
*/
#define rxrpc_call_poke_traces \
EM(rxrpc_call_poke_abort, "Abort") \
+ EM(rxrpc_call_poke_complete, "Compl") \
EM(rxrpc_call_poke_error, "Error") \
EM(rxrpc_call_poke_idle, "Idle") \
EM(rxrpc_call_poke_start, "Start") \
}
out:
- if (call->state == RXRPC_CALL_COMPLETE)
+ if (call->state == RXRPC_CALL_COMPLETE) {
del_timer_sync(&call->timer);
+ if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
+ rxrpc_disconnect_call(call);
+ if (call->security)
+ call->security->free_call_crypto(call);
+ }
if (call->acks_hard_ack != call->tx_bottom)
rxrpc_shrink_call_tx_buffer(call);
_leave("");
struct rxrpc_local *local = call->local;
bool busy;
- if (call->state < RXRPC_CALL_COMPLETE) {
+ if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) {
spin_lock_bh(&local->lock);
busy = !list_empty(&call->attend_link);
trace_rxrpc_poke_call(call, busy, what);
trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
call->flags, rxrpc_call_see_release);
- ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
-
if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
BUG();
rxrpc_put_call_slot(call);
- del_timer_sync(&call->timer);
/* Make sure we don't get any more notifications */
write_lock(&rx->recvmsg_lock);
_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
- if (conn && !test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
- rxrpc_disconnect_call(call);
- if (call->security)
- call->security->free_call_crypto(call);
_leave("");
}
*/
void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{
- struct rxrpc_connection *conn = call->conn;
-
switch (READ_ONCE(call->state)) {
case RXRPC_CALL_SERVER_AWAIT_ACK:
rxrpc_call_completed(call);
}
rxrpc_input_call_event(call, skb);
-
- spin_lock(&conn->bundle->channel_lock);
- __rxrpc_disconnect_call(conn, call);
- spin_unlock(&conn->bundle->channel_lock);
}
case RXRPC_CALL_CLIENT_RECV_REPLY:
__rxrpc_call_completed(call);
write_unlock(&call->state_lock);
+ rxrpc_poke_call(call, rxrpc_call_poke_complete);
break;
case RXRPC_CALL_SERVER_RECV_REQUEST: