IPC: Support for the external polling loop in Service 57/31957/9
authorJan Olszak <j.olszak@samsung.com>
Mon, 8 Dec 2014 15:48:21 +0000 (16:48 +0100)
committerJan Olszak <j.olszak@samsung.com>
Tue, 16 Dec 2014 09:40:30 +0000 (10:40 +0100)
[Bug/Feature]   Using GMainLoop is possible
[Cause]         N/A
[Solution]      For glib > v.2.36
[Verification]  Build, install, run tests

Change-Id: Ic6d74688c322dd79b29195d94658a4f2ffe0aa83

common/ipc/internals/acceptor.cpp
common/ipc/internals/acceptor.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/internals/utils.cpp
common/ipc/ipc-gsource.cpp [new file with mode: 0644]
common/ipc/ipc-gsource.hpp [new file with mode: 0644]
common/ipc/service.cpp
common/ipc/service.hpp
common/ipc/types.cpp
tests/unit_tests/ipc/ut-ipc.cpp

index 3a6c4cd..1eab1c2 100644 (file)
 #include "config.hpp"
 
 #include "ipc/exception.hpp"
-#include "ipc/internals/utils.hpp"
 #include "ipc/internals/acceptor.hpp"
 #include "logger/logger.hpp"
 
 #include <poll.h>
 #include <cerrno>
 #include <cstring>
-#include <chrono>
 #include <vector>
 
 namespace vasum {
 namespace ipc {
 
 Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback)
-    : mNewConnectionCallback(newConnectionCallback),
+    : mIsRunning(false),
+      mNewConnectionCallback(newConnectionCallback),
       mSocket(Socket::createSocket(socketPath))
 {
     LOGT("Creating Acceptor for socket " << socketPath);
@@ -88,9 +87,8 @@ void Acceptor::run()
     fds[1].fd = mSocket.getFD();
     fds[1].events = POLLIN;
 
-    // Main loop
-    bool isRunning = true;
-    while (isRunning) {
+    mIsRunning = true;
+    while (mIsRunning) {
         LOGT("Waiting for new connections...");
 
         int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/);
@@ -108,23 +106,41 @@ void Acceptor::run()
         // Check for incoming connections
         if (fds[1].revents & POLLIN) {
             fds[1].revents = 0;
-            std::shared_ptr<Socket> tmpSocket = mSocket.accept();
-            mNewConnectionCallback(tmpSocket);
+            handleConnection();
         }
 
         // Check for incoming events
         if (fds[0].revents & POLLIN) {
             fds[0].revents = 0;
-
-            if (mEventQueue.receive() == Event::FINISH) {
-                LOGD("Event FINISH");
-                isRunning = false;
-                break;
-            }
+            handleEvent();
         }
     }
     LOGT("Exiting run");
 }
 
+void Acceptor::handleConnection()
+{
+    std::shared_ptr<Socket> tmpSocket = mSocket.accept();
+    mNewConnectionCallback(tmpSocket);
+}
+
+void Acceptor::handleEvent()
+{
+    if (mEventQueue.receive() == Event::FINISH) {
+        LOGD("Event FINISH");
+        mIsRunning = false;
+    }
+}
+
+FileDescriptor Acceptor::getEventFD()
+{
+    return mEventQueue.getFD();
+}
+
+FileDescriptor Acceptor::getConnectionFD()
+{
+    return mSocket.getFD();
+}
+
 } // namespace ipc
 } // namespace vasum
index 702a161..f87a0bb 100644 (file)
@@ -29,6 +29,7 @@
 
 #include "ipc/internals/socket.hpp"
 #include "ipc/internals/event-queue.hpp"
+#include "ipc/types.hpp"
 
 #include <string>
 #include <thread>
@@ -67,11 +68,35 @@ public:
      */
     void stop();
 
+    /**
+     * Handle one incoming connection.
+     * Used with external polling
+     */
+    void handleConnection();
+
+    /**
+     * Handle one event from the internal event's queue
+     * Used with external polling
+     */
+    void handleEvent();
+
+    /**
+     * @return file descriptor of internal event's queue
+     */
+    FileDescriptor getEventFD();
+
+    /**
+     * @return file descriptor for the connection socket
+     */
+    FileDescriptor getConnectionFD();
+
 private:
     enum class Event : int {
         FINISH  // Shutdown request
     };
 
