rxrpc: Allocate ACK records at proposal and queue for transmission
authorDavid Howells <dhowells@redhat.com>
Thu, 30 Jan 2020 21:48:13 +0000 (21:48 +0000)
committerDavid Howells <dhowells@redhat.com>
Tue, 8 Nov 2022 16:42:28 +0000 (16:42 +0000)
Allocate rxrpc_txbuf records for ACKs and put onto a queue for the
transmitter thread to dispatch.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_accept.c
net/rxrpc/call_event.c
net/rxrpc/input.c
net/rxrpc/local_object.c
net/rxrpc/output.c
net/rxrpc/recvmsg.c
net/rxrpc/sendmsg.c
net/rxrpc/txbuf.c

index 47b157b..1597ff7 100644 (file)
@@ -34,7 +34,8 @@
        EM(rxrpc_local_new,                     "NEW") \
        EM(rxrpc_local_processing,              "PRO") \
        EM(rxrpc_local_put,                     "PUT") \
-       E_(rxrpc_local_queued,                  "QUE")
+       EM(rxrpc_local_queued,                  "QUE") \
+       E_(rxrpc_local_tx_ack,                  "TAK")
 
 #define rxrpc_peer_traces \
        EM(rxrpc_peer_got,                      "GOT") \
        EM(rxrpc_txbuf_free,                    "FREE       ")  \
        EM(rxrpc_txbuf_get_trans,               "GET TRANS  ")  \
        EM(rxrpc_txbuf_get_retrans,             "GET RETRANS")  \
+       EM(rxrpc_txbuf_put_ack_tx,              "PUT ACK TX ")  \
        EM(rxrpc_txbuf_put_cleaned,             "PUT CLEANED")  \
+       EM(rxrpc_txbuf_put_nomem,               "PUT NOMEM  ")  \
        EM(rxrpc_txbuf_put_rotated,             "PUT ROTATED")  \
        EM(rxrpc_txbuf_put_send_aborted,        "PUT SEND-X ")  \
        EM(rxrpc_txbuf_see_send_more,           "SEE SEND+  ")  \
