ceph: avoid sending unnecessary FLUSHSNAP message
authorYan, Zheng <zyan@redhat.com>
Fri, 1 May 2015 08:57:16 +0000 (16:57 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 25 Jun 2015 08:49:28 +0000 (11:49 +0300)
when a snap notification contains no new snapshot, we can avoid
sending a FLUSHSNAP message to the MDS. But we still need to create
a cap_snap in some cases because it's required by the write path and
the page writeback path

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/caps.c
fs/ceph/snap.c
fs/ceph/super.h

index feb8ec9..f1dbcae 100644 (file)
@@ -1297,11 +1297,8 @@ retry:
                if (capsnap->dirty_pages || capsnap->writing)
                        break;
 
-               /*
-                * if cap writeback already occurred, we should have dropped
-                * the capsnap in ceph_put_wrbuffer_cap_refs.
-                */
-               BUG_ON(capsnap->dirty == 0);
+               /* should be removed by ceph_try_drop_cap_snap() */
+               BUG_ON(!capsnap->need_flush);
 
                /* pick mds, take s_mutex */
                if (ci->i_auth_cap == NULL) {
@@ -2347,6 +2344,27 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
        spin_unlock(&ci->i_ceph_lock);
 }
 
+
+/*
+ * drop cap_snap that is not associated with any snapshot.
+ * we don't need to send FLUSHSNAP message for it.
+ */
+static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap)
+{
+       if (!capsnap->need_flush &&
+           !capsnap->writing && !capsnap->dirty_pages) {
+
+               dout("dropping cap_snap %p follows %llu\n",
+                    capsnap, capsnap->follows);
+               ceph_put_snap_context(capsnap->context);
+               list_del(&capsnap->ci_item);
+               list_del(&capsnap->flushing_item);
+               ceph_put_cap_snap(capsnap);
+               return 1;
+       }
+       return 0;
+}
+
 /*
  * Release cap refs.
  *
@@ -2360,7 +2378,6 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 {
        struct inode *inode = &ci->vfs_inode;
        int last = 0, put = 0, flushsnaps = 0, wake = 0;
-       struct ceph_cap_snap *capsnap;
 
        spin_lock(&ci->i_ceph_lock);
        if (had & CEPH_CAP_PIN)
@@ -2382,17 +2399,17 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
        if (had & CEPH_CAP_FILE_WR)
                if (--ci->i_wr_ref == 0) {
                        last++;
-                       if (!list_empty(&ci->i_cap_snaps)) {
-                               capsnap = list_first_entry(&ci->i_cap_snaps,
-                                                    struct ceph_cap_snap,
-                                                    ci_item);
-                               if (capsnap->writing) {
-                                       capsnap->writing = 0;
-                                       flushsnaps =
-                                               __ceph_finish_cap_snap(ci,
-                                                                      capsnap);
-                                       wake = 1;
-                               }
+                       if (__ceph_have_pending_cap_snap(ci)) {
+                               struct ceph_cap_snap *capsnap =
+                                       list_last_entry(&ci->i_cap_snaps,
+                                                       struct ceph_cap_snap,
+                                                       ci_item);
+                               capsnap->writing = 0;
+                               if (ceph_try_drop_cap_snap(capsnap))
+                                       put++;
+                               else if (__ceph_finish_cap_snap(ci, capsnap))
+                                       flushsnaps = 1;
+                               wake = 1;
                        }
                        if (ci->i_wrbuffer_ref_head == 0 &&
                            ci->i_dirty_caps == 0 &&
@@ -2416,7 +2433,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
                ceph_flush_snaps(ci);
        if (wake)
                wake_up_all(&ci->i_cap_wq);
-       if (put)
+       while (put-- > 0)
                iput(inode);
 }
 
@@ -2467,25 +2484,15 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                capsnap->dirty_pages -= nr;
                if (capsnap->dirty_pages == 0) {
                        complete_capsnap = 1;
-                       if (capsnap->dirty == 0)
-                               /* cap writeback completed before we created
-                                * the cap_snap; no FLUSHSNAP is needed */
-                               drop_capsnap = 1;
+                       drop_capsnap = ceph_try_drop_cap_snap(capsnap);
                }
                dout("put_wrbuffer_cap_refs on %p cap_snap %p "
-                    " snap %lld %d/%d -> %d/%d %s%s%s\n",
+                    " snap %lld %d/%d -> %d/%d %s%s\n",
                     inode, capsnap, capsnap->context->seq,
                     ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
                     ci->i_wrbuffer_ref, capsnap->dirty_pages,
                     last ? " (wrbuffer last)" : "",
-                    complete_capsnap ? " (complete capsnap)" : "",
-                    drop_capsnap ? " (drop capsnap)" : "");
-               if (drop_capsnap) {
-                       ceph_put_snap_context(capsnap->context);
-                       list_del(&capsnap->ci_item);
-                       list_del(&capsnap->flushing_item);
-                       ceph_put_cap_snap(capsnap);
-               }
+                    complete_capsnap ? " (complete capsnap)" : "");
        }
 
        spin_unlock(&ci->i_ceph_lock);
