IPC: External polling loop with a Client 52/32052/12
authorJan Olszak <j.olszak@samsung.com>
Tue, 16 Dec 2014 13:09:33 +0000 (14:09 +0100)
committerJan Olszak <j.olszak@samsung.com>
Wed, 7 Jan 2015 11:00:37 +0000 (12:00 +0100)
[Bug/Feature]  Using GMainLoop with a Client is possible.
               Fixed some buggs
[Cause]        N/A
[Solution]     N/A
[Verification] Build, install, run tests

Change-Id: Iab3350b400739bb951d84e0d6b7de15d0cccf1d3

15 files changed:
common/ipc/client.cpp
common/ipc/client.hpp
common/ipc/internals/acceptor.cpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/ipc-gsource.cpp
common/ipc/ipc-gsource.hpp
common/ipc/service.cpp
common/utils/latch.hpp
common/utils/signal.cpp [new file with mode: 0644]
common/utils/signal.hpp [new file with mode: 0644]
server/CMakeLists.txt
tests/unit_tests/CMakeLists.txt
tests/unit_tests/ipc/ut-ipc.cpp
zone-daemon/CMakeLists.txt

index 835020d..3187f15 100644 (file)
@@ -48,16 +48,21 @@ Client::~Client()
     LOGD("Destroyed client");
 }
 
-void Client::start()
+void Client::connect()
 {
-    LOGD("Starting client...");
-
     // Initialize the connection with the server
     LOGD("Connecting to " + mSocketPath);
     auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
     mServiceFD = mProcessor.addPeer(socketPtr);
+}
+
+void Client::start()
+{
+    LOGD("Starting client...");
 
-    // Start listening
+    connect();
+
+    // Start polling thread
     mProcessor.start();
 
     LOGD("Started client");
@@ -75,6 +80,31 @@ void Client::stop()
     LOGD("Stopped");
 }
 
+std::vector<FileDescriptor> Client::getFDs()
+{
+    std::vector<FileDescriptor> fds;
+    fds.push_back(mProcessor.getEventFD());
+    fds.push_back(mServiceFD);
+
+    return fds;
+}
+
+void Client::handle(const FileDescriptor fd, const short pollEvent)
+{
+    if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
+        mProcessor.handleEvent();
+        return;
+
+    } else if (pollEvent & POLLIN) {
+        mProcessor.handleInput(fd);
+        return;
+
+    } else if (pollEvent & POLLHUP) {
+        mProcessor.handleLostConnection(fd);
+        return;
+    }
+}
+
 void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     mProcessor.setNewPeerCallback(newPeerCallback);
index 5751812..b5b00e5 100644 (file)
@@ -55,6 +55,13 @@ public:
     Client& operator=(const Client&) = delete;
 
     /**
+     * Places a connection request in the internal event queue.
+     *
+     * Used with an external polling loop.
+     */
+    void connect();
+
+    /**
      * Starts the worker thread
      */
     void start();