@@ -1095,19 +1098,16 @@ TRACE_EVENT(rxrpc_rx_lose,
 
 TRACE_EVENT(rxrpc_propose_ack,
            TP_PROTO(struct rxrpc_call *call, enum rxrpc_propose_ack_trace why,
-                    u8 ack_reason, rxrpc_serial_t serial, bool immediate,
-                    bool background, enum rxrpc_propose_ack_outcome outcome),
+                    u8 ack_reason, rxrpc_serial_t serial,
+                    enum rxrpc_propose_ack_outcome outcome),
 
-           TP_ARGS(call, why, ack_reason, serial, immediate, background,
-                   outcome),
+           TP_ARGS(call, why, ack_reason, serial, outcome),
 
            TP_STRUCT__entry(
                    __field(unsigned int,                       call            )
                    __field(enum rxrpc_propose_ack_trace,       why             )
                    __field(rxrpc_serial_t,                     serial          )
                    __field(u8,                                 ack_reason      )
-                   __field(bool,                               immediate       )
-                   __field(bool,                               background      )
                    __field(enum rxrpc_propose_ack_outcome,     outcome         )
                             ),
 
@@ -1116,21 +1116,44 @@ TRACE_EVENT(rxrpc_propose_ack,
                    __entry->why        = why;
                    __entry->serial     = serial;
                    __entry->ack_reason = ack_reason;
-                   __entry->immediate  = immediate;
-                   __entry->background = background;
                    __entry->outcome    = outcome;
                           ),
 
-           TP_printk("c=%08x %s %s r=%08x i=%u b=%u%s",
+           TP_printk("c=%08x %s %s r=%08x%s",
                      __entry->call,
                      __print_symbolic(__entry->why, rxrpc_propose_ack_traces),
                      __print_symbolic(__entry->ack_reason, rxrpc_ack_names),
                      __entry->serial,
-                     __entry->immediate,
-                     __entry->background,
                      __print_symbolic(__entry->outcome, rxrpc_propose_ack_outcomes))
            );
 
+TRACE_EVENT(rxrpc_send_ack,
+           TP_PROTO(struct rxrpc_call *call, enum rxrpc_propose_ack_trace why,
+                    u8 ack_reason, rxrpc_serial_t serial),
+
+           TP_ARGS(call, why, ack_reason, serial),
+
+           TP_STRUCT__entry(
+                   __field(unsigned int,                       call            )
+                   __field(enum rxrpc_propose_ack_trace,       why             )
+                   __field(rxrpc_serial_t,                     serial          )
+                   __field(u8,                                 ack_reason      )
+                            ),
+
+           TP_fast_assign(
+                   __entry->call       = call->debug_id;
+                   __entry->why        = why;
+                   __entry->serial     = serial;
+                   __entry->ack_reason = ack_reason;
+                          ),
+
+           TP_printk("c=%08x %s %s r=%08x",
+                     __entry->call,
+                     __print_symbolic(__entry->why, rxrpc_propose_ack_traces),
+                     __print_symbolic(__entry->ack_reason, rxrpc_ack_names),
+                     __entry->serial)
+           );
+
 TRACE_EVENT(rxrpc_retransmit,
            TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, u8 annotation,
                     s64 expiry),
index 49e9061..802a8f3 100644 (file)
@@ -292,6 +292,8 @@ struct rxrpc_local {
        struct hlist_node       link;
        struct socket           *socket;        /* my UDP socket */
        struct work_struct      processor;
+       struct list_head        ack_tx_queue;   /* List of ACKs that need sending */
+       spinlock_t              ack_tx_lock;    /* ACK list lock */
        struct rxrpc_sock __rcu *service;       /* Service(s) listening on this endpoint */
        struct rw_semaphore     defrag_sem;     /* control re-enablement of IP DF bit */
        struct sk_buff_head     reject_queue;   /* packets awaiting rejection */
@@ -520,10 +522,8 @@ enum rxrpc_call_flag {
  * Events that can be raised on a call.
  */
 enum rxrpc_call_event {
-       RXRPC_CALL_EV_ACK,              /* need to generate ACK */
        RXRPC_CALL_EV_ABORT,            /* need to generate abort */
        RXRPC_CALL_EV_RESEND,           /* Tx resend required */
-       RXRPC_CALL_EV_PING,             /* Ping send required */
        RXRPC_CALL_EV_EXPIRED,          /* Expiry occurred */
        RXRPC_CALL_EV_ACK_LOST,         /* ACK may be lost, send ping */
 };
@@ -782,13 +782,20 @@ struct rxrpc_txbuf {
 #define RXRPC_TXBUF_LAST       2               /* Set if last packet in Tx phase */
 #define RXRPC_TXBUF_RESENT     3               /* Set if has been resent */
 #define RXRPC_TXBUF_RETRANS    4               /* Set if should be retransmitted */
+       u8 /*enum rxrpc_propose_ack_trace*/ ack_why;    /* If ack, why */
        struct {
                /* The packet for encrypting and DMA'ing.  We align it such
                 * that data[] aligns correctly for any crypto blocksize.
                 */
                u8              pad[64 - sizeof(struct rxrpc_wire_header)];
                struct rxrpc_wire_header wire;  /* Network-ready header */
-               u8              data[RXRPC_JUMBO_DATALEN]; /* Data packet */
+               union {
+                       u8      data[RXRPC_JUMBO_DATALEN]; /* Data packet */
+                       struct {
+                               struct rxrpc_ackpacket ack;
+                               u8 acks[0];
+                       };
+               };
        } __aligned(64);
 };
 
@@ -824,8 +831,10 @@ int rxrpc_user_charge_accept(struct rxrpc_sock *, unsigned long);
 /*
  * call_event.c
  */
-void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool, bool,
-                      enum rxrpc_propose_ack_trace);
+void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
+                       enum rxrpc_propose_ack_trace why);
+void rxrpc_send_ACK(struct rxrpc_call *, u8, rxrpc_serial_t, enum rxrpc_propose_ack_trace);
+void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, enum rxrpc_propose_ack_trace);
 void rxrpc_process_call(struct work_struct *);
 
 void rxrpc_reduce_call_timer(struct rxrpc_call *call,
@@ -1030,7 +1039,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
 /*
  * output.c
  */
-int rxrpc_send_ack_packet(struct rxrpc_call *, bool, rxrpc_serial_t *);
+void rxrpc_transmit_ack_packets(struct rxrpc_local *);
 int rxrpc_send_abort_packet(struct rxrpc_call *);
 int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *, bool);
 void rxrpc_reject_packets(struct rxrpc_local *);
index 99e10ee..d8db277 100644 (file)
@@ -248,9 +248,8 @@ static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb)
 
        if (call->peer->rtt_count < 3 ||
            ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now))
-               rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
-                                 true, true,
-                                 rxrpc_propose_ack_ping_for_params);
+               rxrpc_send_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
+                              rxrpc_propose_ack_ping_for_params);
 }
 
 /*
index 70f70a0..67b54ad 100644 (file)
 /*
  * Propose a PING ACK be sent.
  */
-static void rxrpc_propose_ping(struct rxrpc_call *call,
-                              bool immediate, bool background)
+void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
+                       enum rxrpc_propose_ack_trace why)
 {
-       if (immediate) {
-               if (background &&
-                   !test_and_set_bit(RXRPC_CALL_EV_PING, &call->events))
-                       rxrpc_queue_call(call);
-       } else {
-               unsigned long now = jiffies;
-               unsigned long ping_at = now + rxrpc_idle_ack_delay;
+       unsigned long now = jiffies;
+       unsigned long ping_at = now + rxrpc_idle_ack_delay;
 
-               if (time_before(ping_at, call->ping_at)) {
-                       WRITE_ONCE(call->ping_at, ping_at);
-                       rxrpc_reduce_call_timer(call, ping_at, now,
-                                               rxrpc_timer_set_for_ping);
-               }
+       spin_lock_bh(&call->lock);
+
+       if (time_before(ping_at, call->ping_at)) {
+               rxrpc_inc_stat(call->rxnet, stat_tx_acks[RXRPC_ACK_PING]);
+               WRITE_ONCE(call->ping_at, ping_at);
+               rxrpc_reduce_call_timer(call, ping_at, now,
+                                       rxrpc_timer_set_for_ping);
+               trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial,
+                                       rxrpc_propose_ack_use);
        }
+
+       spin_unlock_bh(&call->lock);
 }
 
 /*
  * propose an ACK be sent
  */
 static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
