ceph: reorganize __send_cap for less spinlock abuse
authorJeff Layton <jlayton@kernel.org>
Tue, 17 Mar 2020 12:47:31 +0000 (08:47 -0400)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 1 Jun 2020 11:22:51 +0000 (13:22 +0200)
Get rid of the __releases annotation by breaking it up into two
functions: __prep_cap which is done under the spinlock and __send_cap
that is done outside it. Add new fields to cap_msg_args for the wake
boolean and old_xattr_buf pointer.

Nothing checks the return value from __send_cap, so make it void
return.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/caps.c

index 8291486..8b17f4b 100644 (file)
@@ -1181,6 +1181,7 @@ struct cap_msg_args {
        u64                     xattr_version;
        u64                     change_attr;
        struct ceph_buffer      *xattr_buf;
+       struct ceph_buffer      *old_xattr_buf;
        struct timespec64       atime, mtime, ctime, btime;
        int                     op, caps, wanted, dirty;
        u32                     seq, issue_seq, mseq, time_warp_seq;
@@ -1189,6 +1190,7 @@ struct cap_msg_args {
        kgid_t                  gid;
        umode_t                 mode;
        bool                    inline_data;
+       bool                    wake;
 };
 
 /*
@@ -1318,44 +1320,29 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
 }
 
 /*
- * Send a cap msg on the given inode.  Update our caps state, then
- * drop i_ceph_lock and send the message.
+ * Prepare to send a cap message to an MDS. Update the cap state, and populate
+ * the arg struct with the parameters that will need to be sent. This should
+ * be done under the i_ceph_lock to guard against changes to cap state.
  *
  * Make note of max_size reported/requested from mds, revoked caps
  * that have now been implemented.
- *
- * Return non-zero if delayed release, or we experienced an error
- * such that the caller should requeue + retry later.
- *
- * called with i_ceph_lock, then drops it.
- * caller should hold snap_rwsem (read), s_mutex.
  */
-static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
-                     int op, int flags, int used, int want, int retain,
-                     int flushing, u64 flush_tid, u64 oldest_flush_tid)
-       __releases(cap->ci->i_ceph_lock)
+static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
+                      int op, int flags, int used, int want, int retain,
+                      int flushing, u64 flush_tid, u64 oldest_flush_tid)
 {
        struct ceph_inode_info *ci = cap->ci;
        struct inode *inode = &ci->vfs_inode;
-       struct ceph_buffer *old_blob = NULL;
-       struct cap_msg_args arg;
        int held, revoking;
-       int wake = 0;
-       int ret;
 
-       /* Don't send anything if it's still being created. Return delayed */
-       if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
-               spin_unlock(&ci->i_ceph_lock);
-               dout("%s async create in flight for %p\n", __func__, inode);
-               return 1;
-       }
+       lockdep_assert_held(&ci->i_ceph_lock);
 
        held = cap->issued | cap->implemented;
        revoking = cap->implemented & ~cap->issued;
        retain &= ~revoking;
 
-       dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
-            inode, cap, cap->session,
+       dout("%s %p cap %p session %p %s -> %s (revoking %s)\n",
+            __func__, inode, cap, cap->session,
             ceph_cap_string(held), ceph_cap_string(held & retain),
             ceph_cap_string(revoking));
        BUG_ON((retain & CEPH_CAP_PIN) == 0);
@@ -1363,60 +1350,58 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        ci->i_ceph_flags &= ~CEPH_I_FLUSH;
 
        cap->issued &= retain;  /* drop bits we don't want */
-       if (cap->implemented & ~cap->issued) {
-               /*
-                * Wake up any waiters on wanted -> needed transition.
-                * This is due to the weird transition from buffered
-                * to sync IO... we need to flush dirty pages _before_
-                * allowing sync writes to avoid reordering.
-                */
-               wake = 1;
-       }
+       /*
+        * Wake up any waiters on wanted -> needed transition. This is due to
+        * the weird transition from buffered to sync IO... we need to flush
+        * dirty pages _before_ allowing sync writes to avoid reordering.
+        */
+       arg->wake = cap->implemented & ~cap->issued;
        cap->implemented &= cap->issued | used;
        cap->mds_wanted = want;
 
