rxrpc: Move the client conn cache management to the I/O thread
authorDavid Howells <dhowells@redhat.com>
Wed, 2 Nov 2022 16:46:13 +0000 (16:46 +0000)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:33 +0000 (09:43 +0000)
Move the management of the client connection cache to the I/O thread rather
than managing it from the namespace as an aggregate across all the local
endpoints within the namespace.

This will allow a load of locking to be got rid of in a future patch as
only the I/O thread will be looking at the this.

The downside is that the total number of cached connections on the system
can get higher because the limit is now per-local rather than per-netns.
We can, however, keep the number of client conns in use across the entire
netfs and use that to reduce the expiration time of idle connection.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

net/rxrpc/ar-internal.h
net/rxrpc/conn_client.c
net/rxrpc/conn_object.c
net/rxrpc/io_thread.c
net/rxrpc/local_object.c
net/rxrpc/net_ns.c

index 751b1903fd6e1379f5f6bb08b3c627145f347ac0..de84061a54472b626f1f19caa093b5d3e1233467 100644 (file)
@@ -76,13 +76,7 @@ struct rxrpc_net {
 
        bool                    live;
 
 
        bool                    live;
 
-       bool                    kill_all_client_conns;
        atomic_t                nr_client_conns;
        atomic_t                nr_client_conns;
-       spinlock_t              client_conn_cache_lock; /* Lock for ->*_client_conns */
-       struct mutex            client_conn_discard_lock; /* Prevent multiple discarders */
-       struct list_head        idle_client_conns;
-       struct work_struct      client_conn_reaper;
-       struct timer_list       client_conn_reap_timer;
 
        struct hlist_head       local_endpoints;
        struct mutex            local_mutex;    /* Lock for ->local_endpoints */
 
        struct hlist_head       local_endpoints;
        struct mutex            local_mutex;    /* Lock for ->local_endpoints */
@@ -294,8 +288,16 @@ struct rxrpc_local {
        struct sk_buff_head     rx_queue;       /* Received packets */
        struct list_head        conn_attend_q;  /* Conns requiring immediate attention */
        struct list_head        call_attend_q;  /* Calls requiring immediate attention */
        struct sk_buff_head     rx_queue;       /* Received packets */
        struct list_head        conn_attend_q;  /* Conns requiring immediate attention */
        struct list_head        call_attend_q;  /* Calls requiring immediate attention */
+
        struct rb_root          client_bundles; /* Client connection bundles by socket params */
        spinlock_t              client_bundles_lock; /* Lock for client_bundles */
        struct rb_root          client_bundles; /* Client connection bundles by socket params */
        spinlock_t              client_bundles_lock; /* Lock for client_bundles */
+       bool                    kill_all_client_conns;
+       spinlock_t              client_conn_cache_lock; /* Lock for ->*_client_conns */
+       struct list_head        idle_client_conns;
+       struct timer_list       client_conn_reap_timer;
+       unsigned long           client_conn_flags;
+#define RXRPC_CLIENT_CONN_REAP_TIMER   0       /* The client conn reap timer expired */
+
        spinlock_t              lock;           /* access lock */
        rwlock_t                services_lock;  /* lock for services list */
        int                     debug_id;       /* debug ID for printks */
        spinlock_t              lock;           /* access lock */
        rwlock_t                services_lock;  /* lock for services list */
        int                     debug_id;       /* debug ID for printks */
@@ -946,8 +948,7 @@ void rxrpc_expose_client_call(struct rxrpc_call *);
 void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *);
 void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle);
 void rxrpc_put_client_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
 void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *);
 void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle);
 void rxrpc_put_client_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
