rxrpc: Don't hold a ref for connection workqueue
authorDavid Howells <dhowells@redhat.com>
Fri, 25 Nov 2022 12:43:50 +0000 (12:43 +0000)
committerDavid Howells <dhowells@redhat.com>
Thu, 1 Dec 2022 13:36:40 +0000 (13:36 +0000)
Currently, rxrpc gives the connection's work item a ref on the connection
when it queues it - and this is called from the timer expiration function.
The problem comes when queue_work() fails (ie. the work item is already
queued): the timer routine must put the ref - but this may cause the
cleanup code to run.

This has the unfortunate effect that the cleanup code may then be run in
softirq context - which means that any spinlocks it might need to touch
have to be guarded to disable softirqs (ie. they need a "_bh" suffix).

 (1) Don't give a ref to the work item.

 (2) Simplify handling of service connections by adding a separate active
     count so that the refcount isn't also used for this.

 (3) Connection destruction for both client and service connections can
     then be cleaned up by putting rxrpc_put_connection() out of line and
     making a tidy progression through the destruction code (offloaded to a
     workqueue if put from softirq or processor function context).  The RCU
     part of the cleanup then only deals with the freeing at the end.

 (4) Make rxrpc_queue_conn() return immediately if it sees the active count
     is -1 rather then queuing the connection.

 (5) Make sure that the cleanup routine waits for the work item to
     complete.

 (6) Stash the rxrpc_net pointer in the conn struct so that the rcu free
     routine can use it, even if the local endpoint has been freed.

Unfortunately, neither the timer nor the work item can simply get around
the problem by just using refcount_inc_not_zero() as the waits would still
have to be done, and there would still be the possibility of having to put
the ref in the expiration function.

Note the connection work item is mostly going to go away with the main
event work being transferred to the I/O thread, so the wait in (6) will
become obsolete.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_accept.c
net/rxrpc/conn_client.c
net/rxrpc/conn_event.c
net/rxrpc/conn_object.c
net/rxrpc/conn_service.c
net/rxrpc/net_ns.c
net/rxrpc/proc.c

index 4538de0079a5b6174581dcc49b08b9cbbef258f1..44a9be9836f985e3d1642a18f96403905b8462a4 100644 (file)
        EM(rxrpc_conn_get_service_conn,         "GET svc-conn") \
        EM(rxrpc_conn_new_client,               "NEW client  ") \
        EM(rxrpc_conn_new_service,              "NEW service ") \
        EM(rxrpc_conn_get_service_conn,         "GET svc-conn") \
        EM(rxrpc_conn_new_client,               "NEW client  ") \
        EM(rxrpc_conn_new_service,              "NEW service ") \
-       EM(rxrpc_conn_put_already_queued,       "PUT alreadyq") \
        EM(rxrpc_conn_put_call,                 "PUT call    ") \
        EM(rxrpc_conn_put_call_input,           "PUT inp-call") \
        EM(rxrpc_conn_put_conn_input,           "PUT inp-conn") \
        EM(rxrpc_conn_put_call,                 "PUT call    ") \
        EM(rxrpc_conn_put_call_input,           "PUT inp-call") \
        EM(rxrpc_conn_put_conn_input,           "PUT inp-conn") \
        EM(rxrpc_conn_put_local_dead,           "PUT loc-dead") \
        EM(rxrpc_conn_put_noreuse,              "PUT noreuse ") \
        EM(rxrpc_conn_put_poke,                 "PUT poke    ") \
        EM(rxrpc_conn_put_local_dead,           "PUT loc-dead") \
        EM(rxrpc_conn_put_noreuse,              "PUT noreuse ") \
        EM(rxrpc_conn_put_poke,                 "PUT poke    ") \
+       EM(rxrpc_conn_put_service_reaped,       "PUT svc-reap") \
        EM(rxrpc_conn_put_unbundle,             "PUT unbundle") \
        EM(rxrpc_conn_put_unidle,               "PUT unidle  ") \
        EM(rxrpc_conn_put_unbundle,             "PUT unbundle") \
        EM(rxrpc_conn_put_unidle,               "PUT unidle  ") \
