ceph: send cap releases more aggressively
authorYan, Zheng <zyan@redhat.com>
Mon, 14 Jan 2019 09:21:19 +0000 (17:21 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Tue, 5 Mar 2019 17:55:16 +0000 (18:55 +0100)
When pending cap releases fill up one message, start a work to send
cap release message. (old way is sending cap releases every 5 seconds)

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.c
fs/ceph/super.h

index 0eaf1b4..da5b56e 100644 (file)
@@ -1081,9 +1081,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
            (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
                cap->queue_release = 1;
                if (removed) {
-                       list_add_tail(&cap->session_caps,
-                                     &session->s_cap_releases);
-                       session->s_num_cap_releases++;
+                       __ceph_queue_cap_release(session, cap);
                        removed = 0;
                }
        } else {
@@ -1245,7 +1243,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
  * Queue cap releases when an inode is dropped from our cache.  Since
  * inode is about to be destroyed, there is no need for i_ceph_lock.
  */
-void ceph_queue_caps_release(struct inode *inode)
+void __ceph_remove_caps(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct rb_node *p;
@@ -3886,12 +3884,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                        cap->seq = seq;
                        cap->issue_seq = seq;
                        spin_lock(&session->s_cap_lock);
-                       list_add_tail(&cap->session_caps,
-                                       &session->s_cap_releases);
-                       session->s_num_cap_releases++;
+                       __ceph_queue_cap_release(session, cap);
                        spin_unlock(&session->s_cap_lock);
                }
-               goto flush_cap_releases;
+               goto done;
        }
 
        /* these will work even if we don't have a cap yet */
@@ -3961,7 +3957,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                       ceph_cap_op_name(op));
        }
 
-       goto done;
+done:
+       mutex_unlock(&session->s_mutex);
+done_unlocked:
+       iput(inode);
+       ceph_put_string(extra_info.pool_ns);
+       return;
 
 flush_cap_releases:
        /*
@@ -3969,14 +3970,8 @@ flush_cap_releases:
         * along for the mds (who clearly thinks we still have this
         * cap).
         */
-       ceph_send_cap_releases(mdsc, session);
-
-done:
-       mutex_unlock(&session->s_mutex);
-done_unlocked:
-       iput(inode);
-       ceph_put_string(extra_info.pool_ns);
-       return;
+       ceph_flush_cap_releases(mdsc, session);
+       goto done;
 
 bad:
        pr_err("ceph_handle_caps: corrupt message\n");
index e6012de..f588b2d 100644 (file)
@@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode)
 
        ceph_fscache_unregister_inode_cookie(ci);
 
-       ceph_queue_caps_release(inode);
+       __ceph_remove_caps(inode);
 
        if (__ceph_has_any_quota(ci))
                ceph_adjust_quota_realms_count(inode, false);
index ddfb6a4..c9d4561 100644 (file)
@@ -57,6 +57,7 @@ struct ceph_reconnect_state {
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -636,6 +637,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
+       INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
        INIT_LIST_HEAD(&s->s_cap_flushing);
 
        mdsc->sessions[mds] = s;
@@ -661,6 +664,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
+       s->s_state = 0;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
@@ -1323,13 +1327,10 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                        cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
-                       if (cap->queue_release) {
-                               list_add_tail(&cap->session_caps,
-                                             &session->s_cap_releases);
-                               session->s_num_cap_releases++;
-                       } else {
+                       if (cap->queue_release)
+                               __ceph_queue_cap_release(session, cap);
+                       else
                                old_cap = cap;  /* put_cap it w/o locks held */
-                       }
                }
                if (ret < 0)
                        goto out;
@@ -1764,7 +1765,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
                session->s_trim_caps = 0;
        }
 
-       ceph_send_cap_releases(mdsc, session);
+       ceph_flush_cap_releases(mdsc, session);
        return 0;
 }
 
@@ -1807,8 +1808,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
 /*
  * called under s_mutex
  */
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-                           struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+                                  struct ceph_mds_session *session)
 {
        struct ceph_msg *msg = NULL;
        struct ceph_mds_cap_release *head;
@@ -1900,6 +1901,48 @@ out_err:
        spin_unlock(&session->s_cap_lock);
 }
 