-                               u32 serial, bool immediate, bool background,
-                               enum rxrpc_propose_ack_trace why)
+                               u32 serial, enum rxrpc_propose_ack_trace why)
 {
        enum rxrpc_propose_ack_outcome outcome = rxrpc_propose_ack_use;
        unsigned long expiry = rxrpc_soft_ack_delay;
+       unsigned long now = jiffies, ack_at;
        s8 prior = rxrpc_ack_priority[ack_reason];
 
        rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);
 
-       /* Pings are handled specially because we don't want to accidentally
-        * lose a ping response by subsuming it into a ping.
-        */
-       if (ack_reason == RXRPC_ACK_PING) {
-               rxrpc_propose_ping(call, immediate, background);
-               goto trace;
-       }
-
        /* Update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
         * numbers, but we don't alter the timeout.
         */
@@ -71,8 +64,6 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
                        outcome = rxrpc_propose_ack_update;
                        call->ackr_serial = serial;
                }
-               if (!immediate)
-                       goto trace;
        } else if (prior > rxrpc_ack_priority[call->ackr_reason]) {
                call->ackr_reason = ack_reason;
                call->ackr_serial = serial;
@@ -84,8 +75,6 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
        case RXRPC_ACK_REQUESTED:
                if (rxrpc_requested_ack_delay < expiry)
                        expiry = rxrpc_requested_ack_delay;
-               if (serial == 1)
-                       immediate = false;
                break;
 
        case RXRPC_ACK_DELAY:
@@ -99,53 +88,90 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
                break;
 
        default:
-               immediate = true;
-               break;
+               WARN_ON(1);
+               return;
        }
 
-       if (test_bit(RXRPC_CALL_EV_ACK, &call->events)) {
-               _debug("already scheduled");
-       } else if (immediate || expiry == 0) {
-               _debug("immediate ACK %lx", call->events);
-               if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events) &&
-                   background)
-                       rxrpc_queue_call(call);
-       } else {
-               unsigned long now = jiffies, ack_at;
-
-               if (call->peer->srtt_us != 0)
-                       ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3);
-               else
-                       ack_at = expiry;
-
-               ack_at += READ_ONCE(call->tx_backoff);
-               ack_at += now;
-               if (time_before(ack_at, call->ack_at)) {
-                       WRITE_ONCE(call->ack_at, ack_at);
-                       rxrpc_reduce_call_timer(call, ack_at, now,
-                                               rxrpc_timer_set_for_ack);
-               }
+
+       if (call->peer->srtt_us != 0)
+               ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3);
+       else
+               ack_at = expiry;
+
+       ack_at += READ_ONCE(call->tx_backoff);
+       ack_at += now;
+       if (time_before(ack_at, call->ack_at)) {
+               WRITE_ONCE(call->ack_at, ack_at);
+               rxrpc_reduce_call_timer(call, ack_at, now,
+                                       rxrpc_timer_set_for_ack);
        }
 
-trace:
-       trace_rxrpc_propose_ack(call, why, ack_reason, serial, immediate,
-                               background, outcome);
+       trace_rxrpc_propose_ack(call, why, ack_reason, serial, outcome);
 }
 
 /*
  * propose an ACK be sent, locking the call structure
  */
-void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
-                      u32 serial, bool immediate, bool background,
+void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, u32 serial,
                       enum rxrpc_propose_ack_trace why)
 {
        spin_lock_bh(&call->lock);
-       __rxrpc_propose_ACK(call, ack_reason, serial,
-                           immediate, background, why);
+       __rxrpc_propose_ACK(call, ack_reason, serial, why);
        spin_unlock_bh(&call->lock);
 }
 
 /*
+ * Queue an ACK for immediate transmission.
+ */
+void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
+                   rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
+{
+       struct rxrpc_local *local = call->conn->params.local;
+       struct rxrpc_txbuf *txb;
+
+       if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
+               return;
+
+       rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);
+
+       txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK,
+                               in_softirq() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
+       if (!txb) {
+               kleave(" = -ENOMEM");
+               return;
+       }
+
+       txb->ack_why            = why;
+       txb->wire.seq           = 0;
+       txb->wire.type          = RXRPC_PACKET_TYPE_ACK;
+       txb->wire.flags         |= RXRPC_SLOW_START_OK;
+       txb->ack.bufferSpace    = 0;
+       txb->ack.maxSkew        = 0;
+       txb->ack.firstPacket    = 0;
+       txb->ack.previousPacket = 0;
+       txb->ack.serial         = htonl(serial);
+       txb->ack.reason         = ack_reason;
+       txb->ack.nAcks          = 0;
+
+       if (!rxrpc_try_get_call(call, rxrpc_call_got)) {
+               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_nomem);
+               return;
+       }
+
+       spin_lock_bh(&local->ack_tx_lock);
+       list_add_tail(&txb->tx_link, &local->ack_tx_queue);
+       spin_unlock_bh(&local->ack_tx_lock);
+       trace_rxrpc_send_ack(call, why, ack_reason, serial);
+
+       if (in_task()) {
+               rxrpc_transmit_ack_packets(call->peer->local);
+       } else {
+               rxrpc_get_local(local);
+               rxrpc_queue_local(local);
+       }
+}
+
+/*
  * Handle congestion being detected by the retransmit timeout.
  */
 static void rxrpc_congestion_timeout(struct rxrpc_call *call)
