rxrpc: Split out the call state changing functions into their own file
[platform/kernel/linux-rpi.git] / net / rxrpc / recvmsg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25         struct rxrpc_sock *rx;
26         struct sock *sk;
27
28         _enter("%d", call->debug_id);
29
30         if (!list_empty(&call->recvmsg_link))
31                 return;
32
33         rcu_read_lock();
34
35         rx = rcu_dereference(call->socket);
36         sk = &rx->sk;
37         if (rx && sk->sk_state < RXRPC_CLOSE) {
38                 if (call->notify_rx) {
39                         spin_lock(&call->notify_lock);
40                         call->notify_rx(sk, call, call->user_call_ID);
41                         spin_unlock(&call->notify_lock);
42                 } else {
43                         write_lock(&rx->recvmsg_lock);
44                         if (list_empty(&call->recvmsg_link)) {
45                                 rxrpc_get_call(call, rxrpc_call_get_notify_socket);
46                                 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47                         }
48                         write_unlock(&rx->recvmsg_lock);
49
50                         if (!sock_flag(sk, SOCK_DEAD)) {
51                                 _debug("call %ps", sk->sk_data_ready);
52                                 sk->sk_data_ready(sk);
53                         }
54                 }
55         }
56
57         rcu_read_unlock();
58         _leave("");
59 }
60
61 /*
62  * Pass a call terminating message to userspace.
63  */
64 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
65 {
66         u32 tmp = 0;
67         int ret;
68
69         switch (call->completion) {
70         case RXRPC_CALL_SUCCEEDED:
71                 ret = 0;
72                 if (rxrpc_is_service_call(call))
73                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
74                 break;
75         case RXRPC_CALL_REMOTELY_ABORTED:
76                 tmp = call->abort_code;
77                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
78                 break;
79         case RXRPC_CALL_LOCALLY_ABORTED:
80                 tmp = call->abort_code;
81                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82                 break;
83         case RXRPC_CALL_NETWORK_ERROR:
84                 tmp = -call->error;
85                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
86                 break;
87         case RXRPC_CALL_LOCAL_ERROR:
88                 tmp = -call->error;
89                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
90                 break;
91         default:
92                 pr_err("Invalid terminal call state %u\n", call->state);
93                 BUG();
94                 break;
95         }
96
97         trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
98                              lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
99                              call->rx_pkt_offset, call->rx_pkt_len, ret);
100         return ret;
101 }
102
/*
 * End the packet reception phase.
 *
 * Called when the final DATA packet has been consumed.  Advances the call
 * state under call->state_lock: a client call that was receiving the reply is
 * completed; a server call that was receiving the request moves on to the
 * ACK-the-request phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	/* For a client call, propose the terminal ACK before taking the state
	 * lock (the state is re-checked under the lock below).
	 */
	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);

	write_lock(&call->state_lock);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		/* Reply fully received: complete the call and poke the I/O
		 * thread to finish it off.
		 */
		__rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
		rxrpc_poke_call(call, rxrpc_call_poke_complete);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		/* Whole request received: push the request-expiry time out to
		 * the far future and propose a delayed ACK while the service
		 * processes the operation.
		 */
		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		write_unlock(&call->state_lock);
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_processing_op);
		break;
	default:
		write_unlock(&call->state_lock);
		break;
	}
}
138
/*
 * Discard a packet we've used up and advance the Rx window by one.
 *
 * Dequeues the front skb from the recvmsg queue, publishes the new consumed
 * sequence number for the input side to see, and either ends the Rx phase
 * (last packet) or checks whether enough has been consumed to warrant an ACK.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);

	/* Capture the header fields before the skb is freed below. */
	sp = rxrpc_skb(skb);
	tseq   = sp->hdr.seq;
	serial = sp->hdr.serial;
	last   = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);
	if (last) {
		/* That was the final DATA packet: end the receive phase. */
		rxrpc_end_rx_phase(call, serial);
		return;
	}

	/* Check to see if there's an ACK that needs sending. */
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
	if (acked > 2 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}