+static void ceph_cap_release_work(struct work_struct *work)
+{
+       struct ceph_mds_session *session =
+               container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+       mutex_lock(&session->s_mutex);
+       if (session->s_state == CEPH_MDS_SESSION_OPEN ||
+           session->s_state == CEPH_MDS_SESSION_HUNG)
+               ceph_send_cap_releases(session->s_mdsc, session);
+       mutex_unlock(&session->s_mutex);
+       ceph_put_mds_session(session);
+}
+
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+                            struct ceph_mds_session *session)
+{
+       if (mdsc->stopping)
+               return;
+
+       get_session(session);
+       if (queue_work(mdsc->fsc->cap_wq,
+                      &session->s_cap_release_work)) {
+               dout("cap release work queued\n");
+       } else {
+               ceph_put_mds_session(session);
+               dout("failed to queue cap release work\n");
+       }
+}
+
+/*
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+                             struct ceph_cap *cap)
+{
+       list_add_tail(&cap->session_caps, &session->s_cap_releases);
+       session->s_num_cap_releases++;
+
+       if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
+               ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
 /*
  * requests
  */
index af3b25e..2147ecd 100644 (file)
@@ -172,12 +172,13 @@ struct ceph_mds_session {
        /* protected by s_cap_lock */
        spinlock_t        s_cap_lock;
        struct list_head  s_caps;     /* all caps issued by this session */
+       struct ceph_cap  *s_cap_iterator;
        int               s_nr_caps, s_trim_caps;
        int               s_num_cap_releases;
        int               s_cap_reconnect;
        int               s_readonly;
        struct list_head  s_cap_releases; /* waiting cap_release messages */
-       struct ceph_cap  *s_cap_iterator;
+       struct work_struct s_cap_release_work;
 
        /* protected by mutex */
        struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
@@ -457,9 +458,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
        kref_put(&req->r_kref, ceph_mdsc_release_request);
 }
 
-extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-                                  struct ceph_mds_session *session);
-
+extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
+                                   struct ceph_cap *cap);
+extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+                                   struct ceph_mds_session *session);
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
index da2cd8e..200836b 100644 (file)
@@ -671,6 +671,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
        fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
        if (!fsc->trunc_wq)
                goto fail_pg_inv_wq;
+       fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
+       if (!fsc->cap_wq)
+               goto fail_trunc_wq;
 
        /* set up mempools */
        err = -ENOMEM;
@@ -678,13 +681,15 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
        size = sizeof (struct page *) * (page_count ? page_count : 1);
        fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
        if (!fsc->wb_pagevec_pool)
-               goto fail_trunc_wq;
+               goto fail_cap_wq;
 
        /* caps */
        fsc->min_caps = fsopt->max_readdir;
 
        return fsc;
 
+fail_cap_wq:
+       destroy_workqueue(fsc->cap_wq);
 fail_trunc_wq:
        destroy_workqueue(fsc->trunc_wq);
 fail_pg_inv_wq:
@@ -706,6 +711,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
        flush_workqueue(fsc->wb_wq);
        flush_workqueue(fsc->pg_inv_wq);
        flush_workqueue(fsc->trunc_wq);
+       flush_workqueue(fsc->cap_wq);
 }
 
 static void destroy_fs_client(struct ceph_fs_client *fsc)
@@ -715,6 +721,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
        destroy_workqueue(fsc->wb_wq);
        destroy_workqueue(fsc->pg_inv_wq);
        destroy_workqueue(fsc->trunc_wq);
+       destroy_workqueue(fsc->cap_wq);
 
        mempool_destroy(fsc->wb_pagevec_pool);
 
index df44a77..c4a79ea 100644 (file)
@@ -107,10 +107,12 @@ struct ceph_fs_client {
 
        /* writeback */
        mempool_t *wb_pagevec_pool;
+       atomic_long_t writeback_count;
+
        struct workqueue_struct *wb_wq;
        struct workqueue_struct *pg_inv_wq;
        struct workqueue_struct *trunc_wq;
-       atomic_long_t writeback_count;
+       struct workqueue_struct *cap_wq;
 
 #ifdef CONFIG_DEBUG_FS
        struct dentry *debugfs_dentry_lru, *debugfs_caps;
@@ -988,11 +990,11 @@ extern void ceph_add_cap(struct inode *inode,
                         unsigned cap, unsigned seq, u64 realmino, int flags,
                         struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
+extern void __ceph_remove_caps(struct inode* inode);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
                         struct ceph_cap *cap);
 extern int ceph_is_any_caps(struct inode *inode);
 
-extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
                      int datasync);