rxrpc: Only set/transmit aborts in the I/O thread
authorDavid Howells <dhowells@redhat.com>
Wed, 12 Oct 2022 21:17:56 +0000 (22:17 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:31 +0000 (09:43 +0000)
Only set the abort call completion state in the I/O thread and only
transmit ABORT packets from there.  rxrpc_abort_call() can then be made to
actually send the packet.

Further, ABORT packets should only be sent if the call has been exposed to
the network (ie. at least one attempted DATA transmission has occurred for
it).

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_event.c
net/rxrpc/call_object.c
net/rxrpc/input.c
net/rxrpc/recvmsg.c
net/rxrpc/sendmsg.c

index b526d982da7eaf6dc54594914026f021d9b78de0..c44cc01de7501b5106373ea62aef3314a5f9624a 100644 (file)
@@ -17,6 +17,7 @@
  * Declare tracing information enums and their string mappings for display.
  */
 #define rxrpc_call_poke_traces \
  * Declare tracing information enums and their string mappings for display.
  */
 #define rxrpc_call_poke_traces \
+       EM(rxrpc_call_poke_abort,               "Abort")        \
        EM(rxrpc_call_poke_error,               "Error")        \
        EM(rxrpc_call_poke_idle,                "Idle")         \
        EM(rxrpc_call_poke_start,               "Start")        \
        EM(rxrpc_call_poke_error,               "Error")        \
        EM(rxrpc_call_poke_idle,                "Idle")         \
        EM(rxrpc_call_poke_start,               "Start")        \
index f3b8806e72411b6f41ceba3cb8afa737caae4df7..0cf28a56aec5d6063ff753b6ac6834d7b7419351 100644 (file)
@@ -625,7 +625,10 @@ struct rxrpc_call {
        unsigned long           events;
        spinlock_t              notify_lock;    /* Kernel notification lock */
        rwlock_t                state_lock;     /* lock for state transition */
        unsigned long           events;
        spinlock_t              notify_lock;    /* Kernel notification lock */
        rwlock_t                state_lock;     /* lock for state transition */
-       u32                     abort_code;     /* Local/remote abort code */
+       const char              *send_abort_why; /* String indicating why the abort was sent */
+       s32                     send_abort;     /* Abort code to be sent */
+       short                   send_abort_err; /* Error to be associated with the abort */
+       s32                     abort_code;     /* Local/remote abort code */
        int                     error;          /* Local error incurred */
        enum rxrpc_call_state   state;          /* current state of call */
        enum rxrpc_call_completion completion;  /* Call completion condition */
        int                     error;          /* Local error incurred */
        enum rxrpc_call_state   state;          /* current state of call */
        enum rxrpc_call_completion completion;  /* Call completion condition */
@@ -1146,6 +1149,8 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *,
 /*
  * sendmsg.c
  */
 /*
  * sendmsg.c
  */
+bool rxrpc_propose_abort(struct rxrpc_call *call,
+                        u32 abort_code, int error, const char *why);
 int rxrpc_do_sendmsg(struct rxrpc_sock *, struct msghdr *, size_t);
 
 /*
 int rxrpc_do_sendmsg(struct rxrpc_sock *, struct msghdr *, size_t);
 
 /*
index b2cf448fb02c0dcba93324754455955d287d16cf..b7efecf5ccfc01e9ac5f3a1180ae425408dc792b 100644 (file)
@@ -270,9 +270,11 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
 {
        struct rxrpc_txbuf *txb;
 
 {
        struct rxrpc_txbuf *txb;
 
-       if (rxrpc_is_client_call(call) &&
-           !test_bit(RXRPC_CALL_EXPOSED, &call->flags))
+       if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
+               if (list_empty(&call->tx_sendmsg))
+                       return;
                rxrpc_expose_client_call(call);
                rxrpc_expose_client_call(call);
+       }
 
        while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
                                               struct rxrpc_txbuf, call_link))) {
 
        while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
                                               struct rxrpc_txbuf, call_link))) {
@@ -336,6 +338,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
        unsigned long now, next, t;
        rxrpc_serial_t ackr_serial;
        bool resend = false, expired = false;
        unsigned long now, next, t;
        rxrpc_serial_t ackr_serial;
        bool resend = false, expired = false;
+       s32 abort_code;
 
        rxrpc_see_call(call, rxrpc_call_see_input);
 
 
        rxrpc_see_call(call, rxrpc_call_see_input);
 
@@ -346,6 +349,14 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
        if (call->state == RXRPC_CALL_COMPLETE)
                goto out;
 
        if (call->state == RXRPC_CALL_COMPLETE)
                goto out;
 
+       /* Handle abort request locklessly, vs rxrpc_propose_abort(). */
+       abort_code = smp_load_acquire(&call->send_abort);
+       if (abort_code) {
+               rxrpc_abort_call(call->send_abort_why, call, 0, call->send_abort,
+                                call->send_abort_err);
+               goto out;
+       }
+
        if (skb && skb->mark == RXRPC_SKB_MARK_ERROR)
                goto out;
 
        if (skb && skb->mark == RXRPC_SKB_MARK_ERROR)
                goto out;
 
@@ -433,7 +444,6 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
                } else {
                        rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME);
                }
                } else {
                        rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME);
                }
-               rxrpc_send_abort_packet(call);
                goto out;
        }
 
                goto out;
        }
 
