IPC works on epoll 96/36596/4
authorPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Tue, 10 Mar 2015 11:01:48 +0000 (12:01 +0100)
committerPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Thu, 12 Mar 2015 08:59:41 +0000 (01:59 -0700)
[Bug/Feature]   N/A
[Cause]         N/A
[Solution]      N/A
[Verification]  Run tests

Change-Id: I6f6b5a7a70cecbecbdf0c502d0f8618577892a48

17 files changed:
common/epoll/event-poll.cpp
common/epoll/glib-dispatcher.cpp [moved from common/epoll/glib-poll-dispatcher.cpp with 91% similarity]
common/epoll/glib-dispatcher.hpp [moved from common/epoll/glib-poll-dispatcher.hpp with 84% similarity]
common/epoll/thread-dispatcher.cpp [moved from common/epoll/thread-poll-dispatcher.cpp with 88% similarity]
common/epoll/thread-dispatcher.hpp [moved from common/epoll/thread-poll-dispatcher.hpp with 83% similarity]
common/ipc/client.cpp
common/ipc/client.hpp
common/ipc/internals/acceptor.cpp
common/ipc/internals/acceptor.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/service.cpp
common/ipc/service.hpp
tests/scripts/vsm_test_parser.py
tests/unit_tests/epoll/ut-event-poll.cpp
tests/unit_tests/ipc/ut-ipc.cpp
tests/unit_tests/ut.hpp

index dc1c1bc..9eb9fa0 100644 (file)
@@ -49,6 +49,9 @@ EventPoll::~EventPoll()
 {
     if (!mCallbacks.empty()) {
         LOGW("Not removed callbacks: " << mCallbacks.size());
+        for (const auto& item : mCallbacks) {
+            LOGT("Not removed fd: " << item.first);
+        }
         assert(0 && "Not removed callbacks left");
     }
     utils::close(mPollFD);
@@ -73,7 +76,7 @@ void EventPoll::addFD(const int fd, const Events events, Callback&& callback)
     }
 
     mCallbacks.insert({fd, std::make_shared<Callback>(std::move(callback))});
-    LOGT("Callback added for " << fd);
+    LOGT("Callback added for fd: " << fd);
 }
 
 void EventPoll::removeFD(const int fd)
@@ -87,7 +90,7 @@ void EventPoll::removeFD(const int fd)
     }
     mCallbacks.erase(iter);
     removeFDInternal(fd);
-    LOGT("Callback removed for " << fd);
+    LOGT("Callback removed for fd: " << fd);
 }
 
 bool EventPoll::dispatchIteration(const int timeoutMs)
@@ -115,7 +118,13 @@ bool EventPoll::dispatchIteration(const int timeoutMs)
 
         // add ref because removeFD(self) can be called inside callback
         std::shared_ptr<Callback> callback(iter->second);
-        return (*callback)(event.data.fd, event.events);
+        try {
+            LOGT("Dispatch fd: " << event.data.fd << ", events: " << eventsToString(event.events));
+            return (*callback)(event.data.fd, event.events);
+        } catch (std::exception& e) {
+            LOGE("Got unexpected exception: " << e.what());
+            assert(0 && "Callback should not throw any exceptions");
+        }
     }
 }
 
similarity index 91%
rename from common/epoll/glib-poll-dispatcher.cpp
rename to common/epoll/glib-dispatcher.cpp
index cde535d..9567abb 100644 (file)
  */
 
 #include "config.hpp"