+    bool mIsRunning;
+
     NewConnectionCallback mNewConnectionCallback;
     Socket mSocket;
 
index be1060d..72a1788 100644 (file)
@@ -112,6 +112,11 @@ void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
     mRemovedPeerCallback = removedPeerCallback;
 }
 
+FileDescriptor Processor::getEventFD()
+{
+    return mEventQueue.getFD();
+}
+
 void Processor::removeMethod(const MethodID methodID)
 {
     LOGT("Removing method " << methodID);
@@ -128,9 +133,9 @@ FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
         peerFD = socketPtr->getFD();
         SocketInfo socketInfo(peerFD, std::move(socketPtr));
         mNewSockets.push(std::move(socketInfo));
+        mEventQueue.send(Event::ADD_PEER);
     }
     LOGI("New peer added. Id: " << peerFD);
-    mEventQueue.send(Event::ADD_PEER);
 
     return peerFD;
 }
@@ -143,9 +148,9 @@ void Processor::removePeer(const FileDescriptor peerFD)
         Lock lock(mSocketsMutex);
         RemovePeerRequest request(peerFD, conditionPtr);
         mPeersToDelete.push(std::move(request));
+        mEventQueue.send(Event::REMOVE_PEER);
     }
 
-    mEventQueue.send(Event::REMOVE_PEER);
 
     auto isPeerDeleted = [&peerFD, this]()->bool {
         Lock lock(mSocketsMutex);
@@ -204,6 +209,10 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
 
 void Processor::resetPolling()
 {
+    if (!isStarted()) {
+        return;
+    }
+
     LOGI("Resetting polling");
     // Setup polling on eventfd and sockets
     Lock lock(mSocketsMutex);
@@ -251,20 +260,22 @@ void Processor::run()
         }
 
         // Check for incoming events
-        if (handleEvent()) {
-            // mFDs changed
-            continue;
+        if (mFDs[0].revents & POLLIN) {
+            mFDs[0].revents &= ~(POLLIN);
+            if (handleEvent()) {
+                // mFDs changed
+                continue;
+            }
         }
+
     }
 
     cleanCommunication();
 }
 
