--- /dev/null
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ParallelExecutor.h"
+
+#include <cassert>
+#include <functional>
+#include <utility>
+
+#include "util/logging.h"
+#include "exec/IFunction.h"
+
+namespace neurun
+{
+namespace exec
+{
+
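+/**
+ * @brief Function wrapper that runs the wrapped function and then a teardown callback
+ *
+ * ParallelExecutor uses the teardown callback to notify waiting jobs that the outputs
+ * of the finished job are now available.
+ */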
+class HookFunction : public IFunction
+{
+public:
+  HookFunction(IFunction *fn, std::function<void()> teardown) : _fn{fn}, _teardown{std::move(teardown)} {}
+
+public:
+ void run() override
+ {
+ // TODO Introduce and call setup() function here
+ _fn->run();
+ _teardown();
+ }
+
+private:
+ IFunction *_fn;
+ std::function<void()> _teardown;
+};
+
+void ParallelExecutor::notify(const model::OperandIndexSequence &operands)
+{
+ std::unique_lock<std::mutex> lock{_mu_jobs};
+
+ auto itr = _waiting_jobs.begin();
+ while (itr != _waiting_jobs.end())
+ {
+ auto &&job = *itr;
+
+ job->onNotified(operands);
+
+ if (job->ready())
+ {
+ _ready_jobs.push(std::move(job));
+ itr = _waiting_jobs.erase(itr);
+ }
+ else
+ {
+ ++itr;
+ }
+ }
+ lock.unlock();
+ _cv_jobs.notify_all();
+}
+
+ParallelExecutor::ParallelExecutor(const std::shared_ptr<const model::Model> &model,
+ std::unique_ptr<model::SubgraphContext> subg_ctx,
+ const std::shared_ptr<compiler::OperandContext> &operand_context,
+ std::unique_ptr<graph::LowerInfoMap> lower_info,
+ CodeMap &&code_map)
+ : ExecutorBase{model, std::move(subg_ctx), operand_context, std::move(lower_info)},
+ _code_map{std::move(code_map)}
+{
+ VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
+
+ // Create jobs
+ _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
+    VERBOSE(ParallelExecutor) << "Add a job #" << subg_index.value() << std::endl;
+ _finished_jobs.emplace_back(nnfw::cpp14::make_unique<Job>(
+ subg_index, _code_map.at(subg_index).get(),
+ _lower_info->operation.at(subg_index)->backend(), subg.getInputs(), subg.getOutputs()));
+ });
+
+ // Save operands that are initially ready
+ {
+ _model->operands.iterate([&](const model::OperandIndex &index, const model::Operand &object) {
+ if (object.getDef().size() == 0)
+ {
+ _initially_ready_operands.append(index);
+ }
+ });
+
+ for (auto index : _model->inputs)
+ {
+ _initially_ready_operands.append(index);
+ }
+ }
+}
+
+void ParallelExecutor::executeImpl()
+{
+ assert(_waiting_jobs.empty());
+
+ // Init scheduler
+  // TODO Consider having a distinct backend set in LowerInfoMap
+ graph::BackendSet backends;
+ for (auto &itr : _lower_info->operation)
+ {
+ backends.add(itr.second->backend());
+ }
+ _scheduler = nnfw::cpp14::make_unique<ParallelScheduler>(backends);
+
+ // Execution setup
+ _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
+ notify(_initially_ready_operands);
+
+  assert(!_ready_jobs.empty()); // Cannot begin if there are no initial jobs
+
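+  // Main scheduling loop: wait until at least one job is ready, dispatch it to the
+  // parallel scheduler, and repeat until every job has been dispatched.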
+ while (!_ready_jobs.empty() || !_waiting_jobs.empty())
+ {
+ std::unique_lock<std::mutex> lock{_mu_jobs};
+ if (_ready_jobs.empty())
+ {
+ _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty(); });
+ }
+
+ VERBOSE(ParallelExecutor) << "waiting : " << _waiting_jobs.size() << " / "
+ << "ready : " << _ready_jobs.size() << std::endl;
+ auto job = std::move(_ready_jobs.front());
+ _ready_jobs.pop();
+
+ lock.unlock();
+
+ VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index().value() << std::endl;
+
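+    // When this job's function finishes, notify that its outputs are available so that
+    // jobs depending on them may become ready.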
+ auto job_ptr = job.get();
+ auto teardown = [&, job_ptr]() { notify(job_ptr->outputs()); };
+
+ _scheduler->assign(nnfw::cpp14::make_unique<HookFunction>(job->fn(), teardown), job->backend());
+
+ _finished_jobs.push_back(std::move(job));
+ }
+
+ assert(_waiting_jobs.empty());
+
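+  // Reset all jobs so they can be reused in the next run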
+ for (const auto &job : _finished_jobs)
+ {
+ job->reset();
+ }
+
+ _scheduler->finish();
+}
+
+} // namespace exec
+} // namespace neurun
--- /dev/null
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __NEURUN_EXEC_PARALLEL_EXECUTOR_H__
+#define __NEURUN_EXEC_PARALLEL_EXECUTOR_H__
+
+#include <condition_variable>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <unordered_map>
+
+#include "FunctionSequence.h"
+#include "Job.h"
+#include "model/OperandIndexSequence.h"
+#include "model/Index.h"
+#include "model/Model.h"
+#include "cpp14/memory.h"
+#include "exec/ExecutorBase.h"
+#include "ParallelScheduler.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+/**
+ * @brief Class to execute Graph in parallel
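+ *
+ * A minimal construction sketch (hypothetical caller code; the public entry point that
+ * eventually calls executeImpl() is assumed to be provided by ExecutorBase):
+ * @code
+ *   auto executor = nnfw::cpp14::make_unique<ParallelExecutor>(
+ *       model, std::move(subg_ctx), operand_context, std::move(lower_info), std::move(code_map));
+ * @endcode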
+ */
+class ParallelExecutor : public ExecutorBase
+{
+public:
+ using CodeMap = std::unordered_map<model::SubgraphIndex, std::unique_ptr<FunctionSequence>>;
+
+private:
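+  /**
+   * @brief Notifies waiting jobs that the given operands have become available
+   *
+   * Every waiting job whose inputs are now all satisfied is moved to the ready queue,
+   * and the executor thread is woken up.
+   */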
+ void notify(const model::OperandIndexSequence &operands);
+
+public:
+  /**
+   * @brief Constructs a ParallelExecutor object
+   *
+   * @param model Model object
+   * @param subg_ctx Subgraph context holding the subgraphs that are turned into jobs
+   * @param operand_context Operand context (only used for input/output operand data access)
+   * @param lower_info LowerInfo object (only used to know input/output operand layouts)
+   * @param code_map Compiled code map
+   */
+ ParallelExecutor(const std::shared_ptr<const model::Model> &model,
+ std::unique_ptr<model::SubgraphContext> subg_ctx,
+ const std::shared_ptr<compiler::OperandContext> &operand_context,
+ std::unique_ptr<graph::LowerInfoMap> lower_info, CodeMap &&code_map);
+
+ void executeImpl() override;
+
+private:
+ CodeMap _code_map;
+ model::OperandIndexSequence _initially_ready_operands;
+  /**
+   * @brief A list of finished jobs for the current execution
+   *        After a run, it holds every job of this execution so they can be reused for the next run
+   */
+ std::list<std::unique_ptr<Job>> _finished_jobs;
+  /**
+   * @brief A list of waiting jobs for the current execution
+   *        All the jobs are moved from #_finished_jobs to this list when a run starts
+   */
+ std::list<std::unique_ptr<Job>> _waiting_jobs;
+  /**
+   * @brief A queue of jobs that are ready for the current execution
+   *        Jobs in it are ready to be scheduled
+   */
+ std::queue<std::unique_ptr<Job>> _ready_jobs;
+ std::condition_variable _cv_jobs;
+ std::mutex _mu_jobs;
+ std::unique_ptr<ParallelScheduler> _scheduler;
+};
+
+} // namespace exec
+} // namespace neurun
+
+#endif // __NEURUN_EXEC_PARALLEL_EXECUTOR_H__