-#include "epoll/glib-poll-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
 #include "utils/callback-wrapper.hpp"
 
 namespace vasum {
 namespace epoll {
 
-GlibPollDispatcher::GlibPollDispatcher()
+GlibDispatcher::GlibDispatcher()
 {
     mChannel = g_io_channel_unix_new(mPoll.getPollFD());
 
@@ -50,14 +50,14 @@ GlibPollDispatcher::GlibPollDispatcher()
                                    &utils::deleteCallbackWrapper<decltype(dispatchCallback)>);
 }
 
-GlibPollDispatcher::~GlibPollDispatcher()
+GlibDispatcher::~GlibDispatcher()
 {
     g_source_remove(mWatchId);
     g_io_channel_unref(mChannel);
     // mGuard destructor will wait for full unregister of dispatchCallback
 }
 
-EventPoll& GlibPollDispatcher::getPoll()
+EventPoll& GlibDispatcher::getPoll()
 {
     return mPoll;
 }
similarity index 84%
rename from common/epoll/glib-poll-dispatcher.hpp
rename to common/epoll/glib-dispatcher.hpp
index cf300bb..0fa7b3a 100644 (file)
@@ -22,8 +22,8 @@
  * @brief   glib epoll dispatcher
  */
 
-#ifndef COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP
-#define COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP
+#ifndef COMMON_EPOLL_GLIB_DISPATCHER_HPP
+#define COMMON_EPOLL_GLIB_DISPATCHER_HPP
 
 #include "epoll/event-poll.hpp"
 #include "utils/callback-guard.hpp"
@@ -36,10 +36,10 @@ namespace epoll {
 /**
  * Will dispatch poll events in glib thread
  */
-class GlibPollDispatcher {
+class GlibDispatcher {
 public:
-    GlibPollDispatcher();
-    ~GlibPollDispatcher();
+    GlibDispatcher();
+    ~GlibDispatcher();
 
     EventPoll& getPoll();
 private:
@@ -53,4 +53,4 @@ private:
 } // namespace epoll
 } // namespace vasum
 
-#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+#endif // COMMON_UTILS_GLIB_DISPATCHER_HPP
similarity index 88%
rename from common/epoll/thread-poll-dispatcher.cpp
rename to common/epoll/thread-dispatcher.cpp
index 797a8c4..a82cad8 100644 (file)
  */
 
 #include "config.hpp"
-#include "epoll/thread-poll-dispatcher.hpp"
+#include "epoll/thread-dispatcher.hpp"
 
 namespace vasum {
 namespace epoll {
 
-ThreadPollDispatcher::ThreadPollDispatcher()
+ThreadDispatcher::ThreadDispatcher()
 {
     auto controlCallback = [this](int, Events) -> bool {
         mStopEvent.receive();
@@ -41,14 +41,14 @@ ThreadPollDispatcher::ThreadPollDispatcher()
     });
 }
 
-ThreadPollDispatcher::~ThreadPollDispatcher()
+ThreadDispatcher::~ThreadDispatcher()
 {
     mStopEvent.send();
     mThread.join();
     mPoll.removeFD(mStopEvent.getFD());
 }
 
-EventPoll& ThreadPollDispatcher::getPoll()
+EventPoll& ThreadDispatcher::getPoll()
 {
     return mPoll;
 }
similarity index 83%
rename from common/epoll/thread-poll-dispatcher.hpp
rename to common/epoll/thread-dispatcher.hpp
index af6b278..7d0f30d 100644 (file)
@@ -22,8 +22,8 @@
  * @brief   Thread epoll dispatcher
  */
 
-#ifndef COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
-#define COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
+#ifndef COMMON_EPOLL_THREAD_DISPATCHER_HPP
+#define COMMON_EPOLL_THREAD_DISPATCHER_HPP
 
 #include "epoll/event-poll.hpp"
 #include "utils/eventfd.hpp"
@@ -36,10 +36,10 @@ namespace epoll {
 /**
  * Will dispatch poll events in a newly created thread
  */
-class ThreadPollDispatcher {
+class ThreadDispatcher {
 public:
-    ThreadPollDispatcher();
-    ~ThreadPollDispatcher();
+    ThreadDispatcher();
+    ~ThreadDispatcher();
 
     EventPoll& getPoll();
 private:
@@ -51,4 +51,4 @@ private:
 } // namespace epoll
 } // namespace vasum
 
-#endif // COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
+#endif // COMMON_EPOLL_THREAD_DISPATCHER_HPP
index 6669d8a..abecb71 100644 (file)
@@ -31,8 +31,9 @@
 namespace vasum {
 namespace ipc {
 
-Client::Client(const std::string& socketPath)
-    : mProcessor("[CLIENT]  "),
+Client::Client(epoll::EventPoll& eventPoll, const std::string& socketPath)
+    : mEventPoll(eventPoll),
+      mProcessor("[CLIENT]  "),
       mSocketPath(socketPath)
 {
     LOGS("Client Constructor");
@@ -50,14 +51,19 @@ Client::~Client()
     }
 }
 
-void Client::start(const bool usesExternalPolling)
+void Client::start()
 {
+    if (mProcessor.isStarted()) {
+        return;
+    }
     LOGS("Client start");
     // Initialize the connection with the server
-    if (usesExternalPolling) {
-        startPoll();
-    }
-    mProcessor.start(usesExternalPolling);
+    auto handleEvent = [&](int, epoll::Events) -> bool {
+        mProcessor.handleEvent();
+        return true;
+    };
+    mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent);
+    mProcessor.start();
 
     LOGD("Connecting to " + mSocketPath);
     auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
@@ -71,34 +77,18 @@ bool Client::isStarted()
 
 void Client::stop()
 {
+    if (!mProcessor.isStarted()) {
+        return;
+    }
     LOGS("Client stop");
     mProcessor.stop();
 
-    if (mIPCGSourcePtr) {
-        stopPoll();
-    }
-}
-
-void Client::startPoll()
-{
-    LOGS("Client startPoll");
-    using namespace std::placeholders;
-    mIPCGSourcePtr = IPCGSource::create(std::bind(&Client::handle, this, _1, _2));
-    mIPCGSourcePtr->addFD(mProcessor.getEventFD());
-    mIPCGSourcePtr->attach();
+    mEventPoll.removeFD(mProcessor.getEventFD());
 }
 
-void Client::stopPoll()
-{
-    LOGS("Client stopPoll");
-
-    mIPCGSourcePtr->removeFD(mProcessor.getEventFD());
-    mIPCGSourcePtr->detach();
-    mIPCGSourcePtr.reset();
-}
-
-void Client::handle(const FileDescriptor fd, const short pollEvent)
+void Client::handle(const FileDescriptor fd, const epoll::Events pollEvents)
 {
+    //TODO remove handle method
     LOGS("Client handle");
 
     if (!isStarted()) {
@@ -106,17 +96,12 @@ void Client::handle(const FileDescriptor fd, const short pollEvent)
         return;
     }
 
-    if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
-        mProcessor.handleEvent();
-        return;
-
-    } else if (pollEvent & POLLIN) {
+    if (pollEvents & EPOLLIN) {
         mProcessor.handleInput(fd);
-        return;
+    }
 
-    } else if (pollEvent & POLLHUP) {
+    if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
         mProcessor.handleLostConnection(fd);
-        return;
     }
 }
 
@@ -124,9 +109,11 @@ void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     LOGS("Client setNewPeerCallback");
     auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
-        if (mIPCGSourcePtr) {
-            mIPCGSourcePtr->addFD(fd);
-        }
+        auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+            handle(fd, events);
+            return true;
+        };
+        mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
         if (newPeerCallback) {
             newPeerCallback(peerID, fd);
         }
@@ -138,9 +125,7 @@ void Client::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 {
     LOGS("Client setRemovedPeerCallback");
     auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
-        if (mIPCGSourcePtr) {
-            mIPCGSourcePtr->removeFD(fd);
-        }
+        mEventPoll.removeFD(fd);
         if (removedPeerCallback) {
             removedPeerCallback(peerID, fd);
         }
index 321b4da..a3342b6 100644 (file)
@@ -26,9 +26,9 @@
 #define COMMON_IPC_CLIENT_HPP
 
 #include "ipc/internals/processor.hpp"
-#include "ipc/ipc-gsource.hpp"
 #include "ipc/types.hpp"
 #include "ipc/result.hpp"
+#include "epoll/event-poll.hpp"
 #include "logger/logger.hpp"
 
 #include <string>
@@ -40,28 +40,24 @@ namespace ipc {
  * This class wraps communication via UX sockets for client applications.
  * It uses serialization mechanism from libConfig.
  *
- * There is one additional thread:
- * - PROCESSOR is responsible for the communication and calling the callbacks
- *
  * For message format @see ipc::Processor
  */
 class Client {
 public:
     /**
+     * @param eventPoll event poll
      * @param serverPath path to the server's socket
      */
-    Client(const std::string& serverPath);
+    Client(epoll::EventPoll& eventPoll, const std::string& serverPath);
     ~Client();
 
     Client(const Client&) = delete;
     Client& operator=(const Client&) = delete;
 
     /**
-     * Starts the worker thread
-     *
-     * @param usesExternalPolling internal or external polling is used
+     * Starts processing
      */
-    void start(const bool usesExternalPolling = false);
+    void start();
 
     /**
     * @return is the communication thread running
@@ -69,21 +65,11 @@ public:
     bool isStarted();
 
     /**
-     * Stops all worker thread
+     * Stops processing
      */
     void stop();
 
     /**
-     * Used with an external polling loop.
-     * Handles one event from the file descriptor.
-     *
-     * @param fd file descriptor
-     * @param pollEvent event on the fd. Defined in poll.h
-     *
-     */
-    void handle(const FileDescriptor fd, const short pollEvent);
-
-    /**
      * Set the callback called for each new connection to a peer
      *
      * @param newPeerCallback the callback
@@ -170,14 +156,13 @@ public:
                 const std::shared_ptr<SentDataType>& data);
 
 private:
-
-    void startPoll();
-    void stopPoll();
-
+    epoll::EventPoll& mEventPoll;
     PeerID mServiceID;
     Processor mProcessor;
     std::string mSocketPath;
-    IPCGSource::Pointer mIPCGSourcePtr;
+
+    void handle(const FileDescriptor fd, const epoll::Events pollEvents);
+
 };
 
 template<typename SentDataType, typename ReceivedDataType>
index ecb2210..15b9ad8 100644 (file)
 
 #include "config.hpp"
 
-#include "ipc/exception.hpp"
 #include "ipc/internals/acceptor.hpp"
 #include "logger/logger.hpp"
 
-#include <poll.h>
-#include <cerrno>
-#include <cstring>
-#include <vector>
-
 namespace vasum {
 namespace ipc {
 
 Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback)
-    : mIsRunning(false),
-      mNewConnectionCallback(newConnectionCallback),
+    : mNewConnectionCallback(newConnectionCallback),
       mSocket(Socket::createSocket(socketPath))
 {
     LOGT("Creating Acceptor for socket " << socketPath);
@@ -46,98 +39,15 @@ Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& n
 
 Acceptor::~Acceptor()
 {
-    LOGT("Destroying Acceptor");
-    try {
-        stop();
-    } catch (std::exception& e) {
-        LOGE("Error in destructor: " << e.what());
-    }
     LOGT("Destroyed Acceptor");
 }
 
-void Acceptor::start()
-{
-    LOGT("Starting Acceptor");
-    if (!mThread.joinable()) {
-        mThread = std::thread(&Acceptor::run, this);
-    }
-    LOGT("Started Acceptor");
-}
-
-void Acceptor::stop()
-{
-    LOGT("Stopping Acceptor");
-
-    if (mThread.joinable()) {
-        mEventQueue.send(Event::FINISH);
-        LOGT("Waiting for Acceptor to finish");
-        mThread.join();
-    }
-
-    LOGT("Stopped Acceptor");
-}
-
-void Acceptor::run()
-{
-    // Setup polling structure
-    std::vector<struct pollfd> fds(2);
-
-    fds[0].fd = mEventQueue.getFD();
-    fds[0].events = POLLIN;
-
-    fds[1].fd = mSocket.getFD();
-    fds[1].events = POLLIN;
-
-    mIsRunning = true;
-    while (mIsRunning) {
-        LOGT("Waiting for new connections...");
-
-        int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/);
-
-        LOGT("...Incoming connection!");
-
-        if (ret == -1 || ret == 0) {
-            if (errno == EINTR) {
-                continue;
-            }
-            LOGE("Error in poll: " << std::string(strerror(errno)));
-            throw IPCException("Error in poll: " + std::string(strerror(errno)));
-        }
-
-        // Check for incoming connections
-        if (fds[1].revents & POLLIN) {
-            fds[1].revents = 0;
-            handleConnection();
-        }
-
-        // Check for incoming events
-        if (fds[0].revents & POLLIN) {
-            fds[0].revents = 0;
-            handleEvent();
-        }
-    }
-    LOGT("Exiting run");
-}
-
 void Acceptor::handleConnection()
 {
     std::shared_ptr<Socket> tmpSocket = mSocket.accept();
     mNewConnectionCallback(tmpSocket);
 }
 
-void Acceptor::handleEvent()
-{
-    if (mEventQueue.receive() == Event::FINISH) {
-        LOGD("Event FINISH");
-        mIsRunning = false;
-    }
-}
-
-FileDescriptor Acceptor::getEventFD()
-{
-    return mEventQueue.getFD();
-}
-
 FileDescriptor Acceptor::getConnectionFD()
 {
     return mSocket.getFD();
index f87a0bb..a6e9ec8 100644 (file)
 #include "config.hpp"
 
 #include "ipc/internals/socket.hpp"
-#include "ipc/internals/event-queue.hpp"
 #include "ipc/types.hpp"
 
 #include <string>
-#include <thread>
 
 namespace vasum {
 namespace ipc {
@@ -59,51 +57,19 @@ public:
     Acceptor& operator=(const Acceptor&) = delete;
 
     /**
-     * Starts the thread accepting the new connections.
-     */
-    void start();
-
-    /**
-     * Stops the accepting thread.
-     */
-    void stop();
-
-    /**
      * Handle one incoming connection.
      * Used with external polling
      */
     void handleConnection();
 
     /**
-     * Handle one event from the internal event's queue
-     * Used with external polling
-     */
-    void handleEvent();
-
-    /**
-     * @return file descriptor of internal event's queue
-     */
-    FileDescriptor getEventFD();
-
-    /**
      * @return file descriptor for the connection socket
      */
     FileDescriptor getConnectionFD();
 
 private:
-    enum class Event : int {
-        FINISH  // Shutdown request
-    };
-
-    bool mIsRunning;
-
     NewConnectionCallback mNewConnectionCallback;
     Socket mSocket;
-
-    EventQueue<Event> mEventQueue;
-    std::thread mThread;
-
-    void run();
 };
 
 } // namespace ipc
index 93f721f..587723c 100644 (file)
@@ -26,7 +26,6 @@
 
 #include "ipc/exception.hpp"
 #include "ipc/internals/processor.hpp"
-#include "utils/signal.hpp"
 #include "utils/exception.hpp"
 
 #include <cerrno>
@@ -65,8 +64,6 @@ Processor::Processor(const std::string& logName,
 {
     LOGS(mLogPrefix + "Processor Constructor");
 
-    utils::signalBlock(SIGPIPE);
-
     using namespace std::placeholders;
     setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
                                                              std::bind(&Processor::onNewSignals, this, _1, _2));
@@ -104,7 +101,7 @@ bool Processor::isStarted()
     return mIsRunning;
 }
 
-void Processor::start(bool usesExternalPolling)
+void Processor::start()
 {
     LOGS(mLogPrefix + "Processor start");
 
@@ -112,10 +109,6 @@ void Processor::start(bool usesExternalPolling)
     if (!mIsRunning) {
         LOGI(mLogPrefix + "Processor start");
         mIsRunning = true;
-        mUsesExternalPolling = usesExternalPolling;
-        if (!usesExternalPolling) {
-            mThread = std::thread(&Processor::run, this);
-        }
     }
 }
 
@@ -133,15 +126,12 @@ void Processor::stop()
 
         LOGD(mLogPrefix + "Waiting for the Processor to stop");
 
-        if (mThread.joinable()) {
-            mThread.join();
-        } else {
-            // Wait till the FINISH request is served
-            Lock lock(mStateMutex);
-            conditionPtr->wait(lock, [this]() {
-                return !mIsRunning;
-            });
-        }
+        // Wait till the FINISH request is served
+        Lock lock(mStateMutex);
+        conditionPtr->wait(lock, [this]() {
+            return !mIsRunning;
+        });
+        assert(mPeerInfo.empty());
     }
 }
 
@@ -204,7 +194,8 @@ PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
     auto requestPtr = std::make_shared<AddPeerRequest>(socketPtr);
     mRequestQueue.pushBack(Event::ADD_PEER, requestPtr);
 
-    LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID);
+    LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID
+            << ", fd: " << socketPtr->getFD());
 
     return requestPtr->peerID;
 }
