IPC: Cleaned up EventPoll usage 83/44283/2
authorJan Olszak <j.olszak@samsung.com>
Mon, 20 Jul 2015 13:02:46 +0000 (15:02 +0200)
committerJan Olszak <j.olszak@samsung.com>
Mon, 20 Jul 2015 13:09:43 +0000 (15:09 +0200)
[Feature]       Adding and removing fds to EventPoll
                is guarded internaly by Processor and Acceptor
[Cause]         N/A
[Solution]      N/A
[Verification]  Build, install, run tests

Change-Id: I97999591b6c586159698f4f0231740c3c52d5cc5

libs/ipc/client.cpp
libs/ipc/epoll/event-poll.cpp
libs/ipc/internals/acceptor.cpp
libs/ipc/internals/acceptor.hpp
libs/ipc/internals/processor.cpp
libs/ipc/internals/processor.hpp
libs/ipc/service.cpp

index 3fff662..eb7f8d2 100644 (file)
@@ -32,7 +32,7 @@ namespace ipc {
 
 Client::Client(epoll::EventPoll& eventPoll, const std::string& socketPath)
     : mEventPoll(eventPoll),
-      mProcessor("[CLIENT]  "),
+      mProcessor(eventPoll, "[CLIENT]  "),
       mSocketPath(socketPath)
 {
     LOGS("Client Constructor");
@@ -56,11 +56,7 @@ void Client::start()
         return;
     }
     LOGS("Client start");
-    // Initialize the connection with the server
-    auto handleEvent = [&](int, epoll::Events) {
-        mProcessor.handleEvent();
-    };
-    mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent);
+
     mProcessor.start();
 
     LOGD("Connecting to " + mSocketPath);
@@ -80,8 +76,6 @@ void Client::stop()
     }
     LOGS("Client stop");
     mProcessor.stop();
-
-    mEventPoll.removeFD(mProcessor.getEventFD());
 }
 
 void Client::handle(const FileDescriptor fd, const epoll::Events pollEvents)
index fb8470e..b6c4902 100644 (file)
@@ -69,11 +69,12 @@ void EventPoll::addFD(const int fd, const Events events, Callback&& callback)
     std::lock_guard<Mutex> lock(mMutex);
 
     if (mCallbacks.find(fd) != mCallbacks.end()) {
-        LOGW("Already added fd: " << fd);
+        LOGE("Already added fd: " << fd);
         throw UtilsException("FD already added");
     }
 
     if (!addFDInternal(fd, events)) {
+        LOGE("Could not add fd");
         throw UtilsException("Could not add fd");
     }
 
@@ -85,6 +86,7 @@ void EventPoll::modifyFD(const int fd, const Events events)
 {
     // No need to lock and check mCallbacks map
     if (!modifyFDInternal(fd, events)) {
+        LOGE("Could not modify fd: " << fd);
         throw UtilsException("Could not modify fd");
     }
 }
index 604c65a..82430c4 100644 (file)
 #include "ipc/internals/acceptor.hpp"
 #include "logger/logger.hpp"
 
+#include <functional>
+
 namespace ipc {
 
-Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback)
-    : mNewConnectionCallback(newConnectionCallback),
+Acceptor::Acceptor(epoll::EventPoll& eventPoll,
+                   const std::string& socketPath,
+                   const NewConnectionCallback& newConnectionCallback)
+    : mEventPoll(eventPoll),
+      mNewConnectionCallback(newConnectionCallback),
       mSocket(Socket::createSocket(socketPath))
 {
     LOGT("Creating Acceptor for socket " << socketPath);
+    mEventPoll.addFD(mSocket.getFD(), EPOLLIN, std::bind(&Acceptor::handleConnection, this));
 }
 
 Acceptor::~Acceptor()
 {
     LOGT("Destroyed Acceptor");
+    mEventPoll.removeFD(mSocket.getFD());
 }
 
 void Acceptor::handleConnection()
@@ -47,9 +54,4 @@ void Acceptor::handleConnection()
     mNewConnectionCallback(tmpSocket);
 }
 
-FileDescriptor Acceptor::getConnectionFD()
-{
-    return mSocket.getFD();
-}
-
 } // namespace ipc
index 9725e70..7af0d3c 100644 (file)
@@ -28,6 +28,7 @@
 #include "config.hpp"
 
 #include "ipc/internals/socket.hpp"
+#include "ipc/epoll/event-poll.hpp"
 #include "ipc/types.hpp"
 
 #include <string>
@@ -45,30 +46,28 @@ public:
     /**
      * Class for accepting new connections.
      *
+     * @param eventPoll dispatcher
      * @param socketPath path to the socket
      * @param newConnectionCallback called on new connections
      */
