}
/*
+ * Revoke a page vector that we may be reading data into
+ */
+void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages)
+{
+ mutex_lock(&con->mutex);
+ if (con->in_msg && con->in_msg->pages == pages) {
+ unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+
+ /* skip rest of message */
+ dout("con_revoke_pages %p msg %p pages %p revoked\n", con,
+ con->in_msg, pages);
+ if (con->in_msg_pos.data_pos < data_len)
+ con->in_base_pos = con->in_msg_pos.data_pos - data_len;
+ else
+ con->in_base_pos = con->in_base_pos -
+ sizeof(struct ceph_msg_header) -
+ sizeof(struct ceph_msg_footer);
+ con->in_msg->pages = NULL;
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ } else {
+ dout("con_revoke_pages %p msg %p pages %p no-op\n",
+ con, con->in_msg, pages);
+ }
+ mutex_unlock(&con->mutex);
+}
+
+/*
* Queue a keepalive byte to ensure the tcp connection is alive.
*/
void ceph_con_keepalive(struct ceph_connection *con)
ceph_msg_put(req->r_request);
if (req->r_reply)
ceph_msg_put(req->r_reply);
+ if (req->r_con_filling_pages) {
+ dout("release_request revoking pages %p from con %p\n",
+ req->r_pages, req->r_con_filling_pages);
+ ceph_con_revoke_pages(req->r_con_filling_pages,
+ req->r_pages);
+ ceph_con_put(req->r_con_filling_pages);
+ }
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
*/
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
+ struct ceph_connection *con)
{
struct ceph_osd_reply_head *rhead = msg->front.iov_base;
struct ceph_osd_request *req;
ceph_osdc_get_request(req);
flags = le32_to_cpu(rhead->flags);
+ /*
+ * if this connection filled our pages, drop our reference now, to
+ * avoid a (safe but slower) revoke later.
+ */
+ if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
+ dout(" got pages, dropping con_filling_pages ref %p\n", con);
+ req->r_con_filling_pages = NULL;
+ ceph_con_put(con);
+ }
+
if (req->r_reply) {
/*
* once we see the message has been received, we don't
}
dout("prepare_pages tid %llu has %d pages, want %d\n",
tid, req->r_num_pages, want);
- if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
- m->pages = req->r_pages;
- m->nr_pages = req->r_num_pages;
- req->r_reply = m; /* only for duration of read over socket */
- ceph_msg_get(m);
- req->r_prepared_pages = 1;
- ret = 0; /* success */
+ if (unlikely(req->r_num_pages < want))
+ goto out;
+
+ if (req->r_con_filling_pages) {
+ dout("revoking pages %p from old con %p\n", req->r_pages,
+ req->r_con_filling_pages);
+ ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
+ ceph_con_put(req->r_con_filling_pages);
}
+ req->r_con_filling_pages = ceph_con_get(con);
+ req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
+ m->pages = req->r_pages;
+ m->nr_pages = req->r_num_pages;
+ ret = 0; /* success */
out:
mutex_unlock(&osdc->request_mutex);
return ret;
ceph_osdc_handle_map(osdc, msg);
break;
case CEPH_MSG_OSD_OPREPLY:
- handle_reply(osdc, msg);
+ handle_reply(osdc, msg, con);
break;
default: