ceph: alloc message data pages and check if tid exists
authorYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 11 Jan 2010 22:47:13 +0000 (14:47 -0800)
committerSage Weil <sage@newdream.net>
Mon, 25 Jan 2010 20:57:46 +0000 (12:57 -0800)
Now doing it in the same callback that is also responsible for
allocating the 'front' part of the message. If we get a message
that we haven't got a corresponding tid for, mark it for skipping.

Moving the mutex unlock/lock from the osd alloc_msg callback
to the calling function in the messenger.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
fs/ceph/messenger.c
fs/ceph/messenger.h
fs/ceph/mon_client.c
fs/ceph/osd_client.c

index e8742cc..f708803 100644 (file)
@@ -2114,25 +2114,6 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
        return 0;
 }
 
-static int ceph_alloc_data_section(struct ceph_connection *con, struct ceph_msg *msg)
-{
-       int ret;
-       int want;
-       int data_len = le32_to_cpu(msg->hdr.data_len);
-       unsigned data_off = le16_to_cpu(msg->hdr.data_off);
-
-       want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
-       ret = -1;
-       mutex_unlock(&con->mutex);
-       if (con->ops->prepare_pages)
-               ret = con->ops->prepare_pages(con, msg, want);
-       mutex_lock(&con->mutex);
-
-       BUG_ON(msg->nr_pages < want);
-
-       return ret;
-}
-
 /*
  * Generic message allocator, for incoming messages.
  */
@@ -2143,12 +2124,13 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        int middle_len = le32_to_cpu(hdr->middle_len);
-       int data_len = le32_to_cpu(hdr->data_len);
        struct ceph_msg *msg = NULL;
        int ret;
 
        if (con->ops->alloc_msg) {
+               mutex_unlock(&con->mutex);
                msg = con->ops->alloc_msg(con, hdr, skip);
+               mutex_lock(&con->mutex);
                if (IS_ERR(msg))
                        return msg;
 
@@ -2175,17 +2157,6 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                }
        }
 
-       if (data_len) {
-               ret = ceph_alloc_data_section(con, msg);
-
-               if (ret < 0) {
-                       *skip = 1;
-                       ceph_msg_put(msg);
-                       return NULL;
-               }
-       }
-
-
        return msg;
 }
 
index b6bec59..dca2d32 100644 (file)
@@ -46,10 +46,6 @@ struct ceph_connection_operations {
        struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
                                        struct ceph_msg_header *hdr,
                                        int *skip);
-       /* an incoming message has a data payload; tell me what pages I
-        * should read the data into. */
-       int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
-                             int want);
 };
 
 extern const char *ceph_name_type_str(int t);
index 6c00b37..3f7ae7f 100644 (file)
@@ -701,6 +701,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
        struct ceph_msg *m;
 
        *skip = 0;
+
        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
index 545e936..44abe29 100644 (file)
@@ -998,31 +998,26 @@ bad:
  * find those pages.
  *  0 = success, -1 failure.
  */
-static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
-                        int want)
+static int prepare_pages(struct ceph_connection *con,
+                        struct ceph_msg_header *hdr,
+                        struct ceph_osd_request *req,
+                        u64 tid,
+                        struct ceph_msg *m)
 {
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc;
-       struct ceph_osd_request *req;
-       u64 tid;
        int ret = -1;
-       int type = le16_to_cpu(m->hdr.type);
+       int data_len = le32_to_cpu(hdr->data_len);
+       unsigned data_off = le16_to_cpu(hdr->data_off);
+
+       int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
 
        if (!osd)
                return -1;
+
        osdc = osd->o_osdc;
 
        dout("prepare_pages on msg %p want %d\n", m, want);
-       if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
-               return -1;  /* hmm! */
-
-       tid = le64_to_cpu(m->hdr.tid);
-       mutex_lock(&osdc->request_mutex);
-       req = __lookup_request(osdc, tid);
-       if (!req) {
-               dout("prepare_pages unknown tid %llu\n", tid);
-               goto out;
-       }
        dout("prepare_pages tid %llu has %d pages, want %d\n",
             tid, req->r_num_pages, want);
        if (unlikely(req->r_num_pages < want))
@@ -1040,7 +1035,8 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
        m->nr_pages = req->r_num_pages;
        ret = 0; /* success */
 out:
-       mutex_unlock(&osdc->request_mutex);
+       BUG_ON(ret < 0 || m->nr_pages < want);
+
        return ret;
 }
 
@@ -1311,19 +1307,42 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        struct ceph_osd_client *osdc = osd->o_osdc;
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
+       int data_len = le32_to_cpu(hdr->data_len);
        struct ceph_msg *m;
+       struct ceph_osd_request *req;
+       u64 tid;
+       int err;
 
        *skip = 0;
-       switch (type) {
-       case CEPH_MSG_OSD_OPREPLY:
-               m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
-               break;
-       default:
+       if (type != CEPH_MSG_OSD_OPREPLY)
                return NULL;
-       }
 
-       if (!m)
+       tid = le64_to_cpu(hdr->tid);
+       mutex_lock(&osdc->request_mutex);
+       req = __lookup_request(osdc, tid);
+       if (!req) {
+               *skip = 1;
+               m = NULL;
+               dout("prepare_pages unknown tid %llu\n", tid);
+               goto out;
+       }
+       m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
+       if (!m) {
                *skip = 1;
+               goto out;
+       }
+
+       if (data_len > 0) {
+               err = prepare_pages(con, hdr, req, tid, m);
+               if (err < 0) {
+                       *skip = 1;
+                       ceph_msg_put(m);
+                       m = ERR_PTR(err);
+               }
+       }
+
+out:
+       mutex_unlock(&osdc->request_mutex);
 
        return m;
 }
@@ -1400,5 +1419,4 @@ const static struct ceph_connection_operations osd_con_ops = {
        .verify_authorizer_reply = verify_authorizer_reply,
        .alloc_msg = alloc_msg,
        .fault = osd_reset,
-       .prepare_pages = prepare_pages,
 };