Epoll wrapper, epoll dispatchers (glib and threaded) 12/35712/8
authorPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Mon, 23 Feb 2015 17:03:32 +0000 (18:03 +0100)
committerPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Mon, 2 Mar 2015 15:05:05 +0000 (16:05 +0100)
[Bug/Feature]   N/A
[Cause]         N/A
[Solution]      Epoll is the only way, using no additional threads, to
                aggregate descriptors to one epoll descriptor.
[Verification]  Run tests

Change-Id: I3863129a8b947c467615b2e9d352fce3bd1cda9a

19 files changed:
common/ipc/client.cpp
common/ipc/internals/acceptor.cpp
common/ipc/internals/event-queue.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/request-queue.hpp
common/ipc/internals/socket.cpp
common/ipc/service.cpp
common/utils/event-poll.cpp [new file with mode: 0644]
common/utils/event-poll.hpp [new file with mode: 0644]
common/utils/eventfd.cpp [moved from common/ipc/internals/eventfd.cpp with 67% similarity]
common/utils/eventfd.hpp [moved from common/ipc/internals/eventfd.hpp with 88% similarity]
common/utils/fd-utils.cpp [moved from common/ipc/internals/utils.cpp with 68% similarity]
common/utils/fd-utils.hpp [moved from common/ipc/internals/utils.hpp with 90% similarity]
common/utils/glib-poll-dispatcher.cpp [new file with mode: 0644]
common/utils/glib-poll-dispatcher.hpp [new file with mode: 0644]
common/utils/thread-poll-dispatcher.cpp [new file with mode: 0644]
common/utils/thread-poll-dispatcher.hpp [new file with mode: 0644]
tests/unit_tests/ipc/ut-ipc.cpp
tests/unit_tests/utils/ut-event-poll.cpp [new file with mode: 0644]

index 51e59f8..6669d8a 100644 (file)
@@ -45,7 +45,7 @@ Client::~Client()
     LOGS("Client Destructor");
     try {
         stop();
-    } catch (IPCException& e) {
+    } catch (std::exception& e) {
         LOGE("Error in Client's destructor: " << e.what());
     }
 }
index 627e1fe..ecb2210 100644 (file)
@@ -49,7 +49,7 @@ Acceptor::~Acceptor()
     LOGT("Destroying Acceptor");
     try {
         stop();
-    } catch (IPCException& e) {
+    } catch (std::exception& e) {
         LOGE("Error in destructor: " << e.what());
     }
     LOGT("Destroyed Acceptor");
index 2c591f7..b4532fd 100644 (file)
@@ -26,7 +26,7 @@
 #define COMMON_IPC_INTERNALS_EVENT_QUEUE_HPP
 
 #include "ipc/exception.hpp"
-#include "ipc/internals/eventfd.hpp"
+#include "utils/eventfd.hpp"
 #include "logger/logger.hpp"
 
 #include <string>
@@ -82,7 +82,7 @@ private:
     std::mutex mCommunicationMutex;
     std::queue<MessageType> mMessages;
 
-    EventFD mEventFD;
+    utils::EventFD mEventFD;
 };
 
 template<typename MessageType>
index 79c1a22..93f721f 100644 (file)
@@ -26,8 +26,8 @@
 
 #include "ipc/exception.hpp"
 #include "ipc/internals/processor.hpp"
-#include "ipc/internals/utils.hpp"
 #include "utils/signal.hpp"
+#include "utils/exception.hpp"
 
 #include <cerrno>
 #include <cstring>
@@ -79,21 +79,21 @@ Processor::~Processor()
     LOGS(mLogPrefix + "Processor Destructor");
     try {
         stop();
-    } catch (IPCException& e) {
+    } catch (std::exception& e) {
         LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
     }
 }
 
 Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd)
 {
-    return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&fd](const PeerInfo & peerInfo) {
+    return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [fd](const PeerInfo & peerInfo) {
         return fd == peerInfo.socketPtr->getFD();
     });
 }
 
 Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID peerID)
 {
-    return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&peerID](const PeerInfo & peerInfo) {
+    return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [peerID](const PeerInfo & peerInfo) {
         return peerID == peerInfo.peerID;
     });
 }
@@ -411,7 +411,7 @@ bool Processor::handleInput(const FileDescriptor fd)
             Socket::Guard guard = socket.getGuard();
             socket.read(&methodID, sizeof(methodID));
             socket.read(&messageID, sizeof(messageID));
