return rc;
}
+void ceph_con_v1_reset_protocol(struct ceph_connection *con)
+{
+ con->out_skip = 0;
+}
+
static void ceph_con_reset_protocol(struct ceph_connection *con)
{
dout("%s con %p\n", __func__, con);
con->out_msg = NULL;
}
- con->out_skip = 0;
+ ceph_con_v1_reset_protocol(con);
}
/*
}
}
+void ceph_con_v1_reset_session(struct ceph_connection *con)
+{
+ con->connect_seq = 0;
+ con->peer_global_seq = 0;
+}
+
void ceph_con_reset_session(struct ceph_connection *con)
{
dout("%s con %p\n", __func__, con);
con->in_seq = 0;
con->in_seq_acked = 0;
- con->connect_seq = 0;
- con->peer_global_seq = 0;
+ ceph_con_v1_reset_session(con);
}
/*
}
EXPORT_SYMBOL(ceph_con_open);
+bool ceph_con_v1_opened(struct ceph_connection *con)
+{
+ return con->connect_seq;
+}
+
/*
* return true if this connection ever successfully opened
*/
bool ceph_con_opened(struct ceph_connection *con)
{
- return con->connect_seq > 0;
+ return ceph_con_v1_opened(con);
}
/*
* Write something to the socket. Called in a worker thread when the
* socket appears to be writeable and we have something ready to send.
*/
-static int try_write(struct ceph_connection *con)
+int ceph_con_v1_try_write(struct ceph_connection *con)
{
int ret = 1;
/*
* Read what we can from the socket.
*/
-static int try_read(struct ceph_connection *con)
+int ceph_con_v1_try_read(struct ceph_connection *con)
{
int ret = -1;
BUG_ON(con->sock);
}
- ret = try_read(con);
+ ret = ceph_con_v1_try_read(con);
if (ret < 0) {
if (ret == -EAGAIN)
continue;
break;
}
- ret = try_write(con);
+ ret = ceph_con_v1_try_write(con);
if (ret < 0) {
if (ret == -EAGAIN)
continue;
}
EXPORT_SYMBOL(ceph_con_send);
+void ceph_con_v1_revoke(struct ceph_connection *con)
+{
+ struct ceph_msg *msg = con->out_msg;
+
+ WARN_ON(con->out_skip);
+ /* footer */
+ if (con->out_msg_done) {
+ con->out_skip += con_out_kvec_skip(con);
+ } else {
+ WARN_ON(!msg->data_length);
+ con->out_skip += sizeof_footer(con);
+ }
+ /* data, middle, front */
+ if (msg->data_length)
+ con->out_skip += msg->cursor.total_resid;
+ if (msg->middle)
+ con->out_skip += con_out_kvec_skip(con);
+ con->out_skip += con_out_kvec_skip(con);
+
+ dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
+ con->out_kvec_bytes, con->out_skip);
+}
+
/*
* Revoke a message that was previously queued for send
*/
}
mutex_lock(&con->mutex);
- if (!list_empty(&msg->list_head)) {
- dout("%s %p msg %p - was on queue\n", __func__, con, msg);
- list_del_init(&msg->list_head);
- msg->hdr.seq = 0;
-
- ceph_msg_put(msg);
+ if (list_empty(&msg->list_head)) {
+ WARN_ON(con->out_msg == msg);
+ dout("%s con %p msg %p not linked\n", __func__, con, msg);
+ mutex_unlock(&con->mutex);
+ return;
}
- if (con->out_msg == msg) {
- BUG_ON(con->out_skip);
- /* footer */
- if (con->out_msg_done) {
- con->out_skip += con_out_kvec_skip(con);
- } else {
- BUG_ON(!msg->data_length);
- con->out_skip += sizeof_footer(con);
- }
- /* data, middle, front */
- if (msg->data_length)
- con->out_skip += msg->cursor.total_resid;
- if (msg->middle)
- con->out_skip += con_out_kvec_skip(con);
- con->out_skip += con_out_kvec_skip(con);
- dout("%s %p msg %p - was sending, will write %d skip %d\n",
- __func__, con, msg, con->out_kvec_bytes, con->out_skip);
- msg->hdr.seq = 0;
+ dout("%s con %p msg %p was linked\n", __func__, con, msg);
+ msg->hdr.seq = 0;
+ ceph_msg_remove(msg);
+
+ if (con->out_msg == msg) {
+ WARN_ON(con->state != CEPH_CON_S_OPEN);
+ dout("%s con %p msg %p was sending\n", __func__, con, msg);
+ ceph_con_v1_revoke(con);
+ ceph_msg_put(con->out_msg);
con->out_msg = NULL;
- ceph_msg_put(msg);
+ } else {
+ dout("%s con %p msg %p not current, out_msg %p\n", __func__,
+ con, msg, con->out_msg);
}
-
mutex_unlock(&con->mutex);
}
+void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
+{
+ unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+ unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+ unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
+
+ /* skip rest of message */
+ con->in_base_pos = con->in_base_pos -
+ sizeof(struct ceph_msg_header) -
+ front_len -
+ middle_len -
+ data_len -
+ sizeof(struct ceph_msg_footer);
+
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
+
+ dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
+}
+
/*
* Revoke a message that we may be reading data into
*/
mutex_lock(&con->mutex);
if (con->in_msg == msg) {
- unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
- unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
- unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
-
- /* skip rest of message */
- dout("%s %p msg %p revoked\n", __func__, con, msg);
- con->in_base_pos = con->in_base_pos -
- sizeof(struct ceph_msg_header) -
- front_len -
- middle_len -
- data_len -
- sizeof(struct ceph_msg_footer);
+ WARN_ON(con->state != CEPH_CON_S_OPEN);
+ dout("%s con %p msg %p was recving\n", __func__, con, msg);
+ ceph_con_v1_revoke_incoming(con);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
- con->in_seq++;
} else {
- dout("%s %p in_msg %p msg %p no-op\n",
- __func__, con, con->in_msg, msg);
+ dout("%s con %p msg %p not current, in_msg %p\n", __func__,
+ con, msg, con->in_msg);
}
mutex_unlock(&con->mutex);
}