RDMA/rxe: Add workqueue support for rxe tasks
authorBob Pearson <rpearsonhpe@gmail.com>
Fri, 28 Apr 2023 17:13:22 +0000 (12:13 -0500)
committerJason Gunthorpe <jgg@nvidia.com>
Wed, 17 May 2023 18:34:25 +0000 (15:34 -0300)
Replace tasklets by work queues for the three main rxe tasklets:
rxe_requester, rxe_completer and rxe_responder.

work queues are a more modern way to process work from an IRQ and provide
more control over how that work is run for future patches.

Link: https://lore.kernel.org/r/20230428171321.5774-1-rpearsonhpe@gmail.com
Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
Reviewed-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
Tested-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
Signed-off-by: Jason Gunthorpe <jgg@nvidia.com>
drivers/infiniband/sw/rxe/rxe.c
drivers/infiniband/sw/rxe/rxe_task.c
drivers/infiniband/sw/rxe/rxe_task.h

index 7a7e713..54c723a 100644 (file)
@@ -212,10 +212,16 @@ static int __init rxe_module_init(void)
 {
        int err;
 
-       err = rxe_net_init();
+       err = rxe_alloc_wq();
        if (err)
                return err;
 
+       err = rxe_net_init();
+       if (err) {
+               rxe_destroy_wq();
+               return err;
+       }
+
        rdma_link_register(&rxe_link_ops);
        pr_info("loaded\n");
        return 0;
@@ -226,6 +232,7 @@ static void __exit rxe_module_exit(void)
        rdma_link_unregister(&rxe_link_ops);
        ib_unregister_driver(RDMA_DRIVER_RXE);
        rxe_net_exit();
+       rxe_destroy_wq();
 
        pr_info("unloaded\n");
 }
index fb9a6bc..1501120 100644 (file)
@@ -6,8 +6,24 @@
 
 #include "rxe.h"
 
+static struct workqueue_struct *rxe_wq;
+
+int rxe_alloc_wq(void)
+{
+       rxe_wq = alloc_workqueue("rxe_wq", WQ_UNBOUND, WQ_MAX_ACTIVE);
+       if (!rxe_wq)
+               return -ENOMEM;
+
+       return 0;
+}
+
+void rxe_destroy_wq(void)
+{
+       destroy_workqueue(rxe_wq);
+}
+
 /* Check if task is idle i.e. not running, not scheduled in
- * tasklet queue and not draining. If so move to busy to
+ * work queue and not draining. If so move to busy to
  * reserve a slot in do_task() by setting to busy and taking
  * a qp reference to cover the gap from now until the task finishes.
  * state will move out of busy if task returns a non zero value
@@ -21,9 +37,6 @@ static bool __reserve_if_idle(struct rxe_task *task)
 {
        WARN_ON(rxe_read(task->qp) <= 0);
 
-       if (task->tasklet.state & BIT(TASKLET_STATE_SCHED))
-               return false;
-
        if (task->state == TASK_STATE_IDLE) {
                rxe_get(task->qp);
                task->state = TASK_STATE_BUSY;
@@ -38,7 +51,7 @@ static bool __reserve_if_idle(struct rxe_task *task)
 }
 
 /* check if task is idle or drained and not currently
- * scheduled in the tasklet queue. This routine is
+ * scheduled in the work queue. This routine is
  * called by rxe_cleanup_task or rxe_disable_task to
  * see if the queue is empty.
  * Context: caller should hold task->lock.
@@ -46,7 +59,7 @@ static bool __reserve_if_idle(struct rxe_task *task)
  */
 static bool __is_done(struct rxe_task *task)
 {
-       if (task->tasklet.state & BIT(TASKLET_STATE_SCHED))
+       if (work_pending(&task->work))
                return false;
 
        if (task->state == TASK_STATE_IDLE ||
@@ -77,23 +90,23 @@ static bool is_done(struct rxe_task *task)
  * schedules the task. They must call __reserve_if_idle to
  * move the task to busy before calling or scheduling.
  * The task can also be moved to drained or invalid
- * by calls to rxe-cleanup_task or rxe_disable_task.
+ * by calls to rxe_cleanup_task or rxe_disable_task.
  * In that case tasks which get here are not executed but
  * just flushed. The tasks are designed to look to see if
- * there is work to do and do part of it before returning
+ * there is work to do and then do part of it before returning
  * here with a return value of zero until all the work
- * has been consumed then it retuens a non-zero value.
+ * has been consumed then it returns a non-zero value.
  * The number of times the task can be run is limited by
  * max iterations so one task cannot hold the cpu forever.
+ * If the limit is hit and work remains the task is rescheduled.
  */
-static void do_task(struct tasklet_struct *t)
+static void do_task(struct rxe_task *task)
 {
-       int cont;
-       int ret;
-       struct rxe_task *task = from_tasklet(task, t, tasklet);
        unsigned int iterations;
        unsigned long flags;
        int resched = 0;
+       int cont;
+       int ret;
 
        WARN_ON(rxe_read(task->qp) <= 0);
 
@@ -115,25 +128,22 @@ static void do_task(struct tasklet_struct *t)
                } while (ret == 0 && iterations-- > 0);
 
                spin_lock_irqsave(&task->lock, flags);
+               /* we're not done yet but we ran out of iterations.
+                * yield the cpu and reschedule the task
+                */
+               if (!ret) {
+                       task->state = TASK_STATE_IDLE;
+                       resched = 1;
+                       goto exit;
+               }
+
                switch (task->state) {
                case TASK_STATE_BUSY:
-                       if (ret) {
-                               task->state = TASK_STATE_IDLE;
-                       } else {
-                               /* This can happen if the client
-                                * can add work faster than the
-                                * tasklet can finish it.
-                                * Reschedule the tasklet and exit
-                                * the loop to give up the cpu
-                                */
-                               task->state = TASK_STATE_IDLE;
-                               resched = 1;
-                       }
+                       task->state = TASK_STATE_IDLE;
                        break;
 
-               /* someone tried to run the task since the last time we called
-                * func, so we will call one more time regardless of the
-                * return value
+               /* someone tried to schedule the task while we
+                * were running, keep going
                 */
                case TASK_STATE_ARMED:
                        task->state = TASK_STATE_BUSY;
@@ -141,22 +151,24 @@ static void do_task(struct tasklet_struct *t)
                        break;
 
                case TASK_STATE_DRAINING:
-                       if (ret)
-                               task->state = TASK_STATE_DRAINED;
-                       else
-                               cont = 1;
+                       task->state = TASK_STATE_DRAINED;
                        break;
 
                default:
                        WARN_ON(1);
-                       rxe_info_qp(task->qp, "unexpected task state = %d", task->state);
+                       rxe_dbg_qp(task->qp, "unexpected task state = %d",
+                                  task->state);
+                       task->state = TASK_STATE_IDLE;
                }
 
+exit:
                if (!cont) {
                        task->num_done++;
                        if (WARN_ON(task->num_done != task->num_sched))
-                               rxe_err_qp(task->qp, "%ld tasks scheduled, %ld tasks done",
-                                          task->num_sched, task->num_done);
+                               rxe_dbg_qp(
+                                       task->qp,
+                                       "%ld tasks scheduled, %ld tasks done",
+                                       task->num_sched, task->num_done);
                }
                spin_unlock_irqrestore(&task->lock, flags);
        } while (cont);
@@ -169,6 +181,12 @@ static void do_task(struct tasklet_struct *t)
        rxe_put(task->qp);
 }
 
