IPC: Single request queue 22/33922/8
authorJan Olszak <j.olszak@samsung.com>
Thu, 15 Jan 2015 15:37:06 +0000 (16:37 +0100)
committerJan Olszak <j.olszak@samsung.com>
Mon, 19 Jan 2015 15:09:35 +0000 (16:09 +0100)
[Bug/Feature]  Single queue for passing data between threads
               Prefixes in loggs inside Processor
               Destructor always waits till Processor ends
[Cause]        N/A
[Solution]     N/A
[Verification] Build, install, run tests, run tests under valgrind

Change-Id: Idc31496559b46e836528843dfc411cbdeaf259e0

17 files changed:
common/ipc/client.cpp
common/ipc/client.hpp
common/ipc/internals/add-peer-request.hpp [new file with mode: 0644]
common/ipc/internals/call-queue.cpp [deleted file]
common/ipc/internals/call-queue.hpp [deleted file]
common/ipc/internals/finish-request.hpp [new file with mode: 0644]
common/ipc/internals/method-request.hpp [new file with mode: 0644]
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/internals/remove-peer-request.hpp [new file with mode: 0644]
common/ipc/internals/request-queue.hpp [new file with mode: 0644]
common/ipc/internals/signal-request.hpp [new file with mode: 0644]
common/ipc/service.cpp
common/ipc/service.hpp
common/ipc/types.cpp
common/ipc/types.hpp
tests/unit_tests/ipc/ut-ipc.cpp

index 8455b19..16f77e6 100644 (file)
@@ -32,7 +32,8 @@ namespace vasum {
 namespace ipc {
 
 Client::Client(const std::string& socketPath)
-    : mSocketPath(socketPath)
+    : mProcessor("[CLIENT]  "),
+      mSocketPath(socketPath)
 {
     LOGS("Client Constructor");
 }
@@ -47,19 +48,14 @@ Client::~Client()
     }
 }
 
-void Client::connect()
+void Client::start(const bool usesExternalPolling)
 {
+    LOGS("Client start");
     // Initialize the connection with the server
     LOGD("Connecting to " + mSocketPath);
     auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
     mServiceFD = mProcessor.addPeer(socketPtr);
-}
-
-void Client::start()
-{
-    LOGS("Client start");
-    connect();
-    mProcessor.start();
+    mProcessor.start(usesExternalPolling);
 }
 
 bool Client::isStarted()
index de847a9..7b86198 100644 (file)
@@ -55,16 +55,11 @@ public:
     Client& operator=(const Client&) = delete;
 
     /**
-     * Places a connection request in the internal event queue.
-     *
-     * Used with an external polling loop.
-     */
-    void connect();
-
-    /**
      * Starts the worker thread
+     *
+     * @param usesExternalPolling internal or external polling is used
      */
