kcm: Send multiple frags in one sendmsg()
author: David Howells <dhowells@redhat.com>
Fri, 9 Jun 2023 10:02:21 +0000 (11:02 +0100)
committer: Jakub Kicinski <kuba@kernel.org>
Tue, 13 Jun 2023 04:13:23 +0000 (21:13 -0700)
Rewrite the AF_KCM transmission loop to send all the fragments in a single
skb or frag_list-skb in one sendmsg() with MSG_SPLICE_PAGES set.  The list
of fragments in each skb is conveniently a bio_vec[] that can just be
attached to a BVEC iter.

Note: I'm working out the size of each fragment-skb by adding up bv_len for
all the bio_vecs in skb->frags[] - but surely this information is recorded
somewhere?  For the skbs in head->frag_list, this is equal to
skb->data_len, but not for the head.  head->data_len includes all the tail
frags too.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Tom Herbert <tom@herbertland.com>
cc: Tom Herbert <tom@quantonium.net>
cc: Jens Axboe <axboe@kernel.dk>
cc: Matthew Wilcox <willy@infradead.org>
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
include/net/kcm.h
net/kcm/kcmsock.c

diff --git a/include/net/kcm.h b/include/net/kcm.h
index 2d704f8..90279e5 100644
--- a/include/net/kcm.h
+++ b/include/net/kcm.h
@@ -47,9 +47,9 @@ struct kcm_stats {
 
 struct kcm_tx_msg {
        unsigned int sent;
-       unsigned int fragidx;
        unsigned int frag_offset;
        unsigned int msg_flags;
+       bool started_tx;
        struct sk_buff *frag_skb;
        struct sk_buff *last_skb;
 };
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index 3bcac14..d75d775 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -581,12 +581,10 @@ static void kcm_report_tx_retry(struct kcm_sock *kcm)
  */
 static int kcm_write_msgs(struct kcm_sock *kcm)
 {
+       unsigned int total_sent = 0;
        struct sock *sk = &kcm->sk;
        struct kcm_psock *psock;
-       struct sk_buff *skb, *head;
-       struct kcm_tx_msg *txm;
-       unsigned short fragidx, frag_offset;
-       unsigned int sent, total_sent = 0;
+       struct sk_buff *head;
        int ret = 0;
 
        kcm->tx_wait_more = false;
@@ -600,78 +598,57 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
                if (skb_queue_empty(&sk->sk_write_queue))
                        return 0;
 
-               kcm_tx_msg(skb_peek(&sk->sk_write_queue))->sent = 0;
-
-       } else if (skb_queue_empty(&sk->sk_write_queue)) {
-               return 0;
+               kcm_tx_msg(skb_peek(&sk->sk_write_queue))->started_tx = false;
        }
 
-       head = skb_peek(&sk->sk_write_queue);
-       txm = kcm_tx_msg(head);
-
-       if (txm->sent) {
-               /* Send of first skbuff in queue already in progress */
-               if (WARN_ON(!psock)) {
-                       ret = -EINVAL;
-                       goto out;
+retry:
+       while ((head = skb_peek(&sk->sk_write_queue))) {
+               struct msghdr msg = {
+                       .msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES,
+               };
+               struct kcm_tx_msg *txm = kcm_tx_msg(head);
+               struct sk_buff *skb;
+               unsigned int msize;
+               int i;
+
+               if (!txm->started_tx) {
+                       psock = reserve_psock(kcm);
+                       if (!psock)
+                               goto out;
+                       skb = head;
+                       txm->frag_offset = 0;
+                       txm->sent = 0;
+                       txm->started_tx = true;
+               } else {
+                       if (WARN_ON(!psock)) {
+                               ret = -EINVAL;
+                               goto out;
+                       }
+                       skb = txm->frag_skb;
                }
-               sent = txm->sent;
-               frag_offset = txm->frag_offset;
-               fragidx = txm->fragidx;
-               skb = txm->frag_skb;
-
-               goto do_frag;
-       }
-
-try_again:
-       psock = reserve_psock(kcm);
-       if (!psock)
-               goto out;
-
-       do {
-               skb = head;
-               txm = kcm_tx_msg(head);
-               sent = 0;
 
-do_frag_list:
                if (WARN_ON(!skb_shinfo(skb)->nr_frags)) {
                        ret = -EINVAL;
                        goto out;
                }
 
-               for (fragidx = 0; fragidx < skb_shinfo(skb)->nr_frags;
-                    fragidx++) {
-                       struct bio_vec bvec;
-                       struct msghdr msg = {
-                               .msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES,
-                       };
-                       skb_frag_t *frag;
-
-                       frag_offset = 0;
-do_frag:
-                       frag = &skb_shinfo(skb)->frags[fragidx];
-                       if (WARN_ON(!skb_frag_size(frag))) {
-                               ret = -EINVAL;
-                               goto out;
-                       }
+               msize = 0;
+               for (i = 0; i < skb_shinfo(skb)->nr_frags; i++)
+                       msize += skb_shinfo(skb)->frags[i].bv_len;
+
+               iov_iter_bvec(&msg.msg_iter, ITER_SOURCE,
+                             skb_shinfo(skb)->frags, skb_shinfo(skb)->nr_frags,
+                             msize);
+               iov_iter_advance(&msg.msg_iter, txm->frag_offset);
 
-                       bvec_set_page(&bvec,
-                                     skb_frag_page(frag),
-                                     skb_frag_size(frag) - frag_offset,
-                                     skb_frag_off(frag) + frag_offset);
-                       iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1,
-                                     bvec.bv_len);
+               do {
                        ret = sock_sendmsg(psock->sk->sk_socket, &msg);
                        if (ret <= 0) {
                                if (ret == -EAGAIN) {
                                        /* Save state to try again when there's
                                         * write space on the socket
                                         */
-                                       txm->sent = sent;
-                                       txm->frag_offset = frag_offset;
-                                       txm->fragidx = fragidx;
                                        txm->frag_skb = skb;
-
                                        ret = 0;
                                        goto out;
                                }
@@ -685,39 +662,36 @@ do_frag:
                                                   true);
                                unreserve_psock(kcm);
 
-                               txm->sent = 0;
+                               txm->started_tx = false;
                                kcm_report_tx_retry(kcm);
                                ret = 0;
-
-                               goto try_again;
+                               goto retry;
                        }
 
-                       sent += ret;
-                       frag_offset += ret;
+                       txm->sent += ret;
+                       txm->frag_offset += ret;
                        KCM_STATS_ADD(psock->stats.tx_bytes, ret);
-                       if (frag_offset < skb_frag_size(frag)) {
-                               /* Not finished with this frag */
-                               goto do_frag;
-                       }
-               }
+               } while (msg.msg_iter.count > 0);
 
                if (skb == head) {
                        if (skb_has_frag_list(skb)) {
-                               skb = skb_shinfo(skb)->frag_list;
-                               goto do_frag_list;
+                               txm->frag_skb = skb_shinfo(skb)->frag_list;
+                               txm->frag_offset = 0;
+                               continue;
                        }
                } else if (skb->next) {
-                       skb = skb->next;
-                       goto do_frag_list;
+                       txm->frag_skb = skb->next;
+                       txm->frag_offset = 0;
+                       continue;
                }
 
                /* Successfully sent the whole packet, account for it. */
+               sk->sk_wmem_queued -= txm->sent;
+               total_sent += txm->sent;
                skb_dequeue(&sk->sk_write_queue);
                kfree_skb(head);
-               sk->sk_wmem_queued -= sent;
-               total_sent += sent;
                KCM_STATS_INCR(psock->stats.tx_msgs);
-       } while ((head = skb_peek(&sk->sk_write_queue)));
+       }
 out:
        if (!head) {
                /* Done with all queued messages. */