rxrpc: Don't hold a ref for call timer or workqueue
authorDavid Howells <dhowells@redhat.com>
Fri, 25 Nov 2022 09:00:55 +0000 (09:00 +0000)
committerDavid Howells <dhowells@redhat.com>
Thu, 1 Dec 2022 13:36:39 +0000 (13:36 +0000)
Currently, rxrpc gives the call timer a ref on the call when it starts it
and this is passed along to the workqueue by the timer expiration function.
The problem comes when queue_work() fails (ie. the work item is already
queued): the timer routine must put the ref - but this may cause the
cleanup code to run.

This has the unfortunate effect that the cleanup code may then be run in
softirq context - which means that any spinlocks it might need to touch
have to be guarded to disable softirqs (ie. they need a "_bh" suffix).

Fix this by:

 (1) Don't give a ref to the timer.

 (2) Making the expiration function not do anything if the refcount is 0.
     Note that this is more of an optimisation.

 (3) Make sure that the cleanup routine waits for timer to complete.

However, this has a consequence that timer cannot give a ref to the work
item.  Therefore the following fixes are also necessary:

 (4) Don't give a ref to the work item.

 (5) Make the work item return asap if it sees the ref count is 0.

 (6) Make sure that the cleanup routine waits for the work item to
     complete.

Unfortunately, neither the timer nor the work item can simply get around
the problem by just using refcount_inc_not_zero() as the waits would still
have to be done, and there would still be the possibility of having to put
the ref in the expiration function.

Note the call work item is going to go away with the work being transferred
to the I/O thread, so the wait in (6) will become obsolete.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_event.c
net/rxrpc/call_object.c
net/rxrpc/txbuf.c

index 5a2292baffc8a36871f2664dfb71faddd1f9a16b..4538de0079a5b6174581dcc49b08b9cbbef258f1 100644 (file)
        EM(rxrpc_call_get_release_sock,         "GET rel-sock") \
        EM(rxrpc_call_get_sendmsg,              "GET sendmsg ") \
        EM(rxrpc_call_get_send_ack,             "GET send-ack") \
        EM(rxrpc_call_get_release_sock,         "GET rel-sock") \
        EM(rxrpc_call_get_sendmsg,              "GET sendmsg ") \
        EM(rxrpc_call_get_send_ack,             "GET send-ack") \
-       EM(rxrpc_call_get_timer,                "GET timer   ") \
        EM(rxrpc_call_get_userid,               "GET user-id ") \
        EM(rxrpc_call_new_client,               "NEW client  ") \
        EM(rxrpc_call_new_prealloc_service,     "NEW prealloc") \
        EM(rxrpc_call_get_userid,               "GET user-id ") \
        EM(rxrpc_call_new_client,               "NEW client  ") \
        EM(rxrpc_call_new_prealloc_service,     "NEW prealloc") \
-       EM(rxrpc_call_put_already_queued,       "PUT alreadyq") \
        EM(rxrpc_call_put_discard_prealloc,     "PUT disc-pre") \
        EM(rxrpc_call_put_input,                "PUT input   ") \
        EM(rxrpc_call_put_kernel,               "PUT kernel  ") \
        EM(rxrpc_call_put_discard_prealloc,     "PUT disc-pre") \
        EM(rxrpc_call_put_input,                "PUT input   ") \
        EM(rxrpc_call_put_kernel,               "PUT kernel  ") \
        EM(rxrpc_call_put_release_sock_tba,     "PUT rls-sk-a") \
        EM(rxrpc_call_put_send_ack,             "PUT send-ack") \
        EM(rxrpc_call_put_sendmsg,              "PUT sendmsg ") \
        EM(rxrpc_call_put_release_sock_tba,     "PUT rls-sk-a") \
        EM(rxrpc_call_put_send_ack,             "PUT send-ack") \
        EM(rxrpc_call_put_sendmsg,              "PUT sendmsg ") \
-       EM(rxrpc_call_put_timer,                "PUT timer   ") \
-       EM(rxrpc_call_put_timer_already,        "PUT timer-al") \
        EM(rxrpc_call_put_unnotify,             "PUT unnotify") \
        EM(rxrpc_call_put_userid_exists,        "PUT u-exists") \
        EM(rxrpc_call_put_unnotify,             "PUT unnotify") \
        EM(rxrpc_call_put_userid_exists,        "PUT u-exists") \