-void rxrpc_discard_expired_client_conns(struct work_struct *);
-void rxrpc_destroy_all_client_connections(struct rxrpc_net *);
+void rxrpc_discard_expired_client_conns(struct rxrpc_local *local);
 void rxrpc_clean_up_local_conns(struct rxrpc_local *);
 
 /*
 void rxrpc_clean_up_local_conns(struct rxrpc_local *);
 
 /*
index 8b5ea68dc47e740d7a50bf045f9dd3246dd19cb9..ebb43f65ebc51388d7a9417e8ec374dc8bb9021e 100644 (file)
@@ -578,17 +578,17 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
  */
 static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
 {
  */
 static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
 {
-       struct rxrpc_net *rxnet = bundle->local->rxnet;
+       struct rxrpc_local *local = bundle->local;
        bool drop_ref;
 
        if (!list_empty(&conn->cache_link)) {
                drop_ref = false;
        bool drop_ref;
 
        if (!list_empty(&conn->cache_link)) {
                drop_ref = false;
-               spin_lock(&rxnet->client_conn_cache_lock);
+               spin_lock(&local->client_conn_cache_lock);
                if (!list_empty(&conn->cache_link)) {
                        list_del_init(&conn->cache_link);
                        drop_ref = true;
                }
                if (!list_empty(&conn->cache_link)) {
                        list_del_init(&conn->cache_link);
                        drop_ref = true;
                }
-               spin_unlock(&rxnet->client_conn_cache_lock);
+               spin_unlock(&local->client_conn_cache_lock);
                if (drop_ref)
                        rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
        }
                if (drop_ref)
                        rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
        }
@@ -710,14 +710,10 @@ out:
 int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
 {
        struct rxrpc_bundle *bundle;
 int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
 {
        struct rxrpc_bundle *bundle;
-       struct rxrpc_local *local = call->local;
-       struct rxrpc_net *rxnet = local->rxnet;
        int ret = 0;
 
        _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
 
        int ret = 0;
 
        _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
 
-       rxrpc_discard_expired_client_conns(&rxnet->client_conn_reaper);
-
        rxrpc_get_call(call, rxrpc_call_get_io_thread);
 
        bundle = rxrpc_prep_call(call, gfp);
        rxrpc_get_call(call, rxrpc_call_get_io_thread);
 
        bundle = rxrpc_prep_call(call, gfp);
@@ -787,14 +783,14 @@ void rxrpc_expose_client_call(struct rxrpc_call *call)
 /*
  * Set the reap timer.
  */
 /*
  * Set the reap timer.
  */
-static void rxrpc_set_client_reap_timer(struct rxrpc_net *rxnet)
+static void rxrpc_set_client_reap_timer(struct rxrpc_local *local)
 {
 {
-       if (!rxnet->kill_all_client_conns) {
+       if (!local->kill_all_client_conns) {
                unsigned long now = jiffies;
                unsigned long reap_at = now + rxrpc_conn_idle_client_expiry;
 
                unsigned long now = jiffies;
                unsigned long reap_at = now + rxrpc_conn_idle_client_expiry;
 
-               if (rxnet->live)
-                       timer_reduce(&rxnet->client_conn_reap_timer, reap_at);
+               if (local->rxnet->live)
+                       timer_reduce(&local->client_conn_reap_timer, reap_at);
        }
 }
 
        }
 }
 
@@ -805,7 +801,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
 {
        struct rxrpc_connection *conn;
        struct rxrpc_channel *chan = NULL;
 {
        struct rxrpc_connection *conn;
        struct rxrpc_channel *chan = NULL;
-       struct rxrpc_net *rxnet = bundle->local->rxnet;
+       struct rxrpc_local *local = bundle->local;
        unsigned int channel;
        bool may_reuse;
        u32 cid;
        unsigned int channel;
        bool may_reuse;
        u32 cid;
@@ -895,11 +891,11 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
                conn->idle_timestamp = jiffies;
 
                rxrpc_get_connection(conn, rxrpc_conn_get_idle);
                conn->idle_timestamp = jiffies;
 
                rxrpc_get_connection(conn, rxrpc_conn_get_idle);
-               spin_lock(&rxnet->client_conn_cache_lock);
-               list_move_tail(&conn->cache_link, &rxnet->idle_client_conns);
-               spin_unlock(&rxnet->client_conn_cache_lock);
+               spin_lock(&local->client_conn_cache_lock);
+               list_move_tail(&conn->cache_link, &local->idle_client_conns);
+               spin_unlock(&local->client_conn_cache_lock);
 
 
-               rxrpc_set_client_reap_timer(rxnet);
+               rxrpc_set_client_reap_timer(local);
        }
 
 out:
        }
 
 out:
@@ -986,42 +982,34 @@ void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
  * This may be called from conn setup or from a work item so cannot be
  * considered non-reentrant.
  */
  * This may be called from conn setup or from a work item so cannot be
  * considered non-reentrant.
  */
-void rxrpc_discard_expired_client_conns(struct work_struct *work)
+void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
 {
        struct rxrpc_connection *conn;
 {
        struct rxrpc_connection *conn;
-       struct rxrpc_net *rxnet =
-               container_of(work, struct rxrpc_net, client_conn_reaper);
        unsigned long expiry, conn_expires_at, now;
        unsigned int nr_conns;
 
        _enter("");
 
        unsigned long expiry, conn_expires_at, now;
        unsigned int nr_conns;
 
        _enter("");
 
-       if (list_empty(&rxnet->idle_client_conns)) {
+       if (list_empty(&local->idle_client_conns)) {
                _leave(" [empty]");
                return;
        }
 
                _leave(" [empty]");
                return;
        }
 
-       /* Don't double up on the discarding */
-       if (!mutex_trylock(&rxnet->client_conn_discard_lock)) {
-               _leave(" [already]");
-               return;
-       }
-
        /* We keep an estimate of what the number of conns ought to be after
         * we've discarded some so that we don't overdo the discarding.
         */
        /* We keep an estimate of what the number of conns ought to be after
         * we've discarded some so that we don't overdo the discarding.
         */
-       nr_conns = atomic_read(&rxnet->nr_client_conns);
+       nr_conns = atomic_read(&local->rxnet->nr_client_conns);
 
 next:
 
 next:
-       spin_lock(&rxnet->client_conn_cache_lock);
+       spin_lock(&local->client_conn_cache_lock);
 
 
-       if (list_empty(&rxnet->idle_client_conns))
+       if (list_empty(&local->idle_client_conns))
                goto out;
 
                goto out;
 
-       conn = list_entry(rxnet->idle_client_conns.next,
+       conn = list_entry(local->idle_client_conns.next,
                          struct rxrpc_connection, cache_link);
 
                          struct rxrpc_connection, cache_link);
 
-       if (!rxnet->kill_all_client_conns) {
+       if (!local->kill_all_client_conns) {
                /* If the number of connections is over the reap limit, we
                 * expedite discard by reducing the expiry timeout.  We must,
                 * however, have at least a short grace period to be able to do
                /* If the number of connections is over the reap limit, we
                 * expedite discard by reducing the expiry timeout.  We must,
                 * however, have at least a short grace period to be able to do
@@ -1044,7 +1032,7 @@ next:
        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
        list_del_init(&conn->cache_link);
 
        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
        list_del_init(&conn->cache_link);
 
-       spin_unlock(&rxnet->client_conn_cache_lock);
+       spin_unlock(&local->client_conn_cache_lock);
 
        rxrpc_unbundle_conn(conn);
        /* Drop the ->cache_link ref */
 
        rxrpc_unbundle_conn(conn);
        /* Drop the ->cache_link ref */
@@ -1062,32 +1050,11 @@ not_yet_expired:
         * then things get messier.
         */
        _debug("not yet");
         * then things get messier.
         */
        _debug("not yet");
-       if (!rxnet->kill_all_client_conns)
-               timer_reduce(&rxnet->client_conn_reap_timer, conn_expires_at);
+       if (!local->kill_all_client_conns)
+               timer_reduce(&local->client_conn_reap_timer, conn_expires_at);
 
 out:
 
 out:
-       spin_unlock(&rxnet->client_conn_cache_lock);
-       mutex_unlock(&rxnet->client_conn_discard_lock);
-       _leave("");
-}
-
-/*
- * Preemptively destroy all the client connection records rather than waiting
- * for them to time out
- */
-void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
-{
-       _enter("");
-
-       spin_lock(&rxnet->client_conn_cache_lock);
-       rxnet->kill_all_client_conns = true;
-       spin_unlock(&rxnet->client_conn_cache_lock);
-
-       del_timer_sync(&rxnet->client_conn_reap_timer);
-
-       if (!rxrpc_queue_work(&rxnet->client_conn_reaper))
-               _debug("destroy: queue failed");
-
+       spin_unlock(&local->client_conn_cache_lock);
        _leave("");
 }
 
        _leave("");
 }
 
@@ -1097,14 +1064,19 @@ void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
 void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
 {
        struct rxrpc_connection *conn, *tmp;
 void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
 {
        struct rxrpc_connection *conn, *tmp;
-       struct rxrpc_net *rxnet = local->rxnet;
        LIST_HEAD(graveyard);
 
        _enter("");
 
        LIST_HEAD(graveyard);
 
        _enter("");
 
-       spin_lock(&rxnet->client_conn_cache_lock);
+       spin_lock(&local->client_conn_cache_lock);
+       local->kill_all_client_conns = true;
+       spin_unlock(&local->client_conn_cache_lock);
+
+       del_timer_sync(&local->client_conn_reap_timer);
+
+       spin_lock(&local->client_conn_cache_lock);
 
 
-       list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
+       list_for_each_entry_safe(conn, tmp, &local->idle_client_conns,
                                 cache_link) {
                if (conn->local == local) {
                        atomic_dec(&conn->active);
                                 cache_link) {
                if (conn->local == local) {
                        atomic_dec(&conn->active);
@@ -1113,7 +1085,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
                }
        }
 
                }
        }
 
-       spin_unlock(&rxnet->client_conn_cache_lock);
+       spin_unlock(&local->client_conn_cache_lock);
 
        while (!list_empty(&graveyard)) {
                conn = list_entry(graveyard.next,
 
        while (!list_empty(&graveyard)) {
                conn = list_entry(graveyard.next,
index 2a7d5378300cc38a897c623766a2a77d78c249f4..3d8c1dc6a82ae6e76125d834f7775a70f78f541e 100644 (file)
@@ -470,7 +470,6 @@ void rxrpc_destroy_all_connections(struct rxrpc_net *rxnet)
        _enter("");
 
        atomic_dec(&rxnet->nr_conns);
        _enter("");
 
        atomic_dec(&rxnet->nr_conns);
-       rxrpc_destroy_all_client_connections(rxnet);
 
        del_timer_sync(&rxnet->service_conn_reap_timer);
        rxrpc_queue_work(&rxnet->service_conn_reaper);
 
        del_timer_sync(&rxnet->service_conn_reap_timer);
        rxrpc_queue_work(&rxnet->service_conn_reaper);
index 751139b3c1ac964e4ba3415cc9e294debcec7f5f..a299cc34c1403aaa2c9e8fb24ec56b6a1f015469 100644 (file)
@@ -435,6 +435,10 @@ int rxrpc_io_thread(void *data)
                        continue;
                }
 
                        continue;
                }
 
+               if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
+                                      &local->client_conn_flags))
+                       rxrpc_discard_expired_client_conns(local);
+
                /* Deal with calls that want immediate attention. */
                if ((call = list_first_entry_or_null(&local->call_attend_q,
                                                     struct rxrpc_call,
                /* Deal with calls that want immediate attention. */
                if ((call = list_first_entry_or_null(&local->call_attend_q,
                                                     struct rxrpc_call,
index ca8b3ee68b59700e7675631690f1dfcfcec2dfd1..9bc8d08ca12cf09136dfd9a6805bb947b72e296c 100644 (file)
@@ -82,6 +82,16 @@ static long rxrpc_local_cmp_key(const struct rxrpc_local *local,
        }
 }
 
        }
 }
 
+static void rxrpc_client_conn_reap_timeout(struct timer_list *timer)
+{
+       struct rxrpc_local *local =
+               container_of(timer, struct rxrpc_local, client_conn_reap_timer);
+
+       if (local->kill_all_client_conns &&
+           test_and_set_bit(RXRPC_CLIENT_CONN_REAP_TIMER, &local->client_conn_flags))
+               rxrpc_wake_up_io_thread(local);
+}
+
 /*
  * Allocate a new local endpoint.
  */
 /*
  * Allocate a new local endpoint.
  */
@@ -103,8 +113,15 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
                skb_queue_head_init(&local->rx_queue);
                INIT_LIST_HEAD(&local->conn_attend_q);
                INIT_LIST_HEAD(&local->call_attend_q);
                skb_queue_head_init(&local->rx_queue);
                INIT_LIST_HEAD(&local->conn_attend_q);
                INIT_LIST_HEAD(&local->call_attend_q);
+
                local->client_bundles = RB_ROOT;
                spin_lock_init(&local->client_bundles_lock);
                local->client_bundles = RB_ROOT;
                spin_lock_init(&local->client_bundles_lock);
+               local->kill_all_client_conns = false;
+               spin_lock_init(&local->client_conn_cache_lock);
+               INIT_LIST_HEAD(&local->idle_client_conns);
+               timer_setup(&local->client_conn_reap_timer,
+                           rxrpc_client_conn_reap_timeout, 0);
+
                spin_lock_init(&local->lock);
                rwlock_init(&local->services_lock);
                local->debug_id = atomic_inc_return(&rxrpc_debug_id);
                spin_lock_init(&local->lock);
                rwlock_init(&local->services_lock);
                local->debug_id = atomic_inc_return(&rxrpc_debug_id);
index 5905530e2f33b439173f90b6b6c69beb1464e5f2..a0319c040c25dea6a34441292409c9a39b61283e 100644 (file)
 
 unsigned int rxrpc_net_id;
 
 
 unsigned int rxrpc_net_id;
 
-static void rxrpc_client_conn_reap_timeout(struct timer_list *timer)
-{
-       struct rxrpc_net *rxnet =
-               container_of(timer, struct rxrpc_net, client_conn_reap_timer);
-
-       if (rxnet->live)
-               rxrpc_queue_work(&rxnet->client_conn_reaper);
-}
-
 static void rxrpc_service_conn_reap_timeout(struct timer_list *timer)
 {
        struct rxrpc_net *rxnet =
 static void rxrpc_service_conn_reap_timeout(struct timer_list *timer)
 {
        struct rxrpc_net *rxnet =
@@ -63,14 +54,6 @@ static __net_init int rxrpc_init_net(struct net *net)
                    rxrpc_service_conn_reap_timeout, 0);
 
        atomic_set(&rxnet->nr_client_conns, 0);
                    rxrpc_service_conn_reap_timeout, 0);
 
        atomic_set(&rxnet->nr_client_conns, 0);
-       rxnet->kill_all_client_conns = false;
-       spin_lock_init(&rxnet->client_conn_cache_lock);
-       mutex_init(&rxnet->client_conn_discard_lock);
-       INIT_LIST_HEAD(&rxnet->idle_client_conns);
-       INIT_WORK(&rxnet->client_conn_reaper,
-                 rxrpc_discard_expired_client_conns);
-       timer_setup(&rxnet->client_conn_reap_timer,
-                   rxrpc_client_conn_reap_timeout, 0);
 
        INIT_HLIST_HEAD(&rxnet->local_endpoints);
        mutex_init(&rxnet->local_mutex);
 
        INIT_HLIST_HEAD(&rxnet->local_endpoints);
        mutex_init(&rxnet->local_mutex);