Epoll modifyFD method added. Some refactor. 91/36891/5
authorPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Mon, 16 Mar 2015 15:14:54 +0000 (16:14 +0100)
committerDariusz Michaluk <d.michaluk@samsung.com>
Wed, 25 Mar 2015 10:37:55 +0000 (03:37 -0700)
[Bug/Feature]   N/A
[Cause]         N/A
[Solution]      N/A
[Verification]  Run tests

Change-Id: Idcba92d12a618a8095420f3c941a12d61aef1761

common/epoll/event-poll.cpp
common/epoll/event-poll.hpp
common/epoll/thread-dispatcher.cpp
common/epoll/thread-dispatcher.hpp
common/ipc/client.cpp
common/ipc/internals/processor.hpp
common/ipc/service.cpp
tests/unit_tests/dbus/ut-connection.cpp
tests/unit_tests/epoll/ut-event-poll.cpp
tests/unit_tests/utils/ut-glib-loop.cpp

index 9eb9fa0..dfb0385 100644 (file)
@@ -79,14 +79,22 @@ void EventPoll::addFD(const int fd, const Events events, Callback&& callback)
     LOGT("Callback added for fd: " << fd);
 }
 
+void EventPoll::modifyFD(const int fd, const Events events)
+{
+    // No need to lock and check mCallbacks map
+    if (!modifyFDInternal(fd, events)) {
+        throw UtilsException("Could not modify fd");
+    }
+}
+
 void EventPoll::removeFD(const int fd)
 {
     std::lock_guard<Mutex> lock(mMutex);
 
     auto iter = mCallbacks.find(fd);
     if (iter == mCallbacks.end()) {
-        LOGW("Failed to remove nonexistent fd: " << fd);
-        throw UtilsException("FD does not exist");
+        LOGT("Callback not found, probably already removed fd: " << fd);
+        return;
     }
     mCallbacks.erase(iter);
     removeFDInternal(fd);
@@ -94,6 +102,7 @@ void EventPoll::removeFD(const int fd)
 }
 
 bool EventPoll::dispatchIteration(const int timeoutMs)
+
 {
     for (;;) {
         epoll_event event;
@@ -120,19 +129,16 @@ bool EventPoll::dispatchIteration(const int timeoutMs)
         std::shared_ptr<Callback> callback(iter->second);
         try {
             LOGT("Dispatch fd: " << event.data.fd << ", events: " << eventsToString(event.events));
-            return (*callback)(event.data.fd, event.events);
+            (*callback)(event.data.fd, event.events);
+            return true;
         } catch (std::exception& e) {
             LOGE("Got unexpected exception: " << e.what());
             assert(0 && "Callback should not throw any exceptions");
+            return true;
         }
     }
 }
 
-void EventPoll::dispatchLoop()
-{
-    while (dispatchIteration(-1)) {}
-}
-
 bool EventPoll::addFDInternal(const int fd, const Events events)
 {
     epoll_event event;
@@ -147,6 +153,20 @@ bool EventPoll::addFDInternal(const int fd, const Events events)
     return true;
 }
 
