Add threadpool to server service 45/66345/1
authorKyungwook Tak <k.tak@samsung.com>
Fri, 25 Mar 2016 08:49:35 +0000 (17:49 +0900)
committerKyungwook Tak <k.tak@samsung.com>
Mon, 18 Apr 2016 11:11:59 +0000 (20:11 +0900)
Change-Id: Ic4fa2ef017689def44401f8a027f08b5ccfce414
Signed-off-by: Kyungwook Tak <k.tak@samsung.com>
12 files changed:
CMakeLists.txt
packaging/csr-framework.spec
src/CMakeLists.txt
src/framework/service/server-service.cpp
src/framework/service/server-service.h
src/framework/service/thread-pool.cpp [new file with mode: 0644]
src/framework/service/thread-pool.h [new file with mode: 0644]
test/CMakeLists.txt
test/popup/CMakeLists.txt
test/thread-pool/CMakeLists.txt [new file with mode: 0644]
test/thread-pool/test-main.cpp [new file with mode: 0644]
test/thread-pool/test-thread-pool.cpp [new file with mode: 0644]

index 73f44d0..dd192f3 100644 (file)
@@ -57,6 +57,7 @@ SET(TARGET_CSR_CS_ENGINE_SAMPLE ${SERVICE_NAME}-cs-engine)
 SET(TARGET_CSR_WP_ENGINE_SAMPLE ${SERVICE_NAME}-wp-engine)
 SET(TARGET_CSR_TEST ${SERVICE_NAME}-test)
 SET(TARGET_CSR_POPUP_TEST ${SERVICE_NAME}-popup-test)
+SET(TARGET_CSR_THREADPOOL_TEST ${SERVICE_NAME}-threadpool-test)
 
 CONFIGURE_FILE(packaging/${SERVICE_NAME}.manifest.in ${SERVICE_NAME}.manifest @ONLY)
 CONFIGURE_FILE(packaging/${SERVICE_NAME}-client.manifest.in ${SERVICE_NAME}-client.manifest @ONLY)
index 06a19bf..41865a2 100644 (file)
@@ -202,5 +202,6 @@ fi
 %{ro_data_dir}/license/%{name}-test.BSL-1.0
 %{bin_dir}/%{service_name}-test
 %{bin_dir}/%{service_name}-popup-test
+%{bin_dir}/%{service_name}-threadpool-test
 # test resources
 %{test_dir}
