}
-ServerService::ServerService() :
- Service(),
- m_workqueue(2, 10)
+ServerService::ServerService() : Service(), m_workqueue(5)
{
this->m_db = std::make_shared<Db::Manager>(RW_DBSPACE "/.csr.db", RO_DBSPACE);
#include "common/audit/logger.h"
#include "common/exception.h"
-#define __BEGIN_CRITICAL__ { std::lock_guard<std::mutex> lock(this->m_mutex);
+#define __BEGIN_CRITICAL__ { std::unique_lock<std::mutex> lock(this->m_mutex);
#define __END_CRITICAL__ }
namespace Csr {
-ThreadPool::ThreadPool(size_t min, size_t max) :
- m_min(min),
- m_max(max),
- m_stop(false),
- m_runningWorkersNum(0)
+ThreadPool::ThreadPool(size_t threads) : m_stop(false), m_runningWorkersNum(0)
{
- if (this->m_min > this->m_max)
- ThrowExc(CSR_ERROR_SERVER, "thread pool MIN shouldn't be bigger than MAX");
+ for (size_t i = 0; i < threads; ++i) {
+ this->m_workers.emplace_back([this] {
+ INFO("Thread[" << std::this_thread::get_id() << "] start in pool");
- for (size_t i = 0; i < this->m_min; i++)
- this->add();
+ while (true) {
+ std::function<void()> task;
- DEBUG("Thread pool initialized with [" << this->m_workers.size() << "] threads");
-}
-
-void ThreadPool::add(void)
-{
- std::thread t([this]() {
- DEBUG("Thread[" << std::this_thread::get_id() << "] start in pool");
-
- while (true) {
- std::unique_lock<std::mutex> lock(this->m_mutex);
- this->m_cv.wait(lock, [this]() {
- return this->m_workers.size() > this->m_min ||
- this->m_stop ||
- !this->m_tasks.empty();
- });
-
- if (this->m_stop && this->m_tasks.empty()) {
- DEBUG("Thread pool stop requested. "
- "thread[" << std::this_thread::get_id() << "] returning.");
- break;
- }
-
- if (this->m_workers.size() > this->m_min && this->m_tasks.empty()) {
- DEBUG("Terminate idle thread[" << std::this_thread::get_id() << "]");
-
- // move thread itself to me and erase dummy in this->m_workers
- auto currentId = std::this_thread::get_id();
- this->m_workers[currentId].detach();
- this->m_workers.erase(currentId);
- break;
- }
-
- auto task = std::move(this->m_tasks.front());
- this->m_tasks.pop();
+ __BEGIN_CRITICAL__
+ this->m_cv.wait(lock, [this]() {
+ return this->m_stop || !this->m_tasks.empty();
+ });
- lock.unlock();
+ if (this->m_stop && this->m_tasks.empty()) {
+ INFO("Thread pool stop requested. "
+ "thread[" << std::this_thread::get_id() << "] returning.");
+ break;
+ }
- INFO("Start task on thread[" << std::this_thread::get_id() << "]");
+ task = std::move(this->m_tasks.front());
+ this->m_tasks.pop();
+ __END_CRITICAL__
- ++this->m_runningWorkersNum;
+ DEBUG("Start task on thread[" << std::this_thread::get_id() << "]");
- task();
+ ++this->m_runningWorkersNum;
- --this->m_runningWorkersNum;
- }
- });
+ task();
- this->m_workers[t.get_id()] = std::move(t);
-}
+ --this->m_runningWorkersNum;
+ }
+ });
+ }
-size_t ThreadPool::size() const
-{
- return this->m_workers.size();
+ DEBUG("Thread pool initialized with [" << this->m_workers.size() << "] threads");
}
bool ThreadPool::isTaskRunning() const
ThreadPool::~ThreadPool()
{
+ __BEGIN_CRITICAL__
this->m_stop = true;
+ __END_CRITICAL__
this->m_cv.notify_all();
for (auto &worker : this->m_workers) {
- if (worker.second.joinable())
- worker.second.join();
+ if (worker.joinable())
+ worker.join();
}
}
void ThreadPool::submit(std::function<void()> &&task)
{
- if (!this->m_tasks.empty() && this->m_workers.size() < this->m_max) {
- DEBUG("more workers needed. let's add.");
- this->add();
- }
-
__BEGIN_CRITICAL__
if (!this->m_stop)
#include <atomic>
#include <condition_variable>
#include <functional>
-#include <map>
+#include <vector>
#include <queue>
namespace Csr {
class ThreadPool {
public:
// worker thread dynamically created / joined from min to max
- explicit ThreadPool(size_t min, size_t max);
+ explicit ThreadPool(size_t threads);
virtual ~ThreadPool();
ThreadPool(const ThreadPool &) = delete;
// submit task to thread pool
void submit(std::function<void()> &&task);
- // get workers size in thread pool
- size_t size(void) const;
-
// check whether there's running task by worker.
bool isTaskRunning(void) const;
private:
- void add(void);
-
- const size_t m_min;
- const size_t m_max;
- std::atomic<bool> m_stop;
+ bool m_stop;
std::atomic<size_t> m_runningWorkersNum;
- std::map<std::thread::id, std::thread> m_workers;
+ std::vector<std::thread> m_workers;
std::queue<std::function<void()>> m_tasks;
std::mutex m_mutex;
std::condition_variable m_cv;
inline long long int END_TIME(void)
{
- return duration_cast<milliseconds>(high_resolution_clock::now() -
- _start).count();
+ return duration_cast<milliseconds>(high_resolution_clock::now() - _start).count();
}
inline void INC_EXPECTED_TIME(long long int t)
BOOST_MESSAGE("Elapsed time[" << END_TIME() << "]. "
"Expected scope: (" << _expected << ", " << _expected + PoolLogicUnit << ")");
- BOOST_REQUIRE_MESSAGE(END_TIME() < _expected + PoolLogicUnit,
- "Too much time elapsed");
+ BOOST_REQUIRE_MESSAGE(END_TIME() < _expected + PoolLogicUnit, "Too much time elapsed");
BOOST_REQUIRE_MESSAGE(END_TIME() >= _expected, "Too less time elapsed");
}
std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
};
- Csr::ThreadPool pool(cnt, cnt);
-
- BOOST_REQUIRE_MESSAGE(pool.size() == cnt,
- "Thread pool isn't initialized well. "
- "Pool size[" << pool.size() << "] and correct size[" << cnt << "]");
+ Csr::ThreadPool pool(cnt);
for (size_t i = 0; i < cnt; i++)
pool.submit(task);
CHECK_TIME();
}
-void runDynamicPool(size_t min, size_t max)
-{
- BOOST_REQUIRE(min < max);
-
- START_TIME();
-
- exceptionGuard([&]() {
- INC_EXPECTED_TIME(TaskSleepUnit);
- auto task = [&]() {
- std::this_thread::sleep_for(milliseconds(TaskSleepUnit));
- };
-
- Csr::ThreadPool pool(min, max);
-
- BOOST_REQUIRE_MESSAGE(pool.size() == min,
- "Thread pool isn't initialized well. "
- "Pool size[" << pool.size() << "] and corret min size[" << min << "]");
-
- // Task assigned to already existing workers
- for (size_t i = 0; i < min; i++)
- pool.submit(task);
-
- // Task assigned new worker which is dynamically added to pool
- for (size_t i = min; i < max; i++)
- pool.submit(task);
-
- // wait for expected time to tasks done
- // additional time for thread pool logic running time
- INC_EXPECTED_TIME(PoolLogicUnit);
- std::this_thread::sleep_for(milliseconds(TaskSleepUnit + PoolLogicUnit));
- BOOST_REQUIRE_MESSAGE(pool.size() == min,
- "To dtor idle threads in pool doesn't work well. "
- "Pool size[" << pool.size() << "] shouldn't exceed given min[" << min << "]");
-
- // make all(maximum) workers busy
- INC_EXPECTED_TIME(TaskSleepUnit);
-
- for (size_t i = 0; i < max; i++)
- pool.submit(task);
-
- INC_EXPECTED_TIME(TaskSleepUnit);
- pool.submit(task); // One more task than maximum workers at the time
- BOOST_REQUIRE_MESSAGE(pool.size() == max,
- "Upper bound to make thread dynamically to pool doesn't work well. "
- "Pool size[" << pool.size() << "] shouldn't exceed given max[" << max << "]");
- });
-
- CHECK_TIME();
-}
-
-}
+} // namespace anonymous
BOOST_AUTO_TEST_SUITE(THREADPOOL)
}
}
-BOOST_AUTO_TEST_CASE(dynamic)
-{
- for (size_t min = 1; min <= 7; min += 2) {
- for (size_t max = min + 2; max <= 13; max += 2) {
- BOOST_MESSAGE("Dynamic ThreadPool size: " << min << " ~ " << max);
- runDynamicPool(min, max);
- }
- }
-}
-
BOOST_AUTO_TEST_SUITE_END()