xfs: Do background CIL flushes via a workqueue
authorDave Chinner <david@fromorbit.com>
Mon, 23 Apr 2012 07:54:32 +0000 (17:54 +1000)
committerBen Myers <bpm@sgi.com>
Mon, 14 May 2012 21:20:34 +0000 (16:20 -0500)
Doing background CIL flushes adds significant latency to whatever
async transaction that triggers it. To avoid blocking async
transactions on things like waiting for log buffer IO to complete,
move the CIL push off into a workqueue.  By moving the push work
into a workqueue, we remove all the latency that the commit adds
from the foreground transaction commit path. This also means that
single threaded workloads won't do the CIL push procssing, leaving
them more CPU to do more async transactions.

To do this, we need to keep track of the sequence number we have
pushed work for. This avoids having many transaction commits
attempting to schedule work for the same sequence, and ensures that
we only ever have one push (background or forced) in progress at a
time. It also means that we don't need to take the CIL lock in write
mode to check for potential background push races, which reduces
lock contention.

To avoid potential issues with "smart" IO schedulers, don't use the
workqueue for log force triggered flushes. Instead, do them directly
so that the log IO is done directly by the process issuing the log
force and so doesn't get stuck on IO elevator queue idling
incorrectly delaying the log IO from the workqueue.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
Reviewed-by: Mark Tinguely <tinguely@sgi.com>
Signed-off-by: Ben Myers <bpm@sgi.com>
fs/xfs/xfs_log_cil.c
fs/xfs/xfs_log_priv.h
fs/xfs/xfs_mount.h
fs/xfs/xfs_super.c

