From 6dfb4367cd911d2b03878fffa045d545ba4507f6 Mon Sep 17 00:00:00 2001 From: Paolo Abeni Date: Tue, 16 May 2017 11:20:15 +0200 Subject: [PATCH] udp: keep the sk_receive_queue held when splicing On packet reception, when we are forced to splice the sk_receive_queue, we can keep the related lock held, so that we can avoid re-acquiring it, if fwd memory scheduling is required. v1 -> v2: the rx_queue_lock_held param in udp_rmem_release() is now a bool Signed-off-by: Paolo Abeni Acked-by: Eric Dumazet Signed-off-by: David S. Miller --- net/ipv4/udp.c | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c index 492c76b..7bd56c9 100644 --- a/net/ipv4/udp.c +++ b/net/ipv4/udp.c @@ -1164,7 +1164,8 @@ out: } /* fully reclaim rmem/fwd memory allocated for skb */ -static void udp_rmem_release(struct sock *sk, int size, int partial) +static void udp_rmem_release(struct sock *sk, int size, int partial, + bool rx_queue_lock_held) { struct udp_sock *up = udp_sk(sk); struct sk_buff_head *sk_queue; @@ -1181,9 +1182,13 @@ static void udp_rmem_release(struct sock *sk, int size, int partial) } up->forward_deficit = 0; - /* acquire the sk_receive_queue for fwd allocated memory scheduling */ + /* acquire the sk_receive_queue for fwd allocated memory scheduling, + * if the called don't held it already + */ sk_queue = &sk->sk_receive_queue; - spin_lock(&sk_queue->lock); + if (!rx_queue_lock_held) + spin_lock(&sk_queue->lock); + sk->sk_forward_alloc += size; amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1); @@ -1197,7 +1202,8 @@ static void udp_rmem_release(struct sock *sk, int size, int partial) /* this can save us from acquiring the rx queue lock on next receive */ skb_queue_splice_tail_init(sk_queue, &up->reader_queue); - spin_unlock(&sk_queue->lock); + if (!rx_queue_lock_held) + spin_unlock(&sk_queue->lock); } /* Note: called with reader_queue.lock held. @@ -1207,10 +1213,16 @@ static void udp_rmem_release(struct sock *sk, int size, int partial) */ void udp_skb_destructor(struct sock *sk, struct sk_buff *skb) { - udp_rmem_release(sk, skb->dev_scratch, 1); + udp_rmem_release(sk, skb->dev_scratch, 1, false); } EXPORT_SYMBOL(udp_skb_destructor); +/* as above, but the caller held the rx queue lock, too */ +void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb) +{ + udp_rmem_release(sk, skb->dev_scratch, 1, true); +} + /* Idea of busylocks is to let producers grab an extra spinlock * to relieve pressure on the receive_queue spinlock shared by consumer. * Under flood, this means that only one producer can be in line @@ -1325,7 +1337,7 @@ void udp_destruct_sock(struct sock *sk) total += skb->truesize; kfree_skb(skb); } - udp_rmem_release(sk, total, 0); + udp_rmem_release(sk, total, 0, true); inet_sock_destruct(sk); } @@ -1397,7 +1409,7 @@ static int first_packet_length(struct sock *sk) } res = skb ? skb->len : -1; if (total) - udp_rmem_release(sk, total, 1); + udp_rmem_release(sk, total, 1, false); spin_unlock_bh(&rcvq->lock); return res; } @@ -1471,16 +1483,20 @@ struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags, goto busy_check; } - /* refill the reader queue and walk it again */ + /* refill the reader queue and walk it again + * keep both queues locked to avoid re-acquiring + * the sk_receive_queue lock if fwd memory scheduling + * is needed. + */ _off = *off; spin_lock(&sk_queue->lock); skb_queue_splice_tail_init(sk_queue, queue); - spin_unlock(&sk_queue->lock); skb = __skb_try_recv_from_queue(sk, queue, flags, - udp_skb_destructor, + udp_skb_dtor_locked, peeked, &_off, err, &last); + spin_unlock(&sk_queue->lock); spin_unlock_bh(&queue->lock); if (skb) { *off = _off; -- 2.7.4