+bool EventPoll::modifyFDInternal(const int fd, const Events events)
+{
+    epoll_event event;
+    memset(&event, 0, sizeof(event));
+    event.events = events;
+    event.data.fd = fd;
+
+    if (epoll_ctl(mPollFD, EPOLL_CTL_MOD, fd, &event) == -1) {
+        LOGE("Failed to modify fd in poll: " << getSystemErrorMessage());
+        return false;
+    }
+    return true;
+}
+
 void EventPoll::removeFDInternal(const int fd)
 {
     if (epoll_ctl(mPollFD, EPOLL_CTL_DEL, fd, NULL) == -1) {
index 2d37aaa..67614ab 100644 (file)
@@ -37,7 +37,7 @@ namespace epoll {
 
 class EventPoll {
 public:
-    typedef std::function<bool(int fd, Events events)> Callback;
+    typedef std::function<void(int fd, Events events)> Callback;
 
     EventPoll();
     ~EventPoll();
@@ -45,10 +45,16 @@ public:
     int getPollFD() const;
 
     void addFD(const int fd, const Events events, Callback&& callback);
+    void modifyFD(const int fd, const Events events);
     void removeFD(const int fd);
 
+    /**
+     * Dispatch at most one signalled FD
+     * @param timeoutMs how long should wait in case of no pending events
+     *        (0 - return immediately, -1 - wait forever)
+     * @return false on timeout
+     */
     bool dispatchIteration(const int timeoutMs);
-    void dispatchLoop();
 
 private:
     typedef std::recursive_mutex Mutex;
@@ -58,6 +64,7 @@ private:
     std::unordered_map<int, std::shared_ptr<Callback>> mCallbacks;
 
     bool addFDInternal(const int fd, const Events events);
+    bool modifyFDInternal(const int fd, const Events events);
     void removeFDInternal(const int fd);
 };
 
index a82cad8..d286c19 100644 (file)
@@ -29,15 +29,18 @@ namespace vasum {
 namespace epoll {
 
 ThreadDispatcher::ThreadDispatcher()
+    : mStopped(false)
 {
-    auto controlCallback = [this](int, Events) -> bool {
+    auto controlCallback = [this](int, Events) {
         mStopEvent.receive();
-        return false; // break the loop
+        mStopped.store(true, std::memory_order_release);
     };
 
     mPoll.addFD(mStopEvent.getFD(), EPOLLIN, std::move(controlCallback));
     mThread = std::thread([this] {
-        mPoll.dispatchLoop();
+        while (!mStopped.load(std::memory_order_acquire)) {
+            mPoll.dispatchIteration(-1);
+        }
     });
 }
 
index 7d0f30d..5c6c145 100644 (file)
@@ -29,6 +29,7 @@
 #include "utils/eventfd.hpp"
 
 #include <thread>
+#include <atomic>
 
 namespace vasum {
 namespace epoll {
@@ -45,6 +46,7 @@ public:
 private:
     EventPoll mPoll;
     utils::EventFD mStopEvent;
+    std::atomic_bool mStopped;
     std::thread mThread;
 };
 
index abecb71..75a62ad 100644 (file)
@@ -58,9 +58,8 @@ void Client::start()
     }
     LOGS("Client start");
     // Initialize the connection with the server
-    auto handleEvent = [&](int, epoll::Events) -> bool {
+    auto handleEvent = [&](int, epoll::Events) {
         mProcessor.handleEvent();
-        return true;
     };
     mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent);
     mProcessor.start();
@@ -98,6 +97,7 @@ void Client::handle(const FileDescriptor fd, const epoll::Events pollEvents)
 
     if (pollEvents & EPOLLIN) {
         mProcessor.handleInput(fd);
+        return; // because handleInput will handle RDHUP
     }
 
     if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
@@ -109,9 +109,8 @@ void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     LOGS("Client setNewPeerCallback");
     auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
-        auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+        auto handleFd = [&](FileDescriptor fd, epoll::Events events) {
             handle(fd, events);
-            return true;
         };
         mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
         if (newPeerCallback) {
index 6e0ae81..dec16e8 100644 (file)
@@ -80,11 +80,8 @@ const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
 *  - new way to generate UIDs
 *  - callbacks for serialization/parsing
 *  - store Sockets in a vector, maybe SocketStore?
-*  - poll loop outside.
 *  - waiting till the EventQueue is empty before leaving stop()
 *  - no new events added after stop() called
-*  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
-*    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
 *
 */
 class Processor {
index c436726..849ce3c 100644 (file)
@@ -63,17 +63,15 @@ void Service::start()
         return;
     }
     LOGS("Service start");
-    auto handleConnection = [&](int, epoll::Events) -> bool {
+    auto handleConnection = [&](int, epoll::Events) {
         mAcceptor.handleConnection();
-        return true;
     };
-    auto handleProcessorEvent = [&](int, epoll::Events) -> bool {
+    auto handleProcessorEvent = [&](int, epoll::Events) {
         mProcessor.handleEvent();
-        return true;
     };
-    mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
     mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent);
     mProcessor.start();
+    mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
 }
 
 bool Service::isStarted()
@@ -87,9 +85,8 @@ void Service::stop()
         return;
     }
     LOGS("Service stop");
-    mProcessor.stop();
-
     mEventPoll.removeFD(mAcceptor.getConnectionFD());
+    mProcessor.stop();
     mEventPoll.removeFD(mProcessor.getEventFD());
 }
 
@@ -105,6 +102,7 @@ void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents)
 
     if (pollEvents & EPOLLIN) {
         mProcessor.handleInput(fd);
+        return; // because handleInput will handle RDHUP
     }
 
     if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
@@ -116,9 +114,8 @@ void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
 {
     LOGS("Service setNewPeerCallback");
     auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
-        auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+        auto handleFd = [&](FileDescriptor fd, epoll::Events events) {
             handle(fd, events);
-            return true;
         };
         mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
         if (newPeerCallback) {
index 4948cd7..e52778e 100644 (file)
@@ -96,11 +96,6 @@ std::string getInterfaceFromIntrospectionXML(const std::string& xml, const std::
 
 } // namespace
 
-BOOST_AUTO_TEST_CASE(GlibLoopTest)
-{
-    ScopedGlibLoop loop;
-}
-
 BOOST_AUTO_TEST_CASE(DbusDaemonTest)
 {
     ScopedDbusDaemon daemon;
index 00cb385..3a26b5e 100644 (file)
 #include "epoll/event-poll.hpp"
 #include "logger/logger.hpp"
 #include "ipc/internals/socket.hpp"
-#include "utils/latch.hpp"
+#include "utils/value-latch.hpp"
 #include "utils/glib-loop.hpp"
 #include "epoll/glib-dispatcher.hpp"
 #include "epoll/thread-dispatcher.hpp"
 
+using namespace vasum;
 using namespace vasum::utils;
 using namespace vasum::epoll;
 using namespace vasum::ipc;
@@ -66,62 +67,152 @@ BOOST_AUTO_TEST_CASE(GlibPoll)
 
 void doSocketTest(EventPoll& poll)
 {
-    const std::string PATH = "/tmp/ut-poll.sock";
-    const std::string MESSAGE = "This is a test message";
-
-    Latch goodMessage;
-    Latch remoteClosed;
-
-    Socket listen = Socket::createSocket(PATH);
-    std::shared_ptr<Socket> server;
+    using namespace std::placeholders;
 
-    auto serverCallback = [&](int, Events events) -> bool {
+    //TODO don't use ipc socket
+    const std::string PATH = "/tmp/ut-poll.sock";
+    const size_t REQUEST_LEN = 5;
+    const std::string REQUEST_GOOD = "GET 1";
+    const std::string REQUEST_BAD = "GET 7";
+    const std::string RESPONSE = "This is a response message";
+
+    // Scenario 1:
+    // client connects to server listening socket
+    // client ---good-request---> server
+    // server ---response---> client
+    // client disconnects
+    //
+    // Scenario 2:
+    // client connects to server listening socket
+    // client ---bad-request----> server
+    // server disconnects
+
+    // { server setup
+
+    auto serverCallback = [&](int /*fd*/,
+                              Events events,
+                              std::shared_ptr<Socket> socket,
+                              CallbackGuard::Tracker) {
         LOGD("Server events: " << eventsToString(events));
 
+        if (events & EPOLLIN) {
+            std::string request(REQUEST_LEN, 'x');
+            socket->read(&request.front(), request.size());
+            if (request == REQUEST_GOOD) {
+                poll.modifyFD(socket->getFD(), EPOLLRDHUP | EPOLLOUT);
+            } else {
+                // disconnect (socket is kept in callback)
+                poll.removeFD(socket->getFD());
+            }
+        }
+
         if (events & EPOLLOUT) {
-            server->write(MESSAGE.data(), MESSAGE.size());
-            poll.removeFD(server->getFD());
-            server.reset();
+            socket->write(RESPONSE.data(), RESPONSE.size());
+            poll.modifyFD(socket->getFD(), EPOLLRDHUP);
+        }
+
+        if (events & EPOLLRDHUP) {
+            // client has disconnected
+            poll.removeFD(socket->getFD());
         }
-        return true;
     };
 
-    auto listenCallback = [&](int, Events events) -> bool {
+    Socket listenSocket = Socket::createSocket(PATH);
+    CallbackGuard serverSocketsGuard;
+
+    auto listenCallback = [&](int /*fd*/, Events events) {
         LOGD("Listen events: " << eventsToString(events));
         if (events & EPOLLIN) {
-            server = listen.accept();
-            poll.addFD(server->getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLOUT, serverCallback);
+            // accept new server connection
+            std::shared_ptr<Socket> socket = listenSocket.accept();
+            poll.addFD(socket->getFD(),
+                       EPOLLRDHUP | EPOLLIN,
+                       std::bind(serverCallback, _1, _2, socket, serverSocketsGuard.spawn()));
         }
-        return true;
     };
 
-    poll.addFD(listen.getFD(), EPOLLIN, listenCallback);
+    poll.addFD(listenSocket.getFD(), EPOLLIN, listenCallback);
+
+    // } server setup
 
-    Socket client = Socket::connectSocket(PATH);
+    // { client setup
 
-    auto clientCallback = [&](int, Events events) -> bool {
+    auto clientCallback = [&](int /*fd*/,
+                              Events events,
+                              Socket& socket,
+                              const std::string& request,
+                              ValueLatch<std::string>& response) {
         LOGD("Client events: " << eventsToString(events));
 
+        if (events & EPOLLOUT) {
+            socket.write(request.data(), request.size());
+            poll.modifyFD(socket.getFD(), EPOLLRDHUP | EPOLLIN);
+        }
+
         if (events & EPOLLIN) {
-            std::string ret(MESSAGE.size(), 'x');
-            client.read(&ret.front(), ret.size());
-            if (ret == MESSAGE) {
-                goodMessage.set();
+            try {
+                std::string msg(RESPONSE.size(), 'x');
+                socket.read(&msg.front(), msg.size());
+                response.set(msg);
+            } catch (UtilsException&) {
+                response.set(std::string());
             }
+            poll.modifyFD(socket.getFD(), EPOLLRDHUP);
         }
+
         if (events & EPOLLRDHUP) {
-            poll.removeFD(client.getFD());
-            remoteClosed.set();
+            LOGD("Server has disconnected");
+            poll.removeFD(socket.getFD()); //prevent active loop
         }
-        return true;
     };
 
-    poll.addFD(client.getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLIN, clientCallback);
-
-    BOOST_CHECK(goodMessage.wait(TIMEOUT));
-    BOOST_CHECK(remoteClosed.wait(TIMEOUT));
-
-    poll.removeFD(listen.getFD());
+    // } client setup
+
+    // Scenario 1
+    LOGD("Scerario 1");
+    {
+        Socket client = Socket::connectSocket(PATH);
+        ValueLatch<std::string> response;
+
+        poll.addFD(client.getFD(),
+                   EPOLLRDHUP | EPOLLOUT,
+                   std::bind(clientCallback,
+                             _1,
+                             _2,
+                             std::ref(client),
+                             REQUEST_GOOD,
+                             std::ref(response)));
+
+        BOOST_CHECK(response.get(TIMEOUT) == RESPONSE);
+
+        poll.removeFD(client.getFD());
+    }
+
+    // Scenario 2
+    LOGD("Scerario 2");
+    {
+        Socket client = Socket::connectSocket(PATH);
+        ValueLatch<std::string> response;
+
+        poll.addFD(client.getFD(),
+                   EPOLLRDHUP | EPOLLOUT,
+                   std::bind(clientCallback,
+                             _1,
+                             _2,
+                             std::ref(client),
+                             REQUEST_BAD,
+                             std::ref(response)));
+
+        BOOST_CHECK(response.get(TIMEOUT) == std::string());
+
+        poll.removeFD(client.getFD());
+    }
+    LOGD("Done");
+
+    poll.removeFD(listenSocket.getFD());
+
+    // wait for all server sockets (ensure all EPOLLRDHUP are processed)
+    BOOST_REQUIRE(serverSocketsGuard.waitForTrackers(TIMEOUT));
 }
 
 BOOST_AUTO_TEST_CASE(ThreadedPollSocket)
@@ -146,9 +237,8 @@ BOOST_AUTO_TEST_CASE(PollStacking)
 
     EventPoll innerPoll;
 
-    auto dispatchInner = [&](int, Events) -> bool {
+    auto dispatchInner = [&](int, Events) {
         innerPoll.dispatchIteration(0);
-        return true;
     };
     dispatcher.getPoll().addFD(innerPoll.getPollFD(), EPOLLIN, dispatchInner);
     doSocketTest(innerPoll);
index 30d5342..7c5e318 100644 (file)
 #include "config.hpp"
 #include "ut.hpp"
 
-#include "utils/latch.hpp"
 #include "utils/glib-loop.hpp"
 
 #include <atomic>
 
-BOOST_AUTO_TEST_SUITE(UtilsGlibLoopSuite)
+BOOST_AUTO_TEST_SUITE(GlibLoopSuite)
 
 using namespace vasum;
 using namespace vasum::utils;
@@ -39,22 +38,25 @@ using namespace vasum::utils;
 
 namespace {
 
-const unsigned int TIMER_INTERVAL_MS = 10;
-const unsigned int TIMER_NUMBER      = 5;
+const unsigned int TIMER_INTERVAL_MS = 100;
+const unsigned int TIMER_NUMBER      = 4;
 const unsigned int TIMER_WAIT_FOR    = 2 * TIMER_NUMBER * TIMER_INTERVAL_MS;
 
 } // namespace
 
+BOOST_AUTO_TEST_CASE(GlibLoopTest)
+{
+    ScopedGlibLoop loop;
+}
+
 BOOST_AUTO_TEST_CASE(GlibTimerEventTest)
 {
     ScopedGlibLoop loop;
-    Latch latch;
     std::atomic_uint counter(0);
 
     CallbackGuard guard;
 
-    Glib::OnTimerEventCallback callback = [&]()->bool {
-        latch.set();
+    auto callback = [&]()-> bool {
         if (++counter >= TIMER_NUMBER) {
             return false;
         }
@@ -63,8 +65,9 @@ BOOST_AUTO_TEST_CASE(GlibTimerEventTest)
 
     Glib::addTimerEvent(TIMER_INTERVAL_MS, callback, guard);
 
-    BOOST_REQUIRE(latch.waitForN(TIMER_NUMBER, TIMER_WAIT_FOR));
-    BOOST_REQUIRE(latch.wait(TIMER_WAIT_FOR) == false);
+    BOOST_CHECK(counter < TIMER_NUMBER);
+    BOOST_CHECK(guard.waitForTrackers(TIMER_WAIT_FOR));
+    BOOST_CHECK_EQUAL(counter, TIMER_NUMBER);
 }
 
 BOOST_AUTO_TEST_SUITE_END()