IPC: Added unique PeerID to the API to identify peers 38/34938/8
authorJan Olszak <j.olszak@samsung.com>
Wed, 4 Feb 2015 13:26:31 +0000 (14:26 +0100)
committerPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Thu, 5 Feb 2015 16:03:13 +0000 (08:03 -0800)
[Bug/Feature]   PeerCallback takes peer's ID and fd
                All API methods take PeerID
                Replaced unordered map with a vector
[Cause]         N/A
[Solution]      N/A
[Verification]  Build, install, run tests, run tests under valgrind

Change-Id: I352fd3942c4a2e6f7f5c891b1dcc0109d8bf7128

13 files changed:
common/ipc/client.cpp
common/ipc/client.hpp
common/ipc/internals/add-peer-request.hpp
common/ipc/internals/method-request.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/internals/remove-peer-request.hpp
common/ipc/internals/signal-request.hpp
common/ipc/service.cpp
common/ipc/service.hpp
common/ipc/types.cpp
common/ipc/types.hpp
tests/unit_tests/ipc/ut-ipc.cpp

index 1b7ae56..51e59f8 100644 (file)
@@ -61,7 +61,7 @@ void Client::start(const bool usesExternalPolling)
 
     LOGD("Connecting to " + mSocketPath);
     auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
-    mServiceFD = mProcessor.addPeer(socketPtr);
+    mServiceID = mProcessor.addPeer(socketPtr);
 }
 
 bool Client::isStarted()
@@ -123,12 +123,12 @@ void Client::handle(const FileDescriptor fd, const short pollEvent)
 void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     LOGS("Client setNewPeerCallback");
-    auto callback = [newPeerCallback, this](FileDescriptor fd) {
+    auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
         if (mIPCGSourcePtr) {
             mIPCGSourcePtr->addFD(fd);
         }
         if (newPeerCallback) {
-            newPeerCallback(fd);
+            newPeerCallback(peerID, fd);
         }
     };
     mProcessor.setNewPeerCallback(callback);
@@ -137,12 +137,12 @@ void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
 void Client::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 {
     LOGS("Client setRemovedPeerCallback");
-    auto callback = [removedPeerCallback, this](FileDescriptor fd) {
+    auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
         if (mIPCGSourcePtr) {
             mIPCGSourcePtr->removeFD(fd);
         }
         if (removedPeerCallback) {
-            removedPeerCallback(fd);
+            removedPeerCallback(peerID, fd);
         }
     };
     mProcessor.setRemovedPeerCallback(callback);
index 1ee44bb..321b4da 100644 (file)
@@ -174,7 +174,7 @@ private:
     void startPoll();
     void stopPoll();
 
-    FileDescriptor mServiceFD;
+    PeerID mServiceID;
     Processor mProcessor;
     std::string mSocketPath;
     IPCGSource::Pointer mIPCGSourcePtr;
@@ -202,7 +202,7 @@ std::shared_ptr<ReceivedDataType> Client::callSync(const MethodID methodID,
                                                    unsigned int timeoutMS)
 {
     LOGS("Client callSync, methodID: " << methodID << ", timeoutMS: " << timeoutMS);
-    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceFD, data, timeoutMS);
+    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceID, data, timeoutMS);
 }
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -213,7 +213,7 @@ void Client::callAsync(const MethodID methodID,
     LOGS("Client callAsync, methodID: " << methodID);
     mProcessor.callAsync<SentDataType,
                          ReceivedDataType>(methodID,
-                                           mServiceFD,
+                                           mServiceID,
                                            data,
                                            resultCallback);
 }
index 3409ba5..57ec06c 100644 (file)
@@ -36,14 +36,14 @@ public:
     AddPeerRequest(const AddPeerRequest&) = delete;
     AddPeerRequest& operator=(const AddPeerRequest&) = delete;
 
-    AddPeerRequest(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
-        : peerFD(peerFD),
-          socketPtr(socketPtr)
+    AddPeerRequest(const std::shared_ptr<Socket>& socketPtr)
+        : socketPtr(socketPtr),
+          peerID(getNextPeerID())
     {
     }
 
-    FileDescriptor peerFD;
     std::shared_ptr<Socket> socketPtr;
+    PeerID peerID;
 };
 
 } // namespace ipc
index 8ed17c5..8ec1223 100644 (file)
@@ -42,12 +42,12 @@ public:
 
     template<typename SentDataType, typename ReceivedDataType>
     static std::shared_ptr<MethodRequest> create(const MethodID methodID,
-                                                 const FileDescriptor peerFD,
+                                                 const PeerID peerID,
                                                  const std::shared_ptr<SentDataType>& data,
                                                  const typename ResultHandler<ReceivedDataType>::type& process);
 
     MethodID methodID;
-    FileDescriptor peerFD;
+    PeerID peerID;
     MessageID messageID;
     std::shared_ptr<void> data;
     SerializeCallback serialize;
@@ -55,9 +55,9 @@ public:
     ResultBuilderHandler process;
 
 private:
-    MethodRequest(const MethodID methodID, const FileDescriptor peerFD)
+    MethodRequest(const MethodID methodID, const PeerID peerID)
         : methodID(methodID),
-          peerFD(peerFD),
+          peerID(peerID),
           messageID(getNextMessageID())
     {}
 };
@@ -65,21 +65,21 @@ private:
 
 template<typename SentDataType, typename ReceivedDataType>
 std::shared_ptr<MethodRequest> MethodRequest::create(const MethodID methodID,
-                                                     const FileDescriptor peerFD,
+                                                     const PeerID peerID,
                                                      const std::shared_ptr<SentDataType>& data,
                                                      const typename ResultHandler<ReceivedDataType>::type& process)
 {
-    std::shared_ptr<MethodRequest> request(new MethodRequest(methodID, peerFD));
+    std::shared_ptr<MethodRequest> request(new MethodRequest(methodID, peerID));
 
     request->data = data;
 
     request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
-        LOGS("Method serialize, peerFD: " << fd);
+        LOGS("Method serialize, peerID: " << fd);
         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
     };
 
     request->parse = [](const int fd)->std::shared_ptr<void> {
-        LOGS("Method parse, peerFD: " << fd);
+        LOGS("Method parse, peerID: " << fd);
         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
         config::loadFromFD<ReceivedDataType>(fd, *data);
         return data;
index 1f51d5e..d63e490 100644 (file)
@@ -84,6 +84,20 @@ Processor::~Processor()
     }
 }
 
+Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd)
+{
+    return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&fd](const PeerInfo & peerInfo) {
+        return fd == peerInfo.socketPtr->getFD();
+    });
+}
+
+Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID peerID)
+{
+    return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&peerID](const PeerInfo & peerInfo) {
+        return peerID == peerInfo.peerID;
+    });
+}
+
 bool Processor::isStarted()
 {
     Lock lock(mStateMutex);
@@ -155,55 +169,54 @@ void Processor::removeMethod(const MethodID methodID)
     mMethodsCallbacks.erase(methodID);
 }
 
-FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
+PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
 {
     LOGS(mLogPrefix + "Processor addPeer");
     Lock lock(mStateMutex);
 
-    FileDescriptor peerFD = socketPtr->getFD();
-    auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
-    mRequestQueue.pushBack(Event::ADD_PEER, request);
+    auto requestPtr = std::make_shared<AddPeerRequest>(socketPtr);
+    mRequestQueue.pushBack(Event::ADD_PEER, requestPtr);
 
-    LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
+    LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID);
 
-    return peerFD;
+    return requestPtr->peerID;
 }
 
-void Processor::removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock)
+void Processor::removePeerSyncInternal(const PeerID peerID, Lock& lock)
 {
-    LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
+    LOGS(mLogPrefix + "Processor removePeer peerID: " << peerID);
 
-    auto isPeerDeleted = [&peerFD, this]()->bool {
-        return mSockets.count(peerFD) == 0;
+    auto isPeerDeleted = [&peerID, this]()->bool {
+        return getPeerInfoIterator(peerID) == mPeerInfo.end();
     };
 
-    mRequestQueue.removeIf([peerFD](Request & request) {
+    mRequestQueue.removeIf([peerID](Request & request) {
         return request.requestID == Event::ADD_PEER &&
-               request.get<AddPeerRequest>()->peerFD == peerFD;
+               request.get<AddPeerRequest>()->peerID == peerID;
     });
 
     // Remove peer and wait till he's gone
     std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
 
-    auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
+    auto request = std::make_shared<RemovePeerRequest>(peerID, conditionPtr);
     mRequestQueue.pushBack(Event::REMOVE_PEER, request);
 
     conditionPtr->wait(lock, isPeerDeleted);
 }
 
-void Processor::removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr)
+void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr)
 {
-    LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
-    LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
+    LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
+    LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
 
-    if (!mSockets.erase(peerFD)) {
-        LOGW(mLogPrefix + "No such peer. Another thread called removePeerInternal");
+    if (peerIt == mPeerInfo.end()) {
+        LOGW("Peer already removed");
         return;
     }
 
     // Remove from signal addressees
     for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
-        it->second.remove(peerFD);
+        it->second.remove(peerIt->peerID);
         if (it->second.empty()) {
             it = mSignalsPeers.erase(it);
         } else {
@@ -213,7 +226,7 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, const std::excep
 
     // Erase associated return value callbacks
     for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
-        if (it->second.peerFD == peerFD) {
+        if (it->second.peerID == peerIt->peerID) {
             ResultBuilder resultBuilder(exceptionPtr);
             IGNORE_EXCEPTIONS(it->second.process(resultBuilder));
             it = mReturnCallbacks.erase(it);
@@ -224,8 +237,10 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, const std::excep
 
     if (mRemovedPeerCallback) {
         // Notify about the deletion
-        mRemovedPeerCallback(peerFD);
+        mRemovedPeerCallback(peerIt->peerID, peerIt->socketPtr->getFD());
     }
+
+    mPeerInfo.erase(peerIt);
 }
 
 void Processor::resetPolling()
@@ -236,19 +251,20 @@ void Processor::resetPolling()
         return;
     }
 
-    LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
     // Setup polling on eventfd and sockets
-    mFDs.resize(mSockets.size() + 1);
+    mFDs.resize(mPeerInfo.size() + 1);
+    LOGI(mLogPrefix + "Reseting mFDS.size: " << mFDs.size());
 
     mFDs[0].fd = mRequestQueue.getFD();
     mFDs[0].events = POLLIN;
 
-    auto socketIt = mSockets.begin();
     for (unsigned int i = 1; i < mFDs.size(); ++i) {
-        LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
-        mFDs[i].fd = socketIt->second->getFD();
+        auto fd = mPeerInfo[i - 1].socketPtr->getFD();
+
+        LOGI(mLogPrefix + "Reseting fd: " << fd);
+
+        mFDs[i].fd = fd;
         mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
-        ++socketIt;
         // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
     }
 }
@@ -306,25 +322,26 @@ bool Processor::handleLostConnections()
     Lock lock(mStateMutex);
 
     bool isPeerRemoved = false;
-    {
-        for (unsigned int i = 1; i < mFDs.size(); ++i) {
-            if (mFDs[i].revents & POLLHUP) {
-                LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
-                mFDs[i].revents &= ~(POLLHUP);
-                removePeerInternal(mFDs[i].fd,
-                                   std::make_exception_ptr(IPCPeerDisconnectedException()));
-                isPeerRemoved = true;
-            }
+
+    for (unsigned int i = 1; i < mFDs.size(); ++i) {
+        if (mFDs[i].revents & POLLHUP) {
+            auto peerIt = getPeerInfoIterator(mFDs[i].fd);
+            LOGI(mLogPrefix + "Lost connection to peer: " << peerIt->peerID);
+            mFDs[i].revents &= ~(POLLHUP);
+            removePeerInternal(peerIt,
+                               std::make_exception_ptr(IPCPeerDisconnectedException()));
+            isPeerRemoved = true;
         }
     }
 
     return isPeerRemoved;
 }
 
-bool Processor::handleLostConnection(const FileDescriptor peerFD)
+bool Processor::handleLostConnection(const FileDescriptor fd)
 {
     Lock lock(mStateMutex);
-    removePeerInternal(peerFD,
+    auto peerIt = getPeerInfoIterator(fd);
+    removePeerInternal(peerIt,
                        std::make_exception_ptr(IPCPeerDisconnectedException()));
     return true;
 }
@@ -344,54 +361,53 @@ bool Processor::handleInputs()
     return pollChanged;
 }
 
-bool Processor::handleInput(const FileDescriptor peerFD)
+bool Processor::handleInput(const FileDescriptor fd)
 {
-    LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
+    LOGS(mLogPrefix + "Processor handleInput fd: " << fd);
 
     Lock lock(mStateMutex);
 
-    std::shared_ptr<Socket> socketPtr;
-    try {
-        // Get the peer's socket
-        socketPtr = mSockets.at(peerFD);
-    } catch (const std::out_of_range&) {
-        LOGE(mLogPrefix + "No such peer: " << peerFD);
+    auto peerIt = getPeerInfoIterator(fd);
+
+    if (peerIt == mPeerInfo.end()) {
+        LOGE(mLogPrefix + "No peer for fd: " << fd);
         return false;
     }
 
+    Socket& socket = *peerIt->socketPtr;
+
     MethodID methodID;
     MessageID messageID;
     {
-        Socket::Guard guard = socketPtr->getGuard();
+        Socket::Guard guard = socket.getGuard();
         try {
-            socketPtr->read(&methodID, sizeof(methodID));
-            socketPtr->read(&messageID, sizeof(messageID));
+            socket.read(&methodID, sizeof(methodID));
+            socket.read(&messageID, sizeof(messageID));
 
         } catch (const IPCException& e) {
             LOGE(mLogPrefix + "Error during reading the socket");
-            removePeerInternal(socketPtr->getFD(),
+            removePeerInternal(peerIt,
                                std::make_exception_ptr(IPCNaughtyPeerException()));
             return true;
         }
 
         if (methodID == RETURN_METHOD_ID) {
-            return onReturnValue(*socketPtr, messageID);
+            return onReturnValue(peerIt, messageID);
 
         } else {
             if (mMethodsCallbacks.count(methodID)) {
                 // Method
                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
-                return onRemoteMethod(*socketPtr, methodID, messageID, methodCallbacks);
+                return onRemoteMethod(peerIt, methodID, messageID, methodCallbacks);
 
             } else if (mSignalsCallbacks.count(methodID)) {
                 // Signal
                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
-                return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
+                return onRemoteSignal(peerIt, methodID, messageID, signalCallbacks);
 
             } else {
-                // Nothing
                 LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
-                removePeerInternal(socketPtr->getFD(),
+                removePeerInternal(peerIt,
                                    std::make_exception_ptr(IPCNaughtyPeerException()));
                 return true;
             }
@@ -399,20 +415,20 @@ bool Processor::handleInput(const FileDescriptor peerFD)
     }
 }
 
-void Processor::onNewSignals(const FileDescriptor peerFD,
-                             std::shared_ptr<RegisterSignalsProtocolMessage>& data)
+void Processor::onNewSignals(const PeerID peerID, std::shared_ptr<RegisterSignalsProtocolMessage>& data)
 {
-    LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
+    LOGS(mLogPrefix + "Processor onNewSignals peerID: " << peerID);
 
     for (const MethodID methodID : data->ids) {
-        mSignalsPeers[methodID].push_back(peerFD);
+        mSignalsPeers[methodID].push_back(peerID);
     }
 }
 
-void Processor::onErrorSignal(const FileDescriptor, std::shared_ptr<ErrorProtocolMessage>& data)
+void Processor::onErrorSignal(const PeerID, std::shared_ptr<ErrorProtocolMessage>& data)
 {
     LOGS(mLogPrefix + "Processor onErrorSignal messageID: " << data->messageID);
 
+    // If there is no return callback an out_of_range error will be thrown and peer will be removed
     ReturnCallbacks returnCallbacks = std::move(mReturnCallbacks.at(data->messageID));
     mReturnCallbacks.erase(data->messageID);
 
@@ -420,7 +436,7 @@ void Processor::onErrorSignal(const FileDescriptor, std::shared_ptr<ErrorProtoco
     IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
 }
 
-bool Processor::onReturnValue(const Socket& socket,
+bool Processor::onReturnValue(Peers::iterator& peerIt,
                               const MessageID messageID)
 {
     LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
@@ -432,7 +448,7 @@ bool Processor::onReturnValue(const Socket& socket,
         mReturnCallbacks.erase(messageID);
     } catch (const std::out_of_range&) {
         LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
-        removePeerInternal(socket.getFD(),
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCNaughtyPeerException()));
         return true;
     }
@@ -440,12 +456,12 @@ bool Processor::onReturnValue(const Socket& socket,
     std::shared_ptr<void> data;
     try {
         LOGT(mLogPrefix + "Parsing incoming return data");
-        data = returnCallbacks.parse(socket.getFD());
+        data = returnCallbacks.parse(peerIt->socketPtr->getFD());
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
         ResultBuilder resultBuilder(std::make_exception_ptr(IPCParsingException()));
         IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
-        removePeerInternal(socket.getFD(),
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCParsingException()));
         return true;
     }
@@ -456,7 +472,7 @@ bool Processor::onReturnValue(const Socket& socket,
     return false;
 }
 
-bool Processor::onRemoteSignal(const Socket& socket,
+bool Processor::onRemoteSignal(Peers::iterator& peerIt,
                                const MethodID methodID,
                                const MessageID messageID,
                                std::shared_ptr<SignalHandlers> signalCallbacks)
@@ -466,22 +482,22 @@ bool Processor::onRemoteSignal(const Socket& socket,
     std::shared_ptr<void> data;
     try {
         LOGT(mLogPrefix + "Parsing incoming data");
-        data = signalCallbacks->parse(socket.getFD());
+        data = signalCallbacks->parse(peerIt->socketPtr->getFD());
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
-        removePeerInternal(socket.getFD(),
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCParsingException()));
         return true;
     }
 
     try {
-        signalCallbacks->signal(socket.getFD(), data);
+        signalCallbacks->signal(peerIt->peerID, data);
     } catch (const IPCUserException& e) {
         LOGW("Discarded user's exception");
         return false;
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
-        removePeerInternal(socket.getFD(),
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCNaughtyPeerException()));
 
         return true;
@@ -490,7 +506,7 @@ bool Processor::onRemoteSignal(const Socket& socket,
     return false;
 }
 
-bool Processor::onRemoteMethod(const Socket& socket,
+bool Processor::onRemoteMethod(Peers::iterator& peerIt,
                                const MethodID methodID,
                                const MessageID messageID,
                                std::shared_ptr<MethodHandlers> methodCallbacks)
@@ -500,10 +516,10 @@ bool Processor::onRemoteMethod(const Socket& socket,
     std::shared_ptr<void> data;
     try {
         LOGT(mLogPrefix + "Parsing incoming data");
-        data = methodCallbacks->parse(socket.getFD());
+        data = methodCallbacks->parse(peerIt->socketPtr->getFD());
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
-        removePeerInternal(socket.getFD(),
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCParsingException()));
         return true;
     }
@@ -511,15 +527,15 @@ bool Processor::onRemoteMethod(const Socket& socket,
     LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
     std::shared_ptr<void> returnData;
     try {
-        returnData = methodCallbacks->method(socket.getFD(), data);
+        returnData = methodCallbacks->method(peerIt->peerID, data);
     } catch (const IPCUserException& e) {
         LOGW("User's exception");
         auto data = std::make_shared<ErrorProtocolMessage>(messageID, e.getCode(), e.what());
-        signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, socket.getFD(), data);
+        signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, peerIt->peerID, data);
         return false;
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
-        removePeerInternal(socket.getFD(),
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCNaughtyPeerException()));
         return true;
     }
@@ -527,13 +543,14 @@ bool Processor::onRemoteMethod(const Socket& socket,
     LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID);
     try {
         // Send the call with the socket
+        Socket& socket = *peerIt->socketPtr;
         Socket::Guard guard = socket.getGuard();
         socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
         socket.write(&messageID, sizeof(messageID));
         methodCallbacks->serialize(socket.getFD(), returnData);
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during serialization: " << e.what());
-        removePeerInternal(socket.getFD(),
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCSerializationException()));
 
         return true;
@@ -565,13 +582,11 @@ bool Processor::handleEvent()
 bool Processor::onMethodRequest(MethodRequest& request)
 {
     LOGS(mLogPrefix + "Processor onMethodRequest");
-    std::shared_ptr<Socket> socketPtr;
 
-    try {
-        // Get the peer's socket
-        socketPtr = mSockets.at(request.peerFD);
-    } catch (const std::out_of_range&) {
-        LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
+    auto peerIt = getPeerInfoIterator(request.peerID);
+
+    if (peerIt == mPeerInfo.end()) {
+        LOGE(mLogPrefix + "Peer disconnected. No user with a peerID: " << request.peerID);
 
         // Pass the error to the processing callback
         ResultBuilder resultBuilder(std::make_exception_ptr(IPCPeerDisconnectedException()));
@@ -583,17 +598,18 @@ bool Processor::onMethodRequest(MethodRequest& request)
     if (mReturnCallbacks.count(request.messageID) != 0) {
         LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
     }
-    mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(request.peerFD,
+    mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(peerIt->peerID,
                                                                     std::move(request.parse),
                                                                     std::move(request.process)));
 
+    Socket& socket = *peerIt->socketPtr;
     try {
         // Send the call with the socket
-        Socket::Guard guard = socketPtr->getGuard();
-        socketPtr->write(&request.methodID, sizeof(request.methodID));
-        socketPtr->write(&request.messageID, sizeof(request.messageID));
+        Socket::Guard guard = socket.getGuard();
+        socket.write(&request.methodID, sizeof(request.methodID));
+        socket.write(&request.messageID, sizeof(request.messageID));
         LOGT(mLogPrefix + "Serializing the message");
-        request.serialize(socketPtr->getFD(), request.data);
+        request.serialize(socket.getFD(), request.data);
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Error during sending a method: " << e.what());
 
@@ -603,10 +619,8 @@ bool Processor::onMethodRequest(MethodRequest& request)
 
 
         mReturnCallbacks.erase(request.messageID);
-        removePeerInternal(request.peerFD,
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCSerializationException()));
-
-
         return true;
 
     }
@@ -618,27 +632,25 @@ bool Processor::onSignalRequest(SignalRequest& request)
 {
     LOGS(mLogPrefix + "Processor onSignalRequest");
 
-    std::shared_ptr<Socket> socketPtr;
-    try {
-        // Get the peer's socket
-        socketPtr = mSockets.at(request.peerFD);
-    } catch (const std::out_of_range&) {
-        LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
+    auto peerIt = getPeerInfoIterator(request.peerID);
+
+    if (peerIt == mPeerInfo.end()) {
+        LOGE(mLogPrefix + "Peer disconnected. No user for peerID: " << request.peerID);
         return false;
     }
 
+    Socket& socket = *peerIt->socketPtr;
     try {
         // Send the call with the socket
-        Socket::Guard guard = socketPtr->getGuard();
-        socketPtr->write(&request.methodID, sizeof(request.methodID));
-        socketPtr->write(&request.messageID, sizeof(request.messageID));
-        request.serialize(socketPtr->getFD(), request.data);
+        Socket::Guard guard = socket.getGuard();
+        socket.write(&request.methodID, sizeof(request.methodID));
+        socket.write(&request.messageID, sizeof(request.messageID));
+        request.serialize(socket.getFD(), request.data);
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
 
-        removePeerInternal(request.peerFD,
+        removePeerInternal(peerIt,
                            std::make_exception_ptr(IPCSerializationException()));
-
         return true;
     }
 
@@ -649,16 +661,18 @@ bool Processor::onAddPeerRequest(AddPeerRequest& request)
 {
     LOGS(mLogPrefix + "Processor onAddPeerRequest");
 
-    if (mSockets.size() > mMaxNumberOfPeers) {
-        LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerFD);
+    if (mPeerInfo.size() > mMaxNumberOfPeers) {
+        LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerID);
         return false;
     }
-    if (mSockets.count(request.peerFD) != 0) {
-        LOGE(mLogPrefix + "There already was a socket for peerFD: " << request.peerFD);
+
+    if (getPeerInfoIterator(request.peerID) != mPeerInfo.end()) {
+        LOGE(mLogPrefix + "There already was a socket for peerID: " << request.peerID);
         return false;
     }
 
-    mSockets[request.peerFD] = std::move(request.socketPtr);
+    PeerInfo peerInfo(request.peerID, request.socketPtr);
+    mPeerInfo.push_back(std::move(peerInfo));
 
 
     // Sending handled signals
@@ -668,16 +682,16 @@ bool Processor::onAddPeerRequest(AddPeerRequest& request)
     }
     auto data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
     signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
-                                                   request.peerFD,
+                                                   request.peerID,
                                                    data);
 
     if (mNewPeerCallback) {
         // Notify about the new user.
         LOGT(mLogPrefix + "Calling NewPeerCallback");
-        mNewPeerCallback(request.peerFD);
+        mNewPeerCallback(request.peerID, request.socketPtr->getFD());
     }
 
-    LOGI(mLogPrefix + "New peer: " << request.peerFD);
+    LOGI(mLogPrefix + "New peerID: " << request.peerID);
     return true;
 }
 
@@ -685,7 +699,7 @@ bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
 {
     LOGS(mLogPrefix + "Processor onRemovePeer");
 
-    removePeerInternal(request.peerFD,
+    removePeerInternal(getPeerInfoIterator(request.peerID),
                        std::make_exception_ptr(IPCRemovedPeerException()));
 
     request.conditionPtr->notify_all();
index a77c072..604d863 100644 (file)
@@ -170,9 +170,9 @@ public:
      * Calls the newPeerCallback.
      *
      * @param socketPtr pointer to the new socket
-     * @return peerFD of the new socket
+     * @return peerID of the new user
      */
-    FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
+    PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
 
     /**
      * Saves the callbacks connected to the method id.
@@ -219,7 +219,7 @@ public:
      * Synchronous method call.
      *
      * @param methodID API dependent id of the method
-     * @param peerFD id of the peer
+     * @param peerD id of the peer
      * @param data data to sent
      * @param timeoutMS how long to wait for the return value before throw
      * @tparam SentDataType data type to send
@@ -227,7 +227,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
-                                               const FileDescriptor peerFD,
+                                               const PeerID peerID,
                                                const std::shared_ptr<SentDataType>& data,
                                                unsigned int timeoutMS = 500);
 
@@ -235,7 +235,7 @@ public:
      * Asynchronous method call
      *
      * @param methodID API dependent id of the method
-     * @param peerFD id of the peer
+     * @param peerID id of the peer
      * @param data data to sent
      * @param process callback processing the return data
      * @tparam SentDataType data type to send
@@ -243,7 +243,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     MessageID callAsync(const MethodID methodID,
-                        const FileDescriptor peerFD,
+                        const PeerID peerID,
                         const std::shared_ptr<SentDataType>& data,
                         const typename ResultHandler<ReceivedDataType>::type& process);
 
@@ -265,19 +265,19 @@ public:
      * Removes one peer.
      * Handler used in external polling.
      *
-     * @param peerFD file description identifying the peer
+     * @param fd file description identifying the peer
      * @return should the polling structure be rebuild
      */
-    bool handleLostConnection(const FileDescriptor peerFD);
+    bool handleLostConnection(const FileDescriptor fd);
 
     /**
      * Handles input from one peer.
      * Handler used in external polling.
      *
-     * @param peerFD file description identifying the peer
+     * @param fd file description identifying the peer
      * @return should the polling structure be rebuild
      */
-    bool handleInput(const FileDescriptor peerFD);
+    bool handleInput(const FileDescriptor fd);
 
     /**
      * Handle one event from the internal event's queue
@@ -361,14 +361,31 @@ private:
         ReturnCallbacks(ReturnCallbacks&&) = default;
         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
 
-        ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultBuilderHandler& process)
-            : peerFD(peerFD), parse(parse), process(process) {}
+        ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultBuilderHandler& process)
+            : peerID(peerID), parse(parse), process(process) {}
 
-        FileDescriptor peerFD;
+        PeerID peerID;
         ParseCallback parse;
         ResultBuilderHandler process;
     };
 
+    struct PeerInfo {
+        PeerInfo(const PeerInfo& other) = delete;
+        PeerInfo& operator=(const PeerInfo&) = delete;
+        PeerInfo() = delete;
+
+        PeerInfo(PeerInfo&&) = default;
+        PeerInfo& operator=(PeerInfo &&) = default;
+
+        PeerInfo(PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
+            : peerID(peerID), socketPtr(socketPtr) {}
+
+        PeerID peerID;
+        std::shared_ptr<Socket> socketPtr;
+    };
+
+    typedef std::vector<PeerInfo> Peers;
+
     std::string mLogPrefix;
 
     RequestQueue<Event> mRequestQueue;
@@ -378,9 +395,9 @@ private:
 
     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
-    std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
+    std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
 
-    std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
+    Peers mPeerInfo;
     std::vector<struct pollfd> mFDs;
 
     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
@@ -405,7 +422,7 @@ private:
 
     template<typename SentDataType>
     void signalInternal(const MethodID methodID,
-                        const FileDescriptor peerFD,
+                        const PeerID peerID,
                         const std::shared_ptr<SentDataType>& data);
 
     void run();
@@ -420,27 +437,30 @@ private:
     bool handleLostConnections();
     bool handleInputs();
 
-    bool onReturnValue(const Socket& socket,
+    bool onReturnValue(Peers::iterator& peerIt,
                        const MessageID messageID);
-    bool onRemoteMethod(const Socket& socket,
+    bool onRemoteMethod(Peers::iterator& peerIt,
                         const MethodID methodID,
                         const MessageID messageID,
                         std::shared_ptr<MethodHandlers> methodCallbacks);
-    bool onRemoteSignal(const Socket& socket,
+    bool onRemoteSignal(Peers::iterator& peerIt,
                         const MethodID methodID,
                         const MessageID messageID,
                         std::shared_ptr<SignalHandlers> signalCallbacks);
     void resetPolling();
-    FileDescriptor getNextFileDescriptor();
-    void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr);
-    void removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock);
 
-    void onNewSignals(const FileDescriptor peerFD,
+    void removePeerInternal(Peers::iterator peerIt,
+                            const std::exception_ptr& exceptionPtr);
+    void removePeerSyncInternal(const PeerID peerID, Lock& lock);
+
+    void onNewSignals(const PeerID peerID,
                       std::shared_ptr<RegisterSignalsProtocolMessage>& data);
 
-    void onErrorSignal(const FileDescriptor peerFD,
+    void onErrorSignal(const PeerID peerID,
                        std::shared_ptr<ErrorProtocolMessage>& data);
 
+    Peers::iterator getPeerInfoIterator(const FileDescriptor fd);
+    Peers::iterator getPeerInfoIterator(const PeerID peerID);
 
 };
 
@@ -460,9 +480,9 @@ void Processor::setMethodHandlerInternal(const MethodID methodID,
         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
     };
 
-    methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
+    methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data)->std::shared_ptr<void> {
         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-        return method(peerFD, tmpData);
+        return method(peerID, tmpData);
     };
 
     mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
@@ -502,9 +522,9 @@ void Processor::setSignalHandlerInternal(const MethodID methodID,
         return dataToFill;
     };
 
-    signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& dataReceived) {
+    signalCall.signal = [handler](const PeerID peerID, std::shared_ptr<void>& dataReceived) {
         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
-        handler(peerFD, tmpData);
+        handler(peerID, tmpData);
     };
 
     mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
@@ -537,9 +557,9 @@ void Processor::setSignalHandler(const MethodID methodID,
         std::vector<MethodID> ids {methodID};
         data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
 
-        for (const auto kv : mSockets) {
+        for (const PeerInfo& peerInfo : mPeerInfo) {
             signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
-                                                           kv.first,
+                                                           peerInfo.peerID,
                                                            data);
         }
     }
@@ -548,12 +568,12 @@ void Processor::setSignalHandler(const MethodID methodID,
 
 template<typename SentDataType, typename ReceivedDataType>
 MessageID Processor::callAsync(const MethodID methodID,
-                               const FileDescriptor peerFD,
+                               const PeerID peerID,
                                const std::shared_ptr<SentDataType>& data,
                                const typename ResultHandler<ReceivedDataType>::type& process)
 {
     Lock lock(mStateMutex);
-    auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+    auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
     mRequestQueue.pushBack(Event::METHOD, request);
     return request->messageID;
 }
@@ -561,7 +581,7 @@ MessageID Processor::callAsync(const MethodID methodID,
 
 template<typename SentDataType, typename ReceivedDataType>
 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
-                                                      const FileDescriptor peerFD,
+                                                      const PeerID peerID,
                                                       const std::shared_ptr<SentDataType>& data,
                                                       unsigned int timeoutMS)
 {
@@ -575,7 +595,7 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
     };
 
     MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
-                                                                    peerFD,
+                                                                    peerID,
                                                                     data,
                                                                     process);
 
@@ -597,7 +617,7 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
 
         if (isTimeout) {
             LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
-            removePeerSyncInternal(peerFD, lock);
+            removePeerSyncInternal(peerID, lock);
             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
         } else {
             LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
@@ -613,10 +633,10 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
 
 template<typename SentDataType>
 void Processor::signalInternal(const MethodID methodID,
-                               const FileDescriptor peerFD,
+                               const PeerID peerID,
                                const std::shared_ptr<SentDataType>& data)
 {
-    auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
+    auto request = SignalRequest::create<SentDataType>(methodID, peerID, data);
     mRequestQueue.pushFront(Event::SIGNAL, request);
 }
 
@@ -630,8 +650,8 @@ void Processor::signal(const MethodID methodID,
         LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
         return;
     }
-    for (const FileDescriptor peerFD : it->second) {
-        auto request =  SignalRequest::create<SentDataType>(methodID, peerFD, data);
+    for (const PeerID peerID : it->second) {
+        auto request =  SignalRequest::create<SentDataType>(methodID, peerID, data);
         mRequestQueue.pushBack(Event::SIGNAL, request);
     }
 }
index 900a8a0..6ef8eca 100644 (file)
@@ -38,14 +38,14 @@ public:
     RemovePeerRequest(const RemovePeerRequest&) = delete;
     RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
 
-    RemovePeerRequest(const FileDescriptor peerFD,
+    RemovePeerRequest(const PeerID peerID,
                       const std::shared_ptr<std::condition_variable>& conditionPtr)
-        : peerFD(peerFD),
+        : peerID(peerID),
           conditionPtr(conditionPtr)
     {
     }
 
-    FileDescriptor peerFD;
+    PeerID peerID;
     std::shared_ptr<std::condition_variable> conditionPtr;
 };
 
index ad80d91..8a11ee8 100644 (file)
@@ -41,19 +41,19 @@ public:
 
     template<typename SentDataType>
     static std::shared_ptr<SignalRequest> create(const MethodID methodID,
-                                                 const FileDescriptor peerFD,
+                                                 const PeerID peerID,
                                                  const std::shared_ptr<SentDataType>& data);
 
     MethodID methodID;
-    FileDescriptor peerFD;
+    PeerID peerID;
     MessageID messageID;
     std::shared_ptr<void> data;
     SerializeCallback serialize;
 
 private:
-    SignalRequest(const MethodID methodID, const FileDescriptor peerFD)
+    SignalRequest(const MethodID methodID, const PeerID peerID)
         : methodID(methodID),
-          peerFD(peerFD),
+          peerID(peerID),
           messageID(getNextMessageID())
     {}
 
@@ -61,15 +61,15 @@ private:
 
 template<typename SentDataType>
 std::shared_ptr<SignalRequest> SignalRequest::create(const MethodID methodID,
-                                                     const FileDescriptor peerFD,
+                                                     const PeerID peerID,
                                                      const std::shared_ptr<SentDataType>& data)
 {
-    std::shared_ptr<SignalRequest> request(new SignalRequest(methodID, peerFD));
+    std::shared_ptr<SignalRequest> request(new SignalRequest(methodID, peerID));
 
     request->data = data;
 
     request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
-        LOGS("Signal serialize, peerFD: " << fd);
+        LOGS("Signal serialize, peerID: " << fd);
         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
     };
 
index 9bf721b..d945f73 100644 (file)
@@ -142,12 +142,12 @@ void Service::handle(const FileDescriptor fd, const short pollEvent)
 void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     LOGS("Service setNewPeerCallback");
-    auto callback = [newPeerCallback, this](FileDescriptor fd) {
+    auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
         if (mIPCGSourcePtr) {
             mIPCGSourcePtr->addFD(fd);
         }
         if (newPeerCallback) {
-            newPeerCallback(fd);
+            newPeerCallback(peerID, fd);
         }
     };
     mProcessor.setNewPeerCallback(callback);
@@ -156,12 +156,12 @@ void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
 void Service::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 {
     LOGS("Service setRemovedPeerCallback");
-    auto callback = [removedPeerCallback, this](FileDescriptor fd) {
+    auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
         if (mIPCGSourcePtr) {
             mIPCGSourcePtr->removeFD(fd);
         }
         if (removedPeerCallback) {
-            removedPeerCallback(fd);
+            removedPeerCallback(peerID, fd);
         }
     };
     mProcessor.setRemovedPeerCallback(callback);
index 383c71d..022c9c9 100644 (file)
@@ -143,7 +143,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
-                                               const FileDescriptor peerFD,
+                                               const PeerID peerID,
                                                const std::shared_ptr<SentDataType>& data,
                                                unsigned int timeoutMS = 500);
 
@@ -158,7 +158,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     void callAsync(const MethodID methodID,
-                   const FileDescriptor peerFD,
+                   const PeerID peerID,
                    const std::shared_ptr<SentDataType>& data,
                    const typename ResultHandler<ReceivedDataType>::type& resultCallback);
 
@@ -204,26 +204,26 @@ void Service::setSignalHandler(const MethodID methodID,
 
 template<typename SentDataType, typename ReceivedDataType>
 std::shared_ptr<ReceivedDataType> Service::callSync(const MethodID methodID,
-                                                    const FileDescriptor peerFD,
+                                                    const PeerID peerID,
                                                     const std::shared_ptr<SentDataType>& data,
                                                     unsigned int timeoutMS)
 {
     LOGS("Service callSync, methodID: " << methodID
-         << ", peerFD: " << peerFD
+         << ", peerID: " << peerID
          << ", timeoutMS: " << timeoutMS);
-    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerFD, data, timeoutMS);
+    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerID, data, timeoutMS);
 }
 
 template<typename SentDataType, typename ReceivedDataType>
 void Service::callAsync(const MethodID methodID,
-                        const FileDescriptor peerFD,
+                        const PeerID peerID,
                         const std::shared_ptr<SentDataType>& data,
                         const typename ResultHandler<ReceivedDataType>::type& resultCallback)
 {
-    LOGS("Service callAsync, methodID: " << methodID << ", peerFD: " << peerFD);
+    LOGS("Service callAsync, methodID: " << methodID << ", peerID: " << peerID);
     mProcessor.callAsync<SentDataType,
                          ReceivedDataType>(methodID,
-                                           peerFD,
+                                           peerID,
                                            data,
                                            resultCallback);
 }
index a73a612..9854e0e 100644 (file)
@@ -34,6 +34,7 @@ namespace ipc {
 
 namespace {
 std::atomic<MessageID> gLastMessageID(0);
+std::atomic<PeerID> gLastPeerID(0);
 } // namespace
 
 MessageID getNextMessageID()
@@ -41,6 +42,10 @@ MessageID getNextMessageID()
     return ++gLastMessageID;
 }
 
+PeerID getNextPeerID()
+{
+    return ++gLastPeerID;
+}
 
 
 } // namespace ipc
index b5411b3..fe132ec 100644 (file)
  * @brief   Types definitions
  */
 
-#ifndef COMMON_IPC_HANDLERS_HPP
-#define COMMON_IPC_HANDLERS_HPP
+#ifndef COMMON_IPC_TYPES_HPP
+#define COMMON_IPC_TYPES_HPP
 
 #include "ipc/exception.hpp"
-
 #include <functional>
 #include <memory>
 #include <string>
@@ -37,26 +36,28 @@ namespace ipc {
 typedef int FileDescriptor;
 typedef unsigned int MethodID;
 typedef unsigned int MessageID;
+typedef unsigned int PeerID;
 
-typedef std::function<void(FileDescriptor)> PeerCallback;
+typedef std::function<void(const PeerID, const FileDescriptor)> PeerCallback;
 typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
 typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
 
 MessageID getNextMessageID();
+PeerID getNextPeerID();
 
 template<typename SentDataType, typename ReceivedDataType>
 struct MethodHandler {
-    typedef std::function<std::shared_ptr<SentDataType>(FileDescriptor peerFD,
+    typedef std::function<std::shared_ptr<SentDataType>(PeerID peerID,
                                                         std::shared_ptr<ReceivedDataType>& data)> type;
 };
 
 template<typename ReceivedDataType>
 struct SignalHandler {
-    typedef std::function<void(FileDescriptor peerFD,
+    typedef std::function<void(PeerID peerID,
                                std::shared_ptr<ReceivedDataType>& data)> type;
 };
 
 } // namespace ipc
 } // namespace vasum
 
-#endif // COMMON_IPC_HANDLERS_HPP
+#endif // COMMON_IPC_TYPES_HPP
index 5408735..cb475f1 100644 (file)
@@ -136,33 +136,33 @@ struct ThrowOnAcceptData {
     }
 };
 
-std::shared_ptr<EmptyData> returnEmptyCallback(const FileDescriptor, std::shared_ptr<EmptyData>&)
+std::shared_ptr<EmptyData> returnEmptyCallback(const PeerID, std::shared_ptr<EmptyData>&)
 {
     return std::make_shared<EmptyData>();
 }
 
-std::shared_ptr<SendData> returnDataCallback(const FileDescriptor, std::shared_ptr<RecvData>&)
+std::shared_ptr<SendData> returnDataCallback(const PeerID, std::shared_ptr<RecvData>&)
 {
     return std::make_shared<SendData>(1);
 }
 
-std::shared_ptr<SendData> echoCallback(const FileDescriptor, std::shared_ptr<RecvData>& data)
+std::shared_ptr<SendData> echoCallback(const PeerID, std::shared_ptr<RecvData>& data)
 {
     return std::make_shared<SendData>(data->intVal);
 }
 
-std::shared_ptr<SendData> longEchoCallback(const FileDescriptor, std::shared_ptr<RecvData>& data)
+std::shared_ptr<SendData> longEchoCallback(const PeerID, std::shared_ptr<RecvData>& data)
 {
     std::this_thread::sleep_for(std::chrono::milliseconds(LONG_OPERATION_TIME));
     return std::make_shared<SendData>(data->intVal);
 }
 
-FileDescriptor connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
+PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
 {
-    // Connects the Client to the Service and returns Clients FileDescriptor
-    ValueLatch<FileDescriptor> peerFDLatch;
-    auto newPeerCallback = [&peerFDLatch](const FileDescriptor newFD) {
-        peerFDLatch.set(newFD);
+    // Connects the Client to the Service and returns Clients PeerID
+    ValueLatch<PeerID> peerIDLatch;
+    auto newPeerCallback = [&peerIDLatch](const PeerID newID, const FileDescriptor) {
+        peerIDLatch.set(newID);
     };
 
     s.setNewPeerCallback(newPeerCallback);
@@ -173,18 +173,18 @@ FileDescriptor connect(Service& s, Client& c, bool isServiceGlib = false, bool i
 
     c.start(isClientGlib);
 
-    FileDescriptor peerFD = peerFDLatch.get(TIMEOUT);
+    PeerID peerID = peerIDLatch.get(TIMEOUT);
     s.setNewPeerCallback(nullptr);
-    BOOST_REQUIRE_NE(peerFD, 0);
-    return peerFD;
+    BOOST_REQUIRE_NE(peerID, 0);
+    return peerID;
 }
 
-FileDescriptor connectServiceGSource(Service& s, Client& c)
+PeerID connectServiceGSource(Service& s, Client& c)
 {
     return connect(s, c, true, false);
 }
 
-FileDescriptor connectClientGSource(Service& s, Client& c)
+PeerID connectClientGSource(Service& s, Client& c)
 {
     return connect(s, c, false, true);
 }
@@ -197,10 +197,10 @@ void testEcho(Client& c, const MethodID methodID)
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
-void testEcho(Service& s, const MethodID methodID, const FileDescriptor peerFD)
+void testEcho(Service& s, const MethodID methodID, const PeerID peerID)
 {
     std::shared_ptr<SendData> sentData(new SendData(56));
-    std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(methodID, peerFD, sentData, TIMEOUT);
+    std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(methodID, peerID, sentData, TIMEOUT);
     BOOST_REQUIRE(recvData);
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
@@ -244,17 +244,17 @@ BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
     c.setMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
     c.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
 
-    FileDescriptor peerFD = connect(s, c);
+    PeerID peerID = connect(s, c);
 
     c.setMethodHandler<SendData, RecvData>(1, echoCallback);
     c.setMethodHandler<SendData, RecvData>(2, returnDataCallback);
 
-    testEcho(s, 1, peerFD);
+    testEcho(s, 1, peerID);
 
     c.removeMethod(1);
     c.removeMethod(2);
 
-    BOOST_CHECK_THROW(testEcho(s, 1, peerFD), IPCException);
+    BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
 }
 
 BOOST_AUTO_TEST_CASE(ServiceStartStop)
@@ -333,10 +333,10 @@ BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
     Service s(socketPath);
     Client c(socketPath);
     c.setMethodHandler<SendData, RecvData>(1, echoCallback);
-    FileDescriptor peerFD = connect(s, c);
+    PeerID peerID = connect(s, c);
 
     std::shared_ptr<SendData> sentData(new SendData(56));
-    std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(1, peerFD, sentData);
+    std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(1, peerID, sentData);
     BOOST_REQUIRE(recvData);
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
@@ -372,14 +372,14 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
     Service s(socketPath);
     Client c(socketPath);
     c.setMethodHandler<SendData, RecvData>(1, echoCallback);
-    FileDescriptor peerFD = connect(s, c);
+    PeerID peerID = connect(s, c);
 
     // Async call
     auto dataBack = [&recvDataLatch](Result<RecvData> && r) {
         recvDataLatch.set(r.get());
     };
 
-    s.callAsync<SendData, RecvData>(1, peerFD, sentData, dataBack);
+    s.callAsync<SendData, RecvData>(1, peerID, sentData, dataBack);
 
     // Wait for the response
     std::shared_ptr<RecvData> recvData(recvDataLatch.get(TIMEOUT));
@@ -431,7 +431,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
     ValueLatch<Result<RecvData>> retStatusLatch;
     Service s(socketPath);
 
-    auto method = [](const FileDescriptor, std::shared_ptr<ThrowOnAcceptData>&) {
+    auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&) {
         return std::shared_ptr<SendData>(new SendData(1));
     };
 
@@ -462,7 +462,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
 BOOST_AUTO_TEST_CASE(ReadTimeout)
 {
     Service s(socketPath);
-    auto longEchoCallback = [](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+    auto longEchoCallback = [](const PeerID, std::shared_ptr<RecvData>& data) {
         return std::shared_ptr<LongSendData>(new LongSendData(data->intVal, LONG_OPERATION_TIME));
     };
     s.setMethodHandler<LongSendData, RecvData>(1, longEchoCallback);
@@ -506,14 +506,15 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
     Client c(socketPath);
     connect(s, c);
 
-    auto handlerA = [&recvDataLatchA](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+    auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
         recvDataLatchA.set(data);
     };
 
-    auto handlerB = [&recvDataLatchB](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+    auto handlerB = [&recvDataLatchB](const PeerID, std::shared_ptr<RecvData>& data) {
         recvDataLatchB.set(data);
     };
 
+    LOGH("SETTING SIGNAAALS");
     c.setSignalHandler<RecvData>(1, handlerA);
     c.setSignalHandler<RecvData>(2, handlerB);
 
@@ -541,11 +542,11 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline)
     Service s(socketPath);
     Client c(socketPath);
 
-    auto handlerA = [&recvDataLatchA](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+    auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
         recvDataLatchA.set(data);
     };
 
-    auto handlerB = [&recvDataLatchB](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+    auto handlerB = [&recvDataLatchB](const PeerID, std::shared_ptr<RecvData>& data) {
         recvDataLatchB.set(data);
     };
 
@@ -575,7 +576,7 @@ BOOST_AUTO_TEST_CASE(ServiceGSource)
     utils::Latch l;
     ScopedGlibLoop loop;
 
-    auto signalHandler = [&l](const FileDescriptor, std::shared_ptr<RecvData>&) {
+    auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
         l.set();
     };
 
@@ -602,7 +603,7 @@ BOOST_AUTO_TEST_CASE(ClientGSource)
     utils::Latch l;
     ScopedGlibLoop loop;
 
-    auto signalHandler = [&l](const FileDescriptor, std::shared_ptr<RecvData>&) {
+    auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
         l.set();
     };
 
@@ -614,9 +615,9 @@ BOOST_AUTO_TEST_CASE(ClientGSource)
     c.setMethodHandler<SendData, RecvData>(1, echoCallback);
     c.setSignalHandler<RecvData>(2, signalHandler);
 
-    FileDescriptor peerFD = connectClientGSource(s, c);
+    PeerID peerID = connectClientGSource(s, c);
 
-    testEcho(s, 1, peerFD);
+    testEcho(s, 1, peerID);
 
     auto data = std::make_shared<SendData>(1);
     s.signal<SendData>(2, data);
@@ -633,7 +634,7 @@ BOOST_AUTO_TEST_CASE(UsersError)
     Client c(socketPath);
     auto clientID = connect(s, c);
 
-    auto throwingMethodHandler = [&](const FileDescriptor, std::shared_ptr<RecvData>&) -> std::shared_ptr<SendData> {
+    auto throwingMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&) -> std::shared_ptr<SendData> {
         throw IPCUserException(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
     };