Introduce ParallelScheduler (#5155)
author Hanjoung Lee/On-Device Lab(SR)/Engineer/Samsung Electronics <hanjoung.lee@samsung.com>
Thu, 9 May 2019 05:09:37 +0000 (14:09 +0900)
committer Chunseok Lee/On-Device Lab(SR)/Staff Engineer/Samsung Electronics <chunseok.lee@samsung.com>
Thu, 9 May 2019 05:09:37 +0000 (14:09 +0900)
Introduce `ParallelScheduler` along with `Worker` and `ThreadPool`.

Part of #4920

Signed-off-by: Hanjoung Lee <hanjoung.lee@samsung.com>
runtimes/neurun/core/src/exec/ParallelScheduler.cc [new file with mode: 0644]
runtimes/neurun/core/src/exec/ParallelScheduler.h [new file with mode: 0644]

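For context, here is a minimal sketch of how the new scheduler is meant to be
driven, based only on the API introduced in this commit. The include path, the
function name `run_plan` and its parameters are illustrative; in the runtime
the backend set, the target backend and the jobs come from the compiled
execution plan.

    #include "exec/ParallelScheduler.h"

    #include <memory>
    #include <utility>

    // Illustrative driver (not part of this commit).
    void run_plan(const neurun::graph::BackendSet &backends,
                  const neurun::backend::Backend *target_backend,
                  std::unique_ptr<neurun::exec::IFunction> job)
    {
      // One ThreadPool (single-threaded by default) is created per backend.
      neurun::exec::ParallelScheduler scheduler{backends};

      // Each job is enqueued on the pool that owns its backend.
      scheduler.assign(std::move(job), target_backend);

      // Block until every pool has drained its queue.
      scheduler.finish();
    }
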
diff --git a/runtimes/neurun/core/src/exec/ParallelScheduler.cc b/runtimes/neurun/core/src/exec/ParallelScheduler.cc
new file mode 100644 (file)
index 0000000..9424a10
--- /dev/null
@@ -0,0 +1,154 @@
+#include "ParallelScheduler.h"
+
+#include <cassert>
+
+#include "cpp14/memory.h"
+#include "util/logging.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+Worker::~Worker()
+{
+  {
+    std::unique_lock<std::mutex> lock(_mu);
+    _state = State::FORCE_FINISHING;
+  }
+  _cv.notify_all();
+}
+
+void Worker::operator()()
+{
+  while (true)
+  {
+    std::unique_ptr<IFunction> fn = nullptr;
+
+    {
+      std::unique_lock<std::mutex> lock{_mu};
+      _cv.wait(lock, [this] {
+        return (_state == State::FORCE_FINISHING) || (_state == State::FINISHING) ||
+               (_state == State::ONLINE && !_functions.empty());
+      });
+
+      if (_state == State::FORCE_FINISHING)
+      {
+        assert(_functions.empty() && "Terminating with unfinished jobs");
+        return;
+      }
+      else if (_state == State::FINISHING && _functions.empty())
+      {
+        return;
+      }
+      else
+      {
+        assert(((_state == State::FINISHING) || (_state == State::ONLINE)) && !_functions.empty());
+        fn = std::move(_functions.front());
+        _functions.pop();
+      }
+    }
+
+    assert(fn);
+    fn->run();
+  }
+}
+
+void Worker::enqueue(std::unique_ptr<IFunction> &&fn)
+{
+  {
+    std::unique_lock<std::mutex> lock{_mu};
+    _functions.emplace(std::move(fn));
+  }
+  _cv.notify_one();
+}
+
+void Worker::terminate()
+{
+  {
+    std::unique_lock<std::mutex> lock{_mu};
+    _state = State::FORCE_FINISHING;
+  }
+  _cv.notify_all();
+}
+
+void Worker::finish()
+{
+  {
+    std::unique_lock<std::mutex> lock{_mu};
+    _state = State::FINISHING;
+  }
+  _cv.notify_all();
+}
+
+uint32_t Worker::numJobsInQueue()
+{
+  std::unique_lock<std::mutex> lock{_mu};
+  return _functions.size();
+}
+
+ThreadPool::ThreadPool(uint32_t num_threads)
+{
+  assert(num_threads >= 1);
+
+  for (uint32_t i = 0; i < num_threads; i++)
+  {
+    _threads.emplace_back(std::ref(_worker));
+  }
+}
+
+ThreadPool::~ThreadPool()
+{
+  if (!_threads.empty())
+  {
+    _worker.terminate();
+    join();
+  }
+}
+
+void ThreadPool::enqueue(std::unique_ptr<IFunction> &&fn) { _worker.enqueue(std::move(fn)); }
+
+uint32_t ThreadPool::numJobsInQueue() { return _worker.numJobsInQueue(); }
+
+void ThreadPool::join()
+{
+  for (auto &thread : _threads)
+  {
+    thread.join();
+  }
+  _threads.clear();
+}
+
+void ThreadPool::finish()
+{
+  _worker.finish();
+  join();
+}
+
+ParallelScheduler::ParallelScheduler(const graph::BackendSet &backends)
+{
+  assert(!backends.empty());
+
+  for (auto backend : backends)
+  {
+    _thread_pools[backend] = nnfw::cpp14::make_unique<ThreadPool>();
+  }
+}
+
+void ParallelScheduler::assign(std::unique_ptr<IFunction> &&fn, const backend::Backend *backend)
+{
+  assert(!_thread_pools.empty());
+
+  _thread_pools.at(backend)->enqueue(std::move(fn));
+}
+
+void ParallelScheduler::finish()
+{
+  for (auto &itr : _thread_pools)
+  {
+    itr.second->finish();
+  }
+}
+
+} // namespace exec
+} // namespace neurun
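
Note the ThreadPool constructor above: every std::thread is created with
std::ref(_worker), so all threads of a pool run operator() on the same Worker
and pop from a single job queue guarded by one mutex and condition variable.
Below is a standalone sketch of that pattern using only the standard library;
the names are illustrative and the FORCE_FINISHING/terminate path is omitted
for brevity.

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    // Single shared worker: every thread executes operator() on the same object,
    // so one queue feeds all threads, as in Worker/ThreadPool above.
    class SharedWorker
    {
    public:
      void operator()()
      {
        while (true)
        {
          std::function<void()> job;
          {
            std::unique_lock<std::mutex> lock{_mu};
            _cv.wait(lock, [this] { return _finishing || !_jobs.empty(); });
            if (_jobs.empty())
              return; // finishing and fully drained
            job = std::move(_jobs.front());
            _jobs.pop();
          }
          job();
        }
      }

      void enqueue(std::function<void()> job)
      {
        {
          std::unique_lock<std::mutex> lock{_mu};
          _jobs.push(std::move(job));
        }
        _cv.notify_one();
      }

      void finish()
      {
        {
          std::unique_lock<std::mutex> lock{_mu};
          _finishing = true;
        }
        _cv.notify_all();
      }

    private:
      bool _finishing = false;
      std::queue<std::function<void()>> _jobs;
      std::mutex _mu;
      std::condition_variable _cv;
    };

    int main()
    {
      SharedWorker worker;
      std::vector<std::thread> threads;
      for (int i = 0; i < 2; ++i)
        threads.emplace_back(std::ref(worker)); // both threads share `worker`

      for (int i = 0; i < 4; ++i)
        worker.enqueue([i] { std::cout << "job " << i << "\n"; });

      worker.finish(); // let the threads drain the queue, then return
      for (auto &t : threads)
        t.join();
    }

As in ThreadPool::finish(), finish() here lets the threads process the
remaining jobs before they return, after which they can be joined.
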
diff --git a/runtimes/neurun/core/src/exec/ParallelScheduler.h b/runtimes/neurun/core/src/exec/ParallelScheduler.h
new file mode 100644 (file)
index 0000000..f4e7865
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __NEURUN_EXEC_PARALLEL_SCHEDULER_H__
+#define __NEURUN_EXEC_PARALLEL_SCHEDULER_H__
+
+#include <unordered_map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <memory>
+#include <queue>
+#include <vector>
+#include <unordered_set>
+
+#include "exec/IFunction.h"
+#include "graph/BackendSet.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+// TODO Extract this class to a separate file
+class Worker
+{
+public:
+  enum class State
+  {
+    ONLINE,
+    FINISHING,
+    FORCE_FINISHING
+  };
+
+public:
+  /**
+   * @brief Create Worker object
+   */
+  Worker() = default;
+  /**
+   * @brief Destroy Worker object
+   */
+  ~Worker();
+  /**
+   * @brief Thread entry function
+   */
+  void operator()();
+  /**
+   * @brief Push the given function to the job queue
+   *
+   * @param fn Function to be executed (a job)
+   */
+  void enqueue(std::unique_ptr<IFunction> &&fn);
+  /**
+   * @brief Flag as terminating so all the worker threads can terminate
+   */
+  void terminate();
+  /**
+   * @brief Flag as finishing so the worker threads can terminate after all queued jobs are done
+   */
+  void finish();
+  /**
+   * @brief Get the number of pending jobs in the queue. Even if this returns 0, Worker threads
+   * may still be running
+   *
+   * @return Number of jobs in the queue
+   */
+  uint32_t numJobsInQueue();
+
+private:
+  State _state{State::ONLINE};
+  std::queue<std::unique_ptr<IFunction>> _functions;
+  std::mutex _mu;
+  std::condition_variable _cv;
+};
+
+// TODO Extract this class to a separate file
+class ThreadPool
+{
+public:
+  /**
+   * @brief Construct ThreadPool object
+   *
+   * @param num_threads Number of threads
+   */
+  ThreadPool(uint32_t num_threads = 1);
+  /**
+   * @brief Destroy ThreadPool object
+   */
+  ~ThreadPool();
+  /**
+   * @brief Enqueue a function
+   *
+   * @param fn A function to be queued
+   */
+  void enqueue(std::unique_ptr<IFunction> &&fn);
+  /**
+   * @brief Get number of jobs in worker's queue
+   *
+   * @return Number of jobs
+   */
+  uint32_t numJobsInQueue();
+
+  /**
+   * @brief Block until all jobs are finished
+   */
+  void finish();
+
+private:
+  void join();
+
+private:
+  Worker _worker;
+  std::vector<std::thread> _threads;
+};
+
+class ParallelScheduler
+{
+public:
+  /**
+   * @brief Construct ParallelScheduler object
+   *
+   * @param backends Backend set
+   */
+  ParallelScheduler(const graph::BackendSet &backends);
+  /**
+   * @brief Assign a task to the given backend
+   *
+   * @param[in] fn Function to be assigned
+   * @param[in] backend Target backend
+   */
+  void assign(std::unique_ptr<IFunction> &&fn, const backend::Backend *backend);
+  /**
+   * @brief Block until all jobs are finished
+   */
+  void finish();
+
+private:
+  std::unordered_map<const backend::Backend *, std::unique_ptr<ThreadPool>> _thread_pools;
+};
+
+} // namespace exec
+} // namespace neurun
+
+#endif // __NEURUN_EXEC_PARALLEL_SCHEDULER_H__
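
The ThreadPool declared above can also be used on its own. A minimal sketch,
assuming the job is produced elsewhere; the helper name `run_on_pool`, the
include path and the two-thread count are illustrative only.

    #include "exec/ParallelScheduler.h"

    #include <memory>
    #include <utility>

    // Hypothetical helper: run a single job on a two-thread pool.
    void run_on_pool(std::unique_ptr<neurun::exec::IFunction> job)
    {
      neurun::exec::ThreadPool pool(2); // both threads share one Worker and one queue
      pool.enqueue(std::move(job));
      // numJobsInQueue() only counts queued jobs; it can be 0 while a job is still running.
      pool.finish(); // drain the queue, then join both threads
    }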