rxrpc: Wrap accesses to get call state to put the barrier in one place
authorDavid Howells <dhowells@redhat.com>
Mon, 19 Dec 2022 15:32:32 +0000 (15:32 +0000)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:32 +0000 (09:43 +0000)
Wrap accesses to get the state of a call from outside of the I/O thread in
a single place so that the barrier needed to order wrt the error code and
abort code is in just that place.

Also use a barrier when setting the call state and again when reading the
call state such that the auxiliary completion info (error code, abort code)
can be read without taking a read lock on the call state lock.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

net/rxrpc/af_rxrpc.c
net/rxrpc/ar-internal.h
net/rxrpc/call_state.c
net/rxrpc/recvmsg.c
net/rxrpc/sendmsg.c

index f4e1fff..61c30d0 100644 (file)
@@ -379,7 +379,7 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
 bool rxrpc_kernel_check_life(const struct socket *sock,
                             const struct rxrpc_call *call)
 {
-       return call->state != RXRPC_CALL_COMPLETE;
+       return !rxrpc_call_is_complete(call);
 }
 EXPORT_SYMBOL(rxrpc_kernel_check_life);
 
index 203e035..9e99248 100644 (file)
@@ -903,6 +903,22 @@ bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
 bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
                      u32 abort_code, int error, enum rxrpc_abort_reason why);
 
+static inline enum rxrpc_call_state rxrpc_call_state(const struct rxrpc_call *call)
+{
+       /* Order read ->state before read ->error. */
+       return smp_load_acquire(&call->state);
+}
+
+static inline bool rxrpc_call_is_complete(const struct rxrpc_call *call)
+{
+       return rxrpc_call_state(call) == RXRPC_CALL_COMPLETE;
+}
+
+static inline bool rxrpc_call_has_failed(const struct rxrpc_call *call)
+{
+       return rxrpc_call_is_complete(call) && call->completion != RXRPC_CALL_SUCCEEDED;
+}
+
 /*
  * conn_client.c
  */
index 8fbb211..649fb9e 100644 (file)
@@ -19,7 +19,8 @@ bool __rxrpc_set_call_completion(struct rxrpc_call *call,
                call->abort_code = abort_code;
                call->error = error;
                call->completion = compl;
-               call->state = RXRPC_CALL_COMPLETE;
+               /* Allow reader of completion state to operate locklessly */
+               smp_store_release(&call->state, RXRPC_CALL_COMPLETE);
                trace_rxrpc_call_complete(call);
                wake_up(&call->waitq);
                rxrpc_notify_socket(call);
index ff08f91..7bf36a8 100644 (file)
@@ -89,7 +89,7 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
                break;
        default:
-               pr_err("Invalid terminal call state %u\n", call->state);
+               pr_err("Invalid terminal call state %u\n", call->completion);
                BUG();
                break;
        }
@@ -111,7 +111,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 
        trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
 
-       if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
+       if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
                rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
 
        write_lock(&call->state_lock);
@@ -210,7 +210,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
        rx_pkt_offset = call->rx_pkt_offset;
        rx_pkt_len = call->rx_pkt_len;
 
-       if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
+       if (rxrpc_call_state(call) >= RXRPC_CALL_SERVER_ACK_REQUEST) {
                seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
                ret = 1;
                goto done;
@@ -416,7 +416,7 @@ try_again:
                msg->msg_namelen = len;
        }
 
-       switch (READ_ONCE(call->state)) {
+       switch (rxrpc_call_state(call)) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
        case RXRPC_CALL_SERVER_RECV_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
@@ -436,7 +436,7 @@ try_again:
        if (ret < 0)
                goto error_unlock_call;
 
-       if (call->state == RXRPC_CALL_COMPLETE) {
+       if (rxrpc_call_is_complete(call)) {
                ret = rxrpc_recvmsg_term(call, msg);
                if (ret < 0)
                        goto error_unlock_call;
@@ -516,7 +516,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 
        mutex_lock(&call->user_mutex);
 
-       switch (READ_ONCE(call->state)) {
+       switch (rxrpc_call_state(call)) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
        case RXRPC_CALL_SERVER_RECV_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
index 2a003c3..f0b5822 100644 (file)
@@ -25,7 +25,7 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
 {
        _enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);
 
-       if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
+       if (!call->send_abort && !rxrpc_call_is_complete(call)) {
                call->send_abort_why = why;
                call->send_abort_err = error;
                call->send_abort_seq = 0;
@@ -60,7 +60,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
                if (rxrpc_check_tx_space(call, NULL))
                        return 0;
 
-               if (call->state >= RXRPC_CALL_COMPLETE)
+               if (rxrpc_call_is_complete(call))
                        return call->error;
 
                if (signal_pending(current))
@@ -95,7 +95,7 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
                if (rxrpc_check_tx_space(call, &tx_win))
                        return 0;
 
-               if (call->state >= RXRPC_CALL_COMPLETE)
+               if (rxrpc_call_is_complete(call))
                        return call->error;
 
                if (timeout == 0 &&
@@ -124,7 +124,7 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
                if (rxrpc_check_tx_space(call, NULL))
                        return 0;
 
-               if (call->state >= RXRPC_CALL_COMPLETE)
+               if (rxrpc_call_is_complete(call))
                        return call->error;
 
                trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
@@ -273,7 +273,7 @@ reload:
        ret = -EPIPE;
        if (sk->sk_shutdown & SEND_SHUTDOWN)
                goto maybe_error;
-       state = READ_ONCE(call->state);
+       state = rxrpc_call_state(call);
        ret = -ESHUTDOWN;
        if (state >= RXRPC_CALL_COMPLETE)
                goto maybe_error;
@@ -350,7 +350,7 @@ reload:
 
                /* check for the far side aborting the call or a network error
                 * occurring */
-               if (call->state == RXRPC_CALL_COMPLETE)
+               if (rxrpc_call_is_complete(call))
                        goto call_terminated;
 
                /* add the packet to the send queue if it's now full */
@@ -375,12 +375,9 @@ reload:
 
 success:
        ret = copied;
-       if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
-               read_lock(&call->state_lock);
-               if (call->error < 0)
-                       ret = call->error;
-               read_unlock(&call->state_lock);
-       }
+       if (rxrpc_call_is_complete(call) &&
+           call->error < 0)
+               ret = call->error;
 out:
        call->tx_pending = txb;
        _leave(" = %d", ret);
@@ -618,10 +615,10 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
                        return PTR_ERR(call);
                /* ... and we have the call lock. */
                ret = 0;
-               if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
+               if (rxrpc_call_is_complete(call))
                        goto out_put_unlock;
        } else {
-               switch (READ_ONCE(call->state)) {
+               switch (rxrpc_call_state(call)) {
                case RXRPC_CALL_UNINITIALISED:
                case RXRPC_CALL_CLIENT_AWAIT_CONN:
                case RXRPC_CALL_SERVER_PREALLOC:
@@ -675,7 +672,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
                break;
        }
 
-       state = READ_ONCE(call->state);
+       state = rxrpc_call_state(call);
        _debug("CALL %d USR %lx ST %d on CONN %p",
               call->debug_id, call->user_call_ID, state, call->conn);
 
@@ -735,7 +732,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
        _debug("CALL %d USR %lx ST %d on CONN %p",
               call->debug_id, call->user_call_ID, call->state, call->conn);
 
-       switch (READ_ONCE(call->state)) {
+       switch (rxrpc_call_state(call)) {
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
        case RXRPC_CALL_SERVER_ACK_REQUEST:
        case RXRPC_CALL_SERVER_SEND_REPLY: