rxrpc: Make the local endpoint hold a ref on a connected call
authorDavid Howells <dhowells@redhat.com>
Wed, 2 Nov 2022 10:24:29 +0000 (10:24 +0000)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:31 +0000 (09:43 +0000)
Make the local endpoint and it's I/O thread hold a reference on a connected
call until that call is disconnected.  Without this, we're reliant on
either the AF_RXRPC socket to hold a ref (which is dropped when the call is
released) or a queued work item to hold a ref (the work item is being
replaced with the I/O thread).

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/call_object.c
net/rxrpc/conn_client.c
net/rxrpc/conn_object.c

index 5f9dd73..b526d98 100644 (file)
        E_(rxrpc_client_to_idle,                "->Idle")
 
 #define rxrpc_call_traces \
+       EM(rxrpc_call_get_io_thread,            "GET iothread") \
        EM(rxrpc_call_get_input,                "GET input   ") \
        EM(rxrpc_call_get_kernel_service,       "GET krnl-srv") \
        EM(rxrpc_call_get_notify_socket,        "GET notify  ") \
        EM(rxrpc_call_new_prealloc_service,     "NEW prealloc") \
        EM(rxrpc_call_put_discard_prealloc,     "PUT disc-pre") \
        EM(rxrpc_call_put_discard_error,        "PUT disc-err") \
+       EM(rxrpc_call_put_io_thread,            "PUT iothread") \
        EM(rxrpc_call_put_input,                "PUT input   ") \
        EM(rxrpc_call_put_kernel,               "PUT kernel  ") \
        EM(rxrpc_call_put_poke,                 "PUT poke    ") \
        EM(rxrpc_call_see_activate_client,      "SEE act-clnt") \
        EM(rxrpc_call_see_connect_failed,       "SEE con-fail") \
        EM(rxrpc_call_see_connected,            "SEE connect ") \
+       EM(rxrpc_call_see_disconnected,         "SEE disconn ") \
        EM(rxrpc_call_see_distribute_error,     "SEE dist-err") \
        EM(rxrpc_call_see_input,                "SEE input   ") \
        EM(rxrpc_call_see_release,              "SEE release ") \
index 89dcf60..239fc3c 100644 (file)
@@ -453,6 +453,8 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
                BUG();
        }
 
+       rxrpc_get_call(call, rxrpc_call_get_io_thread);
+
        /* Set the channel for this call.  We don't get channel_lock as we're
         * only defending against the data_ready handler (which we're called
         * from) and the RESPONSE packet parser (which is only really
index e4063c4..1edd658 100644 (file)
@@ -725,8 +725,11 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
 
        rxrpc_discard_expired_client_conns(&rxnet->client_conn_reaper);
 
+       rxrpc_get_call(call, rxrpc_call_get_io_thread);
+
        bundle = rxrpc_prep_call(rx, call, cp, srx, gfp);
        if (IS_ERR(bundle)) {
+               rxrpc_put_call(call, rxrpc_call_get_io_thread);
                ret = PTR_ERR(bundle);
                goto out;
        }
@@ -820,7 +823,6 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
        _enter("c=%x", call->debug_id);
 
        spin_lock(&bundle->channel_lock);
-       set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
 
        /* Calls that have never actually been assigned a channel can simply be
         * discarded.
@@ -912,8 +914,6 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
 
 out:
        spin_unlock(&bundle->channel_lock);
-       _leave("");
-       return;
 }
 
 /*
index 3c8f83d..2bd3f62 100644 (file)
@@ -178,6 +178,9 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
 {
        struct rxrpc_connection *conn = call->conn;
 
+       set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
+       rxrpc_see_call(call, rxrpc_call_see_disconnected);
+
        call->peer->cong_ssthresh = call->cong_ssthresh;
 
        if (!hlist_unhashed(&call->error_link)) {
@@ -186,18 +189,20 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
                spin_unlock(&call->peer->lock);
        }
 
-       if (rxrpc_is_client_call(call))
-               return rxrpc_disconnect_client_call(conn->bundle, call);
+       if (rxrpc_is_client_call(call)) {
+               rxrpc_disconnect_client_call(conn->bundle, call);
+       } else {
+               spin_lock(&conn->bundle->channel_lock);
+               __rxrpc_disconnect_call(conn, call);
+               spin_unlock(&conn->bundle->channel_lock);
 
-       spin_lock(&conn->bundle->channel_lock);
-       __rxrpc_disconnect_call(conn, call);
-       spin_unlock(&conn->bundle->channel_lock);
+               conn->idle_timestamp = jiffies;
+               if (atomic_dec_and_test(&conn->active))
+                       rxrpc_set_service_reap_timer(conn->rxnet,
+                                                    jiffies + rxrpc_connection_expiry);
+       }
 
-       set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
-       conn->idle_timestamp = jiffies;
-       if (atomic_dec_and_test(&conn->active))
-               rxrpc_set_service_reap_timer(conn->rxnet,
-                                            jiffies + rxrpc_connection_expiry);
+       rxrpc_put_call(call, rxrpc_call_put_io_thread);
 }
 
 /*