Fix memory leak issue to SharedBuffer objects when task manager is performed.
The memory leak happended in two cases,
1. ref_count of SharedBuffer class wasn't discounted correctly because
'std::atomic<int> ref_count' isn't initialized.[1] So clear it correctly.
2. missed to release input buffer. As for this, it moved the release call
from each node class into threadCb of TaskManager class so that making
sure to miss releasing the input buffer. With this, each node class
is free to care about the buffer release.
Regarding the memory leak, this patch introduces buffer tracker which
manages to check if each buffer is released or not.
In addition, use call by reference as a parameter of setOutputBuffer function.
[1] https://saco-evaluator.org.za/docs/cppreference/en/cpp/atomic/atomic/atomic.html
Change-Id: I3b20a7b9962e086a6d95bc04d996f527efc545eb
Signed-off-by: Inki Dae <inki.dae@samsung.com>
--- /dev/null
+SET(COMMON_DIRECTORY ${ROOT_DIRECTORY}/common)
+
+LIST(APPEND COMMON_HEADER_LIST ${COMMON_HEADER_LIST} ${COMMON_DIRECTORY}/include)
\ No newline at end of file
#define __SHARED_BUFFER_H__
#include <atomic>
-#include <vector>
+#include <map>
#include <memory>
#include "SingleoCommonTypes.h"
+#include "SingleoException.h"
namespace singleo
{
+static std::map<unsigned long, bool> _bufferTracker;
+static std::mutex _bufferTrackerMutex;
+
class SharedBuffer
{
private:
std::vector<std::shared_ptr<BaseDataType> > _data;
- std::atomic<int> ref_count;
+ std::atomic<int> ref_count { 0 };
+ static void pushSharedBuffer(SharedBuffer *sharedBuffer)
+ {
+ std::unique_lock<std::mutex> lock(_bufferTrackerMutex);
+
+ _bufferTracker[reinterpret_cast<unsigned long>(sharedBuffer)] = true;
+ }
+
+ static void popSharedBuffer(SharedBuffer *sharedBuffer)
+ {
+ std::unique_lock<std::mutex> lock(_bufferTrackerMutex);
+
+ _bufferTracker[reinterpret_cast<unsigned long>(sharedBuffer)] = false;
+ }
public:
- SharedBuffer() = default;
+ SharedBuffer()
+ {
+ SharedBuffer::pushSharedBuffer(this);
+ }
~SharedBuffer() = default;
void addInput(std::shared_ptr<BaseDataType> data)
void release()
{
- if (ref_count.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ if (ref_count.fetch_sub(1, std::memory_order_acq_rel) == 1) {
for (auto &data : _data)
data->release();
+
+ SharedBuffer::popSharedBuffer(this);
+ }
+ }
+
+ static void checkMemoryLeak()
+ {
+ std::unique_lock<std::mutex> lock(_bufferTrackerMutex);
+
+ unsigned int bufferLeakCnt = 0;
+
+ for (auto &bufferTracker : _bufferTracker)
+ if (bufferTracker.second == true)
+ bufferLeakCnt++;
+
+ if (bufferLeakCnt > 0)
+ throw exception::InvalidOperation("SharedBuffer memory leak detected. " + std::to_string(bufferLeakCnt) +
+ " buffer(s) are not released.");
}
};
"${PROJECT_SOURCE_DIR}/src/*.cpp"
"${PROJECT_SOURCE_DIR}/common/src/*.cpp")
+INCLUDE(${ROOT_DIRECTORY}/common/CMakeLists.txt)
INCLUDE(${ROOT_DIRECTORY}/log/CMakeLists.txt)
INCLUDE(${ROOT_DIRECTORY}/input/CMakeLists.txt)
INCLUDE(${ROOT_DIRECTORY}/inference/CMakeLists.txt)
ADD_LIBRARY(${PROJECT_NAME} SHARED ${SINGLEO_SERVICE_SOURCE_FILES})
-TARGET_INCLUDE_DIRECTORIES(${PROJECT_NAME} PRIVATE include common/include ${ROOT_DIRECTORY}/capi/ ${ROOT_DIRECTORY}/common/include ${INPUT_HEADER_LIST} ${INFERENCE_HEADER_LIST} ${SERVICE_HEADER_LIST} ${LOG_HEADER_LIST})
+TARGET_INCLUDE_DIRECTORIES(${PROJECT_NAME} PRIVATE include common/include ${ROOT_DIRECTORY}/capi/ ${COMMON_HEADER_LIST} ${INPUT_HEADER_LIST} ${INFERENCE_HEADER_LIST} ${SERVICE_HEADER_LIST} ${LOG_HEADER_LIST})
TARGET_LINK_LIBRARIES(${PROJECT_NAME} PRIVATE opencv_core opencv_imgcodecs opencv_highgui opencv_videoio ${INFERENCE_LIBRARY_LIST} ${SERVICE_LIBRARY_LIST} ${LOG_LIBRARY_LIST})
INSTALL(TARGETS ${PROJECT_NAME} DESTINATION ${LIB_INSTALL_DIR})
\ No newline at end of file
n->setStatus(NodeStatus::INVALID);
_cb(this);
-
- _inputBuffer->release();
}
};
_cb(this);
- _inputBuffer->release();
-
_status = NodeStatus::INVALID;
// Bridge node got the result from previous task node so enable this bridge node.
if (_outputBuffer)
void addDependency(INode *node) override;
std::vector<INode *> &getDependencies() override;
std::vector<INode *> &getNexts() override;
- void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
+ void setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer) override;
std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
void wait() override;
void wakeup() override;
{
_results.clear();
_status = NodeStatus::INVALID;
+
// If at least one dependency is valid, endpoint node is valid.
for (auto &dep : this->getDependencies()) {
if (dynamic_cast<TaskNode *>(dep)->getStatus() == NodeStatus::VALID) {
break;
}
}
+
if (_status == NodeStatus::INVALID)
return;
if (_cb)
_cb(this);
-
- _inputBuffer->release();
}
};
virtual void addNext(INode *node) = 0;
virtual std::vector<INode *> &getNexts() = 0;
virtual std::vector<INode *> &getDependencies() = 0;
- virtual void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) = 0;
+ virtual void setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer) = 0;
virtual std::shared_ptr<SharedBuffer> &getOutputBuffer() = 0;
virtual void configure() = 0;
virtual void invoke() = 0;
void addDependency(INode *node) override;
std::vector<INode *> &getDependencies() override;
std::vector<INode *> &getNexts() override;
- void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
+ void setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer) override;
std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
void wait() override;
void wakeup() override;
return _nexts;
}
-void CallbackNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
+void CallbackNode::setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer)
{
outputBuffer->addRef();
_outputBuffer = outputBuffer;
return;
}
}
+
auto &inputs = _inputBuffer->getInputs();
// TODO. consider for multiple inputs later.
_task->invoke(*inputs[0]);
- // Inference request has been completed so release input data.
- _inputBuffer->release();
-
_status = NodeStatus::INVALID;
if (!_task->result()._is_empty) {
_resultMutex.lock();
}
node->invoke();
+ if (node->getInputBuffer())
+ node->getInputBuffer()->release();
node->wakeup();
// Spawn threads for next nodes
_nodes.clear();
_results.clear();
_is_thread_created.clear();
+
+ SharedBuffer::checkMemoryLeak();
}
}
return _nexts;
}
-void TaskNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
+void TaskNode::setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer)
{
outputBuffer->addRef();
_outputBuffer = outputBuffer;