IPC: Single state mutex in Processor 55/33755/4
authorJan Olszak <j.olszak@samsung.com>
Fri, 9 Jan 2015 12:48:05 +0000 (13:48 +0100)
committerJan Olszak <j.olszak@samsung.com>
Wed, 14 Jan 2015 14:10:40 +0000 (15:10 +0100)
[Bug/Feature]  Fixed a bug in callSync.
               Replaced all mutexes in Processor with only one.
               Added LOGS loggs
[Cause]        N/A
[Solution]     N/A
[Verification] Build, install, run tests, run tests under valgrind

Change-Id: I6b6ec26df5f5d7ba8b930e4321f766146fa556f0

common/ipc/client.cpp
common/ipc/client.hpp
common/ipc/internals/call-queue.cpp
common/ipc/internals/call-queue.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/ipc-gsource.cpp
common/ipc/service.cpp
common/ipc/service.hpp
tests/unit_tests/ipc/ut-ipc.cpp

index 3187f15..8455b19 100644 (file)
@@ -34,18 +34,17 @@ namespace ipc {
 Client::Client(const std::string& socketPath)
     : mSocketPath(socketPath)
 {
-    LOGD("Creating client");
+    LOGS("Client Constructor");
 }
 
 Client::~Client()
 {
-    LOGD("Destroying client...");
+    LOGS("Client Destructor");
     try {
         stop();
     } catch (IPCException& e) {
         LOGE("Error in Client's destructor: " << e.what());
     }
-    LOGD("Destroyed client");
 }
 
 void Client::connect()
@@ -58,14 +57,9 @@ void Client::connect()
 
 void Client::start()
 {
-    LOGD("Starting client...");
-
+    LOGS("Client start");
     connect();
-
-    // Start polling thread
     mProcessor.start();
-
-    LOGD("Started client");
 }
 
 bool Client::isStarted()
@@ -75,9 +69,8 @@ bool Client::isStarted()
 
 void Client::stop()
 {
-    LOGD("Stopping client...");
+    LOGS("Client Destructor");
     mProcessor.stop();
-    LOGD("Stopped");
 }
 
 std::vector<FileDescriptor> Client::getFDs()
@@ -107,17 +100,19 @@ void Client::handle(const FileDescriptor fd, const short pollEvent)
 
 void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
+    LOGS("Client setNewPeerCallback");
     mProcessor.setNewPeerCallback(newPeerCallback);
 }
 
 void Client::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 {
+    LOGS("Client setRemovedPeerCallback");
     mProcessor.setRemovedPeerCallback(removedPeerCallback);
 }
 
 void Client::removeMethod(const MethodID methodID)
 {
-    LOGD("Removing method id: " << methodID);
+    LOGS("Client removeMethod methodID: " << methodID);
     mProcessor.removeMethod(methodID);
 }
 
index b5b00e5..de847a9 100644 (file)
@@ -189,18 +189,16 @@ template<typename SentDataType, typename ReceivedDataType>
 void Client::addMethodHandler(const MethodID methodID,
                               const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
 {
-    LOGD("Adding method with id " << methodID);
+    LOGS("Client addMethodHandler, methodID: " << methodID);
     mProcessor.addMethodHandler<SentDataType, ReceivedDataType>(methodID, method);
-    LOGD("Added method with id " << methodID);
 }
 
 template<typename ReceivedDataType>
 void Client::addSignalHandler(const MethodID methodID,
                               const typename SignalHandler<ReceivedDataType>::type& handler)
 {
-    LOGD("Adding signal with id " << methodID);
+    LOGS("Client addSignalHandler, methodID: " << methodID);
     mProcessor.addSignalHandler<ReceivedDataType>(methodID, handler);
-    LOGD("Added signal with id " << methodID);
 }
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -208,7 +206,7 @@ std::shared_ptr<ReceivedDataType> Client::callSync(const MethodID methodID,
                                                    const std::shared_ptr<SentDataType>& data,
                                                    unsigned int timeoutMS)
 {
-    LOGD("Sync calling method: " << methodID);
+    LOGS("Client callSync, methodID: " << methodID << ", timeoutMS: " << timeoutMS);
     return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceFD, data, timeoutMS);
 }
 
@@ -217,22 +215,20 @@ void Client::callAsync(const MethodID methodID,
                        const std::shared_ptr<SentDataType>& data,
                        const typename ResultHandler<ReceivedDataType>::type& resultCallback)
 {
-    LOGD("Async calling method: " << methodID);
+    LOGS("Client callAsync, methodID: " << methodID);
     mProcessor.callAsync<SentDataType,
                          ReceivedDataType>(methodID,
                                            mServiceFD,
                                            data,
                                            resultCallback);
-    LOGD("Async called method: " << methodID);
 }
 
 template<typename SentDataType>
 void Client::signal(const MethodID methodID,
                     const std::shared_ptr<SentDataType>& data)
 {
-    LOGD("Signaling: " << methodID);
+    LOGS("Client signal, methodID: " << methodID);
     mProcessor.signal<SentDataType>(methodID, data);
-    LOGD("Signaled: " << methodID);
 }
 
 } // namespace ipc
index df70f11..70871e5 100644 (file)
@@ -27,6 +27,7 @@
 #include "ipc/internals/call-queue.hpp"
 #include "ipc/exception.hpp"
 #include "logger/logger.hpp"
