2 * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "DataflowExecutor.h"
21 #include "util/logging.h"
// Returns the scheduling rank for the op-sequence made of `operations`.
// Ranks are looked up in the precomputed _indexed_ranks map; an operation
// missing from that map is asserted to be a lone Permute, which is given the
// maximum possible rank so it is scheduled as soon as possible (its output
// unblocks work on other backends).
// NOTE(review): the lines that accumulate and return the normal (non-Permute)
// rank are elided from this view of the file.
28 int64_t DataflowExecutor::calculateRank(const std::vector<ir::OperationIndex> &operations)
35 for (const auto &operation_idx : operations)
37 auto it = _indexed_ranks->find(operation_idx);
38 if (it == _indexed_ranks->end())
40 assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
41 operations.size() == 1);
42 // run Permute ASAP for next operations to be ready for other backends
43 return std::numeric_limits<int64_t>::max();
// Moves waiting job `id` into the _ready_jobs queue, keyed by the rank of its
// op-sequence so jobs can later be popped in rank order. The job's slot in
// _waiting_jobs is left as nullptr (std::move), which is what noWaitingJobs()
// checks for.
53 void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
55 auto &job = _waiting_jobs[id];
56 assert(job != nullptr);
57 auto &op_seq = _lowered_graph->op_seqs().at(_job_to_op_seq[job->index()]);
58 auto rank = calculateRank(op_seq.operations());
59 _ready_jobs.emplace(rank, std::move(job));
// Called when job `finished_job_id` completes: for every job that consumes one
// of its outputs (_output_info), decrement that job's pending-input count and,
// once the count reaches zero, move it to the ready queue.
62 void DataflowExecutor::notify(uint32_t finished_job_id)
64 for (auto id : _output_info[finished_job_id])
66 assert(_input_info[id] > 0);
67 auto count = --_input_info[id];
68 if (count == 0) // All of this job's inputs are now produced, ready for execution
70 emplaceToReadyJobs(id);
// True iff every slot of _waiting_jobs is empty, i.e. every job has been moved
// on to the ready queue (and eventually back to _finished_jobs). Used as a
// sanity check before and after executeImpl().
74 bool DataflowExecutor::noWaitingJobs()
76 return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
77 [](const std::unique_ptr<Job> &job) { return job == nullptr; });
// Constructs the executor from a lowered graph and its compiled code map.
// Builds the dataflow dependency structures used by executeImpl():
//   - one Job per OpSequence (initially stored in _finished_jobs),
//   - _output_info[j]: the jobs that consume an output of job j,
//   - _initial_input_info[j]: how many producer jobs job j waits on,
//   - _job_to_op_seq: reverse map from job index to OpSequenceIndex.
80 DataflowExecutor::DataflowExecutor(
81 std::unique_ptr<ir::LoweredGraph> lowered_graph,
82 const std::vector<std::shared_ptr<backend::ITensor>> &input_tensors,
83 const std::vector<std::shared_ptr<backend::ITensor>> &output_tensors,
84 const compiler::TensorBuilders &tensor_builders, compiler::CodeMap &&code_map)
85 : ExecutorBase{std::move(lowered_graph), input_tensors, output_tensors, tensor_builders},
86 _code_map{std::move(code_map)}
88 VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;
90 const auto &op_seqs = _lowered_graph->op_seqs();
91 // Assign a job index (uint32_t) to each OpSequenceIndex
92 uint32_t next_job_index = 0;
93 std::unordered_map<ir::OpSequenceIndex, uint32_t> op_seq_to_job;
94 op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &) {
95 VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with OpSequenceIndex "
96 << op_seq_index.value() << std::endl;
97 _finished_jobs.emplace_back(
98 std::make_unique<Job>(next_job_index, _code_map.at(op_seq_index).fn_seq.get()));
99 op_seq_to_job[op_seq_index] = next_job_index++;
// Size all per-job bookkeeping vectors to the number of jobs created above.
102 _waiting_jobs.resize(next_job_index);
103 _output_info.resize(next_job_index);
104 _initial_input_info.resize(next_job_index, 0);
// Second pass: wire up producer->consumer edges. For every output operand of
// each op-sequence, any op-sequence that reads it becomes a dependent job.
// NOTE(review): this is an O(op_seqs^2 * operands) nested iteration; fine for
// typical graph sizes but worth knowing about.
106 op_seqs.iterate([&](const ir::OpSequenceIndex &op_seq_index, const ir::OpSequence &op_seq) {
107 auto job_index = op_seq_to_job[op_seq_index];
108 for (auto output : op_seq.getOutputs())
110 // Update output and input info
112 [&](const ir::OpSequenceIndex &op_seq_cur_index, const ir::OpSequence &op_seq_cur) {
113 if (op_seq_cur.getInputs().contains(output))
115 auto dep_index = op_seq_to_job[op_seq_cur_index];
116 ++_initial_input_info[dep_index];
117 _output_info[job_index].push_back(dep_index);
// Build the reverse mapping so a job can find its OpSequence at run time.
122 for (const auto &s : op_seq_to_job)
123 _job_to_op_seq.emplace(s.second, s.first);
// Working copy of the dependency counters; executeImpl() consumes _input_info
// and restores it from _initial_input_info when it finishes.
125 _input_info = _initial_input_info;
// Runs the whole model once using dataflow scheduling: jobs become ready when
// all their producer jobs have finished, and ready jobs are drained from
// _ready_jobs (ordered by the rank computed in calculateRank) until none
// remain. Observers are notified around the model and around each job.
128 void DataflowExecutor::executeImpl()
130 assert(noWaitingJobs());
132 bool dynamic_input_exists = hasDynamicInput();
135 _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
// Seed the ready queue with jobs that have no producer dependencies.
137 for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
139 if (_input_info[i] == 0)
141 emplaceToReadyJobs(i);
144 assert(!_ready_jobs.empty()); // Cannot begin if there are no initial jobs
146 _subject.notifyModelBegin(this);
148 while (!_ready_jobs.empty())
// Take the front entry of _ready_jobs; which end of the rank order that is
// depends on the map's comparator, declared elsewhere — TODO confirm.
150 auto job = std::move((_ready_jobs.begin())->second);
151 _ready_jobs.erase(_ready_jobs.begin());
152 auto job_index = job->index();
153 VERBOSE(DataflowExecutor) << "Run job #" << job_index << std::endl;
155 auto op_seq_index = _job_to_op_seq[job_index];
156 auto op_seq = &_lowered_graph->op_seqs().at(op_seq_index);
157 const backend::Backend *backend =
158 _lowered_graph->getLowerInfo()->op_seq.at(op_seq_index)->backend();
160 _subject.notifyJobBegin(this, op_seq, backend);
162 // check if FunctionSequence needs to handle dynamic tensor
163 bool handle_dynamic_tensor = op_seq->has_dynamic_tensor() || dynamic_input_exists;
164 job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);
// NOTE(review): the actual execution of the job (and presumably the notify()
// call that releases dependents) sits in lines elided from this view.
168 _subject.notifyJobEnd(this, op_seq, backend);
170 _finished_jobs[job_index] = std::move(job);
172 assert(noWaitingJobs());
174 _subject.notifyModelEnd(this);
176 // Reset input info for the next execution
177 _input_info = _initial_input_info;