ceph: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage
authorDavid Howells <dhowells@redhat.com>
Fri, 23 Jun 2023 22:55:00 +0000 (23:55 +0100)
committerJakub Kicinski <kuba@kernel.org>
Sat, 24 Jun 2023 22:50:12 +0000 (15:50 -0700)
Use sendmsg() and MSG_SPLICE_PAGES rather than sendpage in ceph when
transmitting data.  For the moment, this can only transmit one page at a
time because of the architecture of net/ceph/, but if
write_partial_message_data() can be given a bvec[] at a time by the
iteration code, this would allow pages to be sent in a batch.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Ilya Dryomov <idryomov@gmail.com>
cc: Xiubo Li <xiubli@redhat.com>
cc: Jeff Layton <jlayton@kernel.org>
cc: Jens Axboe <axboe@kernel.dk>
cc: Matthew Wilcox <willy@infradead.org>
Link: https://lore.kernel.org/r/20230623225513.2732256-4-dhowells@redhat.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
net/ceph/messenger_v1.c

index d664cb1..814579f 100644 (file)
@@ -74,37 +74,6 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
        return r;
 }
 
-/*
- * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
- */
-static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
-                            int offset, size_t size, int more)
-{
-       ssize_t (*sendpage)(struct socket *sock, struct page *page,
-                           int offset, size_t size, int flags);
-       int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
-       int ret;
-
-       /*
-        * sendpage cannot properly handle pages with page_count == 0,
-        * we need to fall back to sendmsg if that's the case.
-        *
-        * Same goes for slab pages: skb_can_coalesce() allows
-        * coalescing neighboring slab objects into a single frag which
-        * triggers one of hardened usercopy checks.
-        */
-       if (sendpage_ok(page))
-               sendpage = sock->ops->sendpage;
-       else
-               sendpage = sock_no_sendpage;
-
-       ret = sendpage(sock, page, offset, size, flags);
-       if (ret == -EAGAIN)
-               ret = 0;
-
-       return ret;
-}
-
 static void con_out_kvec_reset(struct ceph_connection *con)
 {
        BUG_ON(con->v1.out_skip);
@@ -464,7 +433,6 @@ static int write_partial_message_data(struct ceph_connection *con)
        struct ceph_msg *msg = con->out_msg;
        struct ceph_msg_data_cursor *cursor = &msg->cursor;
        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-       int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
        u32 crc;
 
        dout("%s %p msg %p\n", __func__, con, msg);
@@ -482,6 +450,10 @@ static int write_partial_message_data(struct ceph_connection *con)
         */
        crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
        while (cursor->total_resid) {
+               struct bio_vec bvec;
+               struct msghdr msghdr = {
+                       .msg_flags = MSG_SPLICE_PAGES,
+               };
                struct page *page;
                size_t page_offset;
                size_t length;
@@ -493,10 +465,13 @@ static int write_partial_message_data(struct ceph_connection *con)
                }
 
                page = ceph_msg_data_next(cursor, &page_offset, &length);
-               if (length == cursor->total_resid)
-                       more = MSG_MORE;
-               ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
-                                       more);
+               if (length != cursor->total_resid)
+                       msghdr.msg_flags |= MSG_MORE;
+
+               bvec_set_page(&bvec, page, length, page_offset);
+               iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, length);
+
+               ret = sock_sendmsg(con->sock, &msghdr);
                if (ret <= 0) {
                        if (do_datacrc)
                                msg->footer.data_crc = cpu_to_le32(crc);
@@ -526,7 +501,10 @@ static int write_partial_message_data(struct ceph_connection *con)
  */
 static int write_partial_skip(struct ceph_connection *con)
 {
-       int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
+       struct bio_vec bvec;
+       struct msghdr msghdr = {
+               .msg_flags = MSG_SPLICE_PAGES | MSG_MORE,
+       };
        int ret;
 
        dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
@@ -534,9 +512,11 @@ static int write_partial_skip(struct ceph_connection *con)
                size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
 
                if (size == con->v1.out_skip)
-                       more = MSG_MORE;
-               ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
-                                       more);
+                       msghdr.msg_flags &= ~MSG_MORE;
+               bvec_set_page(&bvec, ZERO_PAGE(0), size, 0);
+               iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, size);
+
+               ret = sock_sendmsg(con->sock, &msghdr);
                if (ret <= 0)
                        goto out;
                con->v1.out_skip -= ret;