@@ -230,9 +256,8 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
                ack_ts = ktime_sub(now, call->acks_latest_ts);
                if (ktime_to_us(ack_ts) < (call->peer->srtt_us >> 3))
                        goto out;
-               rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
-                                 rxrpc_propose_ack_ping_for_lost_ack);
-               rxrpc_send_ack_packet(call, true, NULL);
+               rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+                              rxrpc_propose_ack_ping_for_lost_ack);
                goto out;
        }
 
@@ -291,9 +316,10 @@ void rxrpc_process_call(struct work_struct *work)
 {
        struct rxrpc_call *call =
                container_of(work, struct rxrpc_call, processor);
-       rxrpc_serial_t *send_ack;
        unsigned long now, next, t;
        unsigned int iterations = 0;
+       rxrpc_serial_t ackr_serial;
+       u8 ackr_reason;
 
        rxrpc_see_call(call);
 
@@ -342,7 +368,15 @@ recheck_state:
        if (time_after_eq(now, t)) {
                trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
                cmpxchg(&call->ack_at, t, now + MAX_JIFFY_OFFSET);
-               set_bit(RXRPC_CALL_EV_ACK, &call->events);
+               spin_lock_bh(&call->lock);
+               ackr_reason = call->ackr_reason;
+               ackr_serial = call->ackr_serial;
+               call->ackr_reason = 0;
+               call->ackr_serial = 0;
+               spin_unlock_bh(&call->lock);
+               if (ackr_reason)
+                       rxrpc_send_ACK(call, ackr_reason, ackr_serial,
+                                      rxrpc_propose_ack_ping_for_lost_ack);
        }
 
        t = READ_ONCE(call->ack_lost_at);
@@ -356,16 +390,16 @@ recheck_state:
        if (time_after_eq(now, t)) {
                trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
                cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
-               rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, true,
-                                 rxrpc_propose_ack_ping_for_keepalive);
-               set_bit(RXRPC_CALL_EV_PING, &call->events);
+               rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+                              rxrpc_propose_ack_ping_for_keepalive);
        }
 
        t = READ_ONCE(call->ping_at);
        if (time_after_eq(now, t)) {
                trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
                cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
-               set_bit(RXRPC_CALL_EV_PING, &call->events);
+               rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+                              rxrpc_propose_ack_ping_for_keepalive);
        }
 
        t = READ_ONCE(call->resend_at);
@@ -388,25 +422,10 @@ recheck_state:
                goto recheck_state;
        }
 
-       send_ack = NULL;
        if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events)) {
                call->acks_lost_top = call->tx_top;
-               rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
-                                 rxrpc_propose_ack_ping_for_lost_ack);
-               send_ack = &call->acks_lost_ping;
-       }
-
-       if (test_and_clear_bit(RXRPC_CALL_EV_ACK, &call->events) ||
-           send_ack) {
-               if (call->ackr_reason) {
-                       rxrpc_send_ack_packet(call, false, send_ack);
-                       goto recheck_state;
-               }
-       }
-
-       if (test_and_clear_bit(RXRPC_CALL_EV_PING, &call->events)) {
-               rxrpc_send_ack_packet(call, true, NULL);
-               goto recheck_state;
+               rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+                              rxrpc_propose_ack_ping_for_lost_ack);
        }
 
        if (test_and_clear_bit(RXRPC_CALL_EV_RESEND, &call->events) &&
index 0cb23ba..e23f66e 100644 (file)
@@ -398,8 +398,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
        unsigned int j, nr_subpackets, nr_unacked = 0;
        rxrpc_serial_t serial = sp->hdr.serial, ack_serial = serial;
        rxrpc_seq_t seq0 = sp->hdr.seq, hard_ack;
-       bool immediate_ack = false, jumbo_bad = false;
-       u8 ack = 0;
+       bool jumbo_bad = false;
 
        _enter("{%u,%u},{%u,%u}",
               call->rx_hard_ack, call->rx_top, skb->len, seq0);
@@ -447,9 +446,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
        nr_subpackets = sp->nr_subpackets;
        if (nr_subpackets > 1) {
                if (call->nr_jumbo_bad > 3) {
-                       ack = RXRPC_ACK_NOSPACE;
-                       ack_serial = serial;
-                       goto ack;
+                       rxrpc_send_ACK(call, RXRPC_ACK_NOSPACE, serial,
+                                      rxrpc_propose_ack_input_data);
+                       goto unlock;
                }
        }
 
@@ -459,6 +458,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
                unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
                bool terminal = (j == nr_subpackets - 1);
                bool last = terminal && (sp->rx_flags & RXRPC_SKB_INCL_LAST);
+               bool acked = false;
                u8 flags, annotation = j;
 
                _proto("Rx DATA+%u %%%u { #%x t=%u l=%u }",
@@ -488,25 +488,22 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
                trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
 
                if (before_eq(seq, hard_ack)) {
-                       ack = RXRPC_ACK_DUPLICATE;
-                       ack_serial = serial;
+                       rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
+                                      rxrpc_propose_ack_input_data);
                        continue;
                }
 
                if (call->rxtx_buffer[ix]) {
                        rxrpc_input_dup_data(call, seq, nr_subpackets > 1,
                                             &jumbo_bad);
-                       if (ack != RXRPC_ACK_DUPLICATE) {
-                               ack = RXRPC_ACK_DUPLICATE;
-                               ack_serial = serial;
-                       }
-                       immediate_ack = true;
+                       rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
+                                      rxrpc_propose_ack_input_data);
                        continue;
                }
 
                if (after(seq, hard_ack + call->rx_winsize)) {
-                       ack = RXRPC_ACK_EXCEEDS_WINDOW;
-                       ack_serial = serial;
+                       rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
+                                      rxrpc_propose_ack_input_data);
                        if (flags & RXRPC_JUMBO_PACKET) {
                                if (!jumbo_bad) {
                                        call->nr_jumbo_bad++;
@@ -514,12 +511,13 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
                                }
                        }
 
