Imported Upstream version 1.25.0
[platform/core/ml/nnfw.git] / runtime / onert / core / src / exec / DataflowExecutor.cc
1 /*
2  * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "DataflowExecutor.h"
18
19 #include <cassert>
20
21 #include "util/logging.h"
22
23 namespace onert
24 {
25 namespace exec
26 {
27
28 int64_t DataflowExecutor::calculateRank(const std::vector<ir::OperationIndex> &operations)
29 {
30   int64_t rank = 0;
31   if (!_indexed_ranks)
32   {
33     return rank;
34   }
35   for (const auto &operation_idx : operations)
36   {
37     auto it = _indexed_ranks->find(operation_idx);
38     if (it == _indexed_ranks->end())
39     {
40       assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
41              operations.size() == 1);
42       // run Permute ASAP for next operations to be ready for other backends
43       return std::numeric_limits<int64_t>::max();
44     }
45     else
46     {
47       rank += it->second;
48     }
49   }
50   return rank;
51 }
52
53 void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
54 {
55   auto &job = _waiting_jobs[id];
56   assert(job != nullptr);
57   auto rank = calculateRank({_job_to_op[job->index()]});
58   _ready_jobs.emplace(rank, std::move(job));
59 }
60
61 void DataflowExecutor::notify(uint32_t finished_job_id)
62 {
63   for (auto &&id : _output_info[finished_job_id])
64   {
65     assert(_input_info[id] > 0);
66     auto count = --_input_info[id];
67     if (count == 0) // No dependent jobs left, ready for execution
68     {
69       emplaceToReadyJobs(id);
70     }
71   }
72 }
73 bool DataflowExecutor::noWaitingJobs()
74 {
75   return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
76                      [](const std::unique_ptr<Job> &job) { return job == nullptr; });
77 }
78
79 DataflowExecutor::DataflowExecutor(std::unique_ptr<compiler::LoweredGraph> lowered_graph,
80                                    backend::BackendContexts &&backend_contexts,
81                                    const compiler::TensorRegistries &tensor_regs,
82                                    compiler::CodeMap &&code_map,
83                                    const util::TracingCtx *tracing_ctx)
84   : ExecutorBase{std::move(lowered_graph), std::move(backend_contexts), tensor_regs, tracing_ctx},
85     _code_map{std::move(code_map)}
86 {
87   VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;
88
89   // Assign jobs convert OperationIndex to job index(uint32_t)
90   uint32_t next_job_index = 0;
91   std::unordered_map<ir::OperationIndex, uint32_t> op_to_job;
92   const auto &operations = _lowered_graph->graph().operations();
93   operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &) {
94     VERBOSE(DataflowExecutor) << "Create a job " << next_job_index << " with Operation " << op_ind
95                               << std::endl;
96     _finished_jobs.emplace_back(
97       std::make_unique<Job>(next_job_index, _code_map.at(op_ind).fn_seq.get()));
98     op_to_job[op_ind] = next_job_index++;
99   });
100
101   _waiting_jobs.resize(next_job_index);
102   _output_info.resize(next_job_index);
103   _initial_input_info.resize(next_job_index, 0);
104
105   operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &op) {
106     auto job_index = op_to_job[op_ind];
107     for (auto &&output : op.getOutputs())
108     {
109       // Update output and input info
110       operations.iterate([&](const ir::OperationIndex &op_cur_ind, const ir::IOperation &op_cur) {
111         if (op_cur.getInputs().contains(output))
112         {
113           auto dep_index = op_to_job[op_cur_ind];
114           ++_initial_input_info[dep_index];
115           _output_info[job_index].push_back(dep_index);
116         }
117       });
118     }
119   });
120   for (const auto &s : op_to_job)
121     _job_to_op.emplace(s.second, s.first);
122
123   _input_info = _initial_input_info;
124 }
125
126 void DataflowExecutor::executeImpl()
127 {
128   assert(noWaitingJobs());
129
130   bool dynamic_input_exists = hasDynamicInput();
131
132   // Execution setup
133   _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
134
135   for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
136   {
137     if (_input_info[i] == 0)
138     {
139       emplaceToReadyJobs(i);
140     }
141   }
142   assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
143
144   auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);
145
146   _subject.notifySubgraphBegin(profiling_subg_index);
147
148   while (!_ready_jobs.empty())
149   {
150     auto job = std::move((_ready_jobs.begin())->second);
151     _ready_jobs.erase(_ready_jobs.begin());
152     auto job_index = job->index();
153     VERBOSE(DataflowExecutor) << "Run job " << job_index << std::endl;
154
155     auto op_ind = _job_to_op[job_index];
156     const backend::Backend *backend = _lowered_graph->lower_info().operation.at(op_ind).backend();
157
158     _subject.notifyJobBegin(this, profiling_subg_index, op_ind, backend);
159
160     job->fn_seq()->initRunning();
161
162     // check if FunctionSequence needs to handle dynamic tensor
163     bool handle_dynamic_tensor =
164       _lowered_graph->getHasDynamicTensor(op_ind) || dynamic_input_exists;
165     job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
166
167     job->run();
168
169     _subject.notifyJobEnd(this, profiling_subg_index, op_ind, backend);
170     notify(job_index);
171     _finished_jobs[job_index] = std::move(job);
172   }
173   assert(noWaitingJobs());
174
175   _subject.notifySubgraphEnd(profiling_subg_index);
176
177   // Reset input info for the next execution
178   _input_info = _initial_input_info;
179 }
180
181 } // namespace exec
182 } // namespace onert