/*
 * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "DataflowExecutor.h"

#include <algorithm>
#include <cassert>
#include <limits>
#include <memory>

#include "util/logging.h"
28 int64_t DataflowExecutor::calculateRank(const std::vector<ir::OperationIndex> &operations)
35 for (const auto &operation_idx : operations)
37 auto it = _indexed_ranks->find(operation_idx);
38 if (it == _indexed_ranks->end())
40 assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
41 operations.size() == 1);
42 // run Permute ASAP for next operations to be ready for other backends
43 return std::numeric_limits<int64_t>::max();
53 void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
55 auto &job = _waiting_jobs[id];
56 assert(job != nullptr);
57 auto &op_seq = _lowered_graph->op_seqs().at(_job_to_op_seq[job->index()]);
58 auto rank = calculateRank(op_seq.operations());
59 _ready_jobs.emplace(rank, std::move(job));
62 void DataflowExecutor::notify(uint32_t finished_job_id)
64 for (auto id : _output_info[finished_job_id])
66 assert(_input_info[id] > 0);
67 auto count = --_input_info[id];
68 if (count == 0) // No dependent jobs left, ready for execution
70 emplaceToReadyJobs(id);
74 bool DataflowExecutor::noWaitingJobs()
76 return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
77 [](const std::unique_ptr<Job> &job) { return job == nullptr; });
80 DataflowExecutor::DataflowExecutor(std::unique_ptr<compiler::LoweredGraph> lowered_graph,
81 const compiler::TensorRegistries &tensor_regs,
82 compiler::CodeMap &&code_map,
83 const util::TracingCtx *tracing_ctx)
84 : ExecutorBase{std::move(lowered_graph), tensor_regs, tracing_ctx},
85 _code_map{std::move(code_map)}
87 VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;
89 const auto &op_seqs = _lowered_graph->op_seqs();
90 // Assign jobs convert OpSequenceIndex to job index(uint32_t)
91 uint32_t next_job_index = 0;
92 std::unordered_map<ir::OpSequenceIndex, uint32_t> op_seq_to_job;
93 op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &) {
94 VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with OpSequenceIndex "
95 << op_seq_index.value() << std::endl;
96 _finished_jobs.emplace_back(
97 std::make_unique<Job>(next_job_index, _code_map.at(op_seq_index).fn_seq.get()));
98 op_seq_to_job[op_seq_index] = next_job_index++;
101 _waiting_jobs.resize(next_job_index);
102 _output_info.resize(next_job_index);
103 _initial_input_info.resize(next_job_index, 0);
105 op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &op_seq) {
106 auto job_index = op_seq_to_job[op_seq_index];
107 for (auto output : op_seq.getOutputs())
109 // Update output and input info
111 [&](const ir::OpSequenceIndex &op_seq_cur_index, const ir::OpSequence &op_seq_cur) {
112 if (op_seq_cur.getInputs().contains(output))
114 auto dep_index = op_seq_to_job[op_seq_cur_index];
115 ++_initial_input_info[dep_index];
116 _output_info[job_index].push_back(dep_index);
121 for (const auto &s : op_seq_to_job)
122 _job_to_op_seq.emplace(s.second, s.first);
124 _input_info = _initial_input_info;
127 void DataflowExecutor::executeImpl()
129 assert(noWaitingJobs());
131 bool dynamic_input_exists = hasDynamicInput();
134 _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
136 for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
138 if (_input_info[i] == 0)
140 emplaceToReadyJobs(i);
143 assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
145 auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);
147 _subject.notifySubgraphBegin(profiling_subg_index);
149 while (!_ready_jobs.empty())
151 auto job = std::move((_ready_jobs.begin())->second);
152 _ready_jobs.erase(_ready_jobs.begin());
153 auto job_index = job->index();
154 VERBOSE(DataflowExecutor) << "Run job #" << job_index << std::endl;
156 auto op_seq_index = _job_to_op_seq[job_index];
157 auto op_seq = &_lowered_graph->op_seqs().at(op_seq_index);
158 const backend::Backend *backend =
159 _lowered_graph->getLowerInfo()->op_seq.at(op_seq_index)->backend();
161 _subject.notifyJobBegin(this, profiling_subg_index, op_seq, backend);
163 job->fn_seq()->initRunning();
165 // check if FunctionSequence needs to handle dynamic tensor
166 bool handle_dynamic_tensor = op_seq->has_dynamic_tensor() || dynamic_input_exists;
167 job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
171 _subject.notifyJobEnd(this, profiling_subg_index, op_seq, backend);
173 _finished_jobs[job_index] = std::move(job);
175 assert(noWaitingJobs());
177 _subject.notifySubgraphEnd(profiling_subg_index);
179 // Reset input info for the next execution
180 _input_info = _initial_input_info;