Worker thread utility class 91/31891/7
authorPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Thu, 11 Dec 2014 16:54:13 +0000 (17:54 +0100)
committerPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Tue, 16 Dec 2014 12:46:39 +0000 (13:46 +0100)
[Bug/Feature]   A utility class that wraps a queue of tasks executed in a
                dedicated thread.
[Cause]         N/A
[Solution]      N/A
[Verification]  Build, install, run tests

Change-Id: I32788fd6321c2877cf4dafe7213c1a140c1d3fd2

common/utils/counting-map.hpp [new file with mode: 0644]
common/utils/worker.cpp [new file with mode: 0644]
common/utils/worker.hpp [new file with mode: 0644]
server/zone.cpp
server/zone.hpp
server/zones-manager.cpp
server/zones-manager.hpp
tests/unit_tests/server/ut-zone.cpp
tests/unit_tests/utils/ut-counting-map.cpp [new file with mode: 0644]
tests/unit_tests/utils/ut-worker.cpp [new file with mode: 0644]

diff --git a/common/utils/counting-map.hpp b/common/utils/counting-map.hpp
new file mode 100644 (file)
index 0000000..da5f4c7
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   Counting map
+ */
+
+#ifndef COMMON_UTILS_COUNTING_MAP_HPP
+#define COMMON_UTILS_COUNTING_MAP_HPP
+
+#include <unordered_map>
+
+namespace vasum {
+namespace utils {
+
+
+/**
+ * Structure used to count elements.
+ * It's like multiset + count but is more efficient.
+ */
+template<class Key>
+class CountingMap {
+public:
+    size_t increment(const Key& key)
+    {
+        auto res = mMap.insert(typename Map::value_type(key, 1));
+        if (!res.second) {
+            ++res.first->second;
+        }
+        return res.first->second;
+    }
+
+    size_t decrement(const Key& key)
+    {
+        auto it = mMap.find(key);
+        if (it == mMap.end()) {
+            return 0;
+        }
+        if (--it->second == 0) {
+            mMap.erase(it);
+            return 0;
+        }
+        return it->second;
+    }
+
+    void clear()
+    {
+        mMap.clear();
+    }
+
+    size_t get(const Key& key) const
+    {
+        auto it = mMap.find(key);
+        return it == mMap.end() ? 0 : it->second;
+    }
+
+    bool empty() const
+    {
+        return mMap.empty();
+    }
+private:
+    typedef std::unordered_map<Key, size_t> Map;
+    Map mMap;
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+
+#endif // COMMON_UTILS_COUNTING_MAP_HPP
diff --git a/common/utils/worker.cpp b/common/utils/worker.cpp
new file mode 100644 (file)
index 0000000..2cb3284
--- /dev/null
@@ -0,0 +1,186 @@
+/*
+ *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   A worker thread that executes tasks
+ */
+
+#include "config.hpp"
+#include "utils/worker.hpp"
+#include "utils/counting-map.hpp"
+#include "logger/logger.hpp"
+
+#include <atomic>
+#include <deque>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <cassert>
+
+
+namespace vasum {
+namespace utils {
+
+
+class Worker::WorkerQueue {
+public:
+    WorkerQueue()
+        : mLastGroupID(0), mEnding(false)
+    {
+        LOGT("Worker queue created");
+    }
+
+    ~WorkerQueue()
+    {
+        {
+            Lock lock(mMutex);
+            assert(mTaskQueue.empty());
+            assert(mGroupCounter.empty());
+            mEnding = true;
+        }
+        if (mThread.joinable()) {
+            mAddedCondition.notify_all();
+            mThread.join();
+        }
+        LOGT("Worker queue destroyed");
+    }
+
+    GroupID getNextGroupID()
+    {
+        return ++mLastGroupID;
+    }
+
+    void addTask(const Worker::Task& task, GroupID groupID)
+    {
+        assert(task);
+
+        Lock lock(mMutex);
+        LOGT("Adding task to subgroup " << groupID);
+        mTaskQueue.push_back(TaskInfo{task, groupID});
+        mGroupCounter.increment(groupID);
+        mAddedCondition.notify_one();
+        if (!mThread.joinable()) {
+            mThread = std::thread(&WorkerQueue::workerProc, this);
+        }
+    }
+
+    void waitForGroupEmpty(GroupID groupID)
+    {
+        Lock lock(mMutex);
+        size_t count = mGroupCounter.get(groupID);
+        if (count > 0) {
+            LOGD("Waiting for " << count << " task in group " << groupID);
+        }
+        mEmptyGroupCondition.wait(lock, [this, groupID] {
+            return mGroupCounter.get(groupID) == 0;
+        });
+    }
+private:
+    typedef std::unique_lock<std::mutex> Lock;
+
+    struct TaskInfo {
+        Worker::Task task;
+        GroupID groupID;
+    };
+
+    std::atomic<GroupID> mLastGroupID;
+    std::condition_variable mAddedCondition;
+    std::condition_variable mEmptyGroupCondition;
+    std::thread mThread;
+
+    std::mutex mMutex; // protects below member variables:
+    bool mEnding;
+    std::deque<TaskInfo> mTaskQueue;
+    CountingMap<GroupID> mGroupCounter;
+
+    void workerProc()
+    {
+        LOGT("Worker thread started");
+        for (;;) {
+            // wait for a task
+            GroupID groupID;
+            {
+                Lock lock(mMutex);
+                mAddedCondition.wait(lock, [this] {
+                    return !mTaskQueue.empty() || mEnding;
+                });
+                if (mTaskQueue.empty()) {
+                    break;
+                }
+                TaskInfo taskInfo = std::move(mTaskQueue.front());
+                mTaskQueue.pop_front();
+
+                lock.unlock();
+
+                // execute
+                execute(taskInfo);
+                groupID = taskInfo.groupID;
+            }
+            // remove from queue
+            {
+                Lock lock(mMutex);
+                if (mGroupCounter.decrement(groupID) == 0) {
+                    mEmptyGroupCondition.notify_all();
+                }
+            }
+        }
+        LOGT("Worker thread exited");
+    }
+
+    void execute(const TaskInfo& taskInfo)
+    {
+        try {
+            LOGT("Executing task from subgroup " << taskInfo.groupID);
+            taskInfo.task();
+        } catch (const std::exception& e) {
+            LOGE("Unexpected exception while executing task: " << e.what());
+        }
+    }
+};
+
+
+Worker::Pointer Worker::create()
+{
+    return Pointer(new Worker(std::make_shared<WorkerQueue>()));
+}
+
+Worker::Worker(const std::shared_ptr<WorkerQueue>& workerQueue)
+    : mWorkerQueue(workerQueue), mGroupID(workerQueue->getNextGroupID())
+{
+}
+
+Worker::~Worker()
+{
+    mWorkerQueue->waitForGroupEmpty(mGroupID);
+}
+
+Worker::Pointer Worker::createSubWorker()
+{
+    return Pointer(new Worker(mWorkerQueue));
+}
+
+void Worker::addTask(const Task& task)
+{
+    mWorkerQueue->addTask(task, mGroupID);
+}
+
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/worker.hpp b/common/utils/worker.hpp
new file mode 100644 (file)
index 0000000..0d951fb
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   A worker thread that executes tasks
+ */
+
+#ifndef COMMON_UTILS_WORKER_HPP
+#define COMMON_UTILS_WORKER_HPP
+
+#include <functional>
+#include <memory>
+
+namespace vasum {
+namespace utils {
+
+/**
+ * A queue with tasks executed in a dedicated thread.
+ * Current implementation creates a thread on the first use.
+ */
+class Worker {
+public:
+    typedef std::shared_ptr<Worker> Pointer;
+    typedef std::function<void()> Task;
+
+    ~Worker();
+
+    /**
+     * Creates a worker with its own thread
+     */
+    static Pointer create();
+
+    /**
+     * Creates a worker that share a thread with its parent
+     */
+    Pointer createSubWorker();
+
+    /**
+     * Adds a task to the queue.
+     */
+    void addTask(const Task& task);
+
+private:
+    typedef unsigned int GroupID;
+    class WorkerQueue;
+
+    const std::shared_ptr<WorkerQueue> mWorkerQueue;
+    const GroupID mGroupID;
+
+    Worker(const std::shared_ptr<WorkerQueue>& workerQueue);
+};
+
+} // namespace utils
+} // namespace vasum
+
+
+#endif // COMMON_UTILS_WORKER_HPP
index 2e4573c..5f6dce0 100644 (file)
@@ -65,10 +65,12 @@ void declareUnit(const std::string& file, ZoneProvisioning::Unit&& unit)
 
 } // namespace
 
-Zone::Zone(const std::string& zonesPath,
-                     const std::string& zoneConfigPath,
-                     const std::string& lxcTemplatePrefix,
-                     const std::string& baseRunMountPointPath)
+Zone::Zone(const utils::Worker::Pointer& worker,
+           const std::string& zonesPath,
+           const std::string& zoneConfigPath,
+           const std::string& lxcTemplatePrefix,
+           const std::string& baseRunMountPointPath)
+    : mWorker(worker)
 {
     config::loadFromFile(zoneConfigPath, mConfig);
 
@@ -92,19 +94,9 @@ Zone::~Zone()
 {
     // Make sure all OnNameLostCallbacks get finished and no new will
     // get called before proceeding further. This guarantees no race
-    // condition on the mReconnectThread.
-    {
-        Lock lock(mReconnectMutex);
-        disconnect();
-    }
-
-    if (mReconnectThread.joinable()) {
-        mReconnectThread.join();
-    }
-
-    if (mStartThread.joinable()) {
-        mStartThread.join();
-    }
+    // condition on the reconnect thread.
+    Lock lock(mReconnectMutex);
+    disconnect();
 }
 
 const std::vector<boost::regex>& Zone::getPermittedToSend() const
@@ -146,10 +138,6 @@ void Zone::start()
 
 void Zone::startAsync(const StartAsyncResultCallback& callback)
 {
-    if (mStartThread.joinable()) {
-        mStartThread.join();
-    }
-
     auto startWrapper = [this, callback]() {
         bool succeeded = false;
 
@@ -165,7 +153,7 @@ void Zone::startAsync(const StartAsyncResultCallback& callback)
         }
     };
 
-    mStartThread = std::thread(startWrapper);
+    mWorker->addTask(startWrapper);
 }
 
 void Zone::stop()
@@ -300,10 +288,7 @@ void Zone::onNameLostCallback()
 {
     LOGI(getId() << ": A connection to the DBUS server has been lost, reconnecting...");
 
-    if (mReconnectThread.joinable()) {
-        mReconnectThread.join();
-    }
-    mReconnectThread = std::thread(std::bind(&Zone::reconnectHandler, this));
+    mWorker->addTask(std::bind(&Zone::reconnectHandler, this));
 }
 
 void Zone::reconnectHandler()
index 5ee95af..1fdc588 100644 (file)
@@ -30,6 +30,7 @@
 #include "zone-admin.hpp"
 #include "zone-connection.hpp"
 #include "zone-connection-transport.hpp"
+#include "utils/worker.hpp"
 
 #include <string>
 #include <memory>
@@ -50,10 +51,11 @@ public:
      * @param lxcTemplatePrefix directory where templates are stored
      * @param baseRunMountPointPath base directory for run mount point
      */
-    Zone(const std::string& zonesPath,
-              const std::string& zoneConfigPath,
-              const std::string& lxcTemplatePrefix,
-              const std::string& baseRunMountPointPath);
+    Zone(const utils::Worker::Pointer& worker,
+         const std::string& zonesPath,
+         const std::string& zoneConfigPath,
+         const std::string& lxcTemplatePrefix,
+         const std::string& baseRunMountPointPath);
     Zone(Zone&&) = default;
     virtual ~Zone();
 
@@ -253,14 +255,13 @@ public:
                      const std::string& target);
 
 private:
+    utils::Worker::Pointer mWorker;
     ZoneConfig mConfig;
     std::vector<boost::regex> mPermittedToSend;
     std::vector<boost::regex> mPermittedToRecv;
     std::unique_ptr<ZoneConnectionTransport> mConnectionTransport;
     std::unique_ptr<ZoneAdmin> mAdmin;
     std::unique_ptr<ZoneConnection> mConnection;
-    std::thread mReconnectThread;
-    std::thread mStartThread;
     mutable std::recursive_mutex mReconnectMutex;
     NotifyActiveZoneCallback mNotifyCallback;
     DisplayOffCallback mDisplayOffCallback;
index 402fc7f..2ce03d7 100644 (file)
@@ -74,7 +74,8 @@ const unsigned int ZONE_IP_BASE_THIRD_OCTET = 100;
 
 } // namespace
 
-ZonesManager::ZonesManager(const std::string& managerConfigPath): mDetachOnExit(false)
+ZonesManager::ZonesManager(const std::string& managerConfigPath)
+    : mWorker(utils::Worker::create()), mDetachOnExit(false)
 {
     LOGD("Instantiating ZonesManager object...");
 
@@ -168,32 +169,33 @@ void ZonesManager::createZone(const std::string& zoneConfig)
     std::string zoneConfigPath = utils::getAbsolutePath(zoneConfig, baseConfigPath);
 
     LOGT("Creating Zone " << zoneConfigPath);
-    std::unique_ptr<Zone> c(new Zone(mConfig.zonesPath,
-                                               zoneConfigPath,
-                                               mConfig.lxcTemplatePrefix,
-                                               mConfig.runMountPointPrefix));
-    const std::string id = c->getId();
+    std::unique_ptr<Zone> zone(new Zone(mWorker->createSubWorker(),
+                                        mConfig.zonesPath,
+                                        zoneConfigPath,
+                                        mConfig.lxcTemplatePrefix,
+                                        mConfig.runMountPointPrefix));
+    const std::string id = zone->getId();
     if (id == HOST_ID) {
         throw ZoneOperationException("Cannot use reserved zone ID");
     }
 
     using namespace std::placeholders;
-    c->setNotifyActiveZoneCallback(bind(&ZonesManager::notifyActiveZoneHandler,
-                                             this, id, _1, _2));
+    zone->setNotifyActiveZoneCallback(bind(&ZonesManager::notifyActiveZoneHandler,
+                                           this, id, _1, _2));
 
-    c->setDisplayOffCallback(bind(&ZonesManager::displayOffHandler,
-                                  this, id));
+    zone->setDisplayOffCallback(bind(&ZonesManager::displayOffHandler,
+                                     this, id));
 
-    c->setFileMoveRequestCallback(bind(&ZonesManager::handleZoneMoveFileRequest,
-                                            this, id, _1, _2, _3));
+    zone->setFileMoveRequestCallback(bind(&ZonesManager::handleZoneMoveFileRequest,
+                                          this, id, _1, _2, _3));
 
-    c->setProxyCallCallback(bind(&ZonesManager::handleProxyCall,
-                                 this, id, _1, _2, _3, _4, _5, _6, _7));
+    zone->setProxyCallCallback(bind(&ZonesManager::handleProxyCall,
+                                    this, id, _1, _2, _3, _4, _5, _6, _7));
 
-    c->setDbusStateChangedCallback(bind(&ZonesManager::handleDbusStateChanged,
-                                        this, id, _1));
+    zone->setDbusStateChangedCallback(bind(&ZonesManager::handleDbusStateChanged,
+                                           this, id, _1));
 
-    mZones.insert(ZoneMap::value_type(id, std::move(c)));
+    mZones.insert(ZoneMap::value_type(id, std::move(zone)));
 
     // after zone is created successfully, put a file informing that zones are enabled
     if (mZones.size() == 1) {
@@ -836,8 +838,7 @@ void ZonesManager::handleDestroyZoneCall(const std::string& id,
         result->setVoid();
     };
 
-    std::thread thread(destroyer);
-    thread.detach(); //TODO fix it
+    mWorker->addTask(destroyer);
 }
 
 void ZonesManager::handleLockZoneCall(const std::string& id,
index 2fc0305..b58da86 100644 (file)
@@ -31,6 +31,7 @@
 #include "host-connection.hpp"
 #include "input-monitor.hpp"
 #include "proxy-call-policy.hpp"
+#include "utils/worker.hpp"
 
 #include <string>
 #include <unordered_map>
@@ -105,6 +106,7 @@ public:
     void setZonesDetachOnExit();
 
 private:
+    utils::Worker::Pointer mWorker;
     ZonesManagerConfig mConfig;
     std::string mConfigPath;
     HostConnection mHostConnection;
index 7d80d66..80e73da 100644 (file)
@@ -63,10 +63,11 @@ struct Fixture {
 
     std::unique_ptr<Zone> create(const std::string& configPath)
     {
-        return std::unique_ptr<Zone>(new Zone(ZONES_PATH,
-                                                        configPath,
-                                                        LXC_TEMPLATES_PATH,
-                                                        ""));
+        return std::unique_ptr<Zone>(new Zone(utils::Worker::create(),
+                                              ZONES_PATH,
+                                              configPath,
+                                              LXC_TEMPLATES_PATH,
+                                              ""));
     }
 
     void ensureStarted()
diff --git a/tests/unit_tests/utils/ut-counting-map.cpp b/tests/unit_tests/utils/ut-counting-map.cpp
new file mode 100644 (file)
index 0000000..702470f
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   Unit tests of counting map
+ */
+
+#include "config.hpp"
+#include "ut.hpp"
+
+#include "utils/counting-map.hpp"
+
+BOOST_AUTO_TEST_SUITE(CountingMapSuite)
+
+using namespace vasum::utils;
+
+BOOST_AUTO_TEST_CASE(CountingTest)
+{
+    CountingMap<std::string> map;
+
+    BOOST_CHECK(map.empty());
+    BOOST_CHECK_EQUAL(0, map.get("ala"));
+
+    BOOST_CHECK_EQUAL(1, map.increment("ala"));
+    BOOST_CHECK_EQUAL(1, map.increment("ma"));
+
+    BOOST_CHECK(!map.empty());
+    BOOST_CHECK_EQUAL(1, map.get("ala"));
+    BOOST_CHECK_EQUAL(1, map.get("ma"));
+    BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+    BOOST_CHECK_EQUAL(2, map.increment("ala"));
+    BOOST_CHECK_EQUAL(2, map.increment("ma"));
+    BOOST_CHECK_EQUAL(3, map.increment("ma"));
+
+    BOOST_CHECK(!map.empty());
+    BOOST_CHECK_EQUAL(2, map.get("ala"));
+    BOOST_CHECK_EQUAL(3, map.get("ma"));
+    BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+    BOOST_CHECK_EQUAL(1, map.decrement("ala"));
+    BOOST_CHECK_EQUAL(0, map.decrement("kota"));
+
+    BOOST_CHECK(!map.empty());
+    BOOST_CHECK_EQUAL(1, map.get("ala"));
+    BOOST_CHECK_EQUAL(3, map.get("ma"));
+    BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+    BOOST_CHECK_EQUAL(0, map.decrement("ala"));
+
+    BOOST_CHECK(!map.empty());
+    BOOST_CHECK_EQUAL(0, map.get("ala"));
+    BOOST_CHECK_EQUAL(3, map.get("ma"));
+    BOOST_CHECK_EQUAL(0, map.get("kota"));
+
+    BOOST_CHECK_EQUAL(2, map.decrement("ma"));
+    BOOST_CHECK_EQUAL(1, map.decrement("ma"));
+    BOOST_CHECK_EQUAL(0, map.decrement("ma"));
+
+    BOOST_CHECK(map.empty());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/tests/unit_tests/utils/ut-worker.cpp b/tests/unit_tests/utils/ut-worker.cpp
new file mode 100644 (file)
index 0000000..280889b
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+ *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   Unit tests of worker thread
+ */
+
+#include "config.hpp"
+#include "ut.hpp"
+
+#include "utils/worker.hpp"
+#include "utils/latch.hpp"
+
+#include <chrono>
+#include <thread>
+#include <atomic>
+
+BOOST_AUTO_TEST_SUITE(WorkerSuite)
+
+using namespace vasum::utils;
+
+const int unsigned TIMEOUT = 1000;
+
+BOOST_AUTO_TEST_CASE(NoTasksTest)
+{
+    Worker::Pointer worker = Worker::create();
+}
+
+BOOST_AUTO_TEST_CASE(NoTasks2Test)
+{
+    Worker::Pointer worker = Worker::create();
+    Worker::Pointer sub1 = worker->createSubWorker();
+    Worker::Pointer sub2 = worker->createSubWorker();
+    Worker::Pointer sub3 = sub1->createSubWorker();
+
+    sub1.reset();
+    worker.reset();
+}
+
+BOOST_AUTO_TEST_CASE(SimpleTest)
+{
+    Latch done;
+
+    Worker::Pointer worker = Worker::create();
+    worker->addTask([&] {
+        done.set();
+    });
+
+    BOOST_CHECK(done.wait(TIMEOUT));
+}
+
+BOOST_AUTO_TEST_CASE(QueueTest)
+{
+    std::mutex mutex;
+    std::string result;
+
+    Worker::Pointer worker = Worker::create();
+
+    for (int n=0; n<10; ++n) {
+        worker->addTask([&, n]{
+            std::lock_guard<std::mutex> lock(mutex);
+            result += std::to_string(n);
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        });
+    }
+
+    worker.reset();
+
+    std::lock_guard<std::mutex> lock(mutex);
+    BOOST_CHECK_EQUAL("0123456789", result);
+}
+
+BOOST_AUTO_TEST_CASE(ThreadResumeTest)
+{
+    Latch done;
+
+    const auto task = [&] {
+        done.set();
+    };
+
+    Worker::Pointer worker = Worker::create();
+
+    worker->addTask(task);
+
+    BOOST_CHECK(done.wait(TIMEOUT));
+
+    // make sure worker thread is in waiting state
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    worker->addTask(task);
+
+    worker.reset();
+
+    BOOST_CHECK(done.wait(TIMEOUT));
+}
+
+BOOST_AUTO_TEST_CASE(SubWorkerTest)
+{
+    std::mutex mutex;
+    std::string result;
+
+    Worker::Pointer worker = Worker::create();
+    Worker::Pointer sub1 = worker->createSubWorker();
+    Worker::Pointer sub2 = worker->createSubWorker();
+
+    auto addTask = [&](Worker::Pointer w, const std::string& id) {
+        w->addTask([&, id]{
+            std::lock_guard<std::mutex> lock(mutex);
+            result += id;
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        });
+    };
+
+    for (int n=0; n<4; ++n) {
+        addTask(worker, "_w" + std::to_string(n));
+        addTask(sub1, "_a" + std::to_string(n));
+    }
+
+    worker.reset();
+    sub1.reset();
+
+    {
+        std::lock_guard<std::mutex> lock(mutex);
+        BOOST_CHECK_EQUAL("_w0_a0_w1_a1_w2_a2_w3_a3", result);
+        result.clear();
+    }
+
+    addTask(sub2, "_b0");
+    addTask(sub2, "_b1");
+
+    sub2.reset();
+
+    {
+        std::lock_guard<std::mutex> lock(mutex);
+        BOOST_CHECK_EQUAL("_b0_b1", result);
+    }
+}
+
+BOOST_AUTO_TEST_CASE(NoCopyTest)
+{
+    typedef std::atomic_int Counter;
+
+    struct Task {
+        Counter& count;
+
+        Task(Counter& c) : count(c) {};
+        Task(const Task& t) : count(t.count) {++count;}
+        Task(Task&& r) : count(r.count) {}
+        Task& operator=(const Task&) = delete;
+        Task& operator=(Task&&) = delete;
+        void operator() () {}
+
+    };
+
+    Counter copyCount(0);
+
+    Worker::Pointer worker = Worker::create();
+    worker->addTask(Task(copyCount));
+    worker.reset();
+
+    BOOST_CHECK_EQUAL(1, copyCount); // one copy for creating std::function
+}
+
+BOOST_AUTO_TEST_SUITE_END()