rxrpc: Split the receive code
net/rxrpc/input.c
// SPDX-License-Identifier: GPL-2.0-or-later
/* Processing of received RxRPC packets
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

static void rxrpc_proto_abort(const char *why,
                              struct rxrpc_call *call, rxrpc_seq_t seq)
{
        if (rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, -EBADMSG)) {
                set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                rxrpc_queue_call(call, rxrpc_call_queue_abort);
        }
}

/*
 * Do TCP-style congestion management [RFC 5681].
 */
static void rxrpc_congestion_management(struct rxrpc_call *call,
                                        struct sk_buff *skb,
                                        struct rxrpc_ack_summary *summary,
                                        rxrpc_serial_t acked_serial)
{
        enum rxrpc_congest_change change = rxrpc_cong_no_change;
        unsigned int cumulative_acks = call->cong_cumul_acks;
        unsigned int cwnd = call->cong_cwnd;
        bool resend = false;

        summary->flight_size =
                (call->tx_top - call->acks_hard_ack) - summary->nr_acks;

        if (test_and_clear_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags)) {
                summary->retrans_timeo = true;
                call->cong_ssthresh = max_t(unsigned int,
                                            summary->flight_size / 2, 2);
                cwnd = 1;
                if (cwnd >= call->cong_ssthresh &&
                    call->cong_mode == RXRPC_CALL_SLOW_START) {
                        call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
                        call->cong_tstamp = skb->tstamp;
                        cumulative_acks = 0;
                }
        }

        cumulative_acks += summary->nr_new_acks;
        cumulative_acks += summary->nr_rot_new_acks;
        if (cumulative_acks > 255)
                cumulative_acks = 255;

        summary->mode = call->cong_mode;
        summary->cwnd = call->cong_cwnd;
        summary->ssthresh = call->cong_ssthresh;
        summary->cumulative_acks = cumulative_acks;
        summary->dup_acks = call->cong_dup_acks;

        /* If we haven't transmitted anything for >1RTT, we should reset the
         * congestion management state.
         */
        if ((call->cong_mode == RXRPC_CALL_SLOW_START ||
             call->cong_mode == RXRPC_CALL_CONGEST_AVOIDANCE) &&
            ktime_before(ktime_add_us(call->tx_last_sent,
                                      call->peer->srtt_us >> 3),
                         ktime_get_real())
            ) {
                change = rxrpc_cong_idle_reset;
                summary->mode = RXRPC_CALL_SLOW_START;
                if (RXRPC_TX_SMSS > 2190)
                        summary->cwnd = 2;
                else if (RXRPC_TX_SMSS > 1095)
                        summary->cwnd = 3;
                else
                        summary->cwnd = 4;
        }

        switch (call->cong_mode) {
        case RXRPC_CALL_SLOW_START:
                if (summary->saw_nacks)
                        goto packet_loss_detected;
                if (summary->cumulative_acks > 0)
                        cwnd += 1;
                if (cwnd >= call->cong_ssthresh) {
                        call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
                        call->cong_tstamp = skb->tstamp;
                }
                goto out;

        case RXRPC_CALL_CONGEST_AVOIDANCE:
                if (summary->saw_nacks)
                        goto packet_loss_detected;

                /* We analyse the number of packets that get ACK'd per RTT
                 * period and increase the window if we managed to fill it.
                 */
                if (call->peer->rtt_count == 0)
                        goto out;
                if (ktime_before(skb->tstamp,
                                 ktime_add_us(call->cong_tstamp,
                                              call->peer->srtt_us >> 3)))
                        goto out_no_clear_ca;
                change = rxrpc_cong_rtt_window_end;
                call->cong_tstamp = skb->tstamp;
                if (cumulative_acks >= cwnd)
                        cwnd++;
                goto out;

        case RXRPC_CALL_PACKET_LOSS:
                if (!summary->saw_nacks)
                        goto resume_normality;

                if (summary->new_low_nack) {
                        change = rxrpc_cong_new_low_nack;
                        call->cong_dup_acks = 1;
                        if (call->cong_extra > 1)
                                call->cong_extra = 1;
                        goto send_extra_data;
                }

                call->cong_dup_acks++;
                if (call->cong_dup_acks < 3)
                        goto send_extra_data;

                change = rxrpc_cong_begin_retransmission;
                call->cong_mode = RXRPC_CALL_FAST_RETRANSMIT;
                call->cong_ssthresh = max_t(unsigned int,
                                            summary->flight_size / 2, 2);
                cwnd = call->cong_ssthresh + 3;
                call->cong_extra = 0;
                call->cong_dup_acks = 0;
                resend = true;
                goto out;

        case RXRPC_CALL_FAST_RETRANSMIT:
                if (!summary->new_low_nack) {
                        if (summary->nr_new_acks == 0)
                                cwnd += 1;
                        call->cong_dup_acks++;
                        if (call->cong_dup_acks == 2) {
                                change = rxrpc_cong_retransmit_again;
                                call->cong_dup_acks = 0;
                                resend = true;
                        }
                } else {
                        change = rxrpc_cong_progress;
                        cwnd = call->cong_ssthresh;
                        if (!summary->saw_nacks)
                                goto resume_normality;
                }
                goto out;

        default:
                BUG();
                goto out;
        }

resume_normality:
        change = rxrpc_cong_cleared_nacks;
        call->cong_dup_acks = 0;
        call->cong_extra = 0;
        call->cong_tstamp = skb->tstamp;
        if (cwnd < call->cong_ssthresh)
                call->cong_mode = RXRPC_CALL_SLOW_START;
        else
                call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
out:
        cumulative_acks = 0;
out_no_clear_ca:
        if (cwnd >= RXRPC_TX_MAX_WINDOW)
                cwnd = RXRPC_TX_MAX_WINDOW;
        call->cong_cwnd = cwnd;
        call->cong_cumul_acks = cumulative_acks;
        trace_rxrpc_congest(call, summary, acked_serial, change);
        if (resend && !test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
                rxrpc_queue_call(call, rxrpc_call_queue_resend);
        return;

packet_loss_detected:
        change = rxrpc_cong_saw_nack;
        call->cong_mode = RXRPC_CALL_PACKET_LOSS;
        call->cong_dup_acks = 0;
        goto send_extra_data;

send_extra_data:
        /* Send some previously unsent DATA if we have some to advance the ACK
         * state.
         */
        if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ||
            summary->nr_acks != call->tx_top - call->acks_hard_ack) {
                call->cong_extra++;
                wake_up(&call->waitq);
        }
        goto out_no_clear_ca;
}

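/* An illustrative sketch, not part of the driver: the idle-reset branch in
 * rxrpc_congestion_management() above picks its restart cwnd using the
 * initial-window rule of RFC 5681 section 3.1, counted in segments.  Pulled
 * out as a helper (rxrpc_cwnd_initial() is a hypothetical name), it would
 * read:
 *
 *      static unsigned int rxrpc_cwnd_initial(unsigned int smss)
 *      {
 *              if (smss > 2190)
 *                      return 2;       // IW = 2 segments for large MSS
 *              if (smss > 1095)
 *                      return 3;       // IW = 3 segments for medium MSS
 *              return 4;               // IW = 4 segments for small MSS
 *      }
 */
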
/*
 * Apply a hard ACK by advancing the Tx window.
 */
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
                                   struct rxrpc_ack_summary *summary)
{
        struct rxrpc_txbuf *txb;
        bool rot_last = false;

        list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
                if (before_eq(txb->seq, call->acks_hard_ack))
                        continue;
                summary->nr_rot_new_acks++;
                if (test_bit(RXRPC_TXBUF_LAST, &txb->flags)) {
                        set_bit(RXRPC_CALL_TX_LAST, &call->flags);
                        rot_last = true;
                }
                if (txb->seq == to)
                        break;
        }

        if (rot_last)
                set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);

        _enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

        if (call->acks_lowest_nak == call->acks_hard_ack) {
                call->acks_lowest_nak = to;
        } else if (after(to, call->acks_lowest_nak)) {
                summary->new_low_nack = true;
                call->acks_lowest_nak = to;
        }

        smp_store_release(&call->acks_hard_ack, to);

        trace_rxrpc_txqueue(call, (rot_last ?
                                   rxrpc_txqueue_rotate_last :
                                   rxrpc_txqueue_rotate));
        wake_up(&call->waitq);
        return rot_last;
}

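/* The before()/before_eq()/after() helpers used throughout this file compare
 * 32-bit sequence numbers with wrap-safe signed arithmetic, TCP-style.
 * Assuming the usual definitions in ar-internal.h, they boil down to:
 *
 *      static inline bool before(rxrpc_seq_t a, rxrpc_seq_t b)
 *      {
 *              return (s32)(a - b) < 0;  // a precedes b, even across wrap
 *      }
 *
 * so, for example, before(0xfffffffe, 0x00000001) is true even though the
 * raw values compare the other way.
 */
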
/*
 * End the transmission phase of a call.
 *
 * This occurs when we get an ACKALL packet, the first DATA packet of a reply,
 * or a final ACK packet.
 */
static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
                               const char *abort_why)
{
        unsigned int state;

        ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));

        write_lock(&call->state_lock);

        state = call->state;
        switch (state) {
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
        case RXRPC_CALL_CLIENT_AWAIT_REPLY:
                if (reply_begun)
                        call->state = state = RXRPC_CALL_CLIENT_RECV_REPLY;
                else
                        call->state = state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
                break;

        case RXRPC_CALL_SERVER_AWAIT_ACK:
                __rxrpc_call_completed(call);
                state = call->state;
                break;

        default:
                goto bad_state;
        }

        write_unlock(&call->state_lock);
        if (state == RXRPC_CALL_CLIENT_AWAIT_REPLY)
                trace_rxrpc_txqueue(call, rxrpc_txqueue_await_reply);
        else
                trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
        _leave(" = ok");
        return true;

bad_state:
        write_unlock(&call->state_lock);
        kdebug("end_tx %s", rxrpc_call_states[call->state]);
        rxrpc_proto_abort(abort_why, call, call->tx_top);
        return false;
}

/*
 * Begin the reply reception phase of a call.
 */
static bool rxrpc_receiving_reply(struct rxrpc_call *call)
{
        struct rxrpc_ack_summary summary = { 0 };
        unsigned long now, timo;
        rxrpc_seq_t top = READ_ONCE(call->tx_top);

        if (call->ackr_reason) {
                now = jiffies;
                timo = now + MAX_JIFFY_OFFSET;
                WRITE_ONCE(call->resend_at, timo);
                WRITE_ONCE(call->delay_ack_at, timo);
                trace_rxrpc_timer(call, rxrpc_timer_init_for_reply, now);
        }

        if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
                if (!rxrpc_rotate_tx_window(call, top, &summary)) {
                        rxrpc_proto_abort("TXL", call, top);
                        return false;
                }
        }
        return rxrpc_end_tx_phase(call, true, "ETD");
}

static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
                                          rxrpc_seq_t window, rxrpc_seq_t wtop)
{
        atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
}

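/* The ACK window is packed into a single atomic64 as ((u64)wtop << 32) | window
 * so that both halves can be published and read together without taking a
 * lock.  rxrpc_input_data_one() below unpacks it in the matching way:
 *
 *      u64 win = atomic64_read(&call->ackr_window);
 *      rxrpc_seq_t window = lower_32_bits(win);  // next seq to go to recvmsg
 *      rxrpc_seq_t wtop   = upper_32_bits(win);  // highest seq seen + 1
 */
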
/*
 * Push a DATA packet onto the Rx queue.
 */
static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
                                   rxrpc_seq_t window, rxrpc_seq_t wtop,
                                   enum rxrpc_receive_trace why)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        bool last = sp->hdr.flags & RXRPC_LAST_PACKET;

        __skb_queue_tail(&call->recvmsg_queue, skb);
        rxrpc_input_update_ack_window(call, window, wtop);

        trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
}

/*
 * Process a DATA packet.
 */
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        struct sk_buff *oos;
        rxrpc_serial_t serial = sp->hdr.serial;
        u64 win = atomic64_read(&call->ackr_window);
        rxrpc_seq_t window = lower_32_bits(win);
        rxrpc_seq_t wtop = upper_32_bits(win);
        rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
        rxrpc_seq_t seq = sp->hdr.seq;
        bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
        int ack_reason = -1;

        rxrpc_inc_stat(call->rxnet, stat_rx_data);
        if (sp->hdr.flags & RXRPC_REQUEST_ACK)
                rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
        if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
                rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);

        if (last) {
                if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
                    seq + 1 != wtop) {
                        rxrpc_proto_abort("LSN", call, seq);
                        goto err_free;
                }
        } else {
                if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
                    after_eq(seq, wtop)) {
                        pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
                                call->debug_id, seq, window, wtop, wlimit);
                        rxrpc_proto_abort("LSA", call, seq);
                        goto err_free;
                }
        }

        if (after(seq, call->rx_highest_seq))
                call->rx_highest_seq = seq;

        trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);

        if (before(seq, window)) {
                ack_reason = RXRPC_ACK_DUPLICATE;
                goto send_ack;
        }
        if (after(seq, wlimit)) {
                ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
                goto send_ack;
        }

        /* Queue the packet. */
        if (seq == window) {
                rxrpc_seq_t reset_from;
                bool reset_sack = false;

                if (sp->hdr.flags & RXRPC_REQUEST_ACK)
                        ack_reason = RXRPC_ACK_REQUESTED;
                /* Send an immediate ACK if we fill in a hole */
                else if (!skb_queue_empty(&call->rx_oos_queue))
                        ack_reason = RXRPC_ACK_DELAY;

                window++;
                if (after(window, wtop))
                        wtop = window;

                spin_lock(&call->recvmsg_queue.lock);
                rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
                skb = NULL;

                while ((oos = skb_peek(&call->rx_oos_queue))) {
                        struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

                        if (after(osp->hdr.seq, window))
                                break;

                        __skb_unlink(oos, &call->rx_oos_queue);
                        last = osp->hdr.flags & RXRPC_LAST_PACKET;
                        seq = osp->hdr.seq;
                        if (!reset_sack) {
                                reset_from = seq;
                                reset_sack = true;
                        }

                        window++;
                        rxrpc_input_queue_data(call, oos, window, wtop,
                                               rxrpc_receive_queue_oos);
                }

                spin_unlock(&call->recvmsg_queue.lock);

                if (reset_sack) {
                        do {
                                call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
                        } while (reset_from++, before(reset_from, window));
                }
        } else {
                bool keep = false;

                ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;

                if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
                        call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
                        keep = true;
                }

                if (after(seq + 1, wtop)) {
                        wtop = seq + 1;
                        rxrpc_input_update_ack_window(call, window, wtop);
                }

                if (!keep) {
                        ack_reason = RXRPC_ACK_DUPLICATE;
                        goto send_ack;
                }

                skb_queue_walk(&call->rx_oos_queue, oos) {
                        struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

                        if (after(osp->hdr.seq, seq)) {
                                __skb_queue_before(&call->rx_oos_queue, oos, skb);
                                goto oos_queued;
                        }
                }

                __skb_queue_tail(&call->rx_oos_queue, skb);
        oos_queued:
                trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
                                    sp->hdr.serial, sp->hdr.seq);
                skb = NULL;
        }

send_ack:
        if (ack_reason < 0 &&
            atomic_inc_return(&call->ackr_nr_unacked) > 2 &&
            test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
                ack_reason = RXRPC_ACK_IDLE;
        } else if (ack_reason >= 0) {
                set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
        }

        if (ack_reason >= 0)
                rxrpc_send_ACK(call, ack_reason, serial,
                               rxrpc_propose_ack_input_data);
        else
                rxrpc_propose_delay_ACK(call, serial,
                                        rxrpc_propose_ack_input_data);

err_free:
        rxrpc_free_skb(skb, rxrpc_skb_put_input);
}

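/* A sketch of the soft-ACK bookkeeping above, assuming RXRPC_SACK_SIZE is at
 * least as large as the receive window: ackr_sack_table[] is treated as a
 * ring with one byte per in-window sequence number,
 *
 *      call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;  // out-of-order seq held
 *      call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 0;  // seq consumed
 *
 * which is why the hole-filling path clears every slot from the first
 * dequeued out-of-sequence packet up to the new window edge.
 */
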
/*
 * Split a jumbo packet and file the bits separately.
 */
static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb)
{
        struct rxrpc_jumbo_header jhdr;
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb), *jsp;
        struct sk_buff *jskb;
        unsigned int offset = sizeof(struct rxrpc_wire_header);
        unsigned int len = skb->len - offset;

        while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
                if (len < RXRPC_JUMBO_SUBPKTLEN)
                        goto protocol_error;
                if (sp->hdr.flags & RXRPC_LAST_PACKET)
                        goto protocol_error;
                if (skb_copy_bits(skb, offset + RXRPC_JUMBO_DATALEN,
                                  &jhdr, sizeof(jhdr)) < 0)
                        goto protocol_error;

                jskb = skb_clone(skb, GFP_ATOMIC);
                if (!jskb) {
                        kdebug("couldn't clone");
                        return false;
                }
                rxrpc_new_skb(jskb, rxrpc_skb_new_jumbo_subpacket);
                jsp = rxrpc_skb(jskb);
                jsp->offset = offset;
                jsp->len = RXRPC_JUMBO_DATALEN;
                rxrpc_input_data_one(call, jskb);

                sp->hdr.flags = jhdr.flags;
                sp->hdr._rsvd = ntohs(jhdr._rsvd);
                sp->hdr.seq++;
                sp->hdr.serial++;
                offset += RXRPC_JUMBO_SUBPKTLEN;
                len -= RXRPC_JUMBO_SUBPKTLEN;
        }

        sp->offset = offset;
        sp->len    = len;
        rxrpc_input_data_one(call, skb);
        return true;

protocol_error:
        return false;
}

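/* Wire layout assumed by the splitter above: a jumbo DATA packet is a run of
 * subpackets, each RXRPC_JUMBO_DATALEN bytes of payload followed by a
 * struct rxrpc_jumbo_header carrying the flags and reserved field of the
 * next subpacket:
 *
 *      +---------------------+--------------------+---------------------+--
 *      | RXRPC_JUMBO_DATALEN | rxrpc_jumbo_header | RXRPC_JUMBO_DATALEN | ...
 *      +---------------------+--------------------+---------------------+--
 *      |<-------- RXRPC_JUMBO_SUBPKTLEN --------->|
 *
 * Each subpacket is an implicit DATA packet in its own right, which is why
 * sp->hdr.seq and sp->hdr.serial are bumped once per iteration.
 */
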
/*
 * Process a DATA packet, adding the packet to the Rx ring.  The caller's
 * packet ref must be passed on or discarded.
 */
static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        enum rxrpc_call_state state;
        rxrpc_serial_t serial = sp->hdr.serial;
        rxrpc_seq_t seq0 = sp->hdr.seq;

        _enter("{%llx,%x},{%u,%x}",
               atomic64_read(&call->ackr_window), call->rx_highest_seq,
               skb->len, seq0);

        state = READ_ONCE(call->state);
        if (state >= RXRPC_CALL_COMPLETE) {
                rxrpc_free_skb(skb, rxrpc_skb_put_input);
                return;
        }

        /* Unshare the packet so that it can be modified for in-place
         * decryption.
         */
        if (sp->hdr.securityIndex != 0) {
                struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
                if (!nskb) {
                        rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare_nomem);
                        return;
                }

                if (nskb != skb) {
                        rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare);
                        skb = nskb;
                        rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
                        sp = rxrpc_skb(skb);
                }
        }

        if (state == RXRPC_CALL_SERVER_RECV_REQUEST) {
                unsigned long timo = READ_ONCE(call->next_req_timo);
                unsigned long now, expect_req_by;

                if (timo) {
                        now = jiffies;
                        expect_req_by = now + timo;
                        WRITE_ONCE(call->expect_req_by, expect_req_by);
                        rxrpc_reduce_call_timer(call, expect_req_by, now,
                                                rxrpc_timer_set_for_idle);
                }
        }

        spin_lock(&call->input_lock);

        /* Received data implicitly ACKs all of the request packets we sent
         * when we're acting as a client.
         */
        if ((state == RXRPC_CALL_CLIENT_SEND_REQUEST ||
             state == RXRPC_CALL_CLIENT_AWAIT_REPLY) &&
            !rxrpc_receiving_reply(call))
                goto out;

        if (!rxrpc_input_split_jumbo(call, skb)) {
                rxrpc_proto_abort("VLD", call, sp->hdr.seq);
                goto out;
        }
        skb = NULL;

out:
        trace_rxrpc_notify_socket(call->debug_id, serial);
        rxrpc_notify_socket(call);

        spin_unlock(&call->input_lock);
        rxrpc_free_skb(skb, rxrpc_skb_put_input);
        _leave(" [queued]");
}

/*
 * See if there's a cached RTT probe to complete.
 */
static void rxrpc_complete_rtt_probe(struct rxrpc_call *call,
                                     ktime_t resp_time,
                                     rxrpc_serial_t acked_serial,
                                     rxrpc_serial_t ack_serial,
                                     enum rxrpc_rtt_rx_trace type)
{
        rxrpc_serial_t orig_serial;
        unsigned long avail;
        ktime_t sent_at;
        bool matched = false;
        int i;

        avail = READ_ONCE(call->rtt_avail);
        smp_rmb(); /* Read avail bits before accessing data. */

        for (i = 0; i < ARRAY_SIZE(call->rtt_serial); i++) {
                if (!test_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &avail))
                        continue;

                sent_at = call->rtt_sent_at[i];
                orig_serial = call->rtt_serial[i];

                if (orig_serial == acked_serial) {
                        clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
                        smp_mb(); /* Read data before setting avail bit */
                        set_bit(i, &call->rtt_avail);
                        if (type != rxrpc_rtt_rx_cancel)
                                rxrpc_peer_add_rtt(call, type, i, acked_serial, ack_serial,
                                                   sent_at, resp_time);
                        else
                                trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_cancel, i,
                                                   orig_serial, acked_serial, 0, 0);
                        matched = true;
                }

                /* If a later serial is being acked, then mark this slot as
                 * being available.
                 */
                if (after(acked_serial, orig_serial)) {
                        trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
                                           orig_serial, acked_serial, 0, 0);
                        clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
                        smp_wmb();
                        set_bit(i, &call->rtt_avail);
                }
        }

        if (!matched)
                trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_lost, 9, 0, acked_serial, 0, 0);
}

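/* A sketch of the slot lifecycle this matcher assumes (the sending side
 * lives in output.c; the exact spelling there may differ): each rtt_serial[]
 * slot i owns two bits in rtt_avail - "slot free" at bit i and "result
 * pending" at bit i + RXRPC_CALL_RTT_PEND_SHIFT.  The probe sender roughly
 * does:
 *
 *      i = __ffs(call->rtt_avail & free_mask);   // pick a free slot
 *      test_and_clear_bit(i, &call->rtt_avail);  // claim it
 *      call->rtt_serial[i]  = probe_serial;
 *      call->rtt_sent_at[i] = ktime_get_real();
 *      smp_wmb();                                // write data before flag
 *      set_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
 *
 * and rxrpc_complete_rtt_probe() reverses it: test the pending bit, read the
 * data, then release the slot by setting the free bit again.
 */
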
/*
 * Process the response to a ping that we sent to find out if we lost an ACK.
 *
 * If we got back a ping response that indicates a lower tx_top than what we
 * had at the time of the ping transmission, we adjudge all the DATA packets
 * sent between the response tx_top and the ping-time tx_top to have been lost.
 */
static void rxrpc_input_check_for_lost_ack(struct rxrpc_call *call)
{
        if (after(call->acks_lost_top, call->acks_prev_seq) &&
            !test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
                rxrpc_queue_call(call, rxrpc_call_queue_resend);
}

/*
 * Process a ping response.
 */
static void rxrpc_input_ping_response(struct rxrpc_call *call,
                                      ktime_t resp_time,
                                      rxrpc_serial_t acked_serial,
                                      rxrpc_serial_t ack_serial)
{
        if (acked_serial == call->acks_lost_ping)
                rxrpc_input_check_for_lost_ack(call);
}

/*
 * Process the extra information that may be appended to an ACK packet
 */
static void rxrpc_input_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
                                struct rxrpc_ackinfo *ackinfo)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        struct rxrpc_peer *peer;
        unsigned int mtu;
        bool wake = false;
        u32 rwind = ntohl(ackinfo->rwind);

        if (rwind > RXRPC_TX_MAX_WINDOW)
                rwind = RXRPC_TX_MAX_WINDOW;
        if (call->tx_winsize != rwind) {
                if (rwind > call->tx_winsize)
                        wake = true;
                trace_rxrpc_rx_rwind_change(call, sp->hdr.serial, rwind, wake);
                call->tx_winsize = rwind;
        }

        if (call->cong_ssthresh > rwind)
                call->cong_ssthresh = rwind;

        mtu = min(ntohl(ackinfo->rxMTU), ntohl(ackinfo->maxMTU));

        peer = call->peer;
        if (mtu < peer->maxdata) {
                spin_lock_bh(&peer->lock);
                peer->maxdata = mtu;
                peer->mtu = mtu + peer->hdrsize;
                spin_unlock_bh(&peer->lock);
        }

        if (wake)
                wake_up(&call->waitq);
}

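/* The trailer parsed above follows the soft-ACK array in an ACK packet and,
 * as declared in protocol.h, carries four big-endian fields:
 *
 *      struct rxrpc_ackinfo {
 *              __be32  rxMTU;      // maximum Rx MTU size (bytes)
 *              __be32  maxMTU;     // maximum interface MTU size (bytes)
 *              __be32  rwind;      // Rx window size (packets)
 *              __be32  jumbo_max;  // max packets to glue into a jumbo packet
 *      };
 *
 * rwind bounds our transmit window and ssthresh; the smaller of the two MTU
 * fields clamps peer->maxdata as shown above.
 */
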
/*
 * Process individual soft ACKs.
 *
 * Each ACK in the array corresponds to one packet and can be either an ACK or
 * a NAK.  If we find an explicitly NAK'd packet we resend immediately;
 * packets that lie beyond the end of the ACK list are scheduled for resend by
 * the timer on the basis that the peer might just not have processed them at
 * the time the ACK was sent.
 */
static void rxrpc_input_soft_acks(struct rxrpc_call *call, u8 *acks,
                                  rxrpc_seq_t seq, int nr_acks,
                                  struct rxrpc_ack_summary *summary)
{
        unsigned int i;

        for (i = 0; i < nr_acks; i++) {
                if (acks[i] == RXRPC_ACK_TYPE_ACK) {
                        summary->nr_acks++;
                        summary->nr_new_acks++;
                } else {
                        if (!summary->saw_nacks &&
                            call->acks_lowest_nak != seq + i) {
                                call->acks_lowest_nak = seq + i;
                                summary->new_low_nack = true;
                        }
                        summary->saw_nacks = true;
                }
        }
}

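/* For reference: acks[i] describes sequence number seq + i, where seq is
 * ack.firstPacket, and each byte is RXRPC_ACK_TYPE_ACK (soft-ACK) or
 * RXRPC_ACK_TYPE_NACK.  An ACK covering five packets from seq 10 might look
 * like:
 *
 *      acks[] = { ACK, ACK, NACK, ACK, ACK };  // seq 12 lost or still in flight
 *
 * which would leave acks_lowest_nak at 12, set summary->new_low_nack the
 * first time 12 becomes the lowest NAK, and set summary->saw_nacks.
 */
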
/*
 * Return true if the ACK is valid - ie. it doesn't appear to have regressed
 * with respect to the ack state conveyed by preceding ACKs.
 */
static bool rxrpc_is_ack_valid(struct rxrpc_call *call,
                               rxrpc_seq_t first_pkt, rxrpc_seq_t prev_pkt)
{
        rxrpc_seq_t base = READ_ONCE(call->acks_first_seq);

        if (after(first_pkt, base))
                return true; /* The window advanced */

        if (before(first_pkt, base))
                return false; /* firstPacket regressed */

        if (after_eq(prev_pkt, call->acks_prev_seq))
                return true; /* previousPacket hasn't regressed. */

        /* Some rx implementations put a serial number in previousPacket. */
        if (after_eq(prev_pkt, base + call->tx_winsize))
                return false;
        return true;
}

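/* A worked example: suppose acks_first_seq is 5 and acks_prev_seq is 7.  An
 * ACK with firstPacket 6 is accepted (the window advanced) and one with
 * firstPacket 4 is discarded (firstPacket regressed).  With firstPacket
 * still 5, previousPacket 3 is tolerated even though it is lower than 7,
 * because some implementations stuff a serial number into previousPacket;
 * the ACK is only rejected once previousPacket reaches 5 + tx_winsize.
 */
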
/*
 * Process an ACK packet.
 *
 * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet
 * in the ACK array.  Anything before that is hard-ACK'd and may be discarded.
 *
 * A hard-ACK means that a packet has been processed and may be discarded; a
 * soft-ACK means that the packet has been received, but may yet be discarded
 * by the peer and retransmission requested.  A phase is complete when all
 * packets are hard-ACK'd.
 */
static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{
        struct rxrpc_ack_summary summary = { 0 };
        struct rxrpc_ackpacket ack;
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        struct rxrpc_ackinfo info;
        struct sk_buff *skb_old = NULL, *skb_put = skb;
        rxrpc_serial_t ack_serial, acked_serial;
        rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt;
        int nr_acks, offset, ioffset;

        _enter("");

        offset = sizeof(struct rxrpc_wire_header);
        if (skb_copy_bits(skb, offset, &ack, sizeof(ack)) < 0) {
                rxrpc_proto_abort("XAK", call, 0);
                goto out_not_locked;
        }
        offset += sizeof(ack);

        ack_serial = sp->hdr.serial;
        acked_serial = ntohl(ack.serial);
        first_soft_ack = ntohl(ack.firstPacket);
        prev_pkt = ntohl(ack.previousPacket);
        hard_ack = first_soft_ack - 1;
        nr_acks = ack.nAcks;
        summary.ack_reason = (ack.reason < RXRPC_ACK__INVALID ?
                              ack.reason : RXRPC_ACK__INVALID);

        trace_rxrpc_rx_ack(call, ack_serial, acked_serial,
                           first_soft_ack, prev_pkt,
                           summary.ack_reason, nr_acks);
        rxrpc_inc_stat(call->rxnet, stat_rx_acks[ack.reason]);

        switch (ack.reason) {
        case RXRPC_ACK_PING_RESPONSE:
                rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
                                         rxrpc_rtt_rx_ping_response);
                break;
        case RXRPC_ACK_REQUESTED:
                rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
                                         rxrpc_rtt_rx_requested_ack);
                break;
        default:
                if (acked_serial != 0)
                        rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
                                                 rxrpc_rtt_rx_cancel);
                break;
        }

        if (ack.reason == RXRPC_ACK_PING) {
                rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
                               rxrpc_propose_ack_respond_to_ping);
        } else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
                rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
                               rxrpc_propose_ack_respond_to_ack);
        }

        /* If we get an EXCEEDS_WINDOW ACK from the server, it probably
         * indicates that the client address changed due to NAT.  The server
         * lost the call because it switched to a different peer.
         */
        if (unlikely(ack.reason == RXRPC_ACK_EXCEEDS_WINDOW) &&
            first_soft_ack == 1 &&
            prev_pkt == 0 &&
            rxrpc_is_client_call(call)) {
                rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
                                          0, -ENETRESET);
                return;
        }

        /* If we get an OUT_OF_SEQUENCE ACK from the server, that can also
         * indicate a change of address.  However, we can retransmit the call
         * if we still have it buffered to the beginning.
         */
        if (unlikely(ack.reason == RXRPC_ACK_OUT_OF_SEQUENCE) &&
            first_soft_ack == 1 &&
            prev_pkt == 0 &&
            call->acks_hard_ack == 0 &&
            rxrpc_is_client_call(call)) {
                rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
                                          0, -ENETRESET);
                return;
        }

        /* Discard any out-of-order or duplicate ACKs (outside lock). */
        if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
                trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
                                           first_soft_ack, call->acks_first_seq,
                                           prev_pkt, call->acks_prev_seq);
                goto out_not_locked;
        }

        info.rxMTU = 0;
        ioffset = offset + nr_acks + 3;
        if (skb->len >= ioffset + sizeof(info) &&
            skb_copy_bits(skb, ioffset, &info, sizeof(info)) < 0) {
                rxrpc_proto_abort("XAI", call, 0);
                goto out_not_locked;
        }

        if (nr_acks > 0)
                skb_condense(skb);

        spin_lock(&call->input_lock);

        /* Discard any out-of-order or duplicate ACKs (inside lock). */
        if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
                trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
                                           first_soft_ack, call->acks_first_seq,
                                           prev_pkt, call->acks_prev_seq);
                goto out;
        }
        call->acks_latest_ts = skb->tstamp;

        call->acks_first_seq = first_soft_ack;
        call->acks_prev_seq = prev_pkt;

        switch (ack.reason) {
        case RXRPC_ACK_PING:
                break;
        case RXRPC_ACK_PING_RESPONSE:
                rxrpc_input_ping_response(call, skb->tstamp, acked_serial,
                                          ack_serial);
                fallthrough;
        default:
                if (after(acked_serial, call->acks_highest_serial))
                        call->acks_highest_serial = acked_serial;
                break;
        }

        /* Parse rwind and mtu sizes if provided. */
        if (info.rxMTU)
                rxrpc_input_ackinfo(call, skb, &info);

        if (first_soft_ack == 0) {
                rxrpc_proto_abort("AK0", call, 0);
                goto out;
        }

        /* Ignore ACKs unless we are or have just been transmitting. */
        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
        case RXRPC_CALL_CLIENT_AWAIT_REPLY:
        case RXRPC_CALL_SERVER_SEND_REPLY:
        case RXRPC_CALL_SERVER_AWAIT_ACK:
                break;
        default:
                goto out;
        }

        if (before(hard_ack, call->acks_hard_ack) ||
            after(hard_ack, call->tx_top)) {
                rxrpc_proto_abort("AKW", call, 0);
                goto out;
        }
        if (nr_acks > call->tx_top - hard_ack) {
                rxrpc_proto_abort("AKN", call, 0);
                goto out;
        }

        if (after(hard_ack, call->acks_hard_ack)) {
                if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
                        rxrpc_end_tx_phase(call, false, "ETA");
                        goto out;
                }
        }

        if (nr_acks > 0) {
                if (offset > (int)skb->len - nr_acks) {
                        rxrpc_proto_abort("XSA", call, 0);
                        goto out;
                }

                spin_lock(&call->acks_ack_lock);
                skb_old = call->acks_soft_tbl;
                call->acks_soft_tbl = skb;
                spin_unlock(&call->acks_ack_lock);

                rxrpc_input_soft_acks(call, skb->data + offset, first_soft_ack,
                                      nr_acks, &summary);
                skb_put = NULL;
        } else if (call->acks_soft_tbl) {
                spin_lock(&call->acks_ack_lock);
                skb_old = call->acks_soft_tbl;
                call->acks_soft_tbl = NULL;
                spin_unlock(&call->acks_ack_lock);
        }

        if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
            summary.nr_acks == call->tx_top - hard_ack &&
            rxrpc_is_client_call(call))
                rxrpc_propose_ping(call, ack_serial,
                                   rxrpc_propose_ack_ping_for_lost_reply);

        rxrpc_congestion_management(call, skb, &summary, acked_serial);
out:
        spin_unlock(&call->input_lock);
out_not_locked:
        rxrpc_free_skb(skb_put, rxrpc_skb_put_input);
        rxrpc_free_skb(skb_old, rxrpc_skb_put_ack);
}

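/* A picture of the transmit window after rxrpc_input_ack() (illustrative):
 *
 *        acks_hard_ack        hard_ack = firstPacket - 1        tx_top
 *              |                        |                          |
 *   (released) | --- newly hard-ACK'd --|--- soft-ACK'd / NAK'd ---|
 *              |     window rotates     |    acks[0..nAcks-1]      |
 *
 * Everything up to hard_ack is complete and is rotated out of tx_buffer; the
 * soft-ACK array then describes the fate of hard_ack + 1 .. tx_top.
 */
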
/*
 * Process an ACKALL packet.
 */
static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
{
        struct rxrpc_ack_summary summary = { 0 };

        spin_lock(&call->input_lock);

        if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
                rxrpc_end_tx_phase(call, false, "ETL");

        spin_unlock(&call->input_lock);
}

/*
 * Process an ABORT packet directed at a call.
 */
static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

        trace_rxrpc_rx_abort(call, sp->hdr.serial, skb->priority);

        rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
                                  skb->priority, -ECONNABORTED);
}

/*
 * Process an incoming call packet.
 */
void rxrpc_input_call_packet(struct rxrpc_call *call,
                             struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        unsigned long timo;

        _enter("%p,%p", call, skb);

        timo = READ_ONCE(call->next_rx_timo);
        if (timo) {
                unsigned long now = jiffies, expect_rx_by;

                expect_rx_by = now + timo;
                WRITE_ONCE(call->expect_rx_by, expect_rx_by);
                rxrpc_reduce_call_timer(call, expect_rx_by, now,
                                        rxrpc_timer_set_for_normal);
        }

        switch (sp->hdr.type) {
        case RXRPC_PACKET_TYPE_DATA:
                rxrpc_input_data(call, skb);
                goto no_free;

        case RXRPC_PACKET_TYPE_ACK:
                rxrpc_input_ack(call, skb);
                goto no_free;

        case RXRPC_PACKET_TYPE_BUSY:
                /* Just ignore BUSY packets from the server; the retry and
                 * lifespan timers will take care of business.  BUSY packets
                 * from the client don't make sense.
                 */
                break;

        case RXRPC_PACKET_TYPE_ABORT:
                rxrpc_input_abort(call, skb);
                break;

        case RXRPC_PACKET_TYPE_ACKALL:
                rxrpc_input_ackall(call, skb);
                break;

        default:
                break;
        }

        rxrpc_free_skb(skb, rxrpc_skb_put_input);
no_free:
        _leave("");
}

/*
 * Handle a new service call on a channel implicitly completing the preceding
 * call on that channel.  This does not apply to client conns.
 *
 * TODO: If callNumber > call_id + 1, renegotiate security.
 */
void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
                                   struct rxrpc_connection *conn,
                                   struct rxrpc_call *call)
{
        switch (READ_ONCE(call->state)) {
        case RXRPC_CALL_SERVER_AWAIT_ACK:
                rxrpc_call_completed(call);
                fallthrough;
        case RXRPC_CALL_COMPLETE:
                break;
        default:
                if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN)) {
                        set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                        rxrpc_queue_call(call, rxrpc_call_queue_abort);
                }
                trace_rxrpc_improper_term(call);
                break;
        }

        spin_lock(&rx->incoming_lock);
        __rxrpc_disconnect_call(conn, call);
        spin_unlock(&rx->incoming_lock);
}