From 70db4f3629b3476cf506be869ef9d15688d2d44a Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 21 Oct 2014 18:09:56 -0700 Subject: [PATCH] ceph: introduce a new inode flag indicating if cached dentries are ordered After creating/deleting/renaming file, offsets of sibling dentries may change. So we can not use cached dentries to satisfy readdir. But we can still use the cached dentries to conclude -ENOENT for lookup. This patch introduces a new inode flag indicating if child dentries are ordered. The flag is set at the same time marking a directory complete. After creating/deleting/renaming file, we clear the flag on directory inode. This prevents ceph_readdir() from using cached dentries to satisfy readdir syscall. Signed-off-by: Yan, Zheng --- fs/ceph/dir.c | 23 +++++++++++++++-------- fs/ceph/inode.c | 13 ++++++++----- fs/ceph/super.h | 38 ++++++++++++++++++++++++++++++++------ 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index e6d63f8..6526199 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -183,7 +183,7 @@ more: spin_unlock(&parent->d_lock); /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_dir_is_complete(dir)) { + if (!ceph_dir_is_complete_ordered(dir)) { dout(" lost dir complete on %p; falling back to mds\n", dir); dput(dentry); err = -EAGAIN; @@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* always start with . and .. */ if (ctx->pos == 0) { - /* note dir version at start of readdir so we can tell - * if any dentries get dropped */ - fi->dir_release_count = atomic_read(&ci->i_release_count); - dout("readdir off 0 -> '.'\n"); if (!dir_emit(ctx, ".", 1, ceph_translate_ino(inode->i_sb, inode->i_ino), @@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) if ((ctx->pos == 2 || fi->dentry) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && - __ceph_dir_is_complete(ci) && + __ceph_dir_is_complete_ordered(ci) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { u32 shared_gen = ci->i_shared_gen; spin_unlock(&ci->i_ceph_lock); @@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* proceed with a normal readdir */ + if (ctx->pos == 2) { + /* note dir version at start of readdir so we can tell + * if any dentries get dropped */ + fi->dir_release_count = atomic_read(&ci->i_release_count); + fi->dir_ordered_count = ci->i_ordered_count; + } + more: /* do we have the correct frag content buffered? */ if (fi->frag != frag || fi->last_readdir == NULL) { @@ -446,8 +449,12 @@ more: */ spin_lock(&ci->i_ceph_lock); if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { - dout(" marking %p complete\n", inode); - __ceph_dir_set_complete(ci, fi->dir_release_count); + if (ci->i_ordered_count == fi->dir_ordered_count) + dout(" marking %p complete and ordered\n", inode); + else + dout(" marking %p complete\n", inode); + __ceph_dir_set_complete(ci, fi->dir_release_count, + fi->dir_ordered_count); } spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 7b61390..72607c1 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -389,6 +389,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; + ci->i_ordered_count = 0; atomic_set(&ci->i_release_count, 1); atomic_set(&ci->i_complete_count, 0); ci->i_symlink = NULL; @@ -845,7 +846,8 @@ static int fill_inode(struct inode *inode, (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); - __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); + __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count), + ci->i_ordered_count); } /* were we issued a capability? */ @@ -1206,8 +1208,8 @@ retry_lookup: ceph_invalidate_dentry_lease(dn); /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_complete(dir); - ceph_dir_clear_complete(olddir); + ceph_dir_clear_ordered(dir); + ceph_dir_clear_ordered(olddir); dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); @@ -1219,6 +1221,7 @@ retry_lookup: if (!rinfo->head->is_target) { dout("fill_trace null dentry\n"); if (dn->d_inode) { + ceph_dir_clear_ordered(dir); dout("d_delete %p\n", dn); d_delete(dn); } else { @@ -1235,7 +1238,7 @@ retry_lookup: /* attach proper inode */ if (!dn->d_inode) { - ceph_dir_clear_complete(dir); + ceph_dir_clear_ordered(dir); ihold(in); dn = splice_dentry(dn, in, &have_lease); if (IS_ERR(dn)) { @@ -1265,7 +1268,7 @@ retry_lookup: BUG_ON(!dir); BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); dout(" linking snapped dir %p to dn %p\n", in, dn); - ceph_dir_clear_complete(dir); + ceph_dir_clear_ordered(dir); ihold(in); dn = splice_dentry(dn, in, NULL); if (IS_ERR(dn)) { diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b82f507..aca2287 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -256,6 +256,7 @@ struct ceph_inode_info { u32 i_time_warp_seq; unsigned i_ceph_flags; + int i_ordered_count; atomic_t i_release_count; atomic_t i_complete_count; @@ -434,14 +435,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, /* * Ceph inode. */ -#define CEPH_I_NODELAY 4 /* do not delay cap release */ -#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ -#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ +#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */ +#define CEPH_I_NODELAY 4 /* do not delay cap release */ +#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ +#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, - int release_count) + int release_count, int ordered_count) { atomic_set(&ci->i_complete_count, release_count); + if (ci->i_ordered_count == ordered_count) + ci->i_ceph_flags |= CEPH_I_DIR_ORDERED; + else + ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; } static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) @@ -455,16 +461,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci) atomic_read(&ci->i_release_count); } +static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci) +{ + return __ceph_dir_is_complete(ci) && + (ci->i_ceph_flags & CEPH_I_DIR_ORDERED); +} + static inline void ceph_dir_clear_complete(struct inode *inode) { __ceph_dir_clear_complete(ceph_inode(inode)); } -static inline bool ceph_dir_is_complete(struct inode *inode) +static inline void ceph_dir_clear_ordered(struct inode *inode) { - return __ceph_dir_is_complete(ceph_inode(inode)); + struct ceph_inode_info *ci = ceph_inode(inode); + spin_lock(&ci->i_ceph_lock); + ci->i_ordered_count++; + ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; + spin_unlock(&ci->i_ceph_lock); } +static inline bool ceph_dir_is_complete_ordered(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + bool ret; + spin_lock(&ci->i_ceph_lock); + ret = __ceph_dir_is_complete_ordered(ci); + spin_unlock(&ci->i_ceph_lock); + return ret; +} /* find a specific frag @f */ extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, @@ -580,6 +605,7 @@ struct ceph_file_info { char *last_name; /* last entry in previous chunk */ struct dentry *dentry; /* next dentry (for dcache readdir) */ int dir_release_count; + int dir_ordered_count; /* used for -o dirstat read() on directory thing */ char *dir_info; -- 2.7.4