libceph: support for checking on status of watch
authorIlya Dryomov <idryomov@gmail.com>
Thu, 28 Apr 2016 14:07:27 +0000 (16:07 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:15:28 +0000 (01:15 +0200)
Implement ceph_osdc_watch_check() to be able to check on status of
watch.  Note that the time it takes for a watch/notify event to get
delivered through the notify_wq is taken into account.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

index 63054fae4f15a530347fe6de2724360775e519be..2ae7cfd82ec9a57e0145ae4840ba8d0e76bf2eb9 100644 (file)
@@ -213,6 +213,8 @@ struct ceph_osd_linger_request {
        struct ceph_osd_request *reg_req;
        struct ceph_osd_request *ping_req;
        unsigned long ping_sent;
+       unsigned long watch_valid_thru;
+       struct list_head pending_lworks;
 
        struct ceph_osd_request_target t;
        u32 last_force_resend;
@@ -417,5 +419,7 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
                     u32 timeout,
                     struct page ***preply_pages,
                     size_t *preply_len);
+int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
+                         struct ceph_osd_linger_request *lreq);
 #endif
 
index e6e3ab4223db47dd4e03e505885ea705b58b819d..5ac6dce74f079c01ea693a0636bde155c82187c7 100644 (file)
@@ -1746,6 +1746,7 @@ static void linger_release(struct kref *kref)
        WARN_ON(!RB_EMPTY_NODE(&lreq->node));
        WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
        WARN_ON(!list_empty(&lreq->scan_item));
+       WARN_ON(!list_empty(&lreq->pending_lworks));
        WARN_ON(lreq->osd);
 
        if (lreq->reg_req)
@@ -1783,6 +1784,7 @@ linger_alloc(struct ceph_osd_client *osdc)
        RB_CLEAR_NODE(&lreq->node);
        RB_CLEAR_NODE(&lreq->osdc_node);
        INIT_LIST_HEAD(&lreq->scan_item);
+       INIT_LIST_HEAD(&lreq->pending_lworks);
        init_completion(&lreq->reg_commit_wait);
        init_completion(&lreq->notify_finish_wait);
 
@@ -1890,6 +1892,8 @@ static void cancel_linger_request(struct ceph_osd_request *req)
 struct linger_work {
        struct work_struct work;
        struct ceph_osd_linger_request *lreq;
+       struct list_head pending_item;
+       unsigned long queued_stamp;
 
        union {
                struct {
@@ -1916,6 +1920,7 @@ static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
                return NULL;
 
        INIT_WORK(&lwork->work, workfn);
+       INIT_LIST_HEAD(&lwork->pending_item);
        lwork->lreq = linger_get(lreq);
 
        return lwork;
@@ -1925,6 +1930,10 @@ static void lwork_free(struct linger_work *lwork)
 {
        struct ceph_osd_linger_request *lreq = lwork->lreq;
 
+       mutex_lock(&lreq->lock);
+       list_del(&lwork->pending_item);
+       mutex_unlock(&lreq->lock);
+
        linger_put(lreq);
        kfree(lwork);
 }
@@ -1935,6 +1944,10 @@ static void lwork_queue(struct linger_work *lwork)
        struct ceph_osd_client *osdc = lreq->osdc;
 
        verify_lreq_locked(lreq);
+       WARN_ON(!list_empty(&lwork->pending_item));
+
+       lwork->queued_stamp = jiffies;
+       list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
        queue_work(osdc->notify_wq, &lwork->work);
 }
 
@@ -2116,7 +2129,9 @@ static void linger_ping_cb(struct ceph_osd_request *req)
             __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
             lreq->last_error);
        if (lreq->register_gen == req->r_ops[0].watch.gen) {
-               if (req->r_result && !lreq->last_error) {
+               if (!req->r_result) {
+                       lreq->watch_valid_thru = lreq->ping_sent;
+               } else if (!lreq->last_error) {
                        lreq->last_error = normalize_watch_error(req->r_result);
                        queue_watch_error(lreq);
                }
@@ -3316,6 +3331,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
        lreq->wcb = wcb;
        lreq->errcb = errcb;
        lreq->data = data;
+       lreq->watch_valid_thru = jiffies;
 
        ceph_oid_copy(&lreq->t.base_oid, oid);
        ceph_oloc_copy(&lreq->t.base_oloc, oloc);
@@ -3577,6 +3593,40 @@ out_put_lreq:
 }
 EXPORT_SYMBOL(ceph_osdc_notify);
 
+/*
+ * Return the number of milliseconds since the watch was last
+ * confirmed, or an error.  If there is an error, the watch is no
+ * longer valid, and should be destroyed with ceph_osdc_unwatch().
+ */
+int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
+                         struct ceph_osd_linger_request *lreq)
+{
+       unsigned long stamp, age;
+       int ret;
+
+       down_read(&osdc->lock);
+       mutex_lock(&lreq->lock);
+       stamp = lreq->watch_valid_thru;
+       if (!list_empty(&lreq->pending_lworks)) {
+               struct linger_work *lwork =
+                   list_first_entry(&lreq->pending_lworks,
+                                    struct linger_work,
+                                    pending_item);
+
+               if (time_before(lwork->queued_stamp, stamp))
+                       stamp = lwork->queued_stamp;
+       }
+       age = jiffies - stamp;
+       dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
+            lreq, lreq->linger_id, age, lreq->last_error);
+       /* we are truncating to msecs, so return a safe upper bound */
+       ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
+
+       mutex_unlock(&lreq->lock);
+       up_read(&osdc->lock);
+       return ret;
+}
+
 /*
  * Call all pending notify callbacks - for use after a watch is
  * unregistered, to make sure no more callbacks for it will be invoked