namespace exec
{
-void DataflowExecutor::notify(const model::OperandIndexSequence &operands)
+void DataflowExecutor::notify(uint32_t finished_job_id)
{
- auto itr = _waiting_jobs.begin();
- while (itr != _waiting_jobs.end())
+ for (auto id : _output_info[finished_job_id])
{
- auto &&job = *itr;
-
- job->onNotified(operands);
-
- if (job->ready())
+ assert(_input_info[id] > 0);
+ auto count = --_input_info[id];
+ if (count == 0) // No dependent jobs left, ready for execution
{
- _ready_jobs.push(std::move(job));
- itr = _waiting_jobs.erase(itr);
- }
- else
- {
- ++itr;
+ assert(_waiting_jobs[id] != nullptr);
+ _ready_jobs.push(std::move(_waiting_jobs[id]));
}
}
}
+bool DataflowExecutor::noWaitingJobs()
+{
+ return std::all_of(_waiting_jobs.begin(), _waiting_jobs.end(),
+ [](const std::unique_ptr<Job> &job) { return job == nullptr; });
+}
+
DataflowExecutor::DataflowExecutor(const std::shared_ptr<const model::Model> &model,
std::unique_ptr<model::SubgraphContext> subg_ctx,
const std::shared_ptr<compiler::OperandContext> &operand_context,
assert(_subg_ctx);
- // Create jobs
- _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
- VERBOSE(DataflowExecutor) << "Add a job #" << subg_index.value() << std::endl;
- _finished_jobs.emplace_back(nnfw::cpp14::make_unique<Job>(
- subg_index, _code_map.at(subg_index).get(),
- _lower_info->operation.at(subg_index)->backend(), subg.getInputs(), subg.getOutputs()));
+ // Assign jobs convert SubgraphIndex to job index(uint32_t)
+ uint32_t next_job_index = 0;
+ std::unordered_map<model::SubgraphIndex, uint32_t> subgraph_to_job;
+ _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &) {
+ VERBOSE(DataflowExecutor) << "Create a job #" << next_job_index << " with SubgraphIndex "
+ << subg_index.value() << std::endl;
+ _finished_jobs.emplace_back(
+ nnfw::cpp14::make_unique<Job>(next_job_index, _code_map.at(subg_index).get(),
+ _lower_info->operation.at(subg_index)->backend()));
+ subgraph_to_job[subg_index] = next_job_index++;
});
- // Save operands that are initially ready
- {
- _model->operands.iterate([&](const model::OperandIndex &index, const model::Operand &object) {
- if (object.getDef().size() == 0)
- {
- _initially_ready_operands.append(index);
- }
- });
-
- for (auto index : _model->inputs)
+ _waiting_jobs.resize(next_job_index);
+ _output_info.resize(next_job_index);
+ _initial_input_info.resize(next_job_index, 0);
+
+ _subg_ctx->iterate([&](const model::SubgraphIndex &subg_index, const model::Subgraph &subg) {
+ auto job_index = subgraph_to_job[subg_index];
+ for (auto output : subg.getOutputs())
{
- _initially_ready_operands.append(index);
+ // Update output and input info
+ _subg_ctx->iterate(
+ [&](const model::SubgraphIndex &subg_cur_index, const model::Subgraph &subg_cur) {
+ if (subg_cur.getInputs().contains(output))
+ {
+ auto dep_index = subgraph_to_job[subg_cur_index];
+ ++_initial_input_info[dep_index];
+ _output_info[job_index].push_back(dep_index);
+ }
+ });
}
- }
+ });
+
+ _input_info = _initial_input_info;
}
void DataflowExecutor::executeImpl()
{
- assert(_waiting_jobs.empty());
+ assert(noWaitingJobs());
// Execution setup
_waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
- notify(_initially_ready_operands);
+ for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
+ {
+ if (_input_info[i] == 0)
+ {
+ _ready_jobs.push(std::move(_waiting_jobs[i]));
+ }
+ }
assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
while (!_ready_jobs.empty())
{
- VERBOSE(DataflowExecutor) << "waiting : " << _waiting_jobs.size() << " / "
- << "ready : " << _ready_jobs.size() << std::endl;
auto job = std::move(_ready_jobs.front());
_ready_jobs.pop();
- VERBOSE(DataflowExecutor) << "Run job #" << job->index().value() << std::endl;
+ VERBOSE(DataflowExecutor) << "Run job #" << job->index() << std::endl;
job->run();
- notify(job->outputs());
-
- _finished_jobs.push_back(std::move(job));
+ auto job_index = job->index();
+ notify(job_index);
+ _finished_jobs[job_index] = std::move(job);
}
- assert(_waiting_jobs.empty());
+ assert(noWaitingJobs());
+
+ // Reset input info for the next execution
+ _input_info = _initial_input_info;
}
} // namespace exec
using CodeMap = std::unordered_map<model::SubgraphIndex, std::unique_ptr<FunctionSequence>>;
protected:
- virtual void notify(const model::OperandIndexSequence &operands);
+ virtual void notify(uint32_t finished_job_id);
+ bool noWaitingJobs();
public:
/**
protected:
CodeMap _code_map;
- model::OperandIndexSequence _initially_ready_operands;
/**
- * @brief A list of finished jobs for current execution
+ * @brief A vector of finished jobs for current execution
* After a run it has all the jobs of this execution for the next run
*/
- std::list<std::unique_ptr<Job>> _finished_jobs;
+ std::vector<std::unique_ptr<Job>> _finished_jobs;
/**
- * @brief A list of waiting jobs for current execution
+ * @brief A vector of waiting jobs for current execution
* All the jobs are moved from #_finished_jobs to it when start a run
*/
- std::list<std::unique_ptr<Job>> _waiting_jobs;
+ std::vector<std::unique_ptr<Job>> _waiting_jobs;
+ /**
+ * @brief Jobs' output info
+ * Used for notifying after finishing a job
+ */
+ std::vector<std::list<uint32_t>> _output_info;
+ std::vector<uint32_t> _initial_input_info;
+ std::vector<uint32_t> _input_info;
/**
* @brief A list of jobs that are ready for current execution
* Jobs in it are ready to be scheduled
namespace exec
{
-Job::Job(const model::SubgraphIndex &index, IFunction *fn, const backend::Backend *backend,
- const model::OperandIndexSequence &inputs, const model::OperandIndexSequence &outputs)
- : _index{index}, _fn{fn}, _backend{backend}, _inputs(inputs), _outputs{outputs}
+Job::Job(uint32_t index, IFunction *fn, const backend::Backend *backend)
+ : _index{index}, _fn{fn}, _backend{backend}
{
- reset();
-}
-
-void Job::onNotified(const model::OperandIndex &operand)
-{
- std::string s;
- for (auto e : _blocking_inputs)
- {
- s += std::to_string(e.value());
- s += " ";
- }
- VERBOSE(Job) << "Job #" << _index.value() << " notified operand : " << operand.value()
- << " / blocking inputs { " << s << "}" << std::endl;
-
- auto itr = _blocking_inputs.find(operand);
- if (itr != _blocking_inputs.end())
- {
- _blocking_inputs.erase(itr);
- }
-}
-
-void Job::onNotified(const model::OperandIndexSequence &operands)
-{
- for (auto operand : operands)
- {
- onNotified(operand);
- }
}
void Job::run() { _fn->run(); }
-void Job::reset()
-{
- assert(_blocking_inputs.empty());
- for (auto input : _inputs)
- {
- _blocking_inputs.insert(input);
- }
-}
-
} // namespace exec
} // namespace neurun
* @param inputs Input operand list
* @param outputs Output operand list
*/
- Job(const model::SubgraphIndex &index, IFunction *fn, const backend::Backend *backend,
- const model::OperandIndexSequence &inputs, const model::OperandIndexSequence &outputs);
- /**
- * @brief Remove the given operand from blocking input list
- *
- * @param operand Operand to be notified
- */
- void onNotified(const model::OperandIndex &operand);
- /**
- * @brief Remove the given operands from blocking input list
- *
- * @param operands Operands to be notified
- */
- void onNotified(const model::OperandIndexSequence &operands);
+ Job(uint32_t index, IFunction *fn, const backend::Backend *backend);
/**
* @brief Execute the compiled code
*/
void run();
/**
- * @brief Check if this job is ready for execution
+ * @brief Return job index
*
- * @return @true if there is no blockin inputs
+ * @return Job index
*/
- bool ready() const { return _blocking_inputs.empty(); }
- /**
- * @brief Return subgraph index
- *
- * @return Subgraph index
- */
- model::SubgraphIndex index() const { return _index; }
- /**
- * @brief Return output indices
- *
- * @return Output indices
- */
- const model::OperandIndexSequence &outputs() const { return _outputs; }
+ uint32_t index() const { return _index; }
/**
* @brief Return the function to be executed
*
*/
const backend::Backend *backend() { return _backend; }
- /**
- * @brief Reset blocking inputs for the next execution
- */
- void reset();
-
private:
- model::SubgraphIndex _index;
+ uint32_t _index;
IFunction *_fn;
const backend::Backend *_backend;
- std::unordered_set<model::OperandIndex> _blocking_inputs;
- const model::OperandIndexSequence _inputs;
- const model::OperandIndexSequence _outputs;
};
} // namespace exec
std::function<void()> _teardown;
};
-void ParallelExecutor::notify(const model::OperandIndexSequence &operands)
+void ParallelExecutor::notify(uint32_t finished_job_id)
{
std::unique_lock<std::mutex> lock{_mu_jobs};
- DataflowExecutor::notify(operands);
+ DataflowExecutor::notify(finished_job_id);
lock.unlock();
_cv_jobs.notify_all();
void ParallelExecutor::executeImpl()
{
- assert(_waiting_jobs.empty());
-
// Init scheduler
// TODO Consider to have distinct backend set in LowerInfoMap
graph::BackendSet backends;
}
_scheduler = nnfw::cpp14::make_unique<ParallelScheduler>(backends);
+ assert(noWaitingJobs());
+
// Execution setup
_waiting_jobs.swap(_finished_jobs); // Move finished jobs to waiting jobs
- notify(_initially_ready_operands);
+ for (uint32_t i = 0; i < _waiting_jobs.size(); ++i)
+ {
+ VERBOSE(ParallelExecutor) << i << ": " << _input_info[i] << std::endl;
+ if (_input_info[i] == 0)
+ {
+ _ready_jobs.push(std::move(_waiting_jobs[i]));
+ }
+ }
assert(!_ready_jobs.empty()); // Cannot begin if there is no initial jobs
- while (!_ready_jobs.empty() || !_waiting_jobs.empty())
+ VERBOSE(ParallelExecutor) << "INITIAL JOBS : " << _ready_jobs.size() << std::endl;
+
+ while (!_ready_jobs.empty() ||
+ // TODO Enhance this not to iterate over the array
+ std::any_of(_waiting_jobs.begin(), _waiting_jobs.end(),
+ [](const std::unique_ptr<Job> &job) { return job != nullptr; }))
{
std::unique_lock<std::mutex> lock{_mu_jobs};
if (_ready_jobs.empty())
_cv_jobs.wait(lock, [this] { return !_ready_jobs.empty(); });
}
- VERBOSE(ParallelExecutor) << "waiting : " << _waiting_jobs.size() << " / "
- << "ready : " << _ready_jobs.size() << std::endl;
auto job = std::move(_ready_jobs.front());
_ready_jobs.pop();
lock.unlock();
- VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index().value() << std::endl;
+ VERBOSE(ParallelExecutor) << "Assigning fn #" << job->index() << std::endl;
- auto job_ptr = job.get();
- auto teardown = [&, job_ptr]() { notify(job_ptr->outputs()); };
+ auto job_index = job->index();
+ auto teardown = [&, job_index]() { notify(job_index); };
_scheduler->assign(nnfw::cpp14::make_unique<HookFunction>(job->fn(), teardown), job->backend());
- _finished_jobs.push_back(std::move(job));
+ _finished_jobs[job_index] = std::move(job);
}
- assert(_waiting_jobs.empty());
-
- for (const auto &job : _finished_jobs)
- {
- job->reset();
- }
+ assert(noWaitingJobs());
+ // Wait for all the jobs done
_scheduler->finish();
+
+ // Reset input info for the next execution
+ _input_info = _initial_input_info;
}
} // namespace exec
*/
class ParallelExecutor : public DataflowExecutor
{
-public:
- using CodeMap = std::unordered_map<model::SubgraphIndex, std::unique_ptr<FunctionSequence>>;
-
protected:
- void notify(const model::OperandIndexSequence &operands) override;
+ void notify(uint32_t finished_job_id) override;
public:
/**