net/rxrpc/io_thread.c
// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC packet reception
 *
 * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
                                      struct sockaddr_rxrpc *peer_srx,
                                      struct sk_buff *skb);

/*
 * Handle data received on the local endpoint
 * - may be called in interrupt context
 *
 * [!] Note that as this is called from the encap_rcv hook, the socket is not
 * held locked by the caller and nothing prevents sk_user_data on the UDP
 * socket from being cleared in the middle of processing this function.
 *
 * Called with the RCU read lock held from the IP layer via UDP.
 */
int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
{
        struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);

        if (unlikely(!local)) {
                kfree_skb(skb);
                return 0;
        }
        if (skb->tstamp == 0)
                skb->tstamp = ktime_get_real();

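        /* Tag the packet and hand it to the I/O thread via the local
         * endpoint's rx queue; all real processing happens there rather than
         * in this (possibly softirq) context.
         */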
        skb->mark = RXRPC_SKB_MARK_PACKET;
        rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
        skb_queue_tail(&local->rx_queue, skb);
        rxrpc_wake_up_io_thread(local);
        return 0;
}

/*
 * Handle an error received on the local endpoint.
 */
void rxrpc_error_report(struct sock *sk)
{
        struct rxrpc_local *local;
        struct sk_buff *skb;

        rcu_read_lock();
        local = rcu_dereference_sk_user_data(sk);
        if (unlikely(!local)) {
                rcu_read_unlock();
                return;
        }

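        /* Drain the socket's error queue (typically ICMP error reports
         * delivered by the IP layer) onto the local endpoint's rx queue,
         * marked so the I/O thread can tell them apart from ordinary packets.
         */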
        while ((skb = skb_dequeue(&sk->sk_error_queue))) {
                skb->mark = RXRPC_SKB_MARK_ERROR;
                rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
                skb_queue_tail(&local->rx_queue, skb);
        }

        rxrpc_wake_up_io_thread(local);
        rcu_read_unlock();
}

/*
 * Process a VERSION event packet targeted at the local endpoint.
 */
static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        char v;

        _enter("");

        rxrpc_see_skb(skb, rxrpc_skb_see_version);
        if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
                if (v == 0)
                        rxrpc_send_version_request(local, &sp->hdr, skb);
        }
}

/*
 * Extract the wire header from a packet and translate the byte order.
 */
static noinline
int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
{
        struct rxrpc_wire_header whdr;

        /* dig out the RxRPC connection details */
        if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) {
                trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
                                      tracepoint_string("bad_hdr"));
                return -EBADMSG;
        }

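        /* All multi-byte fields in the wire header are big-endian on the
         * wire, so convert them to host order as they're copied into the
         * skb's private data.
         */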
        memset(sp, 0, sizeof(*sp));
        sp->hdr.epoch           = ntohl(whdr.epoch);
        sp->hdr.cid             = ntohl(whdr.cid);
        sp->hdr.callNumber      = ntohl(whdr.callNumber);
        sp->hdr.seq             = ntohl(whdr.seq);
        sp->hdr.serial          = ntohl(whdr.serial);
        sp->hdr.flags           = whdr.flags;
        sp->hdr.type            = whdr.type;
        sp->hdr.userStatus      = whdr.userStatus;
        sp->hdr.securityIndex   = whdr.securityIndex;
        sp->hdr._rsvd           = ntohs(whdr._rsvd);
        sp->hdr.serviceId       = ntohs(whdr.serviceId);
        return 0;
}

/*
 * Extract the abort code from an ABORT packet and stash it in skb->priority.
 */
static bool rxrpc_extract_abort(struct sk_buff *skb)
{
        __be32 wtmp;

        if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
                          &wtmp, sizeof(wtmp)) < 0)
                return false;
        skb->priority = ntohl(wtmp);
        return true;
}

/*
 * Process packets received on the local endpoint
 */
static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
{
        struct rxrpc_connection *conn;
        struct sockaddr_rxrpc peer_srx;
        struct rxrpc_skb_priv *sp;
        struct rxrpc_peer *peer = NULL;
        struct sk_buff *skb = *_skb;
        int ret = 0;

        skb_pull(skb, sizeof(struct udphdr));

        sp = rxrpc_skb(skb);

        /* dig out the RxRPC connection details */
        if (rxrpc_extract_header(sp, skb) < 0)
                goto bad_message;

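        /* When loss injection is configured in, silently drop every eighth
         * packet after tracing the loss.
         */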
        if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
                static int lose;
                if ((lose++ & 7) == 7) {
                        trace_rxrpc_rx_lose(sp);
                        return 0;
                }
        }

        trace_rxrpc_rx_packet(sp);

        switch (sp->hdr.type) {
        case RXRPC_PACKET_TYPE_VERSION:
                if (rxrpc_to_client(sp))
                        return 0;
                rxrpc_input_version(local, skb);
                return 0;

        case RXRPC_PACKET_TYPE_BUSY:
                if (rxrpc_to_server(sp))
                        return 0;
                fallthrough;
        case RXRPC_PACKET_TYPE_ACK:
        case RXRPC_PACKET_TYPE_ACKALL:
                if (sp->hdr.callNumber == 0)
                        goto bad_message;
                break;
        case RXRPC_PACKET_TYPE_ABORT:
                if (!rxrpc_extract_abort(skb))
                        return 0; /* Just discard if malformed */
                break;

        case RXRPC_PACKET_TYPE_DATA:
                if (sp->hdr.callNumber == 0 ||
                    sp->hdr.seq == 0)
                        goto bad_message;

                /* Unshare the packet so that it can be modified for in-place
                 * decryption.
                 */
                if (sp->hdr.securityIndex != 0) {
                        skb = skb_unshare(skb, GFP_ATOMIC);
                        if (!skb) {
                                rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem);
                                *_skb = NULL;
                                return 0;
                        }

                        if (skb != *_skb) {
                                rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare);
                                *_skb = skb;
                                rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
                                sp = rxrpc_skb(skb);
                        }
                }
                break;

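                /* CHALLENGE packets are only processed when directed at a
                 * client and RESPONSE packets when directed at a server;
                 * anything else is quietly dropped.
                 */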
        case RXRPC_PACKET_TYPE_CHALLENGE:
                if (rxrpc_to_server(sp))
                        return 0;
                break;
        case RXRPC_PACKET_TYPE_RESPONSE:
                if (rxrpc_to_client(sp))
                        return 0;
                break;

                /* Packet types 9-11 should just be ignored. */
        case RXRPC_PACKET_TYPE_PARAMS:
        case RXRPC_PACKET_TYPE_10:
        case RXRPC_PACKET_TYPE_11:
                return 0;

        default:
                goto bad_message;
        }

        if (sp->hdr.serviceId == 0)
                goto bad_message;

        if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0))
                return 0; /* Unsupported address type - discard. */

        if (peer_srx.transport.family != local->srx.transport.family &&
            (peer_srx.transport.family == AF_INET &&
             local->srx.transport.family != AF_INET6)) {
                pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
                                    peer_srx.transport.family,
                                    local->srx.transport.family);
                return 0; /* Wrong address type - discard. */
        }

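        /* Packets addressed to a client call must match an extant client
         * connection; if no such connection can be found, the packet is
         * bounced back with an abort.
         */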
        if (rxrpc_to_client(sp)) {
                rcu_read_lock();
                conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb);
                conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
                rcu_read_unlock();
                if (!conn) {
                        trace_rxrpc_abort(0, "NCC", sp->hdr.cid,
                                          sp->hdr.callNumber, sp->hdr.seq,
                                          RXKADINCONSISTENCY, EBADMSG);
                        goto protocol_error;
                }

                ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
                rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
                return ret;
        }

        /* We need to look up service connections by the full protocol
         * parameter set.  We look up the peer first as an intermediate step
         * and then the connection from the peer's tree.
         */
        rcu_read_lock();

        peer = rxrpc_lookup_peer_rcu(local, &peer_srx);
        if (!peer) {
                rcu_read_unlock();
                return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb);
        }

        conn = rxrpc_find_service_conn_rcu(peer, skb);
        conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
        if (conn) {
                rcu_read_unlock();
                ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
                rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
                return ret;
        }

        peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input);
        rcu_read_unlock();

        ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb);
        rxrpc_put_peer(peer, rxrpc_peer_put_input);
        if (ret < 0)
                goto reject_packet;
        return 0;

bad_message:
        trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RX_PROTOCOL_ERROR, EBADMSG);
protocol_error:
        skb->priority = RX_PROTOCOL_ERROR;
        skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
reject_packet:
        rxrpc_reject_packet(local, skb);
        return ret;
}

/*
 * Deal with a packet that's associated with an extant connection.
 */
static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
                                      struct sockaddr_rxrpc *peer_srx,
                                      struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        struct rxrpc_channel *chan;
        struct rxrpc_call *call = NULL;
        unsigned int channel;

        if (sp->hdr.securityIndex != conn->security_ix)
                goto wrong_security;

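        /* A change of service ID is only tolerated if this connection is
         * probing for a service upgrade; the cmpxchg() commits the upgraded
         * ID exactly once, however many packets race through here.
         */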
        if (sp->hdr.serviceId != conn->service_id) {
                int old_id;

                if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
                        goto reupgrade;
                old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
                                 sp->hdr.serviceId);

                if (old_id != conn->orig_service_id &&
                    old_id != sp->hdr.serviceId)
                        goto reupgrade;
        }

        if (after(sp->hdr.serial, conn->hi_serial))
                conn->hi_serial = sp->hdr.serial;

        /* It's a connection-level packet if the call number is 0. */
        if (sp->hdr.callNumber == 0)
                return rxrpc_input_conn_packet(conn, skb);

        /* Call-bound packets are routed by connection channel. */
        channel = sp->hdr.cid & RXRPC_CHANNELMASK;
        chan = &conn->channels[channel];

        /* Ignore really old calls */
        if (sp->hdr.callNumber < chan->last_call)
                return 0;

        if (sp->hdr.callNumber == chan->last_call) {
                if (chan->call ||
                    sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
                        return 0;

                /* If the previous service call completed successfully, we
                 * discard all further packets for it.
                 */
                if (rxrpc_conn_is_service(conn) &&
                    chan->last_type == RXRPC_PACKET_TYPE_ACK)
                        return 0;

                /* But otherwise we need to retransmit the final packet from
                 * data cached in the connection record.
                 */
                if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
                        trace_rxrpc_rx_data(chan->call_debug_id,
                                            sp->hdr.seq,
                                            sp->hdr.serial,
                                            sp->hdr.flags);
                rxrpc_input_conn_packet(conn, skb);
                return 0;
        }

        rcu_read_lock();
        call = rxrpc_try_get_call(rcu_dereference(chan->call),
                                  rxrpc_call_get_input);
        rcu_read_unlock();

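        /* A call number above the channel's current call ID indicates a new
         * call: a client should never see an unexpected call number and the
         * packet is rejected, whereas a service call still attached to the
         * channel is implicitly ended to make way for the new one.
         */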
        if (sp->hdr.callNumber > chan->call_id) {
                if (rxrpc_to_client(sp)) {
                        rxrpc_put_call(call, rxrpc_call_put_input);
                        goto reject_packet;
                }

                if (call) {
                        rxrpc_implicit_end_call(call, skb);
                        rxrpc_put_call(call, rxrpc_call_put_input);
                        call = NULL;
                }
        }

        if (!call) {
                if (rxrpc_to_client(sp))
                        goto bad_message;
                if (rxrpc_new_incoming_call(conn->local, conn->peer, conn,
                                            peer_srx, skb))
                        return 0;
                goto reject_packet;
        }

        rxrpc_input_call_event(call, skb);
        rxrpc_put_call(call, rxrpc_call_put_input);
        return 0;

wrong_security:
        trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RXKADINCONSISTENCY, EBADMSG);
        skb->priority = RXKADINCONSISTENCY;
        goto post_abort;

reupgrade:
        trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RX_PROTOCOL_ERROR, EBADMSG);
        goto protocol_error;

bad_message:
        trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
                          RX_PROTOCOL_ERROR, EBADMSG);
protocol_error:
        skb->priority = RX_PROTOCOL_ERROR;
post_abort:
        skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
reject_packet:
        rxrpc_reject_packet(conn->local, skb);
        return 0;
}

/*
 * I/O and event handling thread.
 */
int rxrpc_io_thread(void *data)
{
        struct sk_buff_head rx_queue;
        struct rxrpc_local *local = data;
        struct rxrpc_call *call;
        struct sk_buff *skb;

        skb_queue_head_init(&rx_queue);

        set_user_nice(current, MIN_NICE);

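        /* Each pass round the loop gives priority to calls that have asked
         * for attention, then to packets already on the thread's private
         * queue, then refills that queue from the shared rx queue, and only
         * sleeps once there is nothing left to do.
         */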
        for (;;) {
                rxrpc_inc_stat(local->rxnet, stat_io_loop);

                /* Deal with calls that want immediate attention. */
                if ((call = list_first_entry_or_null(&local->call_attend_q,
                                                     struct rxrpc_call,
                                                     attend_link))) {
                        spin_lock_bh(&local->lock);
                        list_del_init(&call->attend_link);
                        spin_unlock_bh(&local->lock);

                        trace_rxrpc_call_poked(call);
                        rxrpc_input_call_event(call, NULL);
                        rxrpc_put_call(call, rxrpc_call_put_poke);
                        continue;
                }

                /* Process received packets and errors. */
                if ((skb = __skb_dequeue(&rx_queue))) {
                        switch (skb->mark) {
                        case RXRPC_SKB_MARK_PACKET:
                                skb->priority = 0;
                                rxrpc_input_packet(local, &skb);
                                trace_rxrpc_rx_done(skb->mark, skb->priority);
                                rxrpc_free_skb(skb, rxrpc_skb_put_input);
                                break;
                        case RXRPC_SKB_MARK_ERROR:
                                rxrpc_input_error(local, skb);
                                rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
                                break;
                        default:
                                WARN_ON_ONCE(1);
                                rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
                                break;
                        }
                        continue;
                }

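                /* Splice anything that has built up on the shared rx queue
                 * onto the private queue in one go, so the queue lock is only
                 * held briefly per batch.
                 */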
                if (!skb_queue_empty(&local->rx_queue)) {
                        spin_lock_irq(&local->rx_queue.lock);
                        skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
                        spin_unlock_irq(&local->rx_queue.lock);
                        continue;
                }

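                /* Nothing left to do: prepare to sleep, but recheck the work
                 * sources first so that a wake-up racing with the checks
                 * above isn't lost.
                 */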
                set_current_state(TASK_INTERRUPTIBLE);
                if (!skb_queue_empty(&local->rx_queue) ||
                    !list_empty(&local->call_attend_q)) {
                        __set_current_state(TASK_RUNNING);
                        continue;
                }

                if (kthread_should_stop())
                        break;
                schedule();
        }

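        /* The thread only exits once it's told to stop, at which point the
         * local endpoint is torn down before the thread pointer is cleared.
         */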
        __set_current_state(TASK_RUNNING);
        rxrpc_see_local(local, rxrpc_local_stop);
        rxrpc_destroy_local(local);
        local->io_thread = NULL;
        rxrpc_see_local(local, rxrpc_local_stopped);
        return 0;
}