From 4766ea5380a5d174864e13fe1c4c2f87c339a4b9 Mon Sep 17 00:00:00 2001
From: Hanjoung Lee/On-Device Lab(SR)/Engineer/Samsung Electronics
Date: Wed, 15 May 2019 18:43:37 +0900
Subject: [PATCH] Introduce ParallelExecutor (#5171)

This commit introduces ParallelExecutor. It is not used anywhere yet.

Draft PR: #4920

Signed-off-by: Hanjoung Lee
---
 runtimes/neurun/core/src/exec/ParallelExecutor.cc | 161 ++++++++++++++++++++++
 runtimes/neurun/core/src/exec/ParallelExecutor.h  |  91 ++++++++++++
 2 files changed, 252 insertions(+)
 create mode 100644 runtimes/neurun/core/src/exec/ParallelExecutor.cc
 create mode 100644 runtimes/neurun/core/src/exec/ParallelExecutor.h

diff --git a/runtimes/neurun/core/src/exec/ParallelExecutor.cc b/runtimes/neurun/core/src/exec/ParallelExecutor.cc
new file mode 100644
index 0000000..7eac44f
--- /dev/null
+++ b/runtimes/neurun/core/src/exec/ParallelExecutor.cc
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ParallelExecutor.h"
+
+#include <cassert>
+
+#include "util/logging.h"
+#include "exec/IFunction.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+class HookFunction : public IFunction
+{
+public:
+  HookFunction(IFunction *fn, std::function<void()> teardown) : _fn{fn}, _teardown{teardown} {}
+
+public:
+  void run() override
+  {
+    // TODO Introduce and call setup() function here
+    _fn->run();
+    _teardown();
+  }
+
+private:
+  IFunction *_fn;
+  std::function<void()> _teardown;
+};
+
+void ParallelExecutor::notify(const model::OperandIndexSequence &operands)
+{
+  std::unique_lock<std::mutex> lock{_mu_jobs};
+
+  auto itr = _waiting_jobs.begin();
+  while (itr != _waiting_jobs.end())
+  {
+    auto &&job = *itr;
+
+    job->onNotified(operands);
+
+    if (job->ready())
+    {
+      _ready_jobs.push(std::move(job));
+      itr = _waiting_jobs.erase(itr);
+    }
+    else
+    {
+      ++itr;
+    }
+  }
+  lock.unlock();
+  _cv_jobs.notify_all();
+}
+
+ParallelExecutor::ParallelExecutor(const std::shared_ptr<const model::Model> &model,
+                                   std::unique_ptr<model::Subgraphs> subg_ctx,
+                                   const std::shared_ptr<compiler::OperandContext> &operand_context,
+                                   std::unique_ptr<graph::LowerInfoMap> lower_info,
+                                   CodeMap &&code_map)
+    : ExecutorBase{model, std::move(subg_ctx), operand_context, std::move(lower_info)},
+      _code_map{std::move(code_map)}
+{
+  VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
+
+  // Create jobs
+  _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
+    VERBOSE(ParallelExecutor) << "Add a job #" << subg_index.value() << std::endl;
+    _finished_jobs.emplace_back(nnfw::cpp14::make_unique<Job>(
+        subg_index, _code_map.at(subg_index).get(),
+        _lower_info->operation.at(subg_index)->backend(), subg.getInputs(), subg.getOutputs()));
+  });
+
+  // Save operands that are initially ready
+  {
+    _model->operands.iterate([&](const model::OperandIndex &index, const model::Operand &object) {
+      if (object.getDef().size() == 0)
+      {
+        _initially_ready_operands.append(index);
+      }
+    });
+
+    for (auto index : _model->inputs)
+    {
+      _initially_ready_operands.append(index);
+    }
+  }
+}
+
+void ParallelExecutor::executeImpl()
+{
+  assert(_waiting_jobs.empty());
+
+  // Init scheduler
+  // TODO Consider having a distinct backend set in LowerInfoMap
+  graph::BackendSet backends;
+  for (auto &itr : _lower_info->operation)
+  {
+    backends.add(itr.second->backend());
+  }
+  _scheduler = nnfw::cpp14::make_unique<ParallelScheduler>(backends);
+
+  // Execution setup
+  _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
+  notify(_initially_ready_operands);
+
+  assert(!_ready_jobs.empty()); // Cannot begin if there are no initial jobs
+
+  while (!_ready_jobs.empty() || !_waiting_jobs.empty())
+  {
+    std::unique_lock<std::mutex> lock{_mu_jobs};
+    if (_ready_jobs.empty())
+    {
+      _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty(); });
+    }
+
+    VERBOSE(ParallelExecutor) << "waiting : " << _waiting_jobs.size() << " / "
+                              << "ready : " << _ready_jobs.size() << std::endl;
+    auto job = std::move(_ready_jobs.front());
+    _ready_jobs.pop();
+
+    lock.unlock();
+
+    VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index().value() << std::endl;
+
+    auto job_ptr = job.get();
+    auto teardown = [&, job_ptr]() { notify(job_ptr->outputs()); };
+
+    _scheduler->assign(nnfw::cpp14::make_unique<HookFunction>(job->fn(), teardown), job->backend());
+
+    _finished_jobs.push_back(std::move(job));
+  }
+
+  assert(_waiting_jobs.empty());
+
+  for (const auto &job : _finished_jobs)
+  {
+    job->reset();
+  }
+
+  _scheduler->finish();
+}
+
+} // namespace exec
+} // namespace neurun
diff --git a/runtimes/neurun/core/src/exec/ParallelExecutor.h b/runtimes/neurun/core/src/exec/ParallelExecutor.h
new file mode 100644
index 0000000..fa5c4b5
--- /dev/null
+++ b/runtimes/neurun/core/src/exec/ParallelExecutor.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __NEURUN_EXEC_PARALLEL_EXECUTOR_H__
+#define __NEURUN_EXEC_PARALLEL_EXECUTOR_H__
+
+#include <list>
+#include <queue>
+#include <unordered_map>
+
+#include "FunctionSequence.h"
+#include "Job.h"
+#include "model/OperandIndexSequence.h"
+#include "model/Index.h"
+#include "model/Model.h"
+#include "cpp14/memory.h"
+#include "exec/ExecutorBase.h"
+#include "ParallelScheduler.h"
+
+namespace neurun
+{
+namespace exec
+{
+
+/**
+ * @brief Class to execute Graph in parallel
+ */
+class ParallelExecutor : public ExecutorBase
+{
+public:
+  using CodeMap = std::unordered_map<model::SubgraphIndex, std::unique_ptr<FunctionSequence>>;
+
+private:
+  void notify(const model::OperandIndexSequence &operands);
+
+public:
+  /**
+   * @brief Constructs a ParallelExecutor object
+   *
+   * @param model Model object
+   * @param subg_ctx Subgraphs object
+   * @param operand_context OperandContext object (Only for input/output operand data access)
+   * @param lower_info LowerInfo object (Only to know input/output operands layout)
+   * @param code_map Compiled code map
+   */
+  ParallelExecutor(const std::shared_ptr<const model::Model> &model,
+                   std::unique_ptr<model::Subgraphs> subg_ctx,
+                   const std::shared_ptr<compiler::OperandContext> &operand_context,
+                   std::unique_ptr<graph::LowerInfoMap> lower_info, CodeMap &&code_map);
+
+  void executeImpl() override;
+
+private:
+  CodeMap _code_map;
+  model::OperandIndexSequence _initially_ready_operands;
+  /**
+   * @brief A list of finished jobs for the current execution
+   *        After a run, it holds all the jobs of this execution for the next run
+   */
+  std::list<std::unique_ptr<Job>> _finished_jobs;
+  /**
+   * @brief A list of waiting jobs for the current execution
+   *        All the jobs are moved from #_finished_jobs to it when a run starts
+   */
+  std::list<std::unique_ptr<Job>> _waiting_jobs;
+  /**
+   * @brief A queue of jobs that are ready for the current execution
+   *        Jobs in it are ready to be scheduled
+   */
+  std::queue<std::unique_ptr<Job>> _ready_jobs;
+  std::condition_variable _cv_jobs;
+  std::mutex _mu_jobs;
+  std::unique_ptr<ParallelScheduler> _scheduler;
+};
+
+} // namespace exec
+} // namespace neurun
+
+#endif // __NEURUN_EXEC_PARALLEL_EXECUTOR_H__
--
2.7.4
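
Note for reviewers: the scheduling logic in notify() and executeImpl() above is a producer/consumer hand-off. Jobs sit in _waiting_jobs; notify() moves the ones whose input operands became available into _ready_jobs under _mu_jobs and signals _cv_jobs; the execute loop blocks on that condition variable, pops a ready job, and dispatches it to the scheduler outside the lock. The standalone C++ sketch below is not part of the patch and uses only standard-library types; the names MiniJob, mark_ready, and run_all are illustrative and do not exist in neurun. It is a minimal model of that hand-off under those assumptions, not the runtime's actual implementation.

// Minimal sketch of the waiting -> ready -> finished job hand-off.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

struct MiniJob
{
  int index;
};

constexpr int kNumJobs = 3;

std::list<std::unique_ptr<MiniJob>> waiting_jobs;
std::queue<std::unique_ptr<MiniJob>> ready_jobs;
std::mutex mu_jobs;
std::condition_variable cv_jobs;

// Analogous to ParallelExecutor::notify(): under the lock, move jobs that
// became ready from the waiting list into the ready queue, then wake the loop.
void mark_ready(int index)
{
  std::unique_lock<std::mutex> lock{mu_jobs};
  for (auto itr = waiting_jobs.begin(); itr != waiting_jobs.end();)
  {
    if ((*itr)->index == index)
    {
      ready_jobs.push(std::move(*itr));
      itr = waiting_jobs.erase(itr);
    }
    else
    {
      ++itr;
    }
  }
  lock.unlock();
  cv_jobs.notify_all();
}

// Analogous to the loop in executeImpl(): block until a job is ready, pop it,
// and "run" it outside the lock. The real executor hands the job's function to
// ParallelScheduler and then notifies the job's output operands.
void run_all(int total_jobs)
{
  for (int finished = 0; finished < total_jobs; ++finished)
  {
    std::unique_lock<std::mutex> lock{mu_jobs};
    cv_jobs.wait(lock, [] { return !ready_jobs.empty(); });
    auto job = std::move(ready_jobs.front());
    ready_jobs.pop();
    lock.unlock();
    std::cout << "running job #" << job->index << std::endl;
  }
}

int main()
{
  for (int i = 0; i < kNumJobs; ++i)
    waiting_jobs.push_back(std::make_unique<MiniJob>(MiniJob{i}));

  // A producer thread releases jobs one by one, standing in for the teardown
  // hooks that backend worker threads invoke in ParallelExecutor.
  std::thread producer([] {
    for (int i = 0; i < kNumJobs; ++i)
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      mark_ready(i);
    }
  });

  run_all(kNumJobs);
  producer.join();
  return 0;
}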