Introduce ParallelExecutor (#5171)
author이한종/On-Device Lab(SR)/Engineer/삼성전자 <hanjoung.lee@samsung.com>
Wed, 15 May 2019 09:43:37 +0000 (18:43 +0900)
committer박세희/On-Device Lab(SR)/Principal Engineer/삼성전자 <saehie.park@samsung.com>
Wed, 15 May 2019 09:43:37 +0000 (18:43 +0900)
This commit introduces ParallelExecutor without usage.

Draft PR : #4920

Signed-off-by: Hanjoung Lee <hanjoung.lee@samsung.com>
runtimes/neurun/core/src/exec/ParallelExecutor.cc [new file with mode: 0644]
runtimes/neurun/core/src/exec/ParallelExecutor.h [new file with mode: 0644]

diff --git a/runtimes/neurun/core/src/exec/ParallelExecutor.cc b/runtimes/neurun/core/src/exec/ParallelExecutor.cc
new file mode 100644 (file)
index 0000000..7eac44f
--- /dev/null
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ParallelExecutor.h"
+
+#include <cassert>
+
+#include "util/logging.h"
+#include "exec/IFunction.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+class HookFunction : public IFunction
+{
+public:
+  HookFunction(IFunction *fn, std::function<void()> teardown) : _fn{fn}, _teardown{teardown} {}
+
+public:
+  void run() override
+  {
+    // TODO Introduce and call setup() function here
+    _fn->run();
+    _teardown();
+  }
+
+private:
+  IFunction *_fn;
+  std::function<void()> _teardown;
+};
+
+void ParallelExecutor::notify(const model::OperandIndexSequence &operands)
+{
+  std::unique_lock<std::mutex> lock{_mu_jobs};
+
+  auto itr = _waiting_jobs.begin();
+  while (itr != _waiting_jobs.end())
+  {
+    auto &&job = *itr;
+
+    job->onNotified(operands);
+
+    if (job->ready())
+    {
+      _ready_jobs.push(std::move(job));
+      itr = _waiting_jobs.erase(itr);
+    }
+    else
+    {
+      ++itr;
+    }
+  }
+  lock.unlock();
+  _cv_jobs.notify_all();
+}
+
+ParallelExecutor::ParallelExecutor(const std::shared_ptr<const model::Model> &model,
+                                   std::unique_ptr<model::SubgraphContext> subg_ctx,
+                                   const std::shared_ptr<compiler::OperandContext> &operand_context,
+                                   std::unique_ptr<graph::LowerInfoMap> lower_info,
+                                   CodeMap &&code_map)
+    : ExecutorBase{model, std::move(subg_ctx), operand_context, std::move(lower_info)},
+      _code_map{std::move(code_map)}
+{
+  VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
+
+  // Create jobs
+  _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
+    VERBOSE(DataflowExecutor) << "Add a job #" << subg_index.value() << std::endl;
+    _finished_jobs.emplace_back(nnfw::cpp14::make_unique<Job>(
+        subg_index, _code_map.at(subg_index).get(),
+        _lower_info->operation.at(subg_index)->backend(), subg.getInputs(), subg.getOutputs()));
+  });
+
+  // Save operands that are initially ready
+  {
+    _model->operands.iterate([&](const model::OperandIndex &index, const model::Operand &object) {
+      if (object.getDef().size() == 0)
+      {
+        _initially_ready_operands.append(index);
+      }
+    });
+
+    for (auto index : _model->inputs)
+    {
+      _initially_ready_operands.append(index);
+    }
+  }
+}
+
+void ParallelExecutor::executeImpl()
+{
+  assert(_waiting_jobs.empty());
+
+  // Init scheduler
+  // TODO Consider to have distinct backend set in LowerInfoMap
+  graph::BackendSet backends;
+  for (auto &itr : _lower_info->operation)
+  {
+    backends.add(itr.second->backend());
+  }
+  _scheduler = nnfw::cpp14::make_unique<ParallelScheduler>(backends);
+
+  // Execution setup
+  _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
+  notify(_initially_ready_operands);
+
+  assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
+
+  while (!_ready_jobs.empty() || !_waiting_jobs.empty())
+  {
+    std::unique_lock<std::mutex> lock{_mu_jobs};
+    if (_ready_jobs.empty())
+    {
+      _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty(); });
+    }
+
+    VERBOSE(ParallelExecutor) << "waiting : " << _waiting_jobs.size() << " / "
+                              << "ready : " << _ready_jobs.size() << std::endl;
+    auto job = std::move(_ready_jobs.front());
+    _ready_jobs.pop();
+
+    lock.unlock();
+
+    VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index().value() << std::endl;
+
+    auto job_ptr = job.get();
+    auto teardown = [&, job_ptr]() { notify(job_ptr->outputs()); };
+
+    _scheduler->assign(nnfw::cpp14::make_unique<HookFunction>(job->fn(), teardown), job->backend());
+
+    _finished_jobs.push_back(std::move(job));
+  }
+
+  assert(_waiting_jobs.empty());
+
+  for (const auto &job : _finished_jobs)
+  {
+    job->reset();
+  }
+
+  _scheduler->finish();
+}
+
+} // namespace exec
+} // namespace neurun
diff --git a/runtimes/neurun/core/src/exec/ParallelExecutor.h b/runtimes/neurun/core/src/exec/ParallelExecutor.h
new file mode 100644 (file)
index 0000000..fa5c4b5
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __NEURUN_EXEC_PARALLEL_EXECUTOR_H__
+#define __NEURUN_EXEC_PARALLEL_EXECUTOR_H__
+
+#include <list>
+#include <queue>
+#include <unordered_map>
+
+#include "FunctionSequence.h"
+#include "Job.h"
+#include "model/OperandIndexSequence.h"
+#include "model/Index.h"
+#include "model/Model.h"
+#include "cpp14/memory.h"
+#include "exec/ExecutorBase.h"
+#include "ParallelScheduler.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+/**
+ * @brief Class to execute Graph in parallel
+ */
+class ParallelExecutor : public ExecutorBase
+{
+public:
+  using CodeMap = std::unordered_map<model::SubgraphIndex, std::unique_ptr<FunctionSequence>>;
+
+private:
+  void notify(const model::OperandIndexSequence &operands);
+
+public:
+  /**
+   * @brief Constructs a ParallelExecutor object
+   *
+   * @param model Model object
+   * @param operand_context (Only for input/output operand data access)
+   * @param lower_info LowerInfo object (Only to know input/output operands layout)
+   * @param code_map Compiled code map
+   */
+  ParallelExecutor(const std::shared_ptr<const model::Model> &model,
+                   std::unique_ptr<model::SubgraphContext> subg_ctx,
+                   const std::shared_ptr<compiler::OperandContext> &operand_context,
+                   std::unique_ptr<graph::LowerInfoMap> lower_info, CodeMap &&code_map);
+
+  void executeImpl() override;
+
+private:
+  CodeMap _code_map;
+  model::OperandIndexSequence _initially_ready_operands;
+  /**
+   * @brief A list of finished jobs for current execution
+   *        After a run it has all the jobs of this execution for the next run
+   */
+  std::list<std::unique_ptr<Job>> _finished_jobs;
+  /**
+   * @brief A list of waiting jobs for current execution
+   *        All the jobs are moved from #_finished_jobs to it when start a run
+   */
+  std::list<std::unique_ptr<Job>> _waiting_jobs;
+  /**
+   * @brief A list of jobs that are ready for current execution
+   *        Jobs in it are ready to be scheduled
+   */
+  std::queue<std::unique_ptr<Job>> _ready_jobs;
+  std::condition_variable _cv_jobs;
+  std::mutex _mu_jobs;
+  std::unique_ptr<ParallelScheduler> _scheduler;
+};
+
+} // namespace exec
+} // namespace neurun
+
+#endif // __NEURUN_EXEC_PARALLEL_EXECUTOR_H__