-       EM(rxrpc_call_put_work,                 "PUT work    ") \
        EM(rxrpc_call_queue_abort,              "QUE abort   ") \
        EM(rxrpc_call_queue_requeue,            "QUE requeue ") \
        EM(rxrpc_call_queue_resend,             "QUE resend  ") \
        EM(rxrpc_call_queue_abort,              "QUE abort   ") \
        EM(rxrpc_call_queue_requeue,            "QUE requeue ") \
        EM(rxrpc_call_queue_resend,             "QUE resend  ") \
        EM(rxrpc_txbuf_put_rotated,             "PUT ROTATED")  \
        EM(rxrpc_txbuf_put_send_aborted,        "PUT SEND-X ")  \
        EM(rxrpc_txbuf_put_trans,               "PUT TRANS  ")  \
        EM(rxrpc_txbuf_put_rotated,             "PUT ROTATED")  \
        EM(rxrpc_txbuf_put_send_aborted,        "PUT SEND-X ")  \
        EM(rxrpc_txbuf_put_trans,               "PUT TRANS  ")  \
+       EM(rxrpc_txbuf_see_out_of_step,         "OUT-OF-STEP")  \
        EM(rxrpc_txbuf_see_send_more,           "SEE SEND+  ")  \
        E_(rxrpc_txbuf_see_unacked,             "SEE UNACKED")
 
        EM(rxrpc_txbuf_see_send_more,           "SEE SEND+  ")  \
        E_(rxrpc_txbuf_see_unacked,             "SEE UNACKED")
 
index c588c0e81f637df68d1a9dc87bbfb7529a33bcbf..03523a864c114b19f18df32b4f2697b3879f0839 100644 (file)
@@ -598,6 +598,7 @@ struct rxrpc_call {
        u32                     next_req_timo;  /* Timeout for next Rx request packet (jif) */
        struct timer_list       timer;          /* Combined event timer */
        struct work_struct      processor;      /* Event processor */
        u32                     next_req_timo;  /* Timeout for next Rx request packet (jif) */
        struct timer_list       timer;          /* Combined event timer */
        struct work_struct      processor;      /* Event processor */
+       struct work_struct      destroyer;      /* In-process-context destroyer */
        rxrpc_notify_rx_t       notify_rx;      /* kernel service Rx notification function */
        struct list_head        link;           /* link in master call list */
        struct list_head        chan_wait_link; /* Link in conn->bundle->waiting_calls */
        rxrpc_notify_rx_t       notify_rx;      /* kernel service Rx notification function */
        struct list_head        link;           /* link in master call list */
        struct list_head        chan_wait_link; /* Link in conn->bundle->waiting_calls */
@@ -827,8 +828,6 @@ void rxrpc_reduce_call_timer(struct rxrpc_call *call,
                             unsigned long now,
                             enum rxrpc_timer_trace why);
 
                             unsigned long now,
                             enum rxrpc_timer_trace why);
 
-void rxrpc_delete_call_timer(struct rxrpc_call *call);
-
 /*
  * call_object.c
  */
 /*
  * call_object.c
  */
@@ -847,8 +846,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
                         struct sk_buff *);
 void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
                         struct sk_buff *);
 void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
-bool __rxrpc_queue_call(struct rxrpc_call *, enum rxrpc_call_trace);
-bool rxrpc_queue_call(struct rxrpc_call *, enum rxrpc_call_trace);
+void rxrpc_queue_call(struct rxrpc_call *, enum rxrpc_call_trace);
 void rxrpc_see_call(struct rxrpc_call *, enum rxrpc_call_trace);
 bool rxrpc_try_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
 void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
 void rxrpc_see_call(struct rxrpc_call *, enum rxrpc_call_trace);
 bool rxrpc_try_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
 void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
index 29ca02e53c477da07cf85b0b5ded1c90a6aadbb3..049b92b1c0406415297a302598b0c3f37e46ab0b 100644 (file)
@@ -323,8 +323,8 @@ recheck_state:
                rxrpc_shrink_call_tx_buffer(call);
 
        if (call->state == RXRPC_CALL_COMPLETE) {
                rxrpc_shrink_call_tx_buffer(call);
 
        if (call->state == RXRPC_CALL_COMPLETE) {
-               rxrpc_delete_call_timer(call);
-               goto out_put;
+               del_timer_sync(&call->timer);
+               goto out;
        }
 
        /* Work out if any timeouts tripped */
        }
 
        /* Work out if any timeouts tripped */