-       EM(rxrpc_conn_put_work,                 "PUT work    ") \
-       EM(rxrpc_conn_queue_challenge,          "GQ  chall   ") \
-       EM(rxrpc_conn_queue_retry_work,         "GQ  retry-wk") \
-       EM(rxrpc_conn_queue_rx_work,            "GQ  rx-work ") \
-       EM(rxrpc_conn_queue_timer,              "GQ  timer   ") \
+       EM(rxrpc_conn_queue_challenge,          "QUE chall   ") \
+       EM(rxrpc_conn_queue_retry_work,         "QUE retry-wk") \
+       EM(rxrpc_conn_queue_rx_work,            "QUE rx-work ") \
+       EM(rxrpc_conn_queue_timer,              "QUE timer   ") \
        EM(rxrpc_conn_see_new_service_conn,     "SEE new-svc ") \
        EM(rxrpc_conn_see_reap_service,         "SEE reap-svc") \
        E_(rxrpc_conn_see_work,                 "SEE work    ")
        EM(rxrpc_conn_see_new_service_conn,     "SEE new-svc ") \
        EM(rxrpc_conn_see_reap_service,         "SEE reap-svc") \
        E_(rxrpc_conn_see_work,                 "SEE work    ")
index 03523a864c114b19f18df32b4f2697b3879f0839..41a57c145f2bde9031d542fac8f0e94429d2903b 100644 (file)
@@ -76,7 +76,7 @@ struct rxrpc_net {
        bool                    kill_all_client_conns;
        atomic_t                nr_client_conns;
        spinlock_t              client_conn_cache_lock; /* Lock for ->*_client_conns */
        bool                    kill_all_client_conns;
        atomic_t                nr_client_conns;
        spinlock_t              client_conn_cache_lock; /* Lock for ->*_client_conns */
-       spinlock_t              client_conn_discard_lock; /* Prevent multiple discarders */
+       struct mutex            client_conn_discard_lock; /* Prevent multiple discarders */
        struct list_head        idle_client_conns;
        struct work_struct      client_conn_reaper;
        struct timer_list       client_conn_reap_timer;
        struct list_head        idle_client_conns;
        struct work_struct      client_conn_reaper;
        struct timer_list       client_conn_reap_timer;
@@ -432,9 +432,11 @@ struct rxrpc_connection {
        struct rxrpc_conn_proto proto;
        struct rxrpc_local      *local;         /* Representation of local endpoint */
        struct rxrpc_peer       *peer;          /* Remote endpoint */
        struct rxrpc_conn_proto proto;
        struct rxrpc_local      *local;         /* Representation of local endpoint */
        struct rxrpc_peer       *peer;          /* Remote endpoint */
+       struct rxrpc_net        *rxnet;         /* Network namespace to which call belongs */
        struct key              *key;           /* Security details */
 
        refcount_t              ref;
        struct key              *key;           /* Security details */
 
        refcount_t              ref;
+       atomic_t                active;         /* Active count for service conns */
        struct rcu_head         rcu;
        struct list_head        cache_link;
 
        struct rcu_head         rcu;
        struct list_head        cache_link;
 
@@ -455,6 +457,7 @@ struct rxrpc_connection {
 
        struct timer_list       timer;          /* Conn event timer */
        struct work_struct      processor;      /* connection event processor */
 
        struct timer_list       timer;          /* Conn event timer */
        struct work_struct      processor;      /* connection event processor */
+       struct work_struct      destructor;     /* In-process-context destroyer */
        struct rxrpc_bundle     *bundle;        /* Client connection bundle */
        struct rb_node          service_node;   /* Node in peer->service_conns */
        struct list_head        proc_link;      /* link in procfs list */
        struct rxrpc_bundle     *bundle;        /* Client connection bundle */
        struct rb_node          service_node;   /* Node in peer->service_conns */
        struct list_head        proc_link;      /* link in procfs list */
@@ -897,20 +900,20 @@ void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
 extern unsigned int rxrpc_connection_expiry;
 extern unsigned int rxrpc_closed_conn_expiry;
 
 extern unsigned int rxrpc_connection_expiry;
 extern unsigned int rxrpc_closed_conn_expiry;
 
-struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
+struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
 struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
                                                   struct sk_buff *,
                                                   struct rxrpc_peer **);
 void __rxrpc_disconnect_call(struct rxrpc_connection *, struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
 struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
                                                   struct sk_buff *,
                                                   struct rxrpc_peer **);
 void __rxrpc_disconnect_call(struct rxrpc_connection *, struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
-void rxrpc_kill_connection(struct rxrpc_connection *);
-bool rxrpc_queue_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
+void rxrpc_kill_client_conn(struct rxrpc_connection *);
+void rxrpc_queue_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
 void rxrpc_see_connection(struct rxrpc_connection *, enum rxrpc_conn_trace);
 struct rxrpc_connection *rxrpc_get_connection(struct rxrpc_connection *,
                                              enum rxrpc_conn_trace);
 struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *,
                                                    enum rxrpc_conn_trace);
 void rxrpc_see_connection(struct rxrpc_connection *, enum rxrpc_conn_trace);
 struct rxrpc_connection *rxrpc_get_connection(struct rxrpc_connection *,
                                              enum rxrpc_conn_trace);
 struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *,
                                                    enum rxrpc_conn_trace);
-void rxrpc_put_service_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
+void rxrpc_put_connection(struct rxrpc_connection *, enum rxrpc_conn_trace);
 void rxrpc_service_connection_reaper(struct work_struct *);
 void rxrpc_destroy_all_connections(struct rxrpc_net *);
 
 void rxrpc_service_connection_reaper(struct work_struct *);
 void rxrpc_destroy_all_connections(struct rxrpc_net *);
 
@@ -924,18 +927,6 @@ static inline bool rxrpc_conn_is_service(const struct rxrpc_connection *conn)
        return !rxrpc_conn_is_client(conn);
 }
 
        return !rxrpc_conn_is_client(conn);
 }
 