@@ -233,14 +224,14 @@ void Processor::removePeerSyncInternal(const PeerID peerID, Lock& lock)
 
 void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr)
 {
-    LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
-    LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
-
     if (peerIt == mPeerInfo.end()) {
         LOGW("Peer already removed");
         return;
     }
 
+    LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
+    LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
+
     // Remove from signal addressees
     for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
         it->second.remove(peerIt->peerID);
@@ -270,100 +261,6 @@ void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_
     mPeerInfo.erase(peerIt);
 }
 
-void Processor::resetPolling()
-{
-    LOGS(mLogPrefix + "Processor resetPolling");
-
-    if (mUsesExternalPolling) {
-        return;
-    }
-
-    // Setup polling on eventfd and sockets
-    mFDs.resize(mPeerInfo.size() + 1);
-    LOGI(mLogPrefix + "Reseting mFDS.size: " << mFDs.size());
-
-    mFDs[0].fd = mRequestQueue.getFD();
-    mFDs[0].events = POLLIN;
-
-    for (unsigned int i = 1; i < mFDs.size(); ++i) {
-        auto fd = mPeerInfo[i - 1].socketPtr->getFD();
-
-        LOGI(mLogPrefix + "Reseting fd: " << fd);
-
-        mFDs[i].fd = fd;
-        mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
-        // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
-    }
-}
-
-void Processor::run()
-{
-    LOGS(mLogPrefix + "Processor run");
-
-    {
-        Lock lock(mStateMutex);
-        resetPolling();
-    }
-
-    while (isStarted()) {
-        LOGT(mLogPrefix + "Waiting for communication...");
-        int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
-        LOGT(mLogPrefix + "... incoming communication!");
-        if (ret == -1 || ret == 0) {
-            if (errno == EINTR) {
-                continue;
-            }
-            LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno)));
-            throw IPCException("Error in poll: " + std::string(strerror(errno)));
-        }
-
-        // Check for lost connections:
-        if (handleLostConnections()) {
-            // mFDs changed
-            resetPolling();
-            continue;
-        }
-
-        // Check for incoming data.
-        if (handleInputs()) {
-            // mFDs changed
-            resetPolling();
-            continue;
-        }
-
-        // Check for incoming events
-        if (mFDs[0].revents & POLLIN) {
-            mFDs[0].revents &= ~(POLLIN);
-            if (handleEvent()) {
-                // mFDs changed
-                resetPolling();
-                continue;
-            }
-        }
-
-    }
-}
-
-bool Processor::handleLostConnections()
-{
-    Lock lock(mStateMutex);
-
-    bool isPeerRemoved = false;
-
-    for (unsigned int i = 1; i < mFDs.size(); ++i) {
-        if (mFDs[i].revents & POLLHUP) {
-            auto peerIt = getPeerInfoIterator(mFDs[i].fd);
-            LOGI(mLogPrefix + "Lost connection to peer: " << peerIt->peerID);
-            mFDs[i].revents &= ~(POLLHUP);
-            removePeerInternal(peerIt,
-                               std::make_exception_ptr(IPCPeerDisconnectedException()));
-            isPeerRemoved = true;
-        }
-    }
-
-    return isPeerRemoved;
-}
-
 bool Processor::handleLostConnection(const FileDescriptor fd)
 {
     Lock lock(mStateMutex);
@@ -373,21 +270,6 @@ bool Processor::handleLostConnection(const FileDescriptor fd)
     return true;
 }
 
