rcu/nocb: Remove rcu_data structure from nocb list when de-offloaded
author Frederic Weisbecker <frederic@kernel.org>
Tue, 23 Nov 2021 00:37:03 +0000 (01:37 +0100)
committer Paul E. McKenney <paulmck@kernel.org>
Thu, 9 Dec 2021 19:34:07 +0000 (11:34 -0800)
The nocb_gp_wait() function iterates over all CPUs in its group,
including even those CPUs that have been de-offloaded.  This is of
course suboptimal, especially if none of the CPUs within the group are
currently offloaded.  This will become even more of a problem once a
nocb kthread is created for all possible CPUs.

Therefore use a standard doubly linked list to link all the offloaded
rcu_data structures and safely add or delete these structures as we
offload or de-offload them, respectively.
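
For readers unfamiliar with the kernel's RCU-protected lists, the pattern
this patch adopts is the standard <linux/rculist.h> API: writers add and
delete under a lock using the _rcu list primitives, while readers traverse
locklessly under RCU protection.  A minimal sketch of that pattern follows;
the demo_item/demo_head/demo_lock names are illustrative stand-ins, not
identifiers from this patch:

  #include <linux/printk.h>
  #include <linux/rculist.h>
  #include <linux/spinlock.h>

  /* Illustrative element type, standing in for struct rcu_data. */
  struct demo_item {
          struct list_head entry;         /* like ->nocb_entry_rdp */
          int cpu;
  };

  static LIST_HEAD(demo_head);            /* like ->nocb_head_rdp */
  static DEFINE_SPINLOCK(demo_lock);      /* serializes the writers */

  /* Writer: add at offload time. */
  static void demo_add(struct demo_item *item)
  {
          spin_lock(&demo_lock);
          list_add_tail_rcu(&item->entry, &demo_head);
          spin_unlock(&demo_lock);
  }

  /* Writer: delete at de-offload time. */
  static void demo_del(struct demo_item *item)
  {
          spin_lock(&demo_lock);
          list_del_rcu(&item->entry);
          spin_unlock(&demo_lock);
          synchronize_rcu();      /* wait out readers before reusing *item */
  }

  /* Reader: may run concurrently with the adds and deletes above. */
  static void demo_scan(void)
  {
          struct demo_item *item;

          rcu_read_lock();
          list_for_each_entry_rcu(item, &demo_head, entry)
                  pr_info("scanning CPU %d\n", item->cpu);
          rcu_read_unlock();
  }

In the patch itself, rcu_nocb_rdp_offload() and rcu_nocb_rdp_deoffload()
play the writer role and nocb_gp_wait() the reader role.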

Reviewed-by: Neeraj Upadhyay <quic_neeraju@quicinc.com>
Signed-off-by: Frederic Weisbecker <frederic@kernel.org>
Cc: Boqun Feng <boqun.feng@gmail.com>
Cc: Uladzislau Rezki <urezki@gmail.com>
Cc: Josh Triplett <josh@joshtriplett.org>
Cc: Joel Fernandes <joel@joelfernandes.org>
Tested-by: Juri Lelli <juri.lelli@redhat.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
kernel/rcu/tree.h
kernel/rcu/tree_nocb.h

diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 4f6c67b..5884380 100644
@@ -227,8 +227,11 @@ struct rcu_data {
        struct swait_queue_head nocb_gp_wq; /* For nocb kthreads to sleep on. */
        bool nocb_cb_sleep;             /* Is the nocb CB thread asleep? */
        struct task_struct *nocb_cb_kthread;
-       struct rcu_data *nocb_next_cb_rdp;
-                                       /* Next rcu_data in wakeup chain. */
+       struct list_head nocb_head_rdp; /*
+                                        * Head of rcu_data list in wakeup chain,
+                                        * if rdp_gp.
+                                        */
+       struct list_head nocb_entry_rdp; /* rcu_data node in wakeup chain. */
 
        /* The following fields are used by CB kthread, hence new cacheline. */
        struct rcu_data *nocb_gp_rdp ____cacheline_internodealigned_in_smp;
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index 2461fe8..8e94a53 100644
@@ -625,7 +625,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
         * and the global grace-period kthread are awakened if needed.
         */
        WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
-       for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
+       /*
+        * An rcu_data structure is removed from the list after its
+        * CPU is de-offloaded and added to the list before that CPU is
+        * (re-)offloaded.  If the following loop happens to be referencing
+        * that rcu_data structure during the time that the corresponding
+        * CPU is de-offloaded and then immediately re-offloaded, this
+        * loop's rdp pointer will be carried to the end of the list by
+        * the resulting pair of list operations.  This can cause the loop
+        * to skip over some of the rcu_data structures that were supposed
+        * to have been scanned.  Fortunately a new iteration through the
+        * entire loop is forced after a given CPU's rcu_data structure
+        * is added to the list, so the skipped-over rcu_data structures
+        * won't be ignored for long.
+        */
+       list_for_each_entry_rcu(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp, 1) {
                bool needwake_state = false;
 
                if (!nocb_gp_enabled_cb(rdp))
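
To make the skipped-scan scenario described in the comment above concrete,
here is a hypothetical stand-alone toy (hand-rolled list operations, not
the kernel's primitives) showing how deleting the element the iterator is
parked on and immediately re-adding it at the tail carries the iterator
past the remainder of the list:

  #include <stdio.h>

  struct node {
          struct node *next, *prev;
          char name;
  };

  static void toy_del(struct node *n)
  {
          /* Like list_del_rcu(): unlink, but n->next stays intact. */
          n->prev->next = n->next;
          n->next->prev = n->prev;
  }

  static void toy_add_tail(struct node *n, struct node *head)
  {
          n->prev = head->prev;
          n->next = head;
          head->prev->next = n;
          head->prev = n;
  }

  int main(void)
  {
          struct node head = { &head, &head, '*' };
          struct node a = { .name = 'A' }, b = { .name = 'B' };
          struct node c = { .name = 'C' }, d = { .name = 'D' };
          struct node *cursor = &b;       /* iterator is looking at B */

          toy_add_tail(&a, &head);        /* list: A B C D */
          toy_add_tail(&b, &head);
          toy_add_tail(&c, &head);
          toy_add_tail(&d, &head);

          toy_del(&b);                    /* B de-offloaded ... */
          toy_add_tail(&b, &head);        /* ... re-offloaded: A C D B */

          /* Resume the scan from B: its forward pointer now leads to
           * the list head, so C and D are skipped in this pass. */
          for (cursor = cursor->next; cursor != &head; cursor = cursor->next)
                  printf("visiting %c\n", cursor->name);  /* prints nothing */
          return 0;
  }

The next nocb_gp_wait() invocation rescans from the list head, which is why
the comment notes that skipped rcu_data structures won't be ignored for long.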
@@ -1003,6 +1017,8 @@ static long rcu_nocb_rdp_deoffload(void *arg)
        swait_event_exclusive(rdp->nocb_state_wq,
                              !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
                                                        SEGCBLIST_KTHREAD_GP));
+       /* Stop nocb_gp_wait() from iterating over this structure. */
+       list_del_rcu(&rdp->nocb_entry_rdp);
        /*
         * Lock one last time to acquire latest callback updates from kthreads
         * so we can later handle callbacks locally without locking.
@@ -1066,6 +1082,17 @@ static long rcu_nocb_rdp_offload(void *arg)
                return -EINVAL;
 
        pr_info("Offloading %d\n", rdp->cpu);
+
+       /*
+        * Cause future nocb_gp_wait() invocations to iterate over this
+        * structure, resetting ->nocb_gp_sleep and waking up the related
+        * "rcuog".  Since nocb_gp_wait() in turn locks ->nocb_gp_lock
+        * before setting ->nocb_gp_sleep again, we are guaranteed to
+        * iterate this newly added structure before "rcuog" goes to
+        * sleep again.
+        */
+       list_add_tail_rcu(&rdp->nocb_entry_rdp, &rdp->nocb_gp_rdp->nocb_head_rdp);
+
        /*
         * Can't use rcu_nocb_lock_irqsave() before SEGCBLIST_LOCKING
         * is set.
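
The ordering argument in the comment added above follows a common
publish-then-wake pattern: the new list entry is published first, then the
sleep flag is cleared and the wakeup sent under the same lock the waiter
must hold to re-arm its sleep.  A loose sketch, with hypothetical demo_*
names and the real rcuog logic heavily simplified:

  #include <linux/spinlock.h>
  #include <linux/swait.h>

  static DECLARE_SWAIT_QUEUE_HEAD(demo_gp_wq);  /* akin to ->nocb_gp_wq */
  static DEFINE_RAW_SPINLOCK(demo_gp_lock);     /* akin to ->nocb_gp_lock */
  static bool demo_gp_sleep = true;             /* akin to ->nocb_gp_sleep */

  /* Offload side: the new entry is already on the list when this runs. */
  static void demo_wake_gp(void)
  {
          unsigned long flags;

          raw_spin_lock_irqsave(&demo_gp_lock, flags);
          WRITE_ONCE(demo_gp_sleep, false);
          raw_spin_unlock_irqrestore(&demo_gp_lock, flags);
          swake_up_one(&demo_gp_wq);
  }

  /* "rcuog" side: re-arm the flag under the same lock before rescanning. */
  static void demo_gp_kthread(void)
  {
          unsigned long flags;

          for (;;) {
                  raw_spin_lock_irqsave(&demo_gp_lock, flags);
                  WRITE_ONCE(demo_gp_sleep, true);
                  raw_spin_unlock_irqrestore(&demo_gp_lock, flags);

                  /* ... rescan the list here ... */

                  swait_event_interruptible_exclusive(demo_gp_wq,
                                  !READ_ONCE(demo_gp_sleep));
          }
  }

Either the waker clears the flag before the kthread re-arms it, in which
case the kthread's subsequent rescan runs after the entry was published, or
it clears the flag afterward, in which case the swait condition is already
true and the kthread rescans instead of sleeping.  Either way the new entry
is iterated before "rcuog" goes to sleep again, as the comment states.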
@@ -1268,7 +1295,6 @@ static void __init rcu_organize_nocb_kthreads(void)
        int nl = 0;  /* Next GP kthread. */
        struct rcu_data *rdp;
        struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
-       struct rcu_data *rdp_prev = NULL;
 
        if (!cpumask_available(rcu_nocb_mask))
                return;
@@ -1288,8 +1314,8 @@ static void __init rcu_organize_nocb_kthreads(void)
                        /* New GP kthread, set up for CBs & next GP. */
                        gotnocbs = true;
                        nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
-                       rdp->nocb_gp_rdp = rdp;
                        rdp_gp = rdp;
+                       INIT_LIST_HEAD(&rdp->nocb_head_rdp);
                        if (dump_tree) {
                                if (!firsttime)
                                        pr_cont("%s\n", gotnocbscbs
@@ -1302,12 +1328,11 @@ static void __init rcu_organize_nocb_kthreads(void)
                } else {
                        /* Another CB kthread, link to previous GP kthread. */
                        gotnocbscbs = true;
-                       rdp->nocb_gp_rdp = rdp_gp;
-                       rdp_prev->nocb_next_cb_rdp = rdp;
                        if (dump_tree)
                                pr_cont(" %d", cpu);
                }
-               rdp_prev = rdp;
+               rdp->nocb_gp_rdp = rdp_gp;
+               list_add_tail(&rdp->nocb_entry_rdp, &rdp_gp->nocb_head_rdp);
        }
        if (gotnocbs && dump_tree)
                pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
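
For concreteness, the group-boundary arithmetic above works as follows (the
stride ls = 8 is assumed purely for illustration; in the full function a
CPU starts a new group when rdp->cpu >= nl): CPU 0 starts a group and sets
nl = DIV_ROUND_UP(0 + 1, 8) * 8 = 8, so nocb CPUs 1 through 7 take the else
branch and are linked onto CPU 0's ->nocb_head_rdp; CPU 8 then meets
rdp->cpu >= nl, starts the next group, and advances nl to 16.  Note that
the common tail of the loop now also links the rcuog leader's own rcu_data
structure onto its ->nocb_head_rdp list, so the leader appears first in its
own group.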
@@ -1369,6 +1394,7 @@ static void show_rcu_nocb_state(struct rcu_data *rdp)
 {
        char bufw[20];
        char bufr[20];
+       struct rcu_data *nocb_next_rdp;
        struct rcu_segcblist *rsclp = &rdp->cblist;
        bool waslocked;
        bool wassleep;
@@ -1376,11 +1402,16 @@ static void show_rcu_nocb_state(struct rcu_data *rdp)
        if (rdp->nocb_gp_rdp == rdp)
                show_rcu_nocb_gp_state(rdp);
 
+       nocb_next_rdp = list_next_or_null_rcu(&rdp->nocb_gp_rdp->nocb_head_rdp,
+                                             &rdp->nocb_entry_rdp,
+                                             typeof(*rdp),
+                                             nocb_entry_rdp);
+
        sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
        sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
        pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
                rdp->cpu, rdp->nocb_gp_rdp->cpu,
-               rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
+               nocb_next_rdp ? nocb_next_rdp->cpu : -1,
                "kK"[!!rdp->nocb_cb_kthread],
                "bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
                "cC"[!!atomic_read(&rdp->nocb_lock_contended)],