libceph: record message data length
authorAlex Elder <elder@inktank.com>
Thu, 14 Mar 2013 19:09:06 +0000 (14:09 -0500)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:17:57 +0000 (21:17 -0700)
Keep track of the length of the data portion for a message in a
separate field in the ceph_msg structure.  This information has
been maintained in wire byte order in the message header, but
that's going to change soon.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
include/linux/ceph/messenger.h
net/ceph/messenger.c
net/ceph/osd_client.c

index 3181321..b832c0c 100644 (file)
@@ -139,6 +139,7 @@ struct ceph_msg {
        struct kvec front;              /* unaligned blobs of message */
        struct ceph_buffer *middle;
 
+       size_t                  data_length;
        struct ceph_msg_data    *data;  /* data payload */
 
        struct ceph_connection *con;
@@ -270,7 +271,8 @@ extern void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
                                size_t length, size_t alignment);
 extern void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
                                struct ceph_pagelist *pagelist);
-extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio);
+extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
+                               size_t length);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
                                     bool can_fail);
index ee16086..fa9b4d0 100644 (file)
@@ -2981,6 +2981,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
 
        BUG_ON(!pages);
        BUG_ON(!length);
+       BUG_ON(msg->data_length);
        BUG_ON(msg->data != NULL);
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
@@ -2990,6 +2991,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
        data->alignment = alignment & ~PAGE_MASK;
 
        msg->data = data;
+       msg->data_length = length;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pages);
 
@@ -3000,6 +3002,7 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
 
        BUG_ON(!pagelist);
        BUG_ON(!pagelist->length);
+       BUG_ON(msg->data_length);
        BUG_ON(msg->data != NULL);
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
@@ -3007,14 +3010,17 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
        data->pagelist = pagelist;
 
        msg->data = data;
+       msg->data_length = pagelist->length;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
 
-void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
+void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
+               size_t length)
 {
        struct ceph_msg_data *data;
 
        BUG_ON(!bio);
+       BUG_ON(msg->data_length);
        BUG_ON(msg->data != NULL);
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
@@ -3022,6 +3028,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
        data->bio = bio;
 
        msg->data = data;
+       msg->data_length = length;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_bio);
 
@@ -3200,6 +3207,7 @@ void ceph_msg_last_put(struct kref *kref)
        }
        ceph_msg_data_destroy(m->data);
        m->data = NULL;
+       m->data_length = 0;
 
        if (m->pool)
                ceph_msgpool_put(m->pool, m);
index e088792..0b4951e 100644 (file)
@@ -1848,7 +1848,7 @@ static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
                ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
 #ifdef CONFIG_BLOCK
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
-               ceph_msg_data_set_bio(msg, osd_data->bio);
+               ceph_msg_data_set_bio(msg, osd_data->bio, osd_data->bio_length);
 #endif
        } else {
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);