index d4fadbe..5fc7ec5 100644 (file)
 #include "xfs_discard.h"
 
 /*
- * Perform initial CIL structure initialisation.
- */
-int
-xlog_cil_init(
-       struct log      *log)
-{
-       struct xfs_cil  *cil;
-       struct xfs_cil_ctx *ctx;
-
-       cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
-       if (!cil)
-               return ENOMEM;
-
-       ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
-       if (!ctx) {
-               kmem_free(cil);
-               return ENOMEM;
-       }
-
-       INIT_LIST_HEAD(&cil->xc_cil);
-       INIT_LIST_HEAD(&cil->xc_committing);
-       spin_lock_init(&cil->xc_cil_lock);
-       init_rwsem(&cil->xc_ctx_lock);
-       init_waitqueue_head(&cil->xc_commit_wait);
-
-       INIT_LIST_HEAD(&ctx->committing);
-       INIT_LIST_HEAD(&ctx->busy_extents);
-       ctx->sequence = 1;
-       ctx->cil = cil;
-       cil->xc_ctx = ctx;
-       cil->xc_current_sequence = ctx->sequence;
-
-       cil->xc_log = log;
-       log->l_cilp = cil;
-       return 0;
-}
-
-void
-xlog_cil_destroy(
-       struct log      *log)
-{
-       if (log->l_cilp->xc_ctx) {
-               if (log->l_cilp->xc_ctx->ticket)
-                       xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
-               kmem_free(log->l_cilp->xc_ctx);
-       }
-
-       ASSERT(list_empty(&log->l_cilp->xc_cil));
-       kmem_free(log->l_cilp);
-}
-
-/*
  * Allocate a new ticket. Failing to get a new ticket makes it really hard to
  * recover, so we don't allow failure here. Also, we allocate in a context that
  * we don't want to be issuing transactions from, so we need to tell the
@@ -426,8 +374,7 @@ xlog_cil_committed(
  */
 STATIC int
 xlog_cil_push(
-       struct log              *log,
-       xfs_lsn_t               push_seq)
+       struct log              *log)
 {
        struct xfs_cil          *cil = log->l_cilp;
        struct xfs_log_vec      *lv;
@@ -443,39 +390,36 @@ xlog_cil_push(
        struct xfs_log_iovec    lhdr;
        struct xfs_log_vec      lvhdr = { NULL };
        xfs_lsn_t               commit_lsn;
+       xfs_lsn_t               push_seq;
 
        if (!cil)
                return 0;
 
-       ASSERT(!push_seq || push_seq <= cil->xc_ctx->sequence);
-
        new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_SLEEP|KM_NOFS);
        new_ctx->ticket = xlog_cil_ticket_alloc(log);
 
-       /*
-        * Lock out transaction commit, but don't block for background pushes
-        * unless we are well over the CIL space limit. See the definition of
-        * XLOG_CIL_HARD_SPACE_LIMIT() for the full explanation of the logic
-        * used here.
-        */
-       if (!down_write_trylock(&cil->xc_ctx_lock)) {
-               if (!push_seq &&
-                   cil->xc_ctx->space_used < XLOG_CIL_HARD_SPACE_LIMIT(log))
-                       goto out_free_ticket;
-               down_write(&cil->xc_ctx_lock);
-       }
+       down_write(&cil->xc_ctx_lock);
        ctx = cil->xc_ctx;
 
-       /* check if we've anything to push */
-       if (list_empty(&cil->xc_cil))
-               goto out_skip;
+       spin_lock(&cil->xc_cil_lock);
+       push_seq = cil->xc_push_seq;
+       ASSERT(push_seq <= ctx->sequence);
 
-       /* check for spurious background flush */
-       if (!push_seq && cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
+       /*
+        * Check if we've anything to push. If there is nothing, then we don't
+        * move on to a new sequence number and so we have to be able to push
+        * this sequence again later.
+        */
+       if (list_empty(&cil->xc_cil)) {
+               cil->xc_push_seq = 0;
+               spin_unlock(&cil->xc_cil_lock);
                goto out_skip;
+       }
+       spin_unlock(&cil->xc_cil_lock);
+
 
        /* check for a previously pushed seqeunce */
-       if (push_seq && push_seq < cil->xc_ctx->sequence)
+       if (push_seq < cil->xc_ctx->sequence)
                goto out_skip;
 
        /*
@@ -629,7 +573,6 @@ restart:
 
 out_skip:
        up_write(&cil->xc_ctx_lock);
-out_free_ticket:
        xfs_log_ticket_put(new_ctx->ticket);
        kmem_free(new_ctx);
        return 0;
@@ -641,6 +584,82 @@ out_abort:
        return XFS_ERROR(EIO);
 }
 
+static void
+xlog_cil_push_work(
+       struct work_struct      *work)
+{
+       struct xfs_cil          *cil = container_of(work, struct xfs_cil,
+                                                       xc_push_work);
+       xlog_cil_push(cil->xc_log);
+}
+
+/*
+ * We need to push CIL every so often so we don't cache more than we can fit in
+ * the log. The limit really is that a checkpoint can't be more than half the
+ * log (the current checkpoint is not allowed to overwrite the previous
+ * checkpoint), but commit latency and memory usage limit this to a smaller
+ * size.
+ */
+static void
+xlog_cil_push_background(
+       struct log      *log)
+{
+       struct xfs_cil  *cil = log->l_cilp;
+
+       /*
+        * The cil won't be empty because we are called while holding the
+        * context lock so whatever we added to the CIL will still be there
+        */
+       ASSERT(!list_empty(&cil->xc_cil));
+
+       /*
+        * don't do a background push if we haven't used up all the
+        * space available yet.
+        */
+       if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
+               return;
+
+       spin_lock(&cil->xc_cil_lock);
+       if (cil->xc_push_seq < cil->xc_current_sequence) {
+               cil->xc_push_seq = cil->xc_current_sequence;
+               queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
+       }
+       spin_unlock(&cil->xc_cil_lock);
+
+}
+
+static void
+xlog_cil_push_foreground(
+       struct log      *log,
+       xfs_lsn_t       push_seq)
+{
+       struct xfs_cil  *cil = log->l_cilp;
+
+       if (!cil)
+               return;
+
+       ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
+
+       /* start on any pending background push to minimise wait time on it */
+       flush_work(&cil->xc_push_work);
+
+       /*
+        * If the CIL is empty or we've already pushed the sequence then
+        * there's no work we need to do.
+        */
+       spin_lock(&cil->xc_cil_lock);
+       if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
+               spin_unlock(&cil->xc_cil_lock);
+               return;
+       }
+
+       cil->xc_push_seq = push_seq;
+       spin_unlock(&cil->xc_cil_lock);
+
+       /* do the push now */
+       xlog_cil_push(log);
+}
+
 /*
  * Commit a transaction with the given vector to the Committed Item List.
  *
@@ -667,7 +686,6 @@ xfs_log_commit_cil(
 {
        struct log              *log = mp->m_log;
        int                     log_flags = 0;
-       int                     push = 0;
        struct xfs_log_vec      *log_vector;
 
        if (flags & XFS_TRANS_RELEASE_LOG_RES)
@@ -719,21 +737,9 @@ xfs_log_commit_cil(
         */
        xfs_trans_free_items(tp, *commit_lsn, 0);
 
-       /* check for background commit before unlock */
-       if (log->l_cilp->xc_ctx->space_used > XLOG_CIL_SPACE_LIMIT(log))
-               push = 1;
+       xlog_cil_push_background(log);
 
        up_read(&log->l_cilp->xc_ctx_lock);
-
-       /*
-        * We need to push CIL every so often so we don't cache more than we
-        * can fit in the log. The limit really is that a checkpoint can't be
-        * more than half the log (the current checkpoint is not allowed to
-        * overwrite the previous checkpoint), but commit latency and memory
-        * usage limit this to a smaller size in most cases.
-        */
-       if (push)
-               xlog_cil_push(log, 0);
        return 0;
 }
 