@@ -432,16 +432,15 @@ recheck_state:
        rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
 
        /* other events may have been raised since we started checking */
        rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
 
        /* other events may have been raised since we started checking */
-       if (call->events && call->state < RXRPC_CALL_COMPLETE)
+       if (call->events)
                goto requeue;
 
                goto requeue;
 
-out_put:
-       rxrpc_put_call(call, rxrpc_call_put_work);
 out:
        _leave("");
        return;
 
 requeue:
 out:
        _leave("");
        return;
 
 requeue:
-       __rxrpc_queue_call(call, rxrpc_call_queue_requeue);
+       if (call->state < RXRPC_CALL_COMPLETE)
+               rxrpc_queue_call(call, rxrpc_call_queue_requeue);
        goto out;
 }
        goto out;
 }
index 815209673115431eef74bf59fd8d85be82639361..9cd7e0190ef4c35c764a852d727bcdf9879b54b8 100644 (file)
@@ -53,9 +53,7 @@ static void rxrpc_call_timer_expired(struct timer_list *t)
 
        if (call->state < RXRPC_CALL_COMPLETE) {
                trace_rxrpc_timer_expired(call, jiffies);
 
        if (call->state < RXRPC_CALL_COMPLETE) {
                trace_rxrpc_timer_expired(call, jiffies);
-               __rxrpc_queue_call(call, rxrpc_call_queue_timer);
-       } else {
-               rxrpc_put_call(call, rxrpc_call_put_already_queued);
+               rxrpc_queue_call(call, rxrpc_call_queue_timer);
        }
 }
 
        }
 }
 
@@ -64,21 +62,14 @@ void rxrpc_reduce_call_timer(struct rxrpc_call *call,
                             unsigned long now,
                             enum rxrpc_timer_trace why)
 {
                             unsigned long now,
                             enum rxrpc_timer_trace why)
 {
-       if (rxrpc_try_get_call(call, rxrpc_call_get_timer)) {
-               trace_rxrpc_timer(call, why, now);
-               if (timer_reduce(&call->timer, expire_at))
-                       rxrpc_put_call(call, rxrpc_call_put_timer_already);
-       }
-}
-
-void rxrpc_delete_call_timer(struct rxrpc_call *call)
-{
-       if (del_timer_sync(&call->timer))
-               rxrpc_put_call(call, rxrpc_call_put_timer);
+       trace_rxrpc_timer(call, why, now);
+       timer_reduce(&call->timer, expire_at);
 }
 
 static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
 
 }
 
 static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
 
+static void rxrpc_destroy_call(struct work_struct *);
+
 /*
  * find an extant server call
  * - called in process context with IRQs enabled
 /*
  * find an extant server call
  * - called in process context with IRQs enabled
@@ -139,7 +130,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
                                  &rxrpc_call_user_mutex_lock_class_key);
 
        timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
                                  &rxrpc_call_user_mutex_lock_class_key);
 
        timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
-       INIT_WORK(&call->processor, &rxrpc_process_call);
+       INIT_WORK(&call->processor, rxrpc_process_call);
+       INIT_WORK(&call->destroyer, rxrpc_destroy_call);
        INIT_LIST_HEAD(&call->link);
        INIT_LIST_HEAD(&call->chan_wait_link);
        INIT_LIST_HEAD(&call->accept_link);
        INIT_LIST_HEAD(&call->link);
        INIT_LIST_HEAD(&call->chan_wait_link);
        INIT_LIST_HEAD(&call->accept_link);
@@ -423,34 +415,12 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
 }
 
 /*
 }
 
 /*
- * Queue a call's work processor, getting a ref to pass to the work queue.
+ * Queue a call's work processor.
  */
  */
-bool rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
+void rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 {
 {
-       int n;
-
-       if (!__refcount_inc_not_zero(&call->ref, &n))
-               return false;
        if (rxrpc_queue_work(&call->processor))
        if (rxrpc_queue_work(&call->processor))
-               trace_rxrpc_call(call->debug_id, n + 1, 0, why);
-       else
-               rxrpc_put_call(call, rxrpc_call_put_already_queued);
-       return true;
-}
-
-/*
- * Queue a call's work processor, passing the callers ref to the work queue.
- */
-bool __rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
-{
-       int n = refcount_read(&call->ref);
-
-       ASSERTCMP(n, >=, 1);
-       if (rxrpc_queue_work(&call->processor))
-               trace_rxrpc_call(call->debug_id, n, 0, why);
-       else
-               rxrpc_put_call(call, rxrpc_call_put_already_queued);
-       return true;
+               trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, why);
 }
 
 /*
 }
 
 /*
@@ -514,7 +484,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
                BUG();
 
        rxrpc_put_call_slot(call);
                BUG();
 
        rxrpc_put_call_slot(call);
-       rxrpc_delete_call_timer(call);
+       del_timer_sync(&call->timer);
 
        /* Make sure we don't get any more notifications */
        write_lock_bh(&rx->recvmsg_lock);
 
        /* Make sure we don't get any more notifications */
        write_lock_bh(&rx->recvmsg_lock);
