Merge tag 'ceph-for-5.14-rc6' of git://github.com/ceph/ceph-client
authorLinus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Aug 2021 02:16:01 +0000 (16:16 -1000)
committerLinus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Aug 2021 02:16:01 +0000 (16:16 -1000)
Pull ceph fixes from Ilya Dryomov:
 "A patch to avoid a soft lockup in ceph_check_delayed_caps() from Luis
  and a reference handling fix from Jeff that should address some memory
  corruption reports in the snaprealm area.

  Both marked for stable"

* tag 'ceph-for-5.14-rc6' of git://github.com/ceph/ceph-client:
  ceph: take snap_empty_lock atomically with snaprealm refcount change
  ceph: reduce contention in ceph_check_delayed_caps()

fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/snap.c
fs/ceph/super.h

index 7bdefd0..2a29009 100644 (file)
@@ -4150,11 +4150,19 @@ bad:
 
 /*
  * Delayed work handler to process end of delayed cap release LRU list.
+ *
+ * If new caps are added to the list while processing it, these won't get
+ * processed in this run.  In this case, the ci->i_hold_caps_max will be
+ * returned so that the work can be scheduled accordingly.
  */
-void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
+unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 {
        struct inode *inode;
        struct ceph_inode_info *ci;
+       struct ceph_mount_options *opt = mdsc->fsc->mount_options;
+       unsigned long delay_max = opt->caps_wanted_delay_max * HZ;
+       unsigned long loop_start = jiffies;
+       unsigned long delay = 0;
 
        dout("check_delayed_caps\n");
        spin_lock(&mdsc->cap_delay_lock);
@@ -4162,6 +4170,11 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
                ci = list_first_entry(&mdsc->cap_delay_list,
                                      struct ceph_inode_info,
                                      i_cap_delay_list);
+               if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
+                       dout("%s caps added recently.  Exiting loop", __func__);
+                       delay = ci->i_hold_caps_max;
+                       break;
+               }
                if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
                    time_before(jiffies, ci->i_hold_caps_max))
                        break;
@@ -4177,6 +4190,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
                }
        }
        spin_unlock(&mdsc->cap_delay_lock);
+
+       return delay;
 }
 
 /*
index 9db1b39..afdc202 100644 (file)
@@ -4490,22 +4490,29 @@ void inc_session_sequence(struct ceph_mds_session *s)
 }
 
 /*
- * delayed work -- periodically trim expired leases, renew caps with mds
+ * delayed work -- periodically trim expired leases, renew caps with mds.  If
+ * the @delay parameter is set to 0 or if it's more than 5 secs, the default
+ * workqueue delay value of 5 secs will be used.
  */
-static void schedule_delayed(struct ceph_mds_client *mdsc)
+static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
 {
-       int delay = 5;
-       unsigned hz = round_jiffies_relative(HZ * delay);
-       schedule_delayed_work(&mdsc->delayed_work, hz);
+       unsigned long max_delay = HZ * 5;
+
+       /* 5 secs default delay */
+       if (!delay || (delay > max_delay))
+               delay = max_delay;
+       schedule_delayed_work(&mdsc->delayed_work,
+                             round_jiffies_relative(delay));
 }
 
 static void delayed_work(struct work_struct *work)
 {
-       int i;
        struct ceph_mds_client *mdsc =
                container_of(work, struct ceph_mds_client, delayed_work.work);
+       unsigned long delay;
        int renew_interval;
        int renew_caps;
+       int i;
 
        dout("mdsc delayed_work\n");
 
@@ -4545,7 +4552,7 @@ static void delayed_work(struct work_struct *work)
        }
        mutex_unlock(&mdsc->mutex);
 
-       ceph_check_delayed_caps(mdsc);
+       delay = ceph_check_delayed_caps(mdsc);
 
        ceph_queue_cap_reclaim_work(mdsc);
 
@@ -4553,7 +4560,7 @@ static void delayed_work(struct work_struct *work)
 
        maybe_recover_session(mdsc);
 
-       schedule_delayed(mdsc);
+       schedule_delayed(mdsc, delay);
 }
 
 int ceph_mdsc_init(struct ceph_fs_client *fsc)
@@ -5030,7 +5037,7 @@ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
                          mdsc->mdsmap->m_epoch);
 
        mutex_unlock(&mdsc->mutex);
-       schedule_delayed(mdsc);
+       schedule_delayed(mdsc, 0);
        return;
 
 bad_unlock:
index 4ac0606..4c6bd10 100644 (file)
@@ -67,19 +67,19 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
 {
        lockdep_assert_held(&mdsc->snap_rwsem);
 
-       dout("get_realm %p %d -> %d\n", realm,
-            atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
        /*
-        * since we _only_ increment realm refs or empty the empty
-        * list with snap_rwsem held, adjusting the empty list here is
-        * safe.  we do need to protect against concurrent empty list
-        * additions, however.
+        * The 0->1 and 1->0 transitions must take the snap_empty_lock
+        * atomically with the refcount change. Go ahead and bump the
+        * nref here, unless it's 0, in which case we take the spinlock
+        * and then do the increment and remove it from the list.
         */
-       if (atomic_inc_return(&realm->nref) == 1) {
-               spin_lock(&mdsc->snap_empty_lock);
+       if (atomic_inc_not_zero(&realm->nref))
+               return;
+
+       spin_lock(&mdsc->snap_empty_lock);
+       if (atomic_inc_return(&realm->nref) == 1)
                list_del_init(&realm->empty_item);
-               spin_unlock(&mdsc->snap_empty_lock);
-       }
+       spin_unlock(&mdsc->snap_empty_lock);
 }
 
 static void __insert_snap_realm(struct rb_root *root,
@@ -208,28 +208,28 @@ static void __put_snap_realm(struct ceph_mds_client *mdsc,
 {
        lockdep_assert_held_write(&mdsc->snap_rwsem);
 
-       dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-            atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
+       /*
+        * We do not require the snap_empty_lock here, as any caller that
+        * increments the value must hold the snap_rwsem.
+        */
        if (atomic_dec_and_test(&realm->nref))
                __destroy_snap_realm(mdsc, realm);
 }
 
 /*
- * caller needn't hold any locks
+ * See comments in ceph_get_snap_realm. Caller needn't hold any locks.
  */
 void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
                         struct ceph_snap_realm *realm)
 {
-       dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-            atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
-       if (!atomic_dec_and_test(&realm->nref))
+       if (!atomic_dec_and_lock(&realm->nref, &mdsc->snap_empty_lock))
                return;
 
        if (down_write_trylock(&mdsc->snap_rwsem)) {
+               spin_unlock(&mdsc->snap_empty_lock);
                __destroy_snap_realm(mdsc, realm);
                up_write(&mdsc->snap_rwsem);
        } else {
-               spin_lock(&mdsc->snap_empty_lock);
                list_add(&realm->empty_item, &mdsc->snap_empty);
                spin_unlock(&mdsc->snap_empty_lock);
        }
index 6b6332a..9215a2f 100644 (file)
@@ -1167,7 +1167,7 @@ extern void ceph_flush_snaps(struct ceph_inode_info *ci,
 extern bool __ceph_should_report_size(struct ceph_inode_info *ci);
 extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
                            struct ceph_mds_session *session);
-extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
+extern unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
 extern void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc);
 extern int  ceph_drop_caps_for_unlink(struct inode *inode);
 extern int ceph_encode_inode_release(void **p, struct inode *inode,