rcu/kvfree: Split ready for reclaim objects from a batch
author Uladzislau Rezki (Sony) <urezki@gmail.com>
Wed, 14 Dec 2022 12:06:30 +0000 (13:06 +0100)
committer Paul E. McKenney <paulmck@kernel.org>
Wed, 4 Jan 2023 01:48:41 +0000 (17:48 -0800)
This patch splits the lists of objects so as to avoid sending any
through RCU that have already been queued for more than one grace
period.  These long-term-resident objects are immediately freed.
The remaining short-term-resident objects are queued for later freeing
using queue_rcu_work().

This change avoids delaying workqueue handlers with synchronize_rcu()
invocations.  Yes, workqueue handlers are designed to handle blocking,
but avoiding blocking when unnecessary improves performance during
low-memory situations.
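
For reference, the split relies on RCU's polled grace-period API: take a
cookie with get_state_synchronize_rcu() when an object is queued, then
test it with poll_state_synchronize_rcu() at drain time instead of
blocking in synchronize_rcu().  A minimal sketch of that pattern follows;
struct my_obj, my_queue(), and my_drain() are hypothetical names used for
illustration only, and locking is omitted for brevity.

	#include <linux/rcupdate.h>
	#include <linux/list.h>
	#include <linux/slab.h>

	struct my_obj {
		struct list_head list;
		unsigned long gp_snap; /* cookie from get_state_synchronize_rcu() */
	};

	static LIST_HEAD(my_list);

	/* Queue time: snapshot the current grace-period state. */
	static void my_queue(struct my_obj *obj)
	{
		obj->gp_snap = get_state_synchronize_rcu();
		list_add(&obj->list, &my_list);
	}

	/* Drain time: free only objects whose grace period has elapsed. */
	static void my_drain(void)
	{
		struct my_obj *obj, *n;

		list_for_each_entry_safe(obj, n, &my_list, list) {
			/* Poll instead of blocking in synchronize_rcu(). */
			if (!poll_state_synchronize_rcu(obj->gp_snap))
				continue;

			list_del(&obj->list);
			kfree(obj);
		}
	}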

Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 52f4c7e..0b4f7dd 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2900,15 +2900,13 @@ struct kvfree_rcu_bulk_data {
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
  * @head_free: List of kfree_rcu() objects waiting for a grace period
- * @head_free_gp_snap: Snapshot of RCU state for objects placed to "@head_free"
  * @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
  * @krcp: Pointer to @kfree_rcu_cpu structure
  */
 
 struct kfree_rcu_cpu_work {
-       struct work_struct rcu_work;
+       struct rcu_work rcu_work;
        struct rcu_head *head_free;
-       unsigned long head_free_gp_snap;
        struct list_head bulk_head_free[FREE_N_CHANNELS];
        struct kfree_rcu_cpu *krcp;
 };
@@ -2916,6 +2914,7 @@ struct kfree_rcu_cpu_work {
 /**
  * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
  * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @head_gp_snap: Snapshot of RCU state for objects placed to "@head"
  * @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
  * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
  * @lock: Synchronize access to this structure
@@ -2943,6 +2942,7 @@ struct kfree_rcu_cpu {
        // Objects queued on a linked list
        // through their rcu_head structures.
        struct rcu_head *head;
+       unsigned long head_gp_snap;
        atomic_t head_count;
 
        // Objects queued on a bulk-list.
@@ -3111,10 +3111,9 @@ static void kfree_rcu_work(struct work_struct *work)
        struct rcu_head *head;
        struct kfree_rcu_cpu *krcp;
        struct kfree_rcu_cpu_work *krwp;
-       unsigned long head_free_gp_snap;
        int i;
 
-       krwp = container_of(work,
+       krwp = container_of(to_rcu_work(work),
                struct kfree_rcu_cpu_work, rcu_work);
        krcp = krwp->krcp;
 
@@ -3126,26 +3125,11 @@ static void kfree_rcu_work(struct work_struct *work)
        // Channel 3.
        head = krwp->head_free;
        krwp->head_free = NULL;
-       head_free_gp_snap = krwp->head_free_gp_snap;
        raw_spin_unlock_irqrestore(&krcp->lock, flags);
 
        // Handle the first two channels.
        for (i = 0; i < FREE_N_CHANNELS; i++) {
                // Start from the tail page, so a GP is likely passed for it.
-               list_for_each_entry_safe_reverse(bnode, n, &bulk_head[i], list) {
-                       // Not yet ready? Bail out since we need one more GP.
-                       if (!poll_state_synchronize_rcu(bnode->gp_snap))
-                               break;
-
-                       list_del_init(&bnode->list);
-                       kvfree_rcu_bulk(krcp, bnode, i);
-               }
-
-               // Please note a request for one more extra GP can
-               // occur only once for all objects in this batch.
-               if (!list_empty(&bulk_head[i]))
-                       synchronize_rcu();
-
                list_for_each_entry_safe(bnode, n, &bulk_head[i], list)
                        kvfree_rcu_bulk(krcp, bnode, i);
        }
@@ -3157,10 +3141,7 @@ static void kfree_rcu_work(struct work_struct *work)
         * queued on a linked list through their rcu_head structures.
         * This list is named "Channel 3".
         */
-       if (head) {
-               cond_synchronize_rcu(head_free_gp_snap);
-               kvfree_rcu_list(head);
-       }
+       kvfree_rcu_list(head);
 }
 
 static bool
@@ -3201,6 +3182,44 @@ schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
        queue_delayed_work(system_wq, &krcp->monitor_work, delay);
 }
 
+static void
+kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
+{
+       struct list_head bulk_ready[FREE_N_CHANNELS];
+       struct kvfree_rcu_bulk_data *bnode, *n;
+       struct rcu_head *head_ready = NULL;
+       unsigned long flags;
+       int i;
+
+       raw_spin_lock_irqsave(&krcp->lock, flags);
+       for (i = 0; i < FREE_N_CHANNELS; i++) {
+               INIT_LIST_HEAD(&bulk_ready[i]);
+
+               list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) {
+                       if (!poll_state_synchronize_rcu(bnode->gp_snap))
+                               break;
+
+                       atomic_sub(bnode->nr_records, &krcp->bulk_count[i]);
+                       list_move(&bnode->list, &bulk_ready[i]);
+               }
+       }
+
+       if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) {
+               head_ready = krcp->head;
+               atomic_set(&krcp->head_count, 0);
+               WRITE_ONCE(krcp->head, NULL);
+       }
+       raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+       for (i = 0; i < FREE_N_CHANNELS; i++) {
+               list_for_each_entry_safe(bnode, n, &bulk_ready[i], list)
+                       kvfree_rcu_bulk(krcp, bnode, i);
+       }
+
+       if (head_ready)
+               kvfree_rcu_list(head_ready);
+}
+
 /*
  * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
  */
@@ -3211,6 +3230,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
        unsigned long flags;
        int i, j;
 
+       // Drain ready for reclaim.
+       kvfree_rcu_drain_ready(krcp);
+
        raw_spin_lock_irqsave(&krcp->lock, flags);
 
        // Attempt to start a new batch.
@@ -3230,8 +3252,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
                        // Channel 2 corresponds to vmalloc-pointer bulk path.
                        for (j = 0; j < FREE_N_CHANNELS; j++) {
                                if (list_empty(&krwp->bulk_head_free[j])) {
-                                       list_replace_init(&krcp->bulk_head[j], &krwp->bulk_head_free[j]);
                                        atomic_set(&krcp->bulk_count[j], 0);
+                                       list_replace_init(&krcp->bulk_head[j],
+                                               &krwp->bulk_head_free[j]);
                                }
                        }
 
@@ -3239,13 +3262,8 @@ static void kfree_rcu_monitor(struct work_struct *work)
                        // objects queued on the linked list.
                        if (!krwp->head_free) {
                                krwp->head_free = krcp->head;
-                               WRITE_ONCE(krcp->head, NULL);
                                atomic_set(&krcp->head_count, 0);
-
-                               // Take a snapshot for this krwp. Please note no more
-                               // any objects can be added to attached head_free channel
-                               // therefore fixate a GP for it here.
-                               krwp->head_free_gp_snap = get_state_synchronize_rcu();
+                               WRITE_ONCE(krcp->head, NULL);
                        }
 
                        // One work is per one batch, so there are three
@@ -3253,7 +3271,7 @@ static void kfree_rcu_monitor(struct work_struct *work)
                        // be that the work is in the pending state when
                        // channels have been detached following by each
                        // other.
-                       queue_work(system_wq, &krwp->rcu_work);
+                       queue_rcu_work(system_wq, &krwp->rcu_work);
                }
        }
 
@@ -3440,6 +3458,9 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
                head->next = krcp->head;
                WRITE_ONCE(krcp->head, head);
                atomic_inc(&krcp->head_count);
+
+               // Take a snapshot for this krcp.
+               krcp->head_gp_snap = get_state_synchronize_rcu();
                success = true;
        }
 
@@ -4834,7 +4855,7 @@ static void __init kfree_rcu_batch_init(void)
                struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
                for (i = 0; i < KFREE_N_BATCHES; i++) {
-                       INIT_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
+                       INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
                        krcp->krw_arr[i].krcp = krcp;
 
                        for (j = 0; j < FREE_N_CHANNELS; j++)
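
For reference, a minimal sketch of the work_struct-to-rcu_work conversion
performed above.  struct my_batch, my_free_handler(), and
my_init_and_queue() are hypothetical names for illustration; INIT_RCU_WORK(),
queue_rcu_work(), and to_rcu_work() are the real APIs the patch switches to.

	#include <linux/workqueue.h>

	struct my_batch {
		struct rcu_work rcu_work;
		/* ... objects waiting for a grace period ... */
	};

	/* Runs on a workqueue only after a full grace period has elapsed. */
	static void my_free_handler(struct work_struct *work)
	{
		struct my_batch *batch =
			container_of(to_rcu_work(work), struct my_batch, rcu_work);

		/* Reclaim the batch here; no synchronize_rcu() is needed. */
	}

	static void my_init_and_queue(struct my_batch *batch)
	{
		INIT_RCU_WORK(&batch->rcu_work, my_free_handler);

		/*
		 * queue_rcu_work() invokes call_rcu() internally and queues
		 * the handler onto system_wq once the grace period ends, so
		 * the handler itself never has to block waiting for RCU.
		 */
		queue_rcu_work(system_wq, &batch->rcu_work);
	}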