Currently, rxrpc gives the call timer a ref on the call when it starts it
and this is passed along to the workqueue by the timer expiration function.
The problem comes when queue_work() fails (ie. the work item is already
queued): the timer routine must put the ref - but this may cause the
cleanup code to run.
This has the unfortunate effect that the cleanup code may then be run in
softirq context - which means that any spinlocks it might need to touch
have to be guarded to disable softirqs (ie. they need a "_bh" suffix).
Fix this by:
(1) Don't give a ref to the timer.
(2) Making the expiration function not do anything if the refcount is 0.
Note that this is more of an optimisation.
(3) Make sure that the cleanup routine waits for timer to complete.
However, this has a consequence that timer cannot give a ref to the work
item. Therefore the following fixes are also necessary:
(4) Don't give a ref to the work item.
(5) Make the work item return asap if it sees the ref count is 0.
(6) Make sure that the cleanup routine waits for the work item to
complete.
Unfortunately, neither the timer nor the work item can simply get around
the problem by just using refcount_inc_not_zero() as the waits would still
have to be done, and there would still be the possibility of having to put
the ref in the expiration function.
Note the call work item is going to go away with the work being transferred
to the I/O thread, so the wait in (6) will become obsolete.
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
EM(rxrpc_call_get_release_sock, "GET rel-sock") \
EM(rxrpc_call_get_sendmsg, "GET sendmsg ") \
EM(rxrpc_call_get_send_ack, "GET send-ack") \
EM(rxrpc_call_get_release_sock, "GET rel-sock") \
EM(rxrpc_call_get_sendmsg, "GET sendmsg ") \
EM(rxrpc_call_get_send_ack, "GET send-ack") \
- EM(rxrpc_call_get_timer, "GET timer ") \
EM(rxrpc_call_get_userid, "GET user-id ") \
EM(rxrpc_call_new_client, "NEW client ") \
EM(rxrpc_call_new_prealloc_service, "NEW prealloc") \
EM(rxrpc_call_get_userid, "GET user-id ") \
EM(rxrpc_call_new_client, "NEW client ") \
EM(rxrpc_call_new_prealloc_service, "NEW prealloc") \
- EM(rxrpc_call_put_already_queued, "PUT alreadyq") \
EM(rxrpc_call_put_discard_prealloc, "PUT disc-pre") \
EM(rxrpc_call_put_input, "PUT input ") \
EM(rxrpc_call_put_kernel, "PUT kernel ") \
EM(rxrpc_call_put_discard_prealloc, "PUT disc-pre") \
EM(rxrpc_call_put_input, "PUT input ") \
EM(rxrpc_call_put_kernel, "PUT kernel ") \
EM(rxrpc_call_put_release_sock_tba, "PUT rls-sk-a") \
EM(rxrpc_call_put_send_ack, "PUT send-ack") \
EM(rxrpc_call_put_sendmsg, "PUT sendmsg ") \
EM(rxrpc_call_put_release_sock_tba, "PUT rls-sk-a") \
EM(rxrpc_call_put_send_ack, "PUT send-ack") \
EM(rxrpc_call_put_sendmsg, "PUT sendmsg ") \
- EM(rxrpc_call_put_timer, "PUT timer ") \
- EM(rxrpc_call_put_timer_already, "PUT timer-al") \
EM(rxrpc_call_put_unnotify, "PUT unnotify") \
EM(rxrpc_call_put_userid_exists, "PUT u-exists") \
EM(rxrpc_call_put_unnotify, "PUT unnotify") \
EM(rxrpc_call_put_userid_exists, "PUT u-exists") \
- EM(rxrpc_call_put_work, "PUT work ") \
EM(rxrpc_call_queue_abort, "QUE abort ") \
EM(rxrpc_call_queue_requeue, "QUE requeue ") \
EM(rxrpc_call_queue_resend, "QUE resend ") \
EM(rxrpc_call_queue_abort, "QUE abort ") \
EM(rxrpc_call_queue_requeue, "QUE requeue ") \
EM(rxrpc_call_queue_resend, "QUE resend ") \
EM(rxrpc_txbuf_put_rotated, "PUT ROTATED") \
EM(rxrpc_txbuf_put_send_aborted, "PUT SEND-X ") \
EM(rxrpc_txbuf_put_trans, "PUT TRANS ") \
EM(rxrpc_txbuf_put_rotated, "PUT ROTATED") \
EM(rxrpc_txbuf_put_send_aborted, "PUT SEND-X ") \
EM(rxrpc_txbuf_put_trans, "PUT TRANS ") \
+ EM(rxrpc_txbuf_see_out_of_step, "OUT-OF-STEP") \
EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \
E_(rxrpc_txbuf_see_unacked, "SEE UNACKED")
EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \
E_(rxrpc_txbuf_see_unacked, "SEE UNACKED")
u32 next_req_timo; /* Timeout for next Rx request packet (jif) */
struct timer_list timer; /* Combined event timer */
struct work_struct processor; /* Event processor */
u32 next_req_timo; /* Timeout for next Rx request packet (jif) */
struct timer_list timer; /* Combined event timer */
struct work_struct processor; /* Event processor */
+ struct work_struct destroyer; /* In-process-context destroyer */
rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */
struct list_head link; /* link in master call list */
struct list_head chan_wait_link; /* Link in conn->bundle->waiting_calls */
rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */
struct list_head link; /* link in master call list */
struct list_head chan_wait_link; /* Link in conn->bundle->waiting_calls */
unsigned long now,
enum rxrpc_timer_trace why);
unsigned long now,
enum rxrpc_timer_trace why);
-void rxrpc_delete_call_timer(struct rxrpc_call *call);
-
struct sk_buff *);
void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
struct sk_buff *);
void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
-bool __rxrpc_queue_call(struct rxrpc_call *, enum rxrpc_call_trace);
-bool rxrpc_queue_call(struct rxrpc_call *, enum rxrpc_call_trace);
+void rxrpc_queue_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_see_call(struct rxrpc_call *, enum rxrpc_call_trace);
bool rxrpc_try_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_see_call(struct rxrpc_call *, enum rxrpc_call_trace);
bool rxrpc_try_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
rxrpc_shrink_call_tx_buffer(call);
if (call->state == RXRPC_CALL_COMPLETE) {
rxrpc_shrink_call_tx_buffer(call);
if (call->state == RXRPC_CALL_COMPLETE) {
- rxrpc_delete_call_timer(call);
- goto out_put;
+ del_timer_sync(&call->timer);
+ goto out;
}
/* Work out if any timeouts tripped */
}
/* Work out if any timeouts tripped */
rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
/* other events may have been raised since we started checking */
rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
/* other events may have been raised since we started checking */
- if (call->events && call->state < RXRPC_CALL_COMPLETE)
-out_put:
- rxrpc_put_call(call, rxrpc_call_put_work);
out:
_leave("");
return;
requeue:
out:
_leave("");
return;
requeue:
- __rxrpc_queue_call(call, rxrpc_call_queue_requeue);
+ if (call->state < RXRPC_CALL_COMPLETE)
+ rxrpc_queue_call(call, rxrpc_call_queue_requeue);
if (call->state < RXRPC_CALL_COMPLETE) {
trace_rxrpc_timer_expired(call, jiffies);
if (call->state < RXRPC_CALL_COMPLETE) {
trace_rxrpc_timer_expired(call, jiffies);
- __rxrpc_queue_call(call, rxrpc_call_queue_timer);
- } else {
- rxrpc_put_call(call, rxrpc_call_put_already_queued);
+ rxrpc_queue_call(call, rxrpc_call_queue_timer);
unsigned long now,
enum rxrpc_timer_trace why)
{
unsigned long now,
enum rxrpc_timer_trace why)
{
- if (rxrpc_try_get_call(call, rxrpc_call_get_timer)) {
- trace_rxrpc_timer(call, why, now);
- if (timer_reduce(&call->timer, expire_at))
- rxrpc_put_call(call, rxrpc_call_put_timer_already);
- }
-}
-
-void rxrpc_delete_call_timer(struct rxrpc_call *call)
-{
- if (del_timer_sync(&call->timer))
- rxrpc_put_call(call, rxrpc_call_put_timer);
+ trace_rxrpc_timer(call, why, now);
+ timer_reduce(&call->timer, expire_at);
}
static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
}
static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
+static void rxrpc_destroy_call(struct work_struct *);
+
/*
* find an extant server call
* - called in process context with IRQs enabled
/*
* find an extant server call
* - called in process context with IRQs enabled
&rxrpc_call_user_mutex_lock_class_key);
timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
&rxrpc_call_user_mutex_lock_class_key);
timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
- INIT_WORK(&call->processor, &rxrpc_process_call);
+ INIT_WORK(&call->processor, rxrpc_process_call);
+ INIT_WORK(&call->destroyer, rxrpc_destroy_call);
INIT_LIST_HEAD(&call->link);
INIT_LIST_HEAD(&call->chan_wait_link);
INIT_LIST_HEAD(&call->accept_link);
INIT_LIST_HEAD(&call->link);
INIT_LIST_HEAD(&call->chan_wait_link);
INIT_LIST_HEAD(&call->accept_link);
- * Queue a call's work processor, getting a ref to pass to the work queue.
+ * Queue a call's work processor.
-bool rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
+void rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
- int n;
-
- if (!__refcount_inc_not_zero(&call->ref, &n))
- return false;
if (rxrpc_queue_work(&call->processor))
if (rxrpc_queue_work(&call->processor))
- trace_rxrpc_call(call->debug_id, n + 1, 0, why);
- else
- rxrpc_put_call(call, rxrpc_call_put_already_queued);
- return true;
-}
-
-/*
- * Queue a call's work processor, passing the callers ref to the work queue.
- */
-bool __rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
-{
- int n = refcount_read(&call->ref);
-
- ASSERTCMP(n, >=, 1);
- if (rxrpc_queue_work(&call->processor))
- trace_rxrpc_call(call->debug_id, n, 0, why);
- else
- rxrpc_put_call(call, rxrpc_call_put_already_queued);
- return true;
+ trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, why);
BUG();
rxrpc_put_call_slot(call);
BUG();
rxrpc_put_call_slot(call);
- rxrpc_delete_call_timer(call);
+ del_timer_sync(&call->timer);
/* Make sure we don't get any more notifications */
write_lock_bh(&rx->recvmsg_lock);
/* Make sure we don't get any more notifications */
write_lock_bh(&rx->recvmsg_lock);
- * Final call destruction - but must be done in process context.
+ * Free up the call under RCU.
-static void rxrpc_destroy_call(struct work_struct *work)
+static void rxrpc_rcu_free_call(struct rcu_head *rcu)
- struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor);
- struct rxrpc_net *rxnet = call->rxnet;
-
- rxrpc_delete_call_timer(call);
+ struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+ struct rxrpc_net *rxnet = READ_ONCE(call->rxnet);
- rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
- rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
kmem_cache_free(rxrpc_call_jar, call);
if (atomic_dec_and_test(&rxnet->nr_calls))
wake_up_var(&rxnet->nr_calls);
}
/*
kmem_cache_free(rxrpc_call_jar, call);
if (atomic_dec_and_test(&rxnet->nr_calls))
wake_up_var(&rxnet->nr_calls);
}
/*
- * Final call destruction under RCU.
+ * Final call destruction - but must be done in process context.
-static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
+static void rxrpc_destroy_call(struct work_struct *work)
- struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+ struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
+ struct rxrpc_txbuf *txb;
- if (in_softirq()) {
- INIT_WORK(&call->processor, rxrpc_destroy_call);
- if (!rxrpc_queue_work(&call->processor))
- BUG();
- } else {
- rxrpc_destroy_call(&call->processor);
+ del_timer_sync(&call->timer);
+ cancel_work_sync(&call->processor); /* The processor may restart the timer */
+ del_timer_sync(&call->timer);
+
+ rxrpc_cleanup_ring(call);
+ while ((txb = list_first_entry_or_null(&call->tx_buffer,
+ struct rxrpc_txbuf, call_link))) {
+ list_del(&txb->call_link);
+ rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
+ rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
+ rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
+ rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
+ rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
+ call_rcu(&call->rcu, rxrpc_rcu_free_call);
*/
void rxrpc_cleanup_call(struct rxrpc_call *call)
{
*/
void rxrpc_cleanup_call(struct rxrpc_call *call)
{
- struct rxrpc_txbuf *txb;
-
memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
- rxrpc_cleanup_ring(call);
- while ((txb = list_first_entry_or_null(&call->tx_buffer,
- struct rxrpc_txbuf, call_link))) {
- list_del(&txb->call_link);
- rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
- }
- rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
- rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
+ del_timer_sync(&call->timer);
+ cancel_work(&call->processor);
- call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
+ if (in_softirq() || work_busy(&call->processor))
+ /* Can't use the rxrpc workqueue as we need to cancel/flush
+ * something that may be running/waiting there.
+ */
+ schedule_work(&call->destroyer);
+ else
+ rxrpc_destroy_call(&call->destroyer);
if (before(hard_ack, txb->seq))
break;
if (before(hard_ack, txb->seq))
break;
+ if (txb->seq != call->tx_bottom + 1)
+ rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
call->tx_bottom++;
list_del_rcu(&txb->call_link);
ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
call->tx_bottom++;
list_del_rcu(&txb->call_link);