Add RunQueueManager class which schedules nodes of graph in runtime.
With this patch, we can drop some node dependency from task manager
by making each concrete node to update next nodes which will be
scheduled by task manager.
Change-Id: I7c448aa5ae579b2f61ec2e5b765789f2e82966ad
Signed-off-by: Inki Dae <inki.dae@samsung.com>
AutoZoom::~AutoZoom()
{
- if (_async_mode)
+ if (_async_mode) {
_input_service->streamOff();
+ _async_manager->clear();
+ }
_taskManager->clear();
}
_thread_handle = std::make_unique<std::thread>(&AsyncManager::invokeThread, this);
}
- ~AsyncManager()
+ ~AsyncManager() = default;
+
+ void clear()
{
_exit_thread = true;
_thread_handle->join();
_inputBuffer->release();
_enabled = false;
- // Bridge node got the result from previous task node so enable this bridge node.
- if (_outputBuffer)
- _enabled = true;
+ if (!_outputBuffer)
+ return;
+
+ _enabled = true;
}
- bool isEnabled()
+ void updateRunQueue(RunQueueManager *runQueueManager) final
{
- return _enabled;
+ // If this bridge node failed then wakeup all next nodes because
+ // other nodes can wait for them to finish. In this case,
+ // the next nodes aren't invoked.
+ if (!_enabled) {
+ for (const auto &n : _nexts)
+ n->wakeup();
+
+ return;
+ }
+
+ for (const auto &n : _nexts)
+ runQueueManager->pushNode(n);
}
};
#include <condition_variable>
#include "INode.h"
+#include "RunQueueManager.h"
#include "SingleoException.h"
#include "SharedBuffer.h"
CallbackNode() = default;
virtual ~CallbackNode() = default;
+ virtual void configure() = 0;
+ virtual void invoke() = 0;
+ virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
+
NodeType getType() override;
std::string &getName() override
{
std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
void wait() override;
void wakeup() override;
- virtual void configure() = 0;
- virtual void invoke() = 0;
void clear() override;
void setCb(const NodeCb &cb);
std::vector<std::shared_ptr<BaseResultType> > &results() override;
_inputBuffer->release();
}
+
+ void updateRunQueue(RunQueueManager *runQueueManager) override
+ {}
};
}
namespace services
{
enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT };
+class RunQueueManager;
class INode
{
virtual void wait() = 0;
virtual void wakeup() = 0;
virtual void clear() = 0;
+ virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
};
using NodeCb = std::function<void(INode *node)>;
#define __INFERENCE_NODE_H__
#include <mutex>
+#include <queue>
#include <condition_variable>
#include "TaskNode.h"
void configure() final;
void invoke() final;
std::vector<std::shared_ptr<BaseResultType> > &results() final;
+ void updateRunQueue(RunQueueManager *runQueueManager) override;
};
}
--- /dev/null
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RUN_QUEUE_MANAGER_H__
+#define __RUN_QUEUE_MANAGER_H__
+
+#include <queue>
+#include <unordered_set>
+#include <memory>
+#include <mutex>
+
+#include "INode.h"
+#include "SingleoLog.h"
+
+namespace singleo
+{
+namespace services
+{
+class RunQueueManager
+{
+private:
+ std::unordered_set<std::shared_ptr<INode> > _uniqueRunNodes;
+ std::queue<std::shared_ptr<INode> > _runQueue;
+ std::mutex _runQueueMutex;
+
+ bool isDuplicated(const std::shared_ptr<INode> &node)
+ {
+ return _uniqueRunNodes.find(node) != _uniqueRunNodes.end();
+ }
+
+public:
+ void pushNode(const std::shared_ptr<INode> node)
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+ if (isDuplicated(node))
+ return;
+
+ _runQueue.push(node);
+ _uniqueRunNodes.insert(node);
+ }
+
+ std::shared_ptr<INode> &popNode()
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+ auto &node = _runQueue.front();
+
+ _runQueue.pop();
+
+ return node;
+ }
+
+ bool isEmpty()
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+ return _runQueue.empty();
+ }
+
+ void clear()
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+ _uniqueRunNodes.clear();
+ }
+
+public:
+ RunQueueManager() = default;
+ ~RunQueueManager() = default;
+};
+
+}
+}
+
+#endif
\ No newline at end of file
#include "IInferenceTaskInterface.h"
#include "SingleoCommonTypes.h"
#include "INode.h"
+#include "RunQueueManager.h"
namespace singleo
{
std::vector<std::shared_ptr<INode> > _nodes;
std::vector<std::shared_ptr<BaseResultType> > _results;
std::queue<std::shared_ptr<std::thread> > _threads;
- std::unordered_set<std::shared_ptr<INode> > _is_thread_created;
-
std::mutex _thread_mutex;
+ std::unique_ptr<RunQueueManager> _runQueueManager;
void threadCb(std::shared_ptr<INode> &node);
void verifyGraph();
#define __TASK_NODE_H__
#include <mutex>
+#include <queue>
#include <condition_variable>
#include "INode.h"
virtual void invoke() = 0;
void clear() override;
virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
+ virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
};
}
#define __TRAINING_NODE_H__
#include <mutex>
+#include <queue>
#include <condition_variable>
#include "TaskNode.h"
{
// TODO. implement results here.
}
+
+ void updateRunQueue(RunQueueManager *runQueueManager) override
+ {}
};
}
#include "SingleoLog.h"
#include "InferenceNode.h"
+#include "RunQueueManager.h"
using namespace std;
using namespace singleo::inference;
return _results;
}
+void InferenceNode::updateRunQueue(RunQueueManager *runQueueManager)
+{
+ for (auto &n : _nexts)
+ runQueueManager->pushNode(n);
+}
+
}
}
\ No newline at end of file
for (auto &d : node->getDependencies())
d->wait();
- SINGLEO_LOGD("Launched node name = %s", node->getName().c_str());
-
if (node->getType() == NodeType::INFERENCE) {
if (_inputs[0]->_data_type != DataType::IMAGE) {
SINGLEO_LOGE("Invalid input data type.");
node->invoke();
node->wakeup();
- // Spawn threads for next nodes
- for (auto &n : node->getNexts()) {
- if (node->getType() == NodeType::BRIDGE) {
- auto b_node = dynamic_pointer_cast<BridgeNode>(node);
+ node->updateRunQueue(_runQueueManager.get());
- // In case of BRIDGE node, if this bridge node didn't get the result from previous task node,
- // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node.
- if (!b_node->isEnabled()) {
- n->wakeup();
- continue;
- }
- }
+ while (!_runQueueManager->isEmpty()) {
+ auto &n = _runQueueManager->popNode();
- std::lock_guard<std::mutex> lock(_thread_mutex);
- if (_is_thread_created.find(n) == _is_thread_created.end()) {
- _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
- _is_thread_created.insert(n);
- }
+ std::unique_lock<std::mutex> lock(_thread_mutex);
+
+ _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+ _thread_mutex.unlock();
}
}
auto inputBuffer = make_shared<SharedBuffer>();
+ _runQueueManager = make_unique<RunQueueManager>();
+
for (auto &i : _inputs)
inputBuffer->addInput(i);
throw InvalidOperation("root node should be inference node.");
}
- // TODO. consider for multiple sources later.
- n->setInputBuffer(inputBuffer);
- _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
- _is_thread_created.insert(n);
+ _runQueueManager->pushNode(n);
}
}
+ while (!_runQueueManager->isEmpty()) {
+ auto &n = _runQueueManager->popNode();
+
+ n->setInputBuffer(inputBuffer);
+ _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+ }
+
while (true) {
std::unique_lock<std::mutex> lock(_thread_mutex);
if (_threads.empty())
break;
auto t = _threads.front();
- _threads.pop();
+ _threads.pop();
lock.unlock();
-
t->join();
}
- _is_thread_created.clear();
_inputs.clear();
// the result has been returned to user so clear each node.
_inputs.clear();
_nodes.clear();
_results.clear();
- _is_thread_created.clear();
+ _runQueueManager->clear();
}
}