-
 bool Processor::handleLostConnections()
 {
     std::vector<FileDescriptor> peersToRemove;
-
     {
         Lock lock(mSocketsMutex);
         for (unsigned int i = 1; i < mFDs.size(); ++i) {
@@ -283,39 +294,61 @@ bool Processor::handleLostConnections()
     return !peersToRemove.empty();
 }
 
+bool Processor::handleLostConnection(const FileDescriptor peerFD)
+{
+    removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
+    return true;
+}
+
 bool Processor::handleInputs()
 {
-    std::vector<std::shared_ptr<Socket>> socketsWithInput;
+    std::vector<FileDescriptor> peersWithInput;
     {
         Lock lock(mSocketsMutex);
         for (unsigned int i = 1; i < mFDs.size(); ++i) {
             if (mFDs[i].revents & POLLIN) {
                 mFDs[i].revents &= ~(POLLIN);
-                socketsWithInput.push_back(mSockets[mFDs[i].fd]);
+                peersWithInput.push_back(mFDs[i].fd);
             }
         }
     }
 
     bool pollChanged = false;
     // Handle input outside the critical section
-    for (const auto& socketPtr : socketsWithInput) {
-        pollChanged = pollChanged || handleInput(*socketPtr);
+    for (const FileDescriptor peerFD : peersWithInput) {
+        pollChanged = pollChanged || handleInput(peerFD);
     }
     return pollChanged;
 }
 
-bool Processor::handleInput(const Socket& socket)
+bool Processor::handleInput(const FileDescriptor peerFD)
 {
     LOGT("Handle incoming data");
+
+    std::shared_ptr<Socket> socketPtr;
+    try {
+        socketPtr = mSockets.at(peerFD);
+    } catch (const std::out_of_range&) {
+        LOGE("No such peer: " << peerFD);
+        return false;
+    }
+
     MethodID methodID;
     MessageID messageID;
     {
-        Socket::Guard guard = socket.getGuard();
-        socket.read(&methodID, sizeof(methodID));
-        socket.read(&messageID, sizeof(messageID));
+        Socket::Guard guard = socketPtr->getGuard();
+        try {
+            socketPtr->read(&methodID, sizeof(methodID));
+            socketPtr->read(&messageID, sizeof(messageID));
+
+        } catch (const IPCException& e) {
+            LOGE("Error during reading the socket");
+            removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
+            return true;
+        }
 
         if (methodID == RETURN_METHOD_ID) {
-            return onReturnValue(socket, messageID);
+            return onReturnValue(*socketPtr, messageID);
 
         } else {
             Lock lock(mCallsMutex);
@@ -323,19 +356,19 @@ bool Processor::handleInput(const Socket& socket)
                 // Method
                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
                 lock.unlock();
-                return onRemoteCall(socket, methodID, messageID, methodCallbacks);
+                return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
 
             } else if (mSignalsCallbacks.count(methodID)) {
                 // Signal
                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
                 lock.unlock();
-                return onRemoteSignal(socket, methodID, messageID, signalCallbacks);
+                return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
 
             } else {
                 // Nothing
                 lock.unlock();
                 LOGW("No method or signal callback for methodID: " << methodID);
-                removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+                removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
                 return true;
             }
         }
@@ -461,13 +494,6 @@ bool Processor::onRemoteCall(const Socket& socket,
 
 bool Processor::handleEvent()
 {
-    if (!(mFDs[0].revents & POLLIN)) {
-        // No event to serve
-        return false;
-    }
-
-    mFDs[0].revents &= ~(POLLIN);
-
     switch (mEventQueue.receive()) {
     case Event::FINISH: {
         LOGD("Event FINISH");
@@ -518,11 +544,11 @@ bool Processor::onNewPeer()
 
     // Broadcast the new signal to peers
     LOGW("Sending handled signals");
-    std::list<FileDescriptor> peersIDs;
+    std::list<FileDescriptor> peersFDs;
     {
         Lock lock(mSocketsMutex);
         for (const auto kv : mSockets) {
-            peersIDs.push_back(kv.first);
+            peersFDs.push_back(kv.first);
         }
     }
 
@@ -535,7 +561,7 @@ bool Processor::onNewPeer()
     }
     auto data = std::make_shared<RegisterSignalsMessage>(ids);
 
-    for (const FileDescriptor peerFD : peersIDs) {
+    for (const FileDescriptor peerFD : peersFDs) {
         callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
                                                         peerFD,
                                                         data,
index 6ce2688..d33d12b 100644 (file)
@@ -75,6 +75,7 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 *  - new way to generate UIDs
 *  - callbacks for serialization/parsing
 *  - store Sockets in a vector, maybe SocketStore?
+*  - fix valgrind tests
 *
 *
 */
@@ -240,6 +241,35 @@ public:
     void signal(const MethodID methodID,
                 const std::shared_ptr<SentDataType>& data);
 
+    /**
+     * Removes one peer.
+     * Handler used in external polling.
+     *
+     * @param peerFD file description identifying the peer
+     * @return should the polling structure be rebuild
+     */
+    bool handleLostConnection(const FileDescriptor peerFD);
+
+    /**
+     * Handles input from one peer.
+     * Handler used in external polling.
+     *
+     * @param peerFD file description identifying the peer
+     * @return should the polling structure be rebuild
+     */
+    bool handleInput(const FileDescriptor peerFD);
+
+    /**
+     * Handle one event from the internal event's queue
+     *
+     * @return should the polling structure be rebuild
+     */
+    bool handleEvent();
+
+    /**
+     * @return file descriptor for the internal event's queue
+     */
+    FileDescriptor getEventFD();
 
 private:
     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
@@ -383,13 +413,12 @@ private:
     static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
 
     void run();
-    bool handleEvent();
     bool onCall();
     bool onNewPeer();
     bool onRemovePeer();
     bool handleLostConnections();
     bool handleInputs();
-    bool handleInput(const Socket& socket);
+
     bool onReturnValue(const Socket& socket,
                        const MessageID messageID);
     bool onRemoteCall(const Socket& socket,
@@ -494,26 +523,24 @@ void Processor::addSignalHandler(const MethodID methodID,
         mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
     }
 
-    if (isStarted()) {
-        // Broadcast the new signal to peers
-        std::vector<MethodID> ids {methodID};
-        auto data = std::make_shared<RegisterSignalsMessage>(ids);
+    std::vector<MethodID> ids {methodID};
+    auto data = std::make_shared<RegisterSignalsMessage>(ids);
 
-        std::list<FileDescriptor> peersIDs;
-        {
-            Lock lock(mSocketsMutex);
-            for (const auto kv : mSockets) {
-                peersIDs.push_back(kv.first);
-            }
+    std::list<FileDescriptor> peersFDs;
+    {
+        Lock lock(mSocketsMutex);
+        for (const auto kv : mSockets) {
+            peersFDs.push_back(kv.first);
         }
+    }
 
-        for (const FileDescriptor peerFD : peersIDs) {
-            callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
-                                                        peerFD,
-                                                        data,
-                                                        DEFAULT_METHOD_TIMEOUT);
-        }
+    for (const FileDescriptor peerFD : peersFDs) {
+        callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
+                                                    peerFD,
+                                                    data,
+                                                    DEFAULT_METHOD_TIMEOUT);
     }
+
 }
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -535,11 +562,6 @@ MessageID Processor::callAsync(const MethodID methodID,
                                const std::shared_ptr<SentDataType>& data,
                                const typename ResultHandler<ReceivedDataType>::type& process)
 {
-    if (!isStarted()) {
-        LOGE("The Processor thread is not started. Can't send any data.");
-        throw IPCException("The Processor thread is not started. Can't send any data.");
-    }
-
     return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
 }
 
@@ -600,18 +622,13 @@ template<typename SentDataType>
 void Processor::signal(const MethodID methodID,
                        const std::shared_ptr<SentDataType>& data)
 {
-    if (!isStarted()) {
-        LOGE("The Processor thread is not started. Can't send any data.");
-        throw IPCException("The Processor thread is not started. Can't send any data.");
-    }
-
-    std::list<FileDescriptor> peersIDs;
+    std::list<FileDescriptor> peersFDs;
     {
         Lock lock(mSocketsMutex);
-        peersIDs = mSignalsPeers[methodID];
+        peersFDs = mSignalsPeers[methodID];
     }
 
-    for (const FileDescriptor peerFD : peersIDs) {
+    for (const FileDescriptor peerFD : peersFDs) {
         Lock lock(mCallsMutex);
         mCalls.push<SentDataType>(methodID, peerFD, data);
         mEventQueue.send(Event::CALL);
index 88f8fc0..bd98c1b 100644 (file)
@@ -122,8 +122,8 @@ void write(int fd, const void* bufferPtr, const size_t size, int timeoutMS)
             // Neglected errors
             LOGD("Retrying write");
         } else {
-            LOGE("Error during reading: " + std::string(strerror(errno)));
-            throw IPCException("Error during reading: " + std::string(strerror(errno)));
+            LOGE("Error during writing: " + std::string(strerror(errno)));
+            throw IPCException("Error during writing: " + std::string(strerror(errno)));
         }
 
         if (nTotal >= size) {
diff --git a/common/ipc/ipc-gsource.cpp b/common/ipc/ipc-gsource.cpp
new file mode 100644 (file)
index 0000000..f5cdbb5
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Class for creating a dedicated GSource
+ */
+
+
+#include "config.hpp"
+
+#include "ipc/ipc-gsource.hpp"
+
+#if GLIB_CHECK_VERSION(2,36,0)
+
+#include "logger/logger.hpp"
+#include <algorithm>
+
+namespace vasum {
+namespace ipc {
+
+namespace {
+
+
+GIOCondition conditions = static_cast<GIOCondition>(G_IO_IN |
+                                                    G_IO_ERR |
+                                                    G_IO_HUP);
+}
+
+
+IPCGSource::IPCGSource(const std::vector<FileDescriptor> fds,
+                       const HandlerCallback& handlerCallback)
+    : mHandlerCallback(handlerCallback)
+{
+    LOGD("Constructing IPCGSource");
+    for (const FileDescriptor fd : fds) {
+        addFD(fd);
+    }
+}
+
+IPCGSource::~IPCGSource()
+{
+    LOGD("Destroying IPCGSource");
+    g_source_destroy(&mGSource);
+
+}
+
+IPCGSource* IPCGSource::create(const std::vector<FileDescriptor>& fds,
+                               const HandlerCallback& handlerCallback)
+{
+    LOGD("Creating IPCGSource");
+
+    static GSourceFuncs funcs = { &IPCGSource::prepare,
+                                  &IPCGSource::check,
+                                  &IPCGSource::dispatch,
+                                  &IPCGSource::finalize,
+                                  nullptr,
+                                  nullptr
+                                };
+
+    // New GSource
+    GSource* gSource = g_source_new(&funcs, sizeof(IPCGSource));
+    g_source_set_priority(gSource, G_PRIORITY_HIGH);
+
+    // Fill additional data
+    IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
+    return new(source) IPCGSource(fds, handlerCallback);
+}
+
+
+void IPCGSource::addFD(const FileDescriptor fd)
+{
+    if (!&mGSource) {
+        // In case it's called as a callback but the IPCGSource is destroyed
+        return;
+    }
+
+    LOGD("Adding fd to glib");
+    gpointer tag = g_source_add_unix_fd(&mGSource,
+                                        fd,
+                                        conditions);
+    FDInfo fdInfo(tag, fd);
+    mFDInfos.push_back(std::move(fdInfo));
+}
+
+void IPCGSource::removeFD(const FileDescriptor fd)
+{
+    if (!&mGSource) {
+        // In case it's called as a callback but the IPCGSource is destroyed
+        return;
+    }
+
+    LOGD("Removing fd from glib");
+    auto it = std::find(mFDInfos.begin(), mFDInfos.end(), fd);
+    if (it == mFDInfos.end()) {
+        LOGE("No such fd");
+        return;
+    }
+    g_source_remove_unix_fd(&mGSource, it->tag);
+    mFDInfos.erase(it);
+}
+
+guint IPCGSource::attach(GMainContext* context)
+{
+    LOGD("Attaching to GMainContext");
+    return g_source_attach(&mGSource, context);
+}
+
+gboolean IPCGSource::prepare(GSource* gSource, gint* timeout)
+{
+    if (!gSource) {
+        return FALSE;
+    }
+
+    *timeout = -1;
+
+    // TODO: Implement hasEvents() method in Client and Service and use it here as a callback:
+    // return source->hasEvents();
+    return FALSE;
+}
+
+gboolean IPCGSource::check(GSource* gSource)
+{
+    if (!gSource) {
+        return FALSE;
+    }
+
+    return TRUE;
+}
+
+gboolean IPCGSource::dispatch(GSource* gSource,
+                              GSourceFunc /*callback*/,
+                              gpointer /*userData*/)
+{
+    IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
+
+    for (const FDInfo fdInfo : source->mFDInfos) {
+        GIOCondition cond = g_source_query_unix_fd(gSource, fdInfo.tag);
+        if (conditions & cond) {
+            source->mHandlerCallback(fdInfo.fd, cond);
+        }
+    }
+
+    return TRUE; // Don't remove the GSource from the GMainContext
+}
+
+void  IPCGSource::finalize(GSource* gSource)
+{
+    if (gSource) {
+        IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
+        source->~IPCGSource();
+    }
+}
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // GLIB_CHECK_VERSION
diff --git a/common/ipc/ipc-gsource.hpp b/common/ipc/ipc-gsource.hpp
new file mode 100644 (file)
index 0000000..96e0a1a
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief    Class for creating a dedicated GSource
+ */
+
+#ifndef COMMON_IPC_IPC_GSOURCE_HPP
+#define COMMON_IPC_IPC_GSOURCE_HPP
+
+#include <glib.h>
+#if GLIB_CHECK_VERSION(2,36,0)
+
+#include "ipc/service.hpp"
+#include "ipc/types.hpp"
+#include "utils/callback-wrapper.hpp"
+#include <memory>
+
+
+namespace vasum {
+namespace ipc {
+
+/**
+ * Class for connecting to the glib's loop.
+ * Creates a dedicated GSource.
+ *
+ * It's supposed to be constructed ONLY with the static create method
+ * and destructed in a glib callback.
+ */
+struct IPCGSource {
+public:
+    typedef std::function<void(FileDescriptor fd, short pollEvent)> HandlerCallback;
+
+    IPCGSource() = delete;
+    IPCGSource(const IPCGSource&) = delete;
+    IPCGSource& operator=(const IPCGSource&) = delete;
+
+    /**
+     * New file descriptor to listen on.
+     *
+     * @param peerFD file descriptor
+     */
+    void addFD(const FileDescriptor peerFD);
+
+    /**
+     * Removes the file descriptor from the GSource
+     *
+     * @param peerFD file descriptor
+     */
+    void removeFD(const FileDescriptor peerFD);
+
+    /**
+     * Attach to the glib's GMainContext
+     *
+     * @param context where to connect
+     * @return result of the g_source_attach call
+     */
+    guint attach(GMainContext* context = nullptr);
+
+    /**
+     * Creates the IPCGSource class in the memory allocated by glib.
+     * Calls IPCGSource's constructor
+     *
+     * @param fds initial set of file descriptors
+     * @param handlerCallback event handling callback
+     *
+     * @return pointer to the IPCGSource
+     */
+    static IPCGSource* create(const std::vector<FileDescriptor>& fds,
+                              const HandlerCallback& handlerCallback);
+
+private:
+
+    /**
+     * GSourceFuncs' callback
+     */
+    static gboolean prepare(GSource* source, gint* timeout);
+
+    /**
+     * GSourceFuncs' callback
+     */
+    static gboolean check(GSource* source);
+
+    /**
+     * GSourceFuncs' callback
+     */
+    static gboolean dispatch(GSource* source,
+                             GSourceFunc callbacks,
+                             gpointer userData);
+
+    /**
+     * GSourceFuncs' callback
+     */
+    static void  finalize(GSource* source);
+
+
+
+    // Called only from IPCGSource::create
+    IPCGSource(const std::vector<FileDescriptor> fds,
+               const HandlerCallback& handlerCallback);
+
+    // Called only from IPCGSource::finalize
+    ~IPCGSource();
+
+    struct FDInfo {
+        FDInfo(gpointer tag, FileDescriptor fd)
+            : tag(tag), fd(fd) {}
+
+        bool operator==(const gpointer t)
+        {
+            return t == tag;
+        }
+
+        bool operator==(const FileDescriptor f)
+        {
+            return f == fd;
+        }
+
+        gpointer tag;
+        FileDescriptor fd;
+    };
+
+    GSource mGSource;
+    HandlerCallback mHandlerCallback;
+    std::vector<FDInfo> mFDInfos;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // GLIB_CHECK_VERSION
+
+#endif // COMMON_IPC_IPC_GSOURCE_HPP
index 5ee5fbd..be95cee 100644 (file)
@@ -79,6 +79,41 @@ void Service::stop()
     LOGD("Stopped");
 }
 
+std::vector<FileDescriptor> Service::getFDs()
+{
+    std::vector<FileDescriptor> fds;
+    fds.push_back(mAcceptor.getEventFD());
+    fds.push_back(mAcceptor.getConnectionFD());
+    fds.push_back(mProcessor.getEventFD());
+
+    return fds;
+}
+
+void Service::handle(const FileDescriptor fd, const short pollEvent)
+{
+    if (fd == mProcessor.getEventFD() && pollEvent & POLLIN) {
+        mProcessor.handleEvent();
+        return;
+
+    } else if (fd == mAcceptor.getConnectionFD() && pollEvent & POLLIN) {
+        mAcceptor.handleConnection();
+        return;
+
+    } else if (fd == mAcceptor.getEventFD() && pollEvent & POLLIN) {
+        mAcceptor.handleEvent();
+        return;
+
+    } else if (pollEvent & POLLIN) {
+        mProcessor.handleInput(fd);
+        return;
+
+    } else if (pollEvent & POLLHUP) {
+        mProcessor.handleLostConnection(fd);
+        return;
+    }
+}
+
+
 void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     mProcessor.setNewPeerCallback(newPeerCallback);
index 1f9aee3..ed83606 100644 (file)
@@ -75,6 +75,23 @@ public:
     void stop();
 
     /**
+     * Used with an external polling loop
+     *
+     * @return vector of internal file descriptors
+     */
+    std::vector<FileDescriptor> getFDs();
+
+    /**
+     * Used with an external polling loop.
+     * Handles one event from the file descriptor.
+     *
+     * @param fd file descriptor
+     * @param pollEvent event on the fd. Defined in poll.h
+     *
+     */
+    void handle(const FileDescriptor fd, const short pollEvent);
+
+    /**
     * Set the callback called for each new connection to a peer
     *
     * @param newPeerCallback the callback
index 18c769d..fa57648 100644 (file)
@@ -22,6 +22,8 @@
  * @brief   Types definitions and helper functions
  */
 
+#include "config.hpp"
+
 #include "ipc/types.hpp"
 #include "logger/logger.hpp"
 
index c671db6..9ce131d 100644 (file)
@@ -33,7 +33,9 @@
 
 #include "ipc/service.hpp"
 #include "ipc/client.hpp"
+#include "ipc/ipc-gsource.hpp"
 #include "ipc/types.hpp"
+#include "utils/glib-loop.hpp"
 
 #include "config/fields.hpp"
 #include "logger/logger.hpp"
@@ -47,6 +49,8 @@
 
 using namespace vasum;
 using namespace vasum::ipc;
+using namespace vasum::utils;
+using namespace std::placeholders;
 namespace fs = boost::filesystem;
 
 namespace {
@@ -132,30 +136,48 @@ std::shared_ptr<SendData> longEchoCallback(const FileDescriptor, std::shared_ptr
     return data;
 }
 
-FileDescriptor connect(Service& s, Client& c)
+FileDescriptor connect(Service& s, Client& c, bool serviceUsesGlib = false)
 {
     // Connects the Client to the Service and returns Clients FileDescriptor
-
     std::mutex mutex;
     std::condition_variable cv;
 
     FileDescriptor peerFD = 0;
-    auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFileDescriptor) {
+    auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFD) {
         std::unique_lock<std::mutex> lock(mutex);
-        peerFD = newFileDescriptor;
-        cv.notify_one();
+        peerFD = newFD;
+        cv.notify_all();
     };
 
-    s.setNewPeerCallback(newPeerCallback);
 
-    if (!s.isStarted()) {
-        s.start();
+    if (!serviceUsesGlib) {
+        s.setNewPeerCallback(newPeerCallback);
+
+        if (!s.isStarted()) {
+            s.start();
+        }
+    } else {
+#if GLIB_CHECK_VERSION(2,36,0)
+
+        IPCGSource* serviceGSourcePtr = IPCGSource::create(s.getFDs(), std::bind(&Service::handle, &s, _1, _2));
+
+        auto agregateCallback = [&newPeerCallback, &serviceGSourcePtr](const FileDescriptor newFD) {
+            serviceGSourcePtr->addFD(newFD);
+            newPeerCallback(newFD);
+        };
+
+        s.setNewPeerCallback(agregateCallback);
+        s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, serviceGSourcePtr, _1));
+
+        serviceGSourcePtr->attach();
+#endif // GLIB_CHECK_VERSION
+
     }
 
     c.start();
 
     std::unique_lock<std::mutex> lock(mutex);
-    BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerFD]() {
+    BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() {
         return peerFD != 0;
     }));
 
@@ -165,7 +187,7 @@ FileDescriptor connect(Service& s, Client& c)
 void testEcho(Client& c, const MethodID methodID)
 {
     std::shared_ptr<SendData> sentData(new SendData(34));
-    std::shared_ptr<SendData> recvData = c.callSync<SendData, SendData>(methodID, sentData);
+    std::shared_ptr<SendData> recvData = c.callSync<SendData, SendData>(methodID, sentData, 1000);
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
@@ -554,6 +576,35 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline)
 }
 
 
+#if GLIB_CHECK_VERSION(2,36,0)
+
+BOOST_AUTO_TEST_CASE(ServiceGSource)
+{
+    ScopedGlibLoop loop;
+
+    std::atomic_bool isSignalCalled(false);
+    auto signalHandler = [&isSignalCalled](const FileDescriptor, std::shared_ptr<SendData>&) {
+        isSignalCalled = true;
+    };
+
+    Service s(socketPath);
+    s.addMethodHandler<SendData, SendData>(1, echoCallback);
+
+    Client c(socketPath);
+    s.addSignalHandler<SendData>(2, signalHandler);
+    connect(s, c, true);
+
+    testEcho(c, 1);
+
+    auto data = std::make_shared<SendData>(1);
+    c.signal<SendData>(2, data);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for
+    BOOST_CHECK(isSignalCalled);
+}
+
+#endif // GLIB_CHECK_VERSION
+
 // BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
 // {
 //     unsigned oldLimit = ipc::getMaxFDNumber();