xfs: multithreaded iwalk implementation
Author:     Darrick J. Wong <darrick.wong@oracle.com>
AuthorDate: Wed, 3 Jul 2019 14:33:26 +0000 (07:33 -0700)
Commit:     Darrick J. Wong <darrick.wong@oracle.com>
CommitDate: Wed, 3 Jul 2019 14:33:26 +0000 (07:33 -0700)
Create a parallel iwalk implementation and switch quotacheck to use it.

Signed-off-by: Darrick J. Wong <darrick.wong@oracle.com>
Reviewed-by: Brian Foster <bfoster@redhat.com>
fs/xfs/Makefile
fs/xfs/xfs_globals.c
fs/xfs/xfs_iwalk.c
fs/xfs/xfs_iwalk.h
fs/xfs/xfs_pwork.c [new file with mode: 0644]
fs/xfs/xfs_pwork.h [new file with mode: 0644]
fs/xfs/xfs_qm.c
fs/xfs/xfs_sysctl.h
fs/xfs/xfs_sysfs.c
fs/xfs/xfs_trace.h

diff --git a/fs/xfs/Makefile b/fs/xfs/Makefile
index e4dc99d..b74a471 100644
@@ -85,6 +85,7 @@ xfs-y                         += xfs_aops.o \
                                   xfs_message.o \
                                   xfs_mount.o \
                                   xfs_mru_cache.o \
+                                  xfs_pwork.o \
                                   xfs_reflink.o \
                                   xfs_stats.o \
                                   xfs_super.o \
diff --git a/fs/xfs/xfs_globals.c b/fs/xfs/xfs_globals.c
index 4e4a7a2..fa55ab8 100644
@@ -40,4 +40,7 @@ struct xfs_globals xfs_globals = {
 #else
        .bug_on_assert          =       false,  /* assert failures WARN() */
 #endif
+#ifdef DEBUG
+       .pwork_threads          =       -1,     /* automatic thread detection */
+#endif
 };
diff --git a/fs/xfs/xfs_iwalk.c b/fs/xfs/xfs_iwalk.c
index a090315..d610eef 100644
@@ -20,6 +20,7 @@
 #include "xfs_icache.h"
 #include "xfs_health.h"
 #include "xfs_trans.h"
+#include "xfs_pwork.h"
 
 /*
  * Walking Inodes in the Filesystem
@@ -45,6 +46,9 @@
  */
 
 struct xfs_iwalk_ag {
+       /* parallel work control data; will be null if single threaded */
+       struct xfs_pwork                pwork;
+
        struct xfs_mount                *mp;
        struct xfs_trans                *tp;
 
@@ -182,6 +186,9 @@ xfs_iwalk_ag_recs(
 
                trace_xfs_iwalk_ag_rec(mp, agno, irec);
 
+               if (xfs_pwork_want_abort(&iwag->pwork))
+                       return 0;
+
                if (iwag->inobt_walk_fn) {
                        error = iwag->inobt_walk_fn(mp, tp, agno, irec,
                                        iwag->data);
@@ -193,6 +200,9 @@ xfs_iwalk_ag_recs(
                        continue;
 
                for (j = 0; j < XFS_INODES_PER_CHUNK; j++) {
+                       if (xfs_pwork_want_abort(&iwag->pwork))
+                               return 0;
+
                        /* Skip if this inode is free */
                        if (XFS_INOBT_MASK(j) & irec->ir_free)
                                continue;
@@ -387,6 +397,8 @@ xfs_iwalk_ag(
                struct xfs_inobt_rec_incore     *irec;
 
                cond_resched();
+               if (xfs_pwork_want_abort(&iwag->pwork))
+                       goto out;
 
                /* Fetch the inobt record. */
                irec = &iwag->recs[iwag->nr_recs];
@@ -520,6 +532,7 @@ xfs_iwalk(
                .sz_recs        = xfs_iwalk_prefetch(inode_records),
                .trim_start     = 1,
                .skip_empty     = 1,
+               .pwork          = XFS_PWORK_SINGLE_THREADED,
        };
        xfs_agnumber_t          agno = XFS_INO_TO_AGNO(mp, startino);
        int                     error;
@@ -541,6 +554,74 @@ xfs_iwalk(
        return error;
 }
 
+/* Run per-thread iwalk work. */
+static int
+xfs_iwalk_ag_work(
+       struct xfs_mount        *mp,
+       struct xfs_pwork        *pwork)
+{
+       struct xfs_iwalk_ag     *iwag;
+       int                     error = 0;
+
+       iwag = container_of(pwork, struct xfs_iwalk_ag, pwork);
+       if (xfs_pwork_want_abort(pwork))
+               goto out;
+
+       error = xfs_iwalk_alloc(iwag);
+       if (error)
+               goto out;
+
+       error = xfs_iwalk_ag(iwag);
+       xfs_iwalk_free(iwag);
+out:
+       kmem_free(iwag);
+       return error;
+}
+
+/*
+ * Walk all the inodes in the filesystem using multiple threads to process each
+ * AG.
+ */
+int
+xfs_iwalk_threaded(
+       struct xfs_mount        *mp,
+       xfs_ino_t               startino,
+       xfs_iwalk_fn            iwalk_fn,
+       unsigned int            inode_records,
+       void                    *data)
+{
+       struct xfs_pwork_ctl    pctl;
+       xfs_agnumber_t          agno = XFS_INO_TO_AGNO(mp, startino);
+       unsigned int            nr_threads;
+       int                     error;
+
+       ASSERT(agno < mp->m_sb.sb_agcount);
+
+       nr_threads = xfs_pwork_guess_datadev_parallelism(mp);
+       error = xfs_pwork_init(mp, &pctl, xfs_iwalk_ag_work, "xfs_iwalk",
+                       nr_threads);
+       if (error)
+               return error;
+
+       for (; agno < mp->m_sb.sb_agcount; agno++) {
+               struct xfs_iwalk_ag     *iwag;
+
+               if (xfs_pwork_ctl_want_abort(&pctl))
+                       break;
+
+               iwag = kmem_zalloc(sizeof(struct xfs_iwalk_ag), KM_SLEEP);
+               iwag->mp = mp;
+               iwag->iwalk_fn = iwalk_fn;
+               iwag->data = data;
+               iwag->startino = startino;
+               iwag->sz_recs = xfs_iwalk_prefetch(inode_records);
+               xfs_pwork_queue(&pctl, &iwag->pwork);
+               startino = XFS_AGINO_TO_INO(mp, agno + 1, 0);
+       }
+
+       return xfs_pwork_destroy(&pctl);
+}
+
 /*
  * Allow callers to cache up to a page's worth of inobt records.  This reflects
  * the existing inumbers prefetching behavior.  Since the inobt walk does not
@@ -601,6 +682,7 @@ xfs_inobt_walk(
                .data           = data,
                .startino       = startino,
                .sz_recs        = xfs_inobt_walk_prefetch(inobt_records),
+               .pwork          = XFS_PWORK_SINGLE_THREADED,
        };
        xfs_agnumber_t          agno = XFS_INO_TO_AGNO(mp, startino);
        int                     error;
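For illustration, a minimal caller-side sketch of the new interface (the
counting structure and both functions below are hypothetical, not part of this
patch). Because each AG is walked from its own worker thread, the walk
function and any shared context must be thread safe:

/*
 * Hypothetical example: count the allocated inodes in a filesystem
 * with the threaded inode walk.
 */
struct xfs_ino_count {
	atomic64_t		count;
};

static int
xfs_count_one_inode(
	struct xfs_mount	*mp,
	struct xfs_trans	*tp,
	xfs_ino_t		ino,
	void			*data)
{
	struct xfs_ino_count	*ic = data;

	/* May be called concurrently for several AGs, hence the atomic. */
	atomic64_inc(&ic->count);
	return 0;
}

static int
xfs_count_all_inodes(
	struct xfs_mount	*mp,
	uint64_t		*nr)
{
	struct xfs_ino_count	ic = { .count = ATOMIC64_INIT(0) };
	int			error;

	/* Passing 0 inode_records selects the default prefetch size. */
	error = xfs_iwalk_threaded(mp, 0, xfs_count_one_inode, 0, &ic);
	*nr = atomic64_read(&ic.count);
	return error;
}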
diff --git a/fs/xfs/xfs_iwalk.h b/fs/xfs/xfs_iwalk.h
index 94fad06..22c3176 100644
@@ -15,6 +15,8 @@ typedef int (*xfs_iwalk_fn)(struct xfs_mount *mp, struct xfs_trans *tp,
 
 int xfs_iwalk(struct xfs_mount *mp, struct xfs_trans *tp, xfs_ino_t startino,
                xfs_iwalk_fn iwalk_fn, unsigned int inode_records, void *data);
+int xfs_iwalk_threaded(struct xfs_mount *mp, xfs_ino_t startino,
+               xfs_iwalk_fn iwalk_fn, unsigned int inode_records, void *data);
 
 /* Walk all inode btree records in the filesystem starting from @startino. */
 typedef int (*xfs_inobt_walk_fn)(struct xfs_mount *mp, struct xfs_trans *tp,
diff --git a/fs/xfs/xfs_pwork.c b/fs/xfs/xfs_pwork.c
new file mode 100644
index 0000000..752c897
--- /dev/null
@@ -0,0 +1,117 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (C) 2019 Oracle.  All Rights Reserved.
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ */
+#include "xfs.h"
+#include "xfs_fs.h"
+#include "xfs_shared.h"
+#include "xfs_format.h"
+#include "xfs_log_format.h"
+#include "xfs_trans_resv.h"
+#include "xfs_mount.h"
+#include "xfs_trace.h"
+#include "xfs_sysctl.h"
+#include "xfs_pwork.h"
+
+/*
+ * Parallel Work Queue
+ * ===================
+ *
+ * Abstract away the details of running a large and "obviously" parallelizable
+ * task across multiple CPUs.  Callers initialize the pwork control object with
+ * a desired level of parallelization and a work function.  Next, they embed
+ * struct xfs_pwork in whatever structure they use to pass work context to a
+ * worker thread and queue that pwork.  The work function will be passed the
+ * pwork item when it is run (from process context) and any returned error will
+ * be recorded in xfs_pwork_ctl.error.  Work functions should check for errors
+ * and abort if necessary; a nonzero xfs_pwork_ctl.error does not by itself
+ * stop workqueue item processing.
+ *
+ * This is the rough equivalent of the xfsprogs workqueue code, though we can't
+ * reuse that name here.
+ */
+
+/* Invoke our caller's function. */
+static void
+xfs_pwork_work(
+       struct work_struct      *work)
+{
+       struct xfs_pwork        *pwork;
+       struct xfs_pwork_ctl    *pctl;
+       int                     error;
+
+       pwork = container_of(work, struct xfs_pwork, work);
+       pctl = pwork->pctl;
+       error = pctl->work_fn(pctl->mp, pwork);
+       if (error && !pctl->error)
+               pctl->error = error;
+}
+
+/*
+ * Set up control data for parallel work.  @work_fn is the function that will
+ * be called.  @tag will be written into the kernel worker thread names.
+ * @nr_threads is the level of parallelism desired, or 0 for no limit.
+ */
+int
+xfs_pwork_init(
+       struct xfs_mount        *mp,
+       struct xfs_pwork_ctl    *pctl,
+       xfs_pwork_work_fn       work_fn,
+       const char              *tag,
+       unsigned int            nr_threads)
+{
+#ifdef DEBUG
+       if (xfs_globals.pwork_threads >= 0)
+               nr_threads = xfs_globals.pwork_threads;
+#endif
+       trace_xfs_pwork_init(mp, nr_threads, current->pid);
+
+       pctl->wq = alloc_workqueue("%s-%d", WQ_FREEZABLE, nr_threads, tag,
+                       current->pid);
+       if (!pctl->wq)
+               return -ENOMEM;
+       pctl->work_fn = work_fn;
+       pctl->error = 0;
+       pctl->mp = mp;
+
+       return 0;
+}
+
+/* Queue some parallel work. */
+void
+xfs_pwork_queue(
+       struct xfs_pwork_ctl    *pctl,
+       struct xfs_pwork        *pwork)
+{
+       INIT_WORK(&pwork->work, xfs_pwork_work);
+       pwork->pctl = pctl;
+       queue_work(pctl->wq, &pwork->work);
+}
+
+/* Wait for the work to finish and tear down the control structure. */
+int
+xfs_pwork_destroy(
+       struct xfs_pwork_ctl    *pctl)
+{
+       destroy_workqueue(pctl->wq);
+       pctl->wq = NULL;
+       return pctl->error;
+}
+
+/*
+ * Return the amount of parallelism that the data device can handle, or 0 for
+ * no limit.
+ */
+unsigned int
+xfs_pwork_guess_datadev_parallelism(
+       struct xfs_mount        *mp)
+{
+       struct xfs_buftarg      *btp = mp->m_ddev_targp;
+
+       /*
+        * For now we'll go with the most conservative setting possible,
+        * which is two threads for an SSD and one thread everywhere else.
+        */
+       return blk_queue_nonrot(btp->bt_bdev->bd_queue) ? 2 : 1;
+}
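
Tying the pieces together, a hedged lifecycle sketch of the pwork API (the
work item type, its per-item payload, and the function names here are all
hypothetical):

/* Hypothetical work item: embed struct xfs_pwork in the per-item context. */
struct xfs_example_work {
	struct xfs_pwork	pwork;
	xfs_agnumber_t		agno;	/* per-item payload */
};

static int
xfs_example_work_fn(
	struct xfs_mount	*mp,
	struct xfs_pwork	*pwork)
{
	struct xfs_example_work	*xew;
	int			error = 0;

	xew = container_of(pwork, struct xfs_example_work, pwork);
	if (!xfs_pwork_want_abort(pwork)) {
		/* ...do the expensive per-AG work for xew->agno... */
	}
	kmem_free(xew);
	return error;
}

static int
xfs_example_run(
	struct xfs_mount	*mp)
{
	struct xfs_pwork_ctl	pctl;
	xfs_agnumber_t		agno;
	int			error;

	error = xfs_pwork_init(mp, &pctl, xfs_example_work_fn, "xfs_example",
			xfs_pwork_guess_datadev_parallelism(mp));
	if (error)
		return error;

	for (agno = 0; agno < mp->m_sb.sb_agcount; agno++) {
		struct xfs_example_work	*xew;

		/* Stop queueing new items if an earlier one failed. */
		if (xfs_pwork_ctl_want_abort(&pctl))
			break;
		xew = kmem_zalloc(sizeof(*xew), KM_SLEEP);
		xew->agno = agno;
		xfs_pwork_queue(&pctl, &xew->pwork);
	}

	/* Waits for all queued items; returns the first recorded error. */
	return xfs_pwork_destroy(&pctl);
}

Note that xfs_pwork_destroy() must always follow a successful
xfs_pwork_init(), even if queueing stopped early, since it both flushes the
workqueue and reports the first recorded error.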
diff --git a/fs/xfs/xfs_pwork.h b/fs/xfs/xfs_pwork.h
new file mode 100644
index 0000000..99a9d21
--- /dev/null
@@ -0,0 +1,58 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+/*
+ * Copyright (C) 2019 Oracle.  All Rights Reserved.
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ */
+#ifndef __XFS_PWORK_H__
+#define __XFS_PWORK_H__
+
+struct xfs_pwork;
+struct xfs_mount;
+
+typedef int (*xfs_pwork_work_fn)(struct xfs_mount *mp, struct xfs_pwork *pwork);
+
+/*
+ * Parallel work coordination structure.
+ */
+struct xfs_pwork_ctl {
+       struct workqueue_struct *wq;
+       struct xfs_mount        *mp;
+       xfs_pwork_work_fn       work_fn;
+       int                     error;
+};
+
+/*
+ * Embed this parallel work control item inside your own work structure,
+ * then queue work with it.
+ */
+struct xfs_pwork {
+       struct work_struct      work;
+       struct xfs_pwork_ctl    *pctl;
+};
+
+#define XFS_PWORK_SINGLE_THREADED      { .pctl = NULL }
+
+/* Have we been told to abort? */
+static inline bool
+xfs_pwork_ctl_want_abort(
+       struct xfs_pwork_ctl    *pctl)
+{
+       return pctl && pctl->error;
+}
+
+/* Have we been told to abort? */
+static inline bool
+xfs_pwork_want_abort(
+       struct xfs_pwork        *pwork)
+{
+       return xfs_pwork_ctl_want_abort(pwork->pctl);
+}
+
+int xfs_pwork_init(struct xfs_mount *mp, struct xfs_pwork_ctl *pctl,
+               xfs_pwork_work_fn work_fn, const char *tag,
+               unsigned int nr_threads);
+void xfs_pwork_queue(struct xfs_pwork_ctl *pctl, struct xfs_pwork *pwork);
+int xfs_pwork_destroy(struct xfs_pwork_ctl *pctl);
+unsigned int xfs_pwork_guess_datadev_parallelism(struct xfs_mount *mp);
+
+#endif /* __XFS_PWORK_H__ */
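
The XFS_PWORK_SINGLE_THREADED initializer sets pctl to NULL, so
xfs_pwork_want_abort() always returns false and the same worker body can
serve an unthreaded caller. A sketch, reusing the hypothetical
xfs_example_work from above and assuming the per-item work has been factored
into a shared helper:

/* Shared worker body (hypothetical): does the real work for one AG. */
static int
xfs_example_do_work(
	struct xfs_mount	*mp,
	struct xfs_example_work	*xew)
{
	/* ...expensive per-AG work for xew->agno... */
	return 0;
}

/* Direct, unthreaded invocation of the same worker body. */
static int
xfs_example_run_single(
	struct xfs_mount	*mp,
	xfs_agnumber_t		agno)
{
	struct xfs_example_work	xew = {
		.pwork	= XFS_PWORK_SINGLE_THREADED,
		.agno	= agno,
	};

	/* xfs_pwork_want_abort(&xew.pwork) is always false here. */
	return xfs_example_do_work(mp, &xew);
}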
diff --git a/fs/xfs/xfs_qm.c b/fs/xfs/xfs_qm.c
index 588e36f..fb7a41f 100644
@@ -1300,7 +1300,7 @@ xfs_qm_quotacheck(
                flags |= XFS_PQUOTA_CHKD;
        }
 
-       error = xfs_iwalk(mp, NULL, 0, xfs_qm_dqusage_adjust, 0, NULL);
+       error = xfs_iwalk_threaded(mp, 0, xfs_qm_dqusage_adjust, 0, NULL);
        if (error)
                goto error_return;
 
diff --git a/fs/xfs/xfs_sysctl.h b/fs/xfs/xfs_sysctl.h
index ad7f9be..8abf464 100644
@@ -82,6 +82,9 @@ enum {
 extern xfs_param_t     xfs_params;
 
 struct xfs_globals {
+#ifdef DEBUG
+       int     pwork_threads;          /* parallel workqueue threads */
+#endif
        int     log_recovery_delay;     /* log recovery delay (secs) */
        int     mount_delay;            /* mount setup delay (secs) */
        bool    bug_on_assert;          /* BUG() the kernel on assert failure */
diff --git a/fs/xfs/xfs_sysfs.c b/fs/xfs/xfs_sysfs.c
index 688366d..ddd0bf7 100644
@@ -204,11 +204,51 @@ always_cow_show(
 }
 XFS_SYSFS_ATTR_RW(always_cow);
 
+#ifdef DEBUG
+/*
+ * Override how many threads the parallel work queue is allowed to create.
+ * This has to be a debug-only global (instead of an errortag) because one of
+ * the main users of parallel workqueues is mount time quotacheck.
+ */
+STATIC ssize_t
+pwork_threads_store(
+       struct kobject  *kobject,
+       const char      *buf,
+       size_t          count)
+{
+       int             ret;
+       int             val;
+
+       ret = kstrtoint(buf, 0, &val);
+       if (ret)
+               return ret;
+
+       if (val < -1 || val > num_possible_cpus())
+               return -EINVAL;
+
+       xfs_globals.pwork_threads = val;
+
+       return count;
+}
+
+STATIC ssize_t
+pwork_threads_show(
+       struct kobject  *kobject,
+       char            *buf)
+{
+       return snprintf(buf, PAGE_SIZE, "%d\n", xfs_globals.pwork_threads);
+}
+XFS_SYSFS_ATTR_RW(pwork_threads);
+#endif /* DEBUG */
+
 static struct attribute *xfs_dbg_attrs[] = {
        ATTR_LIST(bug_on_assert),
        ATTR_LIST(log_recovery_delay),
        ATTR_LIST(mount_delay),
        ATTR_LIST(always_cow),
+#ifdef DEBUG
+       ATTR_LIST(pwork_threads),
+#endif
        NULL,
 };
 
diff --git a/fs/xfs/xfs_trace.h b/fs/xfs/xfs_trace.h
index e61d519..8094b19 100644
@@ -3557,6 +3557,24 @@ TRACE_EVENT(xfs_iwalk_ag_rec,
                  __entry->startino, __entry->freemask)
 )
 
+TRACE_EVENT(xfs_pwork_init,
+       TP_PROTO(struct xfs_mount *mp, unsigned int nr_threads, pid_t pid),
+       TP_ARGS(mp, nr_threads, pid),
+       TP_STRUCT__entry(
+               __field(dev_t, dev)
+               __field(unsigned int, nr_threads)
+               __field(pid_t, pid)
+       ),
+       TP_fast_assign(
+               __entry->dev = mp->m_super->s_dev;
+               __entry->nr_threads = nr_threads;
+               __entry->pid = pid;
+       ),
+       TP_printk("dev %d:%d nr_threads %u pid %u",
+                 MAJOR(__entry->dev), MINOR(__entry->dev),
+                 __entry->nr_threads, __entry->pid)
+)
+
 #endif /* _TRACE_XFS_H */
 
 #undef TRACE_INCLUDE_PATH