-    Acceptor(const std::string& socketPath,
+    Acceptor(epoll::EventPoll& eventPoll,
+             const std::string& socketPath,
              const NewConnectionCallback& newConnectionCallback);
     ~Acceptor();
 
     Acceptor(const Acceptor& acceptor) = delete;
     Acceptor& operator=(const Acceptor&) = delete;
 
+private:
+    epoll::EventPoll& mEventPoll;
+    NewConnectionCallback mNewConnectionCallback;
+    Socket mSocket;
+
     /**
      * Handle one incoming connection.
      * Used with external polling
      */
     void handleConnection();
-
-    /**
-     * @return file descriptor for the connection socket
-     */
-    FileDescriptor getConnectionFD();
-
-private:
-    NewConnectionCallback mNewConnectionCallback;
-    Socket mSocket;
 };
 
 } // namespace ipc
index 2e7a575..3b6d790 100644 (file)
@@ -53,11 +53,13 @@ const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max(
 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
 const MethodID Processor::ERROR_METHOD_ID = std::numeric_limits<MethodID>::max() - 2;
 
-Processor::Processor(const std::string& logName,
+Processor::Processor(epoll::EventPoll& eventPoll,
+                     const std::string& logName,
                      const PeerCallback& newPeerCallback,
                      const PeerCallback& removedPeerCallback,
                      const unsigned int maxNumberOfPeers)
-    : mLogPrefix(logName),
+    : mEventPoll(eventPoll),
+      mLogPrefix(logName),
       mIsRunning(false),
       mNewPeerCallback(newPeerCallback),
       mRemovedPeerCallback(removedPeerCallback),
@@ -110,6 +112,8 @@ void Processor::start()
     if (!mIsRunning) {
         LOGI(mLogPrefix + "Processor start");
         mIsRunning = true;
+
+        mEventPoll.addFD(mRequestQueue.getFD(), EPOLLIN, std::bind(&Processor::handleEvent, this));
     }
 }
 
@@ -684,9 +688,10 @@ bool Processor::onFinishRequest(FinishRequest& requestFinisher)
                            std::make_exception_ptr(IPCClosingException()));
     }
 
+    mEventPoll.removeFD(mRequestQueue.getFD());
     mIsRunning = false;
-
     requestFinisher.conditionPtr->notify_all();
+
     return true;
 }
 
index a0e39bf..b3ca9fb 100644 (file)
@@ -34,6 +34,7 @@
 #include "ipc/internals/remove-peer-request.hpp"
 #include "ipc/internals/send-result-request.hpp"
 #include "ipc/internals/finish-request.hpp"
+#include "ipc/epoll/event-poll.hpp"
 #include "ipc/exception.hpp"
 #include "ipc/method-result.hpp"
 #include "ipc/types.hpp"
@@ -119,7 +120,8 @@ public:
      * @param newPeerCallback called when a new peer arrives
      * @param removedPeerCallback called when the Processor stops listening for this peer
      */
-    Processor(const std::string& logName = "",
+    Processor(epoll::EventPoll& eventPoll,
+              const std::string& logName = "",
               const PeerCallback& newPeerCallback = nullptr,
               const PeerCallback& removedPeerCallback = nullptr,
               const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
@@ -417,6 +419,8 @@ private:
         std::shared_ptr<Socket> socketPtr;
     };
 
+    epoll::EventPoll& mEventPoll;
+
     typedef std::vector<PeerInfo> Peers;
 
     std::string mLogPrefix;
index 48d8c96..2e3dbe7 100644 (file)
@@ -37,8 +37,8 @@ Service::Service(epoll::EventPoll& eventPoll,
                  const PeerCallback& addPeerCallback,
                  const PeerCallback& removePeerCallback)
     : mEventPoll(eventPoll),
-      mProcessor("[SERVICE] "),
-      mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
+      mProcessor(eventPoll, "[SERVICE] "),
+      mAcceptor(eventPoll, socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
 
 {
     LOGS("Service Constructor");
@@ -62,15 +62,9 @@ void Service::start()
         return;
     }
     LOGS("Service start");
-    auto handleConnection = [&](int, epoll::Events) {
-        mAcceptor.handleConnection();
-    };
-    auto handleProcessorEvent = [&](int, epoll::Events) {
-        mProcessor.handleEvent();
-    };
-    mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent);
+
     mProcessor.start();
-    mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
+
 }
 
 bool Service::isStarted()
@@ -84,9 +78,7 @@ void Service::stop()
         return;
     }
     LOGS("Service stop");
-    mEventPoll.removeFD(mAcceptor.getConnectionFD());
     mProcessor.stop();
-    mEventPoll.removeFD(mProcessor.getEventFD());
 }
 
 void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents)
@@ -95,7 +87,7 @@ void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents)
     LOGS("Service handle");
 
     if (!isStarted()) {
-        LOGW("Service stopped");
+        LOGW("Service stopped, but got event: " << pollEvents << " on fd: " << fd);
         return;
     }