-    void start();
+    void start(const bool usesExternalPolling = false);
 
     /**
     * @return is the communication thread running
diff --git a/common/ipc/internals/add-peer-request.hpp b/common/ipc/internals/add-peer-request.hpp
new file mode 100644 (file)
index 0000000..05c5524
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Processor's request to add a peer
+ */
+
+#ifndef COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "ipc/internals/socket.hpp"
+
+namespace vasum {
+namespace ipc {
+
+class AddPeerRequest {
+public:
+    AddPeerRequest(const AddPeerRequest&) = delete;
+    AddPeerRequest& operator=(const AddPeerRequest&) = delete;
+
+    AddPeerRequest(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
+        : peerFD(peerFD),
+          socketPtr(socketPtr)
+    {
+    }
+
+    FileDescriptor peerFD;
+    std::shared_ptr<Socket> socketPtr;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP
diff --git a/common/ipc/internals/call-queue.cpp b/common/ipc/internals/call-queue.cpp
deleted file mode 100644 (file)
index 70871e5..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
-*
-*  Contact: Jan Olszak <j.olszak@samsung.com>
-*
-*  Licensed under the Apache License, Version 2.0 (the "License");
-*  you may not use this file except in compliance with the License.
-*  You may obtain a copy of the License at
-*
-*      http://www.apache.org/licenses/LICENSE-2.0
-*
-*  Unless required by applicable law or agreed to in writing, software
-*  distributed under the License is distributed on an "AS IS" BASIS,
-*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-*  See the License for the specific language governing permissions and
-*  limitations under the License
-*/
-
-/**
- * @file
- * @author  Jan Olszak (j.olszak@samsung.com)
- * @brief   Managing the queue with calls
- */
-
-#include "config.hpp"
-
-#include "ipc/internals/call-queue.hpp"
-#include "ipc/exception.hpp"
-#include "logger/logger.hpp"
-#include <algorithm>
-
-namespace vasum {
-namespace ipc {
-
-CallQueue::CallQueue()
-    : mMessageIDCounter(0)
-{
-}
-
-CallQueue::~CallQueue()
-{
-}
-
-bool CallQueue::isEmpty() const
-{
-    return mCalls.empty();
-}
-
-MessageID CallQueue::getNextMessageID()
-{
-    // TODO: This method of generating UIDs is buggy. To be changed.
-    return ++mMessageIDCounter;
-}
-
-bool CallQueue::erase(const MessageID messageID)
-{
-    LOGT("Erase messgeID: " << messageID);
-    auto it = std::find(mCalls.begin(), mCalls.end(), messageID);
-    if (it == mCalls.end()) {
-        LOGT("No such messgeID");
-        return false;
-    }
-
-    mCalls.erase(it);
-    LOGT("Erased");
-    return true;
-}
-
-CallQueue::Call CallQueue::pop()
-{
-    if (isEmpty()) {
-        LOGE("CallQueue is empty");
-        throw IPCException("CallQueue is empty");
-    }
-    Call call = std::move(mCalls.front());
-    mCalls.pop_front();
-    return call;
-}
-
-} // namespace ipc
-} // namespace vasum
diff --git a/common/ipc/internals/call-queue.hpp b/common/ipc/internals/call-queue.hpp
deleted file mode 100644 (file)
index a6e45ed..0000000
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
-*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
-*
-*  Contact: Jan Olszak <j.olszak@samsung.com>
-*
-*  Licensed under the Apache License, Version 2.0 (the "License");
-*  you may not use this file except in compliance with the License.
-*  You may obtain a copy of the License at
-*
-*      http://www.apache.org/licenses/LICENSE-2.0
-*
-*  Unless required by applicable law or agreed to in writing, software
-*  distributed under the License is distributed on an "AS IS" BASIS,
-*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-*  See the License for the specific language governing permissions and
-*  limitations under the License
-*/
-
-/**
- * @file
- * @author  Jan Olszak (j.olszak@samsung.com)
- * @brief   Managing the queue with calls
- */
-
-#ifndef COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
-#define COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
-
-#include "ipc/types.hpp"
-#include "config/manager.hpp"
-#include "logger/logger-scope.hpp"
-
-#include <atomic>
-#include <list>
-
-namespace vasum {
-namespace ipc {
-
-/**
-* Class for managing a queue of calls in the Processor
-*/
-class CallQueue {
-public:
-    typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
-    typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
-
-    struct Call {
-        Call(const Call& other) = delete;
-        Call& operator=(const Call&) = delete;
-        Call& operator=(Call&&) = default;
-        Call() = default;
-        Call(Call&&) = default;
-
-        bool operator==(const MessageID m)
-        {
-            return m == messageID;
-        }
-
-        FileDescriptor peerFD;
-        MethodID methodID;
-        MessageID messageID;
-        std::shared_ptr<void> data;
-        SerializeCallback serialize;
-        ParseCallback parse;
-        ResultHandler<void>::type process;
-    };
-
-    CallQueue();
-    ~CallQueue();
-
-    CallQueue(const CallQueue&) = delete;
-    CallQueue(CallQueue&&) = delete;
-    CallQueue& operator=(const CallQueue&) = delete;
-
-    template<typename SentDataType, typename ReceivedDataType>
-    MessageID push(const MethodID methodID,
-                   const FileDescriptor peerFD,
-                   const std::shared_ptr<SentDataType>& data,
-                   const typename ResultHandler<ReceivedDataType>::type& process);
-
-
-    template<typename SentDataType>
-    MessageID push(const MethodID methodID,
-                   const FileDescriptor peerFD,
-                   const std::shared_ptr<SentDataType>& data);
-
-    Call pop();
-
-    bool erase(const MessageID messageID);
-
-    bool isEmpty() const;
-
-private:
-    std::list<Call> mCalls;
-    std::atomic<MessageID> mMessageIDCounter;
-
-    MessageID getNextMessageID();
-};
-
-
-template<typename SentDataType, typename ReceivedDataType>
-MessageID CallQueue::push(const MethodID methodID,
-                          const FileDescriptor peerFD,
-                          const std::shared_ptr<SentDataType>& data,
-                          const typename ResultHandler<ReceivedDataType>::type& process)
-{
-    Call call;
-    call.methodID = methodID;
-    call.peerFD = peerFD;
-    call.data = data;
-
-    MessageID messageID = getNextMessageID();
-    call.messageID = messageID;
-
-    call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
-        LOGS("Method serialize, peerFD: " << fd);
-        config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
-    };
-
-    call.parse = [](const int fd)->std::shared_ptr<void> {
-        LOGS("Method parse, peerFD: " << fd);
-        std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
-        config::loadFromFD<ReceivedDataType>(fd, *data);
-        return data;
-    };
-
-    call.process = [process](Status status, std::shared_ptr<void>& data)->void {
-        LOGS("Method process, status: " << toString(status));
-        std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-        return process(status, tmpData);
-    };
-
-    mCalls.push_back(std::move(call));
-
-    return messageID;
-}
-
-template<typename SentDataType>
-MessageID CallQueue::push(const MethodID methodID,
-                          const FileDescriptor peerFD,
-                          const std::shared_ptr<SentDataType>& data)
-{
-    Call call;
-    call.methodID = methodID;
-    call.peerFD = peerFD;
-    call.data = data;
-
-    MessageID messageID = getNextMessageID();
-    call.messageID = messageID;
-
-    call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
-        LOGS("Signal serialize, peerFD: " << fd);
-        config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
-    };
-
-    mCalls.push_back(std::move(call));
-
-    return messageID;
-}
-
-} // namespace ipc
-} // namespace vasum
-
-#endif // COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
diff --git a/common/ipc/internals/finish-request.hpp b/common/ipc/internals/finish-request.hpp
new file mode 100644 (file)
index 0000000..3019475
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Managing the queue with requests
+ */
+
+#ifndef COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP
+
+#include <condition_variable>
+
+namespace vasum {
+namespace ipc {
+
+class FinishRequest {
+public:
+    FinishRequest(const FinishRequest&) = delete;
+    FinishRequest& operator=(const FinishRequest&) = delete;
+
+    FinishRequest(const std::shared_ptr<std::condition_variable_any>& conditionPtr)
+        : conditionPtr(conditionPtr)
+    {}
+
+    std::shared_ptr<std::condition_variable_any> conditionPtr;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP
diff --git a/common/ipc/internals/method-request.hpp b/common/ipc/internals/method-request.hpp
new file mode 100644 (file)
index 0000000..f9860f7
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Processor's request to call a method
+ */
+
+#ifndef COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "logger/logger-scope.hpp"
+#include "config/manager.hpp"
+
+namespace vasum {
+namespace ipc {
+
+class MethodRequest {
+public:
+    MethodRequest(const MethodRequest&) = delete;
+    MethodRequest& operator=(const MethodRequest&) = delete;
+
+    template<typename SentDataType, typename ReceivedDataType>
+    static std::shared_ptr<MethodRequest> create(const MethodID methodID,
+                                                 const FileDescriptor peerFD,
+                                                 const std::shared_ptr<SentDataType>& data,
+                                                 const typename ResultHandler<ReceivedDataType>::type& process);
+
+    MethodID methodID;
+    FileDescriptor peerFD;
+    MessageID messageID;
+    std::shared_ptr<void> data;
+    SerializeCallback serialize;
+    ParseCallback parse;
+    ResultHandler<void>::type process;
+
+private:
+    MethodRequest(const MethodID methodID, const FileDescriptor peerFD)
+        : methodID(methodID),
+          peerFD(peerFD),
+          messageID(getNextMessageID())
+    {}
+};
+
+
+template<typename SentDataType, typename ReceivedDataType>
+std::shared_ptr<MethodRequest> MethodRequest::create(const MethodID methodID,
+                                                     const FileDescriptor peerFD,
+                                                     const std::shared_ptr<SentDataType>& data,
+                                                     const typename ResultHandler<ReceivedDataType>::type& process)
+{
+    std::shared_ptr<MethodRequest> request(new MethodRequest(methodID, peerFD));
+
+    request->data = data;
+
+    request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+        LOGS("Method serialize, peerFD: " << fd);
+        config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+    };
+
+    request->parse = [](const int fd)->std::shared_ptr<void> {
+        LOGS("Method parse, peerFD: " << fd);
+        std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
+        config::loadFromFD<ReceivedDataType>(fd, *data);
+        return data;
+    };
+
+    request->process = [process](Status status, std::shared_ptr<void>& data)->void {
+        LOGS("Method process, status: " << toString(status));
+        std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
+        return process(status, tmpData);
+    };
+
+    return request;
+}
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
index d1e829e..05c97aa 100644 (file)
@@ -46,20 +46,23 @@ namespace ipc {
         expr;                                          \
     }                                                  \
     catch (const std::exception& e){                   \
-        LOGE("Callback threw an error: " << e.what()); \
+        LOGE(mLogPrefix + "Callback threw an error: " << e.what()); \
     }
 
 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
 
-Processor::Processor(const PeerCallback& newPeerCallback,
+Processor::Processor(const std::string& logName,
+                     const PeerCallback& newPeerCallback,
                      const PeerCallback& removedPeerCallback,
                      const unsigned int maxNumberOfPeers)
-    : mNewPeerCallback(newPeerCallback),
+    : mLogPrefix(logName),
+      mIsRunning(false),
+      mNewPeerCallback(newPeerCallback),
       mRemovedPeerCallback(removedPeerCallback),
       mMaxNumberOfPeers(maxNumberOfPeers)
 {
-    LOGS("Processor Constructor");
+    LOGS(mLogPrefix + "Processor Constructor");
 
     utils::signalBlock(SIGPIPE);
     using namespace std::placeholders;
@@ -69,39 +72,57 @@ Processor::Processor(const PeerCallback& newPeerCallback,
 
 Processor::~Processor()
 {
-    LOGS("Processor Destructor");
+    LOGS(mLogPrefix + "Processor Destructor");
     try {
         stop();
     } catch (IPCException& e) {
-        LOGE("Error in Processor's destructor: " << e.what());
+        LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
     }
 }
 
 bool Processor::isStarted()
 {
-    return mThread.joinable();
+    Lock lock(mStateMutex);
+    return mIsRunning;
 }
 
-void Processor::start()
+void Processor::start(bool usesExternalPolling)
 {
-    LOGS("Processor start");
+    LOGS(mLogPrefix + "Processor start");
 
+    Lock lock(mStateMutex);
     if (!isStarted()) {
-        mThread = std::thread(&Processor::run, this);
+        LOGI(mLogPrefix + "Processor start");
+        mIsRunning = true;
+        if (!usesExternalPolling) {
+            mThread = std::thread(&Processor::run, this);
+        }
     }
 }
 
 void Processor::stop()
 {
-    LOGS("Processor stop");
+    LOGS(mLogPrefix + "Processor stop");
 
     if (isStarted()) {
+        auto conditionPtr = std::make_shared<std::condition_variable_any>();
         {
             Lock lock(mStateMutex);
-            mEventQueue.send(Event::FINISH);
+            auto request = std::make_shared<FinishRequest>(conditionPtr);
+            mRequestQueue.push(Event::FINISH, request);
+        }
+
+        LOGD(mLogPrefix + "Waiting for the Processor to stop");
+
+        if (mThread.joinable()) {
+            mThread.join();
+        } else {
+            // Wait till the FINISH request is served
+            Lock lock(mStateMutex);
+            conditionPtr->wait(lock, [this]() {
+                return !isStarted();
+            });
         }
-        LOGT("Waiting for the Processor to stop");
-        mThread.join();
     }
 }
 
@@ -120,7 +141,7 @@ void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 FileDescriptor Processor::getEventFD()
 {
     Lock lock(mStateMutex);
-    return mEventQueue.getFD();
+    return mRequestQueue.getFD();
 }
 
 void Processor::removeMethod(const MethodID methodID)
@@ -131,51 +152,53 @@ void Processor::removeMethod(const MethodID methodID)
 
 FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
 {
-    LOGS("Processor addPeer");
+    LOGS(mLogPrefix + "Processor addPeer");
     Lock lock(mStateMutex);
+
     FileDescriptor peerFD = socketPtr->getFD();
-    SocketInfo socketInfo(peerFD, std::move(socketPtr));
-    mNewSockets.push(std::move(socketInfo));
-    mEventQueue.send(Event::ADD_PEER);
+    auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
+    mRequestQueue.push(Event::ADD_PEER, request);
 
-    LOGI("New peer added. Id: " << peerFD);
+    LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
 
     return peerFD;
 }
 
 void Processor::removePeer(const FileDescriptor peerFD)
 {
-    LOGS("Processor removePeer peerFD: " << peerFD);
-
-    // TODO: Remove ADD_PEER event if it's not processed
+    LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
 
+    {
+        Lock lock(mStateMutex);
+        mRequestQueue.removeIf([peerFD](Request & request) {
+            return request.requestID == Event::ADD_PEER &&
+                   request.get<AddPeerRequest>()->peerFD == peerFD;
+        });
+    }
 
     // Remove peer and wait till he's gone
-    std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
+    std::shared_ptr<std::condition_variable_any> conditionPtr(new std::condition_variable_any());
     {
         Lock lock(mStateMutex);
-        RemovePeerRequest request(peerFD, conditionPtr);
-        mPeersToDelete.push(std::move(request));
-        mEventQueue.send(Event::REMOVE_PEER);
+        auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
+        mRequestQueue.push(Event::REMOVE_PEER, request);
     }
 
     auto isPeerDeleted = [&peerFD, this]()->bool {
-        Lock lock(mStateMutex);
         return mSockets.count(peerFD) == 0;
     };
 
-    std::mutex mutex;
-    std::unique_lock<std::mutex> lock(mutex);
+    Lock lock(mStateMutex);
     conditionPtr->wait(lock, isPeerDeleted);
 }
 
 void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
 {
-    LOGS("Processor removePeerInternal peerFD: " << peerFD);
-    LOGI("Removing peer. peerFD: " << peerFD);
+    LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
+    LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
 
     if (!mSockets.erase(peerFD)) {
-        LOGW("No such peer. Another thread called removePeerInternal");
+        LOGW(mLogPrefix + "No such peer. Another thread called removePeerInternal");
         return;
     }
 
@@ -211,21 +234,25 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
 
 void Processor::resetPolling()
 {
+    LOGS(mLogPrefix + "Processor resetPolling");
+
     if (!isStarted()) {
+        LOGW(mLogPrefix + "Processor not started! Polling not reset!");
         return;
     }
 
     {
         Lock lock(mStateMutex);
-
+        LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
         // Setup polling on eventfd and sockets
         mFDs.resize(mSockets.size() + 1);
 
-        mFDs[0].fd = mEventQueue.getFD();
+        mFDs[0].fd = mRequestQueue.getFD();
         mFDs[0].events = POLLIN;
 
         auto socketIt = mSockets.begin();
         for (unsigned int i = 1; i < mFDs.size(); ++i) {
+            LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
             mFDs[i].fd = socketIt->second->getFD();
             mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
             ++socketIt;
@@ -236,20 +263,19 @@ void Processor::resetPolling()
 
 void Processor::run()
 {
-    LOGS("Processor run");
+    LOGS(mLogPrefix + "Processor run");
 
     resetPolling();
 
-    mIsRunning = true;
-    while (mIsRunning) {
-        LOGT("Waiting for communication...");
+    while (isStarted()) {
+        LOGT(mLogPrefix + "Waiting for communication...");
         int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
-        LOGT("... incoming communication!");
+        LOGT(mLogPrefix + "... incoming communication!");
         if (ret == -1 || ret == 0) {
             if (errno == EINTR) {
                 continue;
             }
-            LOGE("Error in poll: " << std::string(strerror(errno)));
+            LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno)));
             throw IPCException("Error in poll: " + std::string(strerror(errno)));
         }
 
@@ -285,7 +311,7 @@ bool Processor::handleLostConnections()
     {
         for (unsigned int i = 1; i < mFDs.size(); ++i) {
             if (mFDs[i].revents & POLLHUP) {
-                LOGI("Lost connection to peer: " << mFDs[i].fd);
+                LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
                 mFDs[i].revents &= ~(POLLHUP);
                 removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
                 isPeerRemoved = true;
@@ -320,7 +346,7 @@ bool Processor::handleInputs()
 
 bool Processor::handleInput(const FileDescriptor peerFD)
 {
-    LOGS("Processor handleInput peerFD: " << peerFD);
+    LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
     Lock lock(mStateMutex);
 
     std::shared_ptr<Socket> socketPtr;
@@ -328,7 +354,7 @@ bool Processor::handleInput(const FileDescriptor peerFD)
         // Get the peer's socket
         socketPtr = mSockets.at(peerFD);
     } catch (const std::out_of_range&) {
-        LOGE("No such peer: " << peerFD);
+        LOGE(mLogPrefix + "No such peer: " << peerFD);
         return false;
     }
 
@@ -341,7 +367,7 @@ bool Processor::handleInput(const FileDescriptor peerFD)
             socketPtr->read(&messageID, sizeof(messageID));
 
         } catch (const IPCException& e) {
-            LOGE("Error during reading the socket");
+            LOGE(mLogPrefix + "Error during reading the socket");
             removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
             return true;
         }
@@ -362,7 +388,7 @@ bool Processor::handleInput(const FileDescriptor peerFD)
 
             } else {
                 // Nothing
-                LOGW("No method or signal callback for methodID: " << methodID);
+                LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
                 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
                 return true;
             }
@@ -373,7 +399,7 @@ bool Processor::handleInput(const FileDescriptor peerFD)
 std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
                                                               std::shared_ptr<RegisterSignalsMessage>& data)
 {
-    LOGS("Processor onNewSignals peerFD: " << peerFD);
+    LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
 
     for (const MethodID methodID : data->ids) {
         mSignalsPeers[methodID].push_back(peerFD);
@@ -385,35 +411,35 @@ std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescript
 bool Processor::onReturnValue(const Socket& socket,
                               const MessageID messageID)
 {
-    LOGS("Processor onReturnValue messageID: " << messageID);
+    LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
 
-    // LOGI("Return value for messageID: " << messageID);
+    // LOGI(mLogPrefix + "Return value for messageID: " << messageID);
     ReturnCallbacks returnCallbacks;
     try {
-        LOGT("Getting the return callback");
+        LOGT(mLogPrefix + "Getting the return callback");
         returnCallbacks = std::move(mReturnCallbacks.at(messageID));
         mReturnCallbacks.erase(messageID);
     } catch (const std::out_of_range&) {
-        LOGW("No return callback for messageID: " << messageID);
+        LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
         return true;
     }
 
     std::shared_ptr<void> data;
     try {
-        LOGT("Parsing incoming return data");
+        LOGT(mLogPrefix + "Parsing incoming return data");
         data = returnCallbacks.parse(socket.getFD());
     } catch (const std::exception& e) {
-        LOGE("Exception during parsing: " << e.what());
+        LOGE(mLogPrefix + "Exception during parsing: " << e.what());
         IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
         return true;
     }
 
-    // LOGT("Process return value callback for messageID: " << messageID);
+    // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
     IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
 
-    // LOGT("Return value for messageID: " << messageID << " processed");
+    // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
     return false;
 }
 
@@ -422,25 +448,25 @@ bool Processor::onRemoteSignal(const Socket& socket,
                                const MessageID messageID,
                                std::shared_ptr<SignalHandlers> signalCallbacks)
 {
-    LOGS("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
+    LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
 
-    // LOGI("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
+    // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
 
     std::shared_ptr<void> data;
     try {
-        LOGT("Parsing incoming data");
+        LOGT(mLogPrefix + "Parsing incoming data");
         data = signalCallbacks->parse(socket.getFD());
     } catch (const std::exception& e) {
-        LOGE("Exception during parsing: " << e.what());
+        LOGE(mLogPrefix + "Exception during parsing: " << e.what());
         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
         return true;
     }
 
-    // LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
+    // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
     try {
         signalCallbacks->signal(socket.getFD(), data);
     } catch (const std::exception& e) {
-        LOGE("Exception in method handler: " << e.what());
+        LOGE(mLogPrefix + "Exception in method handler: " << e.what());
         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
         return true;
     }
@@ -453,30 +479,30 @@ bool Processor::onRemoteCall(const Socket& socket,
                              const MessageID messageID,
                              std::shared_ptr<MethodHandlers> methodCallbacks)
 {
-    LOGS("Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
-    // LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
+    LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
+    // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID);
 
     std::shared_ptr<void> data;
     try {
-        LOGT("Parsing incoming data");
+        LOGT(mLogPrefix + "Parsing incoming data");
         data = methodCallbacks->parse(socket.getFD());
     } catch (const std::exception& e) {
-        LOGE("Exception during parsing: " << e.what());
+        LOGE(mLogPrefix + "Exception during parsing: " << e.what());
         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
         return true;
     }
 
-    LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
+    LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
     std::shared_ptr<void> returnData;
     try {
         returnData = methodCallbacks->method(socket.getFD(), data);
     } catch (const std::exception& e) {
-        LOGE("Exception in method handler: " << e.what());
+        LOGE(mLogPrefix + "Exception in method handler: " << e.what());
         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
         return true;
     }
 
-    LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID);
+    LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID);
     try {
         // Send the call with the socket
         Socket::Guard guard = socket.getGuard();
@@ -484,7 +510,7 @@ bool Processor::onRemoteCall(const Socket& socket,
         socket.write(&messageID, sizeof(messageID));
         methodCallbacks->serialize(socket.getFD(), returnData);
     } catch (const std::exception& e) {
-        LOGE("Exception during serialization: " << e.what());
+        LOGE(mLogPrefix + "Exception during serialization: " << e.what());
         removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
         return true;
     }
@@ -494,225 +520,213 @@ bool Processor::onRemoteCall(const Socket& socket,
 
 bool Processor::handleEvent()
 {
-    LOGS("Processor handleEvent");
+    LOGS(mLogPrefix + "Processor handleEvent");
 
     Lock lock(mStateMutex);
 
-    switch (mEventQueue.receive()) {
-
-    case Event::FINISH: {
-        LOGD("Event FINISH");
-        mIsRunning = false;
-        cleanCommunication();
-        return false;
-    }
-
-    case Event::CALL: {
-        LOGD("Event CALL");
-        return onCall();
-    }
+    auto request = mRequestQueue.pop();
+    LOGD(mLogPrefix + "Got: " << request.requestID);
 
-    case Event::ADD_PEER: {
-        LOGD("Event ADD_PEER");
-        return onNewPeer();
-    }
-
-    case Event::REMOVE_PEER: {
-        LOGD("Event REMOVE_PEER");
-        return onRemovePeer();
-    }
+    switch (request.requestID) {
+    case Event::METHOD:      return onMethodRequest(*request.get<MethodRequest>());
+    case Event::SIGNAL:      return onSignalRequest(*request.get<SignalRequest>());
+    case Event::ADD_PEER:    return onAddPeerRequest(*request.get<AddPeerRequest>());
+    case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get<RemovePeerRequest>());
+    case Event::FINISH:      return onFinishRequest(*request.get<FinishRequest>());
     }
 
     return false;
 }
 
-bool Processor::onNewPeer()
+bool Processor::onMethodRequest(MethodRequest& request)
 {
-    LOGS("Processor onNewPeer");
+    LOGS(mLogPrefix + "Processor onMethodRequest");
+    std::shared_ptr<Socket> socketPtr;
 
-    // TODO: What if there is no newSocket? (request removed in the mean time)
-    // Add new socket of the peer
-    SocketInfo socketInfo = std::move(mNewSockets.front());
-    mNewSockets.pop();
+    try {
+        // Get the peer's socket
+        socketPtr = mSockets.at(request.peerFD);
+    } catch (const std::out_of_range&) {
+        LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
+
+        // Pass the error to the processing callback
+        IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data));
 
-    if (mSockets.size() > mMaxNumberOfPeers) {
-        LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
-        return false;
-    }
-    if (mSockets.count(socketInfo.peerFD) != 0) {
-        LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
         return false;
     }
 
-    mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
-
-
-    // LOGW("Sending handled signals");
-    std::vector<MethodID> ids;
-    for (const auto kv : mSignalsCallbacks) {
-        ids.push_back(kv.first);
+    if (mReturnCallbacks.count(request.messageID) != 0) {
+        LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
     }
-    auto data = std::make_shared<RegisterSignalsMessage>(ids);
-    callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
-                                                 socketInfo.peerFD,
-                                                 data,
-                                                 discardResultHandler<EmptyData>);
-    // LOGW("Sent handled signals");
+    mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(request.peerFD,
+                                                                    std::move(request.parse),
+                                                                    std::move(request.process)));
 
-    resetPolling();
-
-    if (mNewPeerCallback) {
-        // Notify about the new user.
-        LOGT("Calling NewPeerCallback");
-        mNewPeerCallback(socketInfo.peerFD);
-    }
+    try {
+        // Send the call with the socket
+        Socket::Guard guard = socketPtr->getGuard();
+        socketPtr->write(&request.methodID, sizeof(request.methodID));
+        socketPtr->write(&request.messageID, sizeof(request.messageID));
+        LOGT(mLogPrefix + "Serializing the message");
+        request.serialize(socketPtr->getFD(), request.data);
+    } catch (const std::exception& e) {
+        LOGE(mLogPrefix + "Error during sending a method: " << e.what());
 
-    return true;
-}
+        // Inform about the error,
+        IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data));
 
-bool Processor::onRemovePeer()
-{
-    LOGS("Processor onRemovePeer");
 
-    removePeerInternal(mPeersToDelete.front().peerFD, Status::REMOVED_PEER);
+        mReturnCallbacks.erase(request.messageID);
+        removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
 
-    mPeersToDelete.front().conditionPtr->notify_all();
-    mPeersToDelete.pop();
-    return true;
-}
+        return true;
 
-bool Processor::onCall()
-{
-    LOGS("Processor onCall");
-    CallQueue::Call call;
-    try {
-        call = std::move(mCalls.pop());
-    } catch (const IPCException&) {
-        LOGE("No calls to serve, but got an EVENT::CALL. Event got removed before serving");
-        return false;
     }
 
-    if (call.parse && call.process) {
-        return onMethodCall(call);
-    } else {
-        return onSignalCall(call);
-    }
+    return false;
 }
 
-bool Processor::onSignalCall(CallQueue::Call& call)
+bool Processor::onSignalRequest(SignalRequest& request)
 {
-    LOGS("Processor onSignalCall");
+    LOGS(mLogPrefix + "Processor onSignalRequest");
 
     std::shared_ptr<Socket> socketPtr;
     try {
         // Get the peer's socket
-        socketPtr = mSockets.at(call.peerFD);
+        socketPtr = mSockets.at(request.peerFD);
     } catch (const std::out_of_range&) {
-        LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
+        LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
         return false;
     }
 
     try {
         // Send the call with the socket
         Socket::Guard guard = socketPtr->getGuard();
-        socketPtr->write(&call.methodID, sizeof(call.methodID));
-        socketPtr->write(&call.messageID, sizeof(call.messageID));
-        call.serialize(socketPtr->getFD(), call.data);
+        socketPtr->write(&request.methodID, sizeof(request.methodID));
+        socketPtr->write(&request.messageID, sizeof(request.messageID));
+        request.serialize(socketPtr->getFD(), request.data);
     } catch (const std::exception& e) {
-        LOGE("Error during sending a signal: " << e.what());
+        LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
 
-        removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
+        removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
         return true;
     }
 
     return false;
 }
 
-bool Processor::onMethodCall(CallQueue::Call& call)
+bool Processor::onAddPeerRequest(AddPeerRequest& request)
 {
-    LOGS("Processor onMethodCall");
-    std::shared_ptr<Socket> socketPtr;
-
-
-    try {
-        // Get the peer's socket
-        socketPtr = mSockets.at(call.peerFD);
-    } catch (const std::out_of_range&) {
-        LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
-
-        // Pass the error to the processing callback
-        IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
+    LOGS(mLogPrefix + "Processor onAddPeerRequest");
 
+    if (mSockets.size() > mMaxNumberOfPeers) {
+        LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerFD);
         return false;
     }
-
-    if (mReturnCallbacks.count(call.messageID) != 0) {
-        LOGE("There already was a return callback for messageID: " << call.messageID);
+    if (mSockets.count(request.peerFD) != 0) {
+        LOGE(mLogPrefix + "There already was a socket for peerFD: " << request.peerFD);
+        return false;
     }
-    mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
-                                                                 std::move(call.parse),
-                                                                 std::move(call.process)));
 
-    try {
-        // Send the call with the socket
-        Socket::Guard guard = socketPtr->getGuard();
-        socketPtr->write(&call.methodID, sizeof(call.methodID));
-        socketPtr->write(&call.messageID, sizeof(call.messageID));
-        LOGT("Serializing the message");
-        call.serialize(socketPtr->getFD(), call.data);
-    } catch (const std::exception& e) {
-        LOGE("Error during sending a method: " << e.what());
+    mSockets[request.peerFD] = std::move(request.socketPtr);
 
-        // Inform about the error,
-        IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
 
+    // Sending handled signals
+    std::vector<MethodID> ids;
+    for (const auto kv : mSignalsCallbacks) {
+        ids.push_back(kv.first);
+    }
+    auto data = std::make_shared<RegisterSignalsMessage>(ids);
+    callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
+                                                 request.peerFD,
+                                                 data,
+                                                 discardResultHandler<EmptyData>);
 
-        mReturnCallbacks.erase(call.messageID);
-        removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
 
-        return true;
+    resetPolling();
 
+    if (mNewPeerCallback) {
+        // Notify about the new user.
+        LOGT(mLogPrefix + "Calling NewPeerCallback");
+        mNewPeerCallback(request.peerFD);
     }
 
-    return false;
+    LOGI(mLogPrefix + "New peer: " << request.peerFD);
+    return true;
 }
 
-void Processor::cleanCommunication()
+bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
 {
-    LOGS("Processor cleanCommunication");
+    LOGS(mLogPrefix + "Processor onRemovePeer");
 
-    while (!mEventQueue.isEmpty()) {
-        switch (mEventQueue.receive()) {
-        case Event::FINISH: {
-            LOGE("Event FINISH after FINISH");
-            break;
-        }
-        case Event::CALL: {
-            LOGW("Event CALL after FINISH");
-            try {
-                CallQueue::Call call = mCalls.pop();
-                if (call.process) {
-                    IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
-                }
-            } catch (const IPCException&) {
-                // No more calls
-            }
-            break;
-        }
+    removePeerInternal(request.peerFD, Status::REMOVED_PEER);
+    request.conditionPtr->notify_all();
+
+    return true;
+}
+
+bool Processor::onFinishRequest(FinishRequest& request)
+{
+    LOGS(mLogPrefix + "Processor onFinishRequest");
+
+    // Clean the mRequestQueue
+    while (!mRequestQueue.isEmpty()) {
+        auto request = mRequestQueue.pop();
+        LOGE(mLogPrefix + "Got: " << request.requestID << " after FINISH");
 
-        case Event::ADD_PEER: {
-            LOGW("Event ADD_PEER after FINISH");
+        switch (request.requestID) {
+        case Event::METHOD: {
+            auto requestPtr = request.get<MethodRequest>();
+            IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data));
             break;
         }
-
         case Event::REMOVE_PEER: {
-            LOGW("Event REMOVE_PEER after FINISH");
-            mPeersToDelete.front().conditionPtr->notify_all();
-            mPeersToDelete.pop();
+            request.get<RemovePeerRequest>()->conditionPtr->notify_all();
             break;
         }
+        case Event::SIGNAL:
+        case Event::ADD_PEER:
+        case Event::FINISH:
+            break;
         }
     }
+
+    mIsRunning = false;
+    request.conditionPtr->notify_all();
+    return true;
+}
+
+std::ostream& operator<<(std::ostream& os, const Processor::Event& event)
+{
+    switch (event) {
+
+    case Processor::Event::FINISH: {
+        os << "Event::FINISH";
+        break;
+    }
+
+    case Processor::Event::METHOD: {
+        os << "Event::METHOD";
+        break;
+    }
+
+    case Processor::Event::SIGNAL: {
+        os << "Event::SIGNAL";
+        break;
+    }
+
+    case Processor::Event::ADD_PEER: {
+        os << "Event::ADD_PEER";
+        break;
+    }
+
+    case Processor::Event::REMOVE_PEER: {
+        os << "Event::REMOVE_PEER";
+        break;
+    }
+    }
+
+    return os;
 }
 
 } // namespace ipc
index b0f7ea0..157f39c 100644 (file)
 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
 
 #include "ipc/internals/socket.hpp"
-#include "ipc/internals/event-queue.hpp"
-#include "ipc/internals/call-queue.hpp"
+#include "ipc/internals/request-queue.hpp"
+#include "ipc/internals/method-request.hpp"
+#include "ipc/internals/signal-request.hpp"
+#include "ipc/internals/add-peer-request.hpp"
+#include "ipc/internals/remove-peer-request.hpp"
+#include "ipc/internals/finish-request.hpp"
 #include "ipc/exception.hpp"
 #include "ipc/types.hpp"
 #include "config/manager.hpp"
@@ -35,9 +39,9 @@
 #include "logger/logger.hpp"
 #include "logger/logger-scope.hpp"
 
+#include <ostream>
 #include <poll.h>
 #include <condition_variable>
-#include <queue>
 #include <mutex>
 #include <chrono>
 #include <vector>
@@ -67,7 +71,6 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
 *
 * TODO:
-*  - some mutexes may not be needed
 *  - synchronous call to many peers
 *  - implement HandlerStore class for storing both signals and methods
 *  - API for removing signals
@@ -81,12 +84,22 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 *  - no new events added after stop() called
 *  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
 *    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
-*  - EventQueue should store std::shared_ptr<void> and it should be the only queue to the Processor thread.
-*    It should have an API for removing events from the middle of the queue
 *
 */
 class Processor {
+private:
+    enum class Event {
+        FINISH,     // Shutdown request
+        METHOD,     // New method call in the queue
+        SIGNAL,     // New signal call in the queue
+        ADD_PEER,   // New peer in the queue
+        REMOVE_PEER // Remove peer
+    };
+
 public:
+
+    friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
+
     /**
      * Used to indicate a message with the return value.
      */
@@ -104,7 +117,8 @@ public:
      * @param newPeerCallback called when a new peer arrives
      * @param removedPeerCallback called when the Processor stops listening for this peer
      */
-    Processor(const PeerCallback& newPeerCallback = nullptr,
+    Processor(const std::string& logName = "",
+              const PeerCallback& newPeerCallback = nullptr,
               const PeerCallback& removedPeerCallback = nullptr,
               const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
     ~Processor();
@@ -113,11 +127,14 @@ public:
     Processor(Processor&&) = delete;
     Processor& operator=(const Processor&) = delete;
 
+
     /**
      * Start the processing thread.
      * Quits immediately after starting the thread.
+     *
+     * @param usesExternalPolling internal or external polling is used
      */
-    void start();
+    void start(const bool usesExternalPolling);
 
     /**
      * @return is processor running
@@ -281,6 +298,7 @@ private:
     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
     typedef std::unique_lock<std::recursive_mutex> Lock;
+    typedef RequestQueue<Event>::Request Request;
 
     struct EmptyData {
         CONFIG_REGISTER_EMPTY
@@ -337,56 +355,18 @@ private:
         ResultHandler<void>::type process;
     };
 
-    struct SocketInfo {
-        SocketInfo(const SocketInfo& other) = delete;
-        SocketInfo& operator=(const SocketInfo&) = delete;
-        SocketInfo() = default;
-        SocketInfo(SocketInfo&&) = default;
-        SocketInfo& operator=(SocketInfo &&) = default;
-
-        SocketInfo(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
-            : peerFD(peerFD), socketPtr(socketPtr) {}
-
-        FileDescriptor peerFD;
-        std::shared_ptr<Socket> socketPtr;
-    };
-
-    struct RemovePeerRequest {
-        RemovePeerRequest(const RemovePeerRequest& other) = delete;
-        RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
-        RemovePeerRequest() = default;
-        RemovePeerRequest(RemovePeerRequest&&) = default;
-        RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
-
-        RemovePeerRequest(const FileDescriptor peerFD,
-                          const std::shared_ptr<std::condition_variable>& conditionPtr)
-            : peerFD(peerFD), conditionPtr(conditionPtr) {}
-
-        FileDescriptor peerFD;
-        std::shared_ptr<std::condition_variable> conditionPtr;
-    };
-
-    enum class Event : int {
-        FINISH,     // Shutdown request
-        CALL,       // New method call in the queue
-        ADD_PEER,   // New peer in the queue
-        REMOVE_PEER // Remove peer
-    };
-    EventQueue<Event> mEventQueue;
+    std::string mLogPrefix;
 
+    RequestQueue<Event> mRequestQueue;
 
     bool mIsRunning;
 
-
-    CallQueue mCalls;
     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
     std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
 
     std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
     std::vector<struct pollfd> mFDs;
-    std::queue<SocketInfo> mNewSockets;
-    std::queue<RemovePeerRequest> mPeersToDelete;
 
     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
 
@@ -408,11 +388,14 @@ private:
     static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
 
     void run();
-    bool onCall();
-    bool onSignalCall(CallQueue::Call& call);
-    bool onMethodCall(CallQueue::Call& call);
-    bool onNewPeer();
-    bool onRemovePeer();
+
+    // Request handlers
+    bool onMethodRequest(MethodRequest& request);
+    bool onSignalRequest(SignalRequest& request);
+    bool onAddPeerRequest(AddPeerRequest& request);
+    bool onRemovePeerRequest(RemovePeerRequest& request);
+    bool onFinishRequest(FinishRequest& request);
+
     bool handleLostConnections();
     bool handleInputs();
 
@@ -434,7 +417,6 @@ private:
                                             std::shared_ptr<RegisterSignalsMessage>& data);
 
 
-    void cleanCommunication();
 };
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -469,7 +451,7 @@ void Processor::addMethodHandler(const MethodID methodID,
                                  const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
 {
     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
-        LOGE("Forbidden methodID: " << methodID);
+        LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
     }
 
@@ -477,7 +459,7 @@ void Processor::addMethodHandler(const MethodID methodID,
         Lock lock(mStateMutex);
 
         if (mSignalsCallbacks.count(methodID)) {
-            LOGE("MethodID used by a signal: " << methodID);
+            LOGE(mLogPrefix + "MethodID used by a signal: " << methodID);
             throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
         }
 
@@ -491,7 +473,7 @@ void Processor::addSignalHandler(const MethodID methodID,
                                  const typename SignalHandler<ReceivedDataType>::type& handler)
 {
     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
-        LOGE("Forbidden methodID: " << methodID);
+        LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
     }
 
@@ -502,7 +484,7 @@ void Processor::addSignalHandler(const MethodID methodID,
 
         // Andd the signal handler:
         if (mMethodsCallbacks.count(methodID)) {
-            LOGE("MethodID used by a method: " << methodID);
+            LOGE(mLogPrefix + "MethodID used by a method: " << methodID);
             throw IPCException("MethodID used by a method: " + std::to_string(methodID));
         }
 
@@ -546,10 +528,9 @@ MessageID Processor::callAsync(const MethodID methodID,
                                const typename ResultHandler<ReceivedDataType>::type& process)
 {
     Lock lock(mStateMutex);
-    MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
-    mEventQueue.send(Event::CALL);
-
-    return messageID;
+    auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+    mRequestQueue.push(Event::METHOD, request);
+    return request->messageID;
 }
 
 
@@ -581,24 +562,31 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
     };
 
     std::unique_lock<std::mutex> lock(mutex);
-    LOGT("Waiting for the response...");
+    LOGT(mLogPrefix + "Waiting for the response...");
     if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
-        LOGW("Probably a timeout in callSync. Checking...");
+        LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
         bool isTimeout;
         {
             Lock lock(mStateMutex);
             // Call isn't sent or call is sent but there is no reply
-            isTimeout = mCalls.erase(messageID) || 1 == mReturnCallbacks.erase(messageID);
+            isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
+                return request.requestID == Event::METHOD &&
+                       request.get<MethodRequest>()->messageID == messageID;
+            })
+            || mRequestQueue.removeIf([messageID](Request & request) {
+                return request.requestID == Event::SIGNAL &&
+                       request.get<SignalRequest>()->messageID == messageID;
+            })
+            || 1 == mReturnCallbacks.erase(messageID);
         }
-
         if (isTimeout) {
-            LOGE("Function call timeout; methodID: " << methodID);
+            LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
             removePeer(peerFD);
             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
         } else {
-            LOGW("Timeout started during the return value processing, so wait for it to finish");
+            LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
             if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
-                LOGE("Function call timeout; methodID: " << methodID);
+                LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
                 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
             }
         }
@@ -616,16 +604,17 @@ void Processor::signal(const MethodID methodID,
     Lock lock(mStateMutex);
     const auto it = mSignalsPeers.find(methodID);
     if (it == mSignalsPeers.end()) {
-        LOGW("No peer is handling signal with methodID: " << methodID);
+        LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
         return;
     }
     for (const FileDescriptor peerFD : it->second) {
-        mCalls.push<SentDataType>(methodID, peerFD, data);
-        mEventQueue.send(Event::CALL);
+        auto request =  SignalRequest::create<SentDataType>(methodID, peerFD, data);
+        mRequestQueue.push(Event::SIGNAL, request);
     }
 }
 
 
+
 } // namespace ipc
 } // namespace vasum
 
diff --git a/common/ipc/internals/remove-peer-request.hpp b/common/ipc/internals/remove-peer-request.hpp
new file mode 100644 (file)
index 0000000..ec01ac4
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Processor's request to remove a peer
+ */
+
+#ifndef COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "ipc/internals/socket.hpp"
+#include <condition_variable>
+
+
+namespace vasum {
+namespace ipc {
+
+class RemovePeerRequest {
+public:
+    RemovePeerRequest(const RemovePeerRequest&) = delete;
+    RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
+
+    RemovePeerRequest(const FileDescriptor peerFD,
+                      const std::shared_ptr<std::condition_variable_any>& conditionPtr)
+        : peerFD(peerFD),
+          conditionPtr(conditionPtr)
+    {
+    }
+
+    FileDescriptor peerFD;
+    std::shared_ptr<std::condition_variable_any> conditionPtr;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP
diff --git a/common/ipc/internals/request-queue.hpp b/common/ipc/internals/request-queue.hpp
new file mode 100644 (file)
index 0000000..35b5120
--- /dev/null
@@ -0,0 +1,161 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Managing the queue of messages carrying any kind of data
+ */
+
+#ifndef COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
+#define COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
+
+#include "ipc/exception.hpp"
+#include "ipc/internals/eventfd.hpp"
+#include "logger/logger.hpp"
+
+#include <list>
+#include <memory>
+#include <algorithm>
+
+namespace vasum {
+namespace ipc {
+
+/**
+* Class for managing a queue of Requests carrying any data
+*/
+template<typename RequestIdType>
+class RequestQueue {
+public:
+    RequestQueue() = default;
+
+    RequestQueue(const RequestQueue&) = delete;
+    RequestQueue& operator=(const RequestQueue&) = delete;
+
+    struct Request {
+        Request(const Request& other) = delete;
+        Request& operator=(const Request&) = delete;
+
+        Request(Request&&) = default;
+        Request(const RequestIdType requestID, const std::shared_ptr<void>& data)
+            : requestID(requestID),
+              data(data)
+        {}
+
+        template<typename DataType>
+        std::shared_ptr<DataType> get()
+        {
+            return std::static_pointer_cast<DataType>(data);
+        }
+
+        RequestIdType requestID;
+        std::shared_ptr<void> data;
+    };
+
+    /**
+     * @return event's file descriptor
+     */
+    int getFD() const;
+
+    /**
+     * @return is the queue empty
+     */
+    bool isEmpty() const;
+
+    /**
+     * Push data to the queue
+     *
+     * @param requestID request type
+     * @param data data corresponding to the request
+     */
+    void push(const RequestIdType requestID,
+              const std::shared_ptr<void>& data = nullptr);
+
+    /**
+     * @return get the data from the next request
+     */
+    Request pop();
+
+    /**
+     * Remove elements from the queue when the predicate returns true
+     *
+     * @param predicate condition
+     * @return was anything removed
+     */
+    template<typename Predicate>
+    bool removeIf(Predicate predicate);
+
+private:
+    std::list<Request> mRequests;
+    EventFD mEventFD;
+};
+
+template<typename RequestIdType>
+int RequestQueue<RequestIdType>::getFD() const
+{
+    return mEventFD.getFD();
+}
+
+template<typename RequestIdType>
+bool RequestQueue<RequestIdType>::isEmpty() const
+{
+    return mRequests.empty();
+}
+
+template<typename RequestIdType>
+void RequestQueue<RequestIdType>::push(const RequestIdType requestID,
+                                       const std::shared_ptr<void>& data)
+{
+    Request request(requestID, data);
+    mRequests.push_back(std::move(request));
+    mEventFD.send();
+}
+
+template<typename RequestIdType>
+typename RequestQueue<RequestIdType>::Request RequestQueue<RequestIdType>::pop()
+{
+    mEventFD.receive();
+    if (mRequests.empty()) {
+        LOGE("Request queue is empty");
+        throw IPCException("Request queue is empty");
+    }
+    Request request = std::move(mRequests.front());
+    mRequests.pop_front();
+    return request;
+}
+
+template<typename RequestIdType>
+template<typename Predicate>
+bool RequestQueue<RequestIdType>::removeIf(Predicate predicate)
+{
+    auto it = std::find_if(mRequests.begin(), mRequests.end(), predicate);
+    if (it == mRequests.end()) {
+        return false;
+    }
+
+    do {
+        it = mRequests.erase(it);
+        it = std::find_if(it, mRequests.end(), predicate);
+    } while (it != mRequests.end());
+
+    return true;
+}
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
diff --git a/common/ipc/internals/signal-request.hpp b/common/ipc/internals/signal-request.hpp
new file mode 100644 (file)
index 0000000..4cf62c2
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Processor's request to send a signal
+ */
+
+#ifndef COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "config/manager.hpp"
+#include "logger/logger-scope.hpp"
+
+namespace vasum {
+namespace ipc {
+
+class SignalRequest {
+public:
+    SignalRequest(const SignalRequest&) = delete;
+    SignalRequest& operator=(const SignalRequest&) = delete;
+
+
+
+    template<typename SentDataType>
+    static std::shared_ptr<SignalRequest> create(const MethodID methodID,
+                                                 const FileDescriptor peerFD,
+                                                 const std::shared_ptr<SentDataType>& data);
+
+    MethodID methodID;
+    FileDescriptor peerFD;
+    MessageID messageID;
+    std::shared_ptr<void> data;
+    SerializeCallback serialize;
+
+private:
+    SignalRequest(const MethodID methodID, const FileDescriptor peerFD)
+        : methodID(methodID),
+          peerFD(peerFD),
+          messageID(getNextMessageID())
+    {}
+
+};
+
+template<typename SentDataType>
+std::shared_ptr<SignalRequest> SignalRequest::create(const MethodID methodID,
+                                                     const FileDescriptor peerFD,
+                                                     const std::shared_ptr<SentDataType>& data)
+{
+    std::shared_ptr<SignalRequest> request(new SignalRequest(methodID, peerFD));
+
+    request->data = data;
+
+    request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+        LOGS("Signal serialize, peerFD: " << fd);
+        config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+    };
+
+    return request;
+}
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP
index ef46346..b96bcd4 100644 (file)
@@ -36,7 +36,7 @@ namespace ipc {
 Service::Service(const std::string& socketPath,
                  const PeerCallback& addPeerCallback,
                  const PeerCallback& removePeerCallback)
-    : mProcessor(addPeerCallback, removePeerCallback),
+    : mProcessor("[SERVICE] ", addPeerCallback, removePeerCallback),
       mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
 
 {
@@ -53,14 +53,16 @@ Service::~Service()
     }
 }
 
-void Service::start()
+void Service::start(const bool usesExternalPolling)
 {
     LOGS("Service start");
-    mProcessor.start();
+    mProcessor.start(usesExternalPolling);
 
     // There can be an incoming connection from mAcceptor before mProcessor is listening,
     // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor.
-    mAcceptor.start();
+    if (!usesExternalPolling) {
+        mAcceptor.start();
+    }
 }
 
 bool Service::isStarted()
index fa12e30..9392a42 100644 (file)
@@ -61,8 +61,10 @@ public:
 
     /**
      * Starts the worker and acceptor threads
+     *
+     * @param usesExternalPolling internal or external polling is used
      */
-    void start();
+    void start(const bool usesExternalPolling = false);
 
     /**
     * @return is the communication thread running
index fa57648..ba4c1c4 100644 (file)
 #include "ipc/types.hpp"
 #include "logger/logger.hpp"
 
+#include <atomic>
 
 namespace vasum {
 namespace ipc {
 
+namespace {
+std::atomic<MessageID> gLastMessageID(0);
+} // namespace
+
+MessageID getNextMessageID()
+{
+    return ++gLastMessageID;
+}
+
 std::string toString(const Status status)
 {
     switch (status) {
index 5132911..10b87df 100644 (file)
@@ -39,6 +39,8 @@ typedef unsigned int MethodID;
 typedef unsigned int MessageID;
 
 typedef std::function<void(FileDescriptor)> PeerCallback;
+typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
+typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
 
 enum class Status : int {
     OK = 0,
@@ -53,6 +55,8 @@ enum class Status : int {
 
 std::string toString(const Status status);
 void throwOnError(const Status status);
+MessageID getNextMessageID();
+
 
 template<typename SentDataType, typename ReceivedDataType>
 struct MethodHandler {
index 7c9df6c..caf59d2 100644 (file)
@@ -63,7 +63,7 @@ const int TIMEOUT = 1000 /*ms*/;
 const int SHORT_OPERATION_TIME = TIMEOUT / 100;
 
 // Time that will cause "TIMEOUT" methods to throw
-const int LONG_OPERATION_TIME = 500 + TIMEOUT;
+const int LONG_OPERATION_TIME = 1000 + TIMEOUT;
 
 struct Fixture {
     std::string socketPath;
@@ -204,7 +204,7 @@ std::pair<FileDescriptor, IPCGSource::Pointer> connectServiceGSource(Service& s,
     // TODO: On timeout remove the callback
     s.setNewPeerCallback(newPeerCallback);
     s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, ipcGSourcePtr, _1));
-
+    s.start(true);
     // Service starts to process
     ipcGSourcePtr->attach();
 
@@ -239,7 +239,7 @@ std::pair<FileDescriptor, IPCGSource::Pointer> connectClientGSource(Service& s,
     }
 
 
-    c.connect();
+    c.start(true);
     IPCGSource::Pointer ipcGSourcePtr = IPCGSource::create(c.getFDs(),
                                                            std::bind(&Client::handle, &c, _1, _2));