76b1e2e89c1eeae1ce184f50d0923c80f74f16ea
[platform/kernel/linux-rpi.git] / net / rxrpc / sendmsg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* AF_RXRPC sendmsg() implementation.
3  *
4  * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10 #include <linux/net.h>
11 #include <linux/gfp.h>
12 #include <linux/skbuff.h>
13 #include <linux/export.h>
14 #include <linux/sched/signal.h>
15
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19
/*
 * Return true if there's sufficient Tx queue space.
 *
 * If @_tx_win is non-NULL, the hard-ACK point that was sampled is also
 * returned through it so that the caller can watch for progress.
 */
static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
{
	unsigned int win_size;
	/* Acquire-load the hard-ACK point (presumably paired with a release
	 * store on the ACK-processing side - not visible in this file).
	 */
	rxrpc_seq_t tx_win = smp_load_acquire(&call->acks_hard_ack);

	/* If we haven't transmitted anything for >1RTT, we should reset the
	 * congestion management state.
	 *
	 * NOTE(review): srtt_us appears to be stored scaled by 8 (it is also
	 * shifted down by 3 in rxrpc_wait_for_tx_window_waitall).
	 */
	if (ktime_before(ktime_add_us(call->tx_last_sent,
				      call->peer->srtt_us >> 3),
			 ktime_get_real())) {
		/* Restart from an initial-window-style size keyed off the
		 * sender segment size (thresholds match RFC 3390 initial
		 * window selection).
		 */
		if (RXRPC_TX_SMSS > 2190)
			win_size = 2;
		else if (RXRPC_TX_SMSS > 1095)
			win_size = 3;
		else
			win_size = 4;
		win_size += call->cong_extra;
	} else {
		/* Otherwise bound by both the advertised Tx window and the
		 * congestion window.
		 */
		win_size = min_t(unsigned int, call->tx_winsize,
				 call->cong_cwnd + call->cong_extra);
	}

	if (_tx_win)
		*_tx_win = tx_win;
	/* There is space if the amount of un-hard-ACK'd data is below the
	 * window size.
	 */
	return call->tx_top - tx_win < win_size;
}
50
/*
 * Wait for space to appear in the Tx queue or a signal to occur.
 *
 * Returns 0 once space is available, the call's error if the call completed
 * whilst waiting, or a signal-derived error if a signal is pending.
 */
static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
					 struct rxrpc_call *call,
					 long *timeo)
{
	for (;;) {
		/* Set the task state before checking the conditions so that
		 * a concurrent wake-up isn't lost.
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		if (signal_pending(current))
			return sock_intr_errno(*timeo);

		/* If packets have been hard-ACK'd but their buffers not yet
		 * reclaimed, shrink the Tx buffer and recheck rather than
		 * sleeping.
		 */
		if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
			rxrpc_shrink_call_tx_buffer(call);
			continue;
		}

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}
78
/*
 * Wait for space to appear in the Tx queue uninterruptibly, but with
 * a timeout of 2*RTT if no progress was made and a signal occurred.
 */
static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
					    struct rxrpc_call *call)
{
	rxrpc_seq_t tx_start, tx_win;
	signed long rtt, timeout;

	/* srtt_us is shifted down by 3 (stored scaled by 8 - see also
	 * rxrpc_check_tx_space), then doubled to get the 2*RTT grace
	 * period, floored at 2 jiffies.
	 */
	rtt = READ_ONCE(call->peer->srtt_us) >> 3;
	rtt = usecs_to_jiffies(rtt) * 2;
	if (rtt < 2)
		rtt = 2;

	timeout = rtt;
	tx_start = smp_load_acquire(&call->acks_hard_ack);

	for (;;) {
		/* Set the task state before checking the conditions so that
		 * a concurrent wake-up isn't lost.
		 */
		set_current_state(TASK_UNINTERRUPTIBLE);

		if (rxrpc_check_tx_space(call, &tx_win))
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		/* Only give up on a pending signal if the grace period
		 * expired without any advance of the hard-ACK point.
		 */
		if (timeout == 0 &&
		    tx_win == tx_start && signal_pending(current))
			return -EINTR;

		/* Reclaim already hard-ACK'd buffers before sleeping. */
		if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
			rxrpc_shrink_call_tx_buffer(call);
			continue;
		}