-                       goto ack;
+                       goto unlock;
                }
 
-               if (flags & RXRPC_REQUEST_ACK && !ack) {
-                       ack = RXRPC_ACK_REQUESTED;
-                       ack_serial = serial;
+               if (flags & RXRPC_REQUEST_ACK) {
+                       rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
+                                      rxrpc_propose_ack_input_data);
+                       acked = true;
                }
 
                if (after(seq0, call->ackr_highest_seq))
@@ -542,11 +540,11 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
                        smp_store_release(&call->rx_top, seq);
                } else if (before(seq, call->rx_top)) {
                        /* Send an immediate ACK if we fill in a hole */
-                       if (!ack) {
-                               ack = RXRPC_ACK_DELAY;
-                               ack_serial = serial;
+                       if (!acked) {
+                               rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
+                                              rxrpc_propose_ack_input_data);
+                               acked = true;
                        }
-                       immediate_ack = true;
                }
 
                if (terminal) {
@@ -558,14 +556,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
                        sp = NULL;
                }
 
-               nr_unacked++;
-
                if (last) {
                        set_bit(RXRPC_CALL_RX_LAST, &call->flags);
-                       if (!ack) {
-                               ack = RXRPC_ACK_DELAY;
-                               ack_serial = serial;
-                       }
                        trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
                } else {
                        trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
@@ -574,32 +566,30 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
                if (after_eq(seq, call->rx_expect_next)) {
                        if (after(seq, call->rx_expect_next)) {
                                _net("OOS %u > %u", seq, call->rx_expect_next);
-                               ack = RXRPC_ACK_OUT_OF_SEQUENCE;
-                               ack_serial = serial;
+                               rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
+                                              rxrpc_propose_ack_input_data);
+                               acked = true;
                        }
                        call->rx_expect_next = seq + 1;
                }
-               if (!ack)
+
+               if (!acked) {
+                       nr_unacked++;
                        ack_serial = serial;
+               }
        }
 
-ack:
-       if (atomic_add_return(nr_unacked, &call->ackr_nr_unacked) > 2 && !ack)
-               ack = RXRPC_ACK_IDLE;
-
-       if (ack)
-               rxrpc_propose_ACK(call, ack, ack_serial,
-                                 immediate_ack, true,
-                                 rxrpc_propose_ack_input_data);
+unlock:
+       if (atomic_add_return(nr_unacked, &call->ackr_nr_unacked) > 2)
+               rxrpc_send_ACK(call, RXRPC_ACK_IDLE, ack_serial,
+                              rxrpc_propose_ack_input_data);
        else
-               rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
-                                 false, true,
+               rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, ack_serial,
                                  rxrpc_propose_ack_input_data);
 
        trace_rxrpc_notify_socket(call->debug_id, serial);
        rxrpc_notify_socket(call);
 
-unlock:
        spin_unlock(&call->input_lock);
        rxrpc_free_skb(skb, rxrpc_skb_freed);
        _leave(" [queued]");
@@ -893,13 +883,11 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
 
        if (buf.ack.reason == RXRPC_ACK_PING) {
                _proto("Rx ACK %%%u PING Request", ack_serial);
-               rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
-                                 ack_serial, true, true,
-                                 rxrpc_propose_ack_respond_to_ping);
+               rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
+                              rxrpc_propose_ack_respond_to_ping);
        } else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
