From 8f92cd9ec0ab3c34ac8add4c9b6e8f2a31227cb5 Mon Sep 17 00:00:00 2001
From: Piotr Bartosiewicz
Date: Thu, 11 Dec 2014 17:54:13 +0100
Subject: [PATCH] Worker thread utility class
[Bug/Feature] A utility class that wraps a queue of tasks executed in a
dedicated thread.
[Cause] N/A
[Solution] N/A
[Verification] Build, install, run tests
Change-Id: I32788fd6321c2877cf4dafe7213c1a140c1d3fd2
---
common/utils/counting-map.hpp | 88 ++++++++++++++
common/utils/worker.cpp | 186 +++++++++++++++++++++++++++++
common/utils/worker.hpp | 74 ++++++++++++
server/zone.cpp | 37 ++----
server/zone.hpp | 13 +-
server/zones-manager.cpp | 39 +++---
server/zones-manager.hpp | 2 +
tests/unit_tests/server/ut-zone.cpp | 9 +-
tests/unit_tests/utils/ut-counting-map.cpp | 81 +++++++++++++
tests/unit_tests/utils/ut-worker.cpp | 182 ++++++++++++++++++++++++++++
10 files changed, 656 insertions(+), 55 deletions(-)
create mode 100644 common/utils/counting-map.hpp
create mode 100644 common/utils/worker.cpp
create mode 100644 common/utils/worker.hpp
create mode 100644 tests/unit_tests/utils/ut-counting-map.cpp
create mode 100644 tests/unit_tests/utils/ut-worker.cpp
diff --git a/common/utils/counting-map.hpp b/common/utils/counting-map.hpp
new file mode 100644
index 0000000..da5f4c7
--- /dev/null
+++ b/common/utils/counting-map.hpp
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Counting map
+ */
+
+#ifndef COMMON_UTILS_COUNTING_MAP_HPP
+#define COMMON_UTILS_COUNTING_MAP_HPP
+
+#include
+
+namespace vasum {
+namespace utils {
+
+
+/**
+ * Structure used to count elements.
+ * It's like multiset + count but is more efficient.
+ */
+template
+class CountingMap {
+public:
+ size_t increment(const Key& key)
+ {
+ auto res = mMap.insert(typename Map::value_type(key, 1));
+ if (!res.second) {
+ ++res.first->second;
+ }
+ return res.first->second;
+ }
+
+ size_t decrement(const Key& key)
+ {
+ auto it = mMap.find(key);
+ if (it == mMap.end()) {
+ return 0;
+ }
+ if (--it->second == 0) {
+ mMap.erase(it);
+ return 0;
+ }
+ return it->second;
+ }
+
+ void clear()
+ {
+ mMap.clear();
+ }
+
+ size_t get(const Key& key) const
+ {
+ auto it = mMap.find(key);
+ return it == mMap.end() ? 0 : it->second;
+ }
+
+ bool empty() const
+ {
+ return mMap.empty();
+ }
+private:
+ typedef std::unordered_map Map;
+ Map mMap;
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+
+#endif // COMMON_UTILS_COUNTING_MAP_HPP
diff --git a/common/utils/worker.cpp b/common/utils/worker.cpp
new file mode 100644
index 0000000..2cb3284
--- /dev/null
+++ b/common/utils/worker.cpp
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief A worker thread that executes tasks
+ */
+
+#include "config.hpp"
+#include "utils/worker.hpp"
+#include "utils/counting-map.hpp"
+#include "logger/logger.hpp"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+namespace vasum {
+namespace utils {
+
+
+class Worker::WorkerQueue {
+public:
+ WorkerQueue()
+ : mLastGroupID(0), mEnding(false)
+ {
+ LOGT("Worker queue created");
+ }
+
+ ~WorkerQueue()
+ {
+ {
+ Lock lock(mMutex);
+ assert(mTaskQueue.empty());
+ assert(mGroupCounter.empty());
+ mEnding = true;
+ }
+ if (mThread.joinable()) {
+ mAddedCondition.notify_all();
+ mThread.join();
+ }
+ LOGT("Worker queue destroyed");
+ }
+
+ GroupID getNextGroupID()
+ {
+ return ++mLastGroupID;
+ }
+
+ void addTask(const Worker::Task& task, GroupID groupID)
+ {
+ assert(task);
+
+ Lock lock(mMutex);
+ LOGT("Adding task to subgroup " << groupID);
+ mTaskQueue.push_back(TaskInfo{task, groupID});
+ mGroupCounter.increment(groupID);
+ mAddedCondition.notify_one();
+ if (!mThread.joinable()) {
+ mThread = std::thread(&WorkerQueue::workerProc, this);
+ }
+ }
+
+ void waitForGroupEmpty(GroupID groupID)
+ {
+ Lock lock(mMutex);
+ size_t count = mGroupCounter.get(groupID);
+ if (count > 0) {
+ LOGD("Waiting for " << count << " task in group " << groupID);
+ }
+ mEmptyGroupCondition.wait(lock, [this, groupID] {
+ return mGroupCounter.get(groupID) == 0;
+ });
+ }
+private:
+ typedef std::unique_lock Lock;
+
+ struct TaskInfo {
+ Worker::Task task;
+ GroupID groupID;
+ };
+
+ std::atomic mLastGroupID;
+ std::condition_variable mAddedCondition;
+ std::condition_variable mEmptyGroupCondition;
+ std::thread mThread;
+
+ std::mutex mMutex; // protects below member variables:
+ bool mEnding;
+ std::deque mTaskQueue;
+ CountingMap mGroupCounter;
+
+ void workerProc()
+ {
+ LOGT("Worker thread started");
+ for (;;) {
+ // wait for a task
+ GroupID groupID;
+ {
+ Lock lock(mMutex);
+ mAddedCondition.wait(lock, [this] {
+ return !mTaskQueue.empty() || mEnding;
+ });
+ if (mTaskQueue.empty()) {
+ break;
+ }
+ TaskInfo taskInfo = std::move(mTaskQueue.front());
+ mTaskQueue.pop_front();
+
+ lock.unlock();
+
+ // execute
+ execute(taskInfo);
+ groupID = taskInfo.groupID;
+ }
+ // remove from queue
+ {
+ Lock lock(mMutex);
+ if (mGroupCounter.decrement(groupID) == 0) {
+ mEmptyGroupCondition.notify_all();
+ }
+ }
+ }
+ LOGT("Worker thread exited");
+ }
+
+ void execute(const TaskInfo& taskInfo)
+ {
+ try {
+ LOGT("Executing task from subgroup " << taskInfo.groupID);
+ taskInfo.task();
+ } catch (const std::exception& e) {
+ LOGE("Unexpected exception while executing task: " << e.what());
+ }
+ }
+};
+
+
+Worker::Pointer Worker::create()
+{
+ return Pointer(new Worker(std::make_shared()));
+}
+
+Worker::Worker(const std::shared_ptr& workerQueue)
+ : mWorkerQueue(workerQueue), mGroupID(workerQueue->getNextGroupID())
+{
+}
+
+Worker::~Worker()
+{
+ mWorkerQueue->waitForGroupEmpty(mGroupID);
+}
+
+Worker::Pointer Worker::createSubWorker()
+{
+ return Pointer(new Worker(mWorkerQueue));
+}
+
+void Worker::addTask(const Task& task)
+{
+ mWorkerQueue->addTask(task, mGroupID);
+}
+
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/worker.hpp b/common/utils/worker.hpp
new file mode 100644
index 0000000..0d951fb
--- /dev/null
+++ b/common/utils/worker.hpp
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief A worker thread that executes tasks
+ */
+
+#ifndef COMMON_UTILS_WORKER_HPP
+#define COMMON_UTILS_WORKER_HPP
+
+#include
+#include
+
+namespace vasum {
+namespace utils {
+
+/**
+ * A queue with tasks executed in a dedicated thread.
+ * Current implementation creates a thread on the first use.
+ */
+class Worker {
+public:
+ typedef std::shared_ptr Pointer;
+ typedef std::function Task;
+
+ ~Worker();
+
+ /**
+ * Creates a worker with its own thread
+ */
+ static Pointer create();
+
+ /**
+ * Creates a worker that share a thread with its parent
+ */
+ Pointer createSubWorker();
+
+ /**
+ * Adds a task to the queue.
+ */
+ void addTask(const Task& task);
+
+private:
+ typedef unsigned int GroupID;
+ class WorkerQueue;
+
+ const std::shared_ptr mWorkerQueue;
+ const GroupID mGroupID;
+
+ Worker(const std::shared_ptr& workerQueue);
+};
+
+} // namespace utils
+} // namespace vasum
+
+
+#endif // COMMON_UTILS_WORKER_HPP
diff --git a/server/zone.cpp b/server/zone.cpp
index 2e4573c..5f6dce0 100644
--- a/server/zone.cpp
+++ b/server/zone.cpp
@@ -65,10 +65,12 @@ void declareUnit(const std::string& file, ZoneProvisioning::Unit&& unit)
} // namespace
-Zone::Zone(const std::string& zonesPath,
- const std::string& zoneConfigPath,
- const std::string& lxcTemplatePrefix,
- const std::string& baseRunMountPointPath)
+Zone::Zone(const utils::Worker::Pointer& worker,
+ const std::string& zonesPath,
+ const std::string& zoneConfigPath,
+ const std::string& lxcTemplatePrefix,
+ const std::string& baseRunMountPointPath)
+ : mWorker(worker)
{
config::loadFromFile(zoneConfigPath, mConfig);
@@ -92,19 +94,9 @@ Zone::~Zone()
{
// Make sure all OnNameLostCallbacks get finished and no new will
// get called before proceeding further. This guarantees no race
- // condition on the mReconnectThread.
- {
- Lock lock(mReconnectMutex);
- disconnect();
- }
-
- if (mReconnectThread.joinable()) {
- mReconnectThread.join();
- }
-
- if (mStartThread.joinable()) {
- mStartThread.join();
- }
+ // condition on the reconnect thread.
+ Lock lock(mReconnectMutex);
+ disconnect();
}
const std::vector& Zone::getPermittedToSend() const
@@ -146,10 +138,6 @@ void Zone::start()
void Zone::startAsync(const StartAsyncResultCallback& callback)
{
- if (mStartThread.joinable()) {
- mStartThread.join();
- }
-
auto startWrapper = [this, callback]() {
bool succeeded = false;
@@ -165,7 +153,7 @@ void Zone::startAsync(const StartAsyncResultCallback& callback)
}
};
- mStartThread = std::thread(startWrapper);
+ mWorker->addTask(startWrapper);
}
void Zone::stop()
@@ -300,10 +288,7 @@ void Zone::onNameLostCallback()
{
LOGI(getId() << ": A connection to the DBUS server has been lost, reconnecting...");
- if (mReconnectThread.joinable()) {
- mReconnectThread.join();
- }
- mReconnectThread = std::thread(std::bind(&Zone::reconnectHandler, this));
+ mWorker->addTask(std::bind(&Zone::reconnectHandler, this));
}
void Zone::reconnectHandler()
diff --git a/server/zone.hpp b/server/zone.hpp
index 5ee95af..1fdc588 100644
--- a/server/zone.hpp
+++ b/server/zone.hpp
@@ -30,6 +30,7 @@
#include "zone-admin.hpp"
#include "zone-connection.hpp"
#include "zone-connection-transport.hpp"
+#include "utils/worker.hpp"
#include
#include
@@ -50,10 +51,11 @@ public:
* @param lxcTemplatePrefix directory where templates are stored
* @param baseRunMountPointPath base directory for run mount point
*/
- Zone(const std::string& zonesPath,
- const std::string& zoneConfigPath,
- const std::string& lxcTemplatePrefix,
- const std::string& baseRunMountPointPath);
+ Zone(const utils::Worker::Pointer& worker,
+ const std::string& zonesPath,
+ const std::string& zoneConfigPath,
+ const std::string& lxcTemplatePrefix,
+ const std::string& baseRunMountPointPath);
Zone(Zone&&) = default;
virtual ~Zone();
@@ -253,14 +255,13 @@ public:
const std::string& target);
private:
+ utils::Worker::Pointer mWorker;
ZoneConfig mConfig;
std::vector mPermittedToSend;
std::vector mPermittedToRecv;
std::unique_ptr mConnectionTransport;
std::unique_ptr mAdmin;
std::unique_ptr mConnection;
- std::thread mReconnectThread;
- std::thread mStartThread;
mutable std::recursive_mutex mReconnectMutex;
NotifyActiveZoneCallback mNotifyCallback;
DisplayOffCallback mDisplayOffCallback;
diff --git a/server/zones-manager.cpp b/server/zones-manager.cpp
index 402fc7f..2ce03d7 100644
--- a/server/zones-manager.cpp
+++ b/server/zones-manager.cpp
@@ -74,7 +74,8 @@ const unsigned int ZONE_IP_BASE_THIRD_OCTET = 100;
} // namespace
-ZonesManager::ZonesManager(const std::string& managerConfigPath): mDetachOnExit(false)
+ZonesManager::ZonesManager(const std::string& managerConfigPath)
+ : mWorker(utils::Worker::create()), mDetachOnExit(false)
{
LOGD("Instantiating ZonesManager object...");
@@ -168,32 +169,33 @@ void ZonesManager::createZone(const std::string& zoneConfig)
std::string zoneConfigPath = utils::getAbsolutePath(zoneConfig, baseConfigPath);
LOGT("Creating Zone " << zoneConfigPath);
- std::unique_ptr c(new Zone(mConfig.zonesPath,
- zoneConfigPath,
- mConfig.lxcTemplatePrefix,
- mConfig.runMountPointPrefix));
- const std::string id = c->getId();
+ std::unique_ptr zone(new Zone(mWorker->createSubWorker(),
+ mConfig.zonesPath,
+ zoneConfigPath,
+ mConfig.lxcTemplatePrefix,
+ mConfig.runMountPointPrefix));
+ const std::string id = zone->getId();
if (id == HOST_ID) {
throw ZoneOperationException("Cannot use reserved zone ID");
}
using namespace std::placeholders;
- c->setNotifyActiveZoneCallback(bind(&ZonesManager::notifyActiveZoneHandler,
- this, id, _1, _2));
+ zone->setNotifyActiveZoneCallback(bind(&ZonesManager::notifyActiveZoneHandler,
+ this, id, _1, _2));
- c->setDisplayOffCallback(bind(&ZonesManager::displayOffHandler,
- this, id));
+ zone->setDisplayOffCallback(bind(&ZonesManager::displayOffHandler,
+ this, id));
- c->setFileMoveRequestCallback(bind(&ZonesManager::handleZoneMoveFileRequest,
- this, id, _1, _2, _3));
+ zone->setFileMoveRequestCallback(bind(&ZonesManager::handleZoneMoveFileRequest,
+ this, id, _1, _2, _3));
- c->setProxyCallCallback(bind(&ZonesManager::handleProxyCall,
- this, id, _1, _2, _3, _4, _5, _6, _7));
+ zone->setProxyCallCallback(bind(&ZonesManager::handleProxyCall,
+ this, id, _1, _2, _3, _4, _5, _6, _7));
- c->setDbusStateChangedCallback(bind(&ZonesManager::handleDbusStateChanged,
- this, id, _1));
+ zone->setDbusStateChangedCallback(bind(&ZonesManager::handleDbusStateChanged,
+ this, id, _1));
- mZones.insert(ZoneMap::value_type(id, std::move(c)));
+ mZones.insert(ZoneMap::value_type(id, std::move(zone)));
// after zone is created successfully, put a file informing that zones are enabled
if (mZones.size() == 1) {
@@ -836,8 +838,7 @@ void ZonesManager::handleDestroyZoneCall(const std::string& id,
result->setVoid();
};
- std::thread thread(destroyer);
- thread.detach(); //TODO fix it
+ mWorker->addTask(destroyer);
}
void ZonesManager::handleLockZoneCall(const std::string& id,
diff --git a/server/zones-manager.hpp b/server/zones-manager.hpp
index 2fc0305..b58da86 100644
--- a/server/zones-manager.hpp
+++ b/server/zones-manager.hpp
@@ -31,6 +31,7 @@
#include "host-connection.hpp"
#include "input-monitor.hpp"
#include "proxy-call-policy.hpp"
+#include "utils/worker.hpp"
#include
#include
@@ -105,6 +106,7 @@ public:
void setZonesDetachOnExit();
private:
+ utils::Worker::Pointer mWorker;
ZonesManagerConfig mConfig;
std::string mConfigPath;
HostConnection mHostConnection;
diff --git a/tests/unit_tests/server/ut-zone.cpp b/tests/unit_tests/server/ut-zone.cpp
index 7d80d66..80e73da 100644
--- a/tests/unit_tests/server/ut-zone.cpp
+++ b/tests/unit_tests/server/ut-zone.cpp
@@ -63,10 +63,11 @@ struct Fixture {
std::unique_ptr create(const std::string& configPath)
{
- return std::unique_ptr(new Zone(ZONES_PATH,
- configPath,
- LXC_TEMPLATES_PATH,
- ""));
+ return std::unique_ptr(new Zone(utils::Worker::create(),
+ ZONES_PATH,
+ configPath,
+ LXC_TEMPLATES_PATH,
+ ""));
}
void ensureStarted()
diff --git a/tests/unit_tests/utils/ut-counting-map.cpp b/tests/unit_tests/utils/ut-counting-map.cpp
new file mode 100644
index 0000000..702470f
--- /dev/null
+++ b/tests/unit_tests/utils/ut-counting-map.cpp
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Unit tests of counting map
+ */
+
+#include "config.hpp"
+#include "ut.hpp"
+
+#include "utils/counting-map.hpp"
+
+BOOST_AUTO_TEST_SUITE(CountingMapSuite)
+
+using namespace vasum::utils;
+
+BOOST_AUTO_TEST_CASE(CountingTest)
+{
+ CountingMap map;
+
+ BOOST_CHECK(map.empty());
+ BOOST_CHECK_EQUAL(0, map.get("ala"));
+
+ BOOST_CHECK_EQUAL(1, map.increment("ala"));
+ BOOST_CHECK_EQUAL(1, map.increment("ma"));
+
+ BOOST_CHECK(!map.empty());
+ BOOST_CHECK_EQUAL(1, map.get("ala"));
+ BOOST_CHECK_EQUAL(1, map.get("ma"));
+ BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+ BOOST_CHECK_EQUAL(2, map.increment("ala"));
+ BOOST_CHECK_EQUAL(2, map.increment("ma"));
+ BOOST_CHECK_EQUAL(3, map.increment("ma"));
+
+ BOOST_CHECK(!map.empty());
+ BOOST_CHECK_EQUAL(2, map.get("ala"));
+ BOOST_CHECK_EQUAL(3, map.get("ma"));
+ BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+ BOOST_CHECK_EQUAL(1, map.decrement("ala"));
+ BOOST_CHECK_EQUAL(0, map.decrement("kota"));
+
+ BOOST_CHECK(!map.empty());
+ BOOST_CHECK_EQUAL(1, map.get("ala"));
+ BOOST_CHECK_EQUAL(3, map.get("ma"));
+ BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+ BOOST_CHECK_EQUAL(0, map.decrement("ala"));
+
+ BOOST_CHECK(!map.empty());
+ BOOST_CHECK_EQUAL(0, map.get("ala"));
+ BOOST_CHECK_EQUAL(3, map.get("ma"));
+ BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+ BOOST_CHECK_EQUAL(2, map.decrement("ma"));
+ BOOST_CHECK_EQUAL(1, map.decrement("ma"));
+ BOOST_CHECK_EQUAL(0, map.decrement("ma"));
+
+ BOOST_CHECK(map.empty());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/tests/unit_tests/utils/ut-worker.cpp b/tests/unit_tests/utils/ut-worker.cpp
new file mode 100644
index 0000000..280889b
--- /dev/null
+++ b/tests/unit_tests/utils/ut-worker.cpp
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Unit tests of worker thread
+ */
+
+#include "config.hpp"
+#include "ut.hpp"
+
+#include "utils/worker.hpp"
+#include "utils/latch.hpp"
+
+#include
+#include
+#include
+
+BOOST_AUTO_TEST_SUITE(WorkerSuite)
+
+using namespace vasum::utils;
+
+const int unsigned TIMEOUT = 1000;
+
+BOOST_AUTO_TEST_CASE(NoTasksTest)
+{
+ Worker::Pointer worker = Worker::create();
+}
+
+BOOST_AUTO_TEST_CASE(NoTasks2Test)
+{
+ Worker::Pointer worker = Worker::create();
+ Worker::Pointer sub1 = worker->createSubWorker();
+ Worker::Pointer sub2 = worker->createSubWorker();
+ Worker::Pointer sub3 = sub1->createSubWorker();
+
+ sub1.reset();
+ worker.reset();
+}
+
+BOOST_AUTO_TEST_CASE(SimpleTest)
+{
+ Latch done;
+
+ Worker::Pointer worker = Worker::create();
+ worker->addTask([&] {
+ done.set();
+ });
+
+ BOOST_CHECK(done.wait(TIMEOUT));
+}
+
+BOOST_AUTO_TEST_CASE(QueueTest)
+{
+ std::mutex mutex;
+ std::string result;
+
+ Worker::Pointer worker = Worker::create();
+
+ for (int n=0; n<10; ++n) {
+ worker->addTask([&, n]{
+ std::lock_guard lock(mutex);
+ result += std::to_string(n);
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ });
+ }
+
+ worker.reset();
+
+ std::lock_guard lock(mutex);
+ BOOST_CHECK_EQUAL("0123456789", result);
+}
+
+BOOST_AUTO_TEST_CASE(ThreadResumeTest)
+{
+ Latch done;
+
+ const auto task = [&] {
+ done.set();
+ };
+
+ Worker::Pointer worker = Worker::create();
+
+ worker->addTask(task);
+
+ BOOST_CHECK(done.wait(TIMEOUT));
+
+ // make sure worker thread is in waiting state
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ worker->addTask(task);
+
+ worker.reset();
+
+ BOOST_CHECK(done.wait(TIMEOUT));
+}
+
+BOOST_AUTO_TEST_CASE(SubWorkerTest)
+{
+ std::mutex mutex;
+ std::string result;
+
+ Worker::Pointer worker = Worker::create();
+ Worker::Pointer sub1 = worker->createSubWorker();
+ Worker::Pointer sub2 = worker->createSubWorker();
+
+ auto addTask = [&](Worker::Pointer w, const std::string& id) {
+ w->addTask([&, id]{
+ std::lock_guard lock(mutex);
+ result += id;
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ });
+ };
+
+ for (int n=0; n<4; ++n) {
+ addTask(worker, "_w" + std::to_string(n));
+ addTask(sub1, "_a" + std::to_string(n));
+ }
+
+ worker.reset();
+ sub1.reset();
+
+ {
+ std::lock_guard lock(mutex);
+ BOOST_CHECK_EQUAL("_w0_a0_w1_a1_w2_a2_w3_a3", result);
+ result.clear();
+ }
+
+ addTask(sub2, "_b0");
+ addTask(sub2, "_b1");
+
+ sub2.reset();
+
+ {
+ std::lock_guard lock(mutex);
+ BOOST_CHECK_EQUAL("_b0_b1", result);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(NoCopyTest)
+{
+ typedef std::atomic_int Counter;
+
+ struct Task {
+ Counter& count;
+
+ Task(Counter& c) : count(c) {};
+ Task(const Task& t) : count(t.count) {++count;}
+ Task(Task&& r) : count(r.count) {}
+ Task& operator=(const Task&) = delete;
+ Task& operator=(Task&&) = delete;
+ void operator() () {}
+
+ };
+
+ Counter copyCount(0);
+
+ Worker::Pointer worker = Worker::create();
+ worker->addTask(Task(copyCount));
+ worker.reset();
+
+ BOOST_CHECK_EQUAL(1, copyCount); // one copy for creating std::function
+}
+
+BOOST_AUTO_TEST_SUITE_END()
--
2.7.4