		/* Progress was made: restart the 2*RTT grace period. */
		if (tx_win != tx_start) {
			timeout = rtt;
			tx_start = tx_win;
		}

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		timeout = schedule_timeout(timeout);
	}
}
124
/*
 * Wait for space to appear in the Tx queue uninterruptibly.
 *
 * Returns 0 once space is available, or the call's error if the call
 * completed whilst waiting.  Signals are ignored entirely here.
 */
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
					    struct rxrpc_call *call,
					    long *timeo)
{
	for (;;) {
		/* Set the task state before checking the conditions so that
		 * a concurrent wake-up isn't lost.
		 */
		set_current_state(TASK_UNINTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		/* Reclaim already hard-ACK'd buffers before sleeping. */
		if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
			rxrpc_shrink_call_tx_buffer(call);
			continue;
		}

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}
149
/*
 * wait for space to appear in the transmit/ACK window
 * - caller holds the socket locked
 *
 * Adds the task to the call's waitqueue and dispatches to one of the wait
 * variants above according to the call's interruptibility setting.
 */
static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
				    struct rxrpc_call *call,
				    long *timeo,
				    bool waitall)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret;

	_enter(",{%u,%u,%u,%u}",
	       call->tx_bottom, call->acks_hard_ack, call->tx_top, call->tx_winsize);

	add_wait_queue(&call->waitq, &myself);

	switch (call->interruptibility) {
	case RXRPC_INTERRUPTIBLE:
		/* MSG_WAITALL selects the "only break on signal after 2*RTT
		 * without progress" variant.
		 */
		if (waitall)
			ret = rxrpc_wait_for_tx_window_waitall(rx, call);
		else
			ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
		break;
	case RXRPC_PREINTERRUPTIBLE:
	case RXRPC_UNINTERRUPTIBLE:
	default:
		ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
		break;
	}

	remove_wait_queue(&call->waitq, &myself);
	/* The wait loops return with the task state still set. */
	set_current_state(TASK_RUNNING);
	_leave(" = %d", ret);
	return ret;
}
186
187 /*
188  * Notify the owner of the call that the transmit phase is ended and the last
189  * packet has been queued.
190  */
191 static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
192                                 rxrpc_notify_end_tx_t notify_end_tx)
193 {
194         if (notify_end_tx)
195                 notify_end_tx(&rx->sk, call, call->user_call_ID);
196 }
197
/*
 * Queue a DATA packet for transmission, set the resend timeout and send
 * the packet immediately.  Returns the error from rxrpc_send_data_packet()
 * in case the caller wants to do something with it.
 */