-               rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED,
-                                 ack_serial, true, true,
-                                 rxrpc_propose_ack_respond_to_ack);
+               rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
+                              rxrpc_propose_ack_respond_to_ack);
        }
 
        /* If we get an EXCEEDS_WINDOW ACK from the server, it probably
@@ -1011,9 +999,8 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
            RXRPC_TX_ANNO_LAST &&
            summary.nr_acks == call->tx_top - hard_ack &&
            rxrpc_is_client_call(call))
-               rxrpc_propose_ACK(call, RXRPC_ACK_PING, ack_serial,
-                                 false, true,
-                                 rxrpc_propose_ack_ping_for_lost_reply);
+               rxrpc_propose_ping(call, ack_serial,
+                                  rxrpc_propose_ack_ping_for_lost_reply);
 
        rxrpc_congestion_management(call, skb, &summary, acked_serial);
 out:
index e95e75e..a178f71 100644 (file)
@@ -97,6 +97,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
                local->rxnet = rxnet;
                INIT_HLIST_NODE(&local->link);
                INIT_WORK(&local->processor, rxrpc_local_processor);
+               INIT_LIST_HEAD(&local->ack_tx_queue);
+               spin_lock_init(&local->ack_tx_lock);
                init_rwsem(&local->defrag_sem);
                skb_queue_head_init(&local->reject_queue);
                skb_queue_head_init(&local->event_queue);
@@ -432,6 +434,11 @@ static void rxrpc_local_processor(struct work_struct *work)
                        break;
                }
 
+               if (!list_empty(&local->ack_tx_queue)) {
+                       rxrpc_transmit_ack_packets(local);
+                       again = true;
+               }
+
                if (!skb_queue_empty(&local->reject_queue)) {
                        rxrpc_reject_packets(local);
                        again = true;
index 71885d7..1edbc67 100644 (file)
 #include <net/udp.h>
 #include "ar-internal.h"
 
-struct rxrpc_ack_buffer {
-       struct rxrpc_wire_header whdr;
-       struct rxrpc_ackpacket ack;
-       u8 acks[255];
-       u8 pad[3];
-       struct rxrpc_ackinfo ackinfo;
-};
-
 extern int udpv6_sendmsg(struct sock *sk, struct msghdr *msg, size_t len);
 
 static ssize_t do_udp_sendmsg(struct socket *sk, struct msghdr *msg, size_t len)
@@ -82,22 +74,21 @@ static void rxrpc_set_keepalive(struct rxrpc_call *call)
  */
 static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
                                 struct rxrpc_call *call,
-                                struct rxrpc_ack_buffer *pkt,
+                                struct rxrpc_txbuf *txb,
                                 rxrpc_seq_t *_hard_ack,