-bool Processor::handleInputs()
-{
-    // Lock not needed, mFDs won't be changed by handleInput
-
-    bool pollChanged = false;
-    for (unsigned int i = 1; i < mFDs.size(); ++i) {
-        if (mFDs[i].revents & POLLIN) {
-            mFDs[i].revents &= ~(POLLIN);
-            pollChanged = pollChanged || handleInput(mFDs[i].fd);
-        }
-    }
-
-    return pollChanged;
-}
-
 bool Processor::handleInput(const FileDescriptor fd)
 {
     LOGS(mLogPrefix + "Processor handleInput fd: " << fd);
@@ -795,6 +677,12 @@ bool Processor::onFinishRequest(FinishRequest& request)
         }
     }
 
+    // Close peers
+    while (!mPeerInfo.empty()) {
+        removePeerInternal(--mPeerInfo.end(),
+                           std::make_exception_ptr(IPCClosingException()));
+    }
+
     mIsRunning = false;
 
     request.conditionPtr->notify_all();
index 86a3f42..6e0ae81 100644 (file)
@@ -43,7 +43,6 @@
 #include "logger/logger-scope.hpp"
 
 #include <ostream>
-#include <poll.h>
 #include <condition_variable>
 #include <mutex>
 #include <chrono>
@@ -136,12 +135,9 @@ public:
 
 
     /**
-     * Start the processing thread.
-     * Quits immediately after starting the thread.
-     *
-     * @param usesExternalPolling internal or external polling is used
+     * Start processing.
      */