static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
			       struct rxrpc_txbuf *txb,
			       rxrpc_notify_end_tx_t notify_end_tx)
{
	unsigned long now;
	rxrpc_seq_t seq = txb->seq;
	bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
	int ret;

	rxrpc_inc_stat(call->rxnet, stat_tx_data);

	/* Packets must be queued in strict sequence order. */
	ASSERTCMP(seq, ==, call->tx_top + 1);

	/* We have to set the timestamp before queueing as the retransmit
	 * algorithm can see the packet as soon as we queue it.
	 */
	txb->last_sent = ktime_get_real();

	/* Add the packet to the call's output buffer */
	rxrpc_get_txbuf(txb, rxrpc_txbuf_get_buffer);
	spin_lock(&call->tx_lock);
	list_add_tail(&txb->call_link, &call->tx_buffer);
	call->tx_top = seq;
	spin_unlock(&call->tx_lock);

	if (last)
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
	else
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);

	/* Advance the call state machine if this was the final packet or the
	 * server was still in the ACK-the-request phase.
	 */
	if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
		_debug("________awaiting reply/ACK__________");
		write_lock_bh(&call->state_lock);
		switch (call->state) {
		case RXRPC_CALL_CLIENT_SEND_REQUEST:
			/* Client queued its last request packet. */
			call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
			rxrpc_notify_end_tx(rx, call, notify_end_tx);
			break;
		case RXRPC_CALL_SERVER_ACK_REQUEST:
			call->state = RXRPC_CALL_SERVER_SEND_REPLY;
			now = jiffies;
			/* Push the delayed-ACK timer effectively to infinity
			 * and cancel any pending delayed-ACK reason whilst
			 * the reply is being sent.
			 */
			WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
			if (call->ackr_reason == RXRPC_ACK_DELAY)
				call->ackr_reason = 0;
			trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
			if (!last)
				break;
			fallthrough;
		case RXRPC_CALL_SERVER_SEND_REPLY:
			/* Server queued its last reply packet. */
			call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
			rxrpc_notify_end_tx(rx, call, notify_end_tx);
			break;
		default:
			break;
		}
		write_unlock_bh(&call->state_lock);
	}

	/* Expose a client call once its first packet has been queued. */
	if (seq == 1 && rxrpc_is_client_call(call))
		rxrpc_expose_client_call(call);

	ret = rxrpc_send_data_packet(call, txb);
	if (ret < 0) {
		switch (ret) {
		case -ENETUNREACH:
		case -EHOSTUNREACH:
		case -ECONNREFUSED:
			/* Peer unreachable: complete the call with a local
			 * error; other errors are left for retransmission.
			 */
			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
						  0, ret);
			goto out;
		}
	} else {
		/* Transmission succeeded: (re)arm the resend timer. */
		unsigned long now = jiffies;
		unsigned long resend_at = now + call->peer->rto_j;

		WRITE_ONCE(call->resend_at, resend_at);
		rxrpc_reduce_call_timer(call, resend_at, now,
					rxrpc_timer_set_for_send);
	}

out:
	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_trans);
}
286
/*
 * send data through a socket
 * - must be called in process context
 * - The caller holds the call user access mutex, but not the socket lock.
 *
 * On return, *_dropped_lock says whether the user mutex was dropped (and not
 * successfully retaken) whilst waiting for Tx window space.  Returns the
 * number of bytes consumed, or a negative error.
 */
static int rxrpc_send_data(struct rxrpc_sock *rx,
			   struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx,
			   bool *_dropped_lock)
{
	struct rxrpc_txbuf *txb;
	struct sock *sk = &rx->sk;
	enum rxrpc_call_state state;
	long timeo;
	bool more = msg->msg_flags & MSG_MORE;
	int ret, copied = 0;

	timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);

	/* this should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

reload:
	/* Revalidate the state each time round as it may have changed whilst
	 * we were sleeping for window space.
	 */
	ret = -EPIPE;
	if (sk->sk_shutdown & SEND_SHUTDOWN)
		goto maybe_error;
	state = READ_ONCE(call->state);
	ret = -ESHUTDOWN;
	if (state >= RXRPC_CALL_COMPLETE)
		goto maybe_error;
	ret = -EPROTO;
	if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
	    state != RXRPC_CALL_SERVER_ACK_REQUEST &&
	    state != RXRPC_CALL_SERVER_SEND_REPLY)
		goto maybe_error;

	/* If a total Tx length was preset, the remaining data must fit it and
	 * must match it exactly unless MSG_MORE promises more sendmsg calls.
	 */
	ret = -EMSGSIZE;
	if (call->tx_total_len != -1) {
		if (len - copied > call->tx_total_len)
			goto maybe_error;
		if (!more && len - copied != call->tx_total_len)
			goto maybe_error;
	}

	/* Resume filling a partial buffer left over by a previous sendmsg
	 * that used MSG_MORE.
	 */
	txb = call->tx_pending;
	call->tx_pending = NULL;
	if (txb)
		rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);

	do {
		rxrpc_transmit_ack_packets(call->peer->local);

		if (!txb) {
			size_t remain, bufsize, chunk, offset;

			_debug("alloc");

			if (!rxrpc_check_tx_space(call, NULL))
				goto wait_for_space;

			/* Work out the maximum size of a packet.  Assume that
			 * the security header is going to be in the padded
			 * region (enc blocksize), but the trailer is not.
			 */
			remain = more ? INT_MAX : msg_data_left(msg);
			ret = call->conn->security->how_much_data(call, remain,
								  &bufsize, &chunk, &offset);
			if (ret < 0)
				goto maybe_error;

			_debug("SIZE: %zu/%zu @%zu", chunk, bufsize, offset);

			/* create a buffer that we can retain until it's ACK'd */
			ret = -ENOMEM;
			txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_DATA,
						GFP_KERNEL);
			if (!txb)
				goto maybe_error;

			/* Reserve room for the security header and cap the
			 * usable data space at the permitted chunk size.
			 */
			txb->offset = offset;
			txb->space -= offset;
			txb->space = min_t(size_t, chunk, txb->space);
		}

		_debug("append");

		/* append next segment of data to the current buffer */
		if (msg_data_left(msg) > 0) {
			size_t copy = min_t(size_t, txb->space, msg_data_left(msg));

			_debug("add %zu", copy);
			if (!copy_from_iter_full(txb->data + txb->offset, copy,
						 &msg->msg_iter))
				goto efault;
			_debug("added");
			txb->space -= copy;
			txb->len += copy;
			txb->offset += copy;
			copied += copy;
			if (call->tx_total_len != -1)
				call->tx_total_len -= copy;
		}

		/* check for the far side aborting the call or a network error
		 * occurring */
		if (call->state == RXRPC_CALL_COMPLETE)
			goto call_terminated;

		/* add the packet to the send queue if it's now full */
		if (!txb->space ||
		    (msg_data_left(msg) == 0 && !more)) {
			/* Mark the final packet; otherwise hint that more
			 * packets follow if window space remains.
			 */
			if (msg_data_left(msg) == 0 && !more) {
				txb->wire.flags |= RXRPC_LAST_PACKET;
				__set_bit(RXRPC_TXBUF_LAST, &txb->flags);
			}
			else if (call->tx_top - call->acks_hard_ack <
				 call->tx_winsize)
				txb->wire.flags |= RXRPC_MORE_PACKETS;

			ret = call->security->secure_packet(call, txb);
			if (ret < 0)
				goto out;

			rxrpc_queue_packet(rx, call, txb, notify_end_tx);
			txb = NULL;
		}
	} while (msg_data_left(msg) > 0);