-        } catch (const IPCException& e) {
+        } catch (const UtilsException& e) {
             LOGE(mLogPrefix + "Error during reading the socket");
             removePeerInternal(peerIt,
                                std::make_exception_ptr(IPCNaughtyPeerException()));
index e1ad46b..44306ee 100644 (file)
@@ -26,7 +26,7 @@
 #define COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
 
 #include "ipc/exception.hpp"
-#include "ipc/internals/eventfd.hpp"
+#include "utils/eventfd.hpp"
 #include "logger/logger.hpp"
 
 #include <list>
@@ -115,7 +115,7 @@ private:
 
     std::list<Request> mRequests;
     std::mutex mStateMutex;
-    EventFD mEventFD;
+    utils::EventFD mEventFD;
 };
 
 template<typename RequestIdType>
index 2469af6..a66456e 100644 (file)
@@ -26,7 +26,7 @@
 
 #include "ipc/exception.hpp"
 #include "ipc/internals/socket.hpp"
-#include "ipc/internals/utils.hpp"
+#include "utils/fd-utils.hpp"
 #include "logger/logger.hpp"
 
 #include <systemd/sd-daemon.h>
@@ -60,8 +60,8 @@ Socket::Socket(Socket&& socket) noexcept
 Socket::~Socket() noexcept
 {
     try {
-        ipc::close(mFD);
-    } catch (IPCException& e) {
+        utils::close(mFD);
+    } catch (std::exception& e) {
         LOGE("Error in Socket's destructor: " << e.what());
     }
 }
@@ -89,13 +89,13 @@ std::shared_ptr<Socket> Socket::accept()
 void Socket::write(const void* bufferPtr, const size_t size) const
 {
     Guard guard(mCommunicationMutex);
-    ipc::write(mFD, bufferPtr, size);
+    utils::write(mFD, bufferPtr, size);
 }
 
 void Socket::read(void* bufferPtr, const size_t size) const
 {
     Guard guard(mCommunicationMutex);
-    ipc::read(mFD, bufferPtr, size);
+    utils::read(mFD, bufferPtr, size);
 }
 
 int Socket::getSystemdSocket(const std::string& path)
@@ -142,7 +142,7 @@ int Socket::createZoneSocket(const std::string& path)
                      reinterpret_cast<struct sockaddr*>(&serverAddress),
                      sizeof(struct sockaddr_un))) {
         std::string message = strerror(errno);
-        ::close(sockfd);
+        utils::close(sockfd);
         LOGE("Error in bind: " << message);
         IPCException("Error in bind: " + message);
     }
@@ -150,7 +150,7 @@ int Socket::createZoneSocket(const std::string& path)
     if (-1 == ::listen(sockfd,
                        MAX_QUEUE_LENGTH)) {
         std::string message = strerror(errno);
-        ::close(sockfd);
+        utils::close(sockfd);
         LOGE("Error in listen: " << message);
         IPCException("Error in listen: " + message);
     }
@@ -188,7 +188,7 @@ Socket Socket::connectSocket(const std::string& path)
     if (-1 == connect(fd,
                       reinterpret_cast<struct sockaddr*>(&serverAddress),
                       sizeof(struct sockaddr_un))) {
-        ::close(fd);
+        utils::close(fd);
         LOGE("Error in connect: " + std::string(strerror(errno)));
         throw IPCException("Error in connect: " + std::string(strerror(errno)));
     }
@@ -196,7 +196,7 @@ Socket Socket::connectSocket(const std::string& path)
     // Nonblock socket
     int flags = fcntl(fd, F_GETFL, 0);
     if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
-        ::close(fd);
+        utils::close(fd);
         LOGE("Error in fcntl: " + std::string(strerror(errno)));
         throw IPCException("Error in fcntl: " + std::string(strerror(errno)));
     }
index d945f73..442d804 100644 (file)
@@ -50,7 +50,7 @@ Service::~Service()
     LOGS("Service Destructor");
     try {
         stop();
-    } catch (IPCException& e) {
+    } catch (std::exception& e) {
         LOGE("Error in Service's destructor: " << e.what());
     }
 }