-    void start(const bool usesExternalPolling);
+    void start();
 
     /**
      * @return is processor running
@@ -432,14 +428,12 @@ private:
     RequestQueue<Event> mRequestQueue;
 
     bool mIsRunning;
-    bool mUsesExternalPolling;
 
     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
     std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
 
     Peers mPeerInfo;
-    std::vector<struct pollfd> mFDs;
 
     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
 
@@ -451,8 +445,6 @@ private:
 
     unsigned int mMaxNumberOfPeers;
 
-    std::thread mThread;
-
     template<typename SentDataType, typename ReceivedDataType>
     void setMethodHandlerInternal(const MethodID methodID,
                                   const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
@@ -466,8 +458,6 @@ private:
                         const PeerID peerID,
                         const std::shared_ptr<SentDataType>& data);
 
-    void run();
-
     // Request handlers
     bool onMethodRequest(MethodRequest& request);
     bool onSignalRequest(SignalRequest& request);
@@ -476,9 +466,6 @@ private:
     bool onSendResultRequest(SendResultRequest& request);
     bool onFinishRequest(FinishRequest& request);
 
-    bool handleLostConnections();
-    bool handleInputs();
-
     bool onReturnValue(Peers::iterator& peerIt,
                        const MessageID messageID);
     bool onRemoteMethod(Peers::iterator& peerIt,
@@ -489,7 +476,6 @@ private:
                         const MethodID methodID,
                         const MessageID messageID,
                         std::shared_ptr<SignalHandlers> signalCallbacks);
-    void resetPolling();
 
     void removePeerInternal(Peers::iterator peerIt,
                             const std::exception_ptr& exceptionPtr);
index 442d804..c436726 100644 (file)
@@ -33,10 +33,12 @@ using namespace std::placeholders;
 namespace vasum {
 namespace ipc {
 
-Service::Service(const std::string& socketPath,
+Service::Service(epoll::EventPoll& eventPoll,
+                 const std::string& socketPath,
                  const PeerCallback& addPeerCallback,
                  const PeerCallback& removePeerCallback)
-    : mProcessor("[SERVICE] "),
+    : mEventPoll(eventPoll),
+      mProcessor("[SERVICE] "),
       mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
 
 {
@@ -55,19 +57,23 @@ Service::~Service()
     }
 }
 
-void Service::start(const bool usesExternalPolling)
+void Service::start()
 {
-    LOGS("Service start");
-    if (usesExternalPolling) {
-        startPoll();
-    }
-    mProcessor.start(usesExternalPolling);
-
-    // There can be an incoming connection from mAcceptor before mProcessor is listening,
-    // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor.
-    if (!usesExternalPolling) {
-        mAcceptor.start();
+    if (mProcessor.isStarted()) {
+        return;
     }
+    LOGS("Service start");
+    auto handleConnection = [&](int, epoll::Events) -> bool {
+        mAcceptor.handleConnection();
+        return true;
+    };
+    auto handleProcessorEvent = [&](int, epoll::Events) -> bool {
+        mProcessor.handleEvent();
+        return true;
+    };
+    mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
+    mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent);
+    mProcessor.start();
 }
 
 bool Service::isStarted()
@@ -77,39 +83,19 @@ bool Service::isStarted()
 
 void Service::stop()
 {
+    if (!mProcessor.isStarted()) {
+        return;
+    }
     LOGS("Service stop");
-    mAcceptor.stop();
     mProcessor.stop();
 
-    if (mIPCGSourcePtr) {
-        stopPoll();
-    }
-}
-
-void Service::startPoll()
-{
-    LOGS("Service startPoll");
-
-    mIPCGSourcePtr = IPCGSource::create(std::bind(&Service::handle, this, _1, _2));
-    mIPCGSourcePtr->addFD(mAcceptor.getEventFD());
-    mIPCGSourcePtr->addFD(mAcceptor.getConnectionFD());
-    mIPCGSourcePtr->addFD(mProcessor.getEventFD());
-    mIPCGSourcePtr->attach();
+    mEventPoll.removeFD(mAcceptor.getConnectionFD());
+    mEventPoll.removeFD(mProcessor.getEventFD());
 }
 
-void Service::stopPoll()
-{
-    LOGS("Service stopPoll");
-
-    mIPCGSourcePtr->removeFD(mAcceptor.getEventFD());
-    mIPCGSourcePtr->removeFD(mAcceptor.getConnectionFD());
-    mIPCGSourcePtr->removeFD(mProcessor.getEventFD());
-    mIPCGSourcePtr->detach();
-    mIPCGSourcePtr.reset();
-}
-
-void Service::handle(const FileDescriptor fd, const short pollEvent)
+void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents)
 {
+    //TODO remove handle method
     LOGS("Service handle");
 
     if (!isStarted()) {
@@ -117,25 +103,12 @@ void Service::handle(const FileDescriptor fd, const short pollEvent)
         return;
     }
 
-    if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
-        mProcessor.handleEvent();
-        return;
-
-    } else if (fd == mAcceptor.getConnectionFD() && (pollEvent & POLLIN)) {
-        mAcceptor.handleConnection();
-        return;
-
-    } else if (fd == mAcceptor.getEventFD() && (pollEvent & POLLIN)) {
-        mAcceptor.handleEvent();
-        return;
-
-    } else if (pollEvent & POLLIN) {
+    if (pollEvents & EPOLLIN) {
         mProcessor.handleInput(fd);
-        return;
+    }
 
-    } else if (pollEvent & POLLHUP) {
+    if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
         mProcessor.handleLostConnection(fd);
-        return;
     }
 }
 
@@ -143,9 +116,11 @@ void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     LOGS("Service setNewPeerCallback");
     auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
-        if (mIPCGSourcePtr) {
-            mIPCGSourcePtr->addFD(fd);
-        }
+        auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+            handle(fd, events);
+            return true;
+        };
+        mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
         if (newPeerCallback) {
             newPeerCallback(peerID, fd);
         }
@@ -157,9 +132,7 @@ void Service::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
 {
     LOGS("Service setRemovedPeerCallback");
     auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
-        if (mIPCGSourcePtr) {
-            mIPCGSourcePtr->removeFD(fd);
-        }
+        mEventPoll.removeFD(fd);
         if (removedPeerCallback) {
             removedPeerCallback(peerID, fd);
         }
index 022c9c9..880c137 100644 (file)
@@ -27,9 +27,9 @@
 
 #include "ipc/internals/processor.hpp"
 #include "ipc/internals/acceptor.hpp"
-#include "ipc/ipc-gsource.hpp"
 #include "ipc/types.hpp"
 #include "ipc/result.hpp"
+#include "epoll/event-poll.hpp"
 #include "logger/logger.hpp"
 
 #include <string>
@@ -42,18 +42,16 @@ namespace ipc {
  * This class wraps communication via UX sockets.
  * It uses serialization mechanism from libConfig.
  *
- * There are two working threads:
- * - ACCEPTOR accepts incoming connections and passes them to PROCESSOR
- * - PROCESSOR is responsible for the communication and calling the callbacks
- *
  * For message format @see ipc::Processor
  */
 class Service {
 public:
     /**
+     * @param eventPoll event poll
      * @param path path to the socket
      */
-    Service(const std::string& path,
+    Service(epoll::EventPoll& eventPoll,
+            const std::string& path,
             const PeerCallback& addPeerCallback = nullptr,
             const PeerCallback& removePeerCallback = nullptr);
     ~Service();
@@ -62,11 +60,9 @@ public:
     Service& operator=(const Service&) = delete;
 
     /**
-     * Starts the worker and acceptor threads
-     *
-     * @param usesExternalPolling internal or external polling is used
+     * Starts processing
      */
-    void start(const bool usesExternalPolling = false);
+    void start();
 
     /**
     * @return is the communication thread running
@@ -79,16 +75,6 @@ public:
     void stop();
 
     /**
-     * Used with an external polling loop.
-     * Handles one event from the file descriptor.
-     *
-     * @param fd file descriptor
-     * @param pollEvent event on the fd. Defined in poll.h
-     *
-     */
-    void handle(const FileDescriptor fd, const short pollEvent);
-
-    /**
     * Set the callback called for each new connection to a peer
     *
     * @param newPeerCallback the callback
@@ -175,14 +161,11 @@ public:
     void signal(const MethodID methodID,
                 const std::shared_ptr<SentDataType>& data);
 private:
-
-    void startPoll();
-    void stopPoll();
-
-    typedef std::lock_guard<std::mutex> Lock;
+    epoll::EventPoll& mEventPoll;
     Processor mProcessor;
     Acceptor mAcceptor;
-    IPCGSource::Pointer mIPCGSourcePtr;
+
+    void handle(const FileDescriptor fd, const epoll::Events pollEvents);
 };
 
 
index 41dd495..d0d4fa7 100644 (file)
@@ -35,7 +35,7 @@ class Logger(object):
 
     __indentChar = "    "
     def testCaseSummary(self, testSuite, testName, testResult, recLevel):
-        msg = self.__indentChar * recLevel + BOLD + "{:<50}".format(testName)
+        msg = self.__indentChar * recLevel + BOLD + "{:<50} ".format(testName)
 
         if testResult == "passed":
             msg += GREEN
index 0bcbe81..00cb385 100644 (file)
@@ -31,8 +31,8 @@
 #include "ipc/internals/socket.hpp"
 #include "utils/latch.hpp"
 #include "utils/glib-loop.hpp"
-#include "epoll/glib-poll-dispatcher.hpp"
-#include "epoll/thread-poll-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
+#include "epoll/thread-dispatcher.hpp"
 
 using namespace vasum::utils;
 using namespace vasum::epoll;
@@ -54,14 +54,14 @@ BOOST_AUTO_TEST_CASE(EmptyPoll)
 
 BOOST_AUTO_TEST_CASE(ThreadedPoll)
 {
-    ThreadPollDispatcher dispatcher;
+    ThreadDispatcher dispatcher;
 }
 
 BOOST_AUTO_TEST_CASE(GlibPoll)
 {
     ScopedGlibLoop loop;
 
-    GlibPollDispatcher dispatcher;
+    GlibDispatcher dispatcher;
 }
 
 void doSocketTest(EventPoll& poll)
@@ -126,7 +126,7 @@ void doSocketTest(EventPoll& poll)
 
 BOOST_AUTO_TEST_CASE(ThreadedPollSocket)
 {
-    ThreadPollDispatcher dispatcher;
+    ThreadDispatcher dispatcher;
 
     doSocketTest(dispatcher.getPoll());
 }
@@ -135,14 +135,14 @@ BOOST_AUTO_TEST_CASE(GlibPollSocket)
 {
     ScopedGlibLoop loop;
 
-    GlibPollDispatcher dispatcher;
+    GlibDispatcher dispatcher;
 
     doSocketTest(dispatcher.getPoll());
 }
 
 BOOST_AUTO_TEST_CASE(PollStacking)
 {
-    ThreadPollDispatcher dispatcher;
+    ThreadDispatcher dispatcher;
 
     EventPoll innerPoll;
 
index 088f576..8f47a65 100644 (file)
@@ -34,6 +34,8 @@
 #include "ipc/client.hpp"
 #include "ipc/types.hpp"
 #include "ipc/result.hpp"
+#include "epoll/thread-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
 #include "utils/glib-loop.hpp"
 #include "utils/latch.hpp"
 #include "utils/value-latch.hpp"
 
 using namespace vasum;
 using namespace vasum::ipc;
+using namespace vasum::epoll;
 using namespace vasum::utils;
 using namespace std::placeholders;
 
-namespace {
-
 // Timeout for sending one message
 const int TIMEOUT = 1000 /*ms*/;
 
@@ -68,15 +69,28 @@ const int LONG_OPERATION_TIME = 1000 + TIMEOUT;
 const std::string TEST_DIR = "/tmp/ut-ipc";
 const std::string SOCKET_PATH = TEST_DIR + "/test.socket";
 
-struct Fixture {
+struct FixtureBase {
     ScopedDir mTestPathGuard;
 
-    Fixture()
+    FixtureBase()
         : mTestPathGuard(TEST_DIR)
     {
     }
 };
 
+struct ThreadedFixture : FixtureBase {
+    ThreadDispatcher dispatcher;
+
+    EventPoll& getPoll() { return dispatcher.getPoll(); }
+};
+
+struct GlibFixture : FixtureBase {
+    ScopedGlibLoop glibLoop;
+    GlibDispatcher dispatcher;
+
+    EventPoll& getPoll() { return dispatcher.getPoll(); }
+};
+
 struct SendData {
     int intVal;
     SendData(int i): intVal(i) {}
@@ -167,7 +181,7 @@ void longEchoCallback(const PeerID,
     methodResult->set(returnData);
 }
 
-PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
+PeerID connect(Service& s, Client& c)
 {
     // Connects the Client to the Service and returns Clients PeerID
     ValueLatch<PeerID> peerIDLatch;
@@ -178,10 +192,10 @@ PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientG
     s.setNewPeerCallback(newPeerCallback);
 
     if (!s.isStarted()) {
-        s.start(isServiceGlib);
+        s.start();
     }
 
-    c.start(isClientGlib);
+    c.start();
 
     PeerID peerID = peerIDLatch.get(TIMEOUT);
     s.setNewPeerCallback(nullptr);
@@ -189,16 +203,6 @@ PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientG
     return peerID;
 }
 
-PeerID connectServiceGSource(Service& s, Client& c)
-{
-    return connect(s, c, true, false);
-}
-
-PeerID connectClientGSource(Service& s, Client& c)
-{
-    return connect(s, c, false, true);
-}
-
 void testEcho(Client& c, const MethodID methodID)
 {
     std::shared_ptr<SendData> sentData(new SendData(34));
@@ -215,20 +219,17 @@ void testEcho(Service& s, const MethodID methodID, const PeerID peerID)
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
-} // namespace
-
-
-BOOST_FIXTURE_TEST_SUITE(IPCSuite, Fixture)
+BOOST_AUTO_TEST_SUITE(IPCSuite)
 
-BOOST_AUTO_TEST_CASE(ConstructorDestructor)
+MULTI_FIXTURE_TEST_CASE(ConstructorDestructor, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
 }
 
-BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod)
+MULTI_FIXTURE_TEST_CASE(ServiceAddRemoveMethod, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
     s.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
 
@@ -237,7 +238,7 @@ BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod)
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
     s.setMethodHandler<SendData, RecvData>(2, returnDataCallback);
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     connect(s, c);
     testEcho(c, 1);
 
@@ -247,10 +248,10 @@ BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod)
     BOOST_CHECK_THROW(testEcho(c, 2), IPCException);
 }
 
-BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
+MULTI_FIXTURE_TEST_CASE(ClientAddRemoveMethod, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.setMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
     c.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
 
@@ -267,9 +268,9 @@ BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
     BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
 }
 
-BOOST_AUTO_TEST_CASE(ServiceStartStop)
+MULTI_FIXTURE_TEST_CASE(ServiceStartStop, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
 
     s.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
 
@@ -282,10 +283,10 @@ BOOST_AUTO_TEST_CASE(ServiceStartStop)
     s.start();
 }
 
-BOOST_AUTO_TEST_CASE(ClientStartStop)
+MULTI_FIXTURE_TEST_CASE(ClientStartStop, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
 
     c.start();
@@ -300,27 +301,27 @@ BOOST_AUTO_TEST_CASE(ClientStartStop)
     c.stop();
 }
 
-BOOST_AUTO_TEST_CASE(SyncClientToServiceEcho)
+MULTI_FIXTURE_TEST_CASE(SyncClientToServiceEcho, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
     s.setMethodHandler<SendData, RecvData>(2, echoCallback);
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     connect(s, c);
 
     testEcho(c, 1);
     testEcho(c, 2);
 }
 
-BOOST_AUTO_TEST_CASE(Restart)
+MULTI_FIXTURE_TEST_CASE(Restart, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
     s.start();
     s.setMethodHandler<SendData, RecvData>(2, echoCallback);
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.start();
     testEcho(c, 1);
     testEcho(c, 2);
@@ -334,14 +335,19 @@ BOOST_AUTO_TEST_CASE(Restart)
     s.stop();
     s.start();
 
+    BOOST_CHECK_THROW(testEcho(c, 2), IPCException);
+
+    c.stop();
+    c.start();
+
     testEcho(c, 1);
     testEcho(c, 2);
 }
 
-BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
+MULTI_FIXTURE_TEST_CASE(SyncServiceToClientEcho, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.setMethodHandler<SendData, RecvData>(1, echoCallback);
     PeerID peerID = connect(s, c);
 
@@ -351,16 +357,16 @@ BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
-BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho)
+MULTI_FIXTURE_TEST_CASE(AsyncClientToServiceEcho, F, ThreadedFixture, GlibFixture)
 {
     std::shared_ptr<SendData> sentData(new SendData(34));
     ValueLatch<std::shared_ptr<RecvData>> recvDataLatch;
 
     // Setup Service and Client
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
     s.start();
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.start();
 
     //Async call
@@ -374,13 +380,13 @@ BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho)
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
-BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
+MULTI_FIXTURE_TEST_CASE(AsyncServiceToClientEcho, F, ThreadedFixture, GlibFixture)
 {
     std::shared_ptr<SendData> sentData(new SendData(56));
     ValueLatch<std::shared_ptr<RecvData>> recvDataLatch;
 
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.setMethodHandler<SendData, RecvData>(1, echoCallback);
     PeerID peerID = connect(s, c);
 
@@ -397,24 +403,24 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
 }
 
 
-BOOST_AUTO_TEST_CASE(SyncTimeout)
+MULTI_FIXTURE_TEST_CASE(SyncTimeout, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, longEchoCallback);
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     connect(s, c);
 
     std::shared_ptr<SendData> sentData(new SendData(78));
     BOOST_REQUIRE_THROW((c.callSync<SendData, RecvData>(1, sentData, TIMEOUT)), IPCException);
 }
 