success:
	ret = copied;
	/* If the call failed whilst we were copying, report its error rather
	 * than the byte count.
	 */
	if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
		read_lock_bh(&call->state_lock);
		if (call->error < 0)
			ret = call->error;
		read_unlock_bh(&call->state_lock);
	}
out:
	/* Stash any partially-filled buffer for the next sendmsg call. */
	call->tx_pending = txb;
	_leave(" = %d", ret);
	return ret;

call_terminated:
	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_send_aborted);
	_leave(" = %d", call->error);
	return call->error;

maybe_error:
	/* Prefer reporting partial progress over an error. */
	if (copied)
		goto success;
	goto out;

efault:
	ret = -EFAULT;
	goto out;

wait_for_space:
	ret = -EAGAIN;
	if (msg->msg_flags & MSG_DONTWAIT)
		goto maybe_error;
	/* We're going to sleep, so drop the user mutex and record that we
	 * did so for the caller's benefit.
	 */
	mutex_unlock(&call->user_mutex);
	*_dropped_lock = true;
	ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
				       msg->msg_flags & MSG_WAITALL);
	if (ret < 0)
		goto maybe_error;
	/* Retake the mutex interruptibly only for interruptible calls; a
	 * failure to retake it leaves *_dropped_lock true.
	 */
	if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
		if (mutex_lock_interruptible(&call->user_mutex) < 0) {
			ret = sock_intr_errno(timeo);
			goto maybe_error;
		}
	} else {
		mutex_lock(&call->user_mutex);
	}
	*_dropped_lock = false;
	goto reload;
}
464
/*
 * extract control messages from the sendmsg() control buffer
 *
 * Fills in *p from the SOL_RXRPC control messages.  A user call ID is
 * mandatory; at most one command-changing cmsg (abort/charge-accept) is
 * honoured because each requires the command still to be SEND_DATA.
 */
