// SPDX-License-Identifier: GPL-2.0-or-later
/* AF_RXRPC sendmsg() implementation.
 *
 * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/gfp.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Propose an abort to be made in the I/O thread.
 */
bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
			 enum rxrpc_abort_reason why)
{
	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);

	if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
		call->send_abort_why = why;
		call->send_abort_err = error;
		call->send_abort_seq = 0;
		/* Request abort locklessly vs rxrpc_input_call_event(). */
		smp_store_release(&call->send_abort, abort_code);
		rxrpc_poke_call(call, rxrpc_call_poke_abort);
		return true;
	}

	return false;
}

/*
 * Return true if there's sufficient Tx queue space.
 */
static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
{
	if (_tx_win)
		*_tx_win = call->tx_bottom;
	return call->tx_prepared - call->tx_bottom < 256;
}

/*
 * Wait for space to appear in the Tx queue or a signal to occur.
 */
static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
					 struct rxrpc_call *call,
					 long *timeo)
{
	for (;;) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		if (signal_pending(current))
			return sock_intr_errno(*timeo);

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}

/*
 * Wait for space to appear in the Tx queue uninterruptibly, but with
 * a timeout of 2*RTT if no progress was made and a signal occurred.
 */
static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
					    struct rxrpc_call *call)
{
	rxrpc_seq_t tx_start, tx_win;
	signed long rtt, timeout;

	rtt = READ_ONCE(call->peer->srtt_us) >> 3;
	rtt = usecs_to_jiffies(rtt) * 2;
	if (rtt < 2)
		rtt = 2;

	timeout = rtt;
	tx_start = smp_load_acquire(&call->acks_hard_ack);

	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		if (rxrpc_check_tx_space(call, &tx_win))
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		if (timeout == 0 &&
		    tx_win == tx_start && signal_pending(current))
			return -EINTR;

		if (tx_win != tx_start) {
			timeout = rtt;
			tx_start = tx_win;
		}

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		timeout = schedule_timeout(timeout);
	}
}

/*
 * Wait for space to appear in the Tx queue uninterruptibly.
 */
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
					    struct rxrpc_call *call,
					    long *timeo)
{
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}

/*
 * Wait for space to appear in the transmit/ACK window.
 * - The caller holds the socket locked.
 */
static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
				    struct rxrpc_call *call,
				    long *timeo,
				    bool waitall)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret;

	_enter(",{%u,%u,%u,%u}",
	       call->tx_bottom, call->acks_hard_ack, call->tx_top, call->tx_winsize);

	add_wait_queue(&call->waitq, &myself);

	switch (call->interruptibility) {
	case RXRPC_INTERRUPTIBLE:
		if (waitall)
			ret = rxrpc_wait_for_tx_window_waitall(rx, call);
		else
			ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
		break;
	case RXRPC_PREINTERRUPTIBLE:
	case RXRPC_UNINTERRUPTIBLE:
	default:
		ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
		break;
	}

	remove_wait_queue(&call->waitq, &myself);
	set_current_state(TASK_RUNNING);
	_leave(" = %d", ret);
	return ret;
}

/*
 * Notify the owner of the call that the transmit phase is ended and the last
 * packet has been queued.
 */
static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
				rxrpc_notify_end_tx_t notify_end_tx)
{
	if (notify_end_tx)
		notify_end_tx(&rx->sk, call, call->user_call_ID);
}

/*
 * Queue a DATA packet for transmission and set the resend timeout.  The
 * packet is handed to the I/O thread, which transmits it.
 */
static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
			       struct rxrpc_txbuf *txb,
			       rxrpc_notify_end_tx_t notify_end_tx)
{
	unsigned long now;
	rxrpc_seq_t seq = txb->seq;
	bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;

	rxrpc_inc_stat(call->rxnet, stat_tx_data);

	ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);

	/* We have to set the timestamp before queueing as the retransmit
	 * algorithm can see the packet as soon as we queue it.
	 */
	txb->last_sent = ktime_get_real();

	if (last)
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
	else
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);

	/* Add the packet to the call's output buffer */
	spin_lock(&call->tx_lock);
	poke = list_empty(&call->tx_sendmsg);
	list_add_tail(&txb->call_link, &call->tx_sendmsg);
	call->tx_prepared = seq;
	spin_unlock(&call->tx_lock);

	if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
		_debug("________awaiting reply/ACK__________");
		write_lock(&call->state_lock);
		switch (call->state) {
		case RXRPC_CALL_CLIENT_SEND_REQUEST:
			call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
			rxrpc_notify_end_tx(rx, call, notify_end_tx);
			break;
		case RXRPC_CALL_SERVER_ACK_REQUEST:
			call->state = RXRPC_CALL_SERVER_SEND_REPLY;
			now = jiffies;
			WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
			if (call->ackr_reason == RXRPC_ACK_DELAY)
				call->ackr_reason = 0;
			trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
			if (!last)
				break;
			fallthrough;
		case RXRPC_CALL_SERVER_SEND_REPLY:
			call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
			rxrpc_notify_end_tx(rx, call, notify_end_tx);
			break;
		default:
			break;
		}
		write_unlock(&call->state_lock);
	}

	if (poke)
		rxrpc_poke_call(call, rxrpc_call_poke_start);
}

/*
 * Send data through a socket.
 * - Must be called in process context.
 * - The caller holds the call user access mutex, but not the socket lock.
 */
static int rxrpc_send_data(struct rxrpc_sock *rx,
			   struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx,
			   bool *_dropped_lock)
{
	struct rxrpc_txbuf *txb;
	struct sock *sk = &rx->sk;
	enum rxrpc_call_state state;
	long timeo;
	bool more = msg->msg_flags & MSG_MORE;
	int ret, copied = 0;

	timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);

	/* this should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

reload:
	ret = -EPIPE;
	if (sk->sk_shutdown & SEND_SHUTDOWN)
		goto maybe_error;
	state = READ_ONCE(call->state);
	ret = -ESHUTDOWN;
	if (state >= RXRPC_CALL_COMPLETE)
		goto maybe_error;
	ret = -EPROTO;
	if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
	    state != RXRPC_CALL_SERVER_ACK_REQUEST &&
	    state != RXRPC_CALL_SERVER_SEND_REPLY)
		goto maybe_error;

	ret = -EMSGSIZE;
	if (call->tx_total_len != -1) {
		if (len - copied > call->tx_total_len)
			goto maybe_error;
		if (!more && len - copied != call->tx_total_len)
			goto maybe_error;
	}

	txb = call->tx_pending;
	call->tx_pending = NULL;
	if (txb)
		rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);

	do {
		if (!txb) {
			size_t remain, bufsize, chunk, offset;

			if (!rxrpc_check_tx_space(call, NULL))
				goto wait_for_space;

			/* Work out the maximum size of a packet.  Assume that
			 * the security header is going to be in the padded
			 * region (enc blocksize), but the trailer is not.
			 */
			remain = more ? INT_MAX : msg_data_left(msg);
			ret = call->conn->security->how_much_data(call, remain,
								  &bufsize, &chunk, &offset);
			if (ret < 0)
				goto maybe_error;

			_debug("SIZE: %zu/%zu @%zu", chunk, bufsize, offset);

			/* create a buffer that we can retain until it's ACK'd */
			ret = -ENOMEM;
			txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_DATA,
						GFP_KERNEL);
			if (!txb)
				goto maybe_error;

			txb->offset = offset;
			txb->space -= offset;
			txb->space = min_t(size_t, chunk, txb->space);
		}

		/* append next segment of data to the current buffer */
		if (msg_data_left(msg) > 0) {
			size_t copy = min_t(size_t, txb->space, msg_data_left(msg));

			_debug("add %zu", copy);
			if (!copy_from_iter_full(txb->data + txb->offset, copy,
						 &msg->msg_iter))
				goto efault;
			txb->space -= copy;
			txb->len += copy;
			txb->offset += copy;
			copied += copy;
			if (call->tx_total_len != -1)
				call->tx_total_len -= copy;
		}

		/* check for the far side aborting the call or a network error
		 * occurring */
		if (call->state == RXRPC_CALL_COMPLETE)
			goto call_terminated;

		/* add the packet to the send queue if it's now full */
		if (!txb->space ||
		    (msg_data_left(msg) == 0 && !more)) {
			if (msg_data_left(msg) == 0 && !more) {
				txb->wire.flags |= RXRPC_LAST_PACKET;
				__set_bit(RXRPC_TXBUF_LAST, &txb->flags);
			} else if (call->tx_top - call->acks_hard_ack <
				   call->tx_winsize) {
				txb->wire.flags |= RXRPC_MORE_PACKETS;
			}

			ret = call->security->secure_packet(call, txb);
			if (ret < 0)
				goto out;

			rxrpc_queue_packet(rx, call, txb, notify_end_tx);
			txb = NULL;
		}
	} while (msg_data_left(msg) > 0);

success:
	ret = copied;
	if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
		read_lock(&call->state_lock);
		if (call->error < 0)
			ret = call->error;
		read_unlock(&call->state_lock);
	}
out:
	call->tx_pending = txb;
	_leave(" = %d", ret);
	return ret;

call_terminated:
	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_send_aborted);
	_leave(" = %d", call->error);
	return call->error;

maybe_error:
	if (copied)
		goto success;
	goto out;

efault:
	ret = -EFAULT;
	goto out;

wait_for_space:
	ret = -EAGAIN;
	if (msg->msg_flags & MSG_DONTWAIT)
		goto maybe_error;
	mutex_unlock(&call->user_mutex);
	*_dropped_lock = true;
	ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
				       msg->msg_flags & MSG_WAITALL);
	if (ret < 0)
		goto maybe_error;
	if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
		if (mutex_lock_interruptible(&call->user_mutex) < 0) {
			ret = sock_intr_errno(timeo);
			goto maybe_error;
		}
	} else {
		mutex_lock(&call->user_mutex);
	}
	*_dropped_lock = false;
	goto reload;
}

/*
 * Extract control messages from the sendmsg() control buffer.
 */
static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)
{
	struct cmsghdr *cmsg;
	bool got_user_ID = false;
	int len;

	if (msg->msg_controllen == 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		len = cmsg->cmsg_len - sizeof(struct cmsghdr);
		_debug("CMSG %d, %d, %d",
		       cmsg->cmsg_level, cmsg->cmsg_type, len);

		if (cmsg->cmsg_level != SOL_RXRPC)
			continue;

		switch (cmsg->cmsg_type) {
		case RXRPC_USER_CALL_ID:
			if (msg->msg_flags & MSG_CMSG_COMPAT) {
				if (len != sizeof(u32))
					return -EINVAL;
				p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
			} else {
				if (len != sizeof(unsigned long))
					return -EINVAL;
				p->call.user_call_ID = *(unsigned long *)
					CMSG_DATA(cmsg);
			}
			got_user_ID = true;
			break;

		case RXRPC_ABORT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_SEND_ABORT;
			if (len != sizeof(p->abort_code))
				return -EINVAL;
			p->abort_code = *(unsigned int *)CMSG_DATA(cmsg);
			if (p->abort_code == 0)
				return -EINVAL;
			break;

		case RXRPC_CHARGE_ACCEPT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_CHARGE_ACCEPT;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_EXCLUSIVE_CALL:
			p->exclusive = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_UPGRADE_SERVICE:
			p->upgrade = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_TX_LENGTH:
			if (p->call.tx_total_len != -1 || len != sizeof(__s64))
				return -EINVAL;
			p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
			if (p->call.tx_total_len < 0)
				return -EINVAL;
			break;

		case RXRPC_SET_CALL_TIMEOUT:
			if (len & 3 || len < 4 || len > 12)
				return -EINVAL;
			memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len);
			p->call.nr_timeouts = len / 4;
			if (p->call.timeouts.hard > INT_MAX / HZ)
				return -ERANGE;
			if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000)
				return -ERANGE;
			if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000)
				return -ERANGE;
			break;

		default:
			return -EINVAL;
		}
	}

	if (!got_user_ID)
		return -EINVAL;
	if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
		return -EINVAL;
	_leave(" = 0");
	return 0;
}

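/*
 * Illustrative sketch, not part of this file: how a userspace sender might
 * build the control buffer that rxrpc_sendmsg_cmsg() above parses.  'fd' and
 * 'my_call_id' are hypothetical names; the constants come from the uapi
 * <linux/rxrpc.h> header.
 *
 *	struct msghdr msg = { 0 };
 *	char control[CMSG_SPACE(sizeof(unsigned long))];
 *	struct cmsghdr *cmsg;
 *	unsigned long my_call_id = 1;
 *
 *	msg.msg_control	   = control;
 *	msg.msg_controllen = sizeof(control);
 *	cmsg = CMSG_FIRSTHDR(&msg);
 *	cmsg->cmsg_level   = SOL_RXRPC;
 *	cmsg->cmsg_type	   = RXRPC_USER_CALL_ID;
 *	cmsg->cmsg_len	   = CMSG_LEN(sizeof(my_call_id));
 *	memcpy(CMSG_DATA(cmsg), &my_call_id, sizeof(my_call_id));
 *	sendmsg(fd, &msg, 0);	(with msg_name and an iovec for the data)
 */
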
/*
 * Create a new client call for sendmsg().
 * - Called with the socket lock held, which it must release.
 * - If it returns a call, the call's lock will need releasing by the caller.
 */
static struct rxrpc_call *
rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
				  struct rxrpc_send_params *p)
	__releases(&rx->sk.sk_lock.slock)
	__acquires(&call->user_mutex)
{
	struct rxrpc_conn_parameters cp;
	struct rxrpc_call *call;
	struct key *key;

	DECLARE_SOCKADDR(struct sockaddr_rxrpc *, srx, msg->msg_name);

	_enter("");

	if (!msg->msg_name) {
		release_sock(&rx->sk);
		return ERR_PTR(-EDESTADDRREQ);
	}

	key = rx->key;
	if (key && !rx->key->payload.data[0])
		key = NULL;

	memset(&cp, 0, sizeof(cp));
	cp.local		= rx->local;
	cp.key			= rx->key;
	cp.security_level	= rx->min_sec_level;
	cp.exclusive		= rx->exclusive | p->exclusive;
	cp.upgrade		= p->upgrade;
	cp.service_id		= srx->srx_service;
	call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL,
				     atomic_inc_return(&rxrpc_debug_id));
	/* The socket is now unlocked */

	rxrpc_put_peer(cp.peer, rxrpc_peer_put_discard_tmp);
	_leave(" = %p\n", call);
	return call;
}

/*
 * Send a message forming part of a client call through an RxRPC socket.
 * - The caller holds the socket locked.
 * - The socket may be either a client socket or a server socket.
 */
int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
	__releases(&rx->sk.sk_lock.slock)
{
	enum rxrpc_call_state state;
	struct rxrpc_call *call;
	unsigned long now, j;
	bool dropped_lock = false;
	int ret;

	struct rxrpc_send_params p = {
		.call.tx_total_len	= -1,
		.call.user_call_ID	= 0,
		.call.nr_timeouts	= 0,
		.call.interruptibility	= RXRPC_INTERRUPTIBLE,
		.abort_code		= 0,
		.command		= RXRPC_CMD_SEND_DATA,
		.exclusive		= false,
		.upgrade		= false,
	};

	_enter("");

	ret = rxrpc_sendmsg_cmsg(msg, &p);
	if (ret < 0)
		goto error_release_sock;

	if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
		ret = -EINVAL;
		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
			goto error_release_sock;
		ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
		goto error_release_sock;
	}

	call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
	if (!call) {
		ret = -EBADSLT;
		if (p.command != RXRPC_CMD_SEND_DATA)
			goto error_release_sock;
		call = rxrpc_new_client_call_for_sendmsg(rx, msg, &p);
		/* The socket is now unlocked... */
		if (IS_ERR(call))
			return PTR_ERR(call);
		/* ... and we have the call lock. */
		ret = 0;
		if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
			goto out_put_unlock;
	} else {
		switch (READ_ONCE(call->state)) {
		case RXRPC_CALL_UNINITIALISED:
		case RXRPC_CALL_CLIENT_AWAIT_CONN:
		case RXRPC_CALL_SERVER_PREALLOC:
		case RXRPC_CALL_SERVER_SECURING:
			rxrpc_put_call(call, rxrpc_call_put_sendmsg);
			ret = -EBUSY;
			goto error_release_sock;
		default:
			break;
		}

		ret = mutex_lock_interruptible(&call->user_mutex);
		release_sock(&rx->sk);
		if (ret < 0) {
			ret = -ERESTARTSYS;
			goto error_put;
		}

		if (p.call.tx_total_len != -1) {
			ret = -EINVAL;
			if (call->tx_total_len != -1 ||
			    call->tx_pending ||
			    call->tx_top != 0)
				goto out_put_unlock;
			call->tx_total_len = p.call.tx_total_len;
		}
	}

	switch (p.call.nr_timeouts) {
	case 3:
		j = msecs_to_jiffies(p.call.timeouts.normal);
		if (p.call.timeouts.normal > 0 && j == 0)
			j = 1;
		WRITE_ONCE(call->next_rx_timo, j);
		fallthrough;
	case 2:
		j = msecs_to_jiffies(p.call.timeouts.idle);
		if (p.call.timeouts.idle > 0 && j == 0)
			j = 1;
		WRITE_ONCE(call->next_req_timo, j);
		fallthrough;
	case 1:
		if (p.call.timeouts.hard > 0) {
			j = msecs_to_jiffies(p.call.timeouts.hard);
			now = jiffies;
			j += now;
			WRITE_ONCE(call->expect_term_by, j);
			rxrpc_reduce_call_timer(call, j, now,
						rxrpc_timer_set_for_hard);
		}
		break;
	}

	state = READ_ONCE(call->state);
	_debug("CALL %d USR %lx ST %d on CONN %p",
	       call->debug_id, call->user_call_ID, state, call->conn);

	if (state >= RXRPC_CALL_COMPLETE) {
		/* it's too late for this call */
		ret = -ESHUTDOWN;
	} else if (p.command == RXRPC_CMD_SEND_ABORT) {
		rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED,
				    rxrpc_abort_call_sendmsg);
		ret = 0;
	} else if (p.command != RXRPC_CMD_SEND_DATA) {
		ret = -EINVAL;
	} else {
		ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
	}

out_put_unlock:
	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
error_put:
	rxrpc_put_call(call, rxrpc_call_put_sendmsg);
	_leave(" = %d", ret);
	return ret;

error_release_sock:
	release_sock(&rx->sk);
	return ret;
}

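/*
 * Illustrative sketch, not part of this file: aborting an in-progress call
 * from userspace via the path above.  The call is named by the same user
 * call ID it was started with; 'fd', 'my_call_id' and 'my_abort_code' are
 * hypothetical.
 *
 *	struct msghdr msg = { 0 };
 *	char control[CMSG_SPACE(sizeof(unsigned long)) +
 *		     CMSG_SPACE(sizeof(unsigned int))];
 *	struct cmsghdr *cmsg;
 *
 *	msg.msg_control	   = control;
 *	msg.msg_controllen = sizeof(control);
 *	cmsg = CMSG_FIRSTHDR(&msg);
 *	cmsg->cmsg_level   = SOL_RXRPC;
 *	cmsg->cmsg_type	   = RXRPC_USER_CALL_ID;
 *	cmsg->cmsg_len	   = CMSG_LEN(sizeof(unsigned long));
 *	memcpy(CMSG_DATA(cmsg), &my_call_id, sizeof(unsigned long));
 *	cmsg = CMSG_NXTHDR(&msg, cmsg);
 *	cmsg->cmsg_level   = SOL_RXRPC;
 *	cmsg->cmsg_type	   = RXRPC_ABORT;
 *	cmsg->cmsg_len	   = CMSG_LEN(sizeof(unsigned int));
 *	memcpy(CMSG_DATA(cmsg), &my_abort_code, sizeof(unsigned int));
 *	sendmsg(fd, &msg, 0);
 */
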
/**
 * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
 * @sock: The socket the call is on
 * @call: The call to send data through
 * @msg: The data to send
 * @len: The amount of data to send
 * @notify_end_tx: Notification that the last packet is queued.
 *
 * Allow a kernel service to send data on a call.  The call must be in a state
 * appropriate to sending data.  No control data should be supplied in @msg,
 * nor should an address be supplied.  MSG_MORE should be flagged if there's
 * more data to come, otherwise this data will end the transmission phase.
 */
int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx)
{
	bool dropped_lock = false;
	int ret;

	_enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);

	ASSERTCMP(msg->msg_name, ==, NULL);
	ASSERTCMP(msg->msg_control, ==, NULL);

	mutex_lock(&call->user_mutex);

	_debug("CALL %d USR %lx ST %d on CONN %p",
	       call->debug_id, call->user_call_ID, call->state, call->conn);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
	case RXRPC_CALL_SERVER_SEND_REPLY:
		ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
				      notify_end_tx, &dropped_lock);
		break;
	case RXRPC_CALL_COMPLETE:
		read_lock(&call->state_lock);
		ret = call->error;
		read_unlock(&call->state_lock);
		break;
	default:
		/* Request phase complete for this client call */
		trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
				  call->cid, call->call_id, call->rx_consumed,
				  0, -EPROTO);
		ret = -EPROTO;
		break;
	}

	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
	_leave(" = %d", ret);
	return ret;
}
EXPORT_SYMBOL(rxrpc_kernel_send_data);

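/*
 * Illustrative sketch, not part of this file: a kernel service (in the style
 * of kAFS) sending a request buffer on a call obtained from
 * rxrpc_kernel_begin_call().  'sock', 'call', 'buf' and 'size' are assumed to
 * exist; MSG_MORE would be set in msg_flags if more data were to follow.
 *
 *	struct msghdr msg;
 *	struct kvec iov = { .iov_base = buf, .iov_len = size };
 *	int ret;
 *
 *	msg.msg_name	   = NULL;
 *	msg.msg_namelen	   = 0;
 *	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &iov, 1, size);
 *	msg.msg_control	   = NULL;
 *	msg.msg_controllen = 0;
 *	msg.msg_flags	   = 0;
 *
 *	ret = rxrpc_kernel_send_data(sock, call, &msg, size, NULL);
 */
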
/**
 * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
 * @sock: The socket the call is on
 * @call: The call to be aborted
 * @abort_code: The abort code to stick into the ABORT packet
 * @error: Local error value
 * @why: Indication as to why.
 *
 * Allow a kernel service to abort a call, if it's still in an abortable
 * state, and return true if the call was aborted, false if it was already
 * complete.
 */
bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
			     u32 abort_code, int error, enum rxrpc_abort_reason why)
{
	bool aborted;

	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);

	mutex_lock(&call->user_mutex);
	aborted = rxrpc_propose_abort(call, abort_code, error, why);
	mutex_unlock(&call->user_mutex);
	return aborted;
}
EXPORT_SYMBOL(rxrpc_kernel_abort_call);

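/*
 * Illustrative sketch, not part of this file: a kernel service abandoning a
 * call after a local failure.  RX_USER_ABORT is the conventional abort code
 * for this; a real service would normally supply its own entry in enum
 * rxrpc_abort_reason rather than reusing rxrpc_abort_call_sendmsg.
 *
 *	if (rxrpc_kernel_abort_call(sock, call, RX_USER_ABORT, -ECANCELED,
 *				    rxrpc_abort_call_sendmsg))
 *		_debug("call aborted locally");
 */
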
/**
 * rxrpc_kernel_set_tx_length - Set the total Tx length on a call
 * @sock: The socket the call is on
 * @call: The call to be informed
 * @tx_total_len: The amount of data to be transmitted for this call
 *
 * Allow a kernel service to set the total transmit length on a call.  This
 * allows buffer-to-packet encrypt-and-copy to be performed.
 *
 * This function is primarily for use for setting the reply length since the
 * request length can be set when beginning the call.
 */
void rxrpc_kernel_set_tx_length(struct socket *sock, struct rxrpc_call *call,
				s64 tx_total_len)
{
	WARN_ON(call->tx_total_len != -1);
	call->tx_total_len = tx_total_len;
}
EXPORT_SYMBOL(rxrpc_kernel_set_tx_length);
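
/*
 * Illustrative sketch, not part of this file: a service that knows its reply
 * will be exactly 'reply_len' bytes announces that before queueing any data,
 * which lets the security class encrypt straight into the packet buffers:
 *
 *	rxrpc_kernel_set_tx_length(sock, call, reply_len);
 *	ret = rxrpc_kernel_send_data(sock, call, &msg, reply_len, NULL);
 */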