Enhance DataflowExecutor dependency resolution (#5352)
author이한종/On-Device Lab(SR)/Engineer/삼성전자 <hanjoung.lee@samsung.com>
Mon, 10 Jun 2019 09:30:30 +0000 (18:30 +0900)
committer이춘석/On-Device Lab(SR)/Staff Engineer/삼성전자 <chunseok.lee@samsung.com>
Mon, 10 Jun 2019 09:30:30 +0000 (18:30 +0900)
Enhance inefficent dependency resolution to count-based resolution for
DataflowExecutor and ParallelExecutor.

Resolve #5223

Signed-off-by: Hanjoung Lee <hanjoung.lee@samsung.com>
runtimes/neurun/core/src/exec/DataflowExecutor.cc
runtimes/neurun/core/src/exec/DataflowExecutor.h
runtimes/neurun/core/src/exec/Job.cc
runtimes/neurun/core/src/exec/Job.h
runtimes/neurun/core/src/exec/ParallelExecutor.cc
runtimes/neurun/core/src/exec/ParallelExecutor.h

index 04f98ff..f66cd68 100644 (file)
@@ -25,27 +25,26 @@ namespace neurun
 namespace exec
 {
 
-void DataflowExecutor::notify(const model::OperandIndexSequence &operands)
+void DataflowExecutor::notify(uint32_t finished_job_id)
 {
-  auto itr = _waiting_jobs.begin();
-  while (itr != _waiting_jobs.end())
+  for (auto id : _output_info[finished_job_id])
   {
-    auto &&job = *itr;
-
-    job->onNotified(operands);
-
-    if (job->ready())
+    assert(_input_info[id] > 0);
+    auto count = --_input_info[id];
+    if (count == 0) // No dependent jobs left, ready for execution
     {
-      _ready_jobs.push(std::move(job));
-      itr = _waiting_jobs.erase(itr);
-    }
-    else
-    {
-      ++itr;
+      assert(_waiting_jobs[id] != nullptr);
+      _ready_jobs.push(std::move(_waiting_jobs[id]));
     }
   }
 }
 
+bool DataflowExecutor::noWaitingJobs()
+{
+  return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
+                     [](const std::unique_ptr<Job> &job) { return job == nullptr; });
+}
+
 DataflowExecutor::DataflowExecutor(const std::shared_ptr<const model::Model> &model,
                                    std::unique_ptr<model::SubgraphContext> subg_ctx,
                                    const std::shared_ptr<compiler::OperandContext> &operand_context,
@@ -58,56 +57,75 @@ DataflowExecutor::DataflowExecutor(const std::shared_ptr<const model::Model> &mo
 
   assert(_subg_ctx);
 
-  // Create jobs
-  _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
-    VERBOSE(DataflowExecutor) << "Add a job #" << subg_index.value() << std::endl;
-    _finished_jobs.emplace_back(nnfw::cpp14::make_unique<Job>(
-        subg_index, _code_map.at(subg_index).get(),
-        _lower_info->operation.at(subg_index)->backend(), subg.getInputs(), subg.getOutputs()));
+  // Assign jobs convert SubgraphIndex to job index(uint32_t)
+  uint32_t next_job_index = 0;
+  std::unordered_map<model::SubgraphIndex, uint32_t> subgraph_to_job;
+  _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &) {
+    VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with SubgraphIndex "
+                              << subg_index.value() << std::endl;
+    _finished_jobs.emplace_back(
+        nnfw::cpp14::make_unique<Job>(next_job_index, _code_map.at(subg_index).get(),
+                                      _lower_info->operation.at(subg_index)->backend()));
+    subgraph_to_job[subg_index] = next_job_index++;
   });
 
-  // Save operands that are initially ready
-  {
-    _model->operands.iterate([&](const model::OperandIndex &index, const model::Operand &object) {
-      if (object.getDef().size() == 0)
-      {
-        _initially_ready_operands.append(index);
-      }
-    });
-
-    for (auto index : _model->inputs)
+  _waiting_jobs.resize(next_job_index);
+  _output_info.resize(next_job_index);
+  _initial_input_info.resize(next_job_index, 0);
+
+  _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
+    auto job_index = subgraph_to_job[subg_index];
+    for (auto output : subg.getOutputs())
     {
-      _initially_ready_operands.append(index);
+      // Update output and input info
+      _subg_ctx->iterate(
+          [&](const model::SubgraphIndex &subg_cur_index, const model::Subgraph &subg_cur) {
+            if (subg_cur.getInputs().contains(output))
+            {
+              auto dep_index = subgraph_to_job[subg_cur_index];
+              ++_initial_input_info[dep_index];
+              _output_info[job_index].push_back(dep_index);
+            }
+          });
     }
-  }
+  });
+
+  _input_info = _initial_input_info;
 }
 
 void DataflowExecutor::executeImpl()
 {
-  assert(_waiting_jobs.empty());
+  assert(noWaitingJobs());
 
   // Execution setup
   _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
-  notify(_initially_ready_operands);
 
+  for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
+  {
+    if (_input_info[i] == 0)
+    {
+      _ready_jobs.push(std::move(_waiting_jobs[i]));
+    }
+  }
   assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
 
   while (!_ready_jobs.empty())
   {
-    VERBOSE(DataflowExecutor) << "waiting : " << _waiting_jobs.size() << " / "
-                              << "ready : " << _ready_jobs.size() << std::endl;
     auto job = std::move(_ready_jobs.front());
     _ready_jobs.pop();
 
-    VERBOSE(DataflowExecutor) << "Run job #" << job->index().value() << std::endl;
+    VERBOSE(DataflowExecutor) << "Run job #" << job->index() << std::endl;
 
     job->run();
-    notify(job->outputs());
-
-    _finished_jobs.push_back(std::move(job));
+    auto job_index = job->index();
+    notify(job_index);
+    _finished_jobs[job_index] = std::move(job);
   }
 
-  assert(_waiting_jobs.empty());
+  assert(noWaitingJobs());
+
+  // Reset input info for the next execution
+  _input_info = _initial_input_info;
 }
 
 } // namespace exec
