Add sk_parallel_for()
authormtklein <mtklein@chromium.org>
Wed, 17 Jun 2015 22:26:15 +0000 (15:26 -0700)
committerCommit bot <commit-bot@chromium.org>
Wed, 17 Jun 2015 22:26:15 +0000 (15:26 -0700)
This should be a drop-in replacement for most for-loops to make them run in parallel:
   for (int i = 0; i < N; i++) { code... }
   ~~~>
   sk_parallel_for(N, [&](int i) { code... });

This is just syntax sugar over SkTaskGroup to make this use case really easy to write.
There's no more overhead that we weren't already forced to add using an interface like batch(),
and no extra heap allocations.

I've replaced 3 uses of SkTaskGroup with sk_parallel_for:
  1) My unit tests for SkOnce.
  2) Cary's path fuzzer.
  3) SkMultiPictureDraw.
Performance should be the same.  Please compare left and right for readability. :)

BUG=skia:

No public API changes.
TBR=reed@google.com

Review URL: https://codereview.chromium.org/1184373003

12 files changed:
include/core/SkMultiPictureDraw.h
samplecode/SamplePathFuzz.cpp
src/core/SkMultiPictureDraw.cpp
src/core/SkTaskGroup.cpp
src/core/SkTaskGroup.h
tests/LazyPtrTest.cpp
tests/OnceTest.cpp
tests/PathOpsExtendedTest.cpp
tests/PathOpsSkpClipTest.cpp
tests/PathOpsThreadedCommon.cpp
tests/SkpSkGrTest.cpp
tools/skpdiff/SkDiffContext.cpp

index 809e2df..8d24e61 100644 (file)
@@ -66,8 +66,6 @@ private:
         void draw();
 
         static void Reset(SkTDArray<DrawData>&);
-
-        static void Draw(DrawData* d) { d->draw(); }
     };
 
     SkTDArray<DrawData> fThreadSafeDrawData;
