ceph: carry explicit msg reference for currently sending message
authorSage Weil <sage@newdream.net>
Mon, 14 Dec 2009 22:04:30 +0000 (14:04 -0800)
committerSage Weil <sage@newdream.net>
Tue, 22 Dec 2009 00:39:38 +0000 (16:39 -0800)
Carry a ceph_msg reference for connection->out_msg.  This will allow us to
make out_sent optional.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/messenger.c
fs/ceph/messenger.h

index b0571b0..96fd556 100644 (file)
@@ -322,7 +322,10 @@ static void reset_connection(struct ceph_connection *con)
 
        con->connect_seq = 0;
        con->out_seq = 0;
-       con->out_msg = NULL;
+       if (con->out_msg) {
+               ceph_msg_put(con->out_msg);
+               con->out_msg = NULL;
+       }
        con->in_seq = 0;
        mutex_unlock(&con->out_mutex);
 }
@@ -423,7 +426,7 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
        con->out_kvec_bytes += sizeof(m->footer);
        con->out_kvec_left++;
        con->out_more = m->more_to_follow;
-       con->out_msg = NULL;   /* we're done with this one */
+       con->out_msg_done = true;
 }
 
 /*
@@ -436,6 +439,7 @@ static void prepare_write_message(struct ceph_connection *con)
 
        con->out_kvec_bytes = 0;
        con->out_kvec_is_msg = true;
+       con->out_msg_done = false;
 
        /* Sneak an ack in there first?  If we can get it into the same
         * TCP packet that's a good thing. */
@@ -452,8 +456,9 @@ static void prepare_write_message(struct ceph_connection *con)
        /* move message to sending/sent list */
        m = list_first_entry(&con->out_queue,
                       struct ceph_msg, list_head);
+       con->out_msg = m;
+       ceph_msg_get(m);
        list_move_tail(&m->list_head, &con->out_sent);
-       con->out_msg = m;   /* we don't bother taking a reference here. */
 
        m->hdr.seq = cpu_to_le64(++con->out_seq);
 
@@ -1521,6 +1526,12 @@ more_kvec:
 
        /* msg pages? */
        if (con->out_msg) {
+               if (con->out_msg_done) {
+                       ceph_msg_put(con->out_msg);
+                       con->out_msg = NULL;   /* we're done with this one */
+                       goto do_next;
+               }
+
                ret = write_partial_msg_pages(con);
                if (ret == 1)
                        goto more_kvec;  /* we need to send the footer, too! */
@@ -1533,6 +1544,7 @@ more_kvec:
                }
        }
 
+do_next:
        if (!test_bit(CONNECTING, &con->state)) {
                /* is anything else pending? */
                if (!list_empty(&con->out_queue)) {
@@ -1923,8 +1935,10 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
                list_del_init(&msg->list_head);
                ceph_msg_put(msg);
                msg->hdr.seq = 0;
-               if (con->out_msg == msg)
+               if (con->out_msg == msg) {
+                       ceph_msg_put(con->out_msg);
                        con->out_msg = NULL;
+               }
                if (con->out_kvec_is_msg) {
                        con->out_skip = con->out_kvec_bytes;
                        con->out_kvec_is_msg = false;
index 981b7c0..eff5cb5 100644 (file)
@@ -182,6 +182,7 @@ struct ceph_connection {
        /* message out temps */
        struct ceph_msg *out_msg;        /* sending message (== tail of
                                            out_sent) */
+       bool out_msg_done;
        struct ceph_msg_pos out_msg_pos;
 
        struct kvec out_kvec[8],         /* sending header/footer data */