-static inline void rxrpc_put_connection(struct rxrpc_connection *conn,
-                                       enum rxrpc_conn_trace why)
-{
-       if (!conn)
-               return;
-
-       if (rxrpc_conn_is_client(conn))
-               rxrpc_put_client_conn(conn, why);
-       else
-               rxrpc_put_service_conn(conn, why);
-}
-
 static inline void rxrpc_reduce_conn_timer(struct rxrpc_connection *conn,
                                           unsigned long expire_at)
 {
 static inline void rxrpc_reduce_conn_timer(struct rxrpc_connection *conn,
                                           unsigned long expire_at)
 {
index dd4ca4bee77f6429a43ffbc16f3b27dc8b05b947..8d106b626aa3c9f2abf2359f14a91b42f45d0b5b 100644 (file)
@@ -308,6 +308,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
                rxrpc_new_incoming_connection(rx, conn, sec, skb);
        } else {
                rxrpc_get_connection(conn, rxrpc_conn_get_service_conn);
                rxrpc_new_incoming_connection(rx, conn, sec, skb);
        } else {
                rxrpc_get_connection(conn, rxrpc_conn_get_service_conn);
+               atomic_inc(&conn->active);
        }
 
        /* And now we can allocate and set up a new call */
        }
 
        /* And now we can allocate and set up a new call */
index 34ff6fa85c328120d05b70d96fccdf6cab439e4b..9485a3d18f2974fa8d7731c12034a0a3daa99d83 100644 (file)
@@ -51,7 +51,7 @@ static void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle);
 static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
                                          gfp_t gfp)
 {
 static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
                                          gfp_t gfp)
 {
-       struct rxrpc_net *rxnet = conn->local->rxnet;
+       struct rxrpc_net *rxnet = conn->rxnet;
        int id;
 
        _enter("");
        int id;
 
        _enter("");
@@ -179,7 +179,7 @@ rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp)
 
        _enter("");
 
 
        _enter("");
 