@@ -70,6 +77,23 @@ public:
     void stop();
 
     /**
+    * Used with an external polling loop
+    *
+    * @return vector of internal file descriptors
+    */
+    std::vector<FileDescriptor> getFDs();
+
+    /**
+     * Used with an external polling loop.
+     * Handles one event from the file descriptor.
+     *
+     * @param fd file descriptor
+     * @param pollEvent event on the fd. Defined in poll.h
+     *
+     */
+    void handle(const FileDescriptor fd, const short pollEvent);
+
+    /**
      * Set the callback called for each new connection to a peer
      *
      * @param newPeerCallback the callback
index 1eab1c2..627e1fe 100644 (file)
@@ -67,12 +67,13 @@ void Acceptor::start()
 void Acceptor::stop()
 {
     LOGT("Stopping Acceptor");
+
     if (mThread.joinable()) {
-        LOGT("Event::FINISH -> Acceptor");
         mEventQueue.send(Event::FINISH);
         LOGT("Waiting for Acceptor to finish");
         mThread.join();
     }
+
     LOGT("Stopped Acceptor");
 }
 
index 72a1788..22aa38e 100644 (file)
 #include "ipc/exception.hpp"
 #include "ipc/internals/processor.hpp"
 #include "ipc/internals/utils.hpp"
+#include "utils/signal.hpp"
 
 #include <cerrno>
 #include <cstring>
+#include <csignal>
 #include <stdexcept>
 
 #include <sys/socket.h>
@@ -58,8 +60,9 @@ Processor::Processor(const PeerCallback& newPeerCallback,
       mMaxNumberOfPeers(maxNumberOfPeers)
 {
     LOGT("Creating Processor");
-    using namespace std::placeholders;
 
+    utils::signalBlock(SIGPIPE);
+    using namespace std::placeholders;
     addMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
                                                                 std::bind(&Processor::onNewSignals, this, _1, _2));
 
@@ -73,6 +76,7 @@ Processor::~Processor()
     } catch (IPCException& e) {
         LOGE("Error in Processor's destructor: " << e.what());
     }
+
     LOGT("Destroyed Processor");
 }
 
@@ -93,10 +97,12 @@ void Processor::start()
 void Processor::stop()
 {
     LOGT("Stopping Processor");
+
     if (isStarted()) {
         mEventQueue.send(Event::FINISH);
         mThread.join();
     }
+
     LOGT("Stopped Processor");
 }
 
@@ -167,7 +173,10 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
     LOGW("Removing peer. ID: " << peerFD);
     {
         Lock lock(mSocketsMutex);
-        mSockets.erase(peerFD);
+        if (!mSockets.erase(peerFD)) {
+            LOGW("No such peer. Another thread called removePeerInternal");
+            return;
+        }
 
         // Remove from signal addressees
         for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
@@ -269,8 +278,6 @@ void Processor::run()
         }
 
     }
-
-    cleanCommunication();
 }
 
 bool Processor::handleLostConnections()
@@ -327,6 +334,8 @@ bool Processor::handleInput(const FileDescriptor peerFD)
 
     std::shared_ptr<Socket> socketPtr;
     try {
+        // Get the peer's socket
+        Lock lock(mSocketsMutex);
         socketPtr = mSockets.at(peerFD);
     } catch (const std::out_of_range&) {
         LOGE("No such peer: " << peerFD);
@@ -497,7 +506,10 @@ bool Processor::handleEvent()
     switch (mEventQueue.receive()) {
     case Event::FINISH: {
         LOGD("Event FINISH");
+
         mIsRunning = false;
+        cleanCommunication();
+
         return false;
     }
 
@@ -607,6 +619,15 @@ bool Processor::onCall()
     LOGT("Handle call (from another thread) to send a message.");
     CallQueue::Call call = getCall();
 
+    if (call.parse && call.process) {
+        return onMethodCall(call);
+    } else {
+        return onSignalCall(call);
+    }
+}
+
+bool Processor::onSignalCall(CallQueue::Call& call)
+{
     std::shared_ptr<Socket> socketPtr;
     try {
         // Get the peer's socket
@@ -614,11 +635,43 @@ bool Processor::onCall()
         socketPtr = mSockets.at(call.peerFD);
     } catch (const std::out_of_range&) {
         LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
+        return false;
+    }
+
+    try {
+        // Send the call with the socket
+        Socket::Guard guard = socketPtr->getGuard();
+        socketPtr->write(&call.methodID, sizeof(call.methodID));
+        socketPtr->write(&call.messageID, sizeof(call.messageID));
+        call.serialize(socketPtr->getFD(), call.data);
+    } catch (const std::exception& e) {
+        LOGE("Error during sending a signal: " << e.what());
+
+        removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
+        return true;
+    }
+
+    return false;
+
+}
+
+bool Processor::onMethodCall(CallQueue::Call& call)
+{
+    std::shared_ptr<Socket> socketPtr;
+    try {
+        // Get the peer's socket
+        Lock lock(mSocketsMutex);
+        socketPtr = mSockets.at(call.peerFD);
+    } catch (const std::out_of_range&) {
+        LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
+
+        // Pass the error to the processing callback
         IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
+
         return false;
     }
 
-    if (call.parse && call.process) {
+    {
         // Set what to do with the return message, but only if needed
         Lock lock(mReturnCallbacksMutex);
         if (mReturnCallbacks.count(call.messageID) != 0) {
@@ -636,9 +689,9 @@ bool Processor::onCall()
         socketPtr->write(&call.messageID, sizeof(call.messageID));
         call.serialize(socketPtr->getFD(), call.data);
     } catch (const std::exception& e) {
-        LOGE("Error during sending a message: " << e.what());
+        LOGE("Error during sending a method: " << e.what());
 
-        // Inform about the error
+        // Inform about the error,
         IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
 
         {
index d33d12b..728b8d2 100644 (file)
@@ -76,6 +76,11 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 *  - callbacks for serialization/parsing
 *  - store Sockets in a vector, maybe SocketStore?
 *  - fix valgrind tests
+*  - poll loop outside.
+*  - waiting till the EventQueue is empty before leaving stop()
+*  - no new events added after stop() called
+*  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
+*    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
 *
 *
 */
