libceph: separate msgr1 protocol implementation
authorIlya Dryomov <idryomov@gmail.com>
Thu, 12 Nov 2020 11:55:39 +0000 (12:55 +0100)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 14 Dec 2020 22:21:49 +0000 (23:21 +0100)
In preparation for msgr2, define internal messenger <-> protocol
interface (as opposed to external messenger <-> client interface, which
is struct ceph_connection_operations) consisting of try_read(),
try_write(), revoke(), revoke_incoming(), opened(), reset_session() and
reset_protocol() ops.  The semantics are exactly the same as they are
now.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/messenger.h
net/ceph/messenger.c

index 93815f1..8cc8b08 100644 (file)
@@ -382,6 +382,14 @@ int ceph_con_in_msg_alloc(struct ceph_connection *con,
                          struct ceph_msg_header *hdr, int *skip);
 void ceph_con_get_out_msg(struct ceph_connection *con);
 
+int ceph_con_v1_try_read(struct ceph_connection *con);
+int ceph_con_v1_try_write(struct ceph_connection *con);
+void ceph_con_v1_revoke(struct ceph_connection *con);
+void ceph_con_v1_revoke_incoming(struct ceph_connection *con);
+bool ceph_con_v1_opened(struct ceph_connection *con);
+void ceph_con_v1_reset_session(struct ceph_connection *con);
+void ceph_con_v1_reset_protocol(struct ceph_connection *con);
+
 
 extern const char *ceph_pr_addr(const struct ceph_entity_addr *addr);
 
index 85d2037..4ca7d9b 100644 (file)
@@ -593,6 +593,11 @@ int ceph_con_close_socket(struct ceph_connection *con)
        return rc;
 }
 
+void ceph_con_v1_reset_protocol(struct ceph_connection *con)
+{
+       con->out_skip = 0;
+}
+
 static void ceph_con_reset_protocol(struct ceph_connection *con)
 {
        dout("%s con %p\n", __func__, con);
@@ -609,7 +614,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con)
                con->out_msg = NULL;
        }
 
-       con->out_skip = 0;
+       ceph_con_v1_reset_protocol(con);
 }
 
 /*
@@ -631,6 +636,12 @@ static void ceph_msg_remove_list(struct list_head *head)
        }
 }
 
+void ceph_con_v1_reset_session(struct ceph_connection *con)
+{
+       con->connect_seq = 0;
+       con->peer_global_seq = 0;
+}
+
 void ceph_con_reset_session(struct ceph_connection *con)
 {
        dout("%s con %p\n", __func__, con);
@@ -643,8 +654,7 @@ void ceph_con_reset_session(struct ceph_connection *con)
        con->in_seq = 0;
        con->in_seq_acked = 0;
 
-       con->connect_seq = 0;
-       con->peer_global_seq = 0;
+       ceph_con_v1_reset_session(con);
 }
 
 /*
@@ -692,12 +702,17 @@ void ceph_con_open(struct ceph_connection *con,
 }
 EXPORT_SYMBOL(ceph_con_open);
 
+bool ceph_con_v1_opened(struct ceph_connection *con)
+{
+       return con->connect_seq;
+}
+
 /*
  * return true if this connection ever successfully opened
  */
 bool ceph_con_opened(struct ceph_connection *con)
 {
-       return con->connect_seq > 0;
+       return ceph_con_v1_opened(con);
 }
 
 /*
@@ -2552,7 +2567,7 @@ static int read_keepalive_ack(struct ceph_connection *con)
  * Write something to the socket.  Called in a worker thread when the
  * socket appears to be writeable and we have something ready to send.
  */
-static int try_write(struct ceph_connection *con)
+int ceph_con_v1_try_write(struct ceph_connection *con)
 {
        int ret = 1;
 
@@ -2649,7 +2664,7 @@ out:
 /*
  * Read what we can from the socket.
  */
-static int try_read(struct ceph_connection *con)
+int ceph_con_v1_try_read(struct ceph_connection *con)
 {
        int ret = -1;
 
@@ -2930,7 +2945,7 @@ static void ceph_con_workfn(struct work_struct *work)
                        BUG_ON(con->sock);
                }
 
-               ret = try_read(con);
+               ret = ceph_con_v1_try_read(con);
                if (ret < 0) {
                        if (ret == -EAGAIN)
                                continue;
@@ -2940,7 +2955,7 @@ static void ceph_con_workfn(struct work_struct *work)
                        break;
                }
 
-               ret = try_write(con);
+               ret = ceph_con_v1_try_write(con);
                if (ret < 0) {
                        if (ret == -EAGAIN)
                                continue;
@@ -3116,6 +3131,29 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 }
 EXPORT_SYMBOL(ceph_con_send);
 
+void ceph_con_v1_revoke(struct ceph_connection *con)
+{
+       struct ceph_msg *msg = con->out_msg;
+
+       WARN_ON(con->out_skip);
+       /* footer */
+       if (con->out_msg_done) {
+               con->out_skip += con_out_kvec_skip(con);
+       } else {
+               WARN_ON(!msg->data_length);
+               con->out_skip += sizeof_footer(con);
+       }
+       /* data, middle, front */
+       if (msg->data_length)
+               con->out_skip += msg->cursor.total_resid;
+       if (msg->middle)
+               con->out_skip += con_out_kvec_skip(con);
+       con->out_skip += con_out_kvec_skip(con);
+
+       dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
+            con->out_kvec_bytes, con->out_skip);
+}
+
 /*
  * Revoke a message that was previously queued for send
  */