-       conn = rxrpc_alloc_connection(gfp);
+       conn = rxrpc_alloc_connection(rxnet, gfp);
        if (!conn) {
                _leave(" = -ENOMEM");
                return ERR_PTR(-ENOMEM);
        if (!conn) {
                _leave(" = -ENOMEM");
                return ERR_PTR(-ENOMEM);
@@ -243,7 +243,7 @@ static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
        if (!conn)
                goto dont_reuse;
 
        if (!conn)
                goto dont_reuse;
 
-       rxnet = conn->local->rxnet;
+       rxnet = conn->rxnet;
        if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
                goto dont_reuse;
 
        if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
                goto dont_reuse;
 
@@ -970,7 +970,7 @@ static void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle)
 /*
  * Clean up a dead client connection.
  */
 /*
  * Clean up a dead client connection.
  */
-static void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
+void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
 {
        struct rxrpc_local *local = conn->local;
        struct rxrpc_net *rxnet = local->rxnet;
 {
        struct rxrpc_local *local = conn->local;
        struct rxrpc_net *rxnet = local->rxnet;
@@ -981,23 +981,6 @@ static void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
        atomic_dec(&rxnet->nr_client_conns);
 
        rxrpc_put_client_connection_id(conn);
        atomic_dec(&rxnet->nr_client_conns);
 
        rxrpc_put_client_connection_id(conn);
-       rxrpc_kill_connection(conn);
-}
-
-/*
- * Clean up a dead client connections.
- */
-void rxrpc_put_client_conn(struct rxrpc_connection *conn,
-                          enum rxrpc_conn_trace why)
-{
-       unsigned int debug_id = conn->debug_id;
-       bool dead;
-       int r;
-
-       dead = __refcount_dec_and_test(&conn->ref, &r);
-       trace_rxrpc_conn(debug_id, r - 1, why);
-       if (dead)
-               rxrpc_kill_client_conn(conn);
 }
 
 /*
 }
 
 /*
@@ -1023,7 +1006,7 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work)
        }
 
        /* Don't double up on the discarding */
        }
 
        /* Don't double up on the discarding */
-       if (!spin_trylock(&rxnet->client_conn_discard_lock)) {
+       if (!mutex_trylock(&rxnet->client_conn_discard_lock)) {
                _leave(" [already]");
                return;
        }
                _leave(" [already]");
                return;
        }
@@ -1061,6 +1044,7 @@ next:
                        goto not_yet_expired;
        }
 
                        goto not_yet_expired;
        }
 
+       atomic_dec(&conn->active);
        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
        list_del_init(&conn->cache_link);
 
        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
        list_del_init(&conn->cache_link);
 
@@ -1087,7 +1071,7 @@ not_yet_expired:
 
 out:
        spin_unlock(&rxnet->client_conn_cache_lock);
 
 out:
        spin_unlock(&rxnet->client_conn_cache_lock);
-       spin_unlock(&rxnet->client_conn_discard_lock);
+       mutex_unlock(&rxnet->client_conn_discard_lock);
        _leave("");
 }
 
        _leave("");
 }
 
@@ -1127,6 +1111,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
        list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
                                 cache_link) {
                if (conn->local == local) {
        list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
                                 cache_link) {
                if (conn->local == local) {
+                       atomic_dec(&conn->active);
                        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
                        list_move(&conn->cache_link, &graveyard);
                }
                        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
                        list_move(&conn->cache_link, &graveyard);
                }
index 49d885f73fa5b520d33ebfff725decd3ba2bfee9..23a74e35052d3228fa381962c1bf01b8070c5301 100644 (file)
@@ -478,8 +478,4 @@ void rxrpc_process_connection(struct work_struct *work)
                rxrpc_do_process_connection(conn);
                rxrpc_unuse_local(conn->local, rxrpc_local_unuse_conn_work);
        }
                rxrpc_do_process_connection(conn);
                rxrpc_unuse_local(conn->local, rxrpc_local_unuse_conn_work);
        }
-
-       rxrpc_put_connection(conn, rxrpc_conn_put_work);
-       _leave("");
-       return;
 }
 }
index f7c271a740eddc103db33746184efd95063c3024..c2e05ea29f122e0dcbf0569bd276cc6e73a3de5b 100644 (file)
@@ -19,7 +19,9 @@
 unsigned int __read_mostly rxrpc_connection_expiry = 10 * 60;
 unsigned int __read_mostly rxrpc_closed_conn_expiry = 10;
 
 unsigned int __read_mostly rxrpc_connection_expiry = 10 * 60;
 unsigned int __read_mostly rxrpc_closed_conn_expiry = 10;
 
