EM(rxrpc_call_get_release_sock, "GET rel-sock") \
EM(rxrpc_call_get_sendmsg, "GET sendmsg ") \
EM(rxrpc_call_get_send_ack, "GET send-ack") \
- EM(rxrpc_call_get_timer, "GET timer ") \
EM(rxrpc_call_get_userid, "GET user-id ") \
EM(rxrpc_call_new_client, "NEW client ") \
EM(rxrpc_call_new_prealloc_service, "NEW prealloc") \
- EM(rxrpc_call_put_already_queued, "PUT alreadyq") \
EM(rxrpc_call_put_discard_prealloc, "PUT disc-pre") \
EM(rxrpc_call_put_input, "PUT input ") \
EM(rxrpc_call_put_kernel, "PUT kernel ") \
EM(rxrpc_call_put_release_sock_tba, "PUT rls-sk-a") \
EM(rxrpc_call_put_send_ack, "PUT send-ack") \
EM(rxrpc_call_put_sendmsg, "PUT sendmsg ") \
- EM(rxrpc_call_put_timer, "PUT timer ") \
- EM(rxrpc_call_put_timer_already, "PUT timer-al") \
EM(rxrpc_call_put_unnotify, "PUT unnotify") \
EM(rxrpc_call_put_userid_exists, "PUT u-exists") \
- EM(rxrpc_call_put_work, "PUT work ") \
EM(rxrpc_call_queue_abort, "QUE abort ") \
EM(rxrpc_call_queue_requeue, "QUE requeue ") \
EM(rxrpc_call_queue_resend, "QUE resend ") \
EM(rxrpc_txbuf_put_rotated, "PUT ROTATED") \
EM(rxrpc_txbuf_put_send_aborted, "PUT SEND-X ") \
EM(rxrpc_txbuf_put_trans, "PUT TRANS ") \
+ EM(rxrpc_txbuf_see_out_of_step, "OUT-OF-STEP") \
EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \
E_(rxrpc_txbuf_see_unacked, "SEE UNACKED")
if (call->state < RXRPC_CALL_COMPLETE) {
trace_rxrpc_timer_expired(call, jiffies);
- __rxrpc_queue_call(call, rxrpc_call_queue_timer);
- } else {
- rxrpc_put_call(call, rxrpc_call_put_already_queued);
+ rxrpc_queue_call(call, rxrpc_call_queue_timer);
}
}
unsigned long now,
enum rxrpc_timer_trace why)
{
- if (rxrpc_try_get_call(call, rxrpc_call_get_timer)) {
- trace_rxrpc_timer(call, why, now);
- if (timer_reduce(&call->timer, expire_at))
- rxrpc_put_call(call, rxrpc_call_put_timer_already);
- }
-}
-
-void rxrpc_delete_call_timer(struct rxrpc_call *call)
-{
- if (del_timer_sync(&call->timer))
- rxrpc_put_call(call, rxrpc_call_put_timer);
+ trace_rxrpc_timer(call, why, now);
+ timer_reduce(&call->timer, expire_at);
}
static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
+static void rxrpc_destroy_call(struct work_struct *);
+
/*
* find an extant server call
* - called in process context with IRQs enabled
&rxrpc_call_user_mutex_lock_class_key);
timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
- INIT_WORK(&call->processor, &rxrpc_process_call);
+ INIT_WORK(&call->processor, rxrpc_process_call);
+ INIT_WORK(&call->destroyer, rxrpc_destroy_call);
INIT_LIST_HEAD(&call->link);
INIT_LIST_HEAD(&call->chan_wait_link);
INIT_LIST_HEAD(&call->accept_link);
}
/*
- * Queue a call's work processor, getting a ref to pass to the work queue.
+ * Queue a call's work processor.
*/
-bool rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
+void rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
{
- int n;
-
- if (!__refcount_inc_not_zero(&call->ref, &n))
- return false;
if (rxrpc_queue_work(&call->processor))
- trace_rxrpc_call(call->debug_id, n + 1, 0, why);
- else
- rxrpc_put_call(call, rxrpc_call_put_already_queued);
- return true;
-}
-
-/*
- * Queue a call's work processor, passing the callers ref to the work queue.
- */
-bool __rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
-{
- int n = refcount_read(&call->ref);
-
- ASSERTCMP(n, >=, 1);
- if (rxrpc_queue_work(&call->processor))
- trace_rxrpc_call(call->debug_id, n, 0, why);
- else
- rxrpc_put_call(call, rxrpc_call_put_already_queued);
- return true;
+ trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, why);
}
/*
BUG();
rxrpc_put_call_slot(call);
- rxrpc_delete_call_timer(call);
+ del_timer_sync(&call->timer);
/* Make sure we don't get any more notifications */
write_lock_bh(&rx->recvmsg_lock);
}
/*
- * Final call destruction - but must be done in process context.
+ * Free up the call under RCU.
*/
-static void rxrpc_destroy_call(struct work_struct *work)
+static void rxrpc_rcu_free_call(struct rcu_head *rcu)
{
- struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor);
- struct rxrpc_net *rxnet = call->rxnet;
-
- rxrpc_delete_call_timer(call);
+ struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+ struct rxrpc_net *rxnet = READ_ONCE(call->rxnet);
- rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
- rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
kmem_cache_free(rxrpc_call_jar, call);
if (atomic_dec_and_test(&rxnet->nr_calls))
wake_up_var(&rxnet->nr_calls);
}
/*
- * Final call destruction under RCU.
+ * Final call destruction - but must be done in process context.
*/
-static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
+static void rxrpc_destroy_call(struct work_struct *work)
{
- struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+ struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
+ struct rxrpc_txbuf *txb;
- if (in_softirq()) {
- INIT_WORK(&call->processor, rxrpc_destroy_call);
- if (!rxrpc_queue_work(&call->processor))
- BUG();
- } else {
- rxrpc_destroy_call(&call->processor);
+ del_timer_sync(&call->timer);
+ cancel_work_sync(&call->processor); /* The processor may restart the timer */
+ del_timer_sync(&call->timer);
+
+ rxrpc_cleanup_ring(call);
+ while ((txb = list_first_entry_or_null(&call->tx_buffer,
+ struct rxrpc_txbuf, call_link))) {
+ list_del(&txb->call_link);
+ rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
}
+ rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
+ rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
+ rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
+ rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
+ call_rcu(&call->rcu, rxrpc_rcu_free_call);
}
/*
*/
void rxrpc_cleanup_call(struct rxrpc_call *call)
{
- struct rxrpc_txbuf *txb;
-
memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
- rxrpc_cleanup_ring(call);
- while ((txb = list_first_entry_or_null(&call->tx_buffer,
- struct rxrpc_txbuf, call_link))) {
- list_del(&txb->call_link);
- rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
- }
- rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
- rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
+ del_timer_sync(&call->timer);
+ cancel_work(&call->processor);
- call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
+ if (in_softirq() || work_busy(&call->processor))
+ /* Can't use the rxrpc workqueue as we need to cancel/flush
+ * something that may be running/waiting there.
+ */
+ schedule_work(&call->destroyer);
+ else
+ rxrpc_destroy_call(&call->destroyer);
}
/*