-       arg.session = cap->session;
-       arg.ino = ceph_vino(inode).ino;
-       arg.cid = cap->cap_id;
-       arg.follows = flushing ? ci->i_head_snapc->seq : 0;
-       arg.flush_tid = flush_tid;
-       arg.oldest_flush_tid = oldest_flush_tid;
+       arg->session = cap->session;
+       arg->ino = ceph_vino(inode).ino;
+       arg->cid = cap->cap_id;
+       arg->follows = flushing ? ci->i_head_snapc->seq : 0;
+       arg->flush_tid = flush_tid;
+       arg->oldest_flush_tid = oldest_flush_tid;
 
-       arg.size = inode->i_size;
-       ci->i_reported_size = arg.size;
-       arg.max_size = ci->i_wanted_max_size;
+       arg->size = inode->i_size;
+       ci->i_reported_size = arg->size;
+       arg->max_size = ci->i_wanted_max_size;
        if (cap == ci->i_auth_cap)
-               ci->i_requested_max_size = arg.max_size;
+               ci->i_requested_max_size = arg->max_size;
 
        if (flushing & CEPH_CAP_XATTR_EXCL) {
-               old_blob = __ceph_build_xattrs_blob(ci);
-               arg.xattr_version = ci->i_xattrs.version;
-               arg.xattr_buf = ci->i_xattrs.blob;
+               arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
+               arg->xattr_version = ci->i_xattrs.version;
+               arg->xattr_buf = ci->i_xattrs.blob;
        } else {
-               arg.xattr_buf = NULL;
+               arg->xattr_buf = NULL;
+               arg->old_xattr_buf = NULL;
        }
 
-       arg.mtime = inode->i_mtime;
-       arg.atime = inode->i_atime;
-       arg.ctime = inode->i_ctime;
-       arg.btime = ci->i_btime;
-       arg.change_attr = inode_peek_iversion_raw(inode);
+       arg->mtime = inode->i_mtime;
+       arg->atime = inode->i_atime;
+       arg->ctime = inode->i_ctime;
+       arg->btime = ci->i_btime;
+       arg->change_attr = inode_peek_iversion_raw(inode);
 
-       arg.op = op;
-       arg.caps = cap->implemented;
-       arg.wanted = want;
-       arg.dirty = flushing;
+       arg->op = op;
+       arg->caps = cap->implemented;
+       arg->wanted = want;
+       arg->dirty = flushing;
 
-       arg.seq = cap->seq;
-       arg.issue_seq = cap->issue_seq;
-       arg.mseq = cap->mseq;
-       arg.time_warp_seq = ci->i_time_warp_seq;
+       arg->seq = cap->seq;
+       arg->issue_seq = cap->issue_seq;
+       arg->mseq = cap->mseq;
+       arg->time_warp_seq = ci->i_time_warp_seq;
 
-       arg.uid = inode->i_uid;
-       arg.gid = inode->i_gid;
-       arg.mode = inode->i_mode;
+       arg->uid = inode->i_uid;
+       arg->gid = inode->i_gid;
+       arg->mode = inode->i_mode;
 
-       arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+       arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
        if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
            !list_empty(&ci->i_cap_snaps)) {
                struct ceph_cap_snap *capsnap;
@@ -1429,27 +1414,35 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
                        }
                }
        }
-       arg.flags = flags;
-
-       spin_unlock(&ci->i_ceph_lock);
+       arg->flags = flags;
+}
 
