rxrpc: Move DATA transmission into call processor work item
[platform/kernel/linux-rpi.git] / net/rxrpc/sendmsg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* AF_RXRPC sendmsg() implementation.
3  *
4  * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10 #include <linux/net.h>
11 #include <linux/gfp.h>
12 #include <linux/skbuff.h>
13 #include <linux/export.h>
14 #include <linux/sched/signal.h>
15
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19
20 /*
21  * Return true if there's sufficient Tx queue space.
22  */
23 static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
24 {
25         if (_tx_win)
26                 *_tx_win = call->tx_bottom;
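        /* Cap how far sendmsg may prepare packets ahead of the bottom of the
         * Tx queue (256 appears to be a sendmsg-side buffering limit rather
         * than the protocol transmit window, which is tracked in tx_winsize).
         */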
27         return call->tx_prepared - call->tx_bottom < 256;
28 }
29
30 /*
31  * Wait for space to appear in the Tx queue or a signal to occur.
32  */
33 static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
34                                          struct rxrpc_call *call,
35                                          long *timeo)
36 {
37         for (;;) {
38                 set_current_state(TASK_INTERRUPTIBLE);
39                 if (rxrpc_check_tx_space(call, NULL))
40                         return 0;
41
42                 if (call->state >= RXRPC_CALL_COMPLETE)
43                         return call->error;
44
45                 if (signal_pending(current))
46                         return sock_intr_errno(*timeo);
47
48                 trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
49                 *timeo = schedule_timeout(*timeo);
50         }
51 }
52
53 /*
54  * Wait for space to appear in the Tx queue uninterruptibly, but with
55  * a timeout of 2*RTT if no progress was made and a signal occurred.
56  */
57 static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
58                                             struct rxrpc_call *call)
59 {
60         rxrpc_seq_t tx_start, tx_win;
61         signed long rtt, timeout;
62
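        /* srtt_us is assumed to be kept scaled by 8 (as in TCP), so shift it
         * down to microseconds, then allow roughly two round trips (with a
         * floor of two jiffies) before deciding that no progress is being
         * made.
         */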
63         rtt = READ_ONCE(call->peer->srtt_us) >> 3;
64         rtt = usecs_to_jiffies(rtt) * 2;
65         if (rtt < 2)
66                 rtt = 2;
67
68         timeout = rtt;
69         tx_start = smp_load_acquire(&call->acks_hard_ack);
70
71         for (;;) {
72                 set_current_state(TASK_UNINTERRUPTIBLE);
73
74                 if (rxrpc_check_tx_space(call, &tx_win))
75                         return 0;
76
77                 if (call->state >= RXRPC_CALL_COMPLETE)
78                         return call->error;
79
80                 if (timeout == 0 &&
81                     tx_win == tx_start && signal_pending(current))
82                         return -EINTR;
83
84                 if (tx_win != tx_start) {
85                         timeout = rtt;
86                         tx_start = tx_win;
87                 }
88
89                 trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
90                 timeout = schedule_timeout(timeout);
91         }
92 }
93
94 /*
95  * Wait for space to appear in the Tx queue uninterruptibly.
96  */
97 static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
98                                             struct rxrpc_call *call,
99                                             long *timeo)
100 {
101         for (;;) {
102                 set_current_state(TASK_UNINTERRUPTIBLE);
103                 if (rxrpc_check_tx_space(call, NULL))
104                         return 0;
105
106                 if (call->state >= RXRPC_CALL_COMPLETE)
107                         return call->error;
108
109                 trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
110                 *timeo = schedule_timeout(*timeo);
111         }
112 }
113
114 /*
115  * wait for space to appear in the transmit/ACK window
116  * - caller holds the socket locked
117  */
118 static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
119                                     struct rxrpc_call *call,
120                                     long *timeo,
121                                     bool waitall)
122 {
123         DECLARE_WAITQUEUE(myself, current);
124         int ret;
125
126         _enter(",{%u,%u,%u,%u}",
127                call->tx_bottom, call->acks_hard_ack, call->tx_top, call->tx_winsize);
128
129         add_wait_queue(&call->waitq, &myself);
130
131         switch (call->interruptibility) {
132         case RXRPC_INTERRUPTIBLE:
133                 if (waitall)
134                         ret = rxrpc_wait_for_tx_window_waitall(rx, call);
135                 else
136                         ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
137                 break;
138         case RXRPC_PREINTERRUPTIBLE:
139         case RXRPC_UNINTERRUPTIBLE:
140         default:
141                 ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
142                 break;
143         }
144
145         remove_wait_queue(&call->waitq, &myself);
146         set_current_state(TASK_RUNNING);
147         _leave(" = %d", ret);
148         return ret;
149 }
150
151 /*
152  * Notify the owner of the call that the transmit phase is ended and the last
153  * packet has been queued.
154  */
155 static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
156                                 rxrpc_notify_end_tx_t notify_end_tx)
157 {
158         if (notify_end_tx)
159                 notify_end_tx(&rx->sk, call, call->user_call_ID);
160 }
161
162 /*
163  * Queue a DATA packet for transmission and poke the call processor work item
164  * to transmit it.  The retransmission machinery may see the packet as soon
165  * as it has been queued.
166  */
167 static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
168                                struct rxrpc_txbuf *txb,
169                                rxrpc_notify_end_tx_t notify_end_tx)
170 {
171         unsigned long now;
172         rxrpc_seq_t seq = txb->seq;
173         bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
174
175         rxrpc_inc_stat(call->rxnet, stat_tx_data);
176
177         ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
178
179         /* We have to set the timestamp before queueing as the retransmit
180          * algorithm can see the packet as soon as we queue it.
181          */
182         txb->last_sent = ktime_get_real();
183
184         if (last)
185                 trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
186         else
187                 trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
188
189         /* Add the packet to the call's output buffer */
190         spin_lock(&call->tx_lock);
191         list_add_tail(&txb->call_link, &call->tx_sendmsg);
192         call->tx_prepared = seq;
193         spin_unlock(&call->tx_lock);
194
195         if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
196                 _debug("________awaiting reply/ACK__________");
197                 write_lock_bh(&call->state_lock);
198                 switch (call->state) {
199                 case RXRPC_CALL_CLIENT_SEND_REQUEST:
200                         call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
201                         rxrpc_notify_end_tx(rx, call, notify_end_tx);
202                         break;
203                 case RXRPC_CALL_SERVER_ACK_REQUEST:
204                         call->state = RXRPC_CALL_SERVER_SEND_REPLY;
205                         now = jiffies;
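                        /* Sending the reply implicitly acknowledges the
                         * request data, so push the delayed-ACK timer right
                         * out and drop any pending DELAY ACK reason.
                         */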
206                         WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
207                         if (call->ackr_reason == RXRPC_ACK_DELAY)
208                                 call->ackr_reason = 0;
209                         trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
210                         if (!last)
211                                 break;
212                         fallthrough;
213                 case RXRPC_CALL_SERVER_SEND_REPLY:
214                         call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
215                         rxrpc_notify_end_tx(rx, call, notify_end_tx);
216                         break;
217                 default:
218                         break;
219                 }
220                 write_unlock_bh(&call->state_lock);
221         }
222
223
224         /* Poke the call processor work item to transmit the packet from the
225          * call's Tx queue.
226          */
227         rxrpc_queue_call(call, rxrpc_call_queue_tx_data);
228 }
229
230 /*
231  * send data through a socket
232  * - must be called in process context
233  * - The caller holds the call user access mutex, but not the socket lock.
234  */
235 static int rxrpc_send_data(struct rxrpc_sock *rx,
236                            struct rxrpc_call *call,
237                            struct msghdr *msg, size_t len,
238                            rxrpc_notify_end_tx_t notify_end_tx,
239                            bool *_dropped_lock)
240 {
241         struct rxrpc_txbuf *txb;
242         struct sock *sk = &rx->sk;
243         enum rxrpc_call_state state;
244         long timeo;
245         bool more = msg->msg_flags & MSG_MORE;
246         int ret, copied = 0;
247
248         timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
249
250         /* this should be in poll */
251         sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
252
253 reload:
254         ret = -EPIPE;
255         if (sk->sk_shutdown & SEND_SHUTDOWN)
256                 goto maybe_error;
257         state = READ_ONCE(call->state);
258         ret = -ESHUTDOWN;
259         if (state >= RXRPC_CALL_COMPLETE)
260                 goto maybe_error;
261         ret = -EPROTO;
262         if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
263             state != RXRPC_CALL_SERVER_ACK_REQUEST &&
264             state != RXRPC_CALL_SERVER_SEND_REPLY)
265                 goto maybe_error;
266
267         ret = -EMSGSIZE;
268         if (call->tx_total_len != -1) {
269                 if (len - copied > call->tx_total_len)
270                         goto maybe_error;
271                 if (!more && len - copied != call->tx_total_len)
272                         goto maybe_error;
273         }
274
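        /* Pick up any packet buffer that a previous sendmsg left only
         * partially filled (e.g. because MSG_MORE was set).
         */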
275         txb = call->tx_pending;
276         call->tx_pending = NULL;
277         if (txb)
278                 rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);
279
280         do {
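                /* Transmit any ACK packets that have been queued up on this
                 * local endpoint while we're here in process context.
                 */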
281                 rxrpc_transmit_ack_packets(call->peer->local);
282
283                 if (!txb) {
284                         size_t remain, bufsize, chunk, offset;
285
286                         _debug("alloc");
287
288                         if (!rxrpc_check_tx_space(call, NULL))
289                                 goto wait_for_space;
290
291                         /* Work out the maximum size of a packet.  Assume that
292                          * the security header is going to be in the padded
293                          * region (enc blocksize), but the trailer is not.
294                          */
295                         remain = more ? INT_MAX : msg_data_left(msg);
296                         ret = call->conn->security->how_much_data(call, remain,
297                                                                   &bufsize, &chunk, &offset);
298                         if (ret < 0)
299                                 goto maybe_error;
300
301                         _debug("SIZE: %zu/%zu @%zu", chunk, bufsize, offset);
302
303                         /* create a buffer that we can retain until it's ACK'd */
304                         ret = -ENOMEM;
305                         txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_DATA,
306                                                 GFP_KERNEL);
307                         if (!txb)
308                                 goto maybe_error;
309
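                        /* Reserve room for the security header at the front
                         * of the buffer and cap the usable space at what the
                         * security class can carry in one packet.
                         */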
310                         txb->offset = offset;
311                         txb->space -= offset;
312                         txb->space = min_t(size_t, chunk, txb->space);
313                 }
314
315                 _debug("append");
316
317                 /* append next segment of data to the current buffer */
318                 if (msg_data_left(msg) > 0) {
319                         size_t copy = min_t(size_t, txb->space, msg_data_left(msg));
320
321                         _debug("add %zu", copy);
322                         if (!copy_from_iter_full(txb->data + txb->offset, copy,
323                                                  &msg->msg_iter))
324                                 goto efault;
325                         _debug("added");
326                         txb->space -= copy;
327                         txb->len += copy;
328                         txb->offset += copy;
329                         copied += copy;
330                         if (call->tx_total_len != -1)
331                                 call->tx_total_len -= copy;
332                 }
333
334                 /* check for the far side aborting the call or a network error
335                  * occurring */
336                 if (call->state == RXRPC_CALL_COMPLETE)
337                         goto call_terminated;
338
339                 /* add the packet to the send queue if it's now full */
340                 if (!txb->space ||
341                     (msg_data_left(msg) == 0 && !more)) {
342                         if (msg_data_left(msg) == 0 && !more) {
343                                 txb->wire.flags |= RXRPC_LAST_PACKET;
344                                 __set_bit(RXRPC_TXBUF_LAST, &txb->flags);
345                         }
346                         else if (call->tx_top - call->acks_hard_ack <
347                                  call->tx_winsize)
348                                 txb->wire.flags |= RXRPC_MORE_PACKETS;
349
350                         ret = call->security->secure_packet(call, txb);
351                         if (ret < 0)
352                                 goto out;
353
354                         rxrpc_queue_packet(rx, call, txb, notify_end_tx);
355                         txb = NULL;
356                 }
357         } while (msg_data_left(msg) > 0);
358
359 success:
360         ret = copied;
361         if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
362                 read_lock_bh(&call->state_lock);
363                 if (call->error < 0)
364                         ret = call->error;
365                 read_unlock_bh(&call->state_lock);
366         }
367 out:
368         call->tx_pending = txb;
369         _leave(" = %d", ret);
370         return ret;
371
372 call_terminated:
373         rxrpc_put_txbuf(txb, rxrpc_txbuf_put_send_aborted);
374         _leave(" = %d", call->error);
375         return call->error;
376
377 maybe_error:
378         if (copied)
379                 goto success;
380         goto out;
381
382 efault:
383         ret = -EFAULT;
384         goto out;
385
386 wait_for_space:
387         ret = -EAGAIN;
388         if (msg->msg_flags & MSG_DONTWAIT)
389                 goto maybe_error;
390         mutex_unlock(&call->user_mutex);
391         *_dropped_lock = true;
392         ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
393                                        msg->msg_flags & MSG_WAITALL);
394         if (ret < 0)
395                 goto maybe_error;
396         if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
397                 if (mutex_lock_interruptible(&call->user_mutex) < 0) {
398                         ret = sock_intr_errno(timeo);
399                         goto maybe_error;
400                 }
401         } else {
402                 mutex_lock(&call->user_mutex);
403         }
404         *_dropped_lock = false;
405         goto reload;
406 }
407
408 /*
409  * extract control messages from the sendmsg() control buffer
410  */
411 static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)
412 {
413         struct cmsghdr *cmsg;
414         bool got_user_ID = false;
415         int len;
416
417         if (msg->msg_controllen == 0)
418                 return -EINVAL;
419
420         for_each_cmsghdr(cmsg, msg) {
421                 if (!CMSG_OK(msg, cmsg))
422                         return -EINVAL;
423
424                 len = cmsg->cmsg_len - sizeof(struct cmsghdr);
425                 _debug("CMSG %d, %d, %d",
426                        cmsg->cmsg_level, cmsg->cmsg_type, len);
427
428                 if (cmsg->cmsg_level != SOL_RXRPC)
429                         continue;
430
431                 switch (cmsg->cmsg_type) {
432                 case RXRPC_USER_CALL_ID:
433                         if (msg->msg_flags & MSG_CMSG_COMPAT) {
434                                 if (len != sizeof(u32))
435                                         return -EINVAL;
436                                 p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
437                         } else {
438                                 if (len != sizeof(unsigned long))
439                                         return -EINVAL;
440                                 p->call.user_call_ID = *(unsigned long *)
441                                         CMSG_DATA(cmsg);
442                         }
443                         got_user_ID = true;
444                         break;
445
446                 case RXRPC_ABORT:
447                         if (p->command != RXRPC_CMD_SEND_DATA)
448                                 return -EINVAL;
449                         p->command = RXRPC_CMD_SEND_ABORT;
450                         if (len != sizeof(p->abort_code))
451                                 return -EINVAL;
452                         p->abort_code = *(unsigned int *)CMSG_DATA(cmsg);
453                         if (p->abort_code == 0)
454                                 return -EINVAL;
455                         break;
456
457                 case RXRPC_CHARGE_ACCEPT:
458                         if (p->command != RXRPC_CMD_SEND_DATA)
459                                 return -EINVAL;
460                         p->command = RXRPC_CMD_CHARGE_ACCEPT;
461                         if (len != 0)
462                                 return -EINVAL;
463                         break;
464
465                 case RXRPC_EXCLUSIVE_CALL:
466                         p->exclusive = true;
467                         if (len != 0)
468                                 return -EINVAL;
469                         break;
470
471                 case RXRPC_UPGRADE_SERVICE:
472                         p->upgrade = true;
473                         if (len != 0)
474                                 return -EINVAL;
475                         break;
476
477                 case RXRPC_TX_LENGTH:
478                         if (p->call.tx_total_len != -1 || len != sizeof(__s64))
479                                 return -EINVAL;
480                         p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
481                         if (p->call.tx_total_len < 0)
482                                 return -EINVAL;
483                         break;
484
485                 case RXRPC_SET_CALL_TIMEOUT:
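                        /* The payload is one to three u32 millisecond values
                         * in the order hard, idle, normal, matching the order
                         * in which they are validated below.
                         */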
486                         if (len & 3 || len < 4 || len > 12)
487                                 return -EINVAL;
488                         memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len);
489                         p->call.nr_timeouts = len / 4;
490                         if (p->call.timeouts.hard > INT_MAX / HZ)
491                                 return -ERANGE;
492                         if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000)
493                                 return -ERANGE;
494                         if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000)
495                                 return -ERANGE;
496                         break;
497
498                 default:
499                         return -EINVAL;
500                 }
501         }
502
503         if (!got_user_ID)
504                 return -EINVAL;
505         if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
506                 return -EINVAL;
507         _leave(" = 0");
508         return 0;
509 }
510
511 /*
512  * Create a new client call for sendmsg().
513  * - Called with the socket lock held, which it must release.
514  * - If it returns a call, the call's lock will need releasing by the caller.
515  */
516 static struct rxrpc_call *
517 rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
518                                   struct rxrpc_send_params *p)
519         __releases(&rx->sk.sk_lock.slock)
520         __acquires(&call->user_mutex)
521 {
522         struct rxrpc_conn_parameters cp;
523         struct rxrpc_call *call;
524         struct key *key;
525
526         DECLARE_SOCKADDR(struct sockaddr_rxrpc *, srx, msg->msg_name);
527
528         _enter("");
529
530         if (!msg->msg_name) {
531                 release_sock(&rx->sk);
532                 return ERR_PTR(-EDESTADDRREQ);
533         }
534
535         key = rx->key;
536         if (key && !rx->key->payload.data[0])
537                 key = NULL;
538
539         memset(&cp, 0, sizeof(cp));
540         cp.local                = rx->local;
541         cp.key                  = rx->key;
542         cp.security_level       = rx->min_sec_level;
543         cp.exclusive            = rx->exclusive | p->exclusive;
544         cp.upgrade              = p->upgrade;
545         cp.service_id           = srx->srx_service;
546         call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL,
547                                      atomic_inc_return(&rxrpc_debug_id));
548         /* The socket is now unlocked */
549
550         rxrpc_put_peer(cp.peer, rxrpc_peer_put_discard_tmp);
551         _leave(" = %p\n", call);
552         return call;
553 }
554
555 /*
556  * send a message forming part of a client call through an RxRPC socket
557  * - caller holds the socket locked
558  * - the socket may be either a client socket or a server socket
559  */
560 int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
561         __releases(&rx->sk.sk_lock.slock)
562 {
563         enum rxrpc_call_state state;
564         struct rxrpc_call *call;
565         unsigned long now, j;
566         bool dropped_lock = false;
567         int ret;
568
569         struct rxrpc_send_params p = {
570                 .call.tx_total_len      = -1,
571                 .call.user_call_ID      = 0,
572                 .call.nr_timeouts       = 0,
573                 .call.interruptibility  = RXRPC_INTERRUPTIBLE,
574                 .abort_code             = 0,
575                 .command                = RXRPC_CMD_SEND_DATA,
576                 .exclusive              = false,
577                 .upgrade                = false,
578         };
579
580         _enter("");
581
582         ret = rxrpc_sendmsg_cmsg(msg, &p);
583         if (ret < 0)
584                 goto error_release_sock;
585
586         if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
587                 ret = -EINVAL;
588                 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
589                         goto error_release_sock;
590                 ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
591                 goto error_release_sock;
592         }
593
594         call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
595         if (!call) {
596                 ret = -EBADSLT;
597                 if (p.command != RXRPC_CMD_SEND_DATA)
598                         goto error_release_sock;
599                 call = rxrpc_new_client_call_for_sendmsg(rx, msg, &p);
600                 /* The socket is now unlocked... */
601                 if (IS_ERR(call))
602                         return PTR_ERR(call);
603                 /* ... and we have the call lock. */
604                 ret = 0;
605                 if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
606                         goto out_put_unlock;
607         } else {
608                 switch (READ_ONCE(call->state)) {
609                 case RXRPC_CALL_UNINITIALISED:
610                 case RXRPC_CALL_CLIENT_AWAIT_CONN:
611                 case RXRPC_CALL_SERVER_PREALLOC:
612                 case RXRPC_CALL_SERVER_SECURING:
613                         rxrpc_put_call(call, rxrpc_call_put_sendmsg);
614                         ret = -EBUSY;
615                         goto error_release_sock;
616                 default:
617                         break;
618                 }
619
620                 ret = mutex_lock_interruptible(&call->user_mutex);
621                 release_sock(&rx->sk);
622                 if (ret < 0) {
623                         ret = -ERESTARTSYS;
624                         goto error_put;
625                 }
626
627                 if (p.call.tx_total_len != -1) {
628                         ret = -EINVAL;
629                         if (call->tx_total_len != -1 ||
630                             call->tx_pending ||
631                             call->tx_top != 0)
632                                 goto error_put;
633                         call->tx_total_len = p.call.tx_total_len;
634                 }
635         }
636
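        /* Apply any timeouts supplied via RXRPC_SET_CALL_TIMEOUT.  The cases
         * fall through, so one value sets just the hard timeout, two also set
         * the idle timeout and three also set the normal Rx timeout.
         */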
637         switch (p.call.nr_timeouts) {
638         case 3:
639                 j = msecs_to_jiffies(p.call.timeouts.normal);
640                 if (p.call.timeouts.normal > 0 && j == 0)
641                         j = 1;
642                 WRITE_ONCE(call->next_rx_timo, j);
643                 fallthrough;
644         case 2:
645                 j = msecs_to_jiffies(p.call.timeouts.idle);
646                 if (p.call.timeouts.idle > 0 && j == 0)
647                         j = 1;
648                 WRITE_ONCE(call->next_req_timo, j);
649                 fallthrough;
650         case 1:
651                 if (p.call.timeouts.hard > 0) {
652                         j = msecs_to_jiffies(p.call.timeouts.hard);
653                         now = jiffies;
654                         j += now;
655                         WRITE_ONCE(call->expect_term_by, j);
656                         rxrpc_reduce_call_timer(call, j, now,
657                                                 rxrpc_timer_set_for_hard);
658                 }
659                 break;
660         }
661
662         state = READ_ONCE(call->state);
663         _debug("CALL %d USR %lx ST %d on CONN %p",
664                call->debug_id, call->user_call_ID, state, call->conn);
665
666         if (state >= RXRPC_CALL_COMPLETE) {
667                 /* it's too late for this call */
668                 ret = -ESHUTDOWN;
669         } else if (p.command == RXRPC_CMD_SEND_ABORT) {
670                 ret = 0;
671                 if (rxrpc_abort_call("CMD", call, 0, p.abort_code, -ECONNABORTED))
672                         ret = rxrpc_send_abort_packet(call);
673         } else if (p.command != RXRPC_CMD_SEND_DATA) {
674                 ret = -EINVAL;
675         } else {
676                 ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
677         }
678
679 out_put_unlock:
680         if (!dropped_lock)
681                 mutex_unlock(&call->user_mutex);
682 error_put:
683         rxrpc_put_call(call, rxrpc_call_put_sendmsg);
684         _leave(" = %d", ret);
685         return ret;
686
687 error_release_sock:
688         release_sock(&rx->sk);
689         return ret;
690 }
691
692 /**
693  * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
694  * @sock: The socket the call is on
695  * @call: The call to send data through
696  * @msg: The data to send
697  * @len: The amount of data to send
698  * @notify_end_tx: Notification that the last packet is queued.
699  *
700  * Allow a kernel service to send data on a call.  The call must be in a state
701  * appropriate to sending data.  No control data should be supplied in @msg,
702  * nor should an address be supplied.  MSG_MORE should be flagged if there's
703  * more data to come, otherwise this data will end the transmission phase.
704  */
705 int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
706                            struct msghdr *msg, size_t len,
707                            rxrpc_notify_end_tx_t notify_end_tx)
708 {
709         bool dropped_lock = false;
710         int ret;
711
712         _enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);
713
714         ASSERTCMP(msg->msg_name, ==, NULL);
715         ASSERTCMP(msg->msg_control, ==, NULL);
716
717         mutex_lock(&call->user_mutex);
718
719         _debug("CALL %d USR %lx ST %d on CONN %p",
720                call->debug_id, call->user_call_ID, call->state, call->conn);
721
722         switch (READ_ONCE(call->state)) {
723         case RXRPC_CALL_CLIENT_SEND_REQUEST:
724         case RXRPC_CALL_SERVER_ACK_REQUEST:
725         case RXRPC_CALL_SERVER_SEND_REPLY:
726                 ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
727                                       notify_end_tx, &dropped_lock);
728                 break;
729         case RXRPC_CALL_COMPLETE:
730                 read_lock_bh(&call->state_lock);
731                 ret = call->error;
732                 read_unlock_bh(&call->state_lock);
733                 break;
734         default:
735                 /* Request phase complete for this client call */
736                 trace_rxrpc_rx_eproto(call, 0, tracepoint_string("late_send"));
737                 ret = -EPROTO;
738                 break;
739         }
740
741         if (!dropped_lock)
742                 mutex_unlock(&call->user_mutex);
743         _leave(" = %d", ret);
744         return ret;
745 }
746 EXPORT_SYMBOL(rxrpc_kernel_send_data);
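/* A minimal usage sketch (hypothetical caller; the buffer names and iov_iter
 * direction constant are illustrative, not taken from this file).  A kernel
 * service would typically wrap its payload in a kvec-backed msghdr:
 *
 *	struct kvec iov = { .iov_base = buf, .iov_len = size };
 *	struct msghdr msg = { .msg_flags = MSG_MORE };
 *
 *	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &iov, 1, size);
 *	ret = rxrpc_kernel_send_data(sock, call, &msg, size, NULL);
 *
 * MSG_MORE is cleared on the final chunk so that the transmission phase is
 * ended.
 */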
747
748 /**
749  * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
750  * @sock: The socket the call is on
751  * @call: The call to be aborted
752  * @abort_code: The abort code to stick into the ABORT packet
753  * @error: Local error value
754  * @why: 3-char string indicating why.
755  *
756  * Allow a kernel service to abort a call if it's still in an abortable state.
757  * Returns true if the call was aborted, false if it was already complete.
758  */
759 bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
760                              u32 abort_code, int error, const char *why)
761 {
762         bool aborted;
763
764         _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
765
766         mutex_lock(&call->user_mutex);
767
768         aborted = rxrpc_abort_call(why, call, 0, abort_code, error);
769         if (aborted)
770                 rxrpc_send_abort_packet(call);
771
772         mutex_unlock(&call->user_mutex);
773         return aborted;
774 }
775 EXPORT_SYMBOL(rxrpc_kernel_abort_call);
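/* For example (hypothetical caller), a service abandoning a call locally
 * might do:
 *
 *	rxrpc_kernel_abort_call(sock, call, RX_USER_ABORT, -ECONNABORTED, "KIL");
 *
 * where RX_USER_ABORT is the abort code put on the wire and -ECONNABORTED is
 * the error reported locally.
 */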
776
777 /**
778  * rxrpc_kernel_set_tx_length - Set the total Tx length on a call
779  * @sock: The socket the call is on
780  * @call: The call to be informed
781  * @tx_total_len: The amount of data to be transmitted for this call
782  *
783  * Allow a kernel service to set the total transmit length on a call.  This
784  * allows buffer-to-packet encrypt-and-copy to be performed.
785  *
786  * This function is primarily for use for setting the reply length since the
787  * request length can be set when beginning the call.
788  */
789 void rxrpc_kernel_set_tx_length(struct socket *sock, struct rxrpc_call *call,
790                                 s64 tx_total_len)
791 {
792         WARN_ON(call->tx_total_len != -1);
793         call->tx_total_len = tx_total_len;
794 }
795 EXPORT_SYMBOL(rxrpc_kernel_set_tx_length);
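/* For example (hypothetical caller), a service that knows its reply will be
 * exactly reply_size bytes might call:
 *
 *	rxrpc_kernel_set_tx_length(sock, call, reply_size);
 *
 * before its first sendmsg for the reply, allowing buffer-to-packet
 * encrypt-and-copy to be performed as described above.
 */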