From 84d3cd04fd0718842bff9790950cbf54eb0671d6 Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Fri, 30 Jan 2015 17:54:41 +0100 Subject: [PATCH] IPC: Changed recursive_mutex to mutex in the Processor [Bug/Feature] N/A [Cause] N/A [Solution] N/A [Verification] Build, install, run tests, run tests under valgrind Change-Id: If683c017354c5a9f7fdf8bb5ce2ad84f9dd82fdd --- common/ipc/internals/finish-request.hpp | 4 +- common/ipc/internals/processor.cpp | 99 ++++++++++++---------------- common/ipc/internals/processor.hpp | 65 +++++++----------- common/ipc/internals/remove-peer-request.hpp | 4 +- 4 files changed, 69 insertions(+), 103 deletions(-) diff --git a/common/ipc/internals/finish-request.hpp b/common/ipc/internals/finish-request.hpp index 3fd4a4f..d09a14e 100644 --- a/common/ipc/internals/finish-request.hpp +++ b/common/ipc/internals/finish-request.hpp @@ -35,11 +35,11 @@ public: FinishRequest(const FinishRequest&) = delete; FinishRequest& operator=(const FinishRequest&) = delete; - FinishRequest(const std::shared_ptr& conditionPtr) + FinishRequest(const std::shared_ptr& conditionPtr) : conditionPtr(conditionPtr) {} - std::shared_ptr conditionPtr; + std::shared_ptr conditionPtr; }; } // namespace ipc diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index 58beca8..1f51d5e 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -66,6 +66,7 @@ Processor::Processor(const std::string& logName, LOGS(mLogPrefix + "Processor Constructor"); utils::signalBlock(SIGPIPE); + using namespace std::placeholders; setSignalHandlerInternal(REGISTER_SIGNAL_METHOD_ID, std::bind(&Processor::onNewSignals, this, _1, _2)); @@ -94,7 +95,7 @@ void Processor::start(bool usesExternalPolling) LOGS(mLogPrefix + "Processor start"); Lock lock(mStateMutex); - if (!isStarted()) { + if (!mIsRunning) { LOGI(mLogPrefix + "Processor start"); mIsRunning = true; mUsesExternalPolling = usesExternalPolling; @@ -109,7 +110,7 @@ void Processor::stop() LOGS(mLogPrefix + "Processor stop"); if (isStarted()) { - auto conditionPtr = std::make_shared(); + auto conditionPtr = std::make_shared(); { Lock lock(mStateMutex); auto request = std::make_shared(conditionPtr); @@ -124,7 +125,7 @@ void Processor::stop() // Wait till the FINISH request is served Lock lock(mStateMutex); conditionPtr->wait(lock, [this]() { - return !isStarted(); + return !mIsRunning; }); } } @@ -168,31 +169,25 @@ FileDescriptor Processor::addPeer(const std::shared_ptr& socketPtr) return peerFD; } -void Processor::removePeer(const FileDescriptor peerFD) +void Processor::removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock) { LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD); - { - Lock lock(mStateMutex); - mRequestQueue.removeIf([peerFD](Request & request) { - return request.requestID == Event::ADD_PEER && - request.get()->peerFD == peerFD; - }); - } - - // Remove peer and wait till he's gone - std::shared_ptr conditionPtr(new std::condition_variable_any()); - { - Lock lock(mStateMutex); - auto request = std::make_shared(peerFD, conditionPtr); - mRequestQueue.pushBack(Event::REMOVE_PEER, request); - } - auto isPeerDeleted = [&peerFD, this]()->bool { return mSockets.count(peerFD) == 0; }; - Lock lock(mStateMutex); + mRequestQueue.removeIf([peerFD](Request & request) { + return request.requestID == Event::ADD_PEER && + request.get()->peerFD == peerFD; + }); + + // Remove peer and wait till he's gone + std::shared_ptr conditionPtr(new std::condition_variable()); + + auto request = std::make_shared(peerFD, conditionPtr); + mRequestQueue.pushBack(Event::REMOVE_PEER, request); + conditionPtr->wait(lock, isPeerDeleted); } @@ -231,8 +226,6 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, const std::excep // Notify about the deletion mRemovedPeerCallback(peerFD); } - - resetPolling(); } void Processor::resetPolling() @@ -243,23 +236,20 @@ void Processor::resetPolling() return; } - { - Lock lock(mStateMutex); - LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size()); - // Setup polling on eventfd and sockets - mFDs.resize(mSockets.size() + 1); + LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size()); + // Setup polling on eventfd and sockets + mFDs.resize(mSockets.size() + 1); - mFDs[0].fd = mRequestQueue.getFD(); - mFDs[0].events = POLLIN; + mFDs[0].fd = mRequestQueue.getFD(); + mFDs[0].events = POLLIN; - auto socketIt = mSockets.begin(); - for (unsigned int i = 1; i < mFDs.size(); ++i) { - LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD()); - mFDs[i].fd = socketIt->second->getFD(); - mFDs[i].events = POLLIN | POLLHUP; // Listen for input events - ++socketIt; - // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too? - } + auto socketIt = mSockets.begin(); + for (unsigned int i = 1; i < mFDs.size(); ++i) { + LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD()); + mFDs[i].fd = socketIt->second->getFD(); + mFDs[i].events = POLLIN | POLLHUP; // Listen for input events + ++socketIt; + // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too? } } @@ -267,7 +257,10 @@ void Processor::run() { LOGS(mLogPrefix + "Processor run"); - resetPolling(); + { + Lock lock(mStateMutex); + resetPolling(); + } while (isStarted()) { LOGT(mLogPrefix + "Waiting for communication..."); @@ -284,12 +277,14 @@ void Processor::run() // Check for lost connections: if (handleLostConnections()) { // mFDs changed + resetPolling(); continue; } // Check for incoming data. if (handleInputs()) { // mFDs changed + resetPolling(); continue; } @@ -298,6 +293,7 @@ void Processor::run() mFDs[0].revents &= ~(POLLIN); if (handleEvent()) { // mFDs changed + resetPolling(); continue; } } @@ -335,7 +331,7 @@ bool Processor::handleLostConnection(const FileDescriptor peerFD) bool Processor::handleInputs() { - Lock lock(mStateMutex); + // Lock not needed, mFDs won't be changed by handleInput bool pollChanged = false; for (unsigned int i = 1; i < mFDs.size(); ++i) { @@ -351,6 +347,7 @@ bool Processor::handleInputs() bool Processor::handleInput(const FileDescriptor peerFD) { LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD); + Lock lock(mStateMutex); std::shared_ptr socketPtr; @@ -384,7 +381,7 @@ bool Processor::handleInput(const FileDescriptor peerFD) if (mMethodsCallbacks.count(methodID)) { // Method std::shared_ptr methodCallbacks = mMethodsCallbacks.at(methodID); - return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks); + return onRemoteMethod(*socketPtr, methodID, messageID, methodCallbacks); } else if (mSignalsCallbacks.count(methodID)) { // Signal @@ -428,7 +425,6 @@ bool Processor::onReturnValue(const Socket& socket, { LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID); - // LOGI(mLogPrefix + "Return value for messageID: " << messageID); ReturnCallbacks returnCallbacks; try { LOGT(mLogPrefix + "Getting the return callback"); @@ -454,11 +450,9 @@ bool Processor::onReturnValue(const Socket& socket, return true; } - // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID); ResultBuilder resultBuilder(data); IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder)); - // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed"); return false; } @@ -469,8 +463,6 @@ bool Processor::onRemoteSignal(const Socket& socket, { LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID); - // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID); - std::shared_ptr data; try { LOGT(mLogPrefix + "Parsing incoming data"); @@ -482,7 +474,6 @@ bool Processor::onRemoteSignal(const Socket& socket, return true; } - // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID); try { signalCallbacks->signal(socket.getFD(), data); } catch (const IPCUserException& e) { @@ -499,13 +490,12 @@ bool Processor::onRemoteSignal(const Socket& socket, return false; } -bool Processor::onRemoteCall(const Socket& socket, - const MethodID methodID, - const MessageID messageID, - std::shared_ptr methodCallbacks) +bool Processor::onRemoteMethod(const Socket& socket, + const MethodID methodID, + const MessageID messageID, + std::shared_ptr methodCallbacks) { - LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID); - // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID); + LOGS(mLogPrefix + "Processor onRemoteMethod; methodID: " << methodID << " messageID: " << messageID); std::shared_ptr data; try { @@ -681,9 +671,6 @@ bool Processor::onAddPeerRequest(AddPeerRequest& request) request.peerFD, data); - - resetPolling(); - if (mNewPeerCallback) { // Notify about the new user. LOGT(mLogPrefix + "Calling NewPeerCallback"); diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index b6bd615..a77c072 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -84,7 +84,6 @@ const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500; * - no new events added after stop() called * - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but * there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :) -* - remove recursive mutex * */ class Processor { @@ -176,13 +175,6 @@ public: FileDescriptor addPeer(const std::shared_ptr& socketPtr); /** - * Request removing peer and wait - * - * @param peerFD id of the peer - */ - void removePeer(const FileDescriptor peerFD); - - /** * Saves the callbacks connected to the method id. * When a message with the given method id is received, * the data will be passed to the serialization callback through file descriptor. @@ -302,7 +294,7 @@ public: private: typedef std::function& data)> SerializeCallback; typedef std::function(int fd)> ParseCallback; - typedef std::unique_lock Lock; + typedef std::unique_lock Lock; typedef RequestQueue::Request Request; struct EmptyData { @@ -394,7 +386,7 @@ private: std::unordered_map mReturnCallbacks; // Mutex for modifying any internal data - std::recursive_mutex mStateMutex; + std::mutex mStateMutex; PeerCallback mNewPeerCallback; PeerCallback mRemovedPeerCallback; @@ -430,10 +422,10 @@ private: bool onReturnValue(const Socket& socket, const MessageID messageID); - bool onRemoteCall(const Socket& socket, - const MethodID methodID, - const MessageID messageID, - std::shared_ptr methodCallbacks); + bool onRemoteMethod(const Socket& socket, + const MethodID methodID, + const MessageID messageID, + std::shared_ptr methodCallbacks); bool onRemoteSignal(const Socket& socket, const MethodID methodID, const MessageID messageID, @@ -441,6 +433,7 @@ private: void resetPolling(); FileDescriptor getNextFileDescriptor(); void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr); + void removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock); void onNewSignals(const FileDescriptor peerFD, std::shared_ptr& data); @@ -528,7 +521,6 @@ void Processor::setSignalHandler(const MethodID methodID, } std::shared_ptr data; - std::vector peerFDs; { Lock lock(mStateMutex); @@ -546,15 +538,11 @@ void Processor::setSignalHandler(const MethodID methodID, data = std::make_shared(ids); for (const auto kv : mSockets) { - peerFDs.push_back(kv.first); + signalInternal(REGISTER_SIGNAL_METHOD_ID, + kv.first, + data); } } - - for (const auto peerFD : peerFDs) { - signalInternal(REGISTER_SIGNAL_METHOD_ID, - peerFD, - data); - } } @@ -578,12 +566,10 @@ std::shared_ptr Processor::callSync(const MethodID methodID, unsigned int timeoutMS) { Result result; - - std::mutex mutex; std::condition_variable cv; - auto process = [&result, &mutex, &cv](const Result && r) { - std::unique_lock lock(mutex); + auto process = [&result, &cv](const Result && r) { + // This is called under lock(mStateMutex) result = std::move(r); cv.notify_all(); }; @@ -597,27 +583,21 @@ std::shared_ptr Processor::callSync(const MethodID methodID, return result.isValid(); }; - std::unique_lock lock(mutex); + Lock lock(mStateMutex); LOGT(mLogPrefix + "Waiting for the response..."); if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) { LOGW(mLogPrefix + "Probably a timeout in callSync. Checking..."); - bool isTimeout; - { - Lock lock(mStateMutex); - // Call isn't sent or call is sent but there is no reply - isTimeout = mRequestQueue.removeIf([messageID](Request & request) { - return request.requestID == Event::METHOD && - request.get()->messageID == messageID; - }) - || mRequestQueue.removeIf([messageID](Request & request) { - return request.requestID == Event::SIGNAL && - request.get()->messageID == messageID; - }) - || 1 == mReturnCallbacks.erase(messageID); - } + + // Call isn't sent or call is sent but there is no reply + bool isTimeout = mRequestQueue.removeIf([messageID](Request & request) { + return request.requestID == Event::METHOD && + request.get()->messageID == messageID; + }) + || 1 == mReturnCallbacks.erase(messageID); + if (isTimeout) { LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID); - removePeer(peerFD); + removePeerSyncInternal(peerFD, lock); throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID)); } else { LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish"); @@ -636,7 +616,6 @@ void Processor::signalInternal(const MethodID methodID, const FileDescriptor peerFD, const std::shared_ptr& data) { - Lock lock(mStateMutex); auto request = SignalRequest::create(methodID, peerFD, data); mRequestQueue.pushFront(Event::SIGNAL, request); } diff --git a/common/ipc/internals/remove-peer-request.hpp b/common/ipc/internals/remove-peer-request.hpp index 4ec07cb..900a8a0 100644 --- a/common/ipc/internals/remove-peer-request.hpp +++ b/common/ipc/internals/remove-peer-request.hpp @@ -39,14 +39,14 @@ public: RemovePeerRequest& operator=(const RemovePeerRequest&) = delete; RemovePeerRequest(const FileDescriptor peerFD, - const std::shared_ptr& conditionPtr) + const std::shared_ptr& conditionPtr) : peerFD(peerFD), conditionPtr(conditionPtr) { } FileDescriptor peerFD; - std::shared_ptr conditionPtr; + std::shared_ptr conditionPtr; }; } // namespace ipc -- 2.7.4