181
182 /*
183  * Decrypt and verify a DATA packet.
184  */
185 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
186 {
187         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
188
189         if (sp->flags & RXRPC_RX_VERIFIED)
190                 return 0;
191         return call->security->verify_packet(call, skb);
192 }
193
/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 *
 * Progress through a partially-consumed packet is resumed from and saved back
 * to call->rx_pkt_offset/rx_pkt_len — unless MSG_PEEK is set, in which case
 * nothing is consumed and no state is persisted.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	/* Resume from wherever a previous partial read left off. */
	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
		/* The receive phase is already over: report end-of-data. */
		seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			/* First touch of this packet: decrypt/verify it and
			 * pick up where its payload starts and how long it is.
			 */
			ret2 = rxrpc_verify_data(call, skb);
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     rx_pkt_offset, rx_pkt_len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			/* The caller's buffer filled mid-packet; remaining
			 * progress is stashed below (unless peeking).
			 */
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		/* Peek the next packet before rotating, as rotation dequeues
		 * and frees the current skb.
		 */
		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
	}

out:
	if (!(flags & MSG_PEEK)) {
		/* Persist partial-packet progress; peeking must not consume. */
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}
done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}
301
/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	unsigned int call_debug_id = 0;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		/* Nothing to deliver yet: either bail (non-blocking) or sleep
		 * until rxrpc_notify_socket() queues a call.
		 */
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		/* Re-check the queue after registering on the waitqueue so a
		 * concurrent notification can't be missed.
		 */
		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);
	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
	write_unlock(&rx->recvmsg_lock);

	call_debug_id = call->debug_id;
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);

	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	/* Report the user call ID as an ancillary message, sized to match the
	 * (possibly compat) userspace expectation.
	 */
	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			unsigned long idl = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long), &idl);
		}
		if (ret < 0)
			goto error_unlock_call;
	}

	/* Supply the call's destination address if the caller asked. */
	if (msg->msg_name && call->peer) {
		size_t len = sizeof(call->dest_srx);

		memcpy(msg->msg_name, &call->dest_srx, len);
		msg->msg_namelen = len;
	}

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		if (ret == -EAGAIN)
			ret = 0;

		/* More data still queued: re-notify so the next reader (or
		 * this one) comes back for it.
		 */
		if (!skb_queue_empty(&call->recvmsg_queue))
			rxrpc_notify_socket(call);
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error_unlock_call;

	/* A completed call gets its terminal cmsg and, unless peeking, is
	 * released from the socket.
	 */
	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error_unlock_call;
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	/* Drop the ref taken at dequeue (or at peek). */
	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

error_requeue_call:
	/* Put the call back at the head of the queue so it isn't lost; the
	 * dequeue ref travels back with it.  A peeked call just needs its
	 * extra ref dropped.
	 */
	if (!(flags & MSG_PEEK)) {
		write_lock(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		write_unlock(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}
483
/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to send data through
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d,%s},%zu,%d",
	       call->debug_id, rxrpc_call_states[call->state],
	       *_len, want_more);

	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);

	/* Serialise against sendmsg and other users of this call. */
	mutex_lock(&call->user_mutex);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
					 *_len, 0, &offset);
		*_len -= offset;
		if (ret < 0)
			goto out;

		/* We can only reach here with a partially full buffer if we
		 * have reached the end of the data.  We must otherwise have a
		 * full buffer or have been given -EAGAIN.
		 */
		if (ret == 1) {
			if (iov_iter_count(iter) > 0)
				goto short_data;
			if (!want_more)
				goto read_phase_complete;
			ret = 0;
			goto out;
		}

		if (!want_more)
			goto excess_data;
		goto out;

	case RXRPC_CALL_COMPLETE:
		goto call_complete;

	default:
		/* Call not yet in a phase that can deliver data. */
		ret = -EINPROGRESS;
		goto out;
	}

read_phase_complete:
	ret = 1;
out:
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	/* The data ran out before the caller's buffer was filled. */
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EBADMSG);
	ret = -EBADMSG;
	goto out;
excess_data:
	/* More data remains but the caller expected this read to be last. */
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EMSGSIZE);
	ret = -EMSGSIZE;
	goto out;
call_complete:
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);