From 418c3f10a57e5b990bac875a524fde7051785dec Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Mon, 20 Jul 2015 15:02:46 +0200 Subject: [PATCH] IPC: Cleaned up EventPoll usage [Feature] Adding and removing fds to EventPoll is guarded internaly by Processor and Acceptor [Cause] N/A [Solution] N/A [Verification] Build, install, run tests Change-Id: I97999591b6c586159698f4f0231740c3c52d5cc5 --- libs/ipc/client.cpp | 10 ++-------- libs/ipc/epoll/event-poll.cpp | 4 +++- libs/ipc/internals/acceptor.cpp | 16 +++++++++------- libs/ipc/internals/acceptor.hpp | 19 +++++++++---------- libs/ipc/internals/processor.cpp | 11 ++++++++--- libs/ipc/internals/processor.hpp | 6 +++++- libs/ipc/service.cpp | 18 +++++------------- 7 files changed, 41 insertions(+), 43 deletions(-) diff --git a/libs/ipc/client.cpp b/libs/ipc/client.cpp index 3fff662..eb7f8d2 100644 --- a/libs/ipc/client.cpp +++ b/libs/ipc/client.cpp @@ -32,7 +32,7 @@ namespace ipc { Client::Client(epoll::EventPoll& eventPoll, const std::string& socketPath) : mEventPoll(eventPoll), - mProcessor("[CLIENT] "), + mProcessor(eventPoll, "[CLIENT] "), mSocketPath(socketPath) { LOGS("Client Constructor"); @@ -56,11 +56,7 @@ void Client::start() return; } LOGS("Client start"); - // Initialize the connection with the server - auto handleEvent = [&](int, epoll::Events) { - mProcessor.handleEvent(); - }; - mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent); + mProcessor.start(); LOGD("Connecting to " + mSocketPath); @@ -80,8 +76,6 @@ void Client::stop() } LOGS("Client stop"); mProcessor.stop(); - - mEventPoll.removeFD(mProcessor.getEventFD()); } void Client::handle(const FileDescriptor fd, const epoll::Events pollEvents) diff --git a/libs/ipc/epoll/event-poll.cpp b/libs/ipc/epoll/event-poll.cpp index fb8470e..b6c4902 100644 --- a/libs/ipc/epoll/event-poll.cpp +++ b/libs/ipc/epoll/event-poll.cpp @@ -69,11 +69,12 @@ void EventPoll::addFD(const int fd, const Events events, Callback&& callback) std::lock_guard lock(mMutex); if (mCallbacks.find(fd) != mCallbacks.end()) { - LOGW("Already added fd: " << fd); + LOGE("Already added fd: " << fd); throw UtilsException("FD already added"); } if (!addFDInternal(fd, events)) { + LOGE("Could not add fd"); throw UtilsException("Could not add fd"); } @@ -85,6 +86,7 @@ void EventPoll::modifyFD(const int fd, const Events events) { // No need to lock and check mCallbacks map if (!modifyFDInternal(fd, events)) { + LOGE("Could not modify fd: " << fd); throw UtilsException("Could not modify fd"); } } diff --git a/libs/ipc/internals/acceptor.cpp b/libs/ipc/internals/acceptor.cpp index 604c65a..82430c4 100644 --- a/libs/ipc/internals/acceptor.cpp +++ b/libs/ipc/internals/acceptor.cpp @@ -27,18 +27,25 @@ #include "ipc/internals/acceptor.hpp" #include "logger/logger.hpp" +#include + namespace ipc { -Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback) - : mNewConnectionCallback(newConnectionCallback), +Acceptor::Acceptor(epoll::EventPoll& eventPoll, + const std::string& socketPath, + const NewConnectionCallback& newConnectionCallback) + : mEventPoll(eventPoll), + mNewConnectionCallback(newConnectionCallback), mSocket(Socket::createSocket(socketPath)) { LOGT("Creating Acceptor for socket " << socketPath); + mEventPoll.addFD(mSocket.getFD(), EPOLLIN, std::bind(&Acceptor::handleConnection, this)); } Acceptor::~Acceptor() { LOGT("Destroyed Acceptor"); + mEventPoll.removeFD(mSocket.getFD()); } void Acceptor::handleConnection() @@ -47,9 +54,4 @@ void Acceptor::handleConnection() mNewConnectionCallback(tmpSocket); } -FileDescriptor Acceptor::getConnectionFD() -{ - return mSocket.getFD(); -} - } // namespace ipc diff --git a/libs/ipc/internals/acceptor.hpp b/libs/ipc/internals/acceptor.hpp index 9725e70..7af0d3c 100644 --- a/libs/ipc/internals/acceptor.hpp +++ b/libs/ipc/internals/acceptor.hpp @@ -28,6 +28,7 @@ #include "config.hpp" #include "ipc/internals/socket.hpp" +#include "ipc/epoll/event-poll.hpp" #include "ipc/types.hpp" #include @@ -45,30 +46,28 @@ public: /** * Class for accepting new connections. * + * @param eventPoll dispatcher * @param socketPath path to the socket * @param newConnectionCallback called on new connections */ - Acceptor(const std::string& socketPath, + Acceptor(epoll::EventPoll& eventPoll, + const std::string& socketPath, const NewConnectionCallback& newConnectionCallback); ~Acceptor(); Acceptor(const Acceptor& acceptor) = delete; Acceptor& operator=(const Acceptor&) = delete; +private: + epoll::EventPoll& mEventPoll; + NewConnectionCallback mNewConnectionCallback; + Socket mSocket; + /** * Handle one incoming connection. * Used with external polling */ void handleConnection(); - - /** - * @return file descriptor for the connection socket - */ - FileDescriptor getConnectionFD(); - -private: - NewConnectionCallback mNewConnectionCallback; - Socket mSocket; }; } // namespace ipc diff --git a/libs/ipc/internals/processor.cpp b/libs/ipc/internals/processor.cpp index 2e7a575..3b6d790 100644 --- a/libs/ipc/internals/processor.cpp +++ b/libs/ipc/internals/processor.cpp @@ -53,11 +53,13 @@ const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits::max( const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits::max() - 1; const MethodID Processor::ERROR_METHOD_ID = std::numeric_limits::max() - 2; -Processor::Processor(const std::string& logName, +Processor::Processor(epoll::EventPoll& eventPoll, + const std::string& logName, const PeerCallback& newPeerCallback, const PeerCallback& removedPeerCallback, const unsigned int maxNumberOfPeers) - : mLogPrefix(logName), + : mEventPoll(eventPoll), + mLogPrefix(logName), mIsRunning(false), mNewPeerCallback(newPeerCallback), mRemovedPeerCallback(removedPeerCallback), @@ -110,6 +112,8 @@ void Processor::start() if (!mIsRunning) { LOGI(mLogPrefix + "Processor start"); mIsRunning = true; + + mEventPoll.addFD(mRequestQueue.getFD(), EPOLLIN, std::bind(&Processor::handleEvent, this)); } } @@ -684,9 +688,10 @@ bool Processor::onFinishRequest(FinishRequest& requestFinisher) std::make_exception_ptr(IPCClosingException())); } + mEventPoll.removeFD(mRequestQueue.getFD()); mIsRunning = false; - requestFinisher.conditionPtr->notify_all(); + return true; } diff --git a/libs/ipc/internals/processor.hpp b/libs/ipc/internals/processor.hpp index a0e39bf..b3ca9fb 100644 --- a/libs/ipc/internals/processor.hpp +++ b/libs/ipc/internals/processor.hpp @@ -34,6 +34,7 @@ #include "ipc/internals/remove-peer-request.hpp" #include "ipc/internals/send-result-request.hpp" #include "ipc/internals/finish-request.hpp" +#include "ipc/epoll/event-poll.hpp" #include "ipc/exception.hpp" #include "ipc/method-result.hpp" #include "ipc/types.hpp" @@ -119,7 +120,8 @@ public: * @param newPeerCallback called when a new peer arrives * @param removedPeerCallback called when the Processor stops listening for this peer */ - Processor(const std::string& logName = "", + Processor(epoll::EventPoll& eventPoll, + const std::string& logName = "", const PeerCallback& newPeerCallback = nullptr, const PeerCallback& removedPeerCallback = nullptr, const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS); @@ -417,6 +419,8 @@ private: std::shared_ptr socketPtr; }; + epoll::EventPoll& mEventPoll; + typedef std::vector Peers; std::string mLogPrefix; diff --git a/libs/ipc/service.cpp b/libs/ipc/service.cpp index 48d8c96..2e3dbe7 100644 --- a/libs/ipc/service.cpp +++ b/libs/ipc/service.cpp @@ -37,8 +37,8 @@ Service::Service(epoll::EventPoll& eventPoll, const PeerCallback& addPeerCallback, const PeerCallback& removePeerCallback) : mEventPoll(eventPoll), - mProcessor("[SERVICE] "), - mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1)) + mProcessor(eventPoll, "[SERVICE] "), + mAcceptor(eventPoll, socketPath, std::bind(&Processor::addPeer, &mProcessor, _1)) { LOGS("Service Constructor"); @@ -62,15 +62,9 @@ void Service::start() return; } LOGS("Service start"); - auto handleConnection = [&](int, epoll::Events) { - mAcceptor.handleConnection(); - }; - auto handleProcessorEvent = [&](int, epoll::Events) { - mProcessor.handleEvent(); - }; - mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent); + mProcessor.start(); - mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection); + } bool Service::isStarted() @@ -84,9 +78,7 @@ void Service::stop() return; } LOGS("Service stop"); - mEventPoll.removeFD(mAcceptor.getConnectionFD()); mProcessor.stop(); - mEventPoll.removeFD(mProcessor.getEventFD()); } void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents) @@ -95,7 +87,7 @@ void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents) LOGS("Service handle"); if (!isStarted()) { - LOGW("Service stopped"); + LOGW("Service stopped, but got event: " << pollEvents << " on fd: " << fd); return; } -- 2.7.4