index 769feaf..6c7b3b1 100644 (file)
@@ -40,7 +40,8 @@ public:
   using CodeMap = std::unordered_map<model::SubgraphIndex, std::unique_ptr<FunctionSequence>>;
 
 protected:
-  virtual void notify(const model::OperandIndexSequence &operands);
+  virtual void notify(uint32_t finished_job_id);
+  bool noWaitingJobs();
 
 public:
   /**
@@ -60,17 +61,23 @@ public:
 
 protected:
   CodeMap _code_map;
-  model::OperandIndexSequence _initially_ready_operands;
   /**
-   * @brief A list of finished jobs for current execution
+   * @brief A vector of finished jobs for current execution
    *        After a run it has all the jobs of this execution for the next run
    */
-  std::list<std::unique_ptr<Job>> _finished_jobs;
+  std::vector<std::unique_ptr<Job>> _finished_jobs;
   /**
-   * @brief A list of waiting jobs for current execution
+   * @brief A vector of waiting jobs for current execution
    *        All the jobs are moved from #_finished_jobs to it when start a run
    */
-  std::list<std::unique_ptr<Job>> _waiting_jobs;
+  std::vector<std::unique_ptr<Job>> _waiting_jobs;
+  /**
+   * @brief Jobs' output info
+   *        Used for notifying after finishing a job
+   */
+  std::vector<std::list<uint32_t>> _output_info;
+  std::vector<uint32_t> _initial_input_info;
+  std::vector<uint32_t> _input_info;
   /**
    * @brief A list of jobs that are ready for current execution
    *        Jobs in it are ready to be scheduled
index 2690c86..6ce3a84 100644 (file)
@@ -25,49 +25,12 @@ namespace neurun
 namespace exec
 {
 
-Job::Job(const model::SubgraphIndex &index, IFunction *fn, const backend::Backend *backend,
-         const model::OperandIndexSequence &inputs, const model::OperandIndexSequence &outputs)
-    : _index{index}, _fn{fn}, _backend{backend}, _inputs(inputs), _outputs{outputs}
+Job::Job(uint32_t index, IFunction *fn, const backend::Backend *backend)
+    : _index{index}, _fn{fn}, _backend{backend}
 {
-  reset();
-}
-
-void Job::onNotified(const model::OperandIndex &operand)
-{
-  std::string s;
-  for (auto e : _blocking_inputs)
-  {
-    s += std::to_string(e.value());
-    s += " ";
-  }
-  VERBOSE(Job) << "Job #" << _index.value() << " notified operand : " << operand.value()
-               << " / blocking inputs { " << s << "}" << std::endl;
-
-  auto itr = _blocking_inputs.find(operand);
-  if (itr != _blocking_inputs.end())
-  {
-    _blocking_inputs.erase(itr);
-  }
-}
-
-void Job::onNotified(const model::OperandIndexSequence &operands)
-{
-  for (auto operand : operands)
-  {
-    onNotified(operand);
-  }
 }
 
 void Job::run() { _fn->run(); }
 
-void Job::reset()
-{
-  assert(_blocking_inputs.empty());
-  for (auto input : _inputs)
-  {
-    _blocking_inputs.insert(input);
-  }
-}
-
 } // namespace exec
 } // namespace neurun
index 7beda3e..108f39e 100644 (file)
@@ -40,42 +40,17 @@ public:
    * @param inputs Input operand list
    * @param outputs Output operand list
    */
-  Job(const model::SubgraphIndex &index, IFunction *fn, const backend::Backend *backend,
-      const model::OperandIndexSequence &inputs, const model::OperandIndexSequence &outputs);
-  /**
-   * @brief Remove the given operand from blocking input list
-   *
-   * @param operand Operand to be notified
-   */
-  void onNotified(const model::OperandIndex &operand);
-  /**
-   * @brief Remove the given operands from blocking input list
-   *
-   * @param operands Operands to be notified
-   */
-  void onNotified(const model::OperandIndexSequence &operands);
+  Job(uint32_t index, IFunction *fn, const backend::Backend *backend);
   /**
    * @brief Execute the compiled code
    */
   void run();
   /**
-   * @brief Check if this job is ready for execution
+   * @brief Return job index
    *
-   * @return @true if there is no blockin inputs
+   * @return Job index
    */
