Imported Upstream version 1.12.0
[platform/core/ml/nnfw.git] / runtime / onert / core / src / exec / DataflowExecutor.cc
1 /*
2  * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "DataflowExecutor.h"
18
19 #include <cassert>
20
21 #include "util/logging.h"
22
23 namespace onert
24 {
25 namespace exec
26 {
27
28 int64_t DataflowExecutor::calculateRank(const std::vector<ir::OperationIndex> &operations)
29 {
30   int64_t rank = 0;
31   if (!_indexed_ranks)
32   {
33     return rank;
34   }
35   for (const auto &operation_idx : operations)
36   {
37     auto it = _indexed_ranks->find(operation_idx);
38     if (it == _indexed_ranks->end())
39     {
40       assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
41              operations.size() == 1);
42       // run Permute ASAP for next operations to be ready for other backends
43       return std::numeric_limits<int64_t>::max();
44     }
45     else
46     {
47       rank += it->second;
48     }
49   }
50   return rank;
51 }
52
53 void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
54 {
55   auto &job = _waiting_jobs[id];
56   assert(job != nullptr);
57   auto &op_seq = _lowered_graph->op_seqs().at(_job_to_op_seq[job->index()]);
58   auto rank = calculateRank(op_seq.operations());
59   _ready_jobs.emplace(rank, std::move(job));
60 }
61
62 void DataflowExecutor::notify(uint32_t finished_job_id)
63 {
64   for (auto id : _output_info[finished_job_id])
65   {
66     assert(_input_info[id] > 0);
67     auto count = --_input_info[id];
68     if (count == 0) // No dependent jobs left, ready for execution
69     {
70       emplaceToReadyJobs(id);
71     }
72   }
73 }
74 bool DataflowExecutor::noWaitingJobs()
75 {
76   return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
77                      [](const std::unique_ptr<Job> &job) { return job == nullptr; });
78 }
79
80 DataflowExecutor::DataflowExecutor(std::unique_ptr<compiler::LoweredGraph> lowered_graph,
81                                    const compiler::TensorRegistries &tensor_regs,
82                                    compiler::CodeMap &&code_map,
83                                    const util::TracingCtx *tracing_ctx)
84     : ExecutorBase{std::move(lowered_graph), tensor_regs, tracing_ctx},
85       _code_map{std::move(code_map)}
86 {
87   VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;
88
89   const auto &op_seqs = _lowered_graph->op_seqs();
90   // Assign jobs convert OpSequenceIndex to job index(uint32_t)
91   uint32_t next_job_index = 0;
92   std::unordered_map<ir::OpSequenceIndex, uint32_t> op_seq_to_job;
93   op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &) {
94     VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with OpSequenceIndex "
95                               << op_seq_index.value() << std::endl;
96     _finished_jobs.emplace_back(
97         std::make_unique<Job>(next_job_index, _code_map.at(op_seq_index).fn_seq.get()));
98     op_seq_to_job[op_seq_index] = next_job_index++;
99   });
100
101   _waiting_jobs.resize(next_job_index);
102   _output_info.resize(next_job_index);
103   _initial_input_info.resize(next_job_index, 0);
104
105   op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &op_seq) {
106     auto job_index = op_seq_to_job[op_seq_index];
107     for (auto output : op_seq.getOutputs())
108     {
109       // Update output and input info
110       op_seqs.iterate(
111           [&](const ir::OpSequenceIndex &op_seq_cur_index, const ir::OpSequence &op_seq_cur) {
112             if (op_seq_cur.getInputs().contains(output))
113             {
114               auto dep_index = op_seq_to_job[op_seq_cur_index];
115               ++_initial_input_info[dep_index];
116               _output_info[job_index].push_back(dep_index);
117             }
118           });
119     }
120   });
121   for (const auto &s : op_seq_to_job)
122     _job_to_op_seq.emplace(s.second, s.first);
123
124   _input_info = _initial_input_info;
125 }
126
127 void DataflowExecutor::executeImpl()
128 {
129   assert(noWaitingJobs());
130
131   bool dynamic_input_exists = hasDynamicInput();
132
133   // Execution setup
134   _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
135
136   for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
137   {
138     if (_input_info[i] == 0)
139     {
140       emplaceToReadyJobs(i);
141     }
142   }
143   assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
144
145   auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);
146
147   _subject.notifySubgraphBegin(profiling_subg_index);
148
149   while (!_ready_jobs.empty())
150   {
151     auto job = std::move((_ready_jobs.begin())->second);
152     _ready_jobs.erase(_ready_jobs.begin());
153     auto job_index = job->index();
154     VERBOSE(DataflowExecutor) << "Run job #" << job_index << std::endl;
155
156     auto op_seq_index = _job_to_op_seq[job_index];
157     auto op_seq = &_lowered_graph->op_seqs().at(op_seq_index);
158     const backend::Backend *backend =
159         _lowered_graph->getLowerInfo()->op_seq.at(op_seq_index)->backend();
160
161     _subject.notifyJobBegin(this, profiling_subg_index, op_seq, backend);
162
163     job->fn_seq()->initRunning();
164
165     // check if FunctionSequence needs to handle dynamic tensor
166     bool handle_dynamic_tensor = op_seq->has_dynamic_tensor() || dynamic_input_exists;
167     job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
168
169     job->run();
170
171     _subject.notifyJobEnd(this, profiling_subg_index, op_seq, backend);
172     notify(job_index);
173     _finished_jobs[job_index] = std::move(job);
174   }
175   assert(noWaitingJobs());
176
177   _subject.notifySubgraphEnd(profiling_subg_index);
178
179   // Reset input info for the next execution
180   _input_info = _initial_input_info;
181 }
182
183 } // namespace exec
184 } // namespace onert