index 95d1e95..f7bbb98 100644 (file)
@@ -28,6 +28,7 @@ SET(${TARGET_CSR_SERVER}_SRCS
        framework/main/csr-main.cpp
        framework/service/logic.cpp
        framework/service/server-service.cpp
+       framework/service/thread-pool.cpp
        framework/ui/askuser.cpp
 
        # question and response codes needed on both of
@@ -52,20 +53,14 @@ SET_SOURCE_FILES_PROPERTIES(${${TARGET_CSR_SERVER}_SRCS}
 
 ADD_EXECUTABLE(${TARGET_CSR_SERVER} ${${TARGET_CSR_SERVER}_SRCS})
 
-SET(${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES
+TARGET_LINK_LIBRARIES(${TARGET_CSR_SERVER}
        ${${TARGET_CSR_SERVER}_DEP_LIBRARIES}
-       -ldl     # for dynamic loading engine library
+       ${TARGET_CSR_COMMON}
+       -ldl
        -pthread
        -pie
 )
 
-SET(${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES
-       ${${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES}
-       ${TARGET_CSR_COMMON}
-)
-
-TARGET_LINK_LIBRARIES(${TARGET_CSR_SERVER} ${${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES})
-
 INSTALL(TARGETS ${TARGET_CSR_SERVER} DESTINATION ${BIN_DIR})
 
 ###############      CLIENT     ###################
@@ -97,17 +92,11 @@ SET_TARGET_PROPERTIES(${TARGET_CSR_CLIENT}
                VERSION ${VERSION}
 )
 
-SET(${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES
+TARGET_LINK_LIBRARIES(${TARGET_CSR_CLIENT}
        ${${TARGET_CSR_CLIENT}_DEP_LIBRARIES}
-)
-
-SET(${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES
-       ${${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES}
        ${TARGET_CSR_COMMON}
 )
 
-TARGET_LINK_LIBRARIES(${TARGET_CSR_CLIENT} ${${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES})
-
 INSTALL(TARGETS ${TARGET_CSR_CLIENT} DESTINATION ${LIB_INSTALL_DIR})
 
 ADD_SUBDIRECTORY(include)
index e445b11..572be50 100644 (file)
@@ -25,7 +25,9 @@
 
 namespace Csr {
 
-ServerService::ServerService(const std::string &address) : Service(address)
+ServerService::ServerService(const std::string &address) :
+       Service(address),
+       m_workqueue(2, 10)
 {
 }
 
@@ -37,14 +39,11 @@ void ServerService::onMessageProcess(const ConnShPtr &connection)
 {
        DEBUG("let's dispatch it to worker threads.");
 
-       auto process = [&]() {
-               auto reply = m_logic.dispatch(connection->receive());
-
-               connection->send(reply);
+       auto process = [&](RawBuffer &buffer) {
+               connection->send(m_logic.dispatch(buffer));
        };
 
-       /* TODO: submit to workqueue */
-       process();
+       m_workqueue.submit(std::bind(process, connection->receive()));
 }
 
 }
index 790dc42..6eb6ae0 100644 (file)
@@ -23,6 +23,7 @@
 
 #include "common/service.h"
 #include "service/logic.h"
+#include "service/thread-pool.h"
 
 namespace Csr {
 
@@ -35,6 +36,7 @@ private:
        virtual void onMessageProcess(const ConnShPtr &) override;
 
        Logic m_logic;
+       ThreadPool m_workqueue;
 };
 
 }
diff --git a/src/framework/service/thread-pool.cpp b/src/framework/service/thread-pool.cpp
new file mode 100644 (file)
index 0000000..5dea019
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+/*
+ * @file        thread-pool.cpp
+ * @author      Jaemin Ryu (jm77.ryu@samsung.com)
+ *              Kyungwook Tak (k.tak@samsung.com)
+ * @version     1.0
+ * @brief
+ */
+#include "service/thread-pool.h"
+
+#include <stdexcept>
+#include <utility>
+
+#include "common/audit/logger.h"
+
+#define __BEGIN_CRITICAL__ { std::lock_guard<std::mutex> lock(this->m_mutex);
+#define __END_CRITICAL__   }
+
+namespace Csr {
+
+ThreadPool::ThreadPool(size_t min, size_t max) :
+       m_min(min),
+       m_max(max),
+       m_stop(false)
+{
+       if (m_min > m_max)
+               throw std::logic_error("thread pool MIN shouldn't be bigger than MAX");
+
+       for (size_t i = 0; i < m_min; i++)
+               add();
+
+       DEBUG("Thread pool initialized with [" << m_workers.size() << "] threads");
+}
+
+void ThreadPool::add(void)
+{
+       std::thread t([this]() {
+
+               DEBUG("Thread[" << std::this_thread::get_id() << "] start in pool");
+
+               while (true) {
+                       std::unique_lock<std::mutex> lock(m_mutex);
+                       m_cv.wait(lock, [this]() {
+                               return m_workers.size() > m_min || m_stop || !m_tasks.empty();
+                       });
+
+                       if (m_stop && m_tasks.empty()) {
+                               DEBUG("Thread pool stop requested. "
+                                       "thread[" << std::this_thread::get_id() << "] returning.");
+                               break;
+                       }
+
+                       if (m_workers.size() > m_min && m_tasks.empty()) {
+                               DEBUG("Terminate idle thread[" << std::this_thread::get_id() << "]");
+
+                               // move thread itself to me and erase dummy in m_workers
+                               std::thread::id currentId = std::this_thread::get_id();
+                               m_workers[currentId].detach();
+                               m_workers.erase(currentId);
+                               break;
+                       }
+
+                       auto task = std::move(m_tasks.front());
+                       m_tasks.pop();
+
+                       lock.unlock();
+
+                       INFO("Start task on thread[" << std::this_thread::get_id() << "]");
+
+                       task();
+               }
+       });
+
+       m_workers[t.get_id()] = std::move(t);
+}
+
+size_t ThreadPool::size()
+{
+       return m_workers.size();
+}
+
+ThreadPool::~ThreadPool()
+{
+       m_stop = true;
+
+       m_cv.notify_all();
+
+       for (auto &worker: m_workers) {
+               if (worker.second.joinable())
+                       worker.second.join();
+       }
+}
+
+void ThreadPool::submit(std::function<void()> &&task)
+{
+       if (!m_tasks.empty() && m_workers.size() < m_max) {
+               DEBUG("more workers needed. let's add.");
+               add();
+       }
+
+       __BEGIN_CRITICAL__
+
+       if (!m_stop)
+               m_tasks.emplace(std::move(task));
+
+       __END_CRITICAL__
+
+       m_cv.notify_one();
+}
+
+}
diff --git a/src/framework/service/thread-pool.h b/src/framework/service/thread-pool.h
new file mode 100644 (file)
index 0000000..2197125
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+/*
+ * @file        thread-pool.h
+ * @author      Jaemin Ryu (jm77.ryu@samsung.com)
+ *              Kyungwook Tak (k.tak@samsung.com)
+ * @version     1.0
+ * @brief
+ */
+#pragma once
+
+#include <thread>
+#include <mutex>
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <map>
+#include <queue>
+
+namespace Csr {
+
+class ThreadPool {
+public:
+       // worker thread dynamically created / joined from min to max
+       explicit ThreadPool(size_t min, size_t max);
+       virtual ~ThreadPool();
+
+       ThreadPool(const ThreadPool &) = delete;
+       ThreadPool &operator=(const ThreadPool &) = delete;
+       ThreadPool(ThreadPool &&) = delete;
+       ThreadPool &operator=(ThreadPool &&) = delete;
+
+       // submit task to thread pool
+       void submit(std::function<void()> &&task);
+
+       // get workers size in thread pool
+       size_t size(void);
+
+private:
+       void add(void);
+
+       const size_t m_min;
+       const size_t m_max;
+       std::atomic<bool> m_stop;
+       std::map<std::thread::id, std::thread> m_workers;
+       std::queue<std::function<void()>> m_tasks;
+       std::mutex m_mutex;
+       std::condition_variable m_cv;
+};
+
+}
index aa1f420..9b8ada2 100644 (file)
@@ -56,7 +56,7 @@ TARGET_LINK_LIBRARIES(${TARGET_CSR_TEST}
        ${TARGET_CSR_CS_ENGINE_SAMPLE}
        ${TARGET_CSR_WP_ENGINE_SAMPLE}
        ${${TARGET_CSR_TEST}_DEP_LIBRARIES}
-       boost_unit_test_framework
+       -lboost_unit_test_framework
        -ldl
 )
 
@@ -64,3 +64,4 @@ INSTALL(TARGETS ${TARGET_CSR_TEST} DESTINATION ${BIN_DIR})
 INSTALL(DIRECTORY resources/ DESTINATION ${TEST_DIR})
 
 ADD_SUBDIRECTORY(popup)
+ADD_SUBDIRECTORY(thread-pool)
index e2d5a0e..e2649f0 100644 (file)
@@ -40,7 +40,7 @@ ADD_EXECUTABLE(${TARGET_CSR_POPUP_TEST} ${${TARGET_CSR_POPUP_TEST}_SRCS})
 
 TARGET_LINK_LIBRARIES(${TARGET_CSR_POPUP_TEST}
        ${TARGET_CSR_COMMON}
-       boost_unit_test_framework
+       -lboost_unit_test_framework
        -ldl
 )
 
diff --git a/test/thread-pool/CMakeLists.txt b/test/thread-pool/CMakeLists.txt
new file mode 100644 (file)
index 0000000..1f17872
--- /dev/null
@@ -0,0 +1,47 @@
+# Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+# @file        CMakeLists.txt
+# @author      Kyungwook Tak (k.tak@samsung.com)
+# @brief       build test program of server thread pool
+#
+PKG_CHECK_MODULES(${TARGET_CSR_THREADPOOL_TEST}_DEP
+       REQUIRED
+)
+
+SET(${TARGET_CSR_THREADPOOL_TEST}_SRCS
+       ${PROJECT_SOURCE_DIR}/test/colour_log_formatter.cpp
+       ${PROJECT_SOURCE_DIR}/src/framework/service/thread-pool.cpp
+       test-main.cpp
+       test-thread-pool.cpp
+)
+
+INCLUDE_DIRECTORIES(
+       ${PROJECT_SOURCE_DIR}/src/include
+       ${PROJECT_SOURCE_DIR}/src/framework
+       ${PROJECT_SOURCE_DIR}/test
+       .
+       ${${TARGET_CSR_THREADPOOL_TEST}_DEP_INCLUDE_DIRS}
+)
+
+ADD_EXECUTABLE(${TARGET_CSR_THREADPOOL_TEST} ${${TARGET_CSR_THREADPOOL_TEST}_SRCS})
+
+TARGET_LINK_LIBRARIES(${TARGET_CSR_THREADPOOL_TEST}
+       ${TARGET_CSR_COMMON}
+       -pthread
+       -lboost_unit_test_framework
+       -ldl
+)
+
+INSTALL(TARGETS ${TARGET_CSR_THREADPOOL_TEST} DESTINATION ${BIN_DIR})
diff --git a/test/thread-pool/test-main.cpp b/test/thread-pool/test-main.cpp
new file mode 100644 (file)
index 0000000..4534850
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+/*
+ * @file       test-main.cpp
+ * @author     Kyungwook Tak(k.tak@samsung.com)
+ * @version    1.0
+ * @brief      CSR thread pool test main
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/unit_test_log.hpp>
+#include <boost/test/results_reporter.hpp>
+#include <colour_log_formatter.h>
+
+struct TestConfig {
+       TestConfig()
+       {
+               boost::unit_test::unit_test_log.set_threshold_level(boost::unit_test::log_test_units);
+               boost::unit_test::results_reporter::set_level(boost::unit_test::SHORT_REPORT);
+               boost::unit_test::unit_test_log.set_formatter(new Csr::Test::colour_log_formatter);
+       }
+};
+
+BOOST_GLOBAL_FIXTURE(TestConfig)
diff --git a/test/thread-pool/test-thread-pool.cpp b/test/thread-pool/test-thread-pool.cpp
new file mode 100644 (file)
index 0000000..d282b92
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+/*
+ * @file        test-thread-pool.cpp
+ * @author      Kyungwook Tak (k.tak@samsung.com)
+ * @version     1.0
+ * @brief       CSR threadpool test
+ */
+#define BOOST_TEST_MODULE CSR_THREADPOOL_TEST
+#include "service/thread-pool.h"
+
+#include <iostream>
+#include <string>
+#include <functional>
+#include <chrono>
+#include <thread>
+#include <mutex>
+#include <ctime>
+#include <boost/test/unit_test.hpp>
+
+using namespace std::chrono;
+
+namespace {
+
+auto exceptionGuard = [&](std::function<void()> &&func) {
+       try {
+               func();
+       } catch (...) {
+               BOOST_REQUIRE_MESSAGE(0, "exception shouldn't be thronw.");
+       }
+};
+
+const static long long int TaskSleepUnit = 200; // millisec
+const static long long int PoolLogicUnit = 10; // millisec
+high_resolution_clock::time_point _start;
+long long int _expected;
+std::mutex _m;
+
+// times in milliseconds unit
+inline void START_TIME(void)
+{
+       _expected = 0;
+       _start = high_resolution_clock::now();
+}
+
+inline long long int END_TIME(void)
+{
+       return duration_cast<milliseconds>(high_resolution_clock::now() - _start).count();
+}
+
+inline void INC_EXPECTED_TIME(long long int t)
+{
+       std::lock_guard<std::mutex> l(_m);
+       _expected += t;
+}
+
+inline void CHECK_TIME(void)
+{
+       std::lock_guard<std::mutex> l(_m);
+
+       BOOST_MESSAGE("Elapsed time[" << END_TIME() << "]. "
+               "Expected scope: (" << _expected << ", " << _expected + PoolLogicUnit << ")");
+
+       BOOST_REQUIRE_MESSAGE(END_TIME() < _expected + PoolLogicUnit, "Too much time elapsed");
+       BOOST_REQUIRE_MESSAGE(END_TIME() >= _expected, "Too less time elapsed");
+}
+
+void runStaticPool(size_t cnt)
+{
+       START_TIME();
+
+       exceptionGuard([&]() {
+               INC_EXPECTED_TIME(TaskSleepUnit);
+               auto task = [&]() {
+                       std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
+               };
+
+               Csr::ThreadPool pool(cnt, cnt);
+
+               BOOST_REQUIRE_MESSAGE(pool.size() == cnt,
+                       "Thread pool isn't initialized well. "
+                       "Pool size[" << pool.size() << "] and correct size[" << cnt << "]");
+
+               for (size_t i = 0; i < cnt; i++)
+                       pool.submit(task);
+
+               /* thread joins in thread-pool dtor */
+       });
+
+       CHECK_TIME();
+}
+
+void runDynamicPool(size_t min, size_t max)
+{
+       BOOST_REQUIRE(min < max);
+
+       START_TIME();
+
+       exceptionGuard([&]() {
+               INC_EXPECTED_TIME(TaskSleepUnit);
+               auto task = [&]() {
+                       std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
+               };
+
+               Csr::ThreadPool pool(min, max);
+
+               BOOST_REQUIRE_MESSAGE(pool.size() == min,
+                       "Thread pool isn't initialized well. "
+                       "Pool size[" << pool.size() << "] and corret min size[" << min << "]");
+
+               // Task assigned to already existing workers
+               for (size_t i = 0; i < min; i++)
+                       pool.submit(task);
+
+               // Task assigned new worker which is dynamically added to pool
+               for (size_t i = min; i < max; i++)
+                       pool.submit(task);
+
+               // wait for expected time to tasks done
+               // additional time for thread pool logic running time
+               INC_EXPECTED_TIME(PoolLogicUnit);
+               std::this_thread::sleep_for(milliseconds(TaskSleepUnit + PoolLogicUnit));
+               BOOST_REQUIRE_MESSAGE(pool.size() == min,
+                       "To dtor idle threads in pool doesn't work well. "
+                       "Pool size[" << pool.size() << "] shouldn't exceed given min[" << min << "]");
+
+               // make all(maximum) workers busy
+               INC_EXPECTED_TIME(TaskSleepUnit);
+               for (size_t i = 0; i < max; i++)
+                       pool.submit(task);
+
+               INC_EXPECTED_TIME(TaskSleepUnit);
+               pool.submit(task); // One more task than maximum workers at the time
+               BOOST_REQUIRE_MESSAGE(pool.size() == max,
+                       "Upper bound to make thread dynamically to pool doesn't work well. "
+                       "Pool size[" << pool.size() << "] shouldn't exceed given max[" << max << "]");
+       });
+
+       CHECK_TIME();
+}
+
+}
+
+BOOST_AUTO_TEST_SUITE(THREADPOOL)
+
+BOOST_AUTO_TEST_CASE(fixed)
+{
+       for (size_t i = 1; i <= 13; i += 2) {
+               BOOST_MESSAGE("Fixed ThreadPool size: " << i);
+               runStaticPool(i);
+       }
+}
+
+BOOST_AUTO_TEST_CASE(dynamic)
+{
+       for (size_t min = 1; min <= 7; min += 2) {
+               for (size_t max = min + 2; max <= 13; max += 2) {
+                       BOOST_MESSAGE("Dynamic ThreadPool size: " << min << " ~ " << max);
+                       runDynamicPool(min, max);
+               }
+       }
+}
+
+BOOST_AUTO_TEST_SUITE_END()