-  bool ready() const { return _blocking_inputs.empty(); }
-  /**
-   * @brief Return subgraph index
-   *
-   * @return Subgraph index
-   */
-  model::SubgraphIndex index() const { return _index; }
-  /**
-   * @brief Return output indices
-   *
-   * @return Output indices
-   */
-  const model::OperandIndexSequence &outputs() const { return _outputs; }
+  uint32_t index() const { return _index; }
   /**
    * @brief Return the function to be executed
    *
@@ -90,18 +65,10 @@ public:
    */
   const backend::Backend *backend() { return _backend; }
 
-  /**
-   * @brief Reset blocking inputs for the next execution
-   */
-  void reset();
-
 private:
-  model::SubgraphIndex _index;
+  uint32_t _index;
   IFunction *_fn;
   const backend::Backend *_backend;
-  std::unordered_set<model::OperandIndex> _blocking_inputs;
-  const model::OperandIndexSequence _inputs;
-  const model::OperandIndexSequence _outputs;
 };
 
 } // namespace exec
index e2c6998..1df40c5 100644 (file)
@@ -44,11 +44,11 @@ private:
   std::function<void()> _teardown;
 };
 
-void ParallelExecutor::notify(const model::OperandIndexSequence &operands)
+void ParallelExecutor::notify(uint32_t finished_job_id)
 {
   std::unique_lock<std::mutex> lock{_mu_jobs};
 
-  DataflowExecutor::notify(operands);
+  DataflowExecutor::notify(finished_job_id);
 
   lock.unlock();
   _cv_jobs.notify_all();
@@ -67,8 +67,6 @@ ParallelExecutor::ParallelExecutor(const std::shared_ptr<const model::Model> &mo
 
 void ParallelExecutor::executeImpl()
 {
-  assert(_waiting_jobs.empty());
-
   // Init scheduler
   // TODO Consider to have distinct backend set in LowerInfoMap
   graph::BackendSet backends;
@@ -78,13 +76,27 @@ void ParallelExecutor::executeImpl()
   }
   _scheduler = nnfw::cpp14::make_unique<ParallelScheduler>(backends);
 
+  assert(noWaitingJobs());
+
   // Execution setup
   _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
-  notify(_initially_ready_operands);
 
+  for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
+  {
+    VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl;
+    if (_input_info[i] == 0)
+    {
+      _ready_jobs.push(std::move(_waiting_jobs[i]));
+    }
+  }
   assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
 
-  while (!_ready_jobs.empty() || !_waiting_jobs.empty())
+  VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl;
+
+  while (!_ready_jobs.empty() ||
+         // TODO Enhance this not to iterate over the array
+         std::any_of(_waiting_jobs.begin(), _waiting_jobs.end(),
+                     [](const std::unique_ptr<Job> &job) { return job != nullptr; }))
   {
     std::unique_lock<std::mutex> lock{_mu_jobs};
     if (_ready_jobs.empty())
@@ -92,31 +104,28 @@ void ParallelExecutor::executeImpl()
       _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty(); });
     }
 
-    VERBOSE(ParallelExecutor) << "waiting : " << _waiting_jobs.size() << " / "
-                              << "ready : " << _ready_jobs.size() << std::endl;
     auto job = std::move(_ready_jobs.front());
     _ready_jobs.pop();
 
     lock.unlock();
 
-    VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index().value() << std::endl;
+    VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index() << std::endl;
 
-    auto job_ptr = job.get();
-    auto teardown = [&, job_ptr]() { notify(job_ptr->outputs()); };
+    auto job_index = job->index();
+    auto teardown = [&, job_index]() { notify(job_index); };
 
     _scheduler->assign(nnfw::cpp14::make_unique<HookFunction>(job->fn(), teardown), job->backend());
 
-    _finished_jobs.push_back(std::move(job));
+    _finished_jobs[job_index] = std::move(job);
   }
 
-  assert(_waiting_jobs.empty());
-
-  for (const auto &job : _finished_jobs)
-  {
-    job->reset();
-  }
+  assert(noWaitingJobs());
 
+  // Wait for all the jobs done
   _scheduler->finish();
+
+  // Reset input info for the next execution
+  _input_info = _initial_input_info;
 }
 
 } // namespace exec
index 46d844d..d23f3ab 100644 (file)
@@ -40,11 +40,8 @@ namespace exec
  */
 class ParallelExecutor : public DataflowExecutor
 {
-public:
-  using CodeMap = std::unordered_map<model::SubgraphIndex, std::unique_ptr<FunctionSequence>>;
-
 protected:
-  void notify(const model::OperandIndexSequence &operands) override;
+  void notify(uint32_t finished_job_id) override;
 
 public:
   /**