con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
if (skip) {
/* skip this message */
- pr_err("alloc_msg returned NULL, skipping message\n");
+ dout("alloc_msg returned NULL, skipping message\n");
con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
if (IS_ERR(con->in_msg)) {
ret = PTR_ERR(con->in_msg);
con->in_msg = NULL;
- con->error_msg = "error allocating memory for incoming message";
+ con->error_msg =
+ "error allocating memory for incoming message";
return ret;
}
m = con->in_msg;
struct ceph_mon_client *monc = con->private;
int type = le16_to_cpu(hdr->type);
int front_len = le32_to_cpu(hdr->front_len);
- struct ceph_msg *m;
+ struct ceph_msg *m = NULL;
*skip = 0;
case CEPH_MSG_AUTH_REPLY:
m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len);
break;
- default:
- return NULL;
+ case CEPH_MSG_MON_MAP:
+ case CEPH_MSG_MDS_MAP:
+ case CEPH_MSG_OSD_MAP:
+ m = ceph_msg_new(type, front_len, 0, 0, NULL);
+ break;
}
- if (!m)
+ if (!m) {
+ pr_info("alloc_msg unknown type %d\n", type);
*skip = 1;
-
+ }
return m;
}
ceph_msg_put(msg);
}
-static struct ceph_msg *alloc_msg(struct ceph_connection *con,
+/*
+ * lookup and return message for incoming reply
+ */
+static struct ceph_msg *get_reply(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc;
- int type = le16_to_cpu(hdr->type);
- int front = le32_to_cpu(hdr->front_len);
- int data_len = le32_to_cpu(hdr->data_len);
struct ceph_msg *m;
struct ceph_osd_request *req;
+ int front = le32_to_cpu(hdr->front_len);
+ int data_len = le32_to_cpu(hdr->data_len);
u64 tid;
int err;
- *skip = 0;
- if (type != CEPH_MSG_OSD_OPREPLY)
- return NULL;
-
tid = le64_to_cpu(hdr->tid);
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
if (!req) {
*skip = 1;
m = NULL;
- dout("alloc_msg unknown tid %llu\n", tid);
+ pr_info("alloc_msg unknown tid %llu from osd%d\n", tid,
+ osd->o_osd);
goto out;
}
m = __get_next_reply(con, req, front);
m = ERR_PTR(err);
}
}
+ *skip = 0;
out:
mutex_unlock(&osdc->request_mutex);
-
return m;
+
+}
+
+static struct ceph_msg *alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr,
+ int *skip)
+{
+ struct ceph_osd *osd = con->private;
+ int type = le16_to_cpu(hdr->type);
+ int front = le32_to_cpu(hdr->front_len);
+
+ switch (type) {
+ case CEPH_MSG_OSD_MAP:
+ return ceph_msg_new(type, front, 0, 0, NULL);
+ case CEPH_MSG_OSD_OPREPLY:
+ return get_reply(con, hdr, skip);
+ default:
+ pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
+ osd->o_osd);
+ *skip = 1;
+ return NULL;
+ }
}
/*