From 72f0c6fb057971864fe4d42b289b8e6ede836ef1 Mon Sep 17 00:00:00 2001 From: David Howells Date: Thu, 30 Jan 2020 21:48:13 +0000 Subject: [PATCH] rxrpc: Allocate ACK records at proposal and queue for transmission Allocate rxrpc_txbuf records for ACKs and put onto a queue for the transmitter thread to dispatch. Signed-off-by: David Howells cc: Marc Dionne cc: linux-afs@lists.infradead.org --- include/trace/events/rxrpc.h | 47 ++++++--- net/rxrpc/ar-internal.h | 21 ++-- net/rxrpc/call_accept.c | 5 +- net/rxrpc/call_event.c | 189 +++++++++++++++++++---------------- net/rxrpc/input.c | 89 +++++++---------- net/rxrpc/local_object.c | 7 ++ net/rxrpc/output.c | 157 +++++++++++++---------------- net/rxrpc/recvmsg.c | 33 ++---- net/rxrpc/sendmsg.c | 4 +- net/rxrpc/txbuf.c | 1 + 10 files changed, 285 insertions(+), 268 deletions(-) diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h index 47b157b1d32b..1597ff7ad97e 100644 --- a/include/trace/events/rxrpc.h +++ b/include/trace/events/rxrpc.h @@ -34,7 +34,8 @@ EM(rxrpc_local_new, "NEW") \ EM(rxrpc_local_processing, "PRO") \ EM(rxrpc_local_put, "PUT") \ - E_(rxrpc_local_queued, "QUE") + EM(rxrpc_local_queued, "QUE") \ + E_(rxrpc_local_tx_ack, "TAK") #define rxrpc_peer_traces \ EM(rxrpc_peer_got, "GOT") \ @@ -258,7 +259,9 @@ EM(rxrpc_txbuf_free, "FREE ") \ EM(rxrpc_txbuf_get_trans, "GET TRANS ") \ EM(rxrpc_txbuf_get_retrans, "GET RETRANS") \ + EM(rxrpc_txbuf_put_ack_tx, "PUT ACK TX ") \ EM(rxrpc_txbuf_put_cleaned, "PUT CLEANED") \ + EM(rxrpc_txbuf_put_nomem, "PUT NOMEM ") \ EM(rxrpc_txbuf_put_rotated, "PUT ROTATED") \ EM(rxrpc_txbuf_put_send_aborted, "PUT SEND-X ") \ EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \ @@ -1095,19 +1098,16 @@ TRACE_EVENT(rxrpc_rx_lose, TRACE_EVENT(rxrpc_propose_ack, TP_PROTO(struct rxrpc_call *call, enum rxrpc_propose_ack_trace why, - u8 ack_reason, rxrpc_serial_t serial, bool immediate, - bool background, enum rxrpc_propose_ack_outcome outcome), + u8 ack_reason, rxrpc_serial_t serial, + enum rxrpc_propose_ack_outcome outcome), - TP_ARGS(call, why, ack_reason, serial, immediate, background, - outcome), + TP_ARGS(call, why, ack_reason, serial, outcome), TP_STRUCT__entry( __field(unsigned int, call ) __field(enum rxrpc_propose_ack_trace, why ) __field(rxrpc_serial_t, serial ) __field(u8, ack_reason ) - __field(bool, immediate ) - __field(bool, background ) __field(enum rxrpc_propose_ack_outcome, outcome ) ), @@ -1116,21 +1116,44 @@ TRACE_EVENT(rxrpc_propose_ack, __entry->why = why; __entry->serial = serial; __entry->ack_reason = ack_reason; - __entry->immediate = immediate; - __entry->background = background; __entry->outcome = outcome; ), - TP_printk("c=%08x %s %s r=%08x i=%u b=%u%s", + TP_printk("c=%08x %s %s r=%08x%s", __entry->call, __print_symbolic(__entry->why, rxrpc_propose_ack_traces), __print_symbolic(__entry->ack_reason, rxrpc_ack_names), __entry->serial, - __entry->immediate, - __entry->background, __print_symbolic(__entry->outcome, rxrpc_propose_ack_outcomes)) ); +TRACE_EVENT(rxrpc_send_ack, + TP_PROTO(struct rxrpc_call *call, enum rxrpc_propose_ack_trace why, + u8 ack_reason, rxrpc_serial_t serial), + + TP_ARGS(call, why, ack_reason, serial), + + TP_STRUCT__entry( + __field(unsigned int, call ) + __field(enum rxrpc_propose_ack_trace, why ) + __field(rxrpc_serial_t, serial ) + __field(u8, ack_reason ) + ), + + TP_fast_assign( + __entry->call = call->debug_id; + __entry->why = why; + __entry->serial = serial; + __entry->ack_reason = ack_reason; + ), + + TP_printk("c=%08x %s %s r=%08x", + __entry->call, + __print_symbolic(__entry->why, rxrpc_propose_ack_traces), + __print_symbolic(__entry->ack_reason, rxrpc_ack_names), + __entry->serial) + ); + TRACE_EVENT(rxrpc_retransmit, TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, u8 annotation, s64 expiry), diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index 49e90614d5e5..802a8f372af2 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -292,6 +292,8 @@ struct rxrpc_local { struct hlist_node link; struct socket *socket; /* my UDP socket */ struct work_struct processor; + struct list_head ack_tx_queue; /* List of ACKs that need sending */ + spinlock_t ack_tx_lock; /* ACK list lock */ struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */ struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */ struct sk_buff_head reject_queue; /* packets awaiting rejection */ @@ -520,10 +522,8 @@ enum rxrpc_call_flag { * Events that can be raised on a call. */ enum rxrpc_call_event { - RXRPC_CALL_EV_ACK, /* need to generate ACK */ RXRPC_CALL_EV_ABORT, /* need to generate abort */ RXRPC_CALL_EV_RESEND, /* Tx resend required */ - RXRPC_CALL_EV_PING, /* Ping send required */ RXRPC_CALL_EV_EXPIRED, /* Expiry occurred */ RXRPC_CALL_EV_ACK_LOST, /* ACK may be lost, send ping */ }; @@ -782,13 +782,20 @@ struct rxrpc_txbuf { #define RXRPC_TXBUF_LAST 2 /* Set if last packet in Tx phase */ #define RXRPC_TXBUF_RESENT 3 /* Set if has been resent */ #define RXRPC_TXBUF_RETRANS 4 /* Set if should be retransmitted */ + u8 /*enum rxrpc_propose_ack_trace*/ ack_why; /* If ack, why */ struct { /* The packet for encrypting and DMA'ing. We align it such * that data[] aligns correctly for any crypto blocksize. */ u8 pad[64 - sizeof(struct rxrpc_wire_header)]; struct rxrpc_wire_header wire; /* Network-ready header */ - u8 data[RXRPC_JUMBO_DATALEN]; /* Data packet */ + union { + u8 data[RXRPC_JUMBO_DATALEN]; /* Data packet */ + struct { + struct rxrpc_ackpacket ack; + u8 acks[0]; + }; + }; } __aligned(64); }; @@ -824,8 +831,10 @@ int rxrpc_user_charge_accept(struct rxrpc_sock *, unsigned long); /* * call_event.c */ -void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool, bool, - enum rxrpc_propose_ack_trace); +void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial, + enum rxrpc_propose_ack_trace why); +void rxrpc_send_ACK(struct rxrpc_call *, u8, rxrpc_serial_t, enum rxrpc_propose_ack_trace); +void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, enum rxrpc_propose_ack_trace); void rxrpc_process_call(struct work_struct *); void rxrpc_reduce_call_timer(struct rxrpc_call *call, @@ -1030,7 +1039,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net) /* * output.c */ -int rxrpc_send_ack_packet(struct rxrpc_call *, bool, rxrpc_serial_t *); +void rxrpc_transmit_ack_packets(struct rxrpc_local *); int rxrpc_send_abort_packet(struct rxrpc_call *); int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *, bool); void rxrpc_reject_packets(struct rxrpc_local *); diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c index 99e10eea3732..d8db277d5ebe 100644 --- a/net/rxrpc/call_accept.c +++ b/net/rxrpc/call_accept.c @@ -248,9 +248,8 @@ static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb) if (call->peer->rtt_count < 3 || ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now)) - rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial, - true, true, - rxrpc_propose_ack_ping_for_params); + rxrpc_send_ACK(call, RXRPC_ACK_PING, sp->hdr.serial, + rxrpc_propose_ack_ping_for_params); } /* diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c index 70f70a0393f7..67b54ad914a1 100644 --- a/net/rxrpc/call_event.c +++ b/net/rxrpc/call_event.c @@ -20,46 +20,39 @@ /* * Propose a PING ACK be sent. */ -static void rxrpc_propose_ping(struct rxrpc_call *call, - bool immediate, bool background) +void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial, + enum rxrpc_propose_ack_trace why) { - if (immediate) { - if (background && - !test_and_set_bit(RXRPC_CALL_EV_PING, &call->events)) - rxrpc_queue_call(call); - } else { - unsigned long now = jiffies; - unsigned long ping_at = now + rxrpc_idle_ack_delay; + unsigned long now = jiffies; + unsigned long ping_at = now + rxrpc_idle_ack_delay; - if (time_before(ping_at, call->ping_at)) { - WRITE_ONCE(call->ping_at, ping_at); - rxrpc_reduce_call_timer(call, ping_at, now, - rxrpc_timer_set_for_ping); - } + spin_lock_bh(&call->lock); + + if (time_before(ping_at, call->ping_at)) { + rxrpc_inc_stat(call->rxnet, stat_tx_acks[RXRPC_ACK_PING]); + WRITE_ONCE(call->ping_at, ping_at); + rxrpc_reduce_call_timer(call, ping_at, now, + rxrpc_timer_set_for_ping); + trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial, + rxrpc_propose_ack_use); } + + spin_unlock_bh(&call->lock); } /* * propose an ACK be sent */ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, - u32 serial, bool immediate, bool background, - enum rxrpc_propose_ack_trace why) + u32 serial, enum rxrpc_propose_ack_trace why) { enum rxrpc_propose_ack_outcome outcome = rxrpc_propose_ack_use; unsigned long expiry = rxrpc_soft_ack_delay; + unsigned long now = jiffies, ack_at; s8 prior = rxrpc_ack_priority[ack_reason]; rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]); - /* Pings are handled specially because we don't want to accidentally - * lose a ping response by subsuming it into a ping. - */ - if (ack_reason == RXRPC_ACK_PING) { - rxrpc_propose_ping(call, immediate, background); - goto trace; - } - /* Update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial * numbers, but we don't alter the timeout. */ @@ -71,8 +64,6 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, outcome = rxrpc_propose_ack_update; call->ackr_serial = serial; } - if (!immediate) - goto trace; } else if (prior > rxrpc_ack_priority[call->ackr_reason]) { call->ackr_reason = ack_reason; call->ackr_serial = serial; @@ -84,8 +75,6 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, case RXRPC_ACK_REQUESTED: if (rxrpc_requested_ack_delay < expiry) expiry = rxrpc_requested_ack_delay; - if (serial == 1) - immediate = false; break; case RXRPC_ACK_DELAY: @@ -99,52 +88,89 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, break; default: - immediate = true; - break; + WARN_ON(1); + return; } - if (test_bit(RXRPC_CALL_EV_ACK, &call->events)) { - _debug("already scheduled"); - } else if (immediate || expiry == 0) { - _debug("immediate ACK %lx", call->events); - if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events) && - background) - rxrpc_queue_call(call); - } else { - unsigned long now = jiffies, ack_at; - - if (call->peer->srtt_us != 0) - ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3); - else - ack_at = expiry; - - ack_at += READ_ONCE(call->tx_backoff); - ack_at += now; - if (time_before(ack_at, call->ack_at)) { - WRITE_ONCE(call->ack_at, ack_at); - rxrpc_reduce_call_timer(call, ack_at, now, - rxrpc_timer_set_for_ack); - } + + if (call->peer->srtt_us != 0) + ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3); + else + ack_at = expiry; + + ack_at += READ_ONCE(call->tx_backoff); + ack_at += now; + if (time_before(ack_at, call->ack_at)) { + WRITE_ONCE(call->ack_at, ack_at); + rxrpc_reduce_call_timer(call, ack_at, now, + rxrpc_timer_set_for_ack); } -trace: - trace_rxrpc_propose_ack(call, why, ack_reason, serial, immediate, - background, outcome); + trace_rxrpc_propose_ack(call, why, ack_reason, serial, outcome); } /* * propose an ACK be sent, locking the call structure */ -void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, - u32 serial, bool immediate, bool background, +void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, u32 serial, enum rxrpc_propose_ack_trace why) { spin_lock_bh(&call->lock); - __rxrpc_propose_ACK(call, ack_reason, serial, - immediate, background, why); + __rxrpc_propose_ACK(call, ack_reason, serial, why); spin_unlock_bh(&call->lock); } +/* + * Queue an ACK for immediate transmission. + */ +void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason, + rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why) +{ + struct rxrpc_local *local = call->conn->params.local; + struct rxrpc_txbuf *txb; + + if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) + return; + + rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]); + + txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK, + in_softirq() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS); + if (!txb) { + kleave(" = -ENOMEM"); + return; + } + + txb->ack_why = why; + txb->wire.seq = 0; + txb->wire.type = RXRPC_PACKET_TYPE_ACK; + txb->wire.flags |= RXRPC_SLOW_START_OK; + txb->ack.bufferSpace = 0; + txb->ack.maxSkew = 0; + txb->ack.firstPacket = 0; + txb->ack.previousPacket = 0; + txb->ack.serial = htonl(serial); + txb->ack.reason = ack_reason; + txb->ack.nAcks = 0; + + if (!rxrpc_try_get_call(call, rxrpc_call_got)) { + rxrpc_put_txbuf(txb, rxrpc_txbuf_put_nomem); + return; + } + + spin_lock_bh(&local->ack_tx_lock); + list_add_tail(&txb->tx_link, &local->ack_tx_queue); + spin_unlock_bh(&local->ack_tx_lock); + trace_rxrpc_send_ack(call, why, ack_reason, serial); + + if (in_task()) { + rxrpc_transmit_ack_packets(call->peer->local); + } else { + rxrpc_get_local(local); + rxrpc_queue_local(local); + } +} + /* * Handle congestion being detected by the retransmit timeout. */ @@ -230,9 +256,8 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j) ack_ts = ktime_sub(now, call->acks_latest_ts); if (ktime_to_us(ack_ts) < (call->peer->srtt_us >> 3)) goto out; - rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false, - rxrpc_propose_ack_ping_for_lost_ack); - rxrpc_send_ack_packet(call, true, NULL); + rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, + rxrpc_propose_ack_ping_for_lost_ack); goto out; } @@ -291,9 +316,10 @@ void rxrpc_process_call(struct work_struct *work) { struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor); - rxrpc_serial_t *send_ack; unsigned long now, next, t; unsigned int iterations = 0; + rxrpc_serial_t ackr_serial; + u8 ackr_reason; rxrpc_see_call(call); @@ -342,7 +368,15 @@ recheck_state: if (time_after_eq(now, t)) { trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now); cmpxchg(&call->ack_at, t, now + MAX_JIFFY_OFFSET); - set_bit(RXRPC_CALL_EV_ACK, &call->events); + spin_lock_bh(&call->lock); + ackr_reason = call->ackr_reason; + ackr_serial = call->ackr_serial; + call->ackr_reason = 0; + call->ackr_serial = 0; + spin_unlock_bh(&call->lock); + if (ackr_reason) + rxrpc_send_ACK(call, ackr_reason, ackr_serial, + rxrpc_propose_ack_ping_for_lost_ack); } t = READ_ONCE(call->ack_lost_at); @@ -356,16 +390,16 @@ recheck_state: if (time_after_eq(now, t)) { trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now); cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET); - rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, true, - rxrpc_propose_ack_ping_for_keepalive); - set_bit(RXRPC_CALL_EV_PING, &call->events); + rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, + rxrpc_propose_ack_ping_for_keepalive); } t = READ_ONCE(call->ping_at); if (time_after_eq(now, t)) { trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now); cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET); - set_bit(RXRPC_CALL_EV_PING, &call->events); + rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, + rxrpc_propose_ack_ping_for_keepalive); } t = READ_ONCE(call->resend_at); @@ -388,25 +422,10 @@ recheck_state: goto recheck_state; } - send_ack = NULL; if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events)) { call->acks_lost_top = call->tx_top; - rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false, - rxrpc_propose_ack_ping_for_lost_ack); - send_ack = &call->acks_lost_ping; - } - - if (test_and_clear_bit(RXRPC_CALL_EV_ACK, &call->events) || - send_ack) { - if (call->ackr_reason) { - rxrpc_send_ack_packet(call, false, send_ack); - goto recheck_state; - } - } - - if (test_and_clear_bit(RXRPC_CALL_EV_PING, &call->events)) { - rxrpc_send_ack_packet(call, true, NULL); - goto recheck_state; + rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, + rxrpc_propose_ack_ping_for_lost_ack); } if (test_and_clear_bit(RXRPC_CALL_EV_RESEND, &call->events) && diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 0cb23ba79e3b..e23f66ef8c06 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -398,8 +398,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) unsigned int j, nr_subpackets, nr_unacked = 0; rxrpc_serial_t serial = sp->hdr.serial, ack_serial = serial; rxrpc_seq_t seq0 = sp->hdr.seq, hard_ack; - bool immediate_ack = false, jumbo_bad = false; - u8 ack = 0; + bool jumbo_bad = false; _enter("{%u,%u},{%u,%u}", call->rx_hard_ack, call->rx_top, skb->len, seq0); @@ -447,9 +446,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) nr_subpackets = sp->nr_subpackets; if (nr_subpackets > 1) { if (call->nr_jumbo_bad > 3) { - ack = RXRPC_ACK_NOSPACE; - ack_serial = serial; - goto ack; + rxrpc_send_ACK(call, RXRPC_ACK_NOSPACE, serial, + rxrpc_propose_ack_input_data); + goto unlock; } } @@ -459,6 +458,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK; bool terminal = (j == nr_subpackets - 1); bool last = terminal && (sp->rx_flags & RXRPC_SKB_INCL_LAST); + bool acked = false; u8 flags, annotation = j; _proto("Rx DATA+%u %%%u { #%x t=%u l=%u }", @@ -488,25 +488,22 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation); if (before_eq(seq, hard_ack)) { - ack = RXRPC_ACK_DUPLICATE; - ack_serial = serial; + rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial, + rxrpc_propose_ack_input_data); continue; } if (call->rxtx_buffer[ix]) { rxrpc_input_dup_data(call, seq, nr_subpackets > 1, &jumbo_bad); - if (ack != RXRPC_ACK_DUPLICATE) { - ack = RXRPC_ACK_DUPLICATE; - ack_serial = serial; - } - immediate_ack = true; + rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial, + rxrpc_propose_ack_input_data); continue; } if (after(seq, hard_ack + call->rx_winsize)) { - ack = RXRPC_ACK_EXCEEDS_WINDOW; - ack_serial = serial; + rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial, + rxrpc_propose_ack_input_data); if (flags & RXRPC_JUMBO_PACKET) { if (!jumbo_bad) { call->nr_jumbo_bad++; @@ -514,12 +511,13 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) } } - goto ack; + goto unlock; } - if (flags & RXRPC_REQUEST_ACK && !ack) { - ack = RXRPC_ACK_REQUESTED; - ack_serial = serial; + if (flags & RXRPC_REQUEST_ACK) { + rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial, + rxrpc_propose_ack_input_data); + acked = true; } if (after(seq0, call->ackr_highest_seq)) @@ -542,11 +540,11 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) smp_store_release(&call->rx_top, seq); } else if (before(seq, call->rx_top)) { /* Send an immediate ACK if we fill in a hole */ - if (!ack) { - ack = RXRPC_ACK_DELAY; - ack_serial = serial; + if (!acked) { + rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial, + rxrpc_propose_ack_input_data); + acked = true; } - immediate_ack = true; } if (terminal) { @@ -558,14 +556,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) sp = NULL; } - nr_unacked++; - if (last) { set_bit(RXRPC_CALL_RX_LAST, &call->flags); - if (!ack) { - ack = RXRPC_ACK_DELAY; - ack_serial = serial; - } trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq); } else { trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq); @@ -574,32 +566,30 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) if (after_eq(seq, call->rx_expect_next)) { if (after(seq, call->rx_expect_next)) { _net("OOS %u > %u", seq, call->rx_expect_next); - ack = RXRPC_ACK_OUT_OF_SEQUENCE; - ack_serial = serial; + rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial, + rxrpc_propose_ack_input_data); + acked = true; } call->rx_expect_next = seq + 1; } - if (!ack) + + if (!acked) { + nr_unacked++; ack_serial = serial; + } } -ack: - if (atomic_add_return(nr_unacked, &call->ackr_nr_unacked) > 2 && !ack) - ack = RXRPC_ACK_IDLE; - - if (ack) - rxrpc_propose_ACK(call, ack, ack_serial, - immediate_ack, true, - rxrpc_propose_ack_input_data); +unlock: + if (atomic_add_return(nr_unacked, &call->ackr_nr_unacked) > 2) + rxrpc_send_ACK(call, RXRPC_ACK_IDLE, ack_serial, + rxrpc_propose_ack_input_data); else - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, - false, true, + rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, ack_serial, rxrpc_propose_ack_input_data); trace_rxrpc_notify_socket(call->debug_id, serial); rxrpc_notify_socket(call); -unlock: spin_unlock(&call->input_lock); rxrpc_free_skb(skb, rxrpc_skb_freed); _leave(" [queued]"); @@ -893,13 +883,11 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) if (buf.ack.reason == RXRPC_ACK_PING) { _proto("Rx ACK %%%u PING Request", ack_serial); - rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE, - ack_serial, true, true, - rxrpc_propose_ack_respond_to_ping); + rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial, + rxrpc_propose_ack_respond_to_ping); } else if (sp->hdr.flags & RXRPC_REQUEST_ACK) { - rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED, - ack_serial, true, true, - rxrpc_propose_ack_respond_to_ack); + rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial, + rxrpc_propose_ack_respond_to_ack); } /* If we get an EXCEEDS_WINDOW ACK from the server, it probably @@ -1011,9 +999,8 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb) RXRPC_TX_ANNO_LAST && summary.nr_acks == call->tx_top - hard_ack && rxrpc_is_client_call(call)) - rxrpc_propose_ACK(call, RXRPC_ACK_PING, ack_serial, - false, true, - rxrpc_propose_ack_ping_for_lost_reply); + rxrpc_propose_ping(call, ack_serial, + rxrpc_propose_ack_ping_for_lost_reply); rxrpc_congestion_management(call, skb, &summary, acked_serial); out: diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c index e95e75e785fb..a178f71e5082 100644 --- a/net/rxrpc/local_object.c +++ b/net/rxrpc/local_object.c @@ -97,6 +97,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet, local->rxnet = rxnet; INIT_HLIST_NODE(&local->link); INIT_WORK(&local->processor, rxrpc_local_processor); + INIT_LIST_HEAD(&local->ack_tx_queue); + spin_lock_init(&local->ack_tx_lock); init_rwsem(&local->defrag_sem); skb_queue_head_init(&local->reject_queue); skb_queue_head_init(&local->event_queue); @@ -432,6 +434,11 @@ static void rxrpc_local_processor(struct work_struct *work) break; } + if (!list_empty(&local->ack_tx_queue)) { + rxrpc_transmit_ack_packets(local); + again = true; + } + if (!skb_queue_empty(&local->reject_queue)) { rxrpc_reject_packets(local); again = true; diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c index 71885d741987..1edbc678e8be 100644 --- a/net/rxrpc/output.c +++ b/net/rxrpc/output.c @@ -16,14 +16,6 @@ #include #include "ar-internal.h" -struct rxrpc_ack_buffer { - struct rxrpc_wire_header whdr; - struct rxrpc_ackpacket ack; - u8 acks[255]; - u8 pad[3]; - struct rxrpc_ackinfo ackinfo; -}; - extern int udpv6_sendmsg(struct sock *sk, struct msghdr *msg, size_t len); static ssize_t do_udp_sendmsg(struct socket *sk, struct msghdr *msg, size_t len) @@ -82,22 +74,21 @@ static void rxrpc_set_keepalive(struct rxrpc_call *call) */ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn, struct rxrpc_call *call, - struct rxrpc_ack_buffer *pkt, + struct rxrpc_txbuf *txb, rxrpc_seq_t *_hard_ack, - rxrpc_seq_t *_top, - u8 reason) + rxrpc_seq_t *_top) { - rxrpc_serial_t serial; + struct rxrpc_ackinfo ackinfo; unsigned int tmp; rxrpc_seq_t hard_ack, top, seq; int ix; u32 mtu, jmax; - u8 *ackp = pkt->acks; + u8 *ackp = txb->acks; tmp = atomic_xchg(&call->ackr_nr_unacked, 0); tmp |= atomic_xchg(&call->ackr_nr_consumed, 0); - if (!tmp && (reason == RXRPC_ACK_DELAY || - reason == RXRPC_ACK_IDLE)) { + if (!tmp && (txb->ack.reason == RXRPC_ACK_DELAY || + txb->ack.reason == RXRPC_ACK_IDLE)) { rxrpc_inc_stat(call->rxnet, stat_tx_ack_skip); return 0; } @@ -105,24 +96,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn, rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill); /* Barrier against rxrpc_input_data(). */ - serial = call->ackr_serial; hard_ack = READ_ONCE(call->rx_hard_ack); top = smp_load_acquire(&call->rx_top); *_hard_ack = hard_ack; *_top = top; - pkt->ack.bufferSpace = htons(0); - pkt->ack.maxSkew = htons(0); - pkt->ack.firstPacket = htonl(hard_ack + 1); - pkt->ack.previousPacket = htonl(call->ackr_highest_seq); - pkt->ack.serial = htonl(serial); - pkt->ack.reason = reason; - pkt->ack.nAcks = top - hard_ack; - - if (reason == RXRPC_ACK_PING) - pkt->whdr.flags |= RXRPC_REQUEST_ACK; + txb->ack.firstPacket = htonl(hard_ack + 1); + txb->ack.previousPacket = htonl(call->ackr_highest_seq); + txb->ack.nAcks = top - hard_ack; - if (after(top, hard_ack)) { + if (txb->ack.nAcks) { seq = hard_ack + 1; do { ix = seq & RXRPC_RXTX_BUFF_MASK; @@ -137,15 +120,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn, mtu = conn->params.peer->if_mtu; mtu -= conn->params.peer->hdrsize; jmax = (call->nr_jumbo_bad > 3) ? 1 : rxrpc_rx_jumbo_max; - pkt->ackinfo.rxMTU = htonl(rxrpc_rx_mtu); - pkt->ackinfo.maxMTU = htonl(mtu); - pkt->ackinfo.rwind = htonl(call->rx_winsize); - pkt->ackinfo.jumbo_max = htonl(jmax); + ackinfo.rxMTU = htonl(rxrpc_rx_mtu); + ackinfo.maxMTU = htonl(mtu); + ackinfo.rwind = htonl(call->rx_winsize); + ackinfo.jumbo_max = htonl(jmax); *ackp++ = 0; *ackp++ = 0; *ackp++ = 0; - return top - hard_ack + 3; + memcpy(ackp, &ackinfo, sizeof(ackinfo)); + return top - hard_ack + 3 + sizeof(ackinfo); } /* @@ -194,26 +178,21 @@ static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call, /* * Send an ACK call packet. */ -int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping, - rxrpc_serial_t *_serial) +static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *txb) { struct rxrpc_connection *conn; struct rxrpc_ack_buffer *pkt; + struct rxrpc_call *call = txb->call; struct msghdr msg; - struct kvec iov[2]; + struct kvec iov[1]; rxrpc_serial_t serial; rxrpc_seq_t hard_ack, top; size_t len, n; int ret, rtt_slot = -1; - u8 reason; if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) return -ECONNRESET; - pkt = kzalloc(sizeof(*pkt), GFP_KERNEL); - if (!pkt) - return -ENOMEM; - conn = call->conn; msg.msg_name = &call->peer->srx.transport; @@ -222,85 +201,93 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping, msg.msg_controllen = 0; msg.msg_flags = 0; - pkt->whdr.epoch = htonl(conn->proto.epoch); - pkt->whdr.cid = htonl(call->cid); - pkt->whdr.callNumber = htonl(call->call_id); - pkt->whdr.seq = 0; - pkt->whdr.type = RXRPC_PACKET_TYPE_ACK; - pkt->whdr.flags = RXRPC_SLOW_START_OK | conn->out_clientflag; - pkt->whdr.userStatus = 0; - pkt->whdr.securityIndex = call->security_ix; - pkt->whdr._rsvd = 0; - pkt->whdr.serviceId = htons(call->service_id); + if (txb->ack.reason == RXRPC_ACK_PING) + txb->wire.flags |= RXRPC_REQUEST_ACK; spin_lock_bh(&call->lock); - if (ping) { - reason = RXRPC_ACK_PING; - } else { - reason = call->ackr_reason; - if (!call->ackr_reason) { - spin_unlock_bh(&call->lock); - ret = 0; - goto out; - } - call->ackr_reason = 0; - } - n = rxrpc_fill_out_ack(conn, call, pkt, &hard_ack, &top, reason); - + n = rxrpc_fill_out_ack(conn, call, txb, &hard_ack, &top); spin_unlock_bh(&call->lock); if (n == 0) { kfree(pkt); return 0; } - iov[0].iov_base = pkt; - iov[0].iov_len = sizeof(pkt->whdr) + sizeof(pkt->ack) + n; - iov[1].iov_base = &pkt->ackinfo; - iov[1].iov_len = sizeof(pkt->ackinfo); - len = iov[0].iov_len + iov[1].iov_len; + iov[0].iov_base = &txb->wire; + iov[0].iov_len = sizeof(txb->wire) + sizeof(txb->ack) + n; + len = iov[0].iov_len; serial = atomic_inc_return(&conn->serial); - pkt->whdr.serial = htonl(serial); + txb->wire.serial = htonl(serial); trace_rxrpc_tx_ack(call->debug_id, serial, - ntohl(pkt->ack.firstPacket), - ntohl(pkt->ack.serial), - pkt->ack.reason, pkt->ack.nAcks); - if (_serial) - *_serial = serial; + ntohl(txb->ack.firstPacket), + ntohl(txb->ack.serial), txb->ack.reason, txb->ack.nAcks); + if (txb->ack_why == rxrpc_propose_ack_ping_for_lost_ack) + call->acks_lost_ping = serial; - if (ping) + if (txb->ack.reason == RXRPC_ACK_PING) rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_ping); rxrpc_inc_stat(call->rxnet, stat_tx_ack_send); - iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len); + iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len); ret = do_udp_sendmsg(conn->params.local->socket, &msg, len); call->peer->last_tx_at = ktime_get_seconds(); if (ret < 0) trace_rxrpc_tx_fail(call->debug_id, serial, ret, rxrpc_tx_point_call_ack); else - trace_rxrpc_tx_packet(call->debug_id, &pkt->whdr, + trace_rxrpc_tx_packet(call->debug_id, &txb->wire, rxrpc_tx_point_call_ack); rxrpc_tx_backoff(call, ret); if (call->state < RXRPC_CALL_COMPLETE) { - if (ret < 0) { + if (ret < 0) rxrpc_cancel_rtt_probe(call, serial, rtt_slot); - rxrpc_propose_ACK(call, pkt->ack.reason, - ntohl(pkt->ack.serial), - false, true, - rxrpc_propose_ack_retry_tx); - } - rxrpc_set_keepalive(call); } -out: kfree(pkt); return ret; } +/* + * ACK transmitter for a local endpoint. The UDP socket locks around each + * transmission, so we can only transmit one packet at a time, ACK, DATA or + * otherwise. + */ +void rxrpc_transmit_ack_packets(struct rxrpc_local *local) +{ + LIST_HEAD(queue); + int ret; + + trace_rxrpc_local(local->debug_id, rxrpc_local_tx_ack, + refcount_read(&local->ref), NULL); + + if (list_empty(&local->ack_tx_queue)) + return; + + spin_lock_bh(&local->ack_tx_lock); + list_splice_tail_init(&local->ack_tx_queue, &queue); + spin_unlock_bh(&local->ack_tx_lock); + + while (!list_empty(&queue)) { + struct rxrpc_txbuf *txb = + list_entry(queue.next, struct rxrpc_txbuf, tx_link); + + ret = rxrpc_send_ack_packet(local, txb); + if (ret < 0 && ret != -ECONNRESET) { + spin_lock_bh(&local->ack_tx_lock); + list_splice_init(&queue, &local->ack_tx_queue); + spin_unlock_bh(&local->ack_tx_lock); + break; + } + + list_del_init(&txb->tx_link); + rxrpc_put_call(txb->call, rxrpc_call_put); + rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx); + } +} + /* * Send an ABORT call packet. */ diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index a378e7a672a8..104dd4a29f05 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -189,7 +189,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) ASSERTCMP(call->rx_hard_ack, ==, call->rx_top); if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) { - rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true, + rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, rxrpc_propose_ack_terminal_ack); //rxrpc_send_ack_packet(call, false, NULL); } @@ -206,7 +206,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) call->state = RXRPC_CALL_SERVER_ACK_REQUEST; call->expect_req_by = jiffies + MAX_JIFFY_OFFSET; write_unlock_bh(&call->state_lock); - rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true, + rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, rxrpc_propose_ack_processing_op); break; default: @@ -259,12 +259,11 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call) rxrpc_end_rx_phase(call, serial); } else { /* Check to see if there's an ACK that needs sending. */ - if (atomic_inc_return(&call->ackr_nr_consumed) > 2) - rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, - true, false, - rxrpc_propose_ack_rotate_rx); - if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY) - rxrpc_send_ack_packet(call, false, NULL); + if (atomic_inc_return(&call->ackr_nr_consumed) > 2) { + rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial, + rxrpc_propose_ack_rotate_rx); + rxrpc_transmit_ack_packets(call->peer->local); + } } } @@ -363,10 +362,6 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, unsigned int rx_pkt_offset, rx_pkt_len; int ix, copy, ret = -EAGAIN, ret2; - if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) && - call->ackr_reason) - rxrpc_send_ack_packet(call, false, NULL); - rx_pkt_offset = call->rx_pkt_offset; rx_pkt_len = call->rx_pkt_len; rx_pkt_last = call->rx_pkt_last; @@ -389,6 +384,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, if (!skb) { trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq, rx_pkt_offset, rx_pkt_len, 0); + rxrpc_transmit_ack_packets(call->peer->local); break; } smp_rmb(); @@ -604,6 +600,7 @@ try_again: if (ret == -EAGAIN) ret = 0; + rxrpc_transmit_ack_packets(call->peer->local); if (after(call->rx_top, call->rx_hard_ack) && call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK]) rxrpc_notify_socket(call); @@ -734,17 +731,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, read_phase_complete: ret = 1; out: - switch (call->ackr_reason) { - case RXRPC_ACK_IDLE: - break; - case RXRPC_ACK_DELAY: - if (ret != -EAGAIN) - break; - fallthrough; - default: - rxrpc_send_ack_packet(call, false, NULL); - } - + rxrpc_transmit_ack_packets(call->peer->local); if (_service) *_service = call->service_id; mutex_unlock(&call->user_mutex); diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index b2d28aa12e10..ef4949259020 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -332,9 +332,7 @@ reload: rxrpc_see_skb(skb, rxrpc_skb_seen); do { - /* Check to see if there's a ping ACK to reply to. */ - if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE) - rxrpc_send_ack_packet(call, false, NULL); + rxrpc_transmit_ack_packets(call->peer->local); if (!skb) { size_t remain, bufsize, chunk, offset; diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c index 66d922ad65d0..45a7b48a5e10 100644 --- a/net/rxrpc/txbuf.c +++ b/net/rxrpc/txbuf.c @@ -33,6 +33,7 @@ struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type, txb->len = 0; txb->offset = 0; txb->flags = 0; + txb->ack_why = 0; txb->seq = call->tx_top + 1; txb->wire.epoch = htonl(call->conn->proto.epoch); txb->wire.cid = htonl(call->cid); -- 2.34.1