IPC: Changed recursive_mutex to mutex in the Processor 07/34707/2
authorJan Olszak <j.olszak@samsung.com>
Fri, 30 Jan 2015 16:54:41 +0000 (17:54 +0100)
committerJan Olszak <j.olszak@samsung.com>
Fri, 30 Jan 2015 18:11:30 +0000 (19:11 +0100)
[Bug/Feature]   N/A
[Cause]         N/A
[Solution]      N/A
[Verification]  Build, install, run tests, run tests under valgrind

Change-Id: If683c017354c5a9f7fdf8bb5ce2ad84f9dd82fdd

common/ipc/internals/finish-request.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/internals/remove-peer-request.hpp

index 3fd4a4f..d09a14e 100644 (file)
@@ -35,11 +35,11 @@ public:
     FinishRequest(const FinishRequest&) = delete;
     FinishRequest& operator=(const FinishRequest&) = delete;
 
-    FinishRequest(const std::shared_ptr<std::condition_variable_any>& conditionPtr)
+    FinishRequest(const std::shared_ptr<std::condition_variable>& conditionPtr)
         : conditionPtr(conditionPtr)
     {}
 
-    std::shared_ptr<std::condition_variable_any> conditionPtr;
+    std::shared_ptr<std::condition_variable> conditionPtr;
 };
 
 } // namespace ipc
index 58beca8..1f51d5e 100644 (file)
@@ -66,6 +66,7 @@ Processor::Processor(const std::string& logName,
     LOGS(mLogPrefix + "Processor Constructor");
 
     utils::signalBlock(SIGPIPE);
+
     using namespace std::placeholders;
     setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
                                                              std::bind(&Processor::onNewSignals, this, _1, _2));
@@ -94,7 +95,7 @@ void Processor::start(bool usesExternalPolling)
     LOGS(mLogPrefix + "Processor start");
 
     Lock lock(mStateMutex);
-    if (!isStarted()) {
+    if (!mIsRunning) {
         LOGI(mLogPrefix + "Processor start");
         mIsRunning = true;
         mUsesExternalPolling = usesExternalPolling;
@@ -109,7 +110,7 @@ void Processor::stop()
     LOGS(mLogPrefix + "Processor stop");
 
     if (isStarted()) {
-        auto conditionPtr = std::make_shared<std::condition_variable_any>();
+        auto conditionPtr = std::make_shared<std::condition_variable>();
         {
             Lock lock(mStateMutex);
             auto request = std::make_shared<FinishRequest>(conditionPtr);
@@ -124,7 +125,7 @@ void Processor::stop()
             // Wait till the FINISH request is served
             Lock lock(mStateMutex);
             conditionPtr->wait(lock, [this]() {
-                return !isStarted();
+                return !mIsRunning;
             });
         }
     }
@@ -168,31 +169,25 @@ FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
     return peerFD;
 }
 
-void Processor::removePeer(const FileDescriptor peerFD)
+void Processor::removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock)
 {
     LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
 
-    {
-        Lock lock(mStateMutex);
-        mRequestQueue.removeIf([peerFD](Request & request) {
-            return request.requestID == Event::ADD_PEER &&
-                   request.get<AddPeerRequest>()->peerFD == peerFD;
-        });
-    }
-
-    // Remove peer and wait till he's gone
-    std::shared_ptr<std::condition_variable_any> conditionPtr(new std::condition_variable_any());
-    {
-        Lock lock(mStateMutex);
-        auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
-        mRequestQueue.pushBack(Event::REMOVE_PEER, request);
-    }
-
     auto isPeerDeleted = [&peerFD, this]()->bool {
         return mSockets.count(peerFD) == 0;
     };
 
-    Lock lock(mStateMutex);
+    mRequestQueue.removeIf([peerFD](Request & request) {
+        return request.requestID == Event::ADD_PEER &&
+               request.get<AddPeerRequest>()->peerFD == peerFD;
+    });
+
+    // Remove peer and wait till he's gone
+    std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
+
+    auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
+    mRequestQueue.pushBack(Event::REMOVE_PEER, request);
+
     conditionPtr->wait(lock, isPeerDeleted);
 }
 
@@ -231,8 +226,6 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, const std::excep
         // Notify about the deletion
         mRemovedPeerCallback(peerFD);
     }
-
-    resetPolling();
 }
 
 void Processor::resetPolling()
@@ -243,23 +236,20 @@ void Processor::resetPolling()
         return;
     }
 
-    {
-        Lock lock(mStateMutex);
-        LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
-        // Setup polling on eventfd and sockets
-        mFDs.resize(mSockets.size() + 1);
+    LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
+    // Setup polling on eventfd and sockets
+    mFDs.resize(mSockets.size() + 1);
 
-        mFDs[0].fd = mRequestQueue.getFD();
-        mFDs[0].events = POLLIN;
+    mFDs[0].fd = mRequestQueue.getFD();
+    mFDs[0].events = POLLIN;
 
-        auto socketIt = mSockets.begin();
-        for (unsigned int i = 1; i < mFDs.size(); ++i) {
-            LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
-            mFDs[i].fd = socketIt->second->getFD();
-            mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
-            ++socketIt;
-            // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
-        }
+    auto socketIt = mSockets.begin();
+    for (unsigned int i = 1; i < mFDs.size(); ++i) {
+        LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
+        mFDs[i].fd = socketIt->second->getFD();
+        mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
+        ++socketIt;
+        // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
     }
 }
 
