Check a task status inside HandleExt to handle concurrent issue
[platform/upstream/csr-framework.git] / src / framework / client / handle-ext.cpp
index 83cc706..cf97924 100644 (file)
  */
 #include "client/handle-ext.h"
 
-#include <algorithm>
+#include <utility>
 
-#include "client/utils.h"
-#include "common/dispatcher.h"
 #include "common/audit/logger.h"
+#include "common/exception.h"
 
 namespace Csr {
 namespace Client {
 
-HandleExt::HandleExt(std::shared_ptr<Context> &&context) :
-       Handle(std::move(context)),
-       m_stop(false)
+HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
+       Handle(id, std::move(context)), m_stop(false), m_isRunning(false)
 {
 }
 
 HandleExt::~HandleExt()
 {
-       DEBUG("Destroying extended handle... join all workers...");
-       eraseJoinableIf();
+       DEBUG("Destroying extended handle... join worker...");
+       if (this->m_worker.joinable())
+               this->m_worker.join();
 }
 
-void HandleExt::stop()
+void HandleExt::setStopFunc(std::function<void()> &&func)
 {
-       DEBUG("Stop & join all workers...");
-       m_stop = true;
-       eraseJoinableIf();
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
+       this->m_stopFunc = std::move(func);
 }
 
-bool HandleExt::isStopped() const
-{
-       return m_stop.load();
-}
-
-void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
+bool HandleExt::stop()
 {
-       std::unique_lock<std::mutex> l(m_mutex);
-       DEBUG("clean joinable workers! current worker map size: " <<
-                 m_workerMap.size());
-       auto it = m_workerMap.begin();
+       DEBUG("Stop & join worker...");
 
-       while (it != m_workerMap.end()) {
-               DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
+       {
+               std::lock_guard<std::mutex> l(this->m_flagMutex);
 
-               if (!it->second.t.joinable())
-                       throw std::logic_error(FORMAT("All workers should be joinable "
-                                                                                 "but it isn't. tid: " << it->first));
+               if (!this->m_isRunning || this->m_stop)
+                       return false;
 
-               if (!pred(*it)) {
-                       ++it;
-                       continue;
-               }
+               this->m_stop = true;
 
-               DEBUG("Joining worker! tid:" << it->first);
-               l.unlock();
-               it->second.t.join(); // release lock for worker who calls done()
-               l.lock();
-               DEBUG("Joined worker! tid:" << it->first);
-               it = m_workerMap.erase(it);
+               if (this->m_stopFunc != nullptr)
+                       this->m_stopFunc();
        }
+
+       if (this->m_worker.joinable())
+               this->m_worker.join();
+
+       return true;
 }
 
-void HandleExt::done()
+bool HandleExt::isStopped() const
 {
-       std::lock_guard<std::mutex> l(m_mutex);
-       auto it = m_workerMap.find(std::this_thread::get_id());
-
-       if (it == m_workerMap.end())
-               throw std::logic_error(FORMAT("worker done but it's not registered in map. "
-                                                                         "tid: " << std::this_thread::get_id()));
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
+       return this->m_stop;
+}
 
-       it->second.isDone = true;
+bool HandleExt::isRunning() const
+{
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
+       return this->m_isRunning;
 }
 
-void HandleExt::dispatchAsync(const Task &f)
+void HandleExt::dispatchAsync(const std::shared_ptr<Task> &f)
 {
-       eraseJoinableIf([](const WorkerMapPair & pair) {
-               return pair.second.isDone.load();
-       });
-       // TODO: how to handle exceptions in workers
-       std::thread t([this, f] {
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
+
+       if (this->m_isRunning)
+               ThrowExc(CSR_ERROR_BUSY, "Worker is already running. Async is busy!");
+
+       if (this->m_worker.joinable())
+               this->m_worker.join();
+
+       this->m_isRunning = true;
+       this->m_stop = false;
+
+       this->m_worker = std::thread([this, f] {
                DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
 
-               f();
-               done();
+               {
+                       // Wait for client lib API func returned & mutex freed by scoped-lock dtor
+                       // This is for invoking registered callbacks follows returning API func
+                       std::lock_guard<std::mutex> _l(this->m_dispatchMutex);
+               }
+                       (*f)();
+
+               {
+                       std::lock_guard<std::mutex> _l(this->m_flagMutex);
+                       this->m_isRunning = false;
+               }
 
                DEBUG("client async thread done! tid: " << std::this_thread::get_id());
        });
-       {
-               std::lock_guard<std::mutex> l(m_mutex);
-               m_workerMap.emplace(t.get_id(), std::move(t));
-       }
 }
 
-HandleExt::Worker::Worker() : isDone(false)
+void HandleExt::add(ResultPtr &&ptr)
 {
-       DEBUG("Worker default constructor called");
-}
+       std::lock_guard<std::mutex> l(this->m_resultsMutex);
 
-HandleExt::Worker::Worker(std::thread &&_t) :
-       isDone(false),
-       t(std::forward<std::thread>(_t))
-{
+       this->m_results.emplace_back(std::move(ptr));
 }
 
-HandleExt::Worker::Worker(HandleExt::Worker &&other) :
-       isDone(other.isDone.load()),
-       t(std::move(other.t))
+void HandleExt::add(ResultListPtr &&ptr)
 {
-}
+       std::lock_guard<std::mutex> l(this->m_resultsMutex);
 
-HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
-{
-       isDone = other.isDone.load();
-       t = std::move(other.t);
-       return *this;
+       this->m_resultLists.emplace_back(std::move(ptr));
 }
 
 } // namespace Client