IPC: Replace PeerID witch peer's file descriptor 59/31659/2
authorJan Olszak <j.olszak@samsung.com>
Mon, 8 Dec 2014 12:41:39 +0000 (13:41 +0100)
committerJan Olszak <j.olszak@samsung.com>
Mon, 8 Dec 2014 13:59:13 +0000 (14:59 +0100)
[Bug/Feature]   N/A
[Cause]         N/A
[Solution]      N/A
[Verification]  Build, install, run tests

Change-Id: I30203990d9c9c3a58515d2fe09b074072122c156

common/ipc/client.cpp
common/ipc/client.hpp
common/ipc/internals/call-queue.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/service.hpp
common/ipc/types.hpp
tests/unit_tests/ipc/ut-ipc.cpp

index c806e7b..8b0e458 100644 (file)
@@ -55,7 +55,7 @@ void Client::start()
     // Initialize the connection with the server
     LOGD("Connecting to " + mSocketPath);
     auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
-    mServiceID = mProcessor.addPeer(socketPtr);
+    mServiceFD = mProcessor.addPeer(socketPtr);
 
     // Start listening
     mProcessor.start();
index 3178474..6f8b049 100644 (file)
@@ -156,7 +156,7 @@ public:
                 const std::shared_ptr<SentDataType>& data);
 
 private:
-    PeerID mServiceID;
+    FileDescriptor mServiceFD;
     Processor mProcessor;
     std::string mSocketPath;
 };