@@ -746,9 +752,6 @@ xfs_log_commit_cil(
  *
  * We return the current commit lsn to allow the callers to determine if a
  * iclog flush is necessary following this call.
- *
- * XXX: Initially, just push the CIL unconditionally and return whatever
- * commit lsn is there. It'll be empty, so this is broken for now.
  */
 xfs_lsn_t
 xlog_cil_force_lsn(
@@ -766,8 +769,7 @@ xlog_cil_force_lsn(
         * xlog_cil_push() handles racing pushes for the same sequence,
         * so no need to deal with it here.
         */
-       if (sequence == cil->xc_current_sequence)
-               xlog_cil_push(log, sequence);
+       xlog_cil_push_foreground(log, sequence);
 
        /*
         * See if we can find a previous sequence still committing.
@@ -826,3 +828,57 @@ xfs_log_item_in_current_chkpt(
                return false;
        return true;
 }
+
+/*
+ * Perform initial CIL structure initialisation.
+ */
+int
+xlog_cil_init(
+       struct log      *log)
+{
+       struct xfs_cil  *cil;
+       struct xfs_cil_ctx *ctx;
+
+       cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
+       if (!cil)
+               return ENOMEM;
+
+       ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
+       if (!ctx) {
+               kmem_free(cil);
+               return ENOMEM;
+       }
+
+       INIT_WORK(&cil->xc_push_work, xlog_cil_push_work);
+       INIT_LIST_HEAD(&cil->xc_cil);
+       INIT_LIST_HEAD(&cil->xc_committing);
+       spin_lock_init(&cil->xc_cil_lock);
+       init_rwsem(&cil->xc_ctx_lock);
+       init_waitqueue_head(&cil->xc_commit_wait);
+
+       INIT_LIST_HEAD(&ctx->committing);
+       INIT_LIST_HEAD(&ctx->busy_extents);
+       ctx->sequence = 1;
+       ctx->cil = cil;
+       cil->xc_ctx = ctx;
+       cil->xc_current_sequence = ctx->sequence;
+
+       cil->xc_log = log;
+       log->l_cilp = cil;
+       return 0;
+}
+
+void
+xlog_cil_destroy(
+       struct log      *log)
+{
+       if (log->l_cilp->xc_ctx) {
+               if (log->l_cilp->xc_ctx->ticket)
+                       xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
+               kmem_free(log->l_cilp->xc_ctx);
+       }
+
+       ASSERT(list_empty(&log->l_cilp->xc_cil));
+       kmem_free(log->l_cilp);
+}
+
index 2152900..735ff1e 100644 (file)
@@ -417,6 +417,8 @@ struct xfs_cil {
        struct list_head        xc_committing;
        wait_queue_head_t       xc_commit_wait;
        xfs_lsn_t               xc_current_sequence;
+       struct work_struct      xc_push_work;
+       xfs_lsn_t               xc_push_seq;
 };
 
 /*
index 19fd5ed..8b89c5a 100644 (file)
@@ -214,6 +214,7 @@ typedef struct xfs_mount {
 
        struct workqueue_struct *m_data_workqueue;
        struct workqueue_struct *m_unwritten_workqueue;
+       struct workqueue_struct *m_cil_workqueue;
 } xfs_mount_t;
 
 /*
index fa07b77..49197e2 100644 (file)
@@ -773,8 +773,14 @@ xfs_init_mount_workqueues(
        if (!mp->m_unwritten_workqueue)
                goto out_destroy_data_iodone_queue;
 
+       mp->m_cil_workqueue = alloc_workqueue("xfs-cil/%s",
+                       WQ_MEM_RECLAIM, 0, mp->m_fsname);
+       if (!mp->m_cil_workqueue)
+               goto out_destroy_unwritten;
        return 0;
 
+out_destroy_unwritten:
+       destroy_workqueue(mp->m_unwritten_workqueue);
 out_destroy_data_iodone_queue:
        destroy_workqueue(mp->m_data_workqueue);
 out:
@@ -785,6 +791,7 @@ STATIC void
 xfs_destroy_mount_workqueues(
        struct xfs_mount        *mp)
 {
+       destroy_workqueue(mp->m_cil_workqueue);
        destroy_workqueue(mp->m_data_workqueue);
        destroy_workqueue(mp->m_unwritten_workqueue);
 }