index f2595c5..55f7f41 100644 (file)
@@ -617,42 +617,31 @@ static bool contains_only_moveTo(const SkPath& path) {
 #include "SkTaskGroup.h"
 #include "SkTDArray.h"
 
-struct ThreadState {
-    int fSeed;
-    const SkBitmap* fBitmap;
-};
-
-static void test_fuzz(ThreadState* data) {
-    FuzzPath fuzzPath;
-    fuzzPath.setStrokeOnly();
-    fuzzPath.setSeed(data->fSeed);
-    fuzzPath.randomize();
-    const SkPath& path = fuzzPath.getPath();
-    const SkPaint& paint = fuzzPath.getPaint();
-    const SkImageInfo& info = data->fBitmap->info();
-    SkCanvas* canvas(SkCanvas::NewRasterDirect(info, data->fBitmap->getPixels(),
-            data->fBitmap->rowBytes()));
-    int w = info.width() / 4;
-    int h = info.height() / 4;
-    int x = data->fSeed / 4 % 4;
-    int y = data->fSeed % 4;
-    SkRect clipBounds = SkRect::MakeXYWH(SkIntToScalar(x) * w, SkIntToScalar(y) * h, 
-        SkIntToScalar(w), SkIntToScalar(h));
-    canvas->save();
-        canvas->clipRect(clipBounds);
-        canvas->translate(SkIntToScalar(x) * w, SkIntToScalar(y) * h);
-        canvas->drawPath(path, paint);
-    canvas->restore();
-}
-
 static void path_fuzz_stroker(SkBitmap* bitmap, int seed) {
-    ThreadState states[100];
-    for (size_t i = 0; i < SK_ARRAY_COUNT(states); i++) {
-        states[i].fSeed   = seed + (int) i;
-        states[i].fBitmap = bitmap;
-    }
-    SkTaskGroup tg;
-    tg.batch(test_fuzz, states, SK_ARRAY_COUNT(states));
+    sk_parallel_for(100, [&](int i) {
+        int localSeed = seed + i;
+
+        FuzzPath fuzzPath;
+        fuzzPath.setStrokeOnly();
+        fuzzPath.setSeed(localSeed);
+        fuzzPath.randomize();
+        const SkPath& path = fuzzPath.getPath();
+        const SkPaint& paint = fuzzPath.getPaint();
+        const SkImageInfo& info = bitmap->info();
+        SkCanvas* canvas(
+            SkCanvas::NewRasterDirect(info, bitmap->getPixels(), bitmap->rowBytes()));
+        int w = info.width() / 4;
+        int h = info.height() / 4;
+        int x = localSeed / 4 % 4;
+        int y = localSeed % 4;
+        SkRect clipBounds = SkRect::MakeXYWH(SkIntToScalar(x) * w, SkIntToScalar(y) * h,
+            SkIntToScalar(w), SkIntToScalar(h));
+        canvas->save();
+            canvas->clipRect(clipBounds);
+            canvas->translate(SkIntToScalar(x) * w, SkIntToScalar(y) * h);
+            canvas->drawPath(path, paint);
+        canvas->restore();
+    });
 }
 
 class PathFuzzView : public SampleView {
@@ -673,7 +662,7 @@ protected:
 
     void onOnceBeforeDraw() override {
         fIndex = 0;
-        SkImageInfo info(SkImageInfo::MakeN32Premul(SkScalarRoundToInt(width()), 
+        SkImageInfo info(SkImageInfo::MakeN32Premul(SkScalarRoundToInt(width()),
                 SkScalarRoundToInt(height())));
         offscreen.allocPixels(info);
         path_fuzz_stroker(&offscreen, fIndex);
index 22fd05f..286d3b6 100644 (file)
@@ -91,17 +91,16 @@ void SkMultiPictureDraw::draw(bool flush) {
 
 #ifdef FORCE_SINGLE_THREAD_DRAWING_FOR_TESTING
     for (int i = 0; i < fThreadSafeDrawData.count(); ++i) {
-        DrawData* dd = &fThreadSafeDrawData.begin()[i];
-        dd->fCanvas->drawPicture(dd->fPicture, &dd->fMatrix, dd->fPaint);
+        fThreadSafeDrawData[i].draw();
     }
 #else
-    // we place the taskgroup after the MPDReset, to ensure that we don't delete the DrawData
-    // objects until after we're finished the tasks (which have pointers to the data).
-    SkTaskGroup group;
-    group.batch(DrawData::Draw, fThreadSafeDrawData.begin(), fThreadSafeDrawData.count());
+    sk_parallel_for(fThreadSafeDrawData.count(), [&](int i) {
+        fThreadSafeDrawData[i].draw();
+    });
 #endif
-    // we deliberately don't call wait() here, since the destructor will do that, this allows us
-    // to continue processing gpu-data without having to wait on the cpu tasks.
+
+    // N.B. we could get going on any GPU work from this main thread while the CPU work runs.
+    // But in practice, we've either got GPU work or CPU work, not both.
 
     const int count = fGPUDrawData.count();
     if (0 == count) {
index 59319c1..4504def 100644 (file)
@@ -5,6 +5,7 @@
  * found in the LICENSE file.
  */
 
+#include "SkOnce.h"
 #include "SkRunnable.h"
 #include "SkSemaphore.h"
 #include "SkSpinlock.h"
 #include "SkThreadUtils.h"
 
 #if defined(SK_BUILD_FOR_WIN32)
-    static inline int num_cores() {
+    static void query_num_cores(int* num_cores) {
         SYSTEM_INFO sysinfo;
         GetSystemInfo(&sysinfo);
-        return sysinfo.dwNumberOfProcessors;
+        *num_cores = sysinfo.dwNumberOfProcessors;
     }
 #else
     #include <unistd.h>
-    static inline int num_cores() {
-        return (int) sysconf(_SC_NPROCESSORS_ONLN);
+    static void query_num_cores(int* num_cores) {
+        *num_cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
     }
 #endif
 
+// We cache sk_num_cores() so we only query the OS once.
+SK_DECLARE_STATIC_ONCE(g_query_num_cores_once);
+int sk_num_cores() {
+    static int num_cores = 0;
+    SkOnce(&g_query_num_cores_once, query_num_cores, &num_cores);
+    SkASSERT(num_cores > 0);
+    return num_cores;
+}
+
 namespace {
 
 class ThreadPool : SkNoncopyable {
@@ -98,7 +108,7 @@ private:
 
     explicit ThreadPool(int threads) {
         if (threads == -1) {
-            threads = num_cores();
+            threads = sk_num_cores();
         }
         for (int i = 0; i < threads; i++) {
             fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
index 8c7369d..3af64d7 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "SkTypes.h"
 #include "SkAtomics.h"
+#include "SkTemplates.h"
 
 struct SkRunnable;
 
@@ -49,4 +50,42 @@ private:
     SkAtomic<int32_t> fPending;
 };
 
+// Returns best estimate of number of CPU cores available to use.
+int sk_num_cores();
+
+// Call f(i) for i in [0, end).
+template <typename Func>
+void sk_parallel_for(int end, const Func& f) {
+    if (end <= 0) { return; }
+
+    struct Chunk {
+        const Func* f;
+        int start, end;
+    };
+
+    // TODO(mtklein): this chunking strategy could probably use some tuning.
+    int max_chunks  = sk_num_cores() * 2,
+        stride      = (end + max_chunks - 1 ) / max_chunks,
+        nchunks     = (end + stride - 1 ) / stride;
+    SkASSERT(nchunks <= max_chunks);
+
+    // With the chunking strategy above this won't malloc until we have a machine with >512 cores.
+    SkAutoSTMalloc<1024, Chunk> chunks(nchunks);
+
+    for (int i = 0; i < nchunks; i++) {
+        Chunk& c = chunks[i];
+        c.f     = &f;
+        c.start = i * stride;
+        c.end   = SkTMin(c.start + stride, end);
+        SkASSERT(c.start < c.end);  // Nothing will break if start >= end, but it's a wasted chunk.
+    }
+
+    void(*run_chunk)(Chunk*) = [](Chunk* c) {
+        for (int i = c->start; i < c->end; i++) {
+            (*c->f)(i);
+        }
+    };
+    SkTaskGroup().batch(run_chunk, chunks.get(), nchunks);
+}
+
 #endif//SkTaskGroup_DEFINED
index 1b845bc..89443f9 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright 2014 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
 #include "Test.h"
 #include "SkLazyPtr.h"
 #include "SkRunnable.h"
@@ -44,37 +51,20 @@ DEF_TEST(LazyPtr, r) {
     SkDELETE(ptr);
 }
 
-namespace {
-
-struct Racer : public SkRunnable {
-    Racer() : fLazy(NULL), fSeen(NULL) {}
-
-    void run() override { fSeen = fLazy->get(); }
-
-    SkLazyPtr<int>* fLazy;
-    int* fSeen;
-};
-
-} // namespace
-
 DEF_TEST(LazyPtr_Threaded, r) {
     static const int kRacers = 321;
 
+    // Race to intialize the pointer by calling .get().
     SkLazyPtr<int> lazy;
+    int* seen[kRacers];
 
-    Racer racers[kRacers];
-    for (int i = 0; i < kRacers; i++) {
-        racers[i].fLazy = &lazy;
-    }
-
-    SkTaskGroup tg;
-    for (int i = 0; i < kRacers; i++) {
-        tg.add(racers + i);
-    }
-    tg.wait();
+    sk_parallel_for(kRacers, [&](int i) {
+        seen[i] = lazy.get();
+    });
 
+    // lazy.get() should return the same pointer to all threads.
     for (int i = 1; i < kRacers; i++) {
-        REPORTER_ASSERT(r, racers[i].fSeen);
-        REPORTER_ASSERT(r, racers[i].fSeen == racers[0].fSeen);
+        REPORTER_ASSERT(r, seen[i] != nullptr);
+        REPORTER_ASSERT(r, seen[i] == seen[0]);
     }
 }
index 034c5d9..35c2015 100644 (file)
@@ -28,42 +28,14 @@ DEF_TEST(SkOnce_Singlethreaded, r) {
     REPORTER_ASSERT(r, 5 == x);
 }
 
-static void add_six(int* x) {
-    *x += 6;
-}
-
-namespace {
-
-class Racer : public SkRunnable {
-public:
-    SkOnceFlag* once;
-    int* ptr;
-
-    void run() override {
-        SkOnce(once, add_six, ptr);
-    }
-};
-
-}  // namespace
-
 SK_DECLARE_STATIC_ONCE(mt_once);
 DEF_TEST(SkOnce_Multithreaded, r) {
-    const int kTasks = 16;
-
-    // Make a bunch of tasks that will race to be the first to add six to x.
-    Racer racers[kTasks];
     int x = 0;
-    for (int i = 0; i < kTasks; i++) {
-        racers[i].once = &mt_once;
-        racers[i].ptr = &x;
-    }
-
-    // Let them race.
-    SkTaskGroup tg;
-    for (int i = 0; i < kTasks; i++) {
-        tg.add(&racers[i]);
-    }
-    tg.wait();
+    // Run a bunch of tasks to be the first to add six to x.
+    sk_parallel_for(1021, [&](int) {
+        void(*add_six)(int*) = [](int* p) { *p += 6; };
+        SkOnce(&mt_once, add_six, &x);
+    });
 
     // Only one should have done the +=.
     REPORTER_ASSERT(r, 6 == x);
index a8079cf..df80f46 100644 (file)
@@ -14,7 +14,6 @@
 #include "SkPaint.h"
 #include "SkRTConf.h"
 #include "SkStream.h"
-#include "SkTaskGroup.h"
 #include "SkThread.h"
 
 #ifdef SK_BUILD_FOR_MAC
index 1f69ff0..7ea3184 100755 (executable)
@@ -303,10 +303,11 @@ TestRunner::~TestRunner() {
 }
 
 void TestRunner::render() {
-    SkTaskGroup tg;
-    for (int index = 0; index < fRunnables.count(); ++ index) {
-        tg.add(fRunnables[index]);
-    }
+    // TODO: this doesn't really need to use SkRunnables any more.
+    // We can just write the code to run in the for-loop directly.
+    sk_parallel_for(fRunnables.count(), [&](int i) {
+        fRunnables[i]->run();
+    });
 }
 
 ////////////////////////////////////////////////
index 0adde91..10501d1 100644 (file)
@@ -16,8 +16,7 @@ PathOpsThreadedTestRunner::~PathOpsThreadedTestRunner() {
 }
 
 void PathOpsThreadedTestRunner::render() {
-    SkTaskGroup tg;
-    for (int index = 0; index < fRunnables.count(); ++ index) {
-        tg.add(fRunnables[index]);
-    }
+    sk_parallel_for(fRunnables.count(), [&](int i) {
+        fRunnables[i]->run();
+    });
 }
index 2c3c5b7..212b0f6 100644 (file)
@@ -1,6 +1,9 @@
-#if !SK_SUPPORT_GPU
-#error "GPU support required"
-#endif
+/*
+ * Copyright 2013 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
 
 #include "GrContext.h"
 #include "GrContextFactory.h"
 #include "SkTime.h"
 #include "Test.h"
 
+#if !SK_SUPPORT_GPU
+#error "GPU support required"
+#endif
+
 #ifdef SK_BUILD_FOR_WIN
     #define PATH_SLASH "\\"
     #define IN_DIR "D:\\9-30-13\\"
@@ -162,10 +169,11 @@ SkpSkGrThreadedTestRunner::~SkpSkGrThreadedTestRunner() {
 }
 
 void SkpSkGrThreadedTestRunner::render() {
-    SkTaskGroup tg;
-    for (int index = 0; index < fRunnables.count(); ++ index) {
-        tg.add(fRunnables[index]);
-    }
+    // TODO: we don't really need to be using SkRunnables here anymore.
+    // We can just write the code we'd run right in the for loop.
+    sk_parallel_for(fRunnables.count(), [&](int i) {
+        fRunnables[i]->run();
+    });
 }
 
 ////////////////////////////////////////////////
index c422636..d145460 100644 (file)
@@ -209,26 +209,6 @@ void SkDiffContext::addDiff(const char* baselinePath, const char* testPath) {
     }
 }
 
-class SkThreadedDiff : public SkRunnable {
-public:
-    SkThreadedDiff() : fDiffContext(NULL) { }
-
-    void setup(SkDiffContext* diffContext, const SkString& baselinePath, const SkString& testPath) {
-        fDiffContext = diffContext;
-        fBaselinePath = baselinePath;
-        fTestPath = testPath;
-    }
-
-    void run() override {
-        fDiffContext->addDiff(fBaselinePath.c_str(), fTestPath.c_str());
-    }
-
-private:
-    SkDiffContext* fDiffContext;
-    SkString fBaselinePath;
-    SkString fTestPath;
-};
-
 void SkDiffContext::diffDirectories(const char baselinePath[], const char testPath[]) {
     // Get the files in the baseline, we will then look for those inside the test path
     SkTArray<SkString> baselineEntries;
@@ -237,12 +217,8 @@ void SkDiffContext::diffDirectories(const char baselinePath[], const char testPa
         return;
     }
 
-    SkTaskGroup tg;
-    SkTArray<SkThreadedDiff> runnableDiffs;
-    runnableDiffs.reset(baselineEntries.count());
-
-    for (int x = 0; x < baselineEntries.count(); x++) {
-        const char* baseFilename = baselineEntries[x].c_str();
+    sk_parallel_for(baselineEntries.count(), [&](int i) {
+        const char* baseFilename = baselineEntries[i].c_str();
 
         // Find the real location of each file to compare
         SkString baselineFile = SkOSPath::Join(baselinePath, baseFilename);
@@ -250,13 +226,11 @@ void SkDiffContext::diffDirectories(const char baselinePath[], const char testPa
 
         // Check that the test file exists and is a file
         if (sk_exists(testFile.c_str()) && !sk_isdir(testFile.c_str())) {
-            // Queue up the comparison with the differ
-            runnableDiffs[x].setup(this, baselineFile, testFile);
-            tg.add(&runnableDiffs[x]);
+            this->addDiff(baselineFile.c_str(), testFile.c_str());
         } else {
             SkDebugf("Baseline file \"%s\" has no corresponding test file\n", baselineFile.c_str());
         }
-    }
+    });
 }
 
 
@@ -281,15 +255,9 @@ void SkDiffContext::diffPatterns(const char baselinePattern[], const char testPa
         return;
     }
 
-    SkTaskGroup tg;
-    SkTArray<SkThreadedDiff> runnableDiffs;
-    runnableDiffs.reset(baselineEntries.count());
-
-    for (int x = 0; x < baselineEntries.count(); x++) {
-        runnableDiffs[x].setup(this, baselineEntries[x], testEntries[x]);
-        tg.add(&runnableDiffs[x]);
-    }
-    tg.wait();
+    sk_parallel_for(baselineEntries.count(), [&](int i) {
+        this->addDiff(baselineEntries[i].c_str(), testEntries[i].c_str());
+    });
 }
 
 void SkDiffContext::outputRecords(SkWStream& stream, bool useJSONP) {