task_manager: fix memory leak issue 74/314374/6
authorInki Dae <inki.dae@samsung.com>
Thu, 11 Jul 2024 02:25:36 +0000 (11:25 +0900)
committerInki Dae <inki.dae@samsung.com>
Fri, 12 Jul 2024 03:53:33 +0000 (12:53 +0900)
Fix memory leak issue to SharedBuffer objects when task manager is performed.

The memory leak happended in two cases,
1. ref_count of SharedBuffer class wasn't discounted correctly because
   'std::atomic<int> ref_count' isn't initialized.[1] So clear it correctly.
2. missed to release input buffer. As for this, it moved the release call
   from each node class into threadCb of TaskManager class so that making
   sure to miss releasing the input buffer. With this, each node class
   is free to care about the buffer release.

Regarding the memory leak, this patch introduces buffer tracker which
manages to check if each buffer is released or not.

In addition, use call by reference as a parameter of setOutputBuffer function.

[1] https://saco-evaluator.org.za/docs/cppreference/en/cpp/atomic/atomic/atomic.html

Change-Id: I3b20a7b9962e086a6d95bc04d996f527efc545eb
Signed-off-by: Inki Dae <inki.dae@samsung.com>
13 files changed:
common/CMakeLists.txt [new file with mode: 0644]
common/include/SharedBuffer.h
services/CMakeLists.txt
services/task_manager/include/BranchNode.h
services/task_manager/include/BridgeNode.h
services/task_manager/include/CallbackNode.h
services/task_manager/include/EndpointNode.h
services/task_manager/include/INode.h
services/task_manager/include/TaskNode.h
services/task_manager/src/CallbackNode.cpp
services/task_manager/src/InferenceNode.cpp
services/task_manager/src/TaskManager.cpp
services/task_manager/src/TaskNode.cpp

diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
new file mode 100644 (file)
index 0000000..b075544
--- /dev/null
@@ -0,0 +1,3 @@
+SET(COMMON_DIRECTORY ${ROOT_DIRECTORY}/common)
+
+LIST(APPEND COMMON_HEADER_LIST ${COMMON_HEADER_LIST} ${COMMON_DIRECTORY}/include)
\ No newline at end of file
index aa6e9ac19c7ffed7bbf216887203083f027e83a2..2617541d7272d018d21c93cd4587bd7401bf63f2 100644 (file)
 #define __SHARED_BUFFER_H__
 
 #include <atomic>
-#include <vector>
+#include <map>
 #include <memory>
 
 #include "SingleoCommonTypes.h"