@@ -185,7 +185,7 @@ std::shared_ptr<ReceivedDataType> Client::callSync(const MethodID methodID,
                                                    unsigned int timeoutMS)
 {
     LOGD("Sync calling method: " << methodID);
-    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceID, data, timeoutMS);
+    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceFD, data, timeoutMS);
 }
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -196,7 +196,7 @@ void Client::callAsync(const MethodID methodID,
     LOGD("Async calling method: " << methodID);
     mProcessor.callAsync<SentDataType,
                          ReceivedDataType>(methodID,
-                                           mServiceID,
+                                           mServiceFD,
                                            data,
                                            resultCallback);
     LOGD("Async called method: " << methodID);
index 4d1ecf6..7911d7a 100644 (file)
@@ -48,7 +48,7 @@ public:
         Call() = default;
         Call(Call&&) = default;
 
-        PeerID peerID;
+        FileDescriptor peerFD;
         MethodID methodID;
         MessageID messageID;
         std::shared_ptr<void> data;
@@ -66,14 +66,14 @@ public:
 
     template<typename SentDataType, typename ReceivedDataType>
     MessageID push(const MethodID methodID,
-                   const PeerID peerID,
+                   const FileDescriptor peerFD,
                    const std::shared_ptr<SentDataType>& data,
                    const typename ResultHandler<ReceivedDataType>::type& process);
 
 
     template<typename SentDataType>
     MessageID push(const MethodID methodID,
-                   const PeerID peerID,
+                   const FileDescriptor peerFD,
                    const std::shared_ptr<SentDataType>& data);
 
     Call pop();
@@ -90,13 +90,13 @@ private:
 
 template<typename SentDataType, typename ReceivedDataType>
 MessageID CallQueue::push(const MethodID methodID,
-                          const PeerID peerID,
+                          const FileDescriptor peerFD,
                           const std::shared_ptr<SentDataType>& data,
                           const typename ResultHandler<ReceivedDataType>::type& process)
 {
     Call call;
     call.methodID = methodID;
-    call.peerID = peerID;
+    call.peerFD = peerFD;
     call.data = data;
 
     MessageID messageID = getNextMessageID();
@@ -124,12 +124,12 @@ MessageID CallQueue::push(const MethodID methodID,
 
 template<typename SentDataType>
 MessageID CallQueue::push(const MethodID methodID,
-                          const PeerID peerID,
+                          const FileDescriptor peerFD,
                           const std::shared_ptr<SentDataType>& data)
 {
     Call call;
     call.methodID = methodID;
-    call.peerID = peerID;
+    call.peerFD = peerFD;
     call.data = data;
 
     MessageID messageID = getNextMessageID();
index b134b08..7ac378e 100644 (file)
@@ -55,8 +55,7 @@ Processor::Processor(const PeerCallback& newPeerCallback,
                      const unsigned int maxNumberOfPeers)
     : mNewPeerCallback(newPeerCallback),
       mRemovedPeerCallback(removedPeerCallback),
-      mMaxNumberOfPeers(maxNumberOfPeers),
-      mPeerIDCounter(0)
+      mMaxNumberOfPeers(maxNumberOfPeers)
 {
     LOGT("Creating Processor");
     using namespace std::placeholders;
@@ -120,37 +119,37 @@ void Processor::removeMethod(const MethodID methodID)
     mMethodsCallbacks.erase(methodID);
 }
 
-PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
+FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
 {
     LOGT("Adding socket");
-    PeerID peerID;
+    FileDescriptor peerFD;
     {
         Lock lock(mSocketsMutex);
-        peerID = getNextPeerID();
-        SocketInfo socketInfo(peerID, std::move(socketPtr));
+        peerFD = socketPtr->getFD();
+        SocketInfo socketInfo(peerFD, std::move(socketPtr));
         mNewSockets.push(std::move(socketInfo));
     }
-    LOGI("New peer added. Id: " << peerID);
+    LOGI("New peer added. Id: " << peerFD);
     mEventQueue.send(Event::ADD_PEER);
 
-    return peerID;
+    return peerFD;
 }
 
-void Processor::removePeer(const PeerID peerID)
+void Processor::removePeer(const FileDescriptor peerFD)
 {
     std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
 
     {
         Lock lock(mSocketsMutex);
-        RemovePeerRequest request(peerID, conditionPtr);
+        RemovePeerRequest request(peerFD, conditionPtr);
         mPeersToDelete.push(std::move(request));
     }
 
     mEventQueue.send(Event::REMOVE_PEER);
 
-    auto isPeerDeleted = [&peerID, this]()->bool {
+    auto isPeerDeleted = [&peerFD, this]()->bool {
         Lock lock(mSocketsMutex);
-        return mSockets.count(peerID) == 0;
+        return mSockets.count(peerFD) == 0;
     };
 
     std::mutex mutex;
@@ -158,16 +157,16 @@ void Processor::removePeer(const PeerID peerID)
     conditionPtr->wait(lock, isPeerDeleted);
 }
 
-void Processor::removePeerInternal(const PeerID peerID, Status status)
+void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
 {
-    LOGW("Removing peer. ID: " << peerID);
+    LOGW("Removing peer. ID: " << peerFD);
     {
         Lock lock(mSocketsMutex);
-        mSockets.erase(peerID);
+        mSockets.erase(peerFD);
 
         // Remove from signal addressees
         for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
-            it->second.remove(peerID);
+            it->second.remove(peerFD);
             if (it->second.empty()) {
                 it = mSignalsPeers.erase(it);
             } else {
@@ -182,7 +181,7 @@ void Processor::removePeerInternal(const PeerID peerID, Status status)
 
         std::shared_ptr<void> data;
         for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
-            if (it->second.peerID == peerID) {
+            if (it->second.peerFD == peerFD) {
                 IGNORE_EXCEPTIONS(it->second.process(status, data));
                 it = mReturnCallbacks.erase(it);
             } else {
@@ -196,7 +195,7 @@ void Processor::removePeerInternal(const PeerID peerID, Status status)
         Lock lock(mCallbacksMutex);
         if (mRemovedPeerCallback) {
             // Notify about the deletion
-            mRemovedPeerCallback(peerID);
+            mRemovedPeerCallback(peerFD);
         }
     }
 
@@ -264,22 +263,21 @@ void Processor::run()
 
 bool Processor::handleLostConnections()
 {
-    std::list<PeerID> peersToRemove;
+    std::vector<FileDescriptor> peersToRemove;
 
     {
         Lock lock(mSocketsMutex);
-        auto socketIt = mSockets.begin();
-        for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+        for (unsigned int i = 1; i < mFDs.size(); ++i) {
             if (mFDs[i].revents & POLLHUP) {
-                LOGI("Lost connection to peer: " << socketIt->first);
+                LOGI("Lost connection to peer: " << mFDs[i].fd);
                 mFDs[i].revents &= ~(POLLHUP);
-                peersToRemove.push_back(socketIt->first);
+                peersToRemove.push_back(mFDs[i].fd);
             }
         }
     }
 
-    for (const PeerID peerID : peersToRemove) {
-        removePeerInternal(peerID, Status::PEER_DISCONNECTED);
+    for (const FileDescriptor peerFD : peersToRemove) {
+        removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
     }
 
     return !peersToRemove.empty();
@@ -287,27 +285,26 @@ bool Processor::handleLostConnections()
 
 bool Processor::handleInputs()
 {
-    std::list<std::pair<PeerID, std::shared_ptr<Socket>> > peersWithInput;
+    std::vector<std::shared_ptr<Socket>> socketsWithInput;
     {
         Lock lock(mSocketsMutex);
-        auto socketIt = mSockets.begin();
-        for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+        for (unsigned int i = 1; i < mFDs.size(); ++i) {
             if (mFDs[i].revents & POLLIN) {
                 mFDs[i].revents &= ~(POLLIN);
-                peersWithInput.push_back(*socketIt);
+                socketsWithInput.push_back(mSockets[mFDs[i].fd]);
             }
         }
     }
 
     bool pollChanged = false;
     // Handle input outside the critical section
-    for (const auto& peer : peersWithInput) {
-        pollChanged = pollChanged || handleInput(peer.first, *peer.second);
+    for (const auto& socketPtr : socketsWithInput) {
+        pollChanged = pollChanged || handleInput(*socketPtr);
     }
     return pollChanged;
 }
 
-bool Processor::handleInput(const PeerID peerID, const Socket& socket)
+bool Processor::handleInput(const Socket& socket)
 {
     LOGT("Handle incoming data");
     MethodID methodID;
@@ -318,7 +315,7 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket)
         socket.read(&messageID, sizeof(messageID));
 
         if (methodID == RETURN_METHOD_ID) {
-            return onReturnValue(peerID, socket, messageID);
+            return onReturnValue(socket, messageID);
 
         } else {
             Lock lock(mCallsMutex);
@@ -326,19 +323,19 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket)
                 // Method
                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
                 lock.unlock();
-                return onRemoteCall(peerID, socket, methodID, messageID, methodCallbacks);
+                return onRemoteCall(socket, methodID, messageID, methodCallbacks);
 
             } else if (mSignalsCallbacks.count(methodID)) {
                 // Signal
                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
                 lock.unlock();
-                return onRemoteSignal(peerID, socket, methodID, messageID, signalCallbacks);
+                return onRemoteSignal(socket, methodID, messageID, signalCallbacks);
 
             } else {
                 // Nothing
                 lock.unlock();
                 LOGW("No method or signal callback for methodID: " << methodID);
-                removePeerInternal(peerID, Status::NAUGHTY_PEER);
+                removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
                 return true;
             }
         }
@@ -347,20 +344,19 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket)
     return false;
 }
 
-std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const PeerID peerID,
+std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
                                                               std::shared_ptr<RegisterSignalsMessage>& data)
 {
-    LOGD("New signals for peer: " << peerID);
+    LOGD("New signals for peer: " << peerFD);
     Lock lock(mSocketsMutex);
     for (MethodID methodID : data->ids) {
-        mSignalsPeers[methodID].push_back(peerID);
+        mSignalsPeers[methodID].push_back(peerFD);
     }
 
     return std::make_shared<EmptyData>();
 }
 
-bool Processor::onReturnValue(const PeerID peerID,
-                              const Socket& socket,
+bool Processor::onReturnValue(const Socket& socket,
                               const MessageID messageID)
 {
     LOGI("Return value for messageID: " << messageID);
@@ -372,7 +368,7 @@ bool Processor::onReturnValue(const PeerID peerID,
         mReturnCallbacks.erase(messageID);
     } catch (const std::out_of_range&) {
         LOGW("No return callback for messageID: " << messageID);
-        removePeerInternal(peerID, Status::NAUGHTY_PEER);
+        removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
         return true;
     }
 
@@ -383,7 +379,7 @@ bool Processor::onReturnValue(const PeerID peerID,
     } catch (const std::exception& e) {
         LOGE("Exception during parsing: " << e.what());
         IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
-        removePeerInternal(peerID, Status::PARSING_ERROR);
+        removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
         return true;
     }
 
@@ -393,8 +389,7 @@ bool Processor::onReturnValue(const PeerID peerID,
     return false;
 }
 
-bool Processor::onRemoteSignal(const PeerID peerID,
-                               const Socket& socket,
+bool Processor::onRemoteSignal(const Socket& socket,
                                const MethodID methodID,
                                const MessageID messageID,
                                std::shared_ptr<SignalHandlers> signalCallbacks)
@@ -407,24 +402,23 @@ bool Processor::onRemoteSignal(const PeerID peerID,
         data = signalCallbacks->parse(socket.getFD());
     } catch (const std::exception& e) {
         LOGE("Exception during parsing: " << e.what());
-        removePeerInternal(peerID, Status::PARSING_ERROR);
+        removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
         return true;
     }
 
     LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
     try {
-        signalCallbacks->signal(peerID, data);
+        signalCallbacks->signal(socket.getFD(), data);
     } catch (const std::exception& e) {
         LOGE("Exception in method handler: " << e.what());
-        removePeerInternal(peerID, Status::NAUGHTY_PEER);
+        removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
         return true;
     }
 
     return false;
 }
 
-bool Processor::onRemoteCall(const PeerID peerID,
-                             const Socket& socket,
+bool Processor::onRemoteCall(const Socket& socket,
                              const MethodID methodID,
                              const MessageID messageID,
                              std::shared_ptr<MethodHandlers> methodCallbacks)
@@ -437,17 +431,17 @@ bool Processor::onRemoteCall(const PeerID peerID,
         data = methodCallbacks->parse(socket.getFD());
     } catch (const std::exception& e) {
         LOGE("Exception during parsing: " << e.what());
-        removePeerInternal(peerID, Status::PARSING_ERROR);
+        removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
         return true;
     }
 
     LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
     std::shared_ptr<void> returnData;
     try {
-        returnData = methodCallbacks->method(peerID, data);
+        returnData = methodCallbacks->method(socket.getFD(), data);
     } catch (const std::exception& e) {
         LOGE("Exception in method handler: " << e.what());
-        removePeerInternal(peerID, Status::NAUGHTY_PEER);
+        removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
         return true;
     }
 
@@ -460,7 +454,7 @@ bool Processor::onRemoteCall(const PeerID peerID,
         methodCallbacks->serialize(socket.getFD(), returnData);
     } catch (const std::exception& e) {
         LOGE("Exception during serialization: " << e.what());
-        removePeerInternal(peerID, Status::SERIALIZATION_ERROR);
+        removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
         return true;
     }
 
@@ -512,21 +506,21 @@ bool Processor::onNewPeer()
         mNewSockets.pop();
 
         if (mSockets.size() > mMaxNumberOfPeers) {
-            LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID);
+            LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
             return false;
         }
-        if (mSockets.count(socketInfo.peerID) != 0) {
-            LOGE("There already was a socket for peerID: " << socketInfo.peerID);
+        if (mSockets.count(socketInfo.peerFD) != 0) {
+            LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
             return false;
         }
 
-        mSockets[socketInfo.peerID] = std::move(socketInfo.socketPtr);
+        mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
     }
 
 
     // Broadcast the new signal to peers
     LOGW("Sending handled signals");
-    std::list<PeerID> peersIDs;
+    std::list<FileDescriptor> peersIDs;
     {
         Lock lock(mSocketsMutex);
         for (const auto kv : mSockets) {
@@ -543,9 +537,9 @@ bool Processor::onNewPeer()
     }
     auto data = std::make_shared<RegisterSignalsMessage>(ids);
 
-    for (const PeerID peerID : peersIDs) {
+    for (const FileDescriptor peerFD : peersIDs) {
         callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
-                                                        peerID,
+                                                        peerFD,
                                                         data,
                                                         discardResultHandler<EmptyData>);
     }
@@ -558,7 +552,7 @@ bool Processor::onNewPeer()
         Lock lock(mCallbacksMutex);
         if (mNewPeerCallback) {
             // Notify about the new user.
-            mNewPeerCallback(socketInfo.peerID);
+            mNewPeerCallback(socketInfo.peerFD);
         }
     }
     return true;
@@ -573,17 +567,11 @@ bool Processor::onRemovePeer()
         mPeersToDelete.pop();
     }
 
-    removePeerInternal(request.peerID, Status::REMOVED_PEER);
+    removePeerInternal(request.peerFD, Status::REMOVED_PEER);
     request.conditionPtr->notify_all();
     return true;
 }
 
-PeerID Processor::getNextPeerID()
-{
-    // TODO: This method of generating UIDs is buggy. To be changed.
-    return ++mPeerIDCounter;
-}
-
 CallQueue::Call Processor::getCall()
 {
     Lock lock(mCallsMutex);
@@ -599,9 +587,9 @@ bool Processor::onCall()
     try {
         // Get the peer's socket
         Lock lock(mSocketsMutex);
-        socketPtr = mSockets.at(call.peerID);
+        socketPtr = mSockets.at(call.peerFD);
     } catch (const std::out_of_range&) {
-        LOGE("Peer disconnected. No socket with a peerID: " << call.peerID);
+        LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
         IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
         return false;
     }
@@ -612,7 +600,7 @@ bool Processor::onCall()
         if (mReturnCallbacks.count(call.messageID) != 0) {
             LOGE("There already was a return callback for messageID: " << call.messageID);
         }
-        mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerID,
+        mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
                                                                      std::move(call.parse),
                                                                      std::move(call.process)));
     }
@@ -634,7 +622,7 @@ bool Processor::onCall()
             mReturnCallbacks.erase(call.messageID);
         }
 
-        removePeerInternal(call.peerID, Status::SERIALIZATION_ERROR);
+        removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
         return true;
     }
 
index da2a5b9..476e662 100644 (file)
@@ -35,8 +35,6 @@
 #include "logger/logger.hpp"
 
 #include <poll.h>
-
-#include <atomic>
 #include <condition_variable>
 #include <queue>
 #include <mutex>
@@ -76,6 +74,9 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 *  - helper function for removing from unordered map
 *  - new way to generate UIDs
 *  - callbacks for serialization/parsing
+*  - store Sockets in a vector, maybe SocketStore?
+*
+*
 */
 class Processor {
 public:
@@ -141,16 +142,16 @@ public:
      * Calls the newPeerCallback.
      *
      * @param socketPtr pointer to the new socket
-     * @return peerID of the new socket
+     * @return peerFD of the new socket
      */
-    PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
+    FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
 
     /**
      * Request removing peer and wait
      *
-     * @param peerID id of the peer
+     * @param peerFD id of the peer
      */
-    void removePeer(const PeerID peerID);
+    void removePeer(const FileDescriptor peerFD);
 
     /**
      * Saves the callbacks connected to the method id.
@@ -197,7 +198,7 @@ public:
      * Synchronous method call.
      *
      * @param methodID API dependent id of the method
-     * @param peerID id of the peer
+     * @param peerFD id of the peer
      * @param data data to sent
      * @param timeoutMS how long to wait for the return value before throw
      * @tparam SentDataType data type to send
@@ -205,7 +206,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
-                                               const PeerID peerID,
+                                               const FileDescriptor peerFD,
                                                const std::shared_ptr<SentDataType>& data,
                                                unsigned int timeoutMS = 500);
 
@@ -213,7 +214,7 @@ public:
      * Asynchronous method call
      *
      * @param methodID API dependent id of the method
-     * @param peerID id of the peer
+     * @param peerFD id of the peer
      * @param data data to sent
      * @param process callback processing the return data
      * @tparam SentDataType data type to send
@@ -221,7 +222,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     MessageID callAsync(const MethodID methodID,
-                        const PeerID peerID,
+                        const FileDescriptor peerFD,
                         const std::shared_ptr<SentDataType>& data,
                         const typename ResultHandler<ReceivedDataType>::type& process);
 
@@ -292,10 +293,10 @@ private:
         ReturnCallbacks(ReturnCallbacks&&) = default;
         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
 
-        ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultHandler<void>::type& process)
-            : peerID(peerID), parse(parse), process(process) {}
+        ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
+            : peerFD(peerFD), parse(parse), process(process) {}
 
-        PeerID peerID;
+        FileDescriptor peerFD;
         ParseCallback parse;
         ResultHandler<void>::type process;
     };
@@ -307,10 +308,10 @@ private:
         SocketInfo(SocketInfo&&) = default;
         SocketInfo& operator=(SocketInfo &&) = default;
 
-        SocketInfo(const PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
-            : peerID(peerID), socketPtr(socketPtr) {}
+        SocketInfo(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
+            : peerFD(peerFD), socketPtr(socketPtr) {}
 
-        PeerID peerID;
+        FileDescriptor peerFD;
         std::shared_ptr<Socket> socketPtr;
     };
 
@@ -321,11 +322,11 @@ private:
         RemovePeerRequest(RemovePeerRequest&&) = default;
         RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
 
-        RemovePeerRequest(const PeerID peerID,
+        RemovePeerRequest(const FileDescriptor peerFD,
                           const std::shared_ptr<std::condition_variable>& conditionPtr)
-            : peerID(peerID), conditionPtr(conditionPtr) {}
+            : peerFD(peerFD), conditionPtr(conditionPtr) {}
 
-        PeerID peerID;
+        FileDescriptor peerFD;
         std::shared_ptr<std::condition_variable> conditionPtr;
     };
 
@@ -345,12 +346,13 @@ private:
     CallQueue mCalls;
     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
-    std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
+    std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
 
     // Mutex for changing mSockets map.
     // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
     std::mutex mSocketsMutex;
-    std::unordered_map<PeerID, std::shared_ptr<Socket> > mSockets;
+    std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
+    std::vector<struct pollfd> mFDs;
     std::queue<SocketInfo> mNewSockets;
     std::queue<RemovePeerRequest> mPeersToDelete;
 
@@ -366,9 +368,6 @@ private:
     unsigned int mMaxNumberOfPeers;
 
     std::thread mThread;
-    std::vector<struct pollfd> mFDs;
-
-    std::atomic<PeerID> mPeerIDCounter;
 
     template<typename SentDataType, typename ReceivedDataType>
     void addMethodHandlerInternal(const MethodID methodID,
@@ -376,7 +375,7 @@ private:
 
     template<typename SentDataType, typename ReceivedDataType>
     MessageID callInternal(const MethodID methodID,
-                           const PeerID peerID,
+                           const FileDescriptor peerFD,
                            const std::shared_ptr<SentDataType>& data,
                            const typename ResultHandler<ReceivedDataType>::type& process);
 
@@ -390,26 +389,23 @@ private:
     bool onRemovePeer();
     bool handleLostConnections();
     bool handleInputs();
-    bool handleInput(const PeerID peerID, const Socket& socket);
-    bool onReturnValue(const PeerID peerID,
-                       const Socket& socket,
+    bool handleInput(const Socket& socket);
+    bool onReturnValue(const Socket& socket,
                        const MessageID messageID);
-    bool onRemoteCall(const PeerID peerID,
-                      const Socket& socket,
+    bool onRemoteCall(const Socket& socket,
                       const MethodID methodID,
                       const MessageID messageID,
                       std::shared_ptr<MethodHandlers> methodCallbacks);
-    bool onRemoteSignal(const PeerID peerID,
-                        const Socket& socket,
+    bool onRemoteSignal(const Socket& socket,
                         const MethodID methodID,
                         const MessageID messageID,
                         std::shared_ptr<SignalHandlers> signalCallbacks);
     void resetPolling();
-    PeerID getNextPeerID();
+    FileDescriptor getNextFileDescriptor();
     CallQueue::Call getCall();
-    void removePeerInternal(const PeerID peerID, Status status);
+    void removePeerInternal(const FileDescriptor peerFD, Status status);
 
-    std::shared_ptr<EmptyData> onNewSignals(const PeerID peerID,
+    std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
                                             std::shared_ptr<RegisterSignalsMessage>& data);
 
 
@@ -432,9 +428,9 @@ void Processor::addMethodHandlerInternal(const MethodID methodID,
         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
     };
 
-    methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data)->std::shared_ptr<void> {
+    methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-        return method(peerID, tmpData);
+        return method(peerFD, tmpData);
     };
 
     {
@@ -488,9 +484,9 @@ void Processor::addSignalHandler(const MethodID methodID,
         return data;
     };
 
-    signalCall.signal = [handler](const PeerID peerID, std::shared_ptr<void>& data) {
+    signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-        handler(peerID, tmpData);
+        handler(peerFD, tmpData);
     };
 
     {
@@ -503,7 +499,7 @@ void Processor::addSignalHandler(const MethodID methodID,
         std::vector<MethodID> ids {methodID};
         auto data = std::make_shared<RegisterSignalsMessage>(ids);
 
-        std::list<PeerID> peersIDs;
+        std::list<FileDescriptor> peersIDs;
         {
             Lock lock(mSocketsMutex);
             for (const auto kv : mSockets) {
@@ -511,9 +507,9 @@ void Processor::addSignalHandler(const MethodID methodID,
             }
         }
 
-        for (const PeerID peerID : peersIDs) {
+        for (const FileDescriptor peerFD : peersIDs) {
             callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
-                                                        peerID,
+                                                        peerFD,
                                                         data,
                                                         DEFAULT_METHOD_TIMEOUT);
         }
@@ -522,12 +518,12 @@ void Processor::addSignalHandler(const MethodID methodID,
 
 template<typename SentDataType, typename ReceivedDataType>
 MessageID Processor::callInternal(const MethodID methodID,
-                                  const PeerID peerID,
+                                  const FileDescriptor peerFD,
                                   const std::shared_ptr<SentDataType>& data,
                                   const typename ResultHandler<ReceivedDataType>::type& process)
 {
     Lock lock(mCallsMutex);
-    MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
+    MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
     mEventQueue.send(Event::CALL);
 
     return messageID;
@@ -535,7 +531,7 @@ MessageID Processor::callInternal(const MethodID methodID,
 
 template<typename SentDataType, typename ReceivedDataType>
 MessageID Processor::callAsync(const MethodID methodID,
-                               const PeerID peerID,
+                               const FileDescriptor peerFD,
                                const std::shared_ptr<SentDataType>& data,
                                const typename ResultHandler<ReceivedDataType>::type& process)
 {
@@ -544,13 +540,13 @@ MessageID Processor::callAsync(const MethodID methodID,
         throw IPCException("The Processor thread is not started. Can't send any data.");
     }
 
-    return callInternal<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
+    return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
 }
 
 
 template<typename SentDataType, typename ReceivedDataType>
 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
-                                                      const PeerID peerID,
+                                                      const FileDescriptor peerFD,
                                                       const std::shared_ptr<SentDataType>& data,
                                                       unsigned int timeoutMS)
 {
@@ -568,7 +564,7 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
     };
 
     MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
-                                                                    peerID,
+                                                                    peerFD,
                                                                     data,
                                                                     process);
 
@@ -586,7 +582,7 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
             }
         }
         if (isTimeout) {
-            removePeer(peerID);
+            removePeer(peerFD);
             LOGE("Function call timeout; methodID: " << methodID);
             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
         } else {
@@ -609,15 +605,15 @@ void Processor::signal(const MethodID methodID,
         throw IPCException("The Processor thread is not started. Can't send any data.");
     }
 
-    std::list<PeerID> peersIDs;
+    std::list<FileDescriptor> peersIDs;
     {
         Lock lock(mSocketsMutex);
         peersIDs = mSignalsPeers[methodID];
     }
 
-    for (const PeerID peerID : peersIDs) {
+    for (const FileDescriptor peerFD : peersIDs) {
         Lock lock(mCallsMutex);
-        mCalls.push<SentDataType>(methodID, peerID, data);
+        mCalls.push<SentDataType>(methodID, peerFD, data);
         mEventQueue.send(Event::CALL);
     }
 }
index ac22eb2..317311d 100644 (file)
@@ -129,7 +129,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
-                                               const PeerID peerID,
+                                               const FileDescriptor peerFD,
                                                const std::shared_ptr<SentDataType>& data,
                                                unsigned int timeoutMS = 500);
 
@@ -144,7 +144,7 @@ public:
      */
     template<typename SentDataType, typename ReceivedDataType>
     void callAsync(const MethodID methodID,
-                   const PeerID peerID,
+                   const FileDescriptor peerFD,
                    const std::shared_ptr<SentDataType>& data,
                    const typename ResultHandler<ReceivedDataType>::type& resultCallback);
 
@@ -187,27 +187,27 @@ void Service::addSignalHandler(const MethodID methodID,
 
 template<typename SentDataType, typename ReceivedDataType>
 std::shared_ptr<ReceivedDataType> Service::callSync(const MethodID methodID,
-                                                    const PeerID peerID,
+                                                    const FileDescriptor peerFD,
                                                     const std::shared_ptr<SentDataType>& data,
                                                     unsigned int timeoutMS)
 {
-    LOGD("Sync calling method: " << methodID << " for user: " << peerID);
-    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerID, data, timeoutMS);
+    LOGD("Sync calling method: " << methodID << " for user: " << peerFD);
+    return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerFD, data, timeoutMS);
 }
 
 template<typename SentDataType, typename ReceivedDataType>
 void Service::callAsync(const MethodID methodID,
-                        const PeerID peerID,
+                        const FileDescriptor peerFD,
                         const std::shared_ptr<SentDataType>& data,
                         const typename ResultHandler<ReceivedDataType>::type& resultCallback)
 {
-    LOGD("Async calling method: " << methodID << " for user: " << peerID);
+    LOGD("Async calling method: " << methodID << " for user: " << peerFD);
     mProcessor.callAsync<SentDataType,
                          ReceivedDataType>(methodID,
-                                           peerID,
+                                           peerFD,
                                            data,
                                            resultCallback);
-    LOGD("Async called method: " << methodID << "for user: " << peerID);
+    LOGD("Async called method: " << methodID << "for user: " << peerFD);
 }
 
 template<typename SentDataType>
index 6588fb0..5fe9188 100644 (file)
 namespace security_containers {
 namespace ipc {
 
-typedef std::function<void(int)> PeerCallback;
-typedef unsigned int PeerID;
+typedef int FileDescriptor;
 typedef unsigned int MethodID;
 typedef unsigned int MessageID;
 
+typedef std::function<void(FileDescriptor)> PeerCallback;
+
 enum class Status : int {
     OK = 0,
     PARSING_ERROR,
@@ -55,17 +56,20 @@ void throwOnError(const Status status);
 
 template<typename SentDataType, typename ReceivedDataType>
 struct MethodHandler {
-    typedef std::function<std::shared_ptr<SentDataType>(PeerID, std::shared_ptr<ReceivedDataType>&)> type;
+    typedef std::function<std::shared_ptr<SentDataType>(FileDescriptor peerFD,
+                                                        std::shared_ptr<ReceivedDataType>& data)> type;
 };
 
 template<typename ReceivedDataType>
 struct SignalHandler {
-    typedef std::function<void(PeerID, std::shared_ptr<ReceivedDataType>&)> type;
+    typedef std::function<void(FileDescriptor peerFD,
+                               std::shared_ptr<ReceivedDataType>& data)> type;
 };
 
 template <typename ReceivedDataType>
 struct ResultHandler {
-    typedef std::function<void(Status, std::shared_ptr<ReceivedDataType>&)> type;
+    typedef std::function<void(Status status,
+                               std::shared_ptr<ReceivedDataType>& resultData)> type;
 };
 
 } // namespace ipc
index b8b9e95..679f3df 100644 (file)
@@ -111,38 +111,38 @@ struct ThrowOnAcceptData {
     }
 };
 
-std::shared_ptr<EmptyData> returnEmptyCallback(const PeerID, std::shared_ptr<EmptyData>&)
+std::shared_ptr<EmptyData> returnEmptyCallback(const FileDescriptor, std::shared_ptr<EmptyData>&)
 {
     return std::shared_ptr<EmptyData>(new EmptyData());
 }
 
-std::shared_ptr<SendData> returnDataCallback(const PeerID, std::shared_ptr<SendData>&)
+std::shared_ptr<SendData> returnDataCallback(const FileDescriptor, std::shared_ptr<SendData>&)
 {
     return std::shared_ptr<SendData>(new SendData(1));
 }
 
-std::shared_ptr<SendData> echoCallback(const PeerID, std::shared_ptr<SendData>& data)
+std::shared_ptr<SendData> echoCallback(const FileDescriptor, std::shared_ptr<SendData>& data)
 {
     return data;
 }
 
-std::shared_ptr<SendData> longEchoCallback(const PeerID, std::shared_ptr<SendData>& data)
+std::shared_ptr<SendData> longEchoCallback(const FileDescriptor, std::shared_ptr<SendData>& data)
 {
     std::this_thread::sleep_for(std::chrono::seconds(1));
     return data;
 }
 
-PeerID connect(Service& s, Client& c)
+FileDescriptor connect(Service& s, Client& c)
 {
-    // Connects the Client to the Service and returns Clients PeerID
+    // Connects the Client to the Service and returns Clients FileDescriptor
 
     std::mutex mutex;
     std::condition_variable cv;
 
-    PeerID peerID = 0;
-    auto newPeerCallback = [&cv, &peerID, &mutex](const PeerID newPeerID) {
+    FileDescriptor peerFD = 0;
+    auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFileDescriptor) {
         std::unique_lock<std::mutex> lock(mutex);
-        peerID = newPeerID;
+        peerFD = newFileDescriptor;
         cv.notify_one();
     };
 
@@ -155,11 +155,11 @@ PeerID connect(Service& s, Client& c)
     c.start();
 
     std::unique_lock<std::mutex> lock(mutex);
-    BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerID]() {
-        return peerID != 0;
+    BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerFD]() {
+        return peerFD != 0;
     }));
 
-    return peerID;
+    return peerFD;
 }
 
 void testEcho(Client& c, const MethodID methodID)
@@ -169,10 +169,10 @@ void testEcho(Client& c, const MethodID methodID)
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
-void testEcho(Service& s, const MethodID methodID, const PeerID peerID)
+void testEcho(Service& s, const MethodID methodID, const FileDescriptor peerFD)
 {
     std::shared_ptr<SendData> sentData(new SendData(56));
-    std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(methodID, peerID, sentData);
+    std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(methodID, peerFD, sentData);
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
@@ -216,17 +216,17 @@ BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
     c.addMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
     c.addMethodHandler<SendData, SendData>(1, returnDataCallback);
 
-    PeerID peerID = connect(s, c);
+    FileDescriptor peerFD = connect(s, c);
 
     c.addMethodHandler<SendData, SendData>(1, echoCallback);
     c.addMethodHandler<SendData, SendData>(2, returnDataCallback);
 
-    testEcho(s, 1, peerID);
+    testEcho(s, 1, peerFD);
 
     c.removeMethod(1);
     c.removeMethod(2);
 
-    BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
+    BOOST_CHECK_THROW(testEcho(s, 1, peerFD), IPCException);
 }
 
 BOOST_AUTO_TEST_CASE(ServiceStartStop)
@@ -305,10 +305,10 @@ BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
     Service s(socketPath);
     Client c(socketPath);
     c.addMethodHandler<SendData, SendData>(1, echoCallback);
-    PeerID peerID = connect(s, c);
+    FileDescriptor peerFD = connect(s, c);
 
     std::shared_ptr<SendData> sentData(new SendData(56));
-    std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(1, peerID, sentData);
+    std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(1, peerFD, sentData);
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
@@ -349,7 +349,7 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
     Service s(socketPath);
     Client c(socketPath);
     c.addMethodHandler<SendData, SendData>(1, echoCallback);
-    PeerID peerID = connect(s, c);
+    FileDescriptor peerFD = connect(s, c);
 
     // Async call
     std::shared_ptr<SendData> sentData(new SendData(56));
@@ -364,7 +364,7 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
         cv.notify_one();
     };
 