index 5bfdab9..ba70801 100644 (file)
@@ -436,6 +436,14 @@ static int dup_array(u64 **dst, __le64 *src, u32 num)
        return 0;
 }
 
+static bool has_new_snaps(struct ceph_snap_context *o,
+                         struct ceph_snap_context *n)
+{
+       if (n->num_snaps == 0)
+               return false;
+       /* snaps are in descending order */
+       return n->snaps[0] > o->seq;
+}
 
 /*
  * When a snapshot is applied, the size/mtime inode metadata is queued
@@ -455,7 +463,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 {
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap_snap *capsnap;
-       struct ceph_snap_context *old_snapc;
+       struct ceph_snap_context *old_snapc, *new_snapc;
        int used, dirty;
 
        capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
@@ -469,6 +477,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
        dirty = __ceph_caps_dirty(ci);
 
        old_snapc = ci->i_head_snapc;
+       new_snapc = ci->i_snap_realm->cached_context;
 
        /*
         * If there is a write in progress, treat that as a dirty Fw,
@@ -486,20 +495,37 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
                dout("queue_cap_snap %p already pending\n", inode);
                goto update_snapc;
        }
-       if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
-               dout("queue_cap_snap %p empty snapc\n", inode);
-               goto update_snapc;
-       }
-       if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
-                      CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
+       if (ci->i_wrbuffer_ref_head == 0 &&
+           !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
                dout("queue_cap_snap %p nothing dirty|writing\n", inode);
                goto update_snapc;
        }
 
        BUG_ON(!old_snapc);
 
-       dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
-            inode, capsnap, old_snapc, ceph_cap_string(dirty));
+       /*
+        * There is no need to send FLUSHSNAP message to MDS if there is
+        * no new snapshot. But when there is dirty pages or on-going
+        * writes, we still need to create cap_snap. cap_snap is needed
+        * by the write path and page writeback path.
+        *
+        * also see ceph_try_drop_cap_snap()
+        */
+       if (has_new_snaps(old_snapc, new_snapc)) {
+               if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))
+                       capsnap->need_flush = true;
+       } else {
+               if (!(used & CEPH_CAP_FILE_WR) &&
+                   ci->i_wrbuffer_ref_head == 0) {
+                       dout("queue_cap_snap %p "
+                            "no new_snap|dirty_page|writing\n", inode);
+                       goto update_snapc;
+               }
+       }
+
+       dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n",
+            inode, capsnap, old_snapc, ceph_cap_string(dirty),
+            capsnap->need_flush ? "" : "no_flush");
        ihold(inode);
 
        atomic_set(&capsnap->nref, 1);
@@ -549,9 +575,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 
 update_snapc:
        if (ci->i_head_snapc) {
-               ci->i_head_snapc = ceph_get_snap_context(
-                               ci->i_snap_realm->cached_context);
-               dout(" new snapc is %p\n", ci->i_head_snapc);
+               ci->i_head_snapc = ceph_get_snap_context(new_snapc);
+               dout(" new snapc is %p\n", new_snapc);
        }
        spin_unlock(&ci->i_ceph_lock);
 
index b182fd7..4ef1ae9 100644 (file)
@@ -164,6 +164,7 @@ struct ceph_cap_snap {
        int writing;   /* a sync write is still in progress */
        int dirty_pages;     /* dirty pages awaiting writeback */
        bool inline_data;
+       bool need_flush;
 };
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -719,8 +720,8 @@ extern void ceph_snap_exit(void);
 static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci)
 {
        return !list_empty(&ci->i_cap_snaps) &&
-               list_entry(ci->i_cap_snaps.prev, struct ceph_cap_snap,
-                          ci_item)->writing;
+              list_last_entry(&ci->i_cap_snaps, struct ceph_cap_snap,
+                              ci_item)->writing;
 }
 
 /* inode.c */