+#include "SingleoException.h"
 
 namespace singleo
 {
+static std::map<unsigned long, bool> _bufferTracker;
+static std::mutex _bufferTrackerMutex;
+
 class SharedBuffer
 {
 private:
        std::vector<std::shared_ptr<BaseDataType> > _data;
-       std::atomic<int> ref_count;
+       std::atomic<int> ref_count { 0 };
+       static void pushSharedBuffer(SharedBuffer *sharedBuffer)
+       {
+               std::unique_lock<std::mutex> lock(_bufferTrackerMutex);
+
+               _bufferTracker[reinterpret_cast<unsigned long>(sharedBuffer)] = true;
+       }
+
+       static void popSharedBuffer(SharedBuffer *sharedBuffer)
+       {
+               std::unique_lock<std::mutex> lock(_bufferTrackerMutex);
+
+               _bufferTracker[reinterpret_cast<unsigned long>(sharedBuffer)] = false;
+       }
 
 public:
-       SharedBuffer() = default;
+       SharedBuffer()
+       {
+               SharedBuffer::pushSharedBuffer(this);
+       }
        ~SharedBuffer() = default;
 
        void addInput(std::shared_ptr<BaseDataType> data)
@@ -52,9 +72,27 @@ public:
 
        void release()
        {
-               if (ref_count.fetch_sub(1, std::memory_order_acq_rel) == 1)
+               if (ref_count.fetch_sub(1, std::memory_order_acq_rel) == 1) {
                        for (auto &data : _data)
                                data->release();
+
+                       SharedBuffer::popSharedBuffer(this);
+               }
+       }
+
+       static void checkMemoryLeak()
+       {
+               std::unique_lock<std::mutex> lock(_bufferTrackerMutex);
+
+               unsigned int bufferLeakCnt = 0;
+
+               for (auto &bufferTracker : _bufferTracker)
+                       if (bufferTracker.second == true)
+                               bufferLeakCnt++;
+
+               if (bufferLeakCnt > 0)
+                       throw exception::InvalidOperation("SharedBuffer memory leak detected. " + std::to_string(bufferLeakCnt) +
+                                                                                         " buffer(s) are not released.");
        }
 };
 
index 771fe92b8b3fba62f3e16bd49922653a13ad64ff..6df9aa956aad0ae1f58d7c1353eda0c832e3dcb7 100644 (file)
@@ -5,6 +5,7 @@ FILE(GLOB SINGLEO_SERVICE_SOURCE_FILES "${PROJECT_SOURCE_DIR}/*.cpp"
                                        "${PROJECT_SOURCE_DIR}/src/*.cpp"
                                        "${PROJECT_SOURCE_DIR}/common/src/*.cpp")
 
+INCLUDE(${ROOT_DIRECTORY}/common/CMakeLists.txt)
 INCLUDE(${ROOT_DIRECTORY}/log/CMakeLists.txt)
 INCLUDE(${ROOT_DIRECTORY}/input/CMakeLists.txt)
 INCLUDE(${ROOT_DIRECTORY}/inference/CMakeLists.txt)
@@ -17,7 +18,7 @@ ENDIF()
 
 ADD_LIBRARY(${PROJECT_NAME} SHARED ${SINGLEO_SERVICE_SOURCE_FILES})
 
-TARGET_INCLUDE_DIRECTORIES(${PROJECT_NAME} PRIVATE include common/include ${ROOT_DIRECTORY}/capi/ ${ROOT_DIRECTORY}/common/include ${INPUT_HEADER_LIST} ${INFERENCE_HEADER_LIST} ${SERVICE_HEADER_LIST} ${LOG_HEADER_LIST})
+TARGET_INCLUDE_DIRECTORIES(${PROJECT_NAME} PRIVATE include common/include ${ROOT_DIRECTORY}/capi/ ${COMMON_HEADER_LIST} ${INPUT_HEADER_LIST} ${INFERENCE_HEADER_LIST} ${SERVICE_HEADER_LIST} ${LOG_HEADER_LIST})
 TARGET_LINK_LIBRARIES(${PROJECT_NAME} PRIVATE opencv_core opencv_imgcodecs opencv_highgui opencv_videoio ${INFERENCE_LIBRARY_LIST} ${SERVICE_LIBRARY_LIST} ${LOG_LIBRARY_LIST})
 
 INSTALL(TARGETS ${PROJECT_NAME} DESTINATION ${LIB_INSTALL_DIR})
\ No newline at end of file
index ba7dc0ea0835fd91ee010000b3c5effc5c82500f..831adb2a8e0f2832016b52d0c1ba7ce6f83ba5a6 100644 (file)
@@ -64,8 +64,6 @@ public:
                        n->setStatus(NodeStatus::INVALID);
 
                _cb(this);
-
-               _inputBuffer->release();
        }
 };
 
index cdc545e60f9be352d5187570d7b9c0f8e1b14fc3..f877f301440a096a1ed600328e3c7f4c32c02011 100644 (file)
@@ -62,8 +62,6 @@ public:
 
                _cb(this);
 
-               _inputBuffer->release();
-
                _status = NodeStatus::INVALID;
                // Bridge node got the result from previous task node so enable this bridge node.
                if (_outputBuffer)
index 079cc266bfdaa7c04a0e8b8d373c8081283dc16b..c4bac80f4418d176585934ad8ebee7adeaaf14b7 100644 (file)
@@ -63,7 +63,7 @@ public:
        void addDependency(INode *node) override;
        std::vector<INode *> &getDependencies() override;
        std::vector<INode *> &getNexts() override;
-       void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
+       void setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer) override;
        std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
        void wait() override;
        void wakeup() override;