-static void rxrpc_destroy_connection(struct rcu_head *);
+static void rxrpc_clean_up_connection(struct work_struct *work);
+static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
+                                        unsigned long reap_at);
 
 static void rxrpc_connection_timer(struct timer_list *timer)
 {
 
 static void rxrpc_connection_timer(struct timer_list *timer)
 {
@@ -32,7 +34,8 @@ static void rxrpc_connection_timer(struct timer_list *timer)
 /*
  * allocate a new connection
  */
 /*
  * allocate a new connection
  */
-struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
+struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet,
+                                               gfp_t gfp)
 {
        struct rxrpc_connection *conn;
 
 {
        struct rxrpc_connection *conn;
 
@@ -42,10 +45,12 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
        if (conn) {
                INIT_LIST_HEAD(&conn->cache_link);
                timer_setup(&conn->timer, &rxrpc_connection_timer, 0);
        if (conn) {
                INIT_LIST_HEAD(&conn->cache_link);
                timer_setup(&conn->timer, &rxrpc_connection_timer, 0);
-               INIT_WORK(&conn->processor, &rxrpc_process_connection);
+               INIT_WORK(&conn->processor, rxrpc_process_connection);
+               INIT_WORK(&conn->destructor, rxrpc_clean_up_connection);
                INIT_LIST_HEAD(&conn->proc_link);
                INIT_LIST_HEAD(&conn->link);
                skb_queue_head_init(&conn->rx_queue);
                INIT_LIST_HEAD(&conn->proc_link);
                INIT_LIST_HEAD(&conn->link);
                skb_queue_head_init(&conn->rx_queue);
+               conn->rxnet = rxnet;
                conn->security = &rxrpc_no_security;
                spin_lock_init(&conn->state_lock);
                conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
                conn->security = &rxrpc_no_security;
                spin_lock_init(&conn->state_lock);
                conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
@@ -224,53 +229,20 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
 
        set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
        conn->idle_timestamp = jiffies;
 
        set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
        conn->idle_timestamp = jiffies;
-}
-
-/*
- * Kill off a connection.
- */
-void rxrpc_kill_connection(struct rxrpc_connection *conn)
-{
-       struct rxrpc_net *rxnet = conn->local->rxnet;
-
-       ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
-              !rcu_access_pointer(conn->channels[1].call) &&
-              !rcu_access_pointer(conn->channels[2].call) &&
-              !rcu_access_pointer(conn->channels[3].call));
-       ASSERT(list_empty(&conn->cache_link));
-
-       write_lock(&rxnet->conn_lock);
-       list_del_init(&conn->proc_link);
-       write_unlock(&rxnet->conn_lock);
-
-       /* Drain the Rx queue.  Note that even though we've unpublished, an
-        * incoming packet could still be being added to our Rx queue, so we
-        * will need to drain it again in the RCU cleanup handler.
-        */
-       rxrpc_purge_queue(&conn->rx_queue);
-
-       /* Leave final destruction to RCU.  The connection processor work item
-        * must carry a ref on the connection to prevent us getting here whilst
-        * it is queued or running.
-        */
-       call_rcu(&conn->rcu, rxrpc_destroy_connection);
+       if (atomic_dec_and_test(&conn->active))
+               rxrpc_set_service_reap_timer(conn->rxnet,
+                                            jiffies + rxrpc_connection_expiry);
 }
 
 /*
  * Queue a connection's work processor, getting a ref to pass to the work
  * queue.
  */
 }
 
 /*
  * Queue a connection's work processor, getting a ref to pass to the work
  * queue.
  */
-bool rxrpc_queue_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
+void rxrpc_queue_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
 {
 {
-       int r;
-
-       if (!__refcount_inc_not_zero(&conn->ref, &r))
-               return false;
-       if (rxrpc_queue_work(&conn->processor))
-               trace_rxrpc_conn(conn->debug_id, why, r + 1);
-       else
-               rxrpc_put_connection(conn, rxrpc_conn_put_already_queued);
-       return true;
+       if (atomic_read(&conn->active) >= 0 &&
+           rxrpc_queue_work(&conn->processor))
+               rxrpc_see_connection(conn, why);
 }
 
 /*
 }
 
 /*
@@ -327,51 +299,96 @@ static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
                timer_reduce(&rxnet->service_conn_reap_timer, reap_at);
 }
 
                timer_reduce(&rxnet->service_conn_reap_timer, reap_at);
 }
 
-/*
- * Release a service connection
- */
-void rxrpc_put_service_conn(struct rxrpc_connection *conn,
-                           enum rxrpc_conn_trace why)
-{
-       unsigned int debug_id = conn->debug_id;
-       int r;
-
-       __refcount_dec(&conn->ref, &r);
-       trace_rxrpc_conn(debug_id, r - 1, why);
-       if (r - 1 == 1)
-               rxrpc_set_service_reap_timer(conn->local->rxnet,
-                                            jiffies + rxrpc_connection_expiry);
-}
-
 /*
  * destroy a virtual connection
  */
 /*
  * destroy a virtual connection
  */
-static void rxrpc_destroy_connection(struct rcu_head *rcu)
+static void rxrpc_rcu_free_connection(struct rcu_head *rcu)
 {
        struct rxrpc_connection *conn =
                container_of(rcu, struct rxrpc_connection, rcu);
 {
        struct rxrpc_connection *conn =
                container_of(rcu, struct rxrpc_connection, rcu);
+       struct rxrpc_net *rxnet = conn->rxnet;
 
        _enter("{%d,u=%d}", conn->debug_id, refcount_read(&conn->ref));
 
        trace_rxrpc_conn(conn->debug_id, refcount_read(&conn->ref),
                         rxrpc_conn_free);
 
        _enter("{%d,u=%d}", conn->debug_id, refcount_read(&conn->ref));
 
        trace_rxrpc_conn(conn->debug_id, refcount_read(&conn->ref),
                         rxrpc_conn_free);
+       kfree(conn);
 
 
-       ASSERTCMP(refcount_read(&conn->ref), ==, 0);
+       if (atomic_dec_and_test(&rxnet->nr_conns))
+               wake_up_var(&rxnet->nr_conns);
+}
+
+/*
+ * Clean up a dead connection.
+ */
+static void rxrpc_clean_up_connection(struct work_struct *work)
+{
+       struct rxrpc_connection *conn =
+               container_of(work, struct rxrpc_connection, destructor);
+       struct rxrpc_net *rxnet = conn->rxnet;
+
+       ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
+              !rcu_access_pointer(conn->channels[1].call) &&
+              !rcu_access_pointer(conn->channels[2].call) &&
+              !rcu_access_pointer(conn->channels[3].call));
+       ASSERT(list_empty(&conn->cache_link));
 
        del_timer_sync(&conn->timer);
 
        del_timer_sync(&conn->timer);
+       cancel_work_sync(&conn->processor); /* Processing may restart the timer */
+       del_timer_sync(&conn->timer);
+
+       write_lock(&rxnet->conn_lock);
+       list_del_init(&conn->proc_link);
+       write_unlock(&rxnet->conn_lock);
+
        rxrpc_purge_queue(&conn->rx_queue);
 
        rxrpc_purge_queue(&conn->rx_queue);
 
+       rxrpc_kill_client_conn(conn);
+
        conn->security->clear(conn);
        key_put(conn->key);
        rxrpc_put_bundle(conn->bundle, rxrpc_bundle_put_conn);
        rxrpc_put_peer(conn->peer, rxrpc_peer_put_conn);
        conn->security->clear(conn);
        key_put(conn->key);
        rxrpc_put_bundle(conn->bundle, rxrpc_bundle_put_conn);
        rxrpc_put_peer(conn->peer, rxrpc_peer_put_conn);
-
-       if (atomic_dec_and_test(&conn->local->rxnet->nr_conns))
-               wake_up_var(&conn->local->rxnet->nr_conns);
        rxrpc_put_local(conn->local, rxrpc_local_put_kill_conn);
 
        rxrpc_put_local(conn->local, rxrpc_local_put_kill_conn);
 
-       kfree(conn);
-       _leave("");
+       /* Drain the Rx queue.  Note that even though we've unpublished, an
+        * incoming packet could still be being added to our Rx queue, so we
+        * will need to drain it again in the RCU cleanup handler.
+        */
+       rxrpc_purge_queue(&conn->rx_queue);
+
+       call_rcu(&conn->rcu, rxrpc_rcu_free_connection);
+}
+
+/*
+ * Drop a ref on a connection.
+ */
+void rxrpc_put_connection(struct rxrpc_connection *conn,
+                         enum rxrpc_conn_trace why)
+{
+       unsigned int debug_id;
+       bool dead;
+       int r;
+
+       if (!conn)
+               return;
+
+       debug_id = conn->debug_id;
+       dead = __refcount_dec_and_test(&conn->ref, &r);
+       trace_rxrpc_conn(debug_id, r - 1, why);
+       if (dead) {
+               del_timer(&conn->timer);
+               cancel_work(&conn->processor);
+
+               if (in_softirq() || work_busy(&conn->processor) ||
+                   timer_pending(&conn->timer))
+                       /* Can't use the rxrpc workqueue as we need to cancel/flush
+                        * something that may be running/waiting there.
+                        */
+                       schedule_work(&conn->destructor);
+               else
+                       rxrpc_clean_up_connection(&conn->destructor);
+       }
 }
 
 /*
 }
 
 /*
@@ -383,6 +400,7 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
        struct rxrpc_net *rxnet =
                container_of(work, struct rxrpc_net, service_conn_reaper);
        unsigned long expire_at, earliest, idle_timestamp, now;
        struct rxrpc_net *rxnet =
                container_of(work, struct rxrpc_net, service_conn_reaper);
        unsigned long expire_at, earliest, idle_timestamp, now;
+       int active;
 
        LIST_HEAD(graveyard);
 
 
        LIST_HEAD(graveyard);
 
@@ -393,8 +411,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
 
        write_lock(&rxnet->conn_lock);
        list_for_each_entry_safe(conn, _p, &rxnet->service_conns, link) {
 
        write_lock(&rxnet->conn_lock);
        list_for_each_entry_safe(conn, _p, &rxnet->service_conns, link) {
-               ASSERTCMP(refcount_read(&conn->ref), >, 0);
-               if (likely(refcount_read(&conn->ref) > 1))
+               ASSERTCMP(atomic_read(&conn->active), >=, 0);
+               if (likely(atomic_read(&conn->active) > 0))
                        continue;
                if (conn->state == RXRPC_CONN_SERVICE_PREALLOC)
                        continue;
                        continue;
                if (conn->state == RXRPC_CONN_SERVICE_PREALLOC)
                        continue;
@@ -405,8 +423,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
                        if (conn->local->service_closed)
                                expire_at = idle_timestamp + rxrpc_closed_conn_expiry * HZ;
 
                        if (conn->local->service_closed)
                                expire_at = idle_timestamp + rxrpc_closed_conn_expiry * HZ;
 
-                       _debug("reap CONN %d { u=%d,t=%ld }",
-                              conn->debug_id, refcount_read(&conn->ref),
+                       _debug("reap CONN %d { a=%d,t=%ld }",
+                              conn->debug_id, atomic_read(&conn->active),
                               (long)expire_at - (long)now);
 
                        if (time_before(now, expire_at)) {
                               (long)expire_at - (long)now);
 
                        if (time_before(now, expire_at)) {
@@ -416,10 +434,11 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
                        }
                }
 
                        }
                }
 
-               /* The usage count sits at 1 whilst the object is unused on the
-                * list; we reduce that to 0 to make the object unavailable.
+               /* The activity count sits at 0 whilst the conn is unused on
+                * the list; we reduce that to -1 to make the conn unavailable.
                 */
                 */
-               if (!refcount_dec_if_one(&conn->ref))
+               active = 0;
+               if (!atomic_try_cmpxchg(&conn->active, &active, -1))
                        continue;
                rxrpc_see_connection(conn, rxrpc_conn_see_reap_service);
 
                        continue;
                rxrpc_see_connection(conn, rxrpc_conn_see_reap_service);
 
@@ -443,8 +462,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
                                  link);
                list_del_init(&conn->link);
 
                                  link);
                list_del_init(&conn->link);
 
-               ASSERTCMP(refcount_read(&conn->ref), ==, 0);
-               rxrpc_kill_connection(conn);
+               ASSERTCMP(atomic_read(&conn->active), ==, -1);
+               rxrpc_put_connection(conn, rxrpc_conn_put_service_reaped);
        }
 
        _leave("");
        }
 
        _leave("");
