/*
 * Copyright (c) 2019 Samsung Electronics Co., Ltd. All Rights Reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "DataflowExecutor.h"

#include <algorithm>
#include <cassert>
#include <limits>

#include "util/logging.h"
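// Sums the ranks recorded in _indexed_ranks for the given operations. An operation
// without a rank entry is expected to be a Permute inserted between backends (see the
// assert below) and is given the maximum possible rank so it is scheduled as early as
// possible.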
int64_t DataflowExecutor::calculateRank(const std::vector<ir::OperationIndex> &operations)
{
  int64_t rank = 0;
  if (!_indexed_ranks)
    return rank; // no rank information available
  for (const auto &operation_idx : operations)
  {
    auto it = _indexed_ranks->find(operation_idx);
    if (it == _indexed_ranks->end())
    {
      assert(_graph.operations().at(operation_idx).opcode() == ir::OpCode::Permute &&
             operations.size() == 1);
      // Run Permute as soon as possible so that downstream operations on other backends
      // become ready
      return std::numeric_limits<int64_t>::max();
    }
    rank += it->second;
  }
  return rank;
}
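// Moves waiting job `id` into the ready queue, keyed by its rank. This assumes
// _ready_jobs is ordered so that the highest-rank entry is at begin(), which is the
// entry executeImpl() pops first.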
void DataflowExecutor::emplaceToReadyJobs(const uint32_t &id)
{
  auto &job = _waiting_jobs[id];
  assert(job != nullptr);
  auto rank = calculateRank({_job_to_op[job->index()]});
  _ready_jobs.emplace(rank, std::move(job));
}
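// Called after job `finished_job_id` has run: decrements the unresolved-input counter
// of every job that consumes one of its outputs and moves a consumer to the ready
// queue once its counter reaches zero.
//
// Example (hypothetical graph): if job 0 feeds jobs 1 and 2, then _output_info[0] is
// {1, 2}; finishing job 0 decrements _input_info[1] and _input_info[2], and each of
// them becomes ready as soon as all of its other producers have finished as well.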
void DataflowExecutor::notify(uint32_t finished_job_id)
{
  for (auto &&id : _output_info[finished_job_id])
  {
    assert(_input_info[id] > 0);
    auto count = --_input_info[id];
    if (count == 0) // all inputs are now available; the job is ready to run
    {
      emplaceToReadyJobs(id);
    }
  }
}
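// True when every slot of _waiting_jobs has been moved out (into the ready queue and,
// after running, back into _finished_jobs), i.e. no job is still waiting on its inputs.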
bool DataflowExecutor::noWaitingJobs()
{
  return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
                     [](const std::unique_ptr<Job> &job) { return job == nullptr; });
}
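// Builds one Job per operation of the lowered graph and precomputes the dependency
// bookkeeping the dataflow loop relies on:
//   - _output_info[j]        : jobs that consume at least one output of job j
//   - _initial_input_info[j] : how many producer outputs job j consumes
// _input_info starts each run as a copy of _initial_input_info and is counted down
// by notify() as producers finish.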
DataflowExecutor::DataflowExecutor(std::unique_ptr<compiler::LoweredGraph> lowered_graph,
                                   backend::BackendContexts &&backend_contexts,
                                   const compiler::TensorRegistries &tensor_regs,
                                   compiler::CodeMap &&code_map,
                                   const util::TracingCtx *tracing_ctx)
  : ExecutorBase{std::move(lowered_graph), std::move(backend_contexts), tensor_regs, tracing_ctx},
    _code_map{std::move(code_map)}
{
  VERBOSE(DataflowExecutor) << "Constructing Dataflow Executor" << std::endl;

  // Assign a job to each operation: map OperationIndex to a job index (uint32_t)
  uint32_t next_job_index = 0;
  std::unordered_map<ir::OperationIndex, uint32_t> op_to_job;
  const auto &operations = _lowered_graph->graph().operations();
  operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &) {
    VERBOSE(DataflowExecutor) << "Create a job " << next_job_index << " with Operation " << op_ind
                              << std::endl;
    _finished_jobs.emplace_back(
      std::make_unique<Job>(next_job_index, _code_map.at(op_ind).fn_seq.get()));
    op_to_job[op_ind] = next_job_index++;
  });

  _waiting_jobs.resize(next_job_index);
  _output_info.resize(next_job_index);
  _initial_input_info.resize(next_job_index, 0);

  operations.iterate([&](const ir::OperationIndex &op_ind, const ir::IOperation &op) {
    auto job_index = op_to_job[op_ind];
    for (auto &&output : op.getOutputs())
    {
      // Update output and input info: every operation that consumes this output
      // depends on the current job
      operations.iterate([&](const ir::OperationIndex &op_cur_ind, const ir::IOperation &op_cur) {
        if (op_cur.getInputs().contains(output))
        {
          auto dep_index = op_to_job[op_cur_ind];
          ++_initial_input_info[dep_index];
          _output_info[job_index].push_back(dep_index);
        }
      });
    }
  });
  for (const auto &s : op_to_job)
    _job_to_op.emplace(s.second, s.first);

  _input_info = _initial_input_info;
}
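// One execution of the subgraph: seed the ready queue with jobs whose input counters
// are already zero, then repeatedly pop the highest-ranked ready job, run its function
// sequence, and notify its consumers until the queue drains.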
void DataflowExecutor::executeImpl()
{
  assert(noWaitingJobs());

  bool dynamic_input_exists = hasDynamicInput();

  _waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs

  for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
  {
    if (_input_info[i] == 0)
    {
      emplaceToReadyJobs(i);
    }
  }
  assert(!_ready_jobs.empty()); // Cannot begin if there are no initial jobs

  auto profiling_subg_index = _tracing_ctx->getSubgraphIndex(&_graph);

  _subject.notifySubgraphBegin(profiling_subg_index);

  while (!_ready_jobs.empty())
  {
    auto job = std::move((_ready_jobs.begin())->second);
    _ready_jobs.erase(_ready_jobs.begin());
    auto job_index = job->index();
    VERBOSE(DataflowExecutor) << "Run job " << job_index << std::endl;

    auto op_ind = _job_to_op[job_index];
    const backend::Backend *backend = _lowered_graph->lower_info().operation.at(op_ind).backend();

    _subject.notifyJobBegin(this, profiling_subg_index, op_ind, backend);

    job->fn_seq()->initRunning();

    // Check whether the FunctionSequence needs to handle dynamic tensors
    bool handle_dynamic_tensor =
      _lowered_graph->getHasDynamicTensor(op_ind) || dynamic_input_exists;
    job->fn_seq()->enableDynamicShapeInferer(handle_dynamic_tensor);

    job->run();

    _subject.notifyJobEnd(this, profiling_subg_index, op_ind, backend);
    notify(job_index);
    _finished_jobs[job_index] = std::move(job);
  }
  assert(noWaitingJobs());

  _subject.notifySubgraphEnd(profiling_subg_index);

  // Reset input info for the next execution
  _input_info = _initial_input_info;
}