static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)
{
	struct cmsghdr *cmsg;
	bool got_user_ID = false;
	int len;

	if (msg->msg_controllen == 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		/* Payload length excluding the cmsg header. */
		len = cmsg->cmsg_len - sizeof(struct cmsghdr);
		_debug("CMSG %d, %d, %d",
		       cmsg->cmsg_level, cmsg->cmsg_type, len);

		if (cmsg->cmsg_level != SOL_RXRPC)
			continue;

		switch (cmsg->cmsg_type) {
		case RXRPC_USER_CALL_ID:
			/* Compat tasks pass a 32-bit ID, native tasks pass an
			 * unsigned long.
			 */
			if (msg->msg_flags & MSG_CMSG_COMPAT) {
				if (len != sizeof(u32))
					return -EINVAL;
				p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
			} else {
				if (len != sizeof(unsigned long))
					return -EINVAL;
				p->call.user_call_ID = *(unsigned long *)
					CMSG_DATA(cmsg);
			}
			got_user_ID = true;
			break;

		case RXRPC_ABORT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_SEND_ABORT;
			if (len != sizeof(p->abort_code))
				return -EINVAL;
			p->abort_code = *(unsigned int *)CMSG_DATA(cmsg);
			/* An abort code of zero is not permitted. */
			if (p->abort_code == 0)
				return -EINVAL;
			break;

		case RXRPC_CHARGE_ACCEPT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_CHARGE_ACCEPT;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_EXCLUSIVE_CALL:
			p->exclusive = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_UPGRADE_SERVICE:
			p->upgrade = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_TX_LENGTH:
			/* May only be set once and must be non-negative. */
			if (p->call.tx_total_len != -1 || len != sizeof(__s64))
				return -EINVAL;
			p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
			if (p->call.tx_total_len < 0)
				return -EINVAL;
			break;

		case RXRPC_SET_CALL_TIMEOUT:
			/* One to three 32-bit values: hard, idle, normal. */
			if (len & 3 || len < 4 || len > 12)
				return -EINVAL;
			memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len);
			p->call.nr_timeouts = len / 4;
			if (p->call.timeouts.hard > INT_MAX / HZ)
				return -ERANGE;
			if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000)
				return -ERANGE;
			if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000)
				return -ERANGE;
			break;

		default:
			return -EINVAL;
		}
	}

	if (!got_user_ID)
		return -EINVAL;
	/* A preset Tx length only makes sense when sending data. */
	if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
		return -EINVAL;
	_leave(" = 0");
	return 0;
}
567
/*
 * Create a new client call for sendmsg().
 * - Called with the socket lock held, which it must release.
 * - If it returns a call, the call's lock will need releasing by the caller.
 */
static struct rxrpc_call *
rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
				  struct rxrpc_send_params *p)
	__releases(&rx->sk.sk_lock.slock)
	__acquires(&call->user_mutex)
{
	struct rxrpc_conn_parameters cp;
	struct rxrpc_call *call;
	struct key *key;

	DECLARE_SOCKADDR(struct sockaddr_rxrpc *, srx, msg->msg_name);

	_enter("");

	/* An outgoing call needs a target address. */
	if (!msg->msg_name) {
		release_sock(&rx->sk);
		return ERR_PTR(-EDESTADDRREQ);
	}

	/* NOTE(review): 'key' is NULL-ed here when the socket key has no
	 * payload, but the result is never used - cp.key below is assigned
	 * from rx->key directly.  Confirm whether cp.key should be 'key'.
	 */
	key = rx->key;
	if (key && !rx->key->payload.data[0])
		key = NULL;

	memset(&cp, 0, sizeof(cp));
	cp.local		= rx->local;
	cp.key			= rx->key;
	cp.security_level	= rx->min_sec_level;
	cp.exclusive		= rx->exclusive | p->exclusive;
	cp.upgrade		= p->upgrade;
	cp.service_id		= srx->srx_service;
	call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL,
				     atomic_inc_return(&rxrpc_debug_id));
	/* The socket is now unlocked */

	/* Drop the temporary peer ref that may have been left in cp. */
	rxrpc_put_peer(cp.peer, rxrpc_peer_put_discard_tmp);
	_leave(" = %p\n", call);
	return call;
}
611
/*
 * send a message forming part of a client call through an RxRPC socket
 * - caller holds the socket locked
 * - the socket may be either a client socket or a server socket
 *
 * The socket lock is swapped for the call's user mutex before any sending is
 * done; every exit path releases whichever is still held.
 */
