AutoZoom::~AutoZoom()
{
- if (_async_mode)
+ if (_async_mode) {
_input_service->streamOff();
+ _async_manager->clear();
+ }
_taskManager->clear();
}
_thread_handle = std::make_unique<std::thread>(&AsyncManager::invokeThread, this);
}
- ~AsyncManager()
+ ~AsyncManager() = default;
+
+ void clear()
{
_exit_thread = true;
_thread_handle->join();
_inputBuffer->release();
_enabled = false;
- // Bridge node got the result from previous task node so enable this bridge node.
- if (_outputBuffer)
- _enabled = true;
+ if (!_outputBuffer)
+ return;
+
+ _enabled = true;
}
- bool isEnabled()
+ void updateRunQueue(RunQueueManager *runQueueManager) final
{
- return _enabled;
+ // If this bridge node failed then wakeup all next nodes because
+ // other nodes can wait for them to finish. In this case,
+ // the next nodes aren't invoked.
+ if (!_enabled) {
+ for (const auto &n : _nexts)
+ n->wakeup();
+
+ return;
+ }
+
+ for (auto &n : _nexts)
+ runQueueManager->pushNode(n);
}
};
#define __CALLBACK_NODE_H__
#include <mutex>
+#include <queue>
#include <condition_variable>
#include "INode.h"
void clear() override;
void setCb(const NodeCb &cb);
std::vector<std::shared_ptr<BaseResultType> > &results() override;
+ virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
};
}
_inputBuffer->release();
}
+
+ void updateRunQueue(RunQueueManager *runQueueManager) override
+ {}
};
}
namespace services
{
enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT };
+class RunQueueManager;
class INode
{
virtual void wait() = 0;
virtual void wakeup() = 0;
virtual void clear() = 0;
+ virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
};
using NodeCb = std::function<void(INode *node)>;
#define __INFERENCE_NODE_H__
#include <mutex>
+#include <queue>
#include <condition_variable>
#include "TaskNode.h"
void configure() final;
void invoke() final;
std::vector<std::shared_ptr<BaseResultType> > &results() final;
+ void updateRunQueue(RunQueueManager *runQueueManager) override;
};
}
--- /dev/null
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RUN_QUEUE_MANAGER_H__
+#define __RUN_QUEUE_MANAGER_H__
+
+#include <queue>
+#include <unordered_set>
+#include <memory>
+#include <mutex>
+
+#include "INode.h"
+#include "SingleoLog.h"
+
+namespace singleo
+{
+namespace services
+{
+class RunQueueManager
+{
+private:
+ std::unordered_set<std::shared_ptr<INode> > _uniqueRunNodes;
+ std::queue<std::shared_ptr<INode> > _runQueue;
+ std::mutex _runQueueMutex;
+
+ bool isDuplicated(std::shared_ptr<INode> &node)
+ {
+ if (_uniqueRunNodes.find(node) != _uniqueRunNodes.end())
+ return true;
+
+ return false;
+ }
+
+public:
+ void pushNode(std::shared_ptr<INode> &node)
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+ if (isDuplicated(node))
+ return;
+
+ _runQueue.push(node);
+ _uniqueRunNodes.insert(node);
+ }
+
+ std::shared_ptr<INode> &popNode()
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+ auto &node = _runQueue.front();
+
+ _runQueue.pop();
+
+ return node;
+ }
+
+ bool isEmpty()
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+ return _runQueue.empty();
+ }
+
+ void clear()
+ {
+ std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+ _uniqueRunNodes.clear();
+ }
+
+public:
+ RunQueueManager() = default;
+ ~RunQueueManager() = default;
+};
+
+}
+}
+
+#endif
\ No newline at end of file
#include "IInferenceTaskInterface.h"
#include "SingleoCommonTypes.h"
#include "INode.h"
+#include "RunQueueManager.h"
namespace singleo
{
std::vector<std::shared_ptr<INode> > _nodes;
std::vector<std::shared_ptr<BaseResultType> > _results;
std::queue<std::shared_ptr<std::thread> > _threads;
- std::unordered_set<std::shared_ptr<INode> > _is_thread_created;
-
std::mutex _thread_mutex;
+ std::unique_ptr<RunQueueManager> _runQueueManager;
void threadCb(std::shared_ptr<INode> &node);
void verifyGraph();
#define __TASK_NODE_H__
#include <mutex>
+#include <queue>
#include <condition_variable>
#include "INode.h"
virtual void invoke() = 0;
void clear() override;
virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
+ virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
};
}
#define __TRAINING_NODE_H__
#include <mutex>
+#include <queue>
#include <condition_variable>
#include "TaskNode.h"
{
// TODO. implement results here.
}
+
+ void updateRunQueue(RunQueueManager *runQueueManager) override
+ {}
};
}
#include "SingleoLog.h"
#include "InferenceNode.h"
+#include "RunQueueManager.h"
using namespace std;
using namespace singleo::inference;
return _results;
}
+void InferenceNode::updateRunQueue(RunQueueManager *runQueueManager)
+{
+ for (auto &n : _nexts)
+ runQueueManager->pushNode(n);
+}
+
}
}
\ No newline at end of file
for (auto &d : node->getDependencies())
d->wait();
- SINGLEO_LOGD("Launched node name = %s", node->getName().c_str());
-
if (node->getType() == NodeType::INFERENCE) {
if (_inputs[0]->_data_type != DataType::IMAGE) {
SINGLEO_LOGE("Invalid input data type.");
node->invoke();
node->wakeup();
- // Spawn threads for next nodes
- for (auto &n : node->getNexts()) {
- if (node->getType() == NodeType::BRIDGE) {
- auto b_node = dynamic_pointer_cast<BridgeNode>(node);
+ node->updateRunQueue(_runQueueManager.get());
- // In case of BRIDGE node, if this bridge node didn't get the result from previous task node,
- // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node.
- if (!b_node->isEnabled()) {
- n->wakeup();
- continue;
- }
- }
+ while (!_runQueueManager->isEmpty()) {
+ auto &n = _runQueueManager->popNode();
- std::lock_guard<std::mutex> lock(_thread_mutex);
- if (_is_thread_created.find(n) == _is_thread_created.end()) {
- _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
- _is_thread_created.insert(n);
- }
+ std::unique_lock<std::mutex> lock(_thread_mutex);
+
+ _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+ _thread_mutex.unlock();
}
}
auto inputBuffer = make_shared<SharedBuffer>();
+ _runQueueManager = make_unique<RunQueueManager>();
+
for (auto &i : _inputs)
inputBuffer->addInput(i);
throw InvalidOperation("root node should be inference node.");
}
- // TODO. consider for multiple sources later.
- n->setInputBuffer(inputBuffer);
- _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
- _is_thread_created.insert(n);
+ _runQueueManager->pushNode(n);
}
}
+ while (!_runQueueManager->isEmpty()) {
+ auto &n = _runQueueManager->popNode();
+
+ n->setInputBuffer(inputBuffer);
+ _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+ }
+
while (true) {
std::unique_lock<std::mutex> lock(_thread_mutex);
if (_threads.empty())
break;
auto t = _threads.front();
- _threads.pop();
+ _threads.pop();
lock.unlock();
-
t->join();
}
- _is_thread_created.clear();
_inputs.clear();
// the result has been returned to user so clear each node.
_inputs.clear();
_nodes.clear();
_results.clear();
- _is_thread_created.clear();
+ _runQueueManager->clear();
}
}