Keep single worker thread in async client 68/73168/3
authorKyungwook Tak <k.tak@samsung.com>
Fri, 3 Jun 2016 08:07:16 +0000 (17:07 +0900)
committerkyungwook tak <k.tak@samsung.com>
Tue, 7 Jun 2016 05:06:22 +0000 (22:06 -0700)
Change-Id: I6265e7bc642fa5b57be5b7cbdc6b385fb76b3f0a
Signed-off-by: Kyungwook Tak <k.tak@samsung.com>
src/framework/client/content-screening.cpp
src/framework/client/handle-ext.cpp
src/framework/client/handle-ext.h
src/framework/common/exception.h
test/test-common.cpp

index b5cdc72..a86afda 100644 (file)
@@ -371,7 +371,7 @@ int csr_cs_scan_files_async(csr_cs_context_h handle, const char *file_paths[],
 
        auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
 
-       if (hExt->hasRunning()) {
+       if (hExt->isRunning()) {
                ERROR("Async scanning already running with this handle.");
                return CSR_ERROR_BUSY;
        }
@@ -385,7 +385,7 @@ int csr_cs_scan_files_async(csr_cs_context_h handle, const char *file_paths[],
                fileSet->emplace(Client::getAbsolutePath(file_paths[i]));
        }
 
-       hExt->dispatchAsync([hExt, user_data, fileSet] {
+       auto task = std::make_shared<Task>([hExt, user_data, fileSet] {
                auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
                                        CommandId::CANONICALIZE_PATHS, *fileSet);
 
@@ -408,6 +408,8 @@ int csr_cs_scan_files_async(csr_cs_context_h handle, const char *file_paths[],
                l.scanFiles(*canonicalizedFiles).second();
        });
 
+       hExt->dispatchAsync(task);
+
        return CSR_ERROR_NONE;
 
        EXCEPTION_SAFE_END
@@ -426,19 +428,21 @@ int csr_cs_scan_dir_async(csr_cs_context_h handle, const char *dir_path,
 
        auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
 
-       if (hExt->hasRunning()) {
+       if (hExt->isRunning()) {
                ERROR("Async scanning already running with this handle.");
                return CSR_ERROR_BUSY;
        }
 
        auto dir = std::make_shared<std::string>(Client::getAbsolutePath(dir_path));
 
-       hExt->dispatchAsync([hExt, user_data, dir] {
+       auto task = std::make_shared<Task>([hExt, user_data, dir] {
                Client::AsyncLogic l(hExt, user_data, [&hExt] { return hExt->isStopped(); });
 
                l.scanDir(*dir).second();
        });
 
+       hExt->dispatchAsync(task);
+
        return CSR_ERROR_NONE;
 
        EXCEPTION_SAFE_END
@@ -457,7 +461,7 @@ int csr_cs_scan_dirs_async(csr_cs_context_h handle, const char *dir_paths[],
 
        auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
 
-       if (hExt->hasRunning()) {
+       if (hExt->isRunning()) {
                ERROR("Async scanning already running with this handle.");
                return CSR_ERROR_BUSY;
        }
@@ -471,7 +475,7 @@ int csr_cs_scan_dirs_async(csr_cs_context_h handle, const char *dir_paths[],
                dirSet->insert(Client::getAbsolutePath(dir_paths[i]));
        }
 
-       hExt->dispatchAsync([hExt, user_data, dirSet] {
+       auto task = std::make_shared<Task>([hExt, user_data, dirSet] {
                auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
                                        CommandId::CANONICALIZE_PATHS, *dirSet);
 
@@ -496,6 +500,8 @@ int csr_cs_scan_dirs_async(csr_cs_context_h handle, const char *dir_paths[],
                l.scanDirs(*canonicalizedDirs).second();
        });
 
+       hExt->dispatchAsync(task);
+
        return CSR_ERROR_NONE;
 
        EXCEPTION_SAFE_END
@@ -511,7 +517,7 @@ int csr_cs_cancel_scanning(csr_cs_context_h handle)
 
        auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
 
-       if (!hExt->hasRunning() || hExt->isStopped())
+       if (!hExt->isRunning() || hExt->isStopped())
                return CSR_ERROR_NO_TASK;
 
        hExt->stop();
index 59c8e9a..e2262cf 100644 (file)
  */
 #include "client/handle-ext.h"
 
-#include <algorithm>
+#include <utility>
 
-#include "client/utils.h"
-#include "common/dispatcher.h"
 #include "common/audit/logger.h"
 #include "common/exception.h"
 
@@ -32,138 +30,82 @@ namespace Csr {
 namespace Client {
 
 HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
-       Handle(id, std::move(context)),
-       m_stop(false)
+       Handle(id, std::move(context)), m_stop(false), m_isRunning(false)
 {
 }
 
 HandleExt::~HandleExt()
 {
-       DEBUG("Destroying extended handle... join all workers...");
-       eraseJoinableIf();
+       DEBUG("Destroying extended handle... join worker...");
+       if (this->m_worker.joinable())
+               this->m_worker.join();
 }
 
 void HandleExt::stop()
 {
-       DEBUG("Stop & join all workers...");
-       m_stop = true;
-       eraseJoinableIf();
-}
-
-bool HandleExt::isStopped() const noexcept
-{
-       return m_stop.load();
-}
-
-bool HandleExt::hasRunning()
-{
-       std::unique_lock<std::mutex> l(m_mutex);
-       auto it = m_workerMap.begin();
+       DEBUG("Stop & join worker...");
 
-       while (it != m_workerMap.end()) {
-               if (!it->second.isDone.load())
-                       return true;
+       {
+               std::lock_guard<std::mutex> l(this->m_flagMutex);
+               this->m_stop = true;
        }
 
-       return false;
+       if (this->m_worker.joinable())
+               this->m_worker.join();
 }
 
-void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
+bool HandleExt::isStopped() const
 {
-       std::unique_lock<std::mutex> l(m_mutex);
-       DEBUG("clean joinable workers! current worker map size: " <<
-                 m_workerMap.size());
-       auto it = m_workerMap.begin();
-
-       while (it != m_workerMap.end()) {
-               DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
-
-               if (!it->second.t.joinable())
-                       ThrowExc(InternalError, "All workers should be joinable but it isn't. "
-                                        "tid: " << it->first);
-
-               if (!pred(*it)) {
-                       ++it;
-                       continue;
-               }
-
-               DEBUG("Joining worker! tid:" << it->first);
-               l.unlock();
-               it->second.t.join(); // release lock for worker who calls done()
-               l.lock();
-               DEBUG("Joined worker! tid:" << it->first);
-               it = m_workerMap.erase(it);
-       }
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
+       return this->m_stop;
 }
 
-void HandleExt::done()
+bool HandleExt::isRunning() const
 {
-       std::lock_guard<std::mutex> l(m_mutex);
-       auto it = m_workerMap.find(std::this_thread::get_id());
-
-       if (it == m_workerMap.end())
-               ThrowExc(InternalError, "worker done but it's not registered in map. "
-                                "tid: " << std::this_thread::get_id());
-
-       it->second.isDone = true;
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
+       return this->m_isRunning;
 }
 
-void HandleExt::dispatchAsync(const Task &f)
+void HandleExt::dispatchAsync(const std::shared_ptr<Task> &f)
 {
-       eraseJoinableIf([](const WorkerMapPair & pair) {
-               return pair.second.isDone.load();
-       });
-       // TODO: how to handle exceptions in workers
-       std::thread t([this, f] {
-               DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
-               m_stop = false;
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
 
-               f();
-               done();
+       if (this->m_isRunning)
+               ThrowExc(BusyError, "Worker is already running. Async is busy!");
 
-               DEBUG("client async thread done! tid: " << std::this_thread::get_id());
-       });
+       if (this->m_worker.joinable())
+               this->m_worker.join();
 
-       {
-               std::lock_guard<std::mutex> l(m_mutex);
-               m_workerMap.emplace(t.get_id(), std::move(t));
-       }
-}
+       this->m_isRunning = true;
+       this->m_stop = false;
 
-HandleExt::Worker::Worker() : isDone(false)
-{
-       DEBUG("Worker default constructor called");
-}
+       // TODO: how to handle exceptions in workers
+       this->m_worker = std::thread([this, f] {
+               DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
 
-HandleExt::Worker::Worker(std::thread &&_t) :
-       isDone(false),
-       t(std::forward<std::thread>(_t))
-{
-}
+               (*f)();
 
-HandleExt::Worker::Worker(HandleExt::Worker &&other) :
-       isDone(other.isDone.load()),
-       t(std::move(other.t))
-{
-}
+               {
+                       std::lock_guard<std::mutex> _l(this->m_flagMutex);
+                       this->m_isRunning = false;
+               }
 
-HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
-{
-       isDone = other.isDone.load();
-       t = std::move(other.t);
-       return *this;
+               DEBUG("client async thread done! tid: " << std::this_thread::get_id());
+       });
 }
 
 void HandleExt::add(ResultPtr &&ptr)
 {
-       std::lock_guard<std::mutex> l(m_resultsMutex);
-       m_results.emplace_back(std::forward<ResultPtr>(ptr));
+       std::lock_guard<std::mutex> l(this->m_resultsMutex);
+
+       this->m_results.emplace_back(std::move(ptr));
 }
 
 void HandleExt::add(ResultListPtr &&ptr)
 {
-       std::lock_guard<std::mutex> l(m_resultsMutex);
-       m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
+       std::lock_guard<std::mutex> l(this->m_resultsMutex);
+
+       this->m_resultLists.emplace_back(std::move(ptr));
 }
 
 } // namespace Client
index dd56d6a..756d9b1 100644 (file)
 
 #include <mutex>
 #include <thread>
-#include <atomic>
-
-#include <string>
-#include <map>
-#include <set>
-#include <utility>
+#include <memory>
 
 #include "client/handle.h"
 #include "client/callback.h"
-#include "common/icontext.h"
 
 namespace Csr {
 namespace Client {
@@ -42,10 +36,10 @@ public:
        explicit HandleExt(SockId id, ContextShPtr &&);
        virtual ~HandleExt();
 
-       void dispatchAsync(const Task &task);
+       void dispatchAsync(const std::shared_ptr<Task> &task);
        void stop(void);
-       bool isStopped(void) const noexcept;
-       bool hasRunning(void);
+       bool isStopped(void) const;
+       bool isRunning(void) const;
 
        Callback m_cb; // TODO: to refine..
 
@@ -53,30 +47,11 @@ public:
        virtual void add(ResultListPtr &&) override;
 
 private:
-       struct Worker {
-               std::atomic<bool> isDone;
-               std::thread t;
-
-               Worker();
-               Worker(const std::thread &_t) = delete; // to prevent thread instance copied
-               Worker(std::thread &&_t);
-               Worker(Worker &&other);
-               Worker &operator=(Worker &&other);
-       };
-
-       using WorkerMapPair = std::pair<const std::thread::id, Worker>;
-
-       void eraseJoinableIf(std::function<bool(const WorkerMapPair &)>
-                                                = [](const WorkerMapPair &)
-       {
-               return true;
-       });
-       void done(void);
-
-       std::atomic<bool> m_stop;
-       std::mutex m_mutex;
-       std::mutex m_resultsMutex;
-       std::map<std::thread::id, Worker> m_workerMap;
+       bool m_stop;
+       bool m_isRunning;
+       std::thread m_worker;
+       mutable std::mutex m_resultsMutex;
+       mutable std::mutex m_flagMutex;
 };
 
 } // namespace Client
index d3890e2..716d7ef 100644 (file)
@@ -81,6 +81,7 @@ using InternalError      = DerivedException<CSR_ERROR_SERVER>;
 using SocketError        = DerivedException<CSR_ERROR_SOCKET>;
 using FileDoNotExist     = DerivedException<CSR_ERROR_FILE_DO_NOT_EXIST>;
 using FileSystemError    = DerivedException<CSR_ERROR_FILE_SYSTEM>;
+using BusyError          = DerivedException<CSR_ERROR_BUSY>;
 
 } // namespace Csr
 
index b839215..569a9c9 100644 (file)
@@ -107,6 +107,8 @@ std::string capi_ec_to_string(csr_error_e ec)
        ERRORDESCRIBE(CSR_ERROR_INVALID_PARAMETER);
        ERRORDESCRIBE(CSR_ERROR_OUT_OF_MEMORY);
        ERRORDESCRIBE(CSR_ERROR_PERMISSION_DENIED);
+       ERRORDESCRIBE(CSR_ERROR_NOT_SUPPORTED);
+       ERRORDESCRIBE(CSR_ERROR_BUSY);
        ERRORDESCRIBE(CSR_ERROR_SOCKET);
        ERRORDESCRIBE(CSR_ERROR_INVALID_HANDLE);
        ERRORDESCRIBE(CSR_ERROR_SERVER);