-                                rxrpc_seq_t *_top,
-                                u8 reason)
+                                rxrpc_seq_t *_top)
 {
-       rxrpc_serial_t serial;
+       struct rxrpc_ackinfo ackinfo;
        unsigned int tmp;
        rxrpc_seq_t hard_ack, top, seq;
        int ix;
        u32 mtu, jmax;
-       u8 *ackp = pkt->acks;
+       u8 *ackp = txb->acks;
 
        tmp = atomic_xchg(&call->ackr_nr_unacked, 0);
        tmp |= atomic_xchg(&call->ackr_nr_consumed, 0);
-       if (!tmp && (reason == RXRPC_ACK_DELAY ||
-                    reason == RXRPC_ACK_IDLE)) {
+       if (!tmp && (txb->ack.reason == RXRPC_ACK_DELAY ||
+                    txb->ack.reason == RXRPC_ACK_IDLE)) {
                rxrpc_inc_stat(call->rxnet, stat_tx_ack_skip);
                return 0;
        }
@@ -105,24 +96,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
        rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
 
        /* Barrier against rxrpc_input_data(). */
-       serial = call->ackr_serial;
        hard_ack = READ_ONCE(call->rx_hard_ack);
        top = smp_load_acquire(&call->rx_top);
        *_hard_ack = hard_ack;
        *_top = top;
 
-       pkt->ack.bufferSpace    = htons(0);
-       pkt->ack.maxSkew        = htons(0);
-       pkt->ack.firstPacket    = htonl(hard_ack + 1);
-       pkt->ack.previousPacket = htonl(call->ackr_highest_seq);
-       pkt->ack.serial         = htonl(serial);
-       pkt->ack.reason         = reason;
-       pkt->ack.nAcks          = top - hard_ack;
-
-       if (reason == RXRPC_ACK_PING)
-               pkt->whdr.flags |= RXRPC_REQUEST_ACK;
+       txb->ack.firstPacket    = htonl(hard_ack + 1);
+       txb->ack.previousPacket = htonl(call->ackr_highest_seq);
+       txb->ack.nAcks          = top - hard_ack;
 
-       if (after(top, hard_ack)) {
+       if (txb->ack.nAcks) {
                seq = hard_ack + 1;
                do {
                        ix = seq & RXRPC_RXTX_BUFF_MASK;
@@ -137,15 +120,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
        mtu = conn->params.peer->if_mtu;
        mtu -= conn->params.peer->hdrsize;
        jmax = (call->nr_jumbo_bad > 3) ? 1 : rxrpc_rx_jumbo_max;
-       pkt->ackinfo.rxMTU      = htonl(rxrpc_rx_mtu);
-       pkt->ackinfo.maxMTU     = htonl(mtu);
-       pkt->ackinfo.rwind      = htonl(call->rx_winsize);
-       pkt->ackinfo.jumbo_max  = htonl(jmax);
+       ackinfo.rxMTU           = htonl(rxrpc_rx_mtu);
+       ackinfo.maxMTU          = htonl(mtu);
+       ackinfo.rwind           = htonl(call->rx_winsize);
+       ackinfo.jumbo_max       = htonl(jmax);
 
        *ackp++ = 0;
        *ackp++ = 0;
        *ackp++ = 0;
-       return top - hard_ack + 3;
+       memcpy(ackp, &ackinfo, sizeof(ackinfo));
+       return top - hard_ack + 3 + sizeof(ackinfo);
 }
 
 /*
@@ -194,26 +178,21 @@ static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
 /*
  * Send an ACK call packet.
  */
-int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
-                         rxrpc_serial_t *_serial)
+static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *txb)
 {
        struct rxrpc_connection *conn;
        struct rxrpc_ack_buffer *pkt;
+       struct rxrpc_call *call = txb->call;
        struct msghdr msg;
-       struct kvec iov[2];
+       struct kvec iov[1];
        rxrpc_serial_t serial;
        rxrpc_seq_t hard_ack, top;
        size_t len, n;
        int ret, rtt_slot = -1;
-       u8 reason;
 
        if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
                return -ECONNRESET;
 
-       pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
-       if (!pkt)
-               return -ENOMEM;
-
        conn = call->conn;
 
        msg.msg_name    = &call->peer->srx.transport;
@@ -222,86 +201,94 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
        msg.msg_controllen = 0;
        msg.msg_flags   = 0;
 
-       pkt->whdr.epoch         = htonl(conn->proto.epoch);
-       pkt->whdr.cid           = htonl(call->cid);
-       pkt->whdr.callNumber    = htonl(call->call_id);
-       pkt->whdr.seq           = 0;
-       pkt->whdr.type          = RXRPC_PACKET_TYPE_ACK;
-       pkt->whdr.flags         = RXRPC_SLOW_START_OK | conn->out_clientflag;
-       pkt->whdr.userStatus    = 0;
-       pkt->whdr.securityIndex = call->security_ix;
-       pkt->whdr._rsvd         = 0;
-       pkt->whdr.serviceId     = htons(call->service_id);
+       if (txb->ack.reason == RXRPC_ACK_PING)
+               txb->wire.flags |= RXRPC_REQUEST_ACK;
 
        spin_lock_bh(&call->lock);
-       if (ping) {
-               reason = RXRPC_ACK_PING;
-       } else {
-               reason = call->ackr_reason;
-               if (!call->ackr_reason) {
-                       spin_unlock_bh(&call->lock);
-                       ret = 0;
-                       goto out;
-               }
-               call->ackr_reason = 0;
-       }
-       n = rxrpc_fill_out_ack(conn, call, pkt, &hard_ack, &top, reason);
-
+       n = rxrpc_fill_out_ack(conn, call, txb, &hard_ack, &top);
        spin_unlock_bh(&call->lock);
        if (n == 0) {
                kfree(pkt);
                return 0;
        }
 
-       iov[0].iov_base = pkt;
-       iov[0].iov_len  = sizeof(pkt->whdr) + sizeof(pkt->ack) + n;
-       iov[1].iov_base = &pkt->ackinfo;
-       iov[1].iov_len  = sizeof(pkt->ackinfo);
-       len = iov[0].iov_len + iov[1].iov_len;
+       iov[0].iov_base = &txb->wire;
+       iov[0].iov_len  = sizeof(txb->wire) + sizeof(txb->ack) + n;
+       len = iov[0].iov_len;
 
        serial = atomic_inc_return(&conn->serial);
-       pkt->whdr.serial = htonl(serial);
+       txb->wire.serial = htonl(serial);
        trace_rxrpc_tx_ack(call->debug_id, serial,
-                          ntohl(pkt->ack.firstPacket),
-                          ntohl(pkt->ack.serial),
-                          pkt->ack.reason, pkt->ack.nAcks);
-       if (_serial)
-               *_serial = serial;
+                          ntohl(txb->ack.firstPacket),
+                          ntohl(txb->ack.serial), txb->ack.reason, txb->ack.nAcks);
+       if (txb->ack_why == rxrpc_propose_ack_ping_for_lost_ack)
+               call->acks_lost_ping = serial;
 
-       if (ping)
+       if (txb->ack.reason == RXRPC_ACK_PING)
                rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_ping);
 
        rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);
 
-       iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
+       iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
        ret = do_udp_sendmsg(conn->params.local->socket, &msg, len);
        call->peer->last_tx_at = ktime_get_seconds();
        if (ret < 0)
                trace_rxrpc_tx_fail(call->debug_id, serial, ret,
                                    rxrpc_tx_point_call_ack);
        else
