powerpc/qspinlock: allow new waiters to steal the lock before queueing
authorNicholas Piggin <npiggin@gmail.com>
Sat, 26 Nov 2022 09:59:19 +0000 (19:59 +1000)
committerMichael Ellerman <mpe@ellerman.id.au>
Fri, 2 Dec 2022 06:48:49 +0000 (17:48 +1100)
Allow new waiters to "steal" the lock before queueing. That is, to
acquire it while other CPUs have queued.

This particularly helps paravirt performance when physical CPUs are
oversubscribed, by keeping the lock from becoming a strict FIFO and
vCPU preemption causing queue train wrecks.

The new __queued_spin_trylock_steal() function is put in qspinlock.h
to save having to move it, because it will be used there by a later
change.

Signed-off-by: Nicholas Piggin <npiggin@gmail.com>
Signed-off-by: Michael Ellerman <mpe@ellerman.id.au>
Link: https://lore.kernel.org/r/20221126095932.1234527-5-npiggin@gmail.com
arch/powerpc/include/asm/qspinlock.h
arch/powerpc/lib/qspinlock.c

index c16e1f0..cebd2c8 100644 (file)
@@ -41,6 +41,29 @@ static __always_inline int queued_spin_trylock(struct qspinlock *lock)
        return likely(prev == 0);
 }
 
+static __always_inline int __queued_spin_trylock_steal(struct qspinlock *lock)
+{
+       u32 prev, tmp;
+
+       /* Trylock may get ahead of queued nodes if it finds unlocked */
+       asm volatile(
+"1:    lwarx   %0,0,%2,%5      # __queued_spin_trylock_steal           \n"
+"      andc.   %1,%0,%4                                                \n"
+"      bne-    2f                                                      \n"
+"      and     %1,%0,%4                                                \n"
+"      or      %1,%1,%3                                                \n"
+"      stwcx.  %1,0,%2                                                 \n"
+"      bne-    1b                                                      \n"
+"\t"   PPC_ACQUIRE_BARRIER "                                           \n"
+"2:                                                                    \n"
+       : "=&r" (prev), "=&r" (tmp)
+       : "r" (&lock->val), "r" (_Q_LOCKED_VAL), "r" (_Q_TAIL_CPU_MASK),
+         "i" (IS_ENABLED(CONFIG_PPC64))
+       : "cr0", "memory");
+
+       return likely(!(prev & ~_Q_TAIL_CPU_MASK));
+}
+
 void queued_spin_lock_slowpath(struct qspinlock *lock);
 
 static __always_inline void queued_spin_lock(struct qspinlock *lock)
index 645d9af..6ffd326 100644 (file)
@@ -19,8 +19,17 @@ struct qnodes {
        struct qnode nodes[MAX_NODES];
 };
 
+/* Tuning parameters */
+static int steal_spins __read_mostly = (1 << 5);
+static bool maybe_stealers __read_mostly = true;
+
 static DEFINE_PER_CPU_ALIGNED(struct qnodes, qnodes);
 
+static __always_inline int get_steal_spins(void)
+{
+       return steal_spins;
+}
+
 static inline u32 encode_tail_cpu(int cpu)
 {
        return (cpu + 1) << _Q_TAIL_CPU_OFFSET;
@@ -38,33 +47,35 @@ static inline int decode_tail_cpu(u32 val)
  * This is used by the head of the queue to acquire the lock and clean up
  * its tail if it was the last one queued.
  */
-static __always_inline u32 set_locked_clean_tail(struct qspinlock *lock, u32 tail)
+static __always_inline u32 trylock_clean_tail(struct qspinlock *lock, u32 tail)
 {
        u32 newval = _Q_LOCKED_VAL;
        u32 prev, tmp;
 
        asm volatile(
-"1:    lwarx   %0,0,%2,%6      # set_locked_clean_tail                 \n"
-       /* Test whether the lock tail == tail */
-"      and     %1,%0,%5                                                \n"
+"1:    lwarx   %0,0,%2,%7      # trylock_clean_tail                    \n"
+       /* This test is necessary if there could be stealers */
+"      andi.   %1,%0,%5                                                \n"
+"      bne     3f                                                      \n"
+       /* Test whether the lock tail == mytail */
+"      and     %1,%0,%6                                                \n"
 "      cmpw    0,%1,%3                                                 \n"
        /* Merge the new locked value */
 "      or      %1,%1,%4                                                \n"
 "      bne     2f                                                      \n"
        /* If the lock tail matched, then clear it, otherwise leave it. */
-"      andc    %1,%1,%5                                                \n"
+"      andc    %1,%1,%6                                                \n"
 "2:    stwcx.  %1,0,%2                                                 \n"
 "      bne-    1b                                                      \n"
 "\t"   PPC_ACQUIRE_BARRIER "                                           \n"
 "3:                                                                    \n"
        : "=&r" (prev), "=&r" (tmp)
        : "r" (&lock->val), "r"(tail), "r" (newval),
+         "i" (_Q_LOCKED_VAL),
          "r" (_Q_TAIL_CPU_MASK),
          "i" (IS_ENABLED(CONFIG_PPC64))
        : "cr0", "memory");
 
-       BUG_ON(prev & _Q_LOCKED_VAL);
-
        return prev;
 }
 
@@ -117,6 +128,30 @@ static struct qnode *get_tail_qnode(struct qspinlock *lock, u32 val)
        BUG();
 }
 