-       ceph_buffer_put(old_blob);
+/*
+ * Send a cap msg on the given inode.
+ *
+ * Caller should hold snap_rwsem (read), s_mutex.
+ */
+static void __send_cap(struct ceph_mds_client *mdsc, struct cap_msg_args *arg,
+                      struct ceph_inode_info *ci)
+{
+       struct inode *inode = &ci->vfs_inode;
+       int ret;
 
-       ret = send_cap_msg(&arg);
+       ret = send_cap_msg(arg);
        if (ret < 0) {
                pr_err("error sending cap msg, ino (%llx.%llx) "
                       "flushing %s tid %llu, requeue\n",
-                      ceph_vinop(inode), ceph_cap_string(flushing),
-                      flush_tid);
+                      ceph_vinop(inode), ceph_cap_string(arg->dirty),
+                      arg->flush_tid);
                spin_lock(&ci->i_ceph_lock);
                __cap_delay_requeue(mdsc, ci);
                spin_unlock(&ci->i_ceph_lock);
        }
 
-       if (wake)
-               wake_up_all(&ci->i_cap_wq);
+       ceph_buffer_put(arg->old_xattr_buf);
 
-       return ret;
+       if (arg->wake)
+               wake_up_all(&ci->i_cap_wq);
 }
 
 static inline int __send_flush_snap(struct inode *inode,
@@ -1470,6 +1463,7 @@ static inline int __send_flush_snap(struct inode *inode,
        arg.max_size = 0;
        arg.xattr_version = capsnap->xattr_version;
        arg.xattr_buf = capsnap->xattr_blob;
+       arg.old_xattr_buf = NULL;
 
        arg.atime = capsnap->atime;
        arg.mtime = capsnap->mtime;
@@ -1493,6 +1487,7 @@ static inline int __send_flush_snap(struct inode *inode,
 
        arg.inline_data = capsnap->inline_data;
        arg.flags = 0;
+       arg.wake = false;
 
        return send_cap_msg(&arg);
 }
@@ -1967,6 +1962,8 @@ retry_locked:
        }
 
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+               struct cap_msg_args arg;
+
                cap = rb_entry(p, struct ceph_cap, ci_node);
 
                /* avoid looping forever */
@@ -2094,9 +2091,12 @@ ack:
 
                mds = cap->mds;  /* remember mds, so we don't repeat */
 
-               /* __send_cap drops i_ceph_lock */
-               __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0, cap_used, want,
+               __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, 0, cap_used, want,
                           retain, flushing, flush_tid, oldest_flush_tid);
+               spin_unlock(&ci->i_ceph_lock);
+
+               __send_cap(mdsc, &arg, ci);
+
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
 
@@ -2135,6 +2135,7 @@ retry:
 retry_locked:
        if (ci->i_dirty_caps && ci->i_auth_cap) {
                struct ceph_cap *cap = ci->i_auth_cap;
+               struct cap_msg_args arg;
 
                if (session != cap->session) {
                        spin_unlock(&ci->i_ceph_lock);
@@ -2162,11 +2163,13 @@ retry_locked:
                flush_tid = __mark_caps_flushing(inode, session, true,
                                                 &oldest_flush_tid);
 
-               /* __send_cap drops i_ceph_lock */
-               __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
+               __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
                           __ceph_caps_used(ci), __ceph_caps_wanted(ci),
                           (cap->issued | cap->implemented),
                           flushing, flush_tid, oldest_flush_tid);
+               spin_unlock(&ci->i_ceph_lock);
+
+               __send_cap(mdsc, &arg, ci);
        } else {
                if (!list_empty(&ci->i_cap_flush_list)) {
                        struct ceph_cap_flush *cf =
@@ -2368,15 +2371,19 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
                first_tid = cf->tid + 1;
 
                if (cf->caps) {
+                       struct cap_msg_args arg;
+
                        dout("kick_flushing_caps %p cap %p tid %llu %s\n",
                             inode, cap, cf->tid, ceph_cap_string(cf->caps));
-                       __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+                       __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
                                         (cf->tid < last_snap_flush ?
                                          CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
                                          __ceph_caps_used(ci),
                                          __ceph_caps_wanted(ci),
                                          (cap->issued | cap->implemented),
                                          cf->caps, cf->tid, oldest_flush_tid);
+                       spin_unlock(&ci->i_ceph_lock);
+                       __send_cap(mdsc, &arg, ci);
                } else {
                        struct ceph_cap_snap *capsnap =
                                        container_of(cf, struct ceph_cap_snap,