-               trace_rxrpc_tx_packet(call->debug_id, &pkt->whdr,
+               trace_rxrpc_tx_packet(call->debug_id, &txb->wire,
                                      rxrpc_tx_point_call_ack);
        rxrpc_tx_backoff(call, ret);
 
        if (call->state < RXRPC_CALL_COMPLETE) {
-               if (ret < 0) {
+               if (ret < 0)
                        rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
-                       rxrpc_propose_ACK(call, pkt->ack.reason,
-                                         ntohl(pkt->ack.serial),
-                                         false, true,
-                                         rxrpc_propose_ack_retry_tx);
-               }
-
                rxrpc_set_keepalive(call);
        }
 
-out:
        kfree(pkt);
        return ret;
 }
 
 /*
+ * ACK transmitter for a local endpoint.  The UDP socket locks around each
+ * transmission, so we can only transmit one packet at a time, ACK, DATA or
+ * otherwise.
+ */
+void rxrpc_transmit_ack_packets(struct rxrpc_local *local)
+{
+       LIST_HEAD(queue);
+       int ret;
+
+       trace_rxrpc_local(local->debug_id, rxrpc_local_tx_ack,
+                         refcount_read(&local->ref), NULL);
+
+       if (list_empty(&local->ack_tx_queue))
+               return;
+
+       spin_lock_bh(&local->ack_tx_lock);
+       list_splice_tail_init(&local->ack_tx_queue, &queue);
+       spin_unlock_bh(&local->ack_tx_lock);
+
+       while (!list_empty(&queue)) {
+               struct rxrpc_txbuf *txb =
+                       list_entry(queue.next, struct rxrpc_txbuf, tx_link);
+
+               ret = rxrpc_send_ack_packet(local, txb);
+               if (ret < 0 && ret != -ECONNRESET) {
+                       spin_lock_bh(&local->ack_tx_lock);
+                       list_splice_init(&queue, &local->ack_tx_queue);
+                       spin_unlock_bh(&local->ack_tx_lock);
+                       break;
+               }
+
+               list_del_init(&txb->tx_link);
+               rxrpc_put_call(txb->call, rxrpc_call_put);
+               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
+       }
+}
+
+/*
  * Send an ABORT call packet.
  */
 int rxrpc_send_abort_packet(struct rxrpc_call *call)
index a378e7a..104dd4a 100644 (file)
@@ -189,7 +189,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
 
        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
-               rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
+               rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial,
                                  rxrpc_propose_ack_terminal_ack);
                //rxrpc_send_ack_packet(call, false, NULL);
        }
@@ -206,7 +206,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
                call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
                call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
                write_unlock_bh(&call->state_lock);
-               rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
+               rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
                                  rxrpc_propose_ack_processing_op);
                break;
        default:
@@ -259,12 +259,11 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
                rxrpc_end_rx_phase(call, serial);
        } else {
                /* Check to see if there's an ACK that needs sending. */
-               if (atomic_inc_return(&call->ackr_nr_consumed) > 2)
-                       rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial,
-                                         true, false,
-                                         rxrpc_propose_ack_rotate_rx);
-               if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
-                       rxrpc_send_ack_packet(call, false, NULL);
+               if (atomic_inc_return(&call->ackr_nr_consumed) > 2) {
+                       rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+                                      rxrpc_propose_ack_rotate_rx);
+                       rxrpc_transmit_ack_packets(call->peer->local);
+               }
        }
 }
 
@@ -363,10 +362,6 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
        unsigned int rx_pkt_offset, rx_pkt_len;
        int ix, copy, ret = -EAGAIN, ret2;
 
-       if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
-           call->ackr_reason)
-               rxrpc_send_ack_packet(call, false, NULL);
-
        rx_pkt_offset = call->rx_pkt_offset;
        rx_pkt_len = call->rx_pkt_len;
        rx_pkt_last = call->rx_pkt_last;
@@ -389,6 +384,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
                if (!skb) {
                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
                                            rx_pkt_offset, rx_pkt_len, 0);
+                       rxrpc_transmit_ack_packets(call->peer->local);
                        break;
                }
                smp_rmb();
@@ -604,6 +600,7 @@ try_again:
                if (ret == -EAGAIN)
                        ret = 0;
 
+               rxrpc_transmit_ack_packets(call->peer->local);
                if (after(call->rx_top, call->rx_hard_ack) &&
                    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
                        rxrpc_notify_socket(call);
@@ -734,17 +731,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 read_phase_complete:
        ret = 1;
 out:
-       switch (call->ackr_reason) {
-       case RXRPC_ACK_IDLE:
-               break;
-       case RXRPC_ACK_DELAY:
-               if (ret != -EAGAIN)
-                       break;
-               fallthrough;
-       default:
-               rxrpc_send_ack_packet(call, false, NULL);
-       }
-
+       rxrpc_transmit_ack_packets(call->peer->local);
        if (_service)
                *_service = call->service_id;
        mutex_unlock(&call->user_mutex);
index b2d28aa..ef49492 100644 (file)
@@ -332,9 +332,7 @@ reload:
        rxrpc_see_skb(skb, rxrpc_skb_seen);
 
        do {
-               /* Check to see if there's a ping ACK to reply to. */
-               if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE)
-                       rxrpc_send_ack_packet(call, false, NULL);
+               rxrpc_transmit_ack_packets(call->peer->local);
 
                if (!skb) {
                        size_t remain, bufsize, chunk, offset;
index 66d922a..45a7b48 100644 (file)
@@ -33,6 +33,7 @@ struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
                txb->len                = 0;
                txb->offset             = 0;
                txb->flags              = 0;
+               txb->ack_why            = 0;
                txb->seq                = call->tx_top + 1;
                txb->wire.epoch         = htonl(call->conn->proto.epoch);
                txb->wire.cid           = htonl(call->cid);