Imported Upstream version 1.12.0
[platform/core/ml/nnfw.git] / runtime / onert / core / src / exec / ParallelExecutor.cc
1 /*
2  * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "ParallelExecutor.h"
18
19 #include <cassert>
20
21 #include "util/logging.h"
22 #include "exec/IFunction.h"
23
24 namespace onert
25 {
26 namespace exec
27 {
28
29 class HookFunction : public IFunction
30 {
31 public:
32   HookFunction(IFunction *fn, const std::function<void()> &setup,
33                const std::function<void()> &teardown)
34       : _fn{fn}, _setup{setup}, _teardown{teardown}
35   {
36   }
37
38 public:
39   void run() override
40   {
41     _setup();
42     _fn->run();
43     _teardown();
44   }
45
46 private:
47   IFunction *_fn;
48   std::function<void()> _setup;
49   std::function<void()> _teardown;
50 };
51
52 void ParallelExecutor::notify(uint32_t finished_job_id)
53 {
54   std::unique_lock<std::mutex> lock{_mu_jobs};
55
56   DataflowExecutor::notify(finished_job_id);
57
58   lock.unlock();
59   _cv_jobs.notify_all();
60 }
61
62 ParallelExecutor::ParallelExecutor(std::unique_ptr<compiler::LoweredGraph> lowered_graph,
63                                    const compiler::TensorRegistries &tensor_regs,
64                                    compiler::CodeMap &&code_map,
65                                    const util::TracingCtx *tracing_ctx)
66     : DataflowExecutor{std::move(lowered_graph), tensor_regs, std::move(code_map), tracing_ctx}
67 {
68   VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
69 }
70
71 void ParallelExecutor::executeImpl()
72 {
73   bool dynamic_input_exists = hasDynamicInput();
74
75   // Init scheduler
76   // TODO Consider to have distinct backend set in LowerInfoMap
77   BackendSet backends;
78   for (auto &itr : _lowered_graph->getLowerInfo()->op_seq)
79   {
80     backends.add(itr.second->backend());
81   }
82   _scheduler = std::make_unique<ParallelScheduler>(backends);
83
84   assert(noWaitingJobs());
85
86   // Execution setup
87   _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
88
89   for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
90   {
91     VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl;
92     if (_input_info[i] == 0)
93     {
94       emplaceToReadyJobs(i);
95     }
96   }
97   assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
98
99   VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl;
100
101   auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);
102
103   _subject.notifySubgraphBegin(profiling_subg_index);
104
105   while (true)
106   {
107     std::unique_lock<std::mutex> lock{_mu_jobs};
108
109     if (_ready_jobs.empty())
110     {
111       _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty() || noWaitingJobs(); });
112       // Check finish condition
113       if (_ready_jobs.empty() && noWaitingJobs())
114       {
115         break;
116       }
117     }
118
119     auto job = std::move(_ready_jobs.begin()->second);
120     _ready_jobs.erase(_ready_jobs.begin());
121
122     lock.unlock();
123
124     VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index() << std::endl;
125
126     auto job_index = job->index();
127     auto op_sequence_index = _job_to_op_seq[job_index];
128     auto op_seq = &_lowered_graph->op_seqs().at(op_sequence_index);
129     auto backend = _lowered_graph->getLowerInfo()->op_seq.at(op_sequence_index)->backend();
130     auto setup = [&, op_seq, backend]() {
131       _subject.notifyJobBegin(this, profiling_subg_index, op_seq, backend);
132     };
133     auto teardown = [&, job_index, op_seq, backend]() {
134       _subject.notifyJobEnd(this, profiling_subg_index, op_seq, backend);
135       notify(job_index);
136     };
137
138     job->fn_seq()->initRunning();
139
140     // dynamic tensor setting
141     bool handle_dynamic_tensor = op_seq->has_dynamic_tensor() || dynamic_input_exists;
142     job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
143
144     _scheduler->assign(std::make_unique<HookFunction>(job->fn_seq(), setup, teardown), backend);
145     _finished_jobs[job_index] = std::move(job);
146   }
147
148   assert(noWaitingJobs());
149
150   // Wait for all the jobs done
151   _scheduler->finish();
152   _subject.notifySubgraphEnd(profiling_subg_index);
153
154   // Reset input info for the next execution
155   _input_info = _initial_input_info;
156 }
157
158 } // namespace exec
159 } // namespace onert