+static inline bool try_to_steal_lock(struct qspinlock *lock)
+{
+       int iters = 0;
+
+       if (!steal_spins)
+               return false;
+
+       /* Attempt to steal the lock */
+       do {
+               u32 val = READ_ONCE(lock->val);
+
+               if (unlikely(!(val & _Q_LOCKED_VAL))) {
+                       if (__queued_spin_trylock_steal(lock))
+                               return true;
+               } else {
+                       cpu_relax();
+               }
+
+               iters++;
+       } while (iters < get_steal_spins());
+
+       return false;
+}
+
 static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
 {
        struct qnodes *qnodesp;
@@ -166,6 +201,7 @@ static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
                smp_rmb(); /* acquire barrier for the mcs lock */
        }
 
+again:
        /* We're at the head of the waitqueue, wait for the lock. */
        for (;;) {
                val = READ_ONCE(lock->val);
@@ -176,9 +212,14 @@ static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
        }
 
        /* If we're the last queued, must clean up the tail. */
-       old = set_locked_clean_tail(lock, tail);
+       old = trylock_clean_tail(lock, tail);
+       if (unlikely(old & _Q_LOCKED_VAL)) {
+               BUG_ON(!maybe_stealers);
+               goto again; /* Can only be true if maybe_stealers. */
+       }
+
        if ((old & _Q_TAIL_CPU_MASK) == tail)
-               goto release; /* Another waiter must have enqueued */
+               goto release; /* We were the tail, no next. */
 
        /* There is a next, must wait for node->next != NULL (MCS protocol) */
        while (!(next = READ_ONCE(node->next)))
@@ -199,6 +240,9 @@ release:
 
 void queued_spin_lock_slowpath(struct qspinlock *lock)
 {
+       if (try_to_steal_lock(lock))
+               return;
+
        queued_spin_lock_mcs_queue(lock);
 }
 EXPORT_SYMBOL(queued_spin_lock_slowpath);
@@ -208,3 +252,51 @@ void pv_spinlocks_init(void)
 {
 }
 #endif
+
+#include <linux/debugfs.h>
+static int steal_spins_set(void *data, u64 val)
+{
+       static DEFINE_MUTEX(lock);
+
+       /*
+        * The lock slow path has a !maybe_stealers case that can assume
+        * the head of queue will not see concurrent waiters. That waiter
+        * is unsafe in the presence of stealers, so must keep them away
+        * from one another.
+        */
+
+       mutex_lock(&lock);
+       if (val && !steal_spins) {
+               maybe_stealers = true;
+               /* wait for queue head waiter to go away */
+               synchronize_rcu();
+               steal_spins = val;
+       } else if (!val && steal_spins) {
+               steal_spins = val;
+               /* wait for all possible stealers to go away */
+               synchronize_rcu();
+               maybe_stealers = false;
+       } else {
+               steal_spins = val;
+       }
+       mutex_unlock(&lock);
+
+       return 0;
+}
+
+static int steal_spins_get(void *data, u64 *val)
+{
+       *val = steal_spins;
+
+       return 0;
+}
+
+DEFINE_SIMPLE_ATTRIBUTE(fops_steal_spins, steal_spins_get, steal_spins_set, "%llu\n");
+
+static __init int spinlock_debugfs_init(void)
+{
+       debugfs_create_file("qspl_steal_spins", 0600, arch_debugfs_dir, NULL, &fops_steal_spins);
+
+       return 0;
+}
+device_initcall(spinlock_debugfs_init);