index af13aff5896c212eb5357b3c06ed0bfc303a46b5..956ae942af35b33c9e640ff186814edfd9da3cdb 100644 (file)
@@ -42,6 +42,7 @@ public:
        {
                _results.clear();
                _status = NodeStatus::INVALID;
+
                // If at least one dependency is valid, endpoint node is valid.
                for (auto &dep : this->getDependencies()) {
                        if (dynamic_cast<TaskNode *>(dep)->getStatus() == NodeStatus::VALID) {
@@ -49,6 +50,7 @@ public:
                                break;
                        }
                }
+
                if (_status == NodeStatus::INVALID)
                        return;
 
@@ -57,8 +59,6 @@ public:
 
                if (_cb)
                        _cb(this);
-
-               _inputBuffer->release();
        }
 };
 
index 46f947107d73a9fa07b6bcb7caaa10c84413fe2b..bdcc2a87edb82da7d53769872afc21ae4421d0d9 100644 (file)
@@ -47,7 +47,7 @@ public:
        virtual void addNext(INode *node) = 0;
        virtual std::vector<INode *> &getNexts() = 0;
        virtual std::vector<INode *> &getDependencies() = 0;
-       virtual void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) = 0;
+       virtual void setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer) = 0;
        virtual std::shared_ptr<SharedBuffer> &getOutputBuffer() = 0;
        virtual void configure() = 0;
        virtual void invoke() = 0;
index f50a4f807d1e256204951803b8dcbb23ef45552d..8a915d89f3c9643a0a8c11a3d95ef84b47ec0206 100644 (file)
@@ -61,7 +61,7 @@ public:
        void addDependency(INode *node) override;
        std::vector<INode *> &getDependencies() override;
        std::vector<INode *> &getNexts() override;
-       void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
+       void setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer) override;
        std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
        void wait() override;
        void wakeup() override;
index 0c7814bcf2a2db7babf24da8e7852b1e04b225f3..f1ce185868f4ab7c994656b721201c655a3dc939 100644 (file)
@@ -95,7 +95,7 @@ std::vector<INode *> &CallbackNode::getNexts()
        return _nexts;
 }
 
-void CallbackNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
+void CallbackNode::setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer)
 {
        outputBuffer->addRef();
        _outputBuffer = outputBuffer;
index 31aae47774bce0b637ae6902ea15c8a52ea4b36f..3a911b455f9a4628c46a2297a201fa7cbad9d98b 100644 (file)
@@ -52,14 +52,12 @@ void InferenceNode::invoke()
                        return;
                }
        }
+
        auto &inputs = _inputBuffer->getInputs();
 
        // TODO. consider for multiple inputs later.
        _task->invoke(*inputs[0]);
 
-       // Inference request has been completed so release input data.
-       _inputBuffer->release();
-
        _status = NodeStatus::INVALID;
        if (!_task->result()._is_empty) {
                _resultMutex.lock();
index cbf7f3bbd9d1c599a06188c60a3ce206e181f4d7..3f5ccd5e65f0f36139bcd00499078149c7ee16ee 100644 (file)
@@ -110,6 +110,8 @@ void TaskManager::threadCb(INode *node)
        }
 
        node->invoke();
+       if (node->getInputBuffer())
+               node->getInputBuffer()->release();
        node->wakeup();
 
        // Spawn threads for next nodes
@@ -338,6 +340,8 @@ void TaskManager::clear()
        _nodes.clear();
        _results.clear();
        _is_thread_created.clear();
+
+       SharedBuffer::checkMemoryLeak();
 }
 
 }
index db6342e57b49f24b1dd0b26917aa8ed130103a58..7d5ae1da74ec1a2368f1db41a332541c8a59548d 100644 (file)
@@ -89,7 +89,7 @@ std::vector<INode *> &TaskNode::getNexts()
        return _nexts;
 }
 
-void TaskNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
+void TaskNode::setOutputBuffer(std::shared_ptr<SharedBuffer> &outputBuffer)
 {
        outputBuffer->addRef();
        _outputBuffer = outputBuffer;