Change thread-pool to static for stability 36/75236/2
authorKyungwook Tak <k.tak@samsung.com>
Fri, 17 Jun 2016 06:04:25 +0000 (15:04 +0900)
committersangwan kwon <sangwan.kwon@samsung.com>
Fri, 17 Jun 2016 06:49:11 +0000 (23:49 -0700)
Change-Id: I0ace06b0cefcdbbcf97d23e31893a870fb300ee1
Signed-off-by: Kyungwook Tak <k.tak@samsung.com>
src/framework/service/server-service.cpp
src/framework/service/thread-pool.cpp
src/framework/service/thread-pool.h
test/thread-pool/test-thread-pool.cpp

index cfdb568..6263815 100644 (file)
@@ -83,9 +83,7 @@ inline CommandId extractCommandId(BinaryQueue &q)
 
 }
 
-ServerService::ServerService() :
-       Service(),
-       m_workqueue(2, 10)
+ServerService::ServerService() : Service(), m_workqueue(5)
 {
        this->m_db = std::make_shared<Db::Manager>(RW_DBSPACE "/.csr.db", RO_DBSPACE);
 
index a09b568..c28ac45 100644 (file)
 #include "common/audit/logger.h"
 #include "common/exception.h"
 
-#define __BEGIN_CRITICAL__ { std::lock_guard<std::mutex> lock(this->m_mutex);
+#define __BEGIN_CRITICAL__ { std::unique_lock<std::mutex> lock(this->m_mutex);
 #define __END_CRITICAL__   }
 
 namespace Csr {
 
-ThreadPool::ThreadPool(size_t min, size_t max) :
-       m_min(min),
-       m_max(max),
-       m_stop(false),
-       m_runningWorkersNum(0)
+ThreadPool::ThreadPool(size_t threads) : m_stop(false), m_runningWorkersNum(0)
 {
-       if (this->m_min > this->m_max)
-               ThrowExc(CSR_ERROR_SERVER, "thread pool MIN shouldn't be bigger than MAX");
+       for (size_t i = 0; i < threads; ++i) {
+               this->m_workers.emplace_back([this] {
+                       INFO("Thread[" << std::this_thread::get_id() << "] start in pool");
 
-       for (size_t i = 0; i < this->m_min; i++)
-               this->add();
+                       while (true) {
+                               std::function<void()> task;
 
-       DEBUG("Thread pool initialized with [" << this->m_workers.size() << "] threads");
-}
-
-void ThreadPool::add(void)
-{
-       std::thread t([this]() {
-               DEBUG("Thread[" << std::this_thread::get_id() << "] start in pool");
-
-               while (true) {
-                       std::unique_lock<std::mutex> lock(this->m_mutex);
-                       this->m_cv.wait(lock, [this]() {
-                               return this->m_workers.size() > this->m_min ||
-                                          this->m_stop ||
-                                          !this->m_tasks.empty();
-                       });
-
-                       if (this->m_stop && this->m_tasks.empty()) {
-                               DEBUG("Thread pool stop requested. "
-                                         "thread[" << std::this_thread::get_id() << "] returning.");
-                               break;
-                       }
-
-                       if (this->m_workers.size() > this->m_min && this->m_tasks.empty()) {
-                               DEBUG("Terminate idle thread[" << std::this_thread::get_id() << "]");
-
-                               // move thread itself to me and erase dummy in this->m_workers
-                               auto currentId = std::this_thread::get_id();
-                               this->m_workers[currentId].detach();
-                               this->m_workers.erase(currentId);
-                               break;
-                       }
-
-                       auto task = std::move(this->m_tasks.front());
-                       this->m_tasks.pop();
+                               __BEGIN_CRITICAL__
+                               this->m_cv.wait(lock, [this]() {
+                                       return this->m_stop || !this->m_tasks.empty();
+                               });
 
-                       lock.unlock();
+                               if (this->m_stop && this->m_tasks.empty()) {
+                                       INFO("Thread pool stop requested. "
+                                                 "thread[" << std::this_thread::get_id() << "] returning.");
+                                       break;
+                               }
 
-                       INFO("Start task on thread[" << std::this_thread::get_id() << "]");
+                               task = std::move(this->m_tasks.front());
+                               this->m_tasks.pop();
+                               __END_CRITICAL__
 
-                       ++this->m_runningWorkersNum;
+                               DEBUG("Start task on thread[" << std::this_thread::get_id() << "]");
 
-                       task();
+                               ++this->m_runningWorkersNum;
 
-                       --this->m_runningWorkersNum;
-               }
-       });
+                               task();
 
-       this->m_workers[t.get_id()] = std::move(t);
-}
+                               --this->m_runningWorkersNum;
+                       }
+               });
+       }
 
-size_t ThreadPool::size() const
-{
-       return this->m_workers.size();
+       DEBUG("Thread pool initialized with [" << this->m_workers.size() << "] threads");
 }
 
 bool ThreadPool::isTaskRunning() const
@@ -107,23 +78,20 @@ bool ThreadPool::isTaskRunning() const
 
 ThreadPool::~ThreadPool()
 {
+       __BEGIN_CRITICAL__
        this->m_stop = true;
+       __END_CRITICAL__
 
        this->m_cv.notify_all();
 
        for (auto &worker : this->m_workers) {
-               if (worker.second.joinable())
-                       worker.second.join();
+               if (worker.joinable())
+                       worker.join();
        }
 }
 
 void ThreadPool::submit(std::function<void()> &&task)
 {
-       if (!this->m_tasks.empty() && this->m_workers.size() < this->m_max) {
-               DEBUG("more workers needed. let's add.");
-               this->add();
-       }
-
        __BEGIN_CRITICAL__
 
        if (!this->m_stop)
index 5dcb528..a808ffa 100644 (file)
@@ -27,7 +27,7 @@
 #include <atomic>
 #include <condition_variable>
 #include <functional>
-#include <map>
+#include <vector>
 #include <queue>
 
 namespace Csr {
@@ -35,7 +35,7 @@ namespace Csr {
 class ThreadPool {
 public:
        // worker thread dynamically created / joined from min to max
-       explicit ThreadPool(size_t min, size_t max);
+       explicit ThreadPool(size_t threads);
        virtual ~ThreadPool();
 
        ThreadPool(const ThreadPool &) = delete;
@@ -46,20 +46,13 @@ public:
        // submit task to thread pool
        void submit(std::function<void()> &&task);
 
-       // get workers size in thread pool
-       size_t size(void) const;
-
        // check whether there's running task by worker.
        bool isTaskRunning(void) const;
 
 private:
-       void add(void);
-
-       const size_t m_min;
-       const size_t m_max;
-       std::atomic<bool> m_stop;
+       bool m_stop;
        std::atomic<size_t> m_runningWorkersNum;
-       std::map<std::thread::id, std::thread> m_workers;
+       std::vector<std::thread> m_workers;
        std::queue<std::function<void()>> m_tasks;
        std::mutex m_mutex;
        std::condition_variable m_cv;
index 67edecf..e5ed8e9 100644 (file)
@@ -58,8 +58,7 @@ inline void START_TIME(void)
 
 inline long long int END_TIME(void)
 {
-       return duration_cast<milliseconds>(high_resolution_clock::now() -
-                                                                          _start).count();
+       return duration_cast<milliseconds>(high_resolution_clock::now() - _start).count();
 }
 
 inline void INC_EXPECTED_TIME(long long int t)
@@ -75,8 +74,7 @@ inline void CHECK_TIME(void)
        BOOST_MESSAGE("Elapsed time[" << END_TIME() << "]. "
                                  "Expected scope: (" << _expected << ", " << _expected + PoolLogicUnit << ")");
 
-       BOOST_REQUIRE_MESSAGE(END_TIME() < _expected + PoolLogicUnit,
-                                                 "Too much time elapsed");
+       BOOST_REQUIRE_MESSAGE(END_TIME() < _expected + PoolLogicUnit, "Too much time elapsed");
        BOOST_REQUIRE_MESSAGE(END_TIME() >= _expected, "Too less time elapsed");
 }
 
@@ -90,11 +88,7 @@ void runStaticPool(size_t cnt)
                        std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
                };
 
-               Csr::ThreadPool pool(cnt, cnt);
-
-               BOOST_REQUIRE_MESSAGE(pool.size() == cnt,
-                                                         "Thread pool isn't initialized well. "
-                                                         "Pool size[" << pool.size() << "] and correct size[" << cnt << "]");
+               Csr::ThreadPool pool(cnt);
 
                for (size_t i = 0; i < cnt; i++)
                        pool.submit(task);
@@ -105,57 +99,7 @@ void runStaticPool(size_t cnt)
        CHECK_TIME();
 }
 
-void runDynamicPool(size_t min, size_t max)
-{
-       BOOST_REQUIRE(min < max);
-
-       START_TIME();
-
-       exceptionGuard([&]() {
-               INC_EXPECTED_TIME(TaskSleepUnit);
-               auto task = [&]() {
-                       std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
-               };
-
-               Csr::ThreadPool pool(min, max);
-
-               BOOST_REQUIRE_MESSAGE(pool.size() == min,
-                                                         "Thread pool isn't initialized well. "
-                                                         "Pool size[" << pool.size() << "] and corret min size[" << min << "]");
-
-               // Task assigned to already existing workers
-               for (size_t i = 0; i < min; i++)
-                       pool.submit(task);
-
-               // Task assigned new worker which is dynamically added to pool
-               for (size_t i = min; i < max; i++)
-                       pool.submit(task);
-
-               // wait for expected time to tasks done
-               // additional time for thread pool logic running time
-               INC_EXPECTED_TIME(PoolLogicUnit);
-               std::this_thread::sleep_for(milliseconds(TaskSleepUnit + PoolLogicUnit));
-               BOOST_REQUIRE_MESSAGE(pool.size() == min,
-                                                         "To dtor idle threads in pool doesn't work well. "
-                                                         "Pool size[" << pool.size() << "] shouldn't exceed given min[" << min << "]");
-
-               // make all(maximum) workers busy
-               INC_EXPECTED_TIME(TaskSleepUnit);
-
-               for (size_t i = 0; i < max; i++)
-                       pool.submit(task);
-
-               INC_EXPECTED_TIME(TaskSleepUnit);
-               pool.submit(task); // One more task than maximum workers at the time
-               BOOST_REQUIRE_MESSAGE(pool.size() == max,
-                                                         "Upper bound to make thread dynamically to pool doesn't work well. "
-                                                         "Pool size[" << pool.size() << "] shouldn't exceed given max[" << max << "]");
-       });
-
-       CHECK_TIME();
-}
-
-}
+} // namespace anonymous
 
 BOOST_AUTO_TEST_SUITE(THREADPOOL)
 
@@ -167,14 +111,4 @@ BOOST_AUTO_TEST_CASE(fixed)
        }
 }
 
-BOOST_AUTO_TEST_CASE(dynamic)
-{
-       for (size_t min = 1; min <= 7; min += 2) {
-               for (size_t max = min + 2; max <= 13; max += 2) {
-                       BOOST_MESSAGE("Dynamic ThreadPool size: " << min << " ~ " << max);
-                       runDynamicPool(min, max);
-               }
-       }
-}
-
 BOOST_AUTO_TEST_SUITE_END()