@@ -267,7 +257,10 @@ void Processor::run()
 {
     LOGS(mLogPrefix + "Processor run");
 
-    resetPolling();
+    {
+        Lock lock(mStateMutex);
+        resetPolling();
+    }
 
     while (isStarted()) {
         LOGT(mLogPrefix + "Waiting for communication...");
@@ -284,12 +277,14 @@ void Processor::run()
         // Check for lost connections:
         if (handleLostConnections()) {
             // mFDs changed
+            resetPolling();
             continue;
         }
 
         // Check for incoming data.
         if (handleInputs()) {
             // mFDs changed
+            resetPolling();
             continue;
         }
 
@@ -298,6 +293,7 @@ void Processor::run()
             mFDs[0].revents &= ~(POLLIN);
             if (handleEvent()) {
                 // mFDs changed
+                resetPolling();
                 continue;
             }
         }
@@ -335,7 +331,7 @@ bool Processor::handleLostConnection(const FileDescriptor peerFD)
 
 bool Processor::handleInputs()
 {
-    Lock lock(mStateMutex);
+    // Lock not needed, mFDs won't be changed by handleInput
 
     bool pollChanged = false;
     for (unsigned int i = 1; i < mFDs.size(); ++i) {
@@ -351,6 +347,7 @@ bool Processor::handleInputs()
 bool Processor::handleInput(const FileDescriptor peerFD)
 {
     LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
+
     Lock lock(mStateMutex);
 
     std::shared_ptr<Socket> socketPtr;
@@ -384,7 +381,7 @@ bool Processor::handleInput(const FileDescriptor peerFD)
             if (mMethodsCallbacks.count(methodID)) {
                 // Method
                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
-                return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
+                return onRemoteMethod(*socketPtr, methodID, messageID, methodCallbacks);
 
             } else if (mSignalsCallbacks.count(methodID)) {
                 // Signal
@@ -428,7 +425,6 @@ bool Processor::onReturnValue(const Socket& socket,
 {
     LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
 
-    // LOGI(mLogPrefix + "Return value for messageID: " << messageID);
     ReturnCallbacks returnCallbacks;
     try {
         LOGT(mLogPrefix + "Getting the return callback");
@@ -454,11 +450,9 @@ bool Processor::onReturnValue(const Socket& socket,
         return true;
     }
 
-    // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
     ResultBuilder resultBuilder(data);
     IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
 
-    // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
     return false;
 }
 
@@ -469,8 +463,6 @@ bool Processor::onRemoteSignal(const Socket& socket,
 {
     LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
 
-    // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
-
     std::shared_ptr<void> data;
     try {
         LOGT(mLogPrefix + "Parsing incoming data");
@@ -482,7 +474,6 @@ bool Processor::onRemoteSignal(const Socket& socket,
         return true;
     }
 
-    // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
     try {
         signalCallbacks->signal(socket.getFD(), data);
     } catch (const IPCUserException& e) {
@@ -499,13 +490,12 @@ bool Processor::onRemoteSignal(const Socket& socket,
     return false;
 }
 
-bool Processor::onRemoteCall(const Socket& socket,
-                             const MethodID methodID,
-                             const MessageID messageID,
-                             std::shared_ptr<MethodHandlers> methodCallbacks)
+bool Processor::onRemoteMethod(const Socket& socket,
+                               const MethodID methodID,
+                               const MessageID messageID,
+                               std::shared_ptr<MethodHandlers> methodCallbacks)
 {
-    LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
-    // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID);
+    LOGS(mLogPrefix + "Processor onRemoteMethod; methodID: " << methodID << " messageID: " << messageID);
 
     std::shared_ptr<void> data;
     try {
@@ -681,9 +671,6 @@ bool Processor::onAddPeerRequest(AddPeerRequest& request)
                                                    request.peerFD,
                                                    data);
 
-
-    resetPolling();
-
     if (mNewPeerCallback) {
         // Notify about the new user.
         LOGT(mLogPrefix + "Calling NewPeerCallback");
index b6bd615..a77c072 100644 (file)
@@ -84,7 +84,6 @@ const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
 *  - no new events added after stop() called
 *  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
 *    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
-*  - remove recursive mutex
 *
 */
 class Processor {
@@ -176,13 +175,6 @@ public:
     FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
 
     /**
-     * Request removing peer and wait
-     *
-     * @param peerFD id of the peer
-     */
-    void removePeer(const FileDescriptor peerFD);
-
-    /**
      * Saves the callbacks connected to the method id.
      * When a message with the given method id is received,
      * the data will be passed to the serialization callback through file descriptor.
@@ -302,7 +294,7 @@ public:
 private:
     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
-    typedef std::unique_lock<std::recursive_mutex> Lock;
+    typedef std::unique_lock<std::mutex> Lock;
     typedef RequestQueue<Event>::Request Request;
 
     struct EmptyData {
@@ -394,7 +386,7 @@ private:
     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
 
     // Mutex for modifying any internal data
-    std::recursive_mutex mStateMutex;
+    std::mutex mStateMutex;
 
     PeerCallback mNewPeerCallback;
     PeerCallback mRemovedPeerCallback;
@@ -430,10 +422,10 @@ private:
 
     bool onReturnValue(const Socket& socket,
                        const MessageID messageID);
-    bool onRemoteCall(const Socket& socket,
-                      const MethodID methodID,
-                      const MessageID messageID,
-                      std::shared_ptr<MethodHandlers> methodCallbacks);
+    bool onRemoteMethod(const Socket& socket,
+                        const MethodID methodID,
+                        const MessageID messageID,
+                        std::shared_ptr<MethodHandlers> methodCallbacks);
     bool onRemoteSignal(const Socket& socket,
                         const MethodID methodID,
                         const MessageID messageID,
@@ -441,6 +433,7 @@ private:
     void resetPolling();
     FileDescriptor getNextFileDescriptor();
     void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr);
+    void removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock);
 
     void onNewSignals(const FileDescriptor peerFD,
                       std::shared_ptr<RegisterSignalsProtocolMessage>& data);
@@ -528,7 +521,6 @@ void Processor::setSignalHandler(const MethodID methodID,
     }
 
     std::shared_ptr<RegisterSignalsProtocolMessage> data;
-    std::vector<FileDescriptor> peerFDs;
 
     {
         Lock lock(mStateMutex);
@@ -546,15 +538,11 @@ void Processor::setSignalHandler(const MethodID methodID,
         data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
 
         for (const auto kv : mSockets) {
-            peerFDs.push_back(kv.first);
+            signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+                                                           kv.first,
+                                                           data);
         }
     }
-
-    for (const auto peerFD : peerFDs) {
-        signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
-                                                       peerFD,
-                                                       data);
-    }
 }
 
 
@@ -578,12 +566,10 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
                                                       unsigned int timeoutMS)
 {
     Result<ReceivedDataType> result;
-
-    std::mutex mutex;
     std::condition_variable cv;
 
-    auto process = [&result, &mutex, &cv](const Result<ReceivedDataType> && r) {
-        std::unique_lock<std::mutex> lock(mutex);
+    auto process = [&result, &cv](const Result<ReceivedDataType> && r) {
+        // This is called under lock(mStateMutex)
         result = std::move(r);
         cv.notify_all();
     };
@@ -597,27 +583,21 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
         return result.isValid();
     };
 
-    std::unique_lock<std::mutex> lock(mutex);
+    Lock lock(mStateMutex);
     LOGT(mLogPrefix + "Waiting for the response...");
     if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
         LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
-        bool isTimeout;
-        {
-            Lock lock(mStateMutex);
-            // Call isn't sent or call is sent but there is no reply
-            isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
-                return request.requestID == Event::METHOD &&
-                       request.get<MethodRequest>()->messageID == messageID;
-            })
-            || mRequestQueue.removeIf([messageID](Request & request) {
-                return request.requestID == Event::SIGNAL &&
-                       request.get<SignalRequest>()->messageID == messageID;
-            })
-            || 1 == mReturnCallbacks.erase(messageID);
-        }
+
+        // Call isn't sent or call is sent but there is no reply
+        bool isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
+            return request.requestID == Event::METHOD &&
+                   request.get<MethodRequest>()->messageID == messageID;
+        })
+        || 1 == mReturnCallbacks.erase(messageID);
+
         if (isTimeout) {
             LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
-            removePeer(peerFD);
+            removePeerSyncInternal(peerFD, lock);
             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
         } else {
             LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
@@ -636,7 +616,6 @@ void Processor::signalInternal(const MethodID methodID,
                                const FileDescriptor peerFD,
                                const std::shared_ptr<SentDataType>& data)
 {
-    Lock lock(mStateMutex);
     auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
     mRequestQueue.pushFront(Event::SIGNAL, request);
 }
index 4ec07cb..900a8a0 100644 (file)
@@ -39,14 +39,14 @@ public:
     RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
 
     RemovePeerRequest(const FileDescriptor peerFD,
-                      const std::shared_ptr<std::condition_variable_any>& conditionPtr)
+                      const std::shared_ptr<std::condition_variable>& conditionPtr)
         : peerFD(peerFD),
           conditionPtr(conditionPtr)
     {
     }
 
     FileDescriptor peerFD;
-    std::shared_ptr<std::condition_variable_any> conditionPtr;
+    std::shared_ptr<std::condition_variable> conditionPtr;
 };
 
 } // namespace ipc