676bdb5fa4106f82ce8efe774447cb7bcb61cb0d
[platform/core/ml/nnfw.git] / runtime / onert / core / src / exec / ParallelExecutor.cc
1 /*
2  * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "ParallelExecutor.h"
18
19 #include <cassert>
20
21 #include "util/logging.h"
22 #include "exec/IFunction.h"
23
24 namespace onert
25 {
26 namespace exec
27 {
28
29 class HookFunction : public IFunction
30 {
31 public:
32   HookFunction(IFunction *fn, const std::function<void()> &setup,
33                const std::function<void()> &teardown)
34       : _fn{fn}, _setup{setup}, _teardown{teardown}
35   {
36   }
37
38 public:
39   void run() override
40   {
41     _setup();
42     _fn->run();
43     _teardown();
44   }
45
46 private:
47   IFunction *_fn;
48   std::function<void()> _setup;
49   std::function<void()> _teardown;
50 };
51
52 void ParallelExecutor::notify(uint32_t finished_job_id)
53 {
54   std::unique_lock<std::mutex> lock{_mu_jobs};
55
56   DataflowExecutor::notify(finished_job_id);
57
58   lock.unlock();
59   _cv_jobs.notify_all();
60 }
61
62 ParallelExecutor::ParallelExecutor(std::unique_ptr<compiler::LoweredGraph> lowered_graph,
63                                    const std::vector<backend::ITensor *> &input_tensors,
64                                    const std::vector<backend::ITensor *> &output_tensors,
65                                    const compiler::TensorRegistries &tensor_regs,
66                                    compiler::CodeMap &&code_map)
67     : DataflowExecutor{std::move(lowered_graph), input_tensors, output_tensors, tensor_regs,
68                        std::move(code_map)}
69 {
70   VERBOSE(ParallelExecutor) << "Constructing Parallel Executor" << std::endl;
71 }
72
73 void ParallelExecutor::executeImpl()
74 {
75   bool dynamic_input_exists = hasDynamicInput();
76
77   // Init scheduler
78   // TODO Consider to have distinct backend set in LowerInfoMap
79   BackendSet backends;
80   for (auto &itr : _lowered_graph->getLowerInfo()->op_seq)
81   {
82     backends.add(itr.second->backend());
83   }
84   _scheduler = std::make_unique<ParallelScheduler>(backends);
85
86   assert(noWaitingJobs());
87
88   // Execution setup
89   _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
90
91   for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
92   {
93     VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl;
94     if (_input_info[i] == 0)
95     {
96       emplaceToReadyJobs(i);
97     }
98   }
99   assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
100
101   VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl;
102
103   _subject.notifyModelBegin(this);
104   while (true)
105   {
106     std::unique_lock<std::mutex> lock{_mu_jobs};
107
108     if (_ready_jobs.empty())
109     {
110       _cv_jobs.wait(lock, [this] { return !_ready_jobs.empty() || noWaitingJobs(); });
111       // Check finish condition
112       if (_ready_jobs.empty() && noWaitingJobs())
113       {
114         break;
115       }
116     }
117
118     auto job = std::move(_ready_jobs.begin()->second);
119     _ready_jobs.erase(_ready_jobs.begin());
120
121     lock.unlock();
122
123     VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index() << std::endl;
124
125     auto job_index = job->index();
126     auto op_sequence_index = _job_to_op_seq[job_index];
127     auto op_seq = &_lowered_graph->op_seqs().at(op_sequence_index);
128     auto backend = _lowered_graph->getLowerInfo()->op_seq.at(op_sequence_index)->backend();
129     auto setup = [&, op_seq, backend]() { _subject.notifyJobBegin(this, op_seq, backend); };
130     auto teardown = [&, job_index, op_seq, backend]() {
131       _subject.notifyJobEnd(this, op_seq, backend);
132       notify(job_index);
133     };
134
135     job->fn_seq()->initRunning();
136
137     // dynamic tensor setting
138     bool handle_dynamic_tensor = op_seq->has_dynamic_tensor() || dynamic_input_exists;
139     job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
140
141     _scheduler->assign(std::make_unique<HookFunction>(job->fn_seq(), setup, teardown), backend);
142     _finished_jobs[job_index] = std::move(job);
143   }
144
145   assert(noWaitingJobs());
146
147   // Wait for all the jobs done
148   _scheduler->finish();
149   _subject.notifyModelEnd(this);
150
151   // Reset input info for the next execution
152   _input_info = _initial_input_info;
153 }
154
155 } // namespace exec
156 } // namespace onert