ceph: shut down access to inode when async create fails
authorJeff Layton <jlayton@kernel.org>
Tue, 31 Aug 2021 17:39:13 +0000 (13:39 -0400)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 8 Nov 2021 02:29:51 +0000 (03:29 +0100)
Add proper error handling for when an async create fails. The inode
never existed, so any dirty caps or data are now toast. We already
d_drop the dentry in that case, but the now-stale inode may still be
around. We want to shut down access to these inodes, and ensure that
they can't harbor any more dirty data, which can cause problems at
umount time.

When this occurs, flag such inodes as being SHUTDOWN, and trash any caps
and cap flushes that may be in flight for them, and invalidate the
pagecache for the inode. Add a new helper that can check whether an
inode or an entire mount is now shut down, and call it instead of
accessing the mount_state directly in places where we test that now.

URL: https://tracker.ceph.com/issues/51279
Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/export.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/locks.c
fs/ceph/super.h

index b429204..b39aebc 100644 (file)
@@ -724,7 +724,7 @@ static int ceph_writepages_start(struct address_space *mapping,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-       if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
+       if (ceph_inode_is_shutdown(inode)) {
                if (ci->i_wrbuffer_ref > 0) {
                        pr_warn_ratelimited(
                                "writepage_start %p %lld forced umount\n",
@@ -1145,12 +1145,12 @@ static struct ceph_snap_context *
 ceph_find_incompatible(struct page *page)
 {
        struct inode *inode = page->mapping->host;
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
 
-       if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
-               dout(" page %p forced umount\n", page);
-               return ERR_PTR(-EIO);
+       if (ceph_inode_is_shutdown(inode)) {
+               dout(" page %p %llx:%llx is shutdown\n", page,
+                    ceph_vinop(inode));
+               return ERR_PTR(-ESTALE);
        }
 
        for (;;) {
@@ -1345,6 +1345,9 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
        sigset_t oldset;
        vm_fault_t ret = VM_FAULT_SIGBUS;
 
+       if (ceph_inode_is_shutdown(inode))
+               return ret;
+
        ceph_block_sigs(&oldset);
 
        dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
@@ -1436,6 +1439,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
        sigset_t oldset;
        vm_fault_t ret = VM_FAULT_SIGBUS;
 
+       if (ceph_inode_is_shutdown(inode))
+               return ret;
+
        prealloc_cf = ceph_alloc_cap_flush();
        if (!prealloc_cf)
                return VM_FAULT_OOM;
index e8e663e..b9460b6 100644 (file)
@@ -1188,11 +1188,11 @@ void ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 
        lockdep_assert_held(&ci->i_ceph_lock);
 
-       fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+       fsc = ceph_inode_to_client(&ci->vfs_inode);
        WARN_ON_ONCE(ci->i_auth_cap == cap &&
                     !list_empty(&ci->i_dirty_item) &&
                     !fsc->blocklisted &&
-                    READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
+                    !ceph_inode_is_shutdown(&ci->vfs_inode));
 
        __ceph_remove_cap(cap, queue_release);
 }
@@ -2750,9 +2750,9 @@ again:
                        goto out_unlock;
                }
 
-               if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
-                       dout("get_cap_refs %p forced umount\n", inode);
-                       ret = -EIO;
+               if (ceph_inode_is_shutdown(inode)) {
+                       dout("get_cap_refs %p inode is shutdown\n", inode);
+                       ret = -ESTALE;
                        goto out_unlock;
                }
                mds_wanted = __ceph_caps_mds_wanted(ci, false);
@@ -4604,7 +4604,7 @@ int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invali
        if (is_auth) {
                struct ceph_cap_flush *cf;
 
-               if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
+               if (ceph_inode_is_shutdown(inode)) {
                        if (inode->i_data.nrpages > 0)
                                *invalidate = true;
                        if (ci->i_wrbuffer_ref > 0)
index 1d65934..e0fa66a 100644 (file)
@@ -157,6 +157,11 @@ static struct inode *__lookup_inode(struct super_block *sb, u64 ino)
                ceph_mdsc_put_request(req);
                if (!inode)
                        return err < 0 ? ERR_PTR(err) : ERR_PTR(-ESTALE);
+       } else {
+               if (ceph_inode_is_shutdown(inode)) {
+                       iput(inode);
+                       return ERR_PTR(-ESTALE);
+               }
        }
        return inode;
 }
@@ -223,8 +228,13 @@ static struct dentry *__snapfh_to_dentry(struct super_block *sb,
                return ERR_PTR(-ESTALE);
 
        inode = ceph_find_inode(sb, vino);
-       if (inode)
+       if (inode) {
+               if (ceph_inode_is_shutdown(inode)) {
+                       iput(inode);
+                       return ERR_PTR(-ESTALE);
+               }
                return d_obtain_alias(inode);
+       }
 
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
                                       USE_ANY_MDS);
index b18dd40..91173d3 100644 (file)
@@ -525,6 +525,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
 
        if (result) {
                struct dentry *dentry = req->r_dentry;
+               struct inode *inode = d_inode(dentry);
                int pathlen = 0;
                u64 base = 0;
                char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
@@ -534,7 +535,8 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
                if (!d_unhashed(dentry))
                        d_drop(dentry);
 
-               /* FIXME: start returning I/O errors on all accesses? */
+               ceph_inode_shutdown(inode);
+
                pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
                        base, IS_ERR(path) ? "<<bad>>" : path, result);
                ceph_mdsc_free_path(path, pathlen);
@@ -1526,6 +1528,9 @@ again:
        dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
             inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
 
+       if (ceph_inode_is_shutdown(inode))
+               return -ESTALE;
+
        if (direct_lock)
                ceph_start_io_direct(inode);
        else
@@ -1678,6 +1683,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
        loff_t pos;
        loff_t limit = max(i_size_read(inode), fsc->max_file_size);
 
+       if (ceph_inode_is_shutdown(inode))
+               return -ESTALE;
+
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
index d8d07e6..c22c067 100644 (file)
@@ -1841,13 +1841,12 @@ void ceph_queue_inode_work(struct inode *inode, int work_bit)
 static void ceph_do_invalidate_pages(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        u32 orig_gen;
        int check = 0;
 
        mutex_lock(&ci->i_truncate_mutex);
 
-       if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
+       if (ceph_inode_is_shutdown(inode)) {
                pr_warn_ratelimited("%s: inode %llx.%llx is shut down\n",
                                    __func__, ceph_vinop(inode));
                mapping_set_error(inode->i_mapping, -EIO);
@@ -2218,6 +2217,9 @@ int ceph_setattr(struct user_namespace *mnt_userns, struct dentry *dentry,
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
+       if (ceph_inode_is_shutdown(inode))
+               return -ESTALE;
+
        err = setattr_prepare(&init_user_ns, dentry, attr);
        if (err != 0)
                return err;
@@ -2348,6 +2350,9 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
        u32 valid_mask = STATX_BASIC_STATS;
        int err = 0;
 
+       if (ceph_inode_is_shutdown(inode))
+               return -ESTALE;
+
        /* Skip the getattr altogether if we're asked not to sync */
        if (!(flags & AT_STATX_DONT_SYNC)) {
                err = ceph_do_getattr(inode,
@@ -2395,3 +2400,27 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
        stat->result_mask = request_mask & valid_mask;
        return err;
 }
+
+void ceph_inode_shutdown(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct rb_node *p;
+       int iputs = 0;
+       bool invalidate = false;
+
+       spin_lock(&ci->i_ceph_lock);
+       ci->i_ceph_flags |= CEPH_I_SHUTDOWN;
+       p = rb_first(&ci->i_caps);
+       while (p) {
+               struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
+
+               p = rb_next(p);
+               iputs += ceph_purge_inode_cap(inode, cap, &invalidate);
+       }
+       spin_unlock(&ci->i_ceph_lock);
+
+       if (invalidate)
+               ceph_queue_invalidate(inode);
+       while (iputs--)
+               iput(inode);
+}
index bdeb271..74c227d 100644 (file)
@@ -241,6 +241,9 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
        if (!(fl->fl_flags & FL_POSIX))
                return -ENOLCK;
 
+       if (ceph_inode_is_shutdown(inode))
+               return -ESTALE;
+
        dout("ceph_lock, fl_owner: %p\n", fl->fl_owner);
 
        /* set wait bit as appropriate, then make command as Ceph expects it*/
@@ -306,6 +309,9 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
        if (fl->fl_type & LOCK_MAND)
                return -EOPNOTSUPP;
 
+       if (ceph_inode_is_shutdown(inode))
+               return -ESTALE;
+
        dout("ceph_flock, fl_file: %p\n", fl->fl_file);
 
        spin_lock(&ci->i_ceph_lock);
index daf7e7e..5c91158 100644 (file)
@@ -581,6 +581,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_ODIRECT         (1 << 11) /* inode in direct I/O mode */
 #define CEPH_ASYNC_CREATE_BIT  (12)      /* async create in flight for this */
 #define CEPH_I_ASYNC_CREATE    (1 << CEPH_ASYNC_CREATE_BIT)
+#define CEPH_I_SHUTDOWN                (1 << 13) /* inode is no longer usable */
 
 /*
  * Masks of ceph inode work.
@@ -1028,6 +1029,16 @@ extern int ceph_setattr(struct user_namespace *mnt_userns,
 extern int ceph_getattr(struct user_namespace *mnt_userns,
                        const struct path *path, struct kstat *stat,
                        u32 request_mask, unsigned int flags);
+void ceph_inode_shutdown(struct inode *inode);
+
+static inline bool ceph_inode_is_shutdown(struct inode *inode)
+{
+       unsigned long flags = READ_ONCE(ceph_inode(inode)->i_ceph_flags);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       int state = READ_ONCE(fsc->mount_state);
+
+       return (flags & CEPH_I_SHUTDOWN) || state >= CEPH_MOUNT_SHUTDOWN;
+}
 
 /* xattr.c */
 int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);