+#include <algorithm>
 
 namespace vasum {
 namespace ipc {
@@ -51,6 +52,20 @@ MessageID CallQueue::getNextMessageID()
     return ++mMessageIDCounter;
 }
 
+bool CallQueue::erase(const MessageID messageID)
+{
+    LOGT("Erase messgeID: " << messageID);
+    auto it = std::find(mCalls.begin(), mCalls.end(), messageID);
+    if (it == mCalls.end()) {
+        LOGT("No such messgeID");
+        return false;
+    }
+
+    mCalls.erase(it);
+    LOGT("Erased");
+    return true;
+}
+
 CallQueue::Call CallQueue::pop()
 {
     if (isEmpty()) {
@@ -58,7 +73,7 @@ CallQueue::Call CallQueue::pop()
         throw IPCException("CallQueue is empty");
     }
     Call call = std::move(mCalls.front());
-    mCalls.pop();
+    mCalls.pop_front();
     return call;
 }
 
index 03adfc8..a6e45ed 100644 (file)
 
 #include "ipc/types.hpp"
 #include "config/manager.hpp"
+#include "logger/logger-scope.hpp"
 
 #include <atomic>
-#include <queue>
+#include <list>
 
 namespace vasum {
 namespace ipc {
@@ -45,9 +46,15 @@ public:
     struct Call {
         Call(const Call& other) = delete;
         Call& operator=(const Call&) = delete;
+        Call& operator=(Call&&) = default;
         Call() = default;
         Call(Call&&) = default;
 
+        bool operator==(const MessageID m)
+        {
+            return m == messageID;
+        }
+
         FileDescriptor peerFD;
         MethodID methodID;
         MessageID messageID;
@@ -78,10 +85,12 @@ public:
 
     Call pop();
 
+    bool erase(const MessageID messageID);
+
     bool isEmpty() const;
 
 private:
-    std::queue<Call> mCalls;
+    std::list<Call> mCalls;
     std::atomic<MessageID> mMessageIDCounter;
 
     MessageID getNextMessageID();
@@ -103,21 +112,24 @@ MessageID CallQueue::push(const MethodID methodID,
     call.messageID = messageID;
 
     call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+        LOGS("Method serialize, peerFD: " << fd);
         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
     };
 
     call.parse = [](const int fd)->std::shared_ptr<void> {
+        LOGS("Method parse, peerFD: " << fd);
         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
         config::loadFromFD<ReceivedDataType>(fd, *data);
         return data;
     };
 
     call.process = [process](Status status, std::shared_ptr<void>& data)->void {
+        LOGS("Method process, status: " << toString(status));
         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
         return process(status, tmpData);
     };
 
-    mCalls.push(std::move(call));
+    mCalls.push_back(std::move(call));
 
     return messageID;
 }
@@ -136,10 +148,11 @@ MessageID CallQueue::push(const MethodID methodID,
     call.messageID = messageID;
 
     call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+        LOGS("Signal serialize, peerFD: " << fd);
         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
     };
 
-    mCalls.push(std::move(call));
+    mCalls.push_back(std::move(call));
 
     return messageID;
 }
index 6084718..d1e829e 100644 (file)
@@ -59,25 +59,22 @@ Processor::Processor(const PeerCallback& newPeerCallback,
       mRemovedPeerCallback(removedPeerCallback),
       mMaxNumberOfPeers(maxNumberOfPeers)
 {
-    LOGT("Creating Processor");
+    LOGS("Processor Constructor");
 
     utils::signalBlock(SIGPIPE);
     using namespace std::placeholders;
     addMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
                                                                 std::bind(&Processor::onNewSignals, this, _1, _2));
-
 }
 
 Processor::~Processor()
 {
-    LOGT("Destroying Processor");
+    LOGS("Processor Destructor");
     try {
         stop();
     } catch (IPCException& e) {
         LOGE("Error in Processor's destructor: " << e.what());
     }
-
-    LOGT("Destroyed Processor");
 }
 
 bool Processor::isStarted()
@@ -87,60 +84,60 @@ bool Processor::isStarted()
 
 void Processor::start()
 {
-    LOGT("Starting Processor");
+    LOGS("Processor start");
+
     if (!isStarted()) {
         mThread = std::thread(&Processor::run, this);
     }
-    LOGT("Started Processor");
 }
 
 void Processor::stop()
 {
-    LOGT("Stopping Processor");
+    LOGS("Processor stop");
 
     if (isStarted()) {
-        mEventQueue.send(Event::FINISH);
+        {
+            Lock lock(mStateMutex);
+            mEventQueue.send(Event::FINISH);
+        }
+        LOGT("Waiting for the Processor to stop");
         mThread.join();
     }
-
-    LOGT("Stopped Processor");
 }
 
 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
-    Lock lock(mCallbacksMutex);
+    Lock lock(mStateMutex);
     mNewPeerCallback = newPeerCallback;
 }
 
 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 {
-    Lock lock(mCallbacksMutex);
+    Lock lock(mStateMutex);
     mRemovedPeerCallback = removedPeerCallback;
 }
 
 FileDescriptor Processor::getEventFD()
 {
+    Lock lock(mStateMutex);
     return mEventQueue.getFD();
 }
 
 void Processor::removeMethod(const MethodID methodID)
 {
-    LOGT("Removing method " << methodID);
-    Lock lock(mCallsMutex);
+    Lock lock(mStateMutex);
     mMethodsCallbacks.erase(methodID);
 }
 
 FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
 {
-    LOGT("Adding socket");
-    FileDescriptor peerFD;
-    {
-        Lock lock(mSocketsMutex);
-        peerFD = socketPtr->getFD();
-        SocketInfo socketInfo(peerFD, std::move(socketPtr));
-        mNewSockets.push(std::move(socketInfo));
-        mEventQueue.send(Event::ADD_PEER);
-    }
+    LOGS("Processor addPeer");
+    Lock lock(mStateMutex);
+    FileDescriptor peerFD = socketPtr->getFD();
+    SocketInfo socketInfo(peerFD, std::move(socketPtr));
+    mNewSockets.push(std::move(socketInfo));
+    mEventQueue.send(Event::ADD_PEER);
+
     LOGI("New peer added. Id: " << peerFD);
 
     return peerFD;
@@ -148,18 +145,22 @@ FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
 
 void Processor::removePeer(const FileDescriptor peerFD)
 {
-    std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
+    LOGS("Processor removePeer peerFD: " << peerFD);
+
+    // TODO: Remove ADD_PEER event if it's not processed
+
 
+    // Remove peer and wait till he's gone
+    std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
     {
-        Lock lock(mSocketsMutex);
+        Lock lock(mStateMutex);
         RemovePeerRequest request(peerFD, conditionPtr);
         mPeersToDelete.push(std::move(request));
         mEventQueue.send(Event::REMOVE_PEER);
     }
 
-
     auto isPeerDeleted = [&peerFD, this]()->bool {
-        Lock lock(mSocketsMutex);
+        Lock lock(mStateMutex);
         return mSockets.count(peerFD) == 0;
     };
 
@@ -170,49 +171,41 @@ void Processor::removePeer(const FileDescriptor peerFD)
 
 void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
 {
-    LOGW("Removing peer. ID: " << peerFD);
-    {
-        Lock lock(mSocketsMutex);
-        if (!mSockets.erase(peerFD)) {
-            LOGW("No such peer. Another thread called removePeerInternal");
-            return;
-        }
+    LOGS("Processor removePeerInternal peerFD: " << peerFD);
+    LOGI("Removing peer. peerFD: " << peerFD);
 
-        // Remove from signal addressees
-        for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
-            it->second.remove(peerFD);
-            if (it->second.empty()) {
-                it = mSignalsPeers.erase(it);
-            } else {
-                ++it;
-            }
-        }
+    if (!mSockets.erase(peerFD)) {
+        LOGW("No such peer. Another thread called removePeerInternal");
+        return;
     }
 
-    {
-        // Erase associated return value callbacks
-        Lock lock(mReturnCallbacksMutex);
-
-        std::shared_ptr<void> data;
-        for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
-            if (it->second.peerFD == peerFD) {
-                IGNORE_EXCEPTIONS(it->second.process(status, data));
-                it = mReturnCallbacks.erase(it);
-            } else {
-                ++it;
-            }
+    // Remove from signal addressees
+    for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
+        it->second.remove(peerFD);
+        if (it->second.empty()) {
+            it = mSignalsPeers.erase(it);
+        } else {
+            ++it;
         }
     }
 
-
-    {
-        Lock lock(mCallbacksMutex);
-        if (mRemovedPeerCallback) {
-            // Notify about the deletion
-            mRemovedPeerCallback(peerFD);
+    // Erase associated return value callbacks
+    std::shared_ptr<void> data;
+    for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
+        if (it->second.peerFD == peerFD) {
+            IGNORE_EXCEPTIONS(it->second.process(status, data));
+            it = mReturnCallbacks.erase(it);
+        } else {
+            ++it;
         }
     }
 
+    if (mRemovedPeerCallback) {
+        // Notify about the deletion
+        mRemovedPeerCallback(peerFD);
+    }
+
+
     resetPolling();
 }
 
@@ -222,25 +215,29 @@ void Processor::resetPolling()
         return;
     }
 
-    LOGI("Resetting polling");
-    // Setup polling on eventfd and sockets
-    Lock lock(mSocketsMutex);
-    mFDs.resize(mSockets.size() + 1);
+    {
+        Lock lock(mStateMutex);
 
-    mFDs[0].fd = mEventQueue.getFD();
-    mFDs[0].events = POLLIN;
+        // Setup polling on eventfd and sockets
+        mFDs.resize(mSockets.size() + 1);
 
-    auto socketIt = mSockets.begin();
-    for (unsigned int i = 1; i < mFDs.size(); ++i) {
-        mFDs[i].fd = socketIt->second->getFD();
-        mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
-        ++socketIt;
-        // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
+        mFDs[0].fd = mEventQueue.getFD();
+        mFDs[0].events = POLLIN;
+
+        auto socketIt = mSockets.begin();
+        for (unsigned int i = 1; i < mFDs.size(); ++i) {
+            mFDs[i].fd = socketIt->second->getFD();
+            mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
+            ++socketIt;
+            // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
+        }
     }
 }
 
 void Processor::run()
 {
+    LOGS("Processor run");
+
     resetPolling();
 
     mIsRunning = true;
@@ -282,60 +279,53 @@ void Processor::run()
 
 bool Processor::handleLostConnections()
 {
-    std::vector<FileDescriptor> peersToRemove;
+    Lock lock(mStateMutex);
+
+    bool isPeerRemoved = false;
     {
-        Lock lock(mSocketsMutex);
         for (unsigned int i = 1; i < mFDs.size(); ++i) {
             if (mFDs[i].revents & POLLHUP) {
                 LOGI("Lost connection to peer: " << mFDs[i].fd);
                 mFDs[i].revents &= ~(POLLHUP);
-                peersToRemove.push_back(mFDs[i].fd);
+                removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
+                isPeerRemoved = true;
             }
         }
     }
 
-    for (const FileDescriptor peerFD : peersToRemove) {
-        removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
-    }
-
-    return !peersToRemove.empty();
+    return isPeerRemoved;
 }
 
 bool Processor::handleLostConnection(const FileDescriptor peerFD)
 {
+    Lock lock(mStateMutex);
     removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
     return true;
 }
 
 bool Processor::handleInputs()
 {
-    std::vector<FileDescriptor> peersWithInput;
-    {
-        Lock lock(mSocketsMutex);
-        for (unsigned int i = 1; i < mFDs.size(); ++i) {
-            if (mFDs[i].revents & POLLIN) {
-                mFDs[i].revents &= ~(POLLIN);
-                peersWithInput.push_back(mFDs[i].fd);
-            }
-        }
-    }
+    Lock lock(mStateMutex);
 
     bool pollChanged = false;
-    // Handle input outside the critical section
-    for (const FileDescriptor peerFD : peersWithInput) {
-        pollChanged = pollChanged || handleInput(peerFD);
+    for (unsigned int i = 1; i < mFDs.size(); ++i) {
+        if (mFDs[i].revents & POLLIN) {
+            mFDs[i].revents &= ~(POLLIN);
+            pollChanged = pollChanged || handleInput(mFDs[i].fd);
+        }
     }
+
     return pollChanged;
 }
 
 bool Processor::handleInput(const FileDescriptor peerFD)
 {
-    LOGT("Handle incoming data");
+    LOGS("Processor handleInput peerFD: " << peerFD);
+    Lock lock(mStateMutex);
 
     std::shared_ptr<Socket> socketPtr;
     try {
         // Get the peer's socket
-        Lock lock(mSocketsMutex);
         socketPtr = mSockets.at(peerFD);
     } catch (const std::out_of_range&) {
         LOGE("No such peer: " << peerFD);
@@ -360,22 +350,18 @@ bool Processor::handleInput(const FileDescriptor peerFD)
             return onReturnValue(*socketPtr, messageID);
 
         } else {
-            Lock lock(mCallsMutex);
             if (mMethodsCallbacks.count(methodID)) {
                 // Method
                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
-                lock.unlock();
                 return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
 
             } else if (mSignalsCallbacks.count(methodID)) {
                 // Signal
                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
-                lock.unlock();
                 return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
 
             } else {
                 // Nothing
-                lock.unlock();
                 LOGW("No method or signal callback for methodID: " << methodID);
                 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
                 return true;
@@ -387,9 +373,9 @@ bool Processor::handleInput(const FileDescriptor peerFD)
 std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
                                                               std::shared_ptr<RegisterSignalsMessage>& data)
 {
-    LOGD("New signals for peer: " << peerFD);
-    Lock lock(mSocketsMutex);
-    for (MethodID methodID : data->ids) {
+    LOGS("Processor onNewSignals peerFD: " << peerFD);
+
+    for (const MethodID methodID : data->ids) {
         mSignalsPeers[methodID].push_back(peerFD);
     }
 
@@ -399,10 +385,11 @@ std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescript
 bool Processor::onReturnValue(const Socket& socket,
                               const MessageID messageID)
 {
-    LOGI("Return value for messageID: " << messageID);
+    LOGS("Processor onReturnValue messageID: " << messageID);
+
+    // LOGI("Return value for messageID: " << messageID);
     ReturnCallbacks returnCallbacks;
     try {
-        Lock lock(mReturnCallbacksMutex);
         LOGT("Getting the return callback");
         returnCallbacks = std::move(mReturnCallbacks.at(messageID));
         mReturnCallbacks.erase(messageID);
@@ -423,9 +410,10 @@ bool Processor::onReturnValue(const Socket& socket,
         return true;
     }
 
-    LOGT("Process return value callback for messageID: " << messageID);
+    // LOGT("Process return value callback for messageID: " << messageID);
     IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
 
+    // LOGT("Return value for messageID: " << messageID << " processed");
     return false;
 }
 
@@ -434,7 +422,9 @@ bool Processor::onRemoteSignal(const Socket& socket,
                                const MessageID messageID,
                                std::shared_ptr<SignalHandlers> signalCallbacks)
 {
-    LOGI("Remote signal; methodID: " << methodID << " messageID: " << messageID);
+    LOGS("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
+
+    // LOGI("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
 
     std::shared_ptr<void> data;
     try {
@@ -446,7 +436,7 @@ bool Processor::onRemoteSignal(const Socket& socket,
         return true;
     }
 
-    LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
+    // LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
     try {
         signalCallbacks->signal(socket.getFD(), data);
     } catch (const std::exception& e) {
@@ -463,7 +453,8 @@ bool Processor::onRemoteCall(const Socket& socket,
                              const MessageID messageID,
                              std::shared_ptr<MethodHandlers> methodCallbacks)
 {
-    LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
+    LOGS("Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
+    // LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
 
     std::shared_ptr<void> data;
     try {
@@ -503,13 +494,16 @@ bool Processor::onRemoteCall(const Socket& socket,
 
 bool Processor::handleEvent()
 {
+    LOGS("Processor handleEvent");
+
+    Lock lock(mStateMutex);
+
     switch (mEventQueue.receive()) {
+
     case Event::FINISH: {
         LOGD("Event FINISH");
-
         mIsRunning = false;
         cleanCommunication();
-
         return false;
     }
 
@@ -534,91 +528,69 @@ bool Processor::handleEvent()
 
 bool Processor::onNewPeer()
 {
-    SocketInfo socketInfo;
-    {
-        Lock lock(mSocketsMutex);
+    LOGS("Processor onNewPeer");
 
-        socketInfo = std::move(mNewSockets.front());
-        mNewSockets.pop();
-
-        if (mSockets.size() > mMaxNumberOfPeers) {
-            LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
-            return false;
-        }
-        if (mSockets.count(socketInfo.peerFD) != 0) {
-            LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
-            return false;
-        }
+    // TODO: What if there is no newSocket? (request removed in the mean time)
+    // Add new socket of the peer
+    SocketInfo socketInfo = std::move(mNewSockets.front());
+    mNewSockets.pop();
 
-        mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
+    if (mSockets.size() > mMaxNumberOfPeers) {
+        LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
+        return false;
+    }
+    if (mSockets.count(socketInfo.peerFD) != 0) {
+        LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
+        return false;
     }
 
+    mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
 
-    // Broadcast the new signal to peers
-    LOGW("Sending handled signals");
-    std::list<FileDescriptor> peersFDs;
-    {
-        Lock lock(mSocketsMutex);
-        for (const auto kv : mSockets) {
-            peersFDs.push_back(kv.first);
-        }
-    }
 
+    // LOGW("Sending handled signals");
     std::vector<MethodID> ids;
-    {
-        Lock lock(mSocketsMutex);
-        for (const auto kv : mSignalsCallbacks) {
-            ids.push_back(kv.first);
-        }
+    for (const auto kv : mSignalsCallbacks) {
+        ids.push_back(kv.first);
     }
     auto data = std::make_shared<RegisterSignalsMessage>(ids);
-
-    for (const FileDescriptor peerFD : peersFDs) {
-        callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
-                                                        peerFD,
-                                                        data,
-                                                        discardResultHandler<EmptyData>);
-    }
-    LOGW("Sent handled signals");
-
+    callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
+                                                 socketInfo.peerFD,
+                                                 data,
+                                                 discardResultHandler<EmptyData>);
+    // LOGW("Sent handled signals");
 
     resetPolling();
 
-    {
-        Lock lock(mCallbacksMutex);
-        if (mNewPeerCallback) {
-            // Notify about the new user.
-            LOGT("Calling NewPeerCallback");
-            mNewPeerCallback(socketInfo.peerFD);
-        }
+    if (mNewPeerCallback) {
+        // Notify about the new user.
+        LOGT("Calling NewPeerCallback");
+        mNewPeerCallback(socketInfo.peerFD);
     }
+
     return true;
 }
 
 bool Processor::onRemovePeer()
 {
-    RemovePeerRequest request;
-    {
-        Lock lock(mSocketsMutex);
-        request = std::move(mPeersToDelete.front());
-        mPeersToDelete.pop();
-    }
+    LOGS("Processor onRemovePeer");
 
-    removePeerInternal(request.peerFD, Status::REMOVED_PEER);
-    request.conditionPtr->notify_all();
-    return true;
-}
+    removePeerInternal(mPeersToDelete.front().peerFD, Status::REMOVED_PEER);
 
-CallQueue::Call Processor::getCall()
-{
-    Lock lock(mCallsMutex);
-    return mCalls.pop();
+    mPeersToDelete.front().conditionPtr->notify_all();
+    mPeersToDelete.pop();
+    return true;
 }
 
 bool Processor::onCall()
 {
-    LOGT("Handle call (from another thread) to send a message.");
-    CallQueue::Call call = getCall();
+    LOGS("Processor onCall");
+    CallQueue::Call call;
+    try {
+        call = std::move(mCalls.pop());
+    } catch (const IPCException&) {
+        LOGE("No calls to serve, but got an EVENT::CALL. Event got removed before serving");
+        return false;
+    }
 
     if (call.parse && call.process) {
         return onMethodCall(call);
@@ -629,10 +601,11 @@ bool Processor::onCall()
 
 bool Processor::onSignalCall(CallQueue::Call& call)
 {
+    LOGS("Processor onSignalCall");
+
     std::shared_ptr<Socket> socketPtr;
     try {
         // Get the peer's socket
-        Lock lock(mSocketsMutex);
         socketPtr = mSockets.at(call.peerFD);
     } catch (const std::out_of_range&) {
         LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
@@ -653,15 +626,16 @@ bool Processor::onSignalCall(CallQueue::Call& call)
     }
 
     return false;
-
 }
 
 bool Processor::onMethodCall(CallQueue::Call& call)
 {
+    LOGS("Processor onMethodCall");
     std::shared_ptr<Socket> socketPtr;
+
+
     try {
         // Get the peer's socket
-        Lock lock(mSocketsMutex);
         socketPtr = mSockets.at(call.peerFD);
     } catch (const std::out_of_range&) {
         LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
@@ -672,22 +646,19 @@ bool Processor::onMethodCall(CallQueue::Call& call)
         return false;
     }
 
-    {
-        // Set what to do with the return message, but only if needed
-        Lock lock(mReturnCallbacksMutex);
-        if (mReturnCallbacks.count(call.messageID) != 0) {
-            LOGE("There already was a return callback for messageID: " << call.messageID);
-        }
-        mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
-                                                                     std::move(call.parse),
-                                                                     std::move(call.process)));
+    if (mReturnCallbacks.count(call.messageID) != 0) {
+        LOGE("There already was a return callback for messageID: " << call.messageID);
     }
+    mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
+                                                                 std::move(call.parse),
+                                                                 std::move(call.process)));
 
     try {
         // Send the call with the socket
         Socket::Guard guard = socketPtr->getGuard();
         socketPtr->write(&call.methodID, sizeof(call.methodID));
         socketPtr->write(&call.messageID, sizeof(call.messageID));
+        LOGT("Serializing the message");
         call.serialize(socketPtr->getFD(), call.data);
     } catch (const std::exception& e) {
         LOGE("Error during sending a method: " << e.what());
@@ -695,13 +666,12 @@ bool Processor::onMethodCall(CallQueue::Call& call)
         // Inform about the error,
         IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
 
-        {
-            Lock lock(mReturnCallbacksMutex);
-            mReturnCallbacks.erase(call.messageID);
-        }
 
+        mReturnCallbacks.erase(call.messageID);
         removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
+
         return true;
+
     }
 
     return false;
@@ -709,35 +679,36 @@ bool Processor::onMethodCall(CallQueue::Call& call)
 
 void Processor::cleanCommunication()
 {
+    LOGS("Processor cleanCommunication");
+
     while (!mEventQueue.isEmpty()) {
         switch (mEventQueue.receive()) {
         case Event::FINISH: {
-            LOGD("Event FINISH after FINISH");
+            LOGE("Event FINISH after FINISH");
             break;
         }
         case Event::CALL: {
-            LOGD("Event CALL after FINISH");
-            CallQueue::Call call = getCall();
-            if (call.process) {
-                IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
+            LOGW("Event CALL after FINISH");
+            try {
+                CallQueue::Call call = mCalls.pop();
+                if (call.process) {
+                    IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
+                }
+            } catch (const IPCException&) {
+                // No more calls
             }
             break;
         }
 
         case Event::ADD_PEER: {
-            LOGD("Event ADD_PEER after FINISH");
+            LOGW("Event ADD_PEER after FINISH");
             break;
         }
 
         case Event::REMOVE_PEER: {
-            LOGD("Event REMOVE_PEER after FINISH");
-            RemovePeerRequest request;
-            {
-                Lock lock(mSocketsMutex);
-                request = std::move(mPeersToDelete.front());
-                mPeersToDelete.pop();
-            }
-            request.conditionPtr->notify_all();
+            LOGW("Event REMOVE_PEER after FINISH");
+            mPeersToDelete.front().conditionPtr->notify_all();
+            mPeersToDelete.pop();
             break;
         }
         }
index 0e8fe8f..b0f7ea0 100644 (file)
@@ -33,6 +33,7 @@
 #include "config/manager.hpp"
 #include "config/fields.hpp"
 #include "logger/logger.hpp"
+#include "logger/logger-scope.hpp"
 
 #include <poll.h>
 #include <condition_variable>
@@ -75,13 +76,13 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 *  - new way to generate UIDs
 *  - callbacks for serialization/parsing
 *  - store Sockets in a vector, maybe SocketStore?
-*  - fix valgrind tests
 *  - poll loop outside.
 *  - waiting till the EventQueue is empty before leaving stop()
 *  - no new events added after stop() called
 *  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
 *    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
-*
+*  - EventQueue should store std::shared_ptr<void> and it should be the only queue to the Processor thread.
+*    It should have an API for removing events from the middle of the queue
 *
 */
 class Processor {
@@ -279,7 +280,7 @@ public:
 private:
     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
-    typedef std::unique_lock<std::mutex> Lock;
+    typedef std::unique_lock<std::recursive_mutex> Lock;
 
     struct EmptyData {
         CONFIG_REGISTER_EMPTY
@@ -376,27 +377,22 @@ private:
 
     bool mIsRunning;
 
-    // Mutex for the Calls queue and the map of methods.
-    std::mutex mCallsMutex;
+
     CallQueue mCalls;
     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
     std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
 
-    // Mutex for changing mSockets map.
-    // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
-    std::mutex mSocketsMutex;
     std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
     std::vector<struct pollfd> mFDs;
     std::queue<SocketInfo> mNewSockets;
     std::queue<RemovePeerRequest> mPeersToDelete;
 
-    // Mutex for modifying the map with return callbacks
-    std::mutex mReturnCallbacksMutex;
     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
 
-    // Mutex for setting callbacks
-    std::mutex mCallbacksMutex;
+    // Mutex for modifying any internal data
+    std::recursive_mutex mStateMutex;
+
     PeerCallback mNewPeerCallback;
     PeerCallback mRemovedPeerCallback;
 
@@ -408,12 +404,6 @@ private:
     void addMethodHandlerInternal(const MethodID methodID,
                                   const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
 
-    template<typename SentDataType, typename ReceivedDataType>
-    MessageID callInternal(const MethodID methodID,
-                           const FileDescriptor peerFD,
-                           const std::shared_ptr<SentDataType>& data,
-                           const typename ResultHandler<ReceivedDataType>::type& process);
-
     template<typename ReceivedDataType>
     static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
 
@@ -438,7 +428,6 @@ private:
                         std::shared_ptr<SignalHandlers> signalCallbacks);
     void resetPolling();
     FileDescriptor getNextFileDescriptor();
-    CallQueue::Call getCall();
     void removePeerInternal(const FileDescriptor peerFD, Status status);
 
     std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
@@ -470,7 +459,7 @@ void Processor::addMethodHandlerInternal(const MethodID methodID,
     };
 
     {
-        Lock lock(mCallsMutex);
+        Lock lock(mStateMutex);
         mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
     }
 }
@@ -485,14 +474,16 @@ void Processor::addMethodHandler(const MethodID methodID,
     }
 
     {
-        Lock lock(mCallsMutex);
+        Lock lock(mStateMutex);
+
         if (mSignalsCallbacks.count(methodID)) {
             LOGE("MethodID used by a signal: " << methodID);
             throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
         }
+
+        addMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
     }
 
-    addMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
 }
 
 template<typename ReceivedDataType>
@@ -504,64 +495,49 @@ void Processor::addSignalHandler(const MethodID methodID,
         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
     }
 
+    std::shared_ptr<RegisterSignalsMessage> data;
+    std::vector<FileDescriptor> peerFDs;
     {
-        Lock lock(mCallsMutex);
+        Lock lock(mStateMutex);
+
+        // Andd the signal handler:
         if (mMethodsCallbacks.count(methodID)) {
             LOGE("MethodID used by a method: " << methodID);
             throw IPCException("MethodID used by a method: " + std::to_string(methodID));
         }
-    }
 
-    SignalHandlers signalCall;
+        SignalHandlers signalCall;
 
-    signalCall.parse = [](const int fd)->std::shared_ptr<void> {
-        std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
-        config::loadFromFD<ReceivedDataType>(fd, *data);
-        return data;
-    };
+        signalCall.parse = [](const int fd)->std::shared_ptr<void> {
+            std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
+            config::loadFromFD<ReceivedDataType>(fd, *data);
+            return data;
+        };
 
-    signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
-        std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-        handler(peerFD, tmpData);
-    };
+        signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
+            std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
+            handler(peerFD, tmpData);
+        };
 
-    {
-        Lock lock(mCallsMutex);
         mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
-    }
 
-    std::vector<MethodID> ids {methodID};
-    auto data = std::make_shared<RegisterSignalsMessage>(ids);
+        // Broadcast the new signal:
+        std::vector<MethodID> ids {methodID};
+        data = std::make_shared<RegisterSignalsMessage>(ids);
 
-    std::list<FileDescriptor> peersFDs;
-    {
-        Lock lock(mSocketsMutex);
         for (const auto kv : mSockets) {
-            peersFDs.push_back(kv.first);
+            peerFDs.push_back(kv.first);
         }
     }
 
-    for (const FileDescriptor peerFD : peersFDs) {
+    for (const auto peerFD : peerFDs) {
         callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
                                                     peerFD,
                                                     data,
                                                     DEFAULT_METHOD_TIMEOUT);
     }
-
 }
 
-template<typename SentDataType, typename ReceivedDataType>
-MessageID Processor::callInternal(const MethodID methodID,
-                                  const FileDescriptor peerFD,
-                                  const std::shared_ptr<SentDataType>& data,
-                                  const typename ResultHandler<ReceivedDataType>::type& process)
-{
-    Lock lock(mCallsMutex);
-    MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
-    mEventQueue.send(Event::CALL);
-
-    return messageID;
-}
 
 template<typename SentDataType, typename ReceivedDataType>
 MessageID Processor::callAsync(const MethodID methodID,
@@ -569,7 +545,11 @@ MessageID Processor::callAsync(const MethodID methodID,
                                const std::shared_ptr<SentDataType>& data,
                                const typename ResultHandler<ReceivedDataType>::type& process)
 {
-    return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+    Lock lock(mStateMutex);
+    MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+    mEventQueue.send(Event::CALL);
+
+    return messageID;
 }
 
 
@@ -586,7 +566,6 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
     Status returnStatus = ipc::Status::UNDEFINED;
 
     auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
-        std::unique_lock<std::mutex> lock(mutex);
         returnStatus = status;
         result = returnedData;
         cv.notify_all();
@@ -603,30 +582,25 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
 
     std::unique_lock<std::mutex> lock(mutex);
     LOGT("Waiting for the response...");
-    // TODO: There is a race here. mReturnCallbacks were used to indicate if the return call was served or not,
-    //       but if the timeout occurs before the call is even sent, then this method is broken.
     if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
-        // Timeout occurred:
-        // - call isn't sent   => delete it
-        // - call is    sent and no reply => throw IPCTimeoutError
-        // - call is being serviced => wait for it with the same timeout
-        LOGT("Probably a timeout in callSync. Checking...");
-
-        bool isTimeout = false;
+        LOGW("Probably a timeout in callSync. Checking...");
+        bool isTimeout;
         {
-            Lock lock(mReturnCallbacksMutex);
-            if (1 == mReturnCallbacks.erase(messageID)) {
-                // Return callback was present, so there was a timeout
-                isTimeout = true;
-            }
+            Lock lock(mStateMutex);
+            // Call isn't sent or call is sent but there is no reply
+            isTimeout = mCalls.erase(messageID) || 1 == mReturnCallbacks.erase(messageID);
         }
+
         if (isTimeout) {
-            removePeer(peerFD);
             LOGE("Function call timeout; methodID: " << methodID);
+            removePeer(peerFD);
             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
         } else {
-            //Timeout started during the return value processing, so wait for it to finish
-            cv.wait(lock, isResultInitialized);
+            LOGW("Timeout started during the return value processing, so wait for it to finish");
+            if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
+                LOGE("Function call timeout; methodID: " << methodID);
+                throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
+            }
         }
     }
 
@@ -639,14 +613,13 @@ template<typename SentDataType>
 void Processor::signal(const MethodID methodID,
                        const std::shared_ptr<SentDataType>& data)
 {
-    std::list<FileDescriptor> peersFDs;
-    {
-        Lock lock(mSocketsMutex);
-        peersFDs = mSignalsPeers[methodID];
+    Lock lock(mStateMutex);
+    const auto it = mSignalsPeers.find(methodID);
+    if (it == mSignalsPeers.end()) {
+        LOGW("No peer is handling signal with methodID: " << methodID);
+        return;
     }
-
-    for (const FileDescriptor peerFD : peersFDs) {
-        Lock lock(mCallsMutex);
+    for (const FileDescriptor peerFD : it->second) {
         mCalls.push<SentDataType>(methodID, peerFD, data);
         mEventQueue.send(Event::CALL);
     }
index 4c098d9..5a4e137 100644 (file)
@@ -48,7 +48,8 @@ IPCGSource::IPCGSource(const std::vector<FileDescriptor> fds,
                        const HandlerCallback& handlerCallback)
     : mHandlerCallback(handlerCallback)
 {
-    LOGD("Constructing IPCGSource");
+    LOGS("IPCGSource constructor");
+
     for (const FileDescriptor fd : fds) {
         addFD(fd);
     }
@@ -56,13 +57,13 @@ IPCGSource::IPCGSource(const std::vector<FileDescriptor> fds,
 
 IPCGSource::~IPCGSource()
 {
-    LOGD("Destroying IPCGSource");
+    LOGS("~IPCGSource");
 }
 
 IPCGSource::Pointer IPCGSource::create(const std::vector<FileDescriptor>& fds,
                                        const HandlerCallback& handlerCallback)
 {
-    LOGD("Creating IPCGSource");
+    LOGS("Creating IPCGSource");
 
     static GSourceFuncs funcs = { &IPCGSource::prepare,
                                   &IPCGSource::check,
@@ -94,12 +95,13 @@ IPCGSource::Pointer IPCGSource::create(const std::vector<FileDescriptor>& fds,
 
 void IPCGSource::addFD(const FileDescriptor fd)
 {
+
     if (!&mGSource) {
         // In case it's called as a callback but the IPCGSource is destroyed
         return;
     }
+    LOGS("Adding fd to glib");
 
-    LOGD("Adding fd to glib");
     gpointer tag = g_source_add_unix_fd(&mGSource,
                                         fd,
                                         conditions);
@@ -114,7 +116,7 @@ void IPCGSource::removeFD(const FileDescriptor fd)
         return;
     }
 
-    LOGD("Removing fd from glib");
+    LOGS("Removing fd from glib");
     auto it = std::find(mFDInfos.begin(), mFDInfos.end(), fd);
     if (it == mFDInfos.end()) {
         LOGE("No such fd");
@@ -126,7 +128,7 @@ void IPCGSource::removeFD(const FileDescriptor fd)
 
 guint IPCGSource::attach(GMainContext* context)
 {
-    LOGD("Attaching to GMainContext");
+    LOGS("Attaching to GMainContext");
     guint ret = g_source_attach(&mGSource, context);
     g_source_unref(&mGSource);
     return ret;
@@ -177,6 +179,8 @@ gboolean IPCGSource::dispatch(GSource* gSource,
 
 void  IPCGSource::finalize(GSource* gSource)
 {
+    LOGS("IPCGSource Finalize");
+
     if (gSource) {
         IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
         source->~IPCGSource();
index 5e720d6..ef46346 100644 (file)
@@ -40,30 +40,27 @@ Service::Service(const std::string& socketPath,
       mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
 
 {
-    LOGD("Creating server");
+    LOGS("Service Constructor");
 }
 
 Service::~Service()
 {
-    LOGD("Destroying server...");
+    LOGS("Service Destructor");
     try {
         stop();
     } catch (IPCException& e) {
         LOGE("Error in Service's destructor: " << e.what());
     }
-    LOGD("Destroyed");
 }
 
 void Service::start()
 {
-    LOGD("Starting server");
+    LOGS("Service start");
     mProcessor.start();
 
     // There can be an incoming connection from mAcceptor before mProcessor is listening,
     // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor.
     mAcceptor.start();
-
-    LOGD("Started server");
 }
 
 bool Service::isStarted()
@@ -73,10 +70,9 @@ bool Service::isStarted()
 
 void Service::stop()
 {
-    LOGD("Stopping server..");
+    LOGS("Service stop");
     mAcceptor.stop();
     mProcessor.stop();
-    LOGD("Stopped");
 }
 
 std::vector<FileDescriptor> Service::getFDs()
@@ -116,19 +112,20 @@ void Service::handle(const FileDescriptor fd, const short pollEvent)
 
 void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
+    LOGS("Service setNewPeerCallback");
     mProcessor.setNewPeerCallback(newPeerCallback);
 }
 
 void Service::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 {
+    LOGS("Service setRemovedPeerCallback");
     mProcessor.setRemovedPeerCallback(removedPeerCallback);
 }
 
 void Service::removeMethod(const MethodID methodID)
 {
-    LOGD("Removing method " << methodID);
+    LOGS("Service removeMethod methodID: " << methodID);
     mProcessor.removeMethod(methodID);
-    LOGD("Removed " << methodID);
 }
 
 
index ed83606..fa12e30 100644 (file)
@@ -188,18 +188,16 @@ template<typename SentDataType, typename ReceivedDataType>
 void Service::addMethodHandler(const MethodID methodID,
                                const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
 {
-    LOGD("Adding method with id " << methodID);
+    LOGS("Service addMethodHandler, methodID " << methodID);
     mProcessor.addMethodHandler<SentDataType, ReceivedDataType>(methodID, method);
-    LOGD("Added method with id " << methodID);
 }
 
 template<typename ReceivedDataType>
 void Service::addSignalHandler(const MethodID methodID,
                                const typename SignalHandler<ReceivedDataType>::type& handler)
 {
-    LOGD("Adding signal with id " << methodID);
+    LOGS("Service addSignalHandler, methodID " << methodID);
     mProcessor.addSignalHandler<ReceivedDataType>(methodID, handler);
-    LOGD("Added signal with id " << methodID);
 }
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -208,7 +206,9 @@ std::shared_ptr<ReceivedDataType> Service::callSync(const MethodID methodID,
                                                     const std::shared_ptr<SentDataType>& data,
                                                     unsigned int timeoutMS)
 {
-    LOGD("Sync calling method: " << methodID << " for user: " << peerFD);
+    LOGS("Service callSync, methodID: " << methodID
+         << ", peerFD: " << peerFD
+         << ", timeoutMS: " << timeoutMS);
     return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerFD, data, timeoutMS);
 }
 
@@ -218,22 +218,20 @@ void Service::callAsync(const MethodID methodID,
                         const std::shared_ptr<SentDataType>& data,
                         const typename ResultHandler<ReceivedDataType>::type& resultCallback)
 {
-    LOGD("Async calling method: " << methodID << " for user: " << peerFD);
+    LOGS("Service callAsync, methodID: " << methodID << ", peerFD: " << peerFD);
     mProcessor.callAsync<SentDataType,
                          ReceivedDataType>(methodID,
                                            peerFD,
                                            data,
                                            resultCallback);
-    LOGD("Async called method: " << methodID << "for user: " << peerFD);
 }
 
 template<typename SentDataType>
 void Service::signal(const MethodID methodID,
                      const std::shared_ptr<SentDataType>& data)
 {
-    LOGD("Signaling: " << methodID);
+    LOGS("Service signal, methodID: " << methodID);
     mProcessor.signal<SentDataType>(methodID, data);
-    LOGD("Signaled: " << methodID);
 }
 
 } // namespace ipc
index 74c18f3..7c9df6c 100644 (file)
@@ -63,7 +63,7 @@ const int TIMEOUT = 1000 /*ms*/;
 const int SHORT_OPERATION_TIME = TIMEOUT / 100;
 
 // Time that will cause "TIMEOUT" methods to throw
-const int LONG_OPERATION_TIME = 3 * TIMEOUT;
+const int LONG_OPERATION_TIME = 500 + TIMEOUT;
 
 struct Fixture {
     std::string socketPath;
@@ -89,7 +89,7 @@ struct SendData {
 };
 
 struct LongSendData {
-    LongSendData(int i = 0, int waitTime = 1000): mSendData(i), mWaitTime(waitTime), intVal(i) {}
+    LongSendData(int i, int waitTime): mSendData(i), mWaitTime(waitTime), intVal(i) {}
 
     template<typename Visitor>
     void accept(Visitor visitor)
@@ -605,6 +605,9 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
     c.addSignalHandler<SendData>(1, handlerA);
     c.addSignalHandler<SendData>(2, handlerB);
 
+    // Wait for the signals to propagate to the Service
+    std::this_thread::sleep_for(std::chrono::milliseconds(2 * TIMEOUT));
+
     auto data = std::make_shared<SendData>(1);
     s.signal<SendData>(2, data);
     s.signal<SendData>(1, data);