int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
	__releases(&rx->sk.sk_lock.slock)
{
	enum rxrpc_call_state state;
	struct rxrpc_call *call;
	unsigned long now, j;
	bool dropped_lock = false;
	int ret;

	struct rxrpc_send_params p = {
		.call.tx_total_len	= -1,
		.call.user_call_ID	= 0,
		.call.nr_timeouts	= 0,
		.call.interruptibility	= RXRPC_INTERRUPTIBLE,
		.abort_code		= 0,
		.command		= RXRPC_CMD_SEND_DATA,
		.exclusive		= false,
		.upgrade		= false,
	};

	_enter("");

	ret = rxrpc_sendmsg_cmsg(msg, &p);
	if (ret < 0)
		goto error_release_sock;

	/* Charge-accept doesn't involve a call at all - handle it and get
	 * out whilst still holding only the socket lock.
	 */
	if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
		ret = -EINVAL;
		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
			goto error_release_sock;
		ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
		goto error_release_sock;
	}

	call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
	if (!call) {
		/* Only a data send may implicitly create a new call. */
		ret = -EBADSLT;
		if (p.command != RXRPC_CMD_SEND_DATA)
			goto error_release_sock;
		call = rxrpc_new_client_call_for_sendmsg(rx, msg, &p);
		/* The socket is now unlocked... */
		if (IS_ERR(call))
			return PTR_ERR(call);
		/* ... and we have the call lock. */
		ret = 0;
		if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
			goto out_put_unlock;
	} else {
		/* Reject calls that aren't ready for user traffic yet. */
		switch (READ_ONCE(call->state)) {
		case RXRPC_CALL_UNINITIALISED:
		case RXRPC_CALL_CLIENT_AWAIT_CONN:
		case RXRPC_CALL_SERVER_PREALLOC:
		case RXRPC_CALL_SERVER_SECURING:
			rxrpc_put_call(call, rxrpc_call_put_sendmsg);
			ret = -EBUSY;
			goto error_release_sock;
		default:
			break;
		}

		/* Swap the socket lock for the call's user mutex. */
		ret = mutex_lock_interruptible(&call->user_mutex);
		release_sock(&rx->sk);
		if (ret < 0) {
			ret = -ERESTARTSYS;
			goto error_put;
		}

		/* A Tx length may only be imposed on a call that hasn't
		 * transmitted anything yet.
		 */
		if (p.call.tx_total_len != -1) {
			ret = -EINVAL;
			if (call->tx_total_len != -1 ||
			    call->tx_pending ||
			    call->tx_top != 0)
				goto error_put;
			call->tx_total_len = p.call.tx_total_len;
		}
	}

	/* Apply any timeouts supplied via RXRPC_SET_CALL_TIMEOUT; a non-zero
	 * ms value that rounds to 0 jiffies is bumped to 1.
	 */
	switch (p.call.nr_timeouts) {
	case 3:
		j = msecs_to_jiffies(p.call.timeouts.normal);
		if (p.call.timeouts.normal > 0 && j == 0)
			j = 1;
		WRITE_ONCE(call->next_rx_timo, j);
		fallthrough;
	case 2:
		j = msecs_to_jiffies(p.call.timeouts.idle);
		if (p.call.timeouts.idle > 0 && j == 0)
			j = 1;
		WRITE_ONCE(call->next_req_timo, j);
		fallthrough;
	case 1:
		if (p.call.timeouts.hard > 0) {
			j = msecs_to_jiffies(p.call.timeouts.hard);
			now = jiffies;
			j += now;
			WRITE_ONCE(call->expect_term_by, j);
			rxrpc_reduce_call_timer(call, j, now,
						rxrpc_timer_set_for_hard);
		}
		break;
	}

	state = READ_ONCE(call->state);
	_debug("CALL %d USR %lx ST %d on CONN %p",
	       call->debug_id, call->user_call_ID, state, call->conn);

	if (state >= RXRPC_CALL_COMPLETE) {
		/* it's too late for this call */
		ret = -ESHUTDOWN;
	} else if (p.command == RXRPC_CMD_SEND_ABORT) {
		/* Only send an ABORT packet if we actually moved the call to
		 * the aborted state.
		 */
		ret = 0;
		if (rxrpc_abort_call("CMD", call, 0, p.abort_code, -ECONNABORTED))
			ret = rxrpc_send_abort_packet(call);
	} else if (p.command != RXRPC_CMD_SEND_DATA) {
		ret = -EINVAL;
	} else {
		ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
	}

