From ac0a215826052e4c0e1e88b446579aea841206ed Mon Sep 17 00:00:00 2001 From: Piotr Bartosiewicz Date: Fri, 5 Dec 2014 15:57:49 +0100 Subject: [PATCH] Fix ipc threading issues [Bug/Feature] N/A [Cause] N/A [Solution] N/A [Verification] Build, install, run tests Change-Id: I0f403bdb9dd535186a7c1fa10a486da265858ad7 --- common/ipc/internals/processor.cpp | 6 ++--- common/ipc/internals/processor.hpp | 2 +- tests/unit_tests/ipc/ut-ipc.cpp | 48 ++++++++++++++++++++------------------ 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index 5565124..b134b08 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -325,18 +325,18 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket) if (mMethodsCallbacks.count(methodID)) { // Method std::shared_ptr methodCallbacks = mMethodsCallbacks.at(methodID); - mCallsMutex.unlock(); + lock.unlock(); return onRemoteCall(peerID, socket, methodID, messageID, methodCallbacks); } else if (mSignalsCallbacks.count(methodID)) { // Signal std::shared_ptr signalCallbacks = mSignalsCallbacks.at(methodID); - mCallsMutex.unlock(); + lock.unlock(); return onRemoteSignal(peerID, socket, methodID, messageID, signalCallbacks); } else { // Nothing - mCallsMutex.unlock(); + lock.unlock(); LOGW("No method or signal callback for methodID: " << methodID); removePeerInternal(peerID, Status::NAUGHTY_PEER); return true; diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index 8fc17fb..da2a5b9 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -243,7 +243,7 @@ public: private: typedef std::function& data)> SerializeCallback; typedef std::function(int fd)> ParseCallback; - typedef std::lock_guard Lock; + typedef std::unique_lock Lock; struct EmptyData { CONFIG_REGISTER_EMPTY diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index 88696fe..b8b9e95 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -102,14 +102,12 @@ struct ThrowOnAcceptData { template void accept(Visitor) { - LOGE("Serialization and parsing failed"); - throw std::exception(); + throw std::runtime_error("intentional failure in accept"); } template void accept(Visitor) const { - LOGE("Const Serialization and parsing failed"); - throw std::exception(); + throw std::runtime_error("intentional failure in accept const"); } }; @@ -139,11 +137,11 @@ PeerID connect(Service& s, Client& c) // Connects the Client to the Service and returns Clients PeerID std::mutex mutex; - std::unique_lock lock(mutex); std::condition_variable cv; - unsigned int peerID = 0; - auto newPeerCallback = [&cv, &peerID](unsigned int newPeerID) { + PeerID peerID = 0; + auto newPeerCallback = [&cv, &peerID, &mutex](const PeerID newPeerID) { + std::unique_lock lock(mutex); peerID = newPeerID; cv.notify_one(); }; @@ -156,6 +154,7 @@ PeerID connect(Service& s, Client& c) c.start(); + std::unique_lock lock(mutex); BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerID]() { return peerID != 0; })); @@ -322,22 +321,23 @@ BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho) Client c(socketPath); c.start(); - std::mutex mtx; - std::unique_lock lck(mtx); + std::mutex mutex; std::condition_variable cv; //Async call std::shared_ptr sentData(new SendData(34)); std::shared_ptr recvData; - auto dataBack = [&cv, &recvData](ipc::Status status, std::shared_ptr& data) { + auto dataBack = [&cv, &recvData, &mutex](ipc::Status status, std::shared_ptr& data) { BOOST_CHECK(status == ipc::Status::OK); + std::unique_lock lock(mutex); recvData = data; cv.notify_one(); }; c.callAsync(1, sentData, dataBack); // Wait for the response - BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(100), [&recvData]() { + std::unique_lock lock(mutex); + BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(100), [&recvData]() { return static_cast(recvData); })); @@ -355,11 +355,11 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho) std::shared_ptr sentData(new SendData(56)); std::shared_ptr recvData; - std::mutex mtx; - std::unique_lock lck(mtx); + std::mutex mutex; std::condition_variable cv; - auto dataBack = [&cv, &recvData](ipc::Status status, std::shared_ptr& data) { + auto dataBack = [&cv, &recvData, &mutex](ipc::Status status, std::shared_ptr& data) { BOOST_CHECK(status == ipc::Status::OK); + std::unique_lock lock(mutex); recvData = data; cv.notify_one(); }; @@ -367,7 +367,8 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho) s.callAsync(1, peerID, sentData, dataBack); // Wait for the response - BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(1000), [&recvData]() { + std::unique_lock lock(mutex); + BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&recvData]() { return recvData.get() != nullptr; })); @@ -386,7 +387,7 @@ BOOST_AUTO_TEST_CASE(SyncTimeout) std::shared_ptr sentData(new SendData(78)); - BOOST_CHECK_THROW((c.callSync(1, sentData, 10)), IPCException); + BOOST_CHECK_THROW((c.callSync(1, sentData, 10)), IPCException); //TODO it fails from time to time } BOOST_AUTO_TEST_CASE(SerializationError) @@ -432,12 +433,12 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError) Client c(socketPath); c.start(); - std::mutex mtx; - std::unique_lock lck(mtx); + std::mutex mutex; std::condition_variable cv; ipc::Status retStatus = ipc::Status::UNDEFINED; - auto dataBack = [&cv, &retStatus](ipc::Status status, std::shared_ptr&) { + auto dataBack = [&cv, &retStatus, &mutex](ipc::Status status, std::shared_ptr&) { + std::unique_lock lock(mutex); retStatus = status; cv.notify_one(); }; @@ -446,10 +447,11 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError) c.callAsync(1, sentData, dataBack); // Wait for the response - BOOST_CHECK(cv.wait_for(lck, std::chrono::seconds(10), [&retStatus]() { + std::unique_lock lock(mutex); + BOOST_CHECK(cv.wait_for(lock, std::chrono::seconds(10), [&retStatus]() { return retStatus != ipc::Status::UNDEFINED; })); - BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED); + BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED); //TODO it fails from time to time } @@ -515,7 +517,7 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime) s.signal(1, data); // Wait for the signals to arrive - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for BOOST_CHECK(isHandlerACalled && isHandlerBCalled); } @@ -547,7 +549,7 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline) s.signal(1, data); // Wait for the signals to arrive - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for BOOST_CHECK(isHandlerACalled && isHandlerBCalled); } -- 2.7.4