LOGT("Callback added for fd: " << fd);
}
+void EventPoll::modifyFD(const int fd, const Events events)
+{
+ // No need to lock and check mCallbacks map
+ if (!modifyFDInternal(fd, events)) {
+ throw UtilsException("Could not modify fd");
+ }
+}
+
void EventPoll::removeFD(const int fd)
{
std::lock_guard<Mutex> lock(mMutex);
auto iter = mCallbacks.find(fd);
if (iter == mCallbacks.end()) {
- LOGW("Failed to remove nonexistent fd: " << fd);
- throw UtilsException("FD does not exist");
+ LOGT("Callback not found, probably already removed fd: " << fd);
+ return;
}
mCallbacks.erase(iter);
removeFDInternal(fd);
}
bool EventPoll::dispatchIteration(const int timeoutMs)
+
{
for (;;) {
epoll_event event;
std::shared_ptr<Callback> callback(iter->second);
try {
LOGT("Dispatch fd: " << event.data.fd << ", events: " << eventsToString(event.events));
- return (*callback)(event.data.fd, event.events);
+ (*callback)(event.data.fd, event.events);
+ return true;
} catch (std::exception& e) {
LOGE("Got unexpected exception: " << e.what());
assert(0 && "Callback should not throw any exceptions");
+ return true;
}
}
}
-void EventPoll::dispatchLoop()
-{
- while (dispatchIteration(-1)) {}
-}
-
bool EventPoll::addFDInternal(const int fd, const Events events)
{
epoll_event event;
return true;
}
+bool EventPoll::modifyFDInternal(const int fd, const Events events)
+{
+ epoll_event event;
+ memset(&event, 0, sizeof(event));
+ event.events = events;
+ event.data.fd = fd;
+
+ if (epoll_ctl(mPollFD, EPOLL_CTL_MOD, fd, &event) == -1) {
+ LOGE("Failed to modify fd in poll: " << getSystemErrorMessage());
+ return false;
+ }
+ return true;
+}
+
void EventPoll::removeFDInternal(const int fd)
{
if (epoll_ctl(mPollFD, EPOLL_CTL_DEL, fd, NULL) == -1) {
class EventPoll {
public:
- typedef std::function<bool(int fd, Events events)> Callback;
+ typedef std::function<void(int fd, Events events)> Callback;
EventPoll();
~EventPoll();
int getPollFD() const;
void addFD(const int fd, const Events events, Callback&& callback);
+ void modifyFD(const int fd, const Events events);
void removeFD(const int fd);
+ /**
+ * Dispatch at most one signalled FD
+ * @param timeoutMs how long should wait in case of no pending events
+ * (0 - return immediately, -1 - wait forever)
+ * @return false on timeout
+ */
bool dispatchIteration(const int timeoutMs);
- void dispatchLoop();
private:
typedef std::recursive_mutex Mutex;
std::unordered_map<int, std::shared_ptr<Callback>> mCallbacks;
bool addFDInternal(const int fd, const Events events);
+ bool modifyFDInternal(const int fd, const Events events);
void removeFDInternal(const int fd);
};
namespace epoll {
ThreadDispatcher::ThreadDispatcher()
+ : mStopped(false)
{
- auto controlCallback = [this](int, Events) -> bool {
+ auto controlCallback = [this](int, Events) {
mStopEvent.receive();
- return false; // break the loop
+ mStopped.store(true, std::memory_order_release);
};
mPoll.addFD(mStopEvent.getFD(), EPOLLIN, std::move(controlCallback));
mThread = std::thread([this] {
- mPoll.dispatchLoop();
+ while (!mStopped.load(std::memory_order_acquire)) {
+ mPoll.dispatchIteration(-1);
+ }
});
}
#include "utils/eventfd.hpp"
#include <thread>
+#include <atomic>
namespace vasum {
namespace epoll {
private:
EventPoll mPoll;
utils::EventFD mStopEvent;
+ std::atomic_bool mStopped;
std::thread mThread;
};
}
LOGS("Client start");
// Initialize the connection with the server
- auto handleEvent = [&](int, epoll::Events) -> bool {
+ auto handleEvent = [&](int, epoll::Events) {
mProcessor.handleEvent();
- return true;
};
mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent);
mProcessor.start();
if (pollEvents & EPOLLIN) {
mProcessor.handleInput(fd);
+ return; // because handleInput will handle RDHUP
}
if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
{
LOGS("Client setNewPeerCallback");
auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+ auto handleFd = [&](FileDescriptor fd, epoll::Events events) {
handle(fd, events);
- return true;
};
mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
if (newPeerCallback) {
* - new way to generate UIDs
* - callbacks for serialization/parsing
* - store Sockets in a vector, maybe SocketStore?
-* - poll loop outside.
* - waiting till the EventQueue is empty before leaving stop()
* - no new events added after stop() called
-* - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
-* there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
*
*/
class Processor {
return;
}
LOGS("Service start");
- auto handleConnection = [&](int, epoll::Events) -> bool {
+ auto handleConnection = [&](int, epoll::Events) {
mAcceptor.handleConnection();
- return true;
};
- auto handleProcessorEvent = [&](int, epoll::Events) -> bool {
+ auto handleProcessorEvent = [&](int, epoll::Events) {
mProcessor.handleEvent();
- return true;
};
- mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent);
mProcessor.start();
+ mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
}
bool Service::isStarted()
return;
}
LOGS("Service stop");
- mProcessor.stop();
-
mEventPoll.removeFD(mAcceptor.getConnectionFD());
+ mProcessor.stop();
mEventPoll.removeFD(mProcessor.getEventFD());
}
if (pollEvents & EPOLLIN) {
mProcessor.handleInput(fd);
+ return; // because handleInput will handle RDHUP
}
if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
{
LOGS("Service setNewPeerCallback");
auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+ auto handleFd = [&](FileDescriptor fd, epoll::Events events) {
handle(fd, events);
- return true;
};
mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
if (newPeerCallback) {
} // namespace
-BOOST_AUTO_TEST_CASE(GlibLoopTest)
-{
- ScopedGlibLoop loop;
-}
-
BOOST_AUTO_TEST_CASE(DbusDaemonTest)
{
ScopedDbusDaemon daemon;
#include "epoll/event-poll.hpp"
#include "logger/logger.hpp"
#include "ipc/internals/socket.hpp"
-#include "utils/latch.hpp"
+#include "utils/value-latch.hpp"
#include "utils/glib-loop.hpp"
#include "epoll/glib-dispatcher.hpp"
#include "epoll/thread-dispatcher.hpp"
+using namespace vasum;
using namespace vasum::utils;
using namespace vasum::epoll;
using namespace vasum::ipc;
void doSocketTest(EventPoll& poll)
{
- const std::string PATH = "/tmp/ut-poll.sock";
- const std::string MESSAGE = "This is a test message";
-
- Latch goodMessage;
- Latch remoteClosed;
-
- Socket listen = Socket::createSocket(PATH);
- std::shared_ptr<Socket> server;
+ using namespace std::placeholders;
- auto serverCallback = [&](int, Events events) -> bool {
+ //TODO don't use ipc socket
+ const std::string PATH = "/tmp/ut-poll.sock";
+ const size_t REQUEST_LEN = 5;
+ const std::string REQUEST_GOOD = "GET 1";
+ const std::string REQUEST_BAD = "GET 7";
+ const std::string RESPONSE = "This is a response message";
+
+ // Scenario 1:
+ // client connects to server listening socket
+ // client ---good-request---> server
+ // server ---response---> client
+ // client disconnects
+ //
+ // Scenario 2:
+ // client connects to server listening socket
+ // client ---bad-request----> server
+ // server disconnects
+
+ // { server setup
+
+ auto serverCallback = [&](int /*fd*/,
+ Events events,
+ std::shared_ptr<Socket> socket,
+ CallbackGuard::Tracker) {
LOGD("Server events: " << eventsToString(events));
+ if (events & EPOLLIN) {
+ std::string request(REQUEST_LEN, 'x');
+ socket->read(&request.front(), request.size());
+ if (request == REQUEST_GOOD) {
+ poll.modifyFD(socket->getFD(), EPOLLRDHUP | EPOLLOUT);
+ } else {
+ // disconnect (socket is kept in callback)
+ poll.removeFD(socket->getFD());
+ }
+ }
+
if (events & EPOLLOUT) {
- server->write(MESSAGE.data(), MESSAGE.size());
- poll.removeFD(server->getFD());
- server.reset();
+ socket->write(RESPONSE.data(), RESPONSE.size());
+ poll.modifyFD(socket->getFD(), EPOLLRDHUP);
+ }
+
+ if (events & EPOLLRDHUP) {
+ // client has disconnected
+ poll.removeFD(socket->getFD());
}
- return true;
};
- auto listenCallback = [&](int, Events events) -> bool {
+ Socket listenSocket = Socket::createSocket(PATH);
+ CallbackGuard serverSocketsGuard;
+
+ auto listenCallback = [&](int /*fd*/, Events events) {
LOGD("Listen events: " << eventsToString(events));
if (events & EPOLLIN) {
- server = listen.accept();
- poll.addFD(server->getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLOUT, serverCallback);
+ // accept new server connection
+ std::shared_ptr<Socket> socket = listenSocket.accept();
+ poll.addFD(socket->getFD(),
+ EPOLLRDHUP | EPOLLIN,
+ std::bind(serverCallback, _1, _2, socket, serverSocketsGuard.spawn()));
}
- return true;
};
- poll.addFD(listen.getFD(), EPOLLIN, listenCallback);
+ poll.addFD(listenSocket.getFD(), EPOLLIN, listenCallback);
+
+ // } server setup
- Socket client = Socket::connectSocket(PATH);
+ // { client setup
- auto clientCallback = [&](int, Events events) -> bool {
+ auto clientCallback = [&](int /*fd*/,
+ Events events,
+ Socket& socket,
+ const std::string& request,
+ ValueLatch<std::string>& response) {
LOGD("Client events: " << eventsToString(events));
+ if (events & EPOLLOUT) {
+ socket.write(request.data(), request.size());
+ poll.modifyFD(socket.getFD(), EPOLLRDHUP | EPOLLIN);
+ }
+
if (events & EPOLLIN) {
- std::string ret(MESSAGE.size(), 'x');
- client.read(&ret.front(), ret.size());
- if (ret == MESSAGE) {
- goodMessage.set();
+ try {
+ std::string msg(RESPONSE.size(), 'x');
+ socket.read(&msg.front(), msg.size());
+ response.set(msg);
+ } catch (UtilsException&) {
+ response.set(std::string());
}
+ poll.modifyFD(socket.getFD(), EPOLLRDHUP);
}
+
if (events & EPOLLRDHUP) {
- poll.removeFD(client.getFD());
- remoteClosed.set();
+ LOGD("Server has disconnected");
+ poll.removeFD(socket.getFD()); //prevent active loop
}
- return true;
};
- poll.addFD(client.getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLIN, clientCallback);
-
- BOOST_CHECK(goodMessage.wait(TIMEOUT));
- BOOST_CHECK(remoteClosed.wait(TIMEOUT));
-
- poll.removeFD(listen.getFD());
+ // } client setup
+
+ // Scenario 1
+ LOGD("Scerario 1");
+ {
+ Socket client = Socket::connectSocket(PATH);
+ ValueLatch<std::string> response;
+
+ poll.addFD(client.getFD(),
+ EPOLLRDHUP | EPOLLOUT,
+ std::bind(clientCallback,
+ _1,
+ _2,
+ std::ref(client),
+ REQUEST_GOOD,
+ std::ref(response)));
+
+ BOOST_CHECK(response.get(TIMEOUT) == RESPONSE);
+
+ poll.removeFD(client.getFD());
+ }
+
+ // Scenario 2
+ LOGD("Scerario 2");
+ {
+ Socket client = Socket::connectSocket(PATH);
+ ValueLatch<std::string> response;
+
+ poll.addFD(client.getFD(),
+ EPOLLRDHUP | EPOLLOUT,
+ std::bind(clientCallback,
+ _1,
+ _2,
+ std::ref(client),
+ REQUEST_BAD,
+ std::ref(response)));
+
+ BOOST_CHECK(response.get(TIMEOUT) == std::string());
+
+ poll.removeFD(client.getFD());
+ }
+ LOGD("Done");
+
+ poll.removeFD(listenSocket.getFD());
+
+ // wait for all server sockets (ensure all EPOLLRDHUP are processed)
+ BOOST_REQUIRE(serverSocketsGuard.waitForTrackers(TIMEOUT));
}
BOOST_AUTO_TEST_CASE(ThreadedPollSocket)
EventPoll innerPoll;
- auto dispatchInner = [&](int, Events) -> bool {
+ auto dispatchInner = [&](int, Events) {
innerPoll.dispatchIteration(0);
- return true;
};
dispatcher.getPoll().addFD(innerPoll.getPollFD(), EPOLLIN, dispatchInner);
doSocketTest(innerPoll);
#include "config.hpp"
#include "ut.hpp"
-#include "utils/latch.hpp"
#include "utils/glib-loop.hpp"
#include <atomic>
-BOOST_AUTO_TEST_SUITE(UtilsGlibLoopSuite)
+BOOST_AUTO_TEST_SUITE(GlibLoopSuite)
using namespace vasum;
using namespace vasum::utils;
namespace {
-const unsigned int TIMER_INTERVAL_MS = 10;
-const unsigned int TIMER_NUMBER = 5;
+const unsigned int TIMER_INTERVAL_MS = 100;
+const unsigned int TIMER_NUMBER = 4;
const unsigned int TIMER_WAIT_FOR = 2 * TIMER_NUMBER * TIMER_INTERVAL_MS;
} // namespace
+BOOST_AUTO_TEST_CASE(GlibLoopTest)
+{
+ ScopedGlibLoop loop;
+}
+
BOOST_AUTO_TEST_CASE(GlibTimerEventTest)
{
ScopedGlibLoop loop;
- Latch latch;
std::atomic_uint counter(0);
CallbackGuard guard;
- Glib::OnTimerEventCallback callback = [&]()->bool {
- latch.set();
+ auto callback = [&]()-> bool {
if (++counter >= TIMER_NUMBER) {
return false;
}
Glib::addTimerEvent(TIMER_INTERVAL_MS, callback, guard);
- BOOST_REQUIRE(latch.waitForN(TIMER_NUMBER, TIMER_WAIT_FOR));
- BOOST_REQUIRE(latch.wait(TIMER_WAIT_FOR) == false);
+ BOOST_CHECK(counter < TIMER_NUMBER);
+ BOOST_CHECK(guard.waitForTrackers(TIMER_WAIT_FOR));
+ BOOST_CHECK_EQUAL(counter, TIMER_NUMBER);
}
BOOST_AUTO_TEST_SUITE_END()