53bc3c20460215f3337e1d1ad953a2fd6ba95f36
[platform/core/ml/nnfw.git] / runtime / onert / core / src / exec / DataflowExecutor.cc
1 /*
2  * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "DataflowExecutor.h"
18
19 #include <cassert>
20
21 #include "util/logging.h"
22
23 namespace onert
24 {
25 namespace exec
26 {
27
28 int64_t DataflowExecutor::calculateRank(const std::vector<ir::OperationIndex> &operations)
29 {
30   int64_t rank = 0;
31   if (!_indexed_ranks)
32   {
33     return rank;
34   }
35   for (const auto &operation_idx : operations)
36   {
37     auto it = _indexed_ranks->find(operation_idx);
38     if (it == _indexed_ranks->end())
39     {
40       assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
41              operations.size() == 1);
42       // run Permute ASAP for next operations to be ready for other backends
43       return std::numeric_limits<int64_t>::max();
44     }
45     else
46     {
47       rank += it->second;
48     }
49   }
50   return rank;
51 }
52
53 void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
54 {
55   auto &job = _waiting_jobs[id];
56   assert(job != nullptr);
57   auto &op_seq = _lowered_graph->op_seqs().at(_job_to_op_seq[job->index()]);
58   auto rank = calculateRank(op_seq.operations());
59   _ready_jobs.emplace(rank, std::move(job));
60 }
61
62 void DataflowExecutor::notify(uint32_t finished_job_id)
63 {
64   for (auto id : _output_info[finished_job_id])
65   {
66     assert(_input_info[id] > 0);
67     auto count = --_input_info[id];
68     if (count == 0) // No dependent jobs left, ready for execution
69     {
70       emplaceToReadyJobs(id);
71     }
72   }
73 }
74 bool DataflowExecutor::noWaitingJobs()
75 {
76   return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
77                      [](const std::unique_ptr<Job> &job) { return job == nullptr; });
78 }
79
80 DataflowExecutor::DataflowExecutor(std::unique_ptr<compiler::LoweredGraph> lowered_graph,
81                                    const std::vector<backend::ITensor *> &input_tensors,
82                                    const std::vector<backend::ITensor *> &output_tensors,
83                                    const compiler::TensorRegistries &tensor_regs,
84                                    compiler::CodeMap &&code_map)
85     : ExecutorBase{std::move(lowered_graph), input_tensors, output_tensors, tensor_regs},
86       _code_map{std::move(code_map)}
87 {
88   VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;
89
90   const auto &op_seqs = _lowered_graph->op_seqs();
91   // Assign jobs convert OpSequenceIndex to job index(uint32_t)
92   uint32_t next_job_index = 0;
93   std::unordered_map<ir::OpSequenceIndex, uint32_t> op_seq_to_job;
94   op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &) {
95     VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with OpSequenceIndex "
96                               << op_seq_index.value() << std::endl;
97     _finished_jobs.emplace_back(
98         std::make_unique<Job>(next_job_index, _code_map.at(op_seq_index).fn_seq.get()));
99     op_seq_to_job[op_seq_index] = next_job_index++;
100   });
101
102   _waiting_jobs.resize(next_job_index);
103   _output_info.resize(next_job_index);
104   _initial_input_info.resize(next_job_index, 0);
105
106   op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &op_seq) {
107     auto job_index = op_seq_to_job[op_seq_index];
108     for (auto output : op_seq.getOutputs())
109     {
110       // Update output and input info
111       op_seqs.iterate(
112           [&](const ir::OpSequenceIndex &op_seq_cur_index, const ir::OpSequence &op_seq_cur) {
113             if (op_seq_cur.getInputs().contains(output))
114             {
115               auto dep_index = op_seq_to_job[op_seq_cur_index];
116               ++_initial_input_info[dep_index];
117               _output_info[job_index].push_back(dep_index);
118             }
119           });
120     }
121   });
122   for (const auto &s : op_seq_to_job)
123     _job_to_op_seq.emplace(s.second, s.first);
124
125   _input_info = _initial_input_info;
126 }
127
128 void DataflowExecutor::executeImpl()
129 {
130   assert(noWaitingJobs());
131
132   bool dynamic_input_exists = hasDynamicInput();
133
134   // Execution setup
135   _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
136
137   for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
138   {
139     if (_input_info[i] == 0)
140     {
141       emplaceToReadyJobs(i);
142     }
143   }
144   assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
145
146   _subject.notifyModelBegin(this);
147
148   while (!_ready_jobs.empty())
149   {
150     auto job = std::move((_ready_jobs.begin())->second);
151     _ready_jobs.erase(_ready_jobs.begin());
152     auto job_index = job->index();
153     VERBOSE(DataflowExecutor) << "Run job #" << job_index << std::endl;
154
155     auto op_seq_index = _job_to_op_seq[job_index];
156     auto op_seq = &_lowered_graph->op_seqs().at(op_seq_index);
157     const backend::Backend *backend =
158         _lowered_graph->getLowerInfo()->op_seq.at(op_seq_index)->backend();
159
160     _subject.notifyJobBegin(this, op_seq, backend);
161
162     job->fn_seq()->initRunning();
163
164     // check if FunctionSequence needs to handle dynamic tensor
165     bool handle_dynamic_tensor = op_seq->has_dynamic_tensor() || dynamic_input_exists;
166     job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
167
168     job->run();
169
170     _subject.notifyJobEnd(this, op_seq, backend);
171     notify(job_index);
172     _finished_jobs[job_index] = std::move(job);
173   }
174   assert(noWaitingJobs());
175
176   _subject.notifyModelEnd(this);
177
178   // Reset input info for the next execution
179   _input_info = _initial_input_info;
180 }
181
182 } // namespace exec
183 } // namespace onert