-BOOST_AUTO_TEST_CASE(SerializationError)
+MULTI_FIXTURE_TEST_CASE(SerializationError, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     connect(s, c);
 
     std::shared_ptr<ThrowOnAcceptData> throwingData(new ThrowOnAcceptData());
@@ -423,23 +429,24 @@ BOOST_AUTO_TEST_CASE(SerializationError)
 
 }
 
-BOOST_AUTO_TEST_CASE(ParseError)
+MULTI_FIXTURE_TEST_CASE(ParseError, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
     s.start();
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.start();
 
     std::shared_ptr<SendData> sentData(new SendData(78));
     BOOST_CHECK_THROW((c.callSync<SendData, ThrowOnAcceptData>(1, sentData, 10000)), IPCParsingException);
 }
 
-BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
+MULTI_FIXTURE_TEST_CASE(DisconnectedPeerError, F, ThreadedFixture, GlibFixture)
 {
     ValueLatch<Result<RecvData>> retStatusLatch;
-    Service s(SOCKET_PATH);
+
+    Service s(F::getPoll(), SOCKET_PATH);
 
     auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&, MethodResult::Pointer methodResult) {
         auto resultData = std::make_shared<SendData>(1);
@@ -450,7 +457,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
     s.setMethodHandler<SendData, ThrowOnAcceptData>(1, method);
     s.start();
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.start();
 
     auto dataBack = [&retStatusLatch](Result<RecvData> && r) {
@@ -470,16 +477,16 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
 }
 
 
-BOOST_AUTO_TEST_CASE(ReadTimeout)
+MULTI_FIXTURE_TEST_CASE(ReadTimeout, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     auto longEchoCallback = [](const PeerID, std::shared_ptr<RecvData>& data, MethodResult::Pointer methodResult) {
         auto resultData = std::make_shared<LongSendData>(data->intVal, LONG_OPERATION_TIME);
         methodResult->set<LongSendData>(resultData);
     };
     s.setMethodHandler<LongSendData, RecvData>(1, longEchoCallback);
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     connect(s, c);
 
     // Test timeout on read
@@ -488,13 +495,13 @@ BOOST_AUTO_TEST_CASE(ReadTimeout)
 }
 
 
-BOOST_AUTO_TEST_CASE(WriteTimeout)
+MULTI_FIXTURE_TEST_CASE(WriteTimeout, F, ThreadedFixture, GlibFixture)
 {
-    Service s(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
     s.start();
 
-    Client c(SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     c.start();
 
     // Test echo with a minimal timeout
@@ -509,13 +516,13 @@ BOOST_AUTO_TEST_CASE(WriteTimeout)
 }
 
 
-BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
+MULTI_FIXTURE_TEST_CASE(AddSignalInRuntime, F, ThreadedFixture, GlibFixture)
 {
     ValueLatch<std::shared_ptr<RecvData>> recvDataLatchA;
     ValueLatch<std::shared_ptr<RecvData>> recvDataLatchB;
 
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     connect(s, c);
 
     auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
@@ -545,13 +552,13 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
 }
 
 
-BOOST_AUTO_TEST_CASE(AddSignalOffline)
+MULTI_FIXTURE_TEST_CASE(AddSignalOffline, F, ThreadedFixture, GlibFixture)
 {
     ValueLatch<std::shared_ptr<RecvData>> recvDataLatchA;
     ValueLatch<std::shared_ptr<RecvData>> recvDataLatchB;
 
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
 
     auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
         recvDataLatchA.set(data);
@@ -581,66 +588,13 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline)
     BOOST_CHECK_EQUAL(recvDataB->intVal, sendDataB->intVal);
 }
 
-
-BOOST_AUTO_TEST_CASE(ServiceGSource)
-{
-    utils::Latch l;
-    ScopedGlibLoop loop;
-
-    auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
-        l.set();
-    };
-
-    Service s(SOCKET_PATH);
-    s.setMethodHandler<SendData, RecvData>(1, echoCallback);
-
-    Client c(SOCKET_PATH);
-    s.setSignalHandler<RecvData>(2, signalHandler);
-
-    connectServiceGSource(s, c);
-
-    testEcho(c, 1);
-
-    auto data = std::make_shared<SendData>(1);
-    c.signal<SendData>(2, data);
-
-    BOOST_CHECK(l.wait(TIMEOUT));
-}
-
-
-BOOST_AUTO_TEST_CASE(ClientGSource)
-{
-    utils::Latch l;
-    ScopedGlibLoop loop;
-
-    auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
-        l.set();
-    };
-
-    Service s(SOCKET_PATH);
-    s.start();
-
-    Client c(SOCKET_PATH);
-    c.setMethodHandler<SendData, RecvData>(1, echoCallback);
-    c.setSignalHandler<RecvData>(2, signalHandler);
-
-    PeerID peerID = connectClientGSource(s, c);
-
-    testEcho(s, 1, peerID);
-
-    auto data = std::make_shared<SendData>(1);
-    s.signal<SendData>(2, data);
-
-    BOOST_CHECK(l.wait(TIMEOUT));
-}
-
-BOOST_AUTO_TEST_CASE(UsersError)
+MULTI_FIXTURE_TEST_CASE(UsersError, F, ThreadedFixture, GlibFixture)
 {
     const int TEST_ERROR_CODE = -234;
     const std::string TEST_ERROR_MESSAGE = "Ay, caramba!";
 
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     auto clientID = connect(s, c);
 
     auto throwingMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer) {
@@ -668,13 +622,13 @@ BOOST_AUTO_TEST_CASE(UsersError)
     BOOST_CHECK_EXCEPTION((s.callSync<SendData, RecvData>(2, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
 }
 
-BOOST_AUTO_TEST_CASE(AsyncResult)
+MULTI_FIXTURE_TEST_CASE(AsyncResult, F, ThreadedFixture, GlibFixture)
 {
     const int TEST_ERROR_CODE = -567;
     const std::string TEST_ERROR_MESSAGE = "Ooo jooo!";
 
-    Service s(SOCKET_PATH);
-    Client c(SOCKET_PATH);
+    Service s(F::getPoll(), SOCKET_PATH);
+    Client c(F::getPoll(), SOCKET_PATH);
     auto clientID = connect(s, c);
 
     auto errorMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer methodResult) {
@@ -724,20 +678,44 @@ BOOST_AUTO_TEST_CASE(AsyncResult)
     BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
 }
 
-// BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
+MULTI_FIXTURE_TEST_CASE(MixOperations, F, ThreadedFixture, GlibFixture)
+{
+    utils::Latch l;
+
+    auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
+        l.set();
+    };
+
+    Service s(F::getPoll(), SOCKET_PATH);
+    s.setMethodHandler<SendData, RecvData>(1, echoCallback);
+
+    Client c(F::getPoll(), SOCKET_PATH);
+    s.setSignalHandler<RecvData>(2, signalHandler);
+
+    connect(s, c);
+
+    testEcho(c, 1);
+
+    auto data = std::make_shared<SendData>(1);
+    c.signal<SendData>(2, data);
+
+    BOOST_CHECK(l.wait(TIMEOUT));
+}
+
+// MULTI_FIXTURE_TEST_CASE(ConnectionLimitTest, F, ThreadedFixture, GlibFixture)
 // {
 //     unsigned oldLimit = ipc::getMaxFDNumber();
 //     ipc::setMaxFDNumber(50);
 
 //     // Setup Service and many Clients
-//     Service s(SOCKET_PATH);
+//     Service s(F::getPoll(), SOCKET_PATH);
 //     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
 //     s.start();
 
 //     std::list<Client> clients;
 //     for (int i = 0; i < 100; ++i) {
 //         try {
-//             clients.push_back(Client(SOCKET_PATH));
+//             clients.push_back(Client(F::getPoll(), SOCKET_PATH));
 //             clients.back().start();
 //         } catch (...) {}
 //     }
index d8b4b94..e96d363 100644 (file)
 #define BOOST_TEST_DYN_LINK
 #include <boost/test/unit_test.hpp>
 
+#include <boost/mpl/vector.hpp>
+
 #include <string>
 
 /**
+ * Usage example:
+ *
+ * MULTI_FIXTURE_TEST_CASE(Test, T, Fixture1, Fixture2, Fixture3) {
+ *     std::cout << T::i << "\n";
+ * }
+ */
+#define MULTI_FIXTURE_TEST_CASE(NAME, TPARAM, ...) \
+    typedef boost::mpl::vector<__VA_ARGS__> NAME##_fixtures; \
+    BOOST_FIXTURE_TEST_CASE_TEMPLATE(NAME, TPARAM, NAME##_fixtures, TPARAM)
+
+
+/**
  * An exception message checker
  *
  * Usage example: