Wrap accesses to get the state of a call from outside of the I/O thread in
a single place so that the barrier needed to order wrt the error code and
abort code is in just that place.
Also use a barrier when setting the call state and again when reading the
call state such that the auxiliary completion info (error code, abort code)
can be read without taking a read lock on the call state lock.
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call)
{
bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call)
{
- return call->state != RXRPC_CALL_COMPLETE;
+ return !rxrpc_call_is_complete(call);
}
EXPORT_SYMBOL(rxrpc_kernel_check_life);
}
EXPORT_SYMBOL(rxrpc_kernel_check_life);
bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
u32 abort_code, int error, enum rxrpc_abort_reason why);
bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
u32 abort_code, int error, enum rxrpc_abort_reason why);
+static inline enum rxrpc_call_state rxrpc_call_state(const struct rxrpc_call *call)
+{
+ /* Order read ->state before read ->error. */
+ return smp_load_acquire(&call->state);
+}
+
+static inline bool rxrpc_call_is_complete(const struct rxrpc_call *call)
+{
+ return rxrpc_call_state(call) == RXRPC_CALL_COMPLETE;
+}
+
+static inline bool rxrpc_call_has_failed(const struct rxrpc_call *call)
+{
+ return rxrpc_call_is_complete(call) && call->completion != RXRPC_CALL_SUCCEEDED;
+}
+
call->abort_code = abort_code;
call->error = error;
call->completion = compl;
call->abort_code = abort_code;
call->error = error;
call->completion = compl;
- call->state = RXRPC_CALL_COMPLETE;
+ /* Allow reader of completion state to operate locklessly */
+ smp_store_release(&call->state, RXRPC_CALL_COMPLETE);
trace_rxrpc_call_complete(call);
wake_up(&call->waitq);
rxrpc_notify_socket(call);
trace_rxrpc_call_complete(call);
wake_up(&call->waitq);
rxrpc_notify_socket(call);
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
break;
default:
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
break;
default:
- pr_err("Invalid terminal call state %u\n", call->state);
+ pr_err("Invalid terminal call state %u\n", call->completion);
trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
- if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
+ if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
write_lock(&call->state_lock);
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
write_lock(&call->state_lock);
rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len;
rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len;
- if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
+ if (rxrpc_call_state(call) >= RXRPC_CALL_SERVER_ACK_REQUEST) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1;
goto done;
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1;
goto done;
msg->msg_namelen = len;
}
msg->msg_namelen = len;
}
- switch (READ_ONCE(call->state)) {
+ switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
if (ret < 0)
goto error_unlock_call;
if (ret < 0)
goto error_unlock_call;
- if (call->state == RXRPC_CALL_COMPLETE) {
+ if (rxrpc_call_is_complete(call)) {
ret = rxrpc_recvmsg_term(call, msg);
if (ret < 0)
goto error_unlock_call;
ret = rxrpc_recvmsg_term(call, msg);
if (ret < 0)
goto error_unlock_call;
mutex_lock(&call->user_mutex);
mutex_lock(&call->user_mutex);
- switch (READ_ONCE(call->state)) {
+ switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
{
_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);
{
_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);
- if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
+ if (!call->send_abort && !rxrpc_call_is_complete(call)) {
call->send_abort_why = why;
call->send_abort_err = error;
call->send_abort_seq = 0;
call->send_abort_why = why;
call->send_abort_err = error;
call->send_abort_seq = 0;
if (rxrpc_check_tx_space(call, NULL))
return 0;
if (rxrpc_check_tx_space(call, NULL))
return 0;
- if (call->state >= RXRPC_CALL_COMPLETE)
+ if (rxrpc_call_is_complete(call))
return call->error;
if (signal_pending(current))
return call->error;
if (signal_pending(current))
if (rxrpc_check_tx_space(call, &tx_win))
return 0;
if (rxrpc_check_tx_space(call, &tx_win))
return 0;
- if (call->state >= RXRPC_CALL_COMPLETE)
+ if (rxrpc_call_is_complete(call))
return call->error;
if (timeout == 0 &&
return call->error;
if (timeout == 0 &&
if (rxrpc_check_tx_space(call, NULL))
return 0;
if (rxrpc_check_tx_space(call, NULL))
return 0;
- if (call->state >= RXRPC_CALL_COMPLETE)
+ if (rxrpc_call_is_complete(call))
return call->error;
trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
return call->error;
trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
ret = -EPIPE;
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto maybe_error;
ret = -EPIPE;
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto maybe_error;
- state = READ_ONCE(call->state);
+ state = rxrpc_call_state(call);
ret = -ESHUTDOWN;
if (state >= RXRPC_CALL_COMPLETE)
goto maybe_error;
ret = -ESHUTDOWN;
if (state >= RXRPC_CALL_COMPLETE)
goto maybe_error;
/* check for the far side aborting the call or a network error
* occurring */
/* check for the far side aborting the call or a network error
* occurring */
- if (call->state == RXRPC_CALL_COMPLETE)
+ if (rxrpc_call_is_complete(call))
goto call_terminated;
/* add the packet to the send queue if it's now full */
goto call_terminated;
/* add the packet to the send queue if it's now full */
- if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
- read_lock(&call->state_lock);
- if (call->error < 0)
- ret = call->error;
- read_unlock(&call->state_lock);
- }
+ if (rxrpc_call_is_complete(call) &&
+ call->error < 0)
+ ret = call->error;
out:
call->tx_pending = txb;
_leave(" = %d", ret);
out:
call->tx_pending = txb;
_leave(" = %d", ret);
return PTR_ERR(call);
/* ... and we have the call lock. */
ret = 0;
return PTR_ERR(call);
/* ... and we have the call lock. */
ret = 0;
- if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
+ if (rxrpc_call_is_complete(call))
goto out_put_unlock;
} else {
goto out_put_unlock;
} else {
- switch (READ_ONCE(call->state)) {
+ switch (rxrpc_call_state(call)) {
case RXRPC_CALL_UNINITIALISED:
case RXRPC_CALL_CLIENT_AWAIT_CONN:
case RXRPC_CALL_SERVER_PREALLOC:
case RXRPC_CALL_UNINITIALISED:
case RXRPC_CALL_CLIENT_AWAIT_CONN:
case RXRPC_CALL_SERVER_PREALLOC:
- state = READ_ONCE(call->state);
+ state = rxrpc_call_state(call);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, state, call->conn);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, state, call->conn);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);
- switch (READ_ONCE(call->state)) {
+ switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
case RXRPC_CALL_SERVER_SEND_REPLY:
case RXRPC_CALL_CLIENT_SEND_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
case RXRPC_CALL_SERVER_SEND_REPLY: