SET(TARGET_CSR_WP_ENGINE_SAMPLE ${SERVICE_NAME}-wp-engine)
SET(TARGET_CSR_TEST ${SERVICE_NAME}-test)
SET(TARGET_CSR_POPUP_TEST ${SERVICE_NAME}-popup-test)
+SET(TARGET_CSR_THREADPOOL_TEST ${SERVICE_NAME}-threadpool-test)
CONFIGURE_FILE(packaging/${SERVICE_NAME}.manifest.in ${SERVICE_NAME}.manifest @ONLY)
CONFIGURE_FILE(packaging/${SERVICE_NAME}-client.manifest.in ${SERVICE_NAME}-client.manifest @ONLY)
%{ro_data_dir}/license/%{name}-test.BSL-1.0
%{bin_dir}/%{service_name}-test
%{bin_dir}/%{service_name}-popup-test
+%{bin_dir}/%{service_name}-threadpool-test
# test resources
%{test_dir}
framework/main/csr-main.cpp
framework/service/logic.cpp
framework/service/server-service.cpp
+ framework/service/thread-pool.cpp
framework/ui/askuser.cpp
# question and response codes needed on both of
ADD_EXECUTABLE(${TARGET_CSR_SERVER} ${${TARGET_CSR_SERVER}_SRCS})
-SET(${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES
+TARGET_LINK_LIBRARIES(${TARGET_CSR_SERVER}
${${TARGET_CSR_SERVER}_DEP_LIBRARIES}
- -ldl # for dynamic loading engine library
+ ${TARGET_CSR_COMMON}
+ -ldl
-pthread
-pie
)
-SET(${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES
- ${${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES}
- ${TARGET_CSR_COMMON}
-)
-
-TARGET_LINK_LIBRARIES(${TARGET_CSR_SERVER} ${${TARGET_CSR_SERVER}_LIST_LINK_LIBRARIES})
-
INSTALL(TARGETS ${TARGET_CSR_SERVER} DESTINATION ${BIN_DIR})
############### CLIENT ###################
VERSION ${VERSION}
)
-SET(${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES
+TARGET_LINK_LIBRARIES(${TARGET_CSR_CLIENT}
${${TARGET_CSR_CLIENT}_DEP_LIBRARIES}
-)
-
-SET(${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES
- ${${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES}
${TARGET_CSR_COMMON}
)
-TARGET_LINK_LIBRARIES(${TARGET_CSR_CLIENT} ${${TARGET_CSR_CLIENT}_LIST_LINK_LIBRARIES})
-
INSTALL(TARGETS ${TARGET_CSR_CLIENT} DESTINATION ${LIB_INSTALL_DIR})
ADD_SUBDIRECTORY(include)
namespace Csr {
-ServerService::ServerService(const std::string &address) : Service(address)
+ServerService::ServerService(const std::string &address) :
+ Service(address),
+ m_workqueue(2, 10)
{
}
{
DEBUG("let's dispatch it to worker threads.");
- auto process = [&]() {
- auto reply = m_logic.dispatch(connection->receive());
-
- connection->send(reply);
+ auto process = [&](RawBuffer &buffer) {
+ connection->send(m_logic.dispatch(buffer));
};
- /* TODO: submit to workqueue */
- process();
+ m_workqueue.submit(std::bind(process, connection->receive()));
}
}
#include "common/service.h"
#include "service/logic.h"
+#include "service/thread-pool.h"
namespace Csr {
virtual void onMessageProcess(const ConnShPtr &) override;
Logic m_logic;
+ ThreadPool m_workqueue;
};
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+/*
+ * @file thread-pool.cpp
+ * @author Jaemin Ryu (jm77.ryu@samsung.com)
+ * Kyungwook Tak (k.tak@samsung.com)
+ * @version 1.0
+ * @brief
+ */
+#include "service/thread-pool.h"
+
+#include <stdexcept>
+#include <utility>
+
+#include "common/audit/logger.h"
+
+#define __BEGIN_CRITICAL__ { std::lock_guard<std::mutex> lock(this->m_mutex);
+#define __END_CRITICAL__ }
+
+namespace Csr {
+
+ThreadPool::ThreadPool(size_t min, size_t max) :
+ m_min(min),
+ m_max(max),
+ m_stop(false)
+{
+ if (m_min > m_max)
+ throw std::logic_error("thread pool MIN shouldn't be bigger than MAX");
+
+ for (size_t i = 0; i < m_min; i++)
+ add();
+
+ DEBUG("Thread pool initialized with [" << m_workers.size() << "] threads");
+}
+
+void ThreadPool::add(void)
+{
+ std::thread t([this]() {
+
+ DEBUG("Thread[" << std::this_thread::get_id() << "] start in pool");
+
+ while (true) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ m_cv.wait(lock, [this]() {
+ return m_workers.size() > m_min || m_stop || !m_tasks.empty();
+ });
+
+ if (m_stop && m_tasks.empty()) {
+ DEBUG("Thread pool stop requested. "
+ "thread[" << std::this_thread::get_id() << "] returning.");
+ break;
+ }
+
+ if (m_workers.size() > m_min && m_tasks.empty()) {
+ DEBUG("Terminate idle thread[" << std::this_thread::get_id() << "]");
+
+ // move thread itself to me and erase dummy in m_workers
+ std::thread::id currentId = std::this_thread::get_id();
+ m_workers[currentId].detach();
+ m_workers.erase(currentId);
+ break;
+ }
+
+ auto task = std::move(m_tasks.front());
+ m_tasks.pop();
+
+ lock.unlock();
+
+ INFO("Start task on thread[" << std::this_thread::get_id() << "]");
+
+ task();
+ }
+ });
+
+ m_workers[t.get_id()] = std::move(t);
+}
+
+size_t ThreadPool::size()
+{
+ return m_workers.size();
+}
+
+ThreadPool::~ThreadPool()
+{
+ m_stop = true;
+
+ m_cv.notify_all();
+
+ for (auto &worker: m_workers) {
+ if (worker.second.joinable())
+ worker.second.join();
+ }
+}
+
+void ThreadPool::submit(std::function<void()> &&task)
+{
+ if (!m_tasks.empty() && m_workers.size() < m_max) {
+ DEBUG("more workers needed. let's add.");
+ add();
+ }
+
+ __BEGIN_CRITICAL__
+
+ if (!m_stop)
+ m_tasks.emplace(std::move(task));
+
+ __END_CRITICAL__
+
+ m_cv.notify_one();
+}
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+/*
+ * @file thread-pool.h
+ * @author Jaemin Ryu (jm77.ryu@samsung.com)
+ * Kyungwook Tak (k.tak@samsung.com)
+ * @version 1.0
+ * @brief
+ */
+#pragma once
+
+#include <thread>
+#include <mutex>
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <map>
+#include <queue>
+
+namespace Csr {
+
+class ThreadPool {
+public:
+ // worker thread dynamically created / joined from min to max
+ explicit ThreadPool(size_t min, size_t max);
+ virtual ~ThreadPool();
+
+ ThreadPool(const ThreadPool &) = delete;
+ ThreadPool &operator=(const ThreadPool &) = delete;
+ ThreadPool(ThreadPool &&) = delete;
+ ThreadPool &operator=(ThreadPool &&) = delete;
+
+ // submit task to thread pool
+ void submit(std::function<void()> &&task);
+
+ // get workers size in thread pool
+ size_t size(void);
+
+private:
+ void add(void);
+
+ const size_t m_min;
+ const size_t m_max;
+ std::atomic<bool> m_stop;
+ std::map<std::thread::id, std::thread> m_workers;
+ std::queue<std::function<void()>> m_tasks;
+ std::mutex m_mutex;
+ std::condition_variable m_cv;
+};
+
+}
${TARGET_CSR_CS_ENGINE_SAMPLE}
${TARGET_CSR_WP_ENGINE_SAMPLE}
${${TARGET_CSR_TEST}_DEP_LIBRARIES}
- boost_unit_test_framework
+ -lboost_unit_test_framework
-ldl
)
INSTALL(DIRECTORY resources/ DESTINATION ${TEST_DIR})
ADD_SUBDIRECTORY(popup)
+ADD_SUBDIRECTORY(thread-pool)
TARGET_LINK_LIBRARIES(${TARGET_CSR_POPUP_TEST}
${TARGET_CSR_COMMON}
- boost_unit_test_framework
+ -lboost_unit_test_framework
-ldl
)
--- /dev/null
+# Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# @file CMakeLists.txt
+# @author Kyungwook Tak (k.tak@samsung.com)
+# @brief build test program of server thread pool
+#
+PKG_CHECK_MODULES(${TARGET_CSR_THREADPOOL_TEST}_DEP
+ REQUIRED
+)
+
+SET(${TARGET_CSR_THREADPOOL_TEST}_SRCS
+ ${PROJECT_SOURCE_DIR}/test/colour_log_formatter.cpp
+ ${PROJECT_SOURCE_DIR}/src/framework/service/thread-pool.cpp
+ test-main.cpp
+ test-thread-pool.cpp
+)
+
+INCLUDE_DIRECTORIES(
+ ${PROJECT_SOURCE_DIR}/src/include
+ ${PROJECT_SOURCE_DIR}/src/framework
+ ${PROJECT_SOURCE_DIR}/test
+ .
+ ${${TARGET_CSR_THREADPOOL_TEST}_DEP_INCLUDE_DIRS}
+)
+
+ADD_EXECUTABLE(${TARGET_CSR_THREADPOOL_TEST} ${${TARGET_CSR_THREADPOOL_TEST}_SRCS})
+
+TARGET_LINK_LIBRARIES(${TARGET_CSR_THREADPOOL_TEST}
+ ${TARGET_CSR_COMMON}
+ -pthread
+ -lboost_unit_test_framework
+ -ldl
+)
+
+INSTALL(TARGETS ${TARGET_CSR_THREADPOOL_TEST} DESTINATION ${BIN_DIR})
--- /dev/null
+/*
+ * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+/*
+ * @file test-main.cpp
+ * @author Kyungwook Tak(k.tak@samsung.com)
+ * @version 1.0
+ * @brief CSR thread pool test main
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/unit_test_log.hpp>
+#include <boost/test/results_reporter.hpp>
+#include <colour_log_formatter.h>
+
+struct TestConfig {
+ TestConfig()
+ {
+ boost::unit_test::unit_test_log.set_threshold_level(boost::unit_test::log_test_units);
+ boost::unit_test::results_reporter::set_level(boost::unit_test::SHORT_REPORT);
+ boost::unit_test::unit_test_log.set_formatter(new Csr::Test::colour_log_formatter);
+ }
+};
+
+BOOST_GLOBAL_FIXTURE(TestConfig)
--- /dev/null
+/*
+ * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+/*
+ * @file test-thread-pool.cpp
+ * @author Kyungwook Tak (k.tak@samsung.com)
+ * @version 1.0
+ * @brief CSR threadpool test
+ */
+#define BOOST_TEST_MODULE CSR_THREADPOOL_TEST
+#include "service/thread-pool.h"
+
+#include <iostream>
+#include <string>
+#include <functional>
+#include <chrono>
+#include <thread>
+#include <mutex>
+#include <ctime>
+#include <boost/test/unit_test.hpp>
+
+using namespace std::chrono;
+
+namespace {
+
+auto exceptionGuard = [&](std::function<void()> &&func) {
+ try {
+ func();
+ } catch (...) {
+ BOOST_REQUIRE_MESSAGE(0, "exception shouldn't be thronw.");
+ }
+};
+
+const static long long int TaskSleepUnit = 200; // millisec
+const static long long int PoolLogicUnit = 10; // millisec
+high_resolution_clock::time_point _start;
+long long int _expected;
+std::mutex _m;
+
+// times in milliseconds unit
+inline void START_TIME(void)
+{
+ _expected = 0;
+ _start = high_resolution_clock::now();
+}
+
+inline long long int END_TIME(void)
+{
+ return duration_cast<milliseconds>(high_resolution_clock::now() - _start).count();
+}
+
+inline void INC_EXPECTED_TIME(long long int t)
+{
+ std::lock_guard<std::mutex> l(_m);
+ _expected += t;
+}
+
+inline void CHECK_TIME(void)
+{
+ std::lock_guard<std::mutex> l(_m);
+
+ BOOST_MESSAGE("Elapsed time[" << END_TIME() << "]. "
+ "Expected scope: (" << _expected << ", " << _expected + PoolLogicUnit << ")");
+
+ BOOST_REQUIRE_MESSAGE(END_TIME() < _expected + PoolLogicUnit, "Too much time elapsed");
+ BOOST_REQUIRE_MESSAGE(END_TIME() >= _expected, "Too less time elapsed");
+}
+
+void runStaticPool(size_t cnt)
+{
+ START_TIME();
+
+ exceptionGuard([&]() {
+ INC_EXPECTED_TIME(TaskSleepUnit);
+ auto task = [&]() {
+ std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
+ };
+
+ Csr::ThreadPool pool(cnt, cnt);
+
+ BOOST_REQUIRE_MESSAGE(pool.size() == cnt,
+ "Thread pool isn't initialized well. "
+ "Pool size[" << pool.size() << "] and correct size[" << cnt << "]");
+
+ for (size_t i = 0; i < cnt; i++)
+ pool.submit(task);
+
+ /* thread joins in thread-pool dtor */
+ });
+
+ CHECK_TIME();
+}
+
+void runDynamicPool(size_t min, size_t max)
+{
+ BOOST_REQUIRE(min < max);
+
+ START_TIME();
+
+ exceptionGuard([&]() {
+ INC_EXPECTED_TIME(TaskSleepUnit);
+ auto task = [&]() {
+ std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
+ };
+
+ Csr::ThreadPool pool(min, max);
+
+ BOOST_REQUIRE_MESSAGE(pool.size() == min,
+ "Thread pool isn't initialized well. "
+ "Pool size[" << pool.size() << "] and corret min size[" << min << "]");
+
+ // Task assigned to already existing workers
+ for (size_t i = 0; i < min; i++)
+ pool.submit(task);
+
+ // Task assigned new worker which is dynamically added to pool
+ for (size_t i = min; i < max; i++)
+ pool.submit(task);
+
+ // wait for expected time to tasks done
+ // additional time for thread pool logic running time
+ INC_EXPECTED_TIME(PoolLogicUnit);
+ std::this_thread::sleep_for(milliseconds(TaskSleepUnit + PoolLogicUnit));
+ BOOST_REQUIRE_MESSAGE(pool.size() == min,
+ "To dtor idle threads in pool doesn't work well. "
+ "Pool size[" << pool.size() << "] shouldn't exceed given min[" << min << "]");
+
+ // make all(maximum) workers busy
+ INC_EXPECTED_TIME(TaskSleepUnit);
+ for (size_t i = 0; i < max; i++)
+ pool.submit(task);
+
+ INC_EXPECTED_TIME(TaskSleepUnit);
+ pool.submit(task); // One more task than maximum workers at the time
+ BOOST_REQUIRE_MESSAGE(pool.size() == max,
+ "Upper bound to make thread dynamically to pool doesn't work well. "
+ "Pool size[" << pool.size() << "] shouldn't exceed given max[" << max << "]");
+ });
+
+ CHECK_TIME();
+}
+
+}
+
+BOOST_AUTO_TEST_SUITE(THREADPOOL)
+
+BOOST_AUTO_TEST_CASE(fixed)
+{
+ for (size_t i = 1; i <= 13; i += 2) {
+ BOOST_MESSAGE("Fixed ThreadPool size: " << i);
+ runStaticPool(i);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(dynamic)
+{
+ for (size_t min = 1; min <= 7; min += 2) {
+ for (size_t max = min + 2; max <= 13; max += 2) {
+ BOOST_MESSAGE("Dynamic ThreadPool size: " << min << " ~ " << max);
+ runDynamicPool(min, max);
+ }
+ }
+}
+
+BOOST_AUTO_TEST_SUITE_END()