diff --git a/common/utils/event-poll.cpp b/common/utils/event-poll.cpp
new file mode 100644 (file)
index 0000000..b37d727
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   C++ epoll wrapper
+ */
+
+#include "config.hpp"
+#include "utils/event-poll.hpp"
+#include "utils/fd-utils.hpp"
+#include "utils/exception.hpp"
+#include "logger/logger.hpp"
+
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <string.h>
+#include <assert.h>
+
+namespace vasum {
+namespace utils {
+
+EventPoll::EventPoll()
+    : mPollFD(::epoll_create1(EPOLL_CLOEXEC))
+{
+    if (mPollFD == -1) {
+        LOGE("Failed to create epoll: " << getSystemErrorMessage());
+        throw UtilsException("Could not create epoll");
+    }
+}
+
+EventPoll::~EventPoll()
+{
+    if (!mCallbacks.empty()) {
+        LOGW("Not removed callbacks: " << mCallbacks.size());
+        assert(0 && "Not removed callbacks left");
+    }
+    utils::close(mPollFD);
+}
+
+int EventPoll::getPollFD() const
+{
+    return mPollFD;
+}
+
+void EventPoll::addFD(const int fd, const Events events, Callback&& callback)
+{
+    std::lock_guard<Mutex> lock(mMutex);
+
+    if (mCallbacks.find(fd) != mCallbacks.end()) {
+        LOGW("Already added fd: " << fd);
+        throw UtilsException("FD already added");
+    }
+
+    if (!addFDInternal(fd, events)) {
+        throw UtilsException("Could not add fd");
+    }
+
+    mCallbacks.insert({fd, std::make_shared<Callback>(std::move(callback))});
+    LOGT("Callback added for " << fd);
+}
+
+void EventPoll::removeFD(const int fd)
+{
+    std::lock_guard<Mutex> lock(mMutex);
+
+    auto iter = mCallbacks.find(fd);
+    if (iter == mCallbacks.end()) {
+        LOGW("Failed to remove nonexistent fd: " << fd);
+        throw UtilsException("FD does not exist");
+    }
+    mCallbacks.erase(iter);
+    removeFDInternal(fd);
+    LOGT("Callback removed for " << fd);
+}
+
+bool EventPoll::dispatchIteration(const int timeoutMs)
+{
+    for (;;) {
+        epoll_event event;
+        int num = epoll_wait(mPollFD, &event, 1, timeoutMs);
+        if (num == 0) {
+            return false; // timeout
+        }
+        if (num < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            LOGE("Failed to wait on epoll: " << getSystemErrorMessage());
+            throw UtilsException("Could not wait for event");
+        }
+
+        // callback could be removed in the meantime, so be careful, find it inside lock
+        std::lock_guard<Mutex> lock(mMutex);
+        auto iter = mCallbacks.find(event.data.fd);
+        if (iter == mCallbacks.end()) {
+            continue;
+        }
+
+        // add ref because removeFD(self) can be called inside callback
+        std::shared_ptr<Callback> callback(iter->second);
+        return (*callback)(event.data.fd, event.events);
+    }
+}
+
+void EventPoll::dispatchLoop()
+{
+    while (dispatchIteration(-1)) {}
+}
+
+bool EventPoll::addFDInternal(const int fd, const Events events)
+{
+    epoll_event event;
+    memset(&event, 0, sizeof(event));
+    event.events = events;
+    event.data.fd = fd;
+
+    if (epoll_ctl(mPollFD, EPOLL_CTL_ADD, fd, &event) == -1) {
+        LOGE("Failed to add fd to poll: " << getSystemErrorMessage());
+        return false;
+    }
+    return true;
+}
+
+void EventPoll::removeFDInternal(const int fd)
+{
+    if (epoll_ctl(mPollFD, EPOLL_CTL_DEL, fd, NULL) == -1) {
+        assert(errno != EBADF); // always removeFD before closing fd locally!
+                                // this is important because linux will reuse this fd number
+        LOGE("Failed to remove fd from poll: " << getSystemErrorMessage());
+    }
+}
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/event-poll.hpp b/common/utils/event-poll.hpp
new file mode 100644 (file)
index 0000000..1a7ae2c
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   C++ epoll wrapper
+ */
+
+#ifndef COMMON_UTILS_EVENT_POLL_HPP
+#define COMMON_UTILS_EVENT_POLL_HPP
+
+#include <functional>
+#include <mutex>
+#include <unordered_map>
+#include <memory>
+
+namespace vasum {
+namespace utils {
+
+class EventPoll {
+public:
+
+    typedef unsigned int Events;
+    typedef std::function<bool(int fd, Events events)> Callback;
+
+    EventPoll();
+    ~EventPoll();
+
+    int getPollFD() const;
+
+    void addFD(const int fd, const Events events, Callback&& callback);
+    void removeFD(const int fd);
+
+    bool dispatchIteration(const int timeoutMs);
+    void dispatchLoop();
+
+private:
+    typedef std::recursive_mutex Mutex;
+
+    const int mPollFD;
+    Mutex mMutex;
+    std::unordered_map<int, std::shared_ptr<Callback>> mCallbacks;
+
+    bool addFDInternal(const int fd, const Events events);
+    void removeFDInternal(const int fd);
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_EVENT_POLL_HPP
similarity index 67%
rename from common/ipc/internals/eventfd.cpp
rename to common/utils/eventfd.cpp
index 729243d..2fd8d62 100644 (file)
@@ -24,9 +24,9 @@
 
 #include "config.hpp"
 
-#include "ipc/internals/eventfd.hpp"
-#include "ipc/internals/utils.hpp"
-#include "ipc/exception.hpp"
+#include "utils/eventfd.hpp"
+#include "utils/exception.hpp"
+#include "utils/fd-utils.hpp"
 #include "logger/logger.hpp"
 
 #include <sys/eventfd.h>
 #include <cstdint>
 
 namespace vasum {
-namespace ipc {
+namespace utils {
 
 EventFD::EventFD()
 {
-    mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
+    mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC);
     if (mFD == -1) {
-        LOGE("Error in eventfd: " << std::string(strerror(errno)));
-        throw IPCException("Error in eventfd: " + std::string(strerror(errno)));
+        LOGE("Error in eventfd: " << getSystemErrorMessage());
+        throw UtilsException("Error in eventfd: " + getSystemErrorMessage());
     }
 }
 
 EventFD::~EventFD()
 {
-    try {
-        ipc::close(mFD);
-    } catch (IPCException& e) {
-        LOGE("Error in Event's destructor: " << e.what());
-    }
+    utils::close(mFD);
 }
 
 int EventFD::getFD() const
@@ -63,15 +59,15 @@ int EventFD::getFD() const
 void EventFD::send()
 {
     const std::uint64_t toSend = 1;
-    ipc::write(mFD, &toSend, sizeof(toSend));
+    utils::write(mFD, &toSend, sizeof(toSend));
 }
 
 void EventFD::receive()
 {
     std::uint64_t readBuffer;
-    ipc::read(mFD, &readBuffer, sizeof(readBuffer));
+    utils::read(mFD, &readBuffer, sizeof(readBuffer));
 }
 
 
-} // namespace ipc
+} // namespace utils
 } // namespace vasum
similarity index 88%
rename from common/ipc/internals/eventfd.hpp
rename to common/utils/eventfd.hpp
index cf3094c..b3d5027 100644 (file)
  * @brief   Eventfd wrapper
  */
 
-#ifndef COMMON_IPC_INTERNALS_EVENTFD_HPP
-#define COMMON_IPC_INTERNALS_EVENTFD_HPP
+#ifndef COMMON_UTILS_EVENTFD_HPP
+#define COMMON_UTILS_EVENTFD_HPP
 
 namespace vasum {
-namespace ipc {
+namespace utils {
 
 class EventFD {
 public:
 
     EventFD();
     ~EventFD();
+
     EventFD(const EventFD& eventfd) = delete;
     EventFD& operator=(const EventFD&) = delete;
 
@@ -56,7 +57,7 @@ private:
     int mFD;
 };
 
-} // namespace ipc
+} // namespace utils
 } // namespace vasum
 
-#endif // COMMON_IPC_INTERNALS_EVENTFD_HPP
+#endif // COMMON_UTILS_EVENTFD_HPP
similarity index 68%
rename from common/ipc/internals/utils.cpp
rename to common/utils/fd-utils.cpp
index cb9c93c..59bad36 100644 (file)
 /**
  * @file
  * @author  Jan Olszak (j.olszak@samsung.com)
- * @brief   Utility functions
+ * @brief   File descriptor utility functions
  */
 
 #include "config.hpp"
 
-#include "ipc/exception.hpp"
-#include "ipc/internals/utils.hpp"
+#include "utils/fd-utils.hpp"
+#include "utils/exception.hpp"
 #include "logger/logger.hpp"
 
 #include <cerrno>
@@ -40,7 +40,7 @@ namespace fs = boost::filesystem;
 namespace chr = std::chrono;
 
 namespace vasum {
-namespace ipc {
+namespace utils {
 
 namespace {
 
@@ -51,13 +51,14 @@ void waitForEvent(int fd,
     // Wait for the rest of the data
     struct pollfd fds[1];
     fds[0].fd = fd;
-    fds[0].events = event | POLLHUP;
+    fds[0].events = event;
 
     for (;;) {
         chr::milliseconds timeoutMS = chr::duration_cast<chr::milliseconds>(deadline - chr::high_resolution_clock::now());
         if (timeoutMS.count() < 0) {
-            LOGE("Timeout in read");
-            throw IPCException("Timeout in read");
+            LOGE("Timeout while waiting for event: " << std::hex << event <<
+                 " on fd: " << std::dec << fd);
+            throw UtilsException("Timeout");
         }
 
         int ret = ::poll(fds, 1 /*fds size*/, timeoutMS.count());
@@ -66,13 +67,13 @@ void waitForEvent(int fd,
             if (errno == EINTR) {
                 continue;
             }
-            LOGE("Error in poll: " + std::string(strerror(errno)));
-            throw IPCException("Error in poll: " + std::string(strerror(errno)));
+            LOGE("Error in poll: " + getSystemErrorMessage());
+            throw UtilsException("Error in poll: " + getSystemErrorMessage());
         }
 
         if (ret == 0) {
             LOGE("Timeout in read");
-            throw IPCException("Timeout in read");
+            throw UtilsException("Timeout in read");
         }
 
         if (fds[0].revents & event) {
@@ -82,7 +83,7 @@ void waitForEvent(int fd,
 
         if (fds[0].revents & POLLHUP) {
             LOGW("Peer disconnected");
-            throw IPCException("Peer disconnected");
+            throw UtilsException("Peer disconnected");
         }
     }
 }
@@ -98,11 +99,10 @@ void close(int fd)
     for (;;) {
         if (-1 == ::close(fd)) {
             if (errno == EINTR) {
-                LOGD("Close interrupted by a signal, retrying");
+                LOGT("Close interrupted by a signal, retrying");
                 continue;
             }
-            LOGE("Error in close: " << std::string(strerror(errno)));
-            throw IPCException("Error in close: " + std::string(strerror(errno)));
+            LOGE("Error in close: " << getSystemErrorMessage());
         }
         break;
     }
@@ -118,22 +118,21 @@ void write(int fd, const void* bufferPtr, const size_t size, int timeoutMS)
         int n  = ::write(fd,
                          reinterpret_cast<const char*>(bufferPtr) + nTotal,
                          size - nTotal);
-        if (n > 0) {
+        if (n >= 0) {
             nTotal += n;
+            if (nTotal == size) {
+                // All data is written, break loop
+                break;
+            }
         } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
             // Neglected errors
             LOGD("Retrying write");
         } else {
-            LOGE("Error during writing: " + std::string(strerror(errno)));
-            throw IPCException("Error during writing: " + std::string(strerror(errno)));
+            LOGE("Error during writing: " + getSystemErrorMessage());
+            throw UtilsException("Error during writing: " + getSystemErrorMessage());
         }
 
-        if (nTotal >= size) {
-            // All data is written, break loop
-            break;
-        } else {
-            waitForEvent(fd, POLLOUT, deadline);
-        }
+        waitForEvent(fd, POLLOUT, deadline);
     }
 }
 
@@ -147,22 +146,25 @@ void read(int fd, void* bufferPtr, const size_t size, int timeoutMS)
         int n  = ::read(fd,
                         reinterpret_cast<char*>(bufferPtr) + nTotal,
                         size - nTotal);
-        if (n > 0) {
+        if (n >= 0) {
             nTotal += n;
+            if (nTotal == size) {
+                // All data is read, break loop
+                break;
+            }
+            if (n == 0) {
+                LOGW("Peer disconnected");
+                throw UtilsException("Peer disconnected");
+            }
         } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
             // Neglected errors
             LOGD("Retrying read");
         } else {
-            LOGE("Error during reading: " + std::string(strerror(errno)));
-            throw IPCException("Error during reading: " + std::string(strerror(errno)));
+            LOGE("Error during reading: " + getSystemErrorMessage());
+            throw UtilsException("Error during reading: " + getSystemErrorMessage());
         }
 
-        if (nTotal >= size) {
-            // All data is read, break loop
-            break;
-        } else {
-            waitForEvent(fd, POLLIN, deadline);
-        }
+        waitForEvent(fd, POLLIN, deadline);
     }
 }
 
@@ -170,8 +172,8 @@ unsigned int getMaxFDNumber()
 {
     struct rlimit rlim;
     if (-1 ==  getrlimit(RLIMIT_NOFILE, &rlim)) {
-        LOGE("Error during getrlimit: " + std::string(strerror(errno)));
-        throw IPCException("Error during getrlimit: " + std::string(strerror(errno)));
+        LOGE("Error during getrlimit: " + getSystemErrorMessage());
+        throw UtilsException("Error during getrlimit: " + getSystemErrorMessage());
     }
     return rlim.rlim_cur;
 }
@@ -182,8 +184,8 @@ void setMaxFDNumber(unsigned int limit)
     rlim.rlim_cur = limit;
     rlim.rlim_max = limit;
     if (-1 ==  setrlimit(RLIMIT_NOFILE, &rlim)) {
-        LOGE("Error during setrlimit: " + std::string(strerror(errno)));
-        throw IPCException("Error during setrlimit: " + std::string(strerror(errno)));
+        LOGE("Error during setrlimit: " + getSystemErrorMessage());
+        throw UtilsException("Error during setrlimit: " + getSystemErrorMessage());
     }
 }
 
@@ -194,6 +196,6 @@ unsigned int getFDNumber()
                          fs::directory_iterator());
 }
 
-} // namespace ipc
+} // namespace utils
 } // namespace vasum
 
