/*
 * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "ParallelExecutor.h"

#include <cassert>
#include <functional>
#include <memory>
#include <mutex>

#include "exec/IFunction.h"
#include "util/logging.h"
29 class HookFunction : public IFunction
32 HookFunction(IFunction *fn, const std::function<void()> &setup,
33 const std::function<void()> &teardown)
34 : _fn{fn}, _setup{setup}, _teardown{teardown}
48 std::function<void()> _setup;
49 std::function<void()> _teardown;
52 void ParallelExecutor::notify(uint32_t finished_job_id)
54 std::unique_lock<std::mutex> lock{_mu_jobs};
56 DataflowExecutor::notify(finished_job_id);
59 _cv_jobs.notify_all();
62 ParallelExecutor::ParallelExecutor(
63 std::unique_ptr<ir::LoweredGraph> lowered_graph,
64 const std::vector<std::shared_ptr<backend::ITensor>> &input_tensors,
65 const std::vector<std::shared_ptr<backend::ITensor>> &output_tensors,
66 const compiler::TensorBuilders &tensor_builders, compiler::CodeMap &&code_map)
67 : DataflowExecutor{std::move(lowered_graph), input_tensors, output_tensors, tensor_builders,
70 VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
73 void ParallelExecutor::executeImpl()
75 bool dynamic_input_exists = hasDynamicInput();
78 // TODO Consider to have distinct backend set in LowerInfoMap
80 for (auto &itr : _lowered_graph->getLowerInfo()->op_seq)
82 backends.add(itr.second->backend());
84 _scheduler = std::make_unique<ParallelScheduler>(backends);
86 assert(noWaitingJobs());
89 _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
91 for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
93 VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl;
94 if (_input_info[i] == 0)
96 emplaceToReadyJobs(i);
99 assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
101 VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl;
103 _subject.notifyModelBegin(this);
106 std::unique_lock<std::mutex> lock{_mu_jobs};
108 if (_ready_jobs.empty())
110 _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty() || noWaitingJobs(); });
111 // Check finish condition
112 if (_ready_jobs.empty() && noWaitingJobs())
118 auto job = std::move(_ready_jobs.begin()->second);
119 _ready_jobs.erase(_ready_jobs.begin());
123 VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index() << std::endl;
125 auto job_index = job->index();
126 auto op_sequence_index = _job_to_op_seq[job_index];
127 auto op_seq = &_lowered_graph->op_seqs().at(op_sequence_index);
128 auto backend = _lowered_graph->getLowerInfo()->op_seq.at(op_sequence_index)->backend();
129 auto setup = [&, op_seq, backend]() { _subject.notifyJobBegin(this, op_seq, backend); };
130 auto teardown = [&, job_index, op_seq, backend]() {
131 _subject.notifyJobEnd(this, op_seq, backend);
135 // dynamic tensor setting
136 bool handle_dynamic_tensor = op_seq->has_dynamic_tensor() || dynamic_input_exists;
137 job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
139 _scheduler->assign(std::make_unique<HookFunction>(job->fn_seq(), setup, teardown), backend);
140 _finished_jobs[job_index] = std::move(job);
143 assert(noWaitingJobs());
145 // Wait for all the jobs done
146 _scheduler->finish();
147 _subject.notifyModelEnd(this);
149 // Reset input info for the next execution
150 _input_info = _initial_input_info;