@@ -414,6 +419,8 @@ private:
 
     void run();
     bool onCall();
+    bool onSignalCall(CallQueue::Call& call);
+    bool onMethodCall(CallQueue::Call& call);
     bool onNewPeer();
     bool onRemovePeer();
     bool handleLostConnections();
index f5cdbb5..4c098d9 100644 (file)
@@ -57,12 +57,10 @@ IPCGSource::IPCGSource(const std::vector<FileDescriptor> fds,
 IPCGSource::~IPCGSource()
 {
     LOGD("Destroying IPCGSource");
-    g_source_destroy(&mGSource);
-
 }
 
-IPCGSource* IPCGSource::create(const std::vector<FileDescriptor>& fds,
-                               const HandlerCallback& handlerCallback)
+IPCGSource::Pointer IPCGSource::create(const std::vector<FileDescriptor>& fds,
+                                       const HandlerCallback& handlerCallback)
 {
     LOGD("Creating IPCGSource");
 
@@ -80,9 +78,19 @@ IPCGSource* IPCGSource::create(const std::vector<FileDescriptor>& fds,
 
     // Fill additional data
     IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
-    return new(source) IPCGSource(fds, handlerCallback);
-}
+    new(source)IPCGSource(fds, handlerCallback);
+
+    auto deleter = [](IPCGSource * ptr) {
+        LOGD("Deleter");
+
+        if (!g_source_is_destroyed(&(ptr->mGSource))) {
+            // This way finalize method will be run in glib loop's thread
+            g_source_destroy(&(ptr->mGSource));
+        }
+    };
 
+    return std::shared_ptr<IPCGSource>(source, deleter);
+}
 
 void IPCGSource::addFD(const FileDescriptor fd)
 {
@@ -119,7 +127,9 @@ void IPCGSource::removeFD(const FileDescriptor fd)
 guint IPCGSource::attach(GMainContext* context)
 {
     LOGD("Attaching to GMainContext");
-    return g_source_attach(&mGSource, context);
+    guint ret = g_source_attach(&mGSource, context);
+    g_source_unref(&mGSource);
+    return ret;
 }
 
 gboolean IPCGSource::prepare(GSource* gSource, gint* timeout)
@@ -148,6 +158,11 @@ gboolean IPCGSource::dispatch(GSource* gSource,
                               GSourceFunc /*callback*/,
                               gpointer /*userData*/)
 {
+    if (!gSource || g_source_is_destroyed(gSource)) {
+        // Remove the GSource from the GMainContext
+        return FALSE;
+    }
+
     IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
 
     for (const FDInfo fdInfo : source->mFDInfos) {
@@ -157,7 +172,7 @@ gboolean IPCGSource::dispatch(GSource* gSource,
         }
     }
 
-    return TRUE; // Don't remove the GSource from the GMainContext
+    return TRUE;
 }
 
 void  IPCGSource::finalize(GSource* gSource)
index 96e0a1a..bb9a096 100644 (file)
@@ -30,7 +30,7 @@
 
 #include "ipc/service.hpp"
 #include "ipc/types.hpp"
-#include "utils/callback-wrapper.hpp"
+
 #include <memory>
 
 
@@ -43,10 +43,17 @@ namespace ipc {
  *
  * It's supposed to be constructed ONLY with the static create method
  * and destructed in a glib callback.
+ *
+ * TODO:
+ * - waiting till the managed object (Client or Service) is destroyed
+ *   before IPCGSource stops operating. For now programmer has to ensure this.
  */
 struct IPCGSource {
 public:
     typedef std::function<void(FileDescriptor fd, short pollEvent)> HandlerCallback;
+    typedef std::shared_ptr<IPCGSource> Pointer;
+
+    ~IPCGSource();
 
     IPCGSource() = delete;
     IPCGSource(const IPCGSource&) = delete;
@@ -83,8 +90,8 @@ public:
      *
      * @return pointer to the IPCGSource
      */
-    static IPCGSource* create(const std::vector<FileDescriptor>& fds,
-                              const HandlerCallback& handlerCallback);
+    static Pointer create(const std::vector<FileDescriptor>& fds,
+                          const HandlerCallback& handlerCallback);
 
 private:
 
@@ -116,9 +123,6 @@ private:
     IPCGSource(const std::vector<FileDescriptor> fds,
                const HandlerCallback& handlerCallback);
 
-    // Called only from IPCGSource::finalize
-    ~IPCGSource();
-
     struct FDInfo {
         FDInfo(gpointer tag, FileDescriptor fd)
             : tag(tag), fd(fd) {}
index be95cee..5e720d6 100644 (file)
@@ -91,15 +91,15 @@ std::vector<FileDescriptor> Service::getFDs()
 
 void Service::handle(const FileDescriptor fd, const short pollEvent)
 {
-    if (fd == mProcessor.getEventFD() && pollEvent & POLLIN) {
+    if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
         mProcessor.handleEvent();
         return;
 
-    } else if (fd == mAcceptor.getConnectionFD() && pollEvent & POLLIN) {
+    } else if (fd == mAcceptor.getConnectionFD() && (pollEvent & POLLIN)) {
         mAcceptor.handleConnection();
         return;
 
-    } else if (fd == mAcceptor.getEventFD() && pollEvent & POLLIN) {
+    } else if (fd == mAcceptor.getEventFD() && (pollEvent & POLLIN)) {
         mAcceptor.handleEvent();
         return;
 
index 1fa773b..7ef1dd7 100644 (file)
@@ -54,7 +54,7 @@ public:
     void wait();
 
     /**
-     * Waits for a single occurence of event with timeout.
+     * Waits for a single occurrence of event with timeout.
      *
      * @param timeoutMs  timeout in ms to wait for
      * @return           false on timeout
@@ -64,14 +64,14 @@ public:
     /**
      * Waits for @ref n occurrences of event.
      *
-     * @param n  number of occurences to wait for
+     * @param n  number of occurrences to wait for
      */
     void waitForN(const unsigned int n);
 
     /**
-     * Waits for @ref n occurences of event with timeout.
+     * Waits for @ref n occurrences of event with timeout.
      *
-     * @param n          number of occurences to wait for
+     * @param n          number of occurrences to wait for
      * @param timeoutMs  timeout in ms to wait for
      * @return           false on timeout
      */
diff --git a/common/utils/signal.cpp b/common/utils/signal.cpp
new file mode 100644 (file)
index 0000000..39e7fca
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Jan Olszak <j.olszak@samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Signal related functions
+ */
+
+#include "utils/signal.hpp"
+#include "utils/exception.hpp"
+#include "logger/logger.hpp"
+
+#include <string>
+#include <cerrno>
+#include <cstring>
+#include <csignal>
+
+namespace vasum {
+namespace utils {
+
+void signalBlock(const int signalToBlock)
+{
+    ::sigset_t set;
+    if (-1 == ::sigemptyset(&set)) {
+        LOGE("Error in sigemptyset: " << std::string(strerror(errno)));
+        UtilsException("Error in sigemptyset: " + std::string(strerror(errno)));
+    }
+
+    if (-1 ==::sigaddset(&set, signalToBlock)) {
+        LOGE("Error in sigaddset: " << std::string(strerror(errno)));
+        UtilsException("Error in sigaddset: " + std::string(strerror(errno)));
+    }
+
+    int ret = ::pthread_sigmask(SIG_BLOCK, &set, nullptr /*&oldSet*/);
+    if (ret != 0) {
+        LOGE("Error in pthread_sigmask: " << std::to_string(ret));
+        UtilsException("Error in pthread_sigmask: " + std::to_string(ret));
+    }
+}
+
+} // namespace utils
+} // namespace vasum
+
+
+
+
+
diff --git a/common/utils/signal.hpp b/common/utils/signal.hpp
new file mode 100644 (file)
index 0000000..f26e365
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Jan Olszak <j.olszak@samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Signal related functions
+ */
+
+#ifndef COMMON_UTILS_SIGNAL_HPP
+#define COMMON_UTILS_SIGNAL_HPP
+
+namespace vasum {
+namespace utils {
+
+void signalBlock(const int signalsToBlock);
+
+} // namespace utils
+} // namespace vasum
+
+
+#endif // COMMON_UTILS_SIGNAL_HPP
index adfef3e..79ced75 100644 (file)
@@ -36,8 +36,13 @@ PKG_CHECK_MODULES(SERVER_DEPS REQUIRED lxc json gio-2.0 libsystemd-journal libsy
 INCLUDE_DIRECTORIES(${COMMON_FOLDER})
 INCLUDE_DIRECTORIES(${CLIENT_FOLDER})
 INCLUDE_DIRECTORIES(SYSTEM ${SERVER_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS})
-TARGET_LINK_LIBRARIES(${SERVER_CODENAME} ${SERVER_DEPS_LIBRARIES} ${Boost_LIBRARIES})
 
+SET_TARGET_PROPERTIES(${SERVER_CODENAME} PROPERTIES
+    COMPILE_FLAGS "-pthread"
+    LINK_FLAGS "-pthread"
+)
+
+TARGET_LINK_LIBRARIES(${SERVER_CODENAME} ${SERVER_DEPS_LIBRARIES} ${Boost_LIBRARIES})
 
 ## Subdirectories ##############################################################
 ADD_SUBDIRECTORY(configs)
index c0cc2a8..8b66b09 100644 (file)
@@ -40,6 +40,12 @@ PKG_CHECK_MODULES(UT_SERVER_DEPS REQUIRED lxc json gio-2.0 libsystemd-daemon
                   libsystemd-journal libcap-ng libLogger libSimpleDbus libConfig)
 INCLUDE_DIRECTORIES(${COMMON_FOLDER} ${SERVER_FOLDER} ${UNIT_TESTS_FOLDER} ${CLIENT_FOLDER})
 INCLUDE_DIRECTORIES(SYSTEM ${UT_SERVER_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS})
+
+SET_TARGET_PROPERTIES(${UT_SERVER_CODENAME} PROPERTIES
+    COMPILE_FLAGS "-pthread"
+    LINK_FLAGS "-pthread"
+)
+
 TARGET_LINK_LIBRARIES(${UT_SERVER_CODENAME} ${UT_SERVER_DEPS_LIBRARIES} ${Boost_LIBRARIES})
 
 
index 9ce131d..a7f8645 100644 (file)
@@ -45,6 +45,7 @@
 #include <string>
 #include <thread>
 #include <chrono>
+#include <utility>
 #include <boost/filesystem.hpp>
 
 using namespace vasum;
@@ -136,7 +137,7 @@ std::shared_ptr<SendData> longEchoCallback(const FileDescriptor, std::shared_ptr
     return data;
 }
 
-FileDescriptor connect(Service& s, Client& c, bool serviceUsesGlib = false)
+FileDescriptor connect(Service& s, Client& c)
 {
     // Connects the Client to the Service and returns Clients FileDescriptor
     std::mutex mutex;
@@ -149,41 +150,102 @@ FileDescriptor connect(Service& s, Client& c, bool serviceUsesGlib = false)
         cv.notify_all();
     };
 
+    // TODO: On timeout remove the callback
+    s.setNewPeerCallback(newPeerCallback);
+
+    if (!s.isStarted()) {
+        s.start();
+    }
+
+    c.start();
+
+
+    std::unique_lock<std::mutex> lock(mutex);
+    BOOST_REQUIRE(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() {
+        return peerFD != 0;
+    }));
+
+    return peerFD;
+}
+
 
-    if (!serviceUsesGlib) {
-        s.setNewPeerCallback(newPeerCallback);
 
-        if (!s.isStarted()) {
-            s.start();
-        }
-    } else {
 #if GLIB_CHECK_VERSION(2,36,0)
 
-        IPCGSource* serviceGSourcePtr = IPCGSource::create(s.getFDs(), std::bind(&Service::handle, &s, _1, _2));
+std::pair<FileDescriptor, IPCGSource::Pointer> connectServiceGSource(Service& s, Client& c)
+{
+    std::mutex mutex;
+    std::condition_variable cv;
 
-        auto agregateCallback = [&newPeerCallback, &serviceGSourcePtr](const FileDescriptor newFD) {
-            serviceGSourcePtr->addFD(newFD);
-            newPeerCallback(newFD);
-        };
+    FileDescriptor peerFD = 0;
+    IPCGSource::Pointer ipcGSourcePtr = IPCGSource::create(s.getFDs(), std::bind(&Service::handle, &s, _1, _2));
 
-        s.setNewPeerCallback(agregateCallback);
-        s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, serviceGSourcePtr, _1));
+    auto newPeerCallback = [&cv, &peerFD, &mutex, ipcGSourcePtr](const FileDescriptor newFD) {
+        if (ipcGSourcePtr) {
+            //TODO: Remove this if
+            ipcGSourcePtr->addFD(newFD);
+        }
+        std::unique_lock<std::mutex> lock(mutex);
+        peerFD = newFD;
+        cv.notify_all();
+    };
 
-        serviceGSourcePtr->attach();
-#endif // GLIB_CHECK_VERSION
 
-    }
+    // TODO: On timeout remove the callback
+    s.setNewPeerCallback(newPeerCallback);
+    s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, ipcGSourcePtr, _1));
+
+    // Service starts to process
+    ipcGSourcePtr->attach();
 
     c.start();
 
     std::unique_lock<std::mutex> lock(mutex);
-    BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() {
+    BOOST_REQUIRE(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() {
         return peerFD != 0;
     }));
 
-    return peerFD;
+    return std::make_pair(peerFD, ipcGSourcePtr);
 }
 
+std::pair<FileDescriptor, IPCGSource::Pointer> connectClientGSource(Service& s, Client& c)
+{
+    // Connects the Client to the Service and returns Clients FileDescriptor
+    std::mutex mutex;
+    std::condition_variable cv;
+
+    FileDescriptor peerFD = 0;
+    auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFD) {
+        std::unique_lock<std::mutex> lock(mutex);
+        peerFD = newFD;
+        cv.notify_all();
+    };
+    // TODO: On timeout remove the callback
+    s.setNewPeerCallback(newPeerCallback);
+
+    if (!s.isStarted()) {
+        // Service starts to process
+        s.start();
+    }
+
+
+    c.connect();
+    IPCGSource::Pointer ipcGSourcePtr = IPCGSource::create(c.getFDs(),
+                                                           std::bind(&Client::handle, &c, _1, _2));
+
+    ipcGSourcePtr->attach();
+
+    std::unique_lock<std::mutex> lock(mutex);
+    BOOST_REQUIRE(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() {
+        return peerFD != 0;
+    }));
+
+    return std::make_pair(peerFD, ipcGSourcePtr);
+}
+
+#endif // GLIB_CHECK_VERSION
+
+
 void testEcho(Client& c, const MethodID methodID)
 {
     std::shared_ptr<SendData> sentData(new SendData(34));
@@ -194,7 +256,7 @@ void testEcho(Client& c, const MethodID methodID)
 void testEcho(Service& s, const MethodID methodID, const FileDescriptor peerFD)
 {
     std::shared_ptr<SendData> sentData(new SendData(56));
-    std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(methodID, peerFD, sentData);
+    std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(methodID, peerFD, sentData, 1000);
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
@@ -481,17 +543,16 @@ BOOST_AUTO_TEST_CASE(ReadTimeout)
 {
     Service s(socketPath);
     auto longEchoCallback = [](const FileDescriptor, std::shared_ptr<SendData>& data) {
-        return std::shared_ptr<LongSendData>(new LongSendData(data->intVal));
+        return std::shared_ptr<LongSendData>(new LongSendData(data->intVal, 4000 /*ms*/));
     };
     s.addMethodHandler<LongSendData, SendData>(1, longEchoCallback);
-    s.start();
 
     Client c(socketPath);
-    c.start();
+    connect(s, c);
 
     // Test timeout on read
     std::shared_ptr<SendData> sentData(new SendData(334));
-    BOOST_CHECK_THROW((c.callSync<SendData, SendData>(1, sentData, 100)), IPCException);
+    BOOST_CHECK_THROW((c.callSync<SendData, SendData>(1, sentData, 10)), IPCException);
 }
 
 
@@ -587,12 +648,15 @@ BOOST_AUTO_TEST_CASE(ServiceGSource)
         isSignalCalled = true;
     };
 
+    IPCGSource::Pointer serviceGSource;
     Service s(socketPath);
     s.addMethodHandler<SendData, SendData>(1, echoCallback);
 
     Client c(socketPath);
     s.addSignalHandler<SendData>(2, signalHandler);
-    connect(s, c, true);
+
+    auto ret = connectServiceGSource(s, c);
+    serviceGSource = ret.second;
 
     testEcho(c, 1);
 
@@ -603,6 +667,36 @@ BOOST_AUTO_TEST_CASE(ServiceGSource)
     BOOST_CHECK(isSignalCalled);
 }
 