index 239fc3c7507909c98289bdc0d7e58ca5d6b76cfe..298b7c465d7eea5981d185ce8a23e0be37b74e8d 100644 (file)
@@ -430,6 +430,8 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
        call->state             = RXRPC_CALL_SERVER_SECURING;
        call->cong_tstamp       = skb->tstamp;
 
        call->state             = RXRPC_CALL_SERVER_SECURING;
        call->cong_tstamp       = skb->tstamp;
 
+       __set_bit(RXRPC_CALL_EXPOSED, &call->flags);
+
        spin_lock(&conn->state_lock);
 
        switch (conn->state) {
        spin_lock(&conn->state_lock);
 
        switch (conn->state) {
@@ -590,7 +592,7 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
                call = list_entry(rx->to_be_accepted.next,
                                  struct rxrpc_call, accept_link);
                list_del(&call->accept_link);
                call = list_entry(rx->to_be_accepted.next,
                                  struct rxrpc_call, accept_link);
                list_del(&call->accept_link);
-               rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET);
+               rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, "SKR");
                rxrpc_put_call(call, rxrpc_call_put_release_sock_tba);
        }
 
                rxrpc_put_call(call, rxrpc_call_put_release_sock_tba);
        }
 
@@ -598,8 +600,7 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
                call = list_entry(rx->sock_calls.next,
                                  struct rxrpc_call, sock_link);
                rxrpc_get_call(call, rxrpc_call_get_release_sock);
                call = list_entry(rx->sock_calls.next,
                                  struct rxrpc_call, sock_link);
                rxrpc_get_call(call, rxrpc_call_get_release_sock);
-               rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET);
-               rxrpc_send_abort_packet(call);
+               rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, "SKT");
                rxrpc_release_call(rx, call);
                rxrpc_put_call(call, rxrpc_call_put_release_sock);
        }
                rxrpc_release_call(rx, call);
                rxrpc_put_call(call, rxrpc_call_put_release_sock);
        }
index d0e20e946e48d56af203cb42c3547723870bf842..1f03a286620d5680051ba6d4b03085880eb3fadf 100644 (file)
@@ -12,8 +12,7 @@
 static void rxrpc_proto_abort(const char *why,
                              struct rxrpc_call *call, rxrpc_seq_t seq)
 {
 static void rxrpc_proto_abort(const char *why,
                              struct rxrpc_call *call, rxrpc_seq_t seq)
 {
-       if (rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, -EBADMSG))
-               rxrpc_send_abort_packet(call);
+       rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, -EBADMSG);
 }
 
 /*
 }
 
 /*
@@ -1007,8 +1006,7 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
        case RXRPC_CALL_COMPLETE:
                break;
        default:
        case RXRPC_CALL_COMPLETE:
                break;
        default:
-               if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN))
-                       rxrpc_send_abort_packet(call);
+               rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN);
                trace_rxrpc_improper_term(call);
                break;
        }
                trace_rxrpc_improper_term(call);
                break;
        }
index 6ebd6440a2b7c91246a5b2a2dfb8a65b2d5860ce..a4ccdc006d0fee22d1c9d3e02a0a2d46098af560 100644 (file)
@@ -134,6 +134,8 @@ bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
        write_lock(&call->state_lock);
        ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
        write_unlock(&call->state_lock);
        write_lock(&call->state_lock);
        ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
        write_unlock(&call->state_lock);
+       if (ret && test_bit(RXRPC_CALL_EXPOSED, &call->flags))
+               rxrpc_send_abort_packet(call);
        return ret;
 }
 
        return ret;
 }
 
index cde1e65f16b4548c218b653aa306aa29a16ca13f..dc3c2a834fc8b9b5868adaa1cbd07d0ec5d043b2 100644 (file)
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
+/*
+ * Propose an abort to be made in the I/O thread.
+ */
+bool rxrpc_propose_abort(struct rxrpc_call *call,
+                        u32 abort_code, int error, const char *why)
+{
+       _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
+
+       if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
+               call->send_abort_why = why;
+               call->send_abort_err = error;
+               /* Request abort locklessly vs rxrpc_input_call_event(). */
+               smp_store_release(&call->send_abort, abort_code);
+               rxrpc_poke_call(call, rxrpc_call_poke_abort);
+               return true;
+       }
+
+       return false;
+}
+
 /*
  * Return true if there's sufficient Tx queue space.
  */
 /*
  * Return true if there's sufficient Tx queue space.
  */
@@ -663,9 +683,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
                /* it's too late for this call */
                ret = -ESHUTDOWN;
        } else if (p.command == RXRPC_CMD_SEND_ABORT) {
                /* it's too late for this call */
                ret = -ESHUTDOWN;
        } else if (p.command == RXRPC_CMD_SEND_ABORT) {
+               rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED, "CMD");
                ret = 0;
                ret = 0;
-               if (rxrpc_abort_call("CMD", call, 0, p.abort_code, -ECONNABORTED))
-                       ret = rxrpc_send_abort_packet(call);
        } else if (p.command != RXRPC_CMD_SEND_DATA) {
                ret = -EINVAL;
        } else {
        } else if (p.command != RXRPC_CMD_SEND_DATA) {
                ret = -EINVAL;
        } else {
@@ -760,11 +779,7 @@ bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
        _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
 
        mutex_lock(&call->user_mutex);
        _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
 
        mutex_lock(&call->user_mutex);
-
-       aborted = rxrpc_abort_call(why, call, 0, abort_code, error);
-       if (aborted)
-               rxrpc_send_abort_packet(call);
-
+       aborted = rxrpc_propose_abort(call, abort_code, error, why);
        mutex_unlock(&call->user_mutex);
        return aborted;
 }
        mutex_unlock(&call->user_mutex);
        return aborted;
 }