+/* wrapper around do_task to fix argument for work queue */
+static void do_work(struct work_struct *work)
+{
+       do_task(container_of(work, struct rxe_task, work));
+}
+
 int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
                  int (*func)(struct rxe_qp *))
 {
@@ -176,11 +194,9 @@ int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
 
        task->qp = qp;
        task->func = func;
-
-       tasklet_setup(&task->tasklet, do_task);
-
        task->state = TASK_STATE_IDLE;
        spin_lock_init(&task->lock);
+       INIT_WORK(&task->work, do_work);
 
        return 0;
 }
@@ -213,8 +229,6 @@ void rxe_cleanup_task(struct rxe_task *task)
        while (!is_done(task))
                cond_resched();
 
-       tasklet_kill(&task->tasklet);
-
        spin_lock_irqsave(&task->lock, flags);
        task->state = TASK_STATE_INVALID;
        spin_unlock_irqrestore(&task->lock, flags);
@@ -226,7 +240,7 @@ void rxe_cleanup_task(struct rxe_task *task)
 void rxe_run_task(struct rxe_task *task)
 {
        unsigned long flags;
-       int run;
+       bool run;
 
        WARN_ON(rxe_read(task->qp) <= 0);
 
@@ -235,11 +249,11 @@ void rxe_run_task(struct rxe_task *task)
        spin_unlock_irqrestore(&task->lock, flags);
 
        if (run)
-               do_task(&task->tasklet);
+               do_task(task);
 }
 
-/* schedule the task to run later as a tasklet.
- * the tasklet)schedule call can be called holding
+/* schedule the task to run later as a work queue entry.
+ * the queue_work call can be called holding
  * the lock.
  */
 void rxe_sched_task(struct rxe_task *task)
@@ -250,7 +264,7 @@ void rxe_sched_task(struct rxe_task *task)
 
        spin_lock_irqsave(&task->lock, flags);
        if (__reserve_if_idle(task))
-               tasklet_schedule(&task->tasklet);
+               queue_work(rxe_wq, &task->work);
        spin_unlock_irqrestore(&task->lock, flags);
 }
 
@@ -277,7 +291,9 @@ void rxe_disable_task(struct rxe_task *task)
        while (!is_done(task))
                cond_resched();
 
-       tasklet_disable(&task->tasklet);
+       spin_lock_irqsave(&task->lock, flags);
+       task->state = TASK_STATE_DRAINED;
+       spin_unlock_irqrestore(&task->lock, flags);
 }
 
 void rxe_enable_task(struct rxe_task *task)
@@ -291,7 +307,7 @@ void rxe_enable_task(struct rxe_task *task)
                spin_unlock_irqrestore(&task->lock, flags);
                return;
        }
+
        task->state = TASK_STATE_IDLE;
-       tasklet_enable(&task->tasklet);
        spin_unlock_irqrestore(&task->lock, flags);
 }
index facb7c8..a63e258 100644 (file)
@@ -22,7 +22,7 @@ enum {
  * called again.
  */
 struct rxe_task {
-       struct tasklet_struct   tasklet;
+       struct work_struct      work;
        int                     state;
        spinlock_t              lock;
        struct rxe_qp           *qp;
@@ -32,6 +32,10 @@ struct rxe_task {
        long                    num_done;
 };
 
+int rxe_alloc_wq(void);
+
+void rxe_destroy_wq(void);
+
 /*
  * init rxe_task structure
  *     qp  => parameter to pass to func