-    s.callAsync<SendData, SendData>(1, peerID, sentData, dataBack);
+    s.callAsync<SendData, SendData>(1, peerFD, sentData, dataBack);
 
     // Wait for the response
     std::unique_lock<std::mutex> lock(mutex);
@@ -422,7 +422,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
 {
     Service s(socketPath);
 
-    auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&) {
+    auto method = [](const FileDescriptor, std::shared_ptr<ThrowOnAcceptData>&) {
         return std::shared_ptr<SendData>(new SendData(1));
     };
 
@@ -458,7 +458,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
 BOOST_AUTO_TEST_CASE(ReadTimeout)
 {
     Service s(socketPath);
-    auto longEchoCallback = [](const PeerID, std::shared_ptr<SendData>& data) {
+    auto longEchoCallback = [](const FileDescriptor, std::shared_ptr<SendData>& data) {
         return std::shared_ptr<LongSendData>(new LongSendData(data->intVal));
     };
     s.addMethodHandler<LongSendData, SendData>(1, longEchoCallback);
@@ -500,12 +500,12 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
     connect(s, c);
 
     std::atomic_bool isHandlerACalled(false);
-    auto handlerA = [&isHandlerACalled](const PeerID, std::shared_ptr<SendData>&) {
+    auto handlerA = [&isHandlerACalled](const FileDescriptor, std::shared_ptr<SendData>&) {
         isHandlerACalled = true;
     };
 
     std::atomic_bool isHandlerBCalled(false);
-    auto handlerB = [&isHandlerBCalled](const PeerID, std::shared_ptr<SendData>&) {
+    auto handlerB = [&isHandlerBCalled](const FileDescriptor, std::shared_ptr<SendData>&) {
         isHandlerBCalled = true;
     };
 
@@ -528,12 +528,12 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline)
     Client c(socketPath);
 
     std::atomic_bool isHandlerACalled(false);
-    auto handlerA = [&isHandlerACalled](const PeerID, std::shared_ptr<SendData>&) {
+    auto handlerA = [&isHandlerACalled](const FileDescriptor, std::shared_ptr<SendData>&) {
         isHandlerACalled = true;
     };
 
     std::atomic_bool isHandlerBCalled(false);
-    auto handlerB = [&isHandlerBCalled](const PeerID, std::shared_ptr<SendData>&) {
+    auto handlerB = [&isHandlerBCalled](const FileDescriptor, std::shared_ptr<SendData>&) {
         isHandlerBCalled = true;
     };