From 8f92cd9ec0ab3c34ac8add4c9b6e8f2a31227cb5 Mon Sep 17 00:00:00 2001 From: Piotr Bartosiewicz Date: Thu, 11 Dec 2014 17:54:13 +0100 Subject: [PATCH] Worker thread utility class [Bug/Feature] A utility class that wraps a queue of tasks executed in a dedicated thread. [Cause] N/A [Solution] N/A [Verification] Build, install, run tests Change-Id: I32788fd6321c2877cf4dafe7213c1a140c1d3fd2 --- common/utils/counting-map.hpp | 88 ++++++++++++++ common/utils/worker.cpp | 186 +++++++++++++++++++++++++++++ common/utils/worker.hpp | 74 ++++++++++++ server/zone.cpp | 37 ++---- server/zone.hpp | 13 +- server/zones-manager.cpp | 39 +++--- server/zones-manager.hpp | 2 + tests/unit_tests/server/ut-zone.cpp | 9 +- tests/unit_tests/utils/ut-counting-map.cpp | 81 +++++++++++++ tests/unit_tests/utils/ut-worker.cpp | 182 ++++++++++++++++++++++++++++ 10 files changed, 656 insertions(+), 55 deletions(-) create mode 100644 common/utils/counting-map.hpp create mode 100644 common/utils/worker.cpp create mode 100644 common/utils/worker.hpp create mode 100644 tests/unit_tests/utils/ut-counting-map.cpp create mode 100644 tests/unit_tests/utils/ut-worker.cpp diff --git a/common/utils/counting-map.hpp b/common/utils/counting-map.hpp new file mode 100644 index 0000000..da5f4c7 --- /dev/null +++ b/common/utils/counting-map.hpp @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief Counting map + */ + +#ifndef COMMON_UTILS_COUNTING_MAP_HPP +#define COMMON_UTILS_COUNTING_MAP_HPP + +#include + +namespace vasum { +namespace utils { + + +/** + * Structure used to count elements. + * It's like multiset + count but is more efficient. + */ +template +class CountingMap { +public: + size_t increment(const Key& key) + { + auto res = mMap.insert(typename Map::value_type(key, 1)); + if (!res.second) { + ++res.first->second; + } + return res.first->second; + } + + size_t decrement(const Key& key) + { + auto it = mMap.find(key); + if (it == mMap.end()) { + return 0; + } + if (--it->second == 0) { + mMap.erase(it); + return 0; + } + return it->second; + } + + void clear() + { + mMap.clear(); + } + + size_t get(const Key& key) const + { + auto it = mMap.find(key); + return it == mMap.end() ? 0 : it->second; + } + + bool empty() const + { + return mMap.empty(); + } +private: + typedef std::unordered_map Map; + Map mMap; +}; + + +} // namespace utils +} // namespace vasum + + +#endif // COMMON_UTILS_COUNTING_MAP_HPP diff --git a/common/utils/worker.cpp b/common/utils/worker.cpp new file mode 100644 index 0000000..2cb3284 --- /dev/null +++ b/common/utils/worker.cpp @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief A worker thread that executes tasks + */ + +#include "config.hpp" +#include "utils/worker.hpp" +#include "utils/counting-map.hpp" +#include "logger/logger.hpp" + +#include +#include +#include +#include +#include +#include + + +namespace vasum { +namespace utils { + + +class Worker::WorkerQueue { +public: + WorkerQueue() + : mLastGroupID(0), mEnding(false) + { + LOGT("Worker queue created"); + } + + ~WorkerQueue() + { + { + Lock lock(mMutex); + assert(mTaskQueue.empty()); + assert(mGroupCounter.empty()); + mEnding = true; + } + if (mThread.joinable()) { + mAddedCondition.notify_all(); + mThread.join(); + } + LOGT("Worker queue destroyed"); + } + + GroupID getNextGroupID() + { + return ++mLastGroupID; + } + + void addTask(const Worker::Task& task, GroupID groupID) + { + assert(task); + + Lock lock(mMutex); + LOGT("Adding task to subgroup " << groupID); + mTaskQueue.push_back(TaskInfo{task, groupID}); + mGroupCounter.increment(groupID); + mAddedCondition.notify_one(); + if (!mThread.joinable()) { + mThread = std::thread(&WorkerQueue::workerProc, this); + } + } + + void waitForGroupEmpty(GroupID groupID) + { + Lock lock(mMutex); + size_t count = mGroupCounter.get(groupID); + if (count > 0) { + LOGD("Waiting for " << count << " task in group " << groupID); + } + mEmptyGroupCondition.wait(lock, [this, groupID] { + return mGroupCounter.get(groupID) == 0; + }); + } +private: + typedef std::unique_lock Lock; + + struct TaskInfo { + Worker::Task task; + GroupID groupID; + }; + + std::atomic mLastGroupID; + std::condition_variable mAddedCondition; + std::condition_variable mEmptyGroupCondition; + std::thread mThread; + + std::mutex mMutex; // protects below member variables: + bool mEnding; + std::deque mTaskQueue; + CountingMap mGroupCounter; + + void workerProc() + { + LOGT("Worker thread started"); + for (;;) { + // wait for a task + GroupID groupID; + { + Lock lock(mMutex); + mAddedCondition.wait(lock, [this] { + return !mTaskQueue.empty() || mEnding; + }); + if (mTaskQueue.empty()) { + break; + } + TaskInfo taskInfo = std::move(mTaskQueue.front()); + mTaskQueue.pop_front(); + + lock.unlock(); + + // execute + execute(taskInfo); + groupID = taskInfo.groupID; + } + // remove from queue + { + Lock lock(mMutex); + if (mGroupCounter.decrement(groupID) == 0) { + mEmptyGroupCondition.notify_all(); + } + } + } + LOGT("Worker thread exited"); + } + + void execute(const TaskInfo& taskInfo) + { + try { + LOGT("Executing task from subgroup " << taskInfo.groupID); + taskInfo.task(); + } catch (const std::exception& e) { + LOGE("Unexpected exception while executing task: " << e.what()); + } + } +}; + + +Worker::Pointer Worker::create() +{ + return Pointer(new Worker(std::make_shared())); +} + +Worker::Worker(const std::shared_ptr& workerQueue) + : mWorkerQueue(workerQueue), mGroupID(workerQueue->getNextGroupID()) +{ +} + +Worker::~Worker() +{ + mWorkerQueue->waitForGroupEmpty(mGroupID); +} + +Worker::Pointer Worker::createSubWorker() +{ + return Pointer(new Worker(mWorkerQueue)); +} + +void Worker::addTask(const Task& task) +{ + mWorkerQueue->addTask(task, mGroupID); +} + + +} // namespace utils +} // namespace vasum diff --git a/common/utils/worker.hpp b/common/utils/worker.hpp new file mode 100644 index 0000000..0d951fb --- /dev/null +++ b/common/utils/worker.hpp @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief A worker thread that executes tasks + */ + +#ifndef COMMON_UTILS_WORKER_HPP +#define COMMON_UTILS_WORKER_HPP + +#include +#include + +namespace vasum { +namespace utils { + +/** + * A queue with tasks executed in a dedicated thread. + * Current implementation creates a thread on the first use. + */ +class Worker { +public: + typedef std::shared_ptr Pointer; + typedef std::function Task; + + ~Worker(); + + /** + * Creates a worker with its own thread + */ + static Pointer create(); + + /** + * Creates a worker that share a thread with its parent + */ + Pointer createSubWorker(); + + /** + * Adds a task to the queue. + */ + void addTask(const Task& task); + +private: + typedef unsigned int GroupID; + class WorkerQueue; + + const std::shared_ptr mWorkerQueue; + const GroupID mGroupID; + + Worker(const std::shared_ptr& workerQueue); +}; + +} // namespace utils +} // namespace vasum + + +#endif // COMMON_UTILS_WORKER_HPP diff --git a/server/zone.cpp b/server/zone.cpp index 2e4573c..5f6dce0 100644 --- a/server/zone.cpp +++ b/server/zone.cpp @@ -65,10 +65,12 @@ void declareUnit(const std::string& file, ZoneProvisioning::Unit&& unit) } // namespace -Zone::Zone(const std::string& zonesPath, - const std::string& zoneConfigPath, - const std::string& lxcTemplatePrefix, - const std::string& baseRunMountPointPath) +Zone::Zone(const utils::Worker::Pointer& worker, + const std::string& zonesPath, + const std::string& zoneConfigPath, + const std::string& lxcTemplatePrefix, + const std::string& baseRunMountPointPath) + : mWorker(worker) { config::loadFromFile(zoneConfigPath, mConfig); @@ -92,19 +94,9 @@ Zone::~Zone() { // Make sure all OnNameLostCallbacks get finished and no new will // get called before proceeding further. This guarantees no race - // condition on the mReconnectThread. - { - Lock lock(mReconnectMutex); - disconnect(); - } - - if (mReconnectThread.joinable()) { - mReconnectThread.join(); - } - - if (mStartThread.joinable()) { - mStartThread.join(); - } + // condition on the reconnect thread. + Lock lock(mReconnectMutex); + disconnect(); } const std::vector& Zone::getPermittedToSend() const @@ -146,10 +138,6 @@ void Zone::start() void Zone::startAsync(const StartAsyncResultCallback& callback) { - if (mStartThread.joinable()) { - mStartThread.join(); - } - auto startWrapper = [this, callback]() { bool succeeded = false; @@ -165,7 +153,7 @@ void Zone::startAsync(const StartAsyncResultCallback& callback) } }; - mStartThread = std::thread(startWrapper); + mWorker->addTask(startWrapper); } void Zone::stop() @@ -300,10 +288,7 @@ void Zone::onNameLostCallback() { LOGI(getId() << ": A connection to the DBUS server has been lost, reconnecting..."); - if (mReconnectThread.joinable()) { - mReconnectThread.join(); - } - mReconnectThread = std::thread(std::bind(&Zone::reconnectHandler, this)); + mWorker->addTask(std::bind(&Zone::reconnectHandler, this)); } void Zone::reconnectHandler() diff --git a/server/zone.hpp b/server/zone.hpp index 5ee95af..1fdc588 100644 --- a/server/zone.hpp +++ b/server/zone.hpp @@ -30,6 +30,7 @@ #include "zone-admin.hpp" #include "zone-connection.hpp" #include "zone-connection-transport.hpp" +#include "utils/worker.hpp" #include #include @@ -50,10 +51,11 @@ public: * @param lxcTemplatePrefix directory where templates are stored * @param baseRunMountPointPath base directory for run mount point */ - Zone(const std::string& zonesPath, - const std::string& zoneConfigPath, - const std::string& lxcTemplatePrefix, - const std::string& baseRunMountPointPath); + Zone(const utils::Worker::Pointer& worker, + const std::string& zonesPath, + const std::string& zoneConfigPath, + const std::string& lxcTemplatePrefix, + const std::string& baseRunMountPointPath); Zone(Zone&&) = default; virtual ~Zone(); @@ -253,14 +255,13 @@ public: const std::string& target); private: + utils::Worker::Pointer mWorker; ZoneConfig mConfig; std::vector mPermittedToSend; std::vector mPermittedToRecv; std::unique_ptr mConnectionTransport; std::unique_ptr mAdmin; std::unique_ptr mConnection; - std::thread mReconnectThread; - std::thread mStartThread; mutable std::recursive_mutex mReconnectMutex; NotifyActiveZoneCallback mNotifyCallback; DisplayOffCallback mDisplayOffCallback; diff --git a/server/zones-manager.cpp b/server/zones-manager.cpp index 402fc7f..2ce03d7 100644 --- a/server/zones-manager.cpp +++ b/server/zones-manager.cpp @@ -74,7 +74,8 @@ const unsigned int ZONE_IP_BASE_THIRD_OCTET = 100; } // namespace -ZonesManager::ZonesManager(const std::string& managerConfigPath): mDetachOnExit(false) +ZonesManager::ZonesManager(const std::string& managerConfigPath) + : mWorker(utils::Worker::create()), mDetachOnExit(false) { LOGD("Instantiating ZonesManager object..."); @@ -168,32 +169,33 @@ void ZonesManager::createZone(const std::string& zoneConfig) std::string zoneConfigPath = utils::getAbsolutePath(zoneConfig, baseConfigPath); LOGT("Creating Zone " << zoneConfigPath); - std::unique_ptr c(new Zone(mConfig.zonesPath, - zoneConfigPath, - mConfig.lxcTemplatePrefix, - mConfig.runMountPointPrefix)); - const std::string id = c->getId(); + std::unique_ptr zone(new Zone(mWorker->createSubWorker(), + mConfig.zonesPath, + zoneConfigPath, + mConfig.lxcTemplatePrefix, + mConfig.runMountPointPrefix)); + const std::string id = zone->getId(); if (id == HOST_ID) { throw ZoneOperationException("Cannot use reserved zone ID"); } using namespace std::placeholders; - c->setNotifyActiveZoneCallback(bind(&ZonesManager::notifyActiveZoneHandler, - this, id, _1, _2)); + zone->setNotifyActiveZoneCallback(bind(&ZonesManager::notifyActiveZoneHandler, + this, id, _1, _2)); - c->setDisplayOffCallback(bind(&ZonesManager::displayOffHandler, - this, id)); + zone->setDisplayOffCallback(bind(&ZonesManager::displayOffHandler, + this, id)); - c->setFileMoveRequestCallback(bind(&ZonesManager::handleZoneMoveFileRequest, - this, id, _1, _2, _3)); + zone->setFileMoveRequestCallback(bind(&ZonesManager::handleZoneMoveFileRequest, + this, id, _1, _2, _3)); - c->setProxyCallCallback(bind(&ZonesManager::handleProxyCall, - this, id, _1, _2, _3, _4, _5, _6, _7)); + zone->setProxyCallCallback(bind(&ZonesManager::handleProxyCall, + this, id, _1, _2, _3, _4, _5, _6, _7)); - c->setDbusStateChangedCallback(bind(&ZonesManager::handleDbusStateChanged, - this, id, _1)); + zone->setDbusStateChangedCallback(bind(&ZonesManager::handleDbusStateChanged, + this, id, _1)); - mZones.insert(ZoneMap::value_type(id, std::move(c))); + mZones.insert(ZoneMap::value_type(id, std::move(zone))); // after zone is created successfully, put a file informing that zones are enabled if (mZones.size() == 1) { @@ -836,8 +838,7 @@ void ZonesManager::handleDestroyZoneCall(const std::string& id, result->setVoid(); }; - std::thread thread(destroyer); - thread.detach(); //TODO fix it + mWorker->addTask(destroyer); } void ZonesManager::handleLockZoneCall(const std::string& id, diff --git a/server/zones-manager.hpp b/server/zones-manager.hpp index 2fc0305..b58da86 100644 --- a/server/zones-manager.hpp +++ b/server/zones-manager.hpp @@ -31,6 +31,7 @@ #include "host-connection.hpp" #include "input-monitor.hpp" #include "proxy-call-policy.hpp" +#include "utils/worker.hpp" #include #include @@ -105,6 +106,7 @@ public: void setZonesDetachOnExit(); private: + utils::Worker::Pointer mWorker; ZonesManagerConfig mConfig; std::string mConfigPath; HostConnection mHostConnection; diff --git a/tests/unit_tests/server/ut-zone.cpp b/tests/unit_tests/server/ut-zone.cpp index 7d80d66..80e73da 100644 --- a/tests/unit_tests/server/ut-zone.cpp +++ b/tests/unit_tests/server/ut-zone.cpp @@ -63,10 +63,11 @@ struct Fixture { std::unique_ptr create(const std::string& configPath) { - return std::unique_ptr(new Zone(ZONES_PATH, - configPath, - LXC_TEMPLATES_PATH, - "")); + return std::unique_ptr(new Zone(utils::Worker::create(), + ZONES_PATH, + configPath, + LXC_TEMPLATES_PATH, + "")); } void ensureStarted() diff --git a/tests/unit_tests/utils/ut-counting-map.cpp b/tests/unit_tests/utils/ut-counting-map.cpp new file mode 100644 index 0000000..702470f --- /dev/null +++ b/tests/unit_tests/utils/ut-counting-map.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief Unit tests of counting map + */ + +#include "config.hpp" +#include "ut.hpp" + +#include "utils/counting-map.hpp" + +BOOST_AUTO_TEST_SUITE(CountingMapSuite) + +using namespace vasum::utils; + +BOOST_AUTO_TEST_CASE(CountingTest) +{ + CountingMap map; + + BOOST_CHECK(map.empty()); + BOOST_CHECK_EQUAL(0, map.get("ala")); + + BOOST_CHECK_EQUAL(1, map.increment("ala")); + BOOST_CHECK_EQUAL(1, map.increment("ma")); + + BOOST_CHECK(!map.empty()); + BOOST_CHECK_EQUAL(1, map.get("ala")); + BOOST_CHECK_EQUAL(1, map.get("ma")); + BOOST_CHECK_EQUAL(0, map.get("kota")); + + BOOST_CHECK_EQUAL(2, map.increment("ala")); + BOOST_CHECK_EQUAL(2, map.increment("ma")); + BOOST_CHECK_EQUAL(3, map.increment("ma")); + + BOOST_CHECK(!map.empty()); + BOOST_CHECK_EQUAL(2, map.get("ala")); + BOOST_CHECK_EQUAL(3, map.get("ma")); + BOOST_CHECK_EQUAL(0, map.get("kota")); + + BOOST_CHECK_EQUAL(1, map.decrement("ala")); + BOOST_CHECK_EQUAL(0, map.decrement("kota")); + + BOOST_CHECK(!map.empty()); + BOOST_CHECK_EQUAL(1, map.get("ala")); + BOOST_CHECK_EQUAL(3, map.get("ma")); + BOOST_CHECK_EQUAL(0, map.get("kota")); + + BOOST_CHECK_EQUAL(0, map.decrement("ala")); + + BOOST_CHECK(!map.empty()); + BOOST_CHECK_EQUAL(0, map.get("ala")); + BOOST_CHECK_EQUAL(3, map.get("ma")); + BOOST_CHECK_EQUAL(0, map.get("kota")); + + BOOST_CHECK_EQUAL(2, map.decrement("ma")); + BOOST_CHECK_EQUAL(1, map.decrement("ma")); + BOOST_CHECK_EQUAL(0, map.decrement("ma")); + + BOOST_CHECK(map.empty()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/tests/unit_tests/utils/ut-worker.cpp b/tests/unit_tests/utils/ut-worker.cpp new file mode 100644 index 0000000..280889b --- /dev/null +++ b/tests/unit_tests/utils/ut-worker.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief Unit tests of worker thread + */ + +#include "config.hpp" +#include "ut.hpp" + +#include "utils/worker.hpp" +#include "utils/latch.hpp" + +#include +#include +#include + +BOOST_AUTO_TEST_SUITE(WorkerSuite) + +using namespace vasum::utils; + +const int unsigned TIMEOUT = 1000; + +BOOST_AUTO_TEST_CASE(NoTasksTest) +{ + Worker::Pointer worker = Worker::create(); +} + +BOOST_AUTO_TEST_CASE(NoTasks2Test) +{ + Worker::Pointer worker = Worker::create(); + Worker::Pointer sub1 = worker->createSubWorker(); + Worker::Pointer sub2 = worker->createSubWorker(); + Worker::Pointer sub3 = sub1->createSubWorker(); + + sub1.reset(); + worker.reset(); +} + +BOOST_AUTO_TEST_CASE(SimpleTest) +{ + Latch done; + + Worker::Pointer worker = Worker::create(); + worker->addTask([&] { + done.set(); + }); + + BOOST_CHECK(done.wait(TIMEOUT)); +} + +BOOST_AUTO_TEST_CASE(QueueTest) +{ + std::mutex mutex; + std::string result; + + Worker::Pointer worker = Worker::create(); + + for (int n=0; n<10; ++n) { + worker->addTask([&, n]{ + std::lock_guard lock(mutex); + result += std::to_string(n); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }); + } + + worker.reset(); + + std::lock_guard lock(mutex); + BOOST_CHECK_EQUAL("0123456789", result); +} + +BOOST_AUTO_TEST_CASE(ThreadResumeTest) +{ + Latch done; + + const auto task = [&] { + done.set(); + }; + + Worker::Pointer worker = Worker::create(); + + worker->addTask(task); + + BOOST_CHECK(done.wait(TIMEOUT)); + + // make sure worker thread is in waiting state + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + worker->addTask(task); + + worker.reset(); + + BOOST_CHECK(done.wait(TIMEOUT)); +} + +BOOST_AUTO_TEST_CASE(SubWorkerTest) +{ + std::mutex mutex; + std::string result; + + Worker::Pointer worker = Worker::create(); + Worker::Pointer sub1 = worker->createSubWorker(); + Worker::Pointer sub2 = worker->createSubWorker(); + + auto addTask = [&](Worker::Pointer w, const std::string& id) { + w->addTask([&, id]{ + std::lock_guard lock(mutex); + result += id; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }); + }; + + for (int n=0; n<4; ++n) { + addTask(worker, "_w" + std::to_string(n)); + addTask(sub1, "_a" + std::to_string(n)); + } + + worker.reset(); + sub1.reset(); + + { + std::lock_guard lock(mutex); + BOOST_CHECK_EQUAL("_w0_a0_w1_a1_w2_a2_w3_a3", result); + result.clear(); + } + + addTask(sub2, "_b0"); + addTask(sub2, "_b1"); + + sub2.reset(); + + { + std::lock_guard lock(mutex); + BOOST_CHECK_EQUAL("_b0_b1", result); + } +} + +BOOST_AUTO_TEST_CASE(NoCopyTest) +{ + typedef std::atomic_int Counter; + + struct Task { + Counter& count; + + Task(Counter& c) : count(c) {}; + Task(const Task& t) : count(t.count) {++count;} + Task(Task&& r) : count(r.count) {} + Task& operator=(const Task&) = delete; + Task& operator=(Task&&) = delete; + void operator() () {} + + }; + + Counter copyCount(0); + + Worker::Pointer worker = Worker::create(); + worker->addTask(Task(copyCount)); + worker.reset(); + + BOOST_CHECK_EQUAL(1, copyCount); // one copy for creating std::function +} + +BOOST_AUTO_TEST_SUITE_END() -- 2.7.4