@@ -3129,39 +3167,50 @@ void ceph_msg_revoke(struct ceph_msg *msg)
        }
 
        mutex_lock(&con->mutex);
-       if (!list_empty(&msg->list_head)) {
-               dout("%s %p msg %p - was on queue\n", __func__, con, msg);
-               list_del_init(&msg->list_head);
-               msg->hdr.seq = 0;
-
-               ceph_msg_put(msg);
+       if (list_empty(&msg->list_head)) {
+               WARN_ON(con->out_msg == msg);
+               dout("%s con %p msg %p not linked\n", __func__, con, msg);
+               mutex_unlock(&con->mutex);
+               return;
        }
-       if (con->out_msg == msg) {
-               BUG_ON(con->out_skip);
-               /* footer */
-               if (con->out_msg_done) {
-                       con->out_skip += con_out_kvec_skip(con);
-               } else {
-                       BUG_ON(!msg->data_length);
-                       con->out_skip += sizeof_footer(con);
-               }
-               /* data, middle, front */
-               if (msg->data_length)
-                       con->out_skip += msg->cursor.total_resid;
-               if (msg->middle)
-                       con->out_skip += con_out_kvec_skip(con);
-               con->out_skip += con_out_kvec_skip(con);
 
-               dout("%s %p msg %p - was sending, will write %d skip %d\n",
-                    __func__, con, msg, con->out_kvec_bytes, con->out_skip);
-               msg->hdr.seq = 0;
+       dout("%s con %p msg %p was linked\n", __func__, con, msg);
+       msg->hdr.seq = 0;
+       ceph_msg_remove(msg);
+
+       if (con->out_msg == msg) {
+               WARN_ON(con->state != CEPH_CON_S_OPEN);
+               dout("%s con %p msg %p was sending\n", __func__, con, msg);
+               ceph_con_v1_revoke(con);
+               ceph_msg_put(con->out_msg);
                con->out_msg = NULL;
-               ceph_msg_put(msg);
+       } else {
+               dout("%s con %p msg %p not current, out_msg %p\n", __func__,
+                    con, msg, con->out_msg);
        }
-
        mutex_unlock(&con->mutex);
 }
 
+void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
+{
+       unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+       unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+       unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
+
+       /* skip rest of message */
+       con->in_base_pos = con->in_base_pos -
+                       sizeof(struct ceph_msg_header) -
+                       front_len -
+                       middle_len -
+                       data_len -
+                       sizeof(struct ceph_msg_footer);
+
+       con->in_tag = CEPH_MSGR_TAG_READY;
+       con->in_seq++;
+
+       dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
+}
+
 /*
  * Revoke a message that we may be reading data into
  */
@@ -3176,25 +3225,14 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 
        mutex_lock(&con->mutex);
        if (con->in_msg == msg) {
-               unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
-               unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
-               unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
-
-               /* skip rest of message */
-               dout("%s %p msg %p revoked\n", __func__, con, msg);
-               con->in_base_pos = con->in_base_pos -
-                               sizeof(struct ceph_msg_header) -
-                               front_len -
-                               middle_len -
-                               data_len -
-                               sizeof(struct ceph_msg_footer);
+               WARN_ON(con->state != CEPH_CON_S_OPEN);
+               dout("%s con %p msg %p was recving\n", __func__, con, msg);
+               ceph_con_v1_revoke_incoming(con);
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               con->in_tag = CEPH_MSGR_TAG_READY;
-               con->in_seq++;
        } else {
-               dout("%s %p in_msg %p msg %p no-op\n",
-                    __func__, con, con->in_msg, msg);
+               dout("%s con %p msg %p not current, in_msg %p\n", __func__,
+                    con, msg, con->in_msg);
        }
        mutex_unlock(&con->mutex);
 }