AutoZoom::~AutoZoom()
{
- if (_async_mode) {
+ if (_async_mode)
_input_service->streamOff();
- _async_manager->clear();
- }
_taskManager->clear();
}
_thread_handle = std::make_unique<std::thread>(&AsyncManager::invokeThread, this);
}
- ~AsyncManager() = default;
-
- void clear()
+ ~AsyncManager()
{
_exit_thread = true;
_thread_handle->join();
_inputBuffer->release();
_enabled = false;
- if (!_outputBuffer)
- return;
-
- _enabled = true;
+ // Bridge node got the result from previous task node so enable this bridge node.
+ if (_outputBuffer)
+ _enabled = true;
}
- void updateRunQueue(RunQueueManager *runQueueManager) final
+ bool isEnabled()
{
- // If this bridge node failed then wakeup all next nodes because
- // other nodes can wait for them to finish. In this case,
- // the next nodes aren't invoked.
- if (!_enabled) {
- for (const auto &n : _nexts)
- n->wakeup();
-
- return;
- }
-
- for (const auto &n : _nexts)
- runQueueManager->pushNode(n);
+ return _enabled;
}
};
#include <condition_variable>
#include "INode.h"
-#include "RunQueueManager.h"
#include "SingleoException.h"
#include "SharedBuffer.h"
CallbackNode() = default;
virtual ~CallbackNode() = default;
- virtual void configure() = 0;
- virtual void invoke() = 0;
- virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
-
NodeType getType() override;
std::string &getName() override
{
std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
void wait() override;
void wakeup() override;
+ virtual void configure() = 0;
+ virtual void invoke() = 0;
void clear() override;
void setCb(const NodeCb &cb);
std::vector<std::shared_ptr<BaseResultType> > &results() override;
_inputBuffer->release();
}
-
- void updateRunQueue(RunQueueManager *runQueueManager) override
- {}
};
}
namespace services
{
enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT };
-class RunQueueManager;
class INode
{
virtual void wait() = 0;
virtual void wakeup() = 0;
virtual void clear() = 0;
- virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
};
using NodeCb = std::function<void(INode *node)>;
#define __INFERENCE_NODE_H__
#include <mutex>
-#include <queue>
#include <condition_variable>
#include "TaskNode.h"
void configure() final;
void invoke() final;
std::vector<std::shared_ptr<BaseResultType> > &results() final;
- void updateRunQueue(RunQueueManager *runQueueManager) override;
};
}
+++ /dev/null
-/**
- * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __RUN_QUEUE_MANAGER_H__
-#define __RUN_QUEUE_MANAGER_H__
-
-#include <queue>
-#include <unordered_set>
-#include <memory>
-#include <mutex>
-
-#include "INode.h"
-#include "SingleoLog.h"
-
-namespace singleo
-{
-namespace services
-{
-class RunQueueManager
-{
-private:
- std::unordered_set<std::shared_ptr<INode> > _uniqueRunNodes;
- std::queue<std::shared_ptr<INode> > _runQueue;
- std::mutex _runQueueMutex;
-
- bool isDuplicated(const std::shared_ptr<INode> &node)
- {
- return _uniqueRunNodes.find(node) != _uniqueRunNodes.end();
- }
-
-public:
- void pushNode(const std::shared_ptr<INode> node)
- {
- std::lock_guard<std::mutex> lock(_runQueueMutex);
-
- if (isDuplicated(node))
- return;
-
- _runQueue.push(node);
- _uniqueRunNodes.insert(node);
- }
-
- std::shared_ptr<INode> &popNode()
- {
- std::lock_guard<std::mutex> lock(_runQueueMutex);
- auto &node = _runQueue.front();
-
- _runQueue.pop();
-
- return node;
- }
-
- bool isEmpty()
- {
- std::lock_guard<std::mutex> lock(_runQueueMutex);
-
- return _runQueue.empty();
- }
-
- void clear()
- {
- std::lock_guard<std::mutex> lock(_runQueueMutex);
-
- _uniqueRunNodes.clear();
- }
-
-public:
- RunQueueManager() = default;
- ~RunQueueManager() = default;
-};
-
-}
-}
-
-#endif
\ No newline at end of file
#include "IInferenceTaskInterface.h"
#include "SingleoCommonTypes.h"
#include "INode.h"
-#include "RunQueueManager.h"
namespace singleo
{
std::vector<std::shared_ptr<INode> > _nodes;
std::vector<std::shared_ptr<BaseResultType> > _results;
std::queue<std::shared_ptr<std::thread> > _threads;
+ std::unordered_set<std::shared_ptr<INode> > _is_thread_created;
+
std::mutex _thread_mutex;
- std::unique_ptr<RunQueueManager> _runQueueManager;
void threadCb(std::shared_ptr<INode> &node);
void verifyGraph();
#define __TASK_NODE_H__
#include <mutex>
-#include <queue>
#include <condition_variable>
#include "INode.h"
virtual void invoke() = 0;
void clear() override;
virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
- virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
};
}
#define __TRAINING_NODE_H__
#include <mutex>
-#include <queue>
#include <condition_variable>
#include "TaskNode.h"
{
// TODO. implement results here.
}
-
- void updateRunQueue(RunQueueManager *runQueueManager) override
- {}
};
}
#include "SingleoLog.h"
#include "InferenceNode.h"
-#include "RunQueueManager.h"
using namespace std;
using namespace singleo::inference;
return _results;
}
-void InferenceNode::updateRunQueue(RunQueueManager *runQueueManager)
-{
- for (auto &n : _nexts)
- runQueueManager->pushNode(n);
-}
-
}
}
\ No newline at end of file
for (auto &d : node->getDependencies())
d->wait();
+ SINGLEO_LOGD("Launched node name = %s", node->getName().c_str());
+
if (node->getType() == NodeType::INFERENCE) {
if (_inputs[0]->_data_type != DataType::IMAGE) {
SINGLEO_LOGE("Invalid input data type.");
node->invoke();
node->wakeup();
- node->updateRunQueue(_runQueueManager.get());
-
- while (!_runQueueManager->isEmpty()) {
- auto &n = _runQueueManager->popNode();
+ // Spawn threads for next nodes
+ for (auto &n : node->getNexts()) {
+ if (node->getType() == NodeType::BRIDGE) {
+ auto b_node = dynamic_pointer_cast<BridgeNode>(node);
- std::unique_lock<std::mutex> lock(_thread_mutex);
+ // In case of BRIDGE node, if this bridge node didn't get the result from previous task node,
+ // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node.
+ if (!b_node->isEnabled()) {
+ n->wakeup();
+ continue;
+ }
+ }
- _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
- _thread_mutex.unlock();
+ std::lock_guard<std::mutex> lock(_thread_mutex);
+ if (_is_thread_created.find(n) == _is_thread_created.end()) {
+ _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+ _is_thread_created.insert(n);
+ }
}
}
auto inputBuffer = make_shared<SharedBuffer>();
- _runQueueManager = make_unique<RunQueueManager>();
-
for (auto &i : _inputs)
inputBuffer->addInput(i);
throw InvalidOperation("root node should be inference node.");
}
- _runQueueManager->pushNode(n);
+ // TODO. consider for multiple sources later.
+ n->setInputBuffer(inputBuffer);
+ _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+ _is_thread_created.insert(n);
}
}
- while (!_runQueueManager->isEmpty()) {
- auto &n = _runQueueManager->popNode();
-
- n->setInputBuffer(inputBuffer);
- _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
- }
-
while (true) {
std::unique_lock<std::mutex> lock(_thread_mutex);
if (_threads.empty())
break;
auto t = _threads.front();
-
_threads.pop();
+
lock.unlock();
+
t->join();
}
+ _is_thread_created.clear();
_inputs.clear();
// the result has been returned to user so clear each node.
_inputs.clear();
_nodes.clear();
_results.clear();
- _runQueueManager->clear();
+ _is_thread_created.clear();
}
}