+BOOST_AUTO_TEST_CASE(ClientGSource)
+{
+    ScopedGlibLoop loop;
+
+    std::atomic_bool isSignalCalled(false);
+    auto signalHandler = [&isSignalCalled](const FileDescriptor, std::shared_ptr<SendData>&) {
+        isSignalCalled = true;
+    };
+
+    Service s(socketPath);
+    s.start();
+
+    IPCGSource::Pointer clientGSource;
+    Client c(socketPath);
+    c.addMethodHandler<SendData, SendData>(1, echoCallback);
+    c.addSignalHandler<SendData>(2, signalHandler);
+
+    auto ret = connectClientGSource(s, c);
+    FileDescriptor peerFD = ret.first;
+    clientGSource = ret.second;
+
+    testEcho(s, 1, peerFD);
+
+    auto data = std::make_shared<SendData>(1);
+    s.signal<SendData>(2, data);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for
+    BOOST_CHECK(isSignalCalled);
+}
+
 #endif // GLIB_CHECK_VERSION
 
 // BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
index 50bfa1a..7baf37f 100644 (file)
@@ -37,6 +37,12 @@ PKG_CHECK_MODULES(ZONE_DAEMON_DEPS REQUIRED gio-2.0 libsystemd-journal libcap-ng
                                                  libLogger libSimpleDbus libConfig)
 INCLUDE_DIRECTORIES(${COMMON_FOLDER})
 INCLUDE_DIRECTORIES(SYSTEM ${ZONE_DAEMON_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS})
+
+SET_TARGET_PROPERTIES(${ZONE_DAEMON_CODENAME} PROPERTIES
+    COMPILE_FLAGS "-pthread"
+    LINK_FLAGS "-pthread"
+)
+
 TARGET_LINK_LIBRARIES(${ZONE_DAEMON_CODENAME} ${ZONE_DAEMON_DEPS_LIBRARIES} ${Boost_LIBRARIES})