similarity index 90%
rename from common/ipc/internals/utils.hpp
rename to common/utils/fd-utils.hpp
index c3bcaf1..ac32b0a 100644 (file)
 /**
  * @file
  * @author  Jan Olszak (j.olszak@samsung.com)
- * @brief   Utility functions
+ * @brief   File descriptor utility functions
  */
 
-#ifndef COMMON_IPC_INTERNALS_UTILS_HPP
-#define COMMON_IPC_INTERNALS_UTILS_HPP
+#ifndef COMMON_UTILS_FD_HPP
+#define COMMON_UTILS_FD_HPP
 
 #include <cstddef>
 
 namespace vasum {
-namespace ipc {
+namespace utils {
 
 /**
  * Close the file descriptor.
- * Repeat until
  */
 void close(int fd);
 
@@ -73,7 +72,7 @@ void setMaxFDNumber(unsigned int limit);
  */
 unsigned int getFDNumber();
 
-} // namespace ipc
+} // namespace utils
 } // namespace vasum
 
-#endif // COMMON_IPC_INTERNALS_UTILS_HPP
+#endif // COMMON_UTILS_FD_HPP
diff --git a/common/utils/glib-poll-dispatcher.cpp b/common/utils/glib-poll-dispatcher.cpp
new file mode 100644 (file)
index 0000000..5c66d16
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   glib epoll dispatcher
+ */
+
+#include "config.hpp"
+#include "utils/glib-poll-dispatcher.hpp"
+#include "utils/callback-wrapper.hpp"
+
+namespace vasum {
+namespace utils {
+
+GlibPollDispatcher::GlibPollDispatcher(EventPoll& poll)
+{
+    mChannel = g_io_channel_unix_new(poll.getPollFD());
+
+    auto dispatchCallback = [&]() {
+        poll.dispatchIteration(0);
+    };
+
+    auto cCallback = [](GIOChannel*, GIOCondition, gpointer data) -> gboolean {
+        getCallbackFromPointer<decltype(dispatchCallback)>(data)();
+        return TRUE;
+    };
+
+    mWatchId = g_io_add_watch_full(mChannel,
+                                   G_PRIORITY_DEFAULT,
+                                   G_IO_IN,
+                                   cCallback,
+                                   createCallbackWrapper(dispatchCallback, mGuard.spawn()),
+                                   &deleteCallbackWrapper<decltype(dispatchCallback)>);
+}
+
+GlibPollDispatcher::~GlibPollDispatcher()
+{
+    g_source_remove(mWatchId);
+    g_io_channel_unref(mChannel);
+    // mGuard destructor will wait for full unregister of dispatchCallback
+}
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/glib-poll-dispatcher.hpp b/common/utils/glib-poll-dispatcher.hpp
new file mode 100644 (file)
index 0000000..07da0c3
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   glib epoll dispatcher
+ */
+
+#ifndef COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+#define COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+
+#include "utils/event-poll.hpp"
+#include "utils/callback-guard.hpp"
+
+#include <gio/gio.h>
+
+namespace vasum {
+namespace utils {
+
+/**
+ * Will dispatch poll events in glib thread
+ */
+class GlibPollDispatcher {
+public:
+    GlibPollDispatcher(EventPoll& poll);
+    ~GlibPollDispatcher();
+private:
+    CallbackGuard mGuard;
+    GIOChannel* mChannel;
+    guint mWatchId;
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
diff --git a/common/utils/thread-poll-dispatcher.cpp b/common/utils/thread-poll-dispatcher.cpp
new file mode 100644 (file)
index 0000000..f350587
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   Thread epoll dispatcher
+ */
+
+#include "config.hpp"
+#include "utils/thread-poll-dispatcher.hpp"
+
+#include <sys/epoll.h>
+
+namespace vasum {
+namespace utils {
+
+ThreadPollDispatcher::ThreadPollDispatcher(EventPoll& poll)
+    : mPoll(poll)
+    , mThread([&]{ poll.dispatchLoop(); })
+{
+    auto controlCallback = [this](int, EventPoll::Events) -> bool {
+        mStopEvent.receive();
+        return false; // break the loop
+    };
+
+    poll.addFD(mStopEvent.getFD(), EPOLLIN, std::move(controlCallback));
+}
+
+ThreadPollDispatcher::~ThreadPollDispatcher()
+{
+    mStopEvent.send();
+    mThread.join();
+    mPoll.removeFD(mStopEvent.getFD());
+}
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/thread-poll-dispatcher.hpp b/common/utils/thread-poll-dispatcher.hpp
new file mode 100644 (file)
index 0000000..e54cb4e
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   Thread epoll dispatcher
+ */
+
+#ifndef COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
+#define COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
+
+#include "utils/event-poll.hpp"
+#include "utils/eventfd.hpp"
+
+#include <thread>
+
+namespace vasum {
+namespace utils {
+
+/**
+ * Will dispatch poll events in a newly created thread
+ */
+class ThreadPollDispatcher {
+public:
+    ThreadPollDispatcher(EventPoll& poll);
+    ~ThreadPollDispatcher();
+private:
+    EventPoll& mPoll;
+    EventFD mStopEvent;
+    std::thread mThread;
+};
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
index 193eec9..fa0de22 100644 (file)
@@ -592,7 +592,6 @@ BOOST_AUTO_TEST_CASE(ServiceGSource)
         l.set();
     };
 
-    IPCGSource::Pointer serviceGSource;
     Service s(SOCKET_PATH);
     s.setMethodHandler<SendData, RecvData>(1, echoCallback);
 
@@ -622,7 +621,6 @@ BOOST_AUTO_TEST_CASE(ClientGSource)
     Service s(SOCKET_PATH);
     s.start();
 
-    IPCGSource::Pointer clientGSource;
     Client c(SOCKET_PATH);
     c.setMethodHandler<SendData, RecvData>(1, echoCallback);
     c.setSignalHandler<RecvData>(2, signalHandler);
diff --git a/tests/unit_tests/utils/ut-event-poll.cpp b/tests/unit_tests/utils/ut-event-poll.cpp
new file mode 100644 (file)
index 0000000..e387393
--- /dev/null
@@ -0,0 +1,205 @@
+/*
+ *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+
+/**
+ * @file
+ * @author  Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief   Unit tests of event poll
+ */
+
+#include "config.hpp"
+#include "ut.hpp"
+
+#include "utils/event-poll.hpp"
+#include "logger/logger.hpp"
+#include "ipc/internals/socket.hpp"
+#include "utils/latch.hpp"
+#include "utils/glib-loop.hpp"
+#include "utils/glib-poll-dispatcher.hpp"
+#include "utils/thread-poll-dispatcher.hpp"
+
+#include <map>
+#include <sys/epoll.h>
+
+using namespace vasum::utils;
+using namespace vasum::ipc;
+
+namespace {
+
+const int unsigned TIMEOUT = 1000;
+#define ADD_EVENT(e) {EPOLL##e, #e}
+const std::map<EventPoll::Events, std::string> EVENT_NAMES = {
+    ADD_EVENT(IN),
+    ADD_EVENT(OUT),
+    ADD_EVENT(ERR),
+    ADD_EVENT(HUP),
+    ADD_EVENT(RDHUP),
+};
+#undef ADD_EVENT
+
+std::string strEvents(EventPoll::Events events)
+{
+    if (events == 0) {
+        return "<none>";
+    }
+    std::ostringstream ss;
+    for (const auto& p : EVENT_NAMES) {
+        if (events & p.first) {
+            ss << p.second << ", ";
+            events &= ~p.first;
+        }
+    }
+    if (events != 0) {
+        ss << std::hex << events;
+        return ss.str();
+    } else {
+        std::string ret = ss.str();
+        ret.resize(ret.size() - 2);
+        return ret;
+    }
+}
+
+} // namespace
+
+BOOST_AUTO_TEST_SUITE(EventPollSuite)
+
+BOOST_AUTO_TEST_CASE(EmptyPoll)
+{
+    EventPoll poll;
+    BOOST_CHECK(!poll.dispatchIteration(0));
+}
+
+BOOST_AUTO_TEST_CASE(ThreadedPoll)
+{
+    EventPoll poll;
+    ThreadPollDispatcher dispatcher(poll);
+}
+
+BOOST_AUTO_TEST_CASE(GlibPoll)
+{
+    ScopedGlibLoop loop;
+
+    EventPoll poll;
+    GlibPollDispatcher dispatcher(poll);
+}
+
+void doSocketTest(EventPoll& poll, Latch& goodMessage, Latch& remoteClosed)
+{
+    const std::string PATH = "/tmp/ut-poll.sock";
+    const std::string MESSAGE = "This is a test message";
+
+    Socket listen = Socket::createSocket(PATH);
+    std::shared_ptr<Socket> server;
+
+    auto serverCallback = [&](int, EventPoll::Events events) -> bool {
+        LOGD("Server events: " << strEvents(events));
+
+        if (events & EPOLLOUT) {
+            server->write(MESSAGE.data(), MESSAGE.size());
+            poll.removeFD(server->getFD());
+            server.reset();
+        }
+        return true;
+    };
+
+    auto listenCallback = [&](int, EventPoll::Events events) -> bool {
+        LOGD("Listen events: " << strEvents(events));
+        if (events & EPOLLIN) {
+            server = listen.accept();
+            poll.addFD(server->getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLOUT, serverCallback);
+        }
+        return true;
+    };
+
+    poll.addFD(listen.getFD(), EPOLLIN, listenCallback);
+
+    Socket client = Socket::connectSocket(PATH);
+
+    auto clientCallback = [&](int, EventPoll::Events events) -> bool {
+        LOGD("Client events: " << strEvents(events));
+
+        if (events & EPOLLIN) {
+            std::string ret(MESSAGE.size(), 'x');
+            client.read(&ret.front(), ret.size());
+            if (ret == MESSAGE) {
+                goodMessage.set();
+            }
+        }
+        if (events & EPOLLRDHUP) {
+            poll.removeFD(client.getFD());
+            remoteClosed.set();
+        }
+        return true;
+    };
+
+    poll.addFD(client.getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLIN, clientCallback);
+
+    BOOST_CHECK(goodMessage.wait(TIMEOUT));
+    BOOST_CHECK(remoteClosed.wait(TIMEOUT));
+
+    poll.removeFD(listen.getFD());
+}
+
+BOOST_AUTO_TEST_CASE(ThreadedPollSocket)
+{
+    Latch goodMessage;
+    Latch remoteClosed;
+
+    EventPoll poll;
+    ThreadPollDispatcher dispatcher(poll);
+
+    doSocketTest(poll, goodMessage, remoteClosed);
+}
+
+BOOST_AUTO_TEST_CASE(GlibPollSocket)
+{
+    Latch goodMessage;
+    Latch remoteClosed;
+
+    ScopedGlibLoop loop;
+
+    EventPoll poll;
+    GlibPollDispatcher dispatcher(poll);
+
+    doSocketTest(poll, goodMessage, remoteClosed);
+}
+
+BOOST_AUTO_TEST_CASE(PollStacking)
+{
+    Latch goodMessage;
+    Latch remoteClosed;
+
+    EventPoll outer;
+    EventPoll inner;
+
+    auto dispatchInner = [&](int, EventPoll::Events) -> bool {
+        inner.dispatchIteration(0);
+        return true;
+    };
+
+    outer.addFD(inner.getPollFD(), EPOLLIN, dispatchInner);
+
+    ThreadPollDispatcher dispatcher(outer);
+    doSocketTest(inner, goodMessage, remoteClosed);
+
+    outer.removeFD(inner.getPollFD());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+