index 2c44d67b43dcc31896ebfef72899067575e760ad..b5ae7c753fc35e006624c56494c602b21902e3b1 100644 (file)
@@ -125,7 +125,7 @@ replace_old_connection:
 struct rxrpc_connection *rxrpc_prealloc_service_connection(struct rxrpc_net *rxnet,
                                                           gfp_t gfp)
 {
 struct rxrpc_connection *rxrpc_prealloc_service_connection(struct rxrpc_net *rxnet,
                                                           gfp_t gfp)
 {
-       struct rxrpc_connection *conn = rxrpc_alloc_connection(gfp);
+       struct rxrpc_connection *conn = rxrpc_alloc_connection(rxnet, gfp);
 
        if (conn) {
                /* We maintain an extra ref on the connection whilst it is on
 
        if (conn) {
                /* We maintain an extra ref on the connection whilst it is on
@@ -181,6 +181,8 @@ void rxrpc_new_incoming_connection(struct rxrpc_sock *rx,
            conn->service_id == rx->service_upgrade.from)
                conn->service_id = rx->service_upgrade.to;
 
            conn->service_id == rx->service_upgrade.from)
                conn->service_id = rx->service_upgrade.to;
 
+       atomic_set(&conn->active, 1);
+
        /* Make the connection a target for incoming packets. */
        rxrpc_publish_service_conn(conn->peer, conn);
 }
        /* Make the connection a target for incoming packets. */
        rxrpc_publish_service_conn(conn->peer, conn);
 }
index 84242c0e467c129f87b5f323deab369165a2ab4a..5905530e2f33b439173f90b6b6c69beb1464e5f2 100644 (file)
@@ -65,7 +65,7 @@ static __net_init int rxrpc_init_net(struct net *net)
        atomic_set(&rxnet->nr_client_conns, 0);
        rxnet->kill_all_client_conns = false;
        spin_lock_init(&rxnet->client_conn_cache_lock);
        atomic_set(&rxnet->nr_client_conns, 0);
        rxnet->kill_all_client_conns = false;
        spin_lock_init(&rxnet->client_conn_cache_lock);
-       spin_lock_init(&rxnet->client_conn_discard_lock);
+       mutex_init(&rxnet->client_conn_discard_lock);
        INIT_LIST_HEAD(&rxnet->idle_client_conns);
        INIT_WORK(&rxnet->client_conn_reaper,
                  rxrpc_discard_expired_client_conns);
        INIT_LIST_HEAD(&rxnet->idle_client_conns);
        INIT_WORK(&rxnet->client_conn_reaper,
                  rxrpc_discard_expired_client_conns);
index bb2edf6db896ffe5e05d1333260481a19d37229e..d3a6d24cf871394c816ae13c877ddb65723aa4ea 100644 (file)
@@ -159,7 +159,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
                seq_puts(seq,
                         "Proto Local                                          "
                         " Remote                                         "
                seq_puts(seq,
                         "Proto Local                                          "
                         " Remote                                         "
-                        " SvID ConnID   End Use State    Key     "
+                        " SvID ConnID   End Ref Act State    Key     "
                         " Serial   ISerial  CallId0  CallId1  CallId2  CallId3\n"
                         );
                return 0;
                         " Serial   ISerial  CallId0  CallId1  CallId2  CallId3\n"
                         );
                return 0;
@@ -177,7 +177,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
        sprintf(rbuff, "%pISpc", &conn->peer->srx.transport);
 print:
        seq_printf(seq,
        sprintf(rbuff, "%pISpc", &conn->peer->srx.transport);
 print:
        seq_printf(seq,
-                  "UDP   %-47.47s %-47.47s %4x %08x %s %3u"
+                  "UDP   %-47.47s %-47.47s %4x %08x %s %3u %3d"
                   " %s %08x %08x %08x %08x %08x %08x %08x\n",
                   lbuff,
                   rbuff,
                   " %s %08x %08x %08x %08x %08x %08x %08x\n",
                   lbuff,
                   rbuff,
@@ -185,6 +185,7 @@ print:
                   conn->proto.cid,
                   rxrpc_conn_is_service(conn) ? "Svc" : "Clt",
                   refcount_read(&conn->ref),
                   conn->proto.cid,
                   rxrpc_conn_is_service(conn) ? "Svc" : "Clt",
                   refcount_read(&conn->ref),
+                  atomic_read(&conn->active),
                   rxrpc_conn_states[conn->state],
                   key_serial(conn->key),
                   atomic_read(&conn->serial),
                   rxrpc_conn_states[conn->state],
                   key_serial(conn->key),
                   atomic_read(&conn->serial),