out_put_unlock:
	/* rxrpc_send_data() may have dropped the mutex itself. */
	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
error_put:
	rxrpc_put_call(call, rxrpc_call_put_sendmsg);
	_leave(" = %d", ret);
	return ret;

error_release_sock:
	release_sock(&rx->sk);
	return ret;
}
748
/**
 * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
 * @sock: The socket the call is on
 * @call: The call to send data through
 * @msg: The data to send
 * @len: The amount of data to send
 * @notify_end_tx: Notification that the last packet is queued.
 *
 * Allow a kernel service to send data on a call.  The call must be in an state
 * appropriate to sending data.  No control data should be supplied in @msg,
 * nor should an address be supplied.  MSG_MORE should be flagged if there's
 * more data to come, otherwise this data will end the transmission phase.
 *
 * Return: the number of bytes consumed or a negative error.
 */
int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx)
{
	bool dropped_lock = false;
	int ret;

	_enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);

	/* Kernel callers must not supply an address or control data. */
	ASSERTCMP(msg->msg_name, ==, NULL);
	ASSERTCMP(msg->msg_control, ==, NULL);

	mutex_lock(&call->user_mutex);

	_debug("CALL %d USR %lx ST %d on CONN %p",
	       call->debug_id, call->user_call_ID, call->state, call->conn);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
	case RXRPC_CALL_SERVER_SEND_REPLY:
		/* Only these states may transmit data. */
		ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
				      notify_end_tx, &dropped_lock);
		break;
	case RXRPC_CALL_COMPLETE:
		read_lock_bh(&call->state_lock);
		ret = call->error;
		read_unlock_bh(&call->state_lock);
		break;
	default:
		/* Request phase complete for this client call */
		trace_rxrpc_rx_eproto(call, 0, tracepoint_string("late_send"));
		ret = -EPROTO;
		break;
	}

	/* rxrpc_send_data() may have dropped the mutex itself. */
	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
	_leave(" = %d", ret);
	return ret;
}
EXPORT_SYMBOL(rxrpc_kernel_send_data);
804
805 /**
806  * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
807  * @sock: The socket the call is on
808  * @call: The call to be aborted
809  * @abort_code: The abort code to stick into the ABORT packet
810  * @error: Local error value
811  * @why: 3-char string indicating why.
812  *
813  * Allow a kernel service to abort a call, if it's still in an abortable state
814  * and return true if the call was aborted, false if it was already complete.
815  */
816 bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
817                              u32 abort_code, int error, const char *why)
818 {
819         bool aborted;
820
821         _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
822
823         mutex_lock(&call->user_mutex);
824
825         aborted = rxrpc_abort_call(why, call, 0, abort_code, error);
826         if (aborted)
827                 rxrpc_send_abort_packet(call);
828
829         mutex_unlock(&call->user_mutex);
830         return aborted;
831 }
832 EXPORT_SYMBOL(rxrpc_kernel_abort_call);
833
834 /**
835  * rxrpc_kernel_set_tx_length - Set the total Tx length on a call
836  * @sock: The socket the call is on
837  * @call: The call to be informed
838  * @tx_total_len: The amount of data to be transmitted for this call
839  *
840  * Allow a kernel service to set the total transmit length on a call.  This
841  * allows buffer-to-packet encrypt-and-copy to be performed.
842  *
843  * This function is primarily for use for setting the reply length since the
844  * request length can be set when beginning the call.
845  */
846 void rxrpc_kernel_set_tx_length(struct socket *sock, struct rxrpc_call *call,
847                                 s64 tx_total_len)
848 {
849         WARN_ON(call->tx_total_len != -1);
850         call->tx_total_len = tx_total_len;
851 }
852 EXPORT_SYMBOL(rxrpc_kernel_set_tx_length);