@@ -612,36 +582,41 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 }
 
 /*
 }
 
 /*
- * Final call destruction - but must be done in process context.
+ * Free up the call under RCU.
  */
  */
-static void rxrpc_destroy_call(struct work_struct *work)
+static void rxrpc_rcu_free_call(struct rcu_head *rcu)
 {
 {
-       struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor);
-       struct rxrpc_net *rxnet = call->rxnet;
-
-       rxrpc_delete_call_timer(call);
+       struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+       struct rxrpc_net *rxnet = READ_ONCE(call->rxnet);
 
 
-       rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
-       rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
        kmem_cache_free(rxrpc_call_jar, call);
        if (atomic_dec_and_test(&rxnet->nr_calls))
                wake_up_var(&rxnet->nr_calls);
 }
 
 /*
        kmem_cache_free(rxrpc_call_jar, call);
        if (atomic_dec_and_test(&rxnet->nr_calls))
                wake_up_var(&rxnet->nr_calls);
 }
 
 /*
- * Final call destruction under RCU.
+ * Final call destruction - but must be done in process context.
  */
  */
-static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
+static void rxrpc_destroy_call(struct work_struct *work)
 {
 {
-       struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+       struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
+       struct rxrpc_txbuf *txb;
 
 
-       if (in_softirq()) {
-               INIT_WORK(&call->processor, rxrpc_destroy_call);
-               if (!rxrpc_queue_work(&call->processor))
-                       BUG();
-       } else {
-               rxrpc_destroy_call(&call->processor);
+       del_timer_sync(&call->timer);
+       cancel_work_sync(&call->processor); /* The processor may restart the timer */
+       del_timer_sync(&call->timer);
+
+       rxrpc_cleanup_ring(call);
+       while ((txb = list_first_entry_or_null(&call->tx_buffer,
+                                              struct rxrpc_txbuf, call_link))) {
+               list_del(&txb->call_link);
+               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
        }
        }
+       rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
+       rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
+       rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
+       rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
+       call_rcu(&call->rcu, rxrpc_rcu_free_call);
 }
 
 /*
 }
 
 /*
@@ -649,23 +624,21 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
  */
 void rxrpc_cleanup_call(struct rxrpc_call *call)
 {
  */
 void rxrpc_cleanup_call(struct rxrpc_call *call)
 {
-       struct rxrpc_txbuf *txb;
-
        memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 
        ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
        ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
 
        memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 
        ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
        ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
 
-       rxrpc_cleanup_ring(call);
-       while ((txb = list_first_entry_or_null(&call->tx_buffer,
-                                              struct rxrpc_txbuf, call_link))) {
-               list_del(&txb->call_link);
-               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
-       }
-       rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
-       rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
+       del_timer_sync(&call->timer);
+       cancel_work(&call->processor);
 
 
-       call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
+       if (in_softirq() || work_busy(&call->processor))
+               /* Can't use the rxrpc workqueue as we need to cancel/flush
+                * something that may be running/waiting there.
+                */
+               schedule_work(&call->destroyer);
+       else
+               rxrpc_destroy_call(&call->destroyer);
 }
 
 /*
 }
 
 /*
index 96bfee89927bcc2dfd2a287d2dbf0fc953214d6b..f93dc666a3a02391aae5970642cef61f618ba9b6 100644 (file)
@@ -120,6 +120,8 @@ void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
                if (before(hard_ack, txb->seq))
                        break;
 
                if (before(hard_ack, txb->seq))
                        break;
 
+               if (txb->seq != call->tx_bottom + 1)
+                       rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
                ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
                call->tx_bottom++;
                list_del_rcu(&txb->call_link);
                ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
                call->tx_bottom++;
                list_del_rcu(&txb->call_link);