From d0b19705e99939f5ae5aa9b57bfe41dd4777d951 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 28 Apr 2016 16:07:27 +0200 Subject: [PATCH] libceph: async MON client generic requests For map check, we are going to need to send CEPH_MSG_MON_GET_VERSION messages asynchronously and get a callback on completion. Refactor MON client to allow firing off generic requests asynchronously and add an async variant of ceph_monc_get_version(). ceph_monc_do_statfs() is switched over and remains sync. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 4 +- include/linux/ceph/mon_client.h | 19 ++- net/ceph/mon_client.c | 316 ++++++++++++++++++++++++++-------------- 3 files changed, 228 insertions(+), 111 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index d0834c4..8eae6f5 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4896,8 +4896,8 @@ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) again: ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); if (ret == -ENOENT && tries++ < 1) { - ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap", - &newest_epoch); + ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap", + &newest_epoch); if (ret < 0) return ret; diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h index c14e9d8..19800d9 100644 --- a/include/linux/ceph/mon_client.h +++ b/include/linux/ceph/mon_client.h @@ -39,20 +39,31 @@ struct ceph_mon_request { ceph_monc_request_func_t do_request; }; +typedef void (*ceph_monc_callback_t)(struct ceph_mon_generic_request *); + /* * ceph_mon_generic_request is being used for the statfs and * mon_get_version requests which are being done a bit differently * because we need to get data back to the caller */ struct ceph_mon_generic_request { + struct ceph_mon_client *monc; struct kref kref; u64 tid; struct rb_node node; int result; - void *buf; + struct completion completion; + ceph_monc_callback_t complete_cb; + u64 private_data; /* r_tid/linger_id */ + struct ceph_msg *request; /* original request */ struct ceph_msg *reply; /* and reply */ + + union { + struct ceph_statfs *st; + u64 newest; + } u; }; struct ceph_mon_client { @@ -124,8 +135,10 @@ extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf); -extern int ceph_monc_do_get_version(struct ceph_mon_client *monc, - const char *what, u64 *newest); +int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + u64 *newest); +int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data); extern int ceph_monc_open_session(struct ceph_mon_client *monc); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 98bfbe1..4e49b22 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -493,6 +493,10 @@ static void release_generic_request(struct kref *kref) struct ceph_mon_generic_request *req = container_of(kref, struct ceph_mon_generic_request, kref); + dout("%s greq %p request %p reply %p\n", __func__, req, req->request, + req->reply); + WARN_ON(!RB_EMPTY_NODE(&req->node)); + if (req->reply) ceph_msg_put(req->reply); if (req->request) @@ -503,7 +507,8 @@ static void release_generic_request(struct kref *kref) static void put_generic_request(struct ceph_mon_generic_request *req) { - kref_put(&req->kref, release_generic_request); + if (req) + kref_put(&req->kref, release_generic_request); } static void get_generic_request(struct ceph_mon_generic_request *req) @@ -511,6 +516,103 @@ static void get_generic_request(struct ceph_mon_generic_request *req) kref_get(&req->kref); } +static struct ceph_mon_generic_request * +alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp) +{ + struct ceph_mon_generic_request *req; + + req = kzalloc(sizeof(*req), gfp); + if (!req) + return NULL; + + req->monc = monc; + kref_init(&req->kref); + RB_CLEAR_NODE(&req->node); + init_completion(&req->completion); + + dout("%s greq %p\n", __func__, req); + return req; +} + +static void register_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + + WARN_ON(req->tid); + + get_generic_request(req); + req->tid = ++monc->last_tid; + insert_generic_request(&monc->generic_request_tree, req); +} + +static void send_generic_request(struct ceph_mon_client *monc, + struct ceph_mon_generic_request *req) +{ + WARN_ON(!req->tid); + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + req->request->hdr.tid = cpu_to_le64(req->tid); + ceph_con_send(&monc->con, ceph_msg_get(req->request)); +} + +static void __finish_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + erase_generic_request(&monc->generic_request_tree, req); + + ceph_msg_revoke(req->request); + ceph_msg_revoke_incoming(req->reply); +} + +static void finish_generic_request(struct ceph_mon_generic_request *req) +{ + __finish_generic_request(req); + put_generic_request(req); +} + +static void complete_generic_request(struct ceph_mon_generic_request *req) +{ + if (req->complete_cb) + req->complete_cb(req); + else + complete_all(&req->completion); + put_generic_request(req); +} + +void cancel_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + struct ceph_mon_generic_request *lookup_req; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + + mutex_lock(&monc->mutex); + lookup_req = lookup_generic_request(&monc->generic_request_tree, + req->tid); + if (lookup_req) { + WARN_ON(lookup_req != req); + finish_generic_request(req); + } + + mutex_unlock(&monc->mutex); +} + +static int wait_generic_request(struct ceph_mon_generic_request *req) +{ + int ret; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + ret = wait_for_completion_interruptible(&req->completion); + if (ret) + cancel_generic_request(req); + else + ret = req->result; /* completed */ + + return ret; +} + static struct ceph_msg *get_generic_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip) @@ -540,40 +642,6 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, return m; } -static int __do_generic_request(struct ceph_mon_client *monc, u64 tid, - struct ceph_mon_generic_request *req) -{ - int err; - - /* register request */ - req->tid = tid != 0 ? tid : ++monc->last_tid; - req->request->hdr.tid = cpu_to_le64(req->tid); - insert_generic_request(&monc->generic_request_tree, req); - ceph_con_send(&monc->con, ceph_msg_get(req->request)); - mutex_unlock(&monc->mutex); - - err = wait_for_completion_interruptible(&req->completion); - - mutex_lock(&monc->mutex); - erase_generic_request(&monc->generic_request_tree, req); - - if (!err) - err = req->result; - return err; -} - -static int do_generic_request(struct ceph_mon_client *monc, - struct ceph_mon_generic_request *req) -{ - int err; - - mutex_lock(&monc->mutex); - err = __do_generic_request(monc, 0, req); - mutex_unlock(&monc->mutex); - - return err; -} - /* * statfs */ @@ -584,22 +652,24 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, struct ceph_mon_statfs_reply *reply = msg->front.iov_base; u64 tid = le64_to_cpu(msg->hdr.tid); + dout("%s msg %p tid %llu\n", __func__, msg, tid); + if (msg->front.iov_len != sizeof(*reply)) goto bad; - dout("handle_statfs_reply %p tid %llu\n", msg, tid); mutex_lock(&monc->mutex); req = lookup_generic_request(&monc->generic_request_tree, tid); - if (req) { - *(struct ceph_statfs *)req->buf = reply->st; - req->result = 0; - get_generic_request(req); + if (!req) { + mutex_unlock(&monc->mutex); + return; } + + req->result = 0; + *req->u.st = reply->st; /* struct */ + __finish_generic_request(req); mutex_unlock(&monc->mutex); - if (req) { - complete_all(&req->completion); - put_generic_request(req); - } + + complete_generic_request(req); return; bad: @@ -614,39 +684,38 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) { struct ceph_mon_generic_request *req; struct ceph_mon_statfs *h; - int err; + int ret = -ENOMEM; - req = kzalloc(sizeof(*req), GFP_NOFS); + req = alloc_generic_request(monc, GFP_NOFS); if (!req) - return -ENOMEM; - - kref_init(&req->kref); - RB_CLEAR_NODE(&req->node); - req->buf = buf; - init_completion(&req->completion); + goto out; - err = -ENOMEM; req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, true); if (!req->request) goto out; - req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, - true); + + req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true); if (!req->reply) goto out; + req->u.st = buf; + + mutex_lock(&monc->mutex); + register_generic_request(req); /* fill out request */ h = req->request->front.iov_base; h->monhdr.have_version = 0; h->monhdr.session_mon = cpu_to_le16(-1); h->monhdr.session_mon_tid = 0; h->fsid = monc->monmap->fsid; + send_generic_request(monc, req); + mutex_unlock(&monc->mutex); - err = do_generic_request(monc, req); - + ret = wait_generic_request(req); out: put_generic_request(req); - return err; + return ret; } EXPORT_SYMBOL(ceph_monc_do_statfs); @@ -659,7 +728,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc, void *end = p + msg->front_alloc_len; u64 handle; - dout("%s %p tid %llu\n", __func__, msg, tid); + dout("%s msg %p tid %llu\n", __func__, msg, tid); ceph_decode_need(&p, end, 2*sizeof(u64), bad); handle = ceph_decode_64(&p); @@ -668,77 +737,110 @@ static void handle_get_version_reply(struct ceph_mon_client *monc, mutex_lock(&monc->mutex); req = lookup_generic_request(&monc->generic_request_tree, handle); - if (req) { - *(u64 *)req->buf = ceph_decode_64(&p); - req->result = 0; - get_generic_request(req); + if (!req) { + mutex_unlock(&monc->mutex); + return; } + + req->result = 0; + req->u.newest = ceph_decode_64(&p); + __finish_generic_request(req); mutex_unlock(&monc->mutex); - if (req) { - complete_all(&req->completion); - put_generic_request(req); - } + complete_generic_request(req); return; + bad: pr_err("corrupt mon_get_version reply, tid %llu\n", tid); ceph_msg_dump(msg); } -/* - * Send MMonGetVersion and wait for the reply. - * - * @what: one of "mdsmap", "osdmap" or "monmap" - */ -int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, - u64 *newest) +static struct ceph_mon_generic_request * +__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data) { struct ceph_mon_generic_request *req; - void *p, *end; - u64 tid; - int err; - req = kzalloc(sizeof(*req), GFP_NOFS); + req = alloc_generic_request(monc, GFP_NOIO); if (!req) - return -ENOMEM; - - kref_init(&req->kref); - RB_CLEAR_NODE(&req->node); - req->buf = newest; - init_completion(&req->completion); + goto err_put_req; req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, sizeof(u64) + sizeof(u32) + strlen(what), - GFP_NOFS, true); - if (!req->request) { - err = -ENOMEM; - goto out; - } + GFP_NOIO, true); + if (!req->request) + goto err_put_req; - req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024, - GFP_NOFS, true); - if (!req->reply) { - err = -ENOMEM; - goto out; - } + req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO, + true); + if (!req->reply) + goto err_put_req; - p = req->request->front.iov_base; - end = p + req->request->front_alloc_len; + req->complete_cb = cb; + req->private_data = private_data; - /* fill out request */ mutex_lock(&monc->mutex); - tid = ++monc->last_tid; - ceph_encode_64(&p, tid); /* handle */ - ceph_encode_string(&p, end, what, strlen(what)); + register_generic_request(req); + { + void *p = req->request->front.iov_base; + void *const end = p + req->request->front_alloc_len; + + ceph_encode_64(&p, req->tid); /* handle */ + ceph_encode_string(&p, end, what, strlen(what)); + WARN_ON(p != end); + } + send_generic_request(monc, req); + mutex_unlock(&monc->mutex); - err = __do_generic_request(monc, tid, req); + return req; - mutex_unlock(&monc->mutex); -out: +err_put_req: put_generic_request(req); - return err; + return ERR_PTR(-ENOMEM); +} + +/* + * Send MMonGetVersion and wait for the reply. + * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + u64 *newest) +{ + struct ceph_mon_generic_request *req; + int ret; + + req = __ceph_monc_get_version(monc, what, NULL, 0); + if (IS_ERR(req)) + return PTR_ERR(req); + + ret = wait_generic_request(req); + if (!ret) + *newest = req->u.newest; + + put_generic_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_monc_get_version); + +/* + * Send MMonGetVersion, + * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data) +{ + struct ceph_mon_generic_request *req; + + req = __ceph_monc_get_version(monc, what, cb, private_data); + if (IS_ERR(req)) + return PTR_ERR(req); + + put_generic_request(req); + return 0; } -EXPORT_SYMBOL(ceph_monc_do_get_version); +EXPORT_SYMBOL(ceph_monc_get_version_async); /* * Resend pending generic requests. @@ -923,6 +1025,8 @@ void ceph_monc_stop(struct ceph_mon_client *monc) ceph_auth_destroy(monc->auth); + WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree)); + ceph_msg_put(monc->m_auth); ceph_msg_put(monc->m_auth_reply); ceph_msg_put(monc->m_subscribe); -- 2.7.4