From 633f086ac82e192354b061ffed1682f4fd1d2a1e Mon Sep 17 00:00:00 2001 From: Piotr Bartosiewicz Date: Mon, 23 Feb 2015 18:03:32 +0100 Subject: [PATCH] Epoll wrapper, epoll dispatchers (glib and threaded) [Bug/Feature] N/A [Cause] N/A [Solution] Epoll is the only way, using no additional threads, to aggregate descriptors to one epoll descriptor. [Verification] Run tests Change-Id: I3863129a8b947c467615b2e9d352fce3bd1cda9a --- common/ipc/client.cpp | 2 +- common/ipc/internals/acceptor.cpp | 2 +- common/ipc/internals/event-queue.hpp | 4 +- common/ipc/internals/processor.cpp | 10 +- common/ipc/internals/request-queue.hpp | 4 +- common/ipc/internals/socket.cpp | 18 +- common/ipc/service.cpp | 2 +- common/utils/event-poll.cpp | 151 +++++++++++++++ common/utils/event-poll.hpp | 68 +++++++ common/{ipc/internals => utils}/eventfd.cpp | 26 ++- common/{ipc/internals => utils}/eventfd.hpp | 11 +- .../internals/utils.cpp => utils/fd-utils.cpp} | 76 ++++---- .../internals/utils.hpp => utils/fd-utils.hpp} | 13 +- common/utils/glib-poll-dispatcher.cpp | 61 ++++++ common/utils/glib-poll-dispatcher.hpp | 53 ++++++ common/utils/thread-poll-dispatcher.cpp | 53 ++++++ common/utils/thread-poll-dispatcher.hpp | 52 ++++++ tests/unit_tests/ipc/ut-ipc.cpp | 2 - tests/unit_tests/utils/ut-event-poll.cpp | 205 +++++++++++++++++++++ 19 files changed, 726 insertions(+), 87 deletions(-) create mode 100644 common/utils/event-poll.cpp create mode 100644 common/utils/event-poll.hpp rename common/{ipc/internals => utils}/eventfd.cpp (67%) rename common/{ipc/internals => utils}/eventfd.hpp (88%) rename common/{ipc/internals/utils.cpp => utils/fd-utils.cpp} (68%) rename common/{ipc/internals/utils.hpp => utils/fd-utils.hpp} (90%) create mode 100644 common/utils/glib-poll-dispatcher.cpp create mode 100644 common/utils/glib-poll-dispatcher.hpp create mode 100644 common/utils/thread-poll-dispatcher.cpp create mode 100644 common/utils/thread-poll-dispatcher.hpp create mode 100644 tests/unit_tests/utils/ut-event-poll.cpp diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp index 51e59f8..6669d8a 100644 --- a/common/ipc/client.cpp +++ b/common/ipc/client.cpp @@ -45,7 +45,7 @@ Client::~Client() LOGS("Client Destructor"); try { stop(); - } catch (IPCException& e) { + } catch (std::exception& e) { LOGE("Error in Client's destructor: " << e.what()); } } diff --git a/common/ipc/internals/acceptor.cpp b/common/ipc/internals/acceptor.cpp index 627e1fe..ecb2210 100644 --- a/common/ipc/internals/acceptor.cpp +++ b/common/ipc/internals/acceptor.cpp @@ -49,7 +49,7 @@ Acceptor::~Acceptor() LOGT("Destroying Acceptor"); try { stop(); - } catch (IPCException& e) { + } catch (std::exception& e) { LOGE("Error in destructor: " << e.what()); } LOGT("Destroyed Acceptor"); diff --git a/common/ipc/internals/event-queue.hpp b/common/ipc/internals/event-queue.hpp index 2c591f7..b4532fd 100644 --- a/common/ipc/internals/event-queue.hpp +++ b/common/ipc/internals/event-queue.hpp @@ -26,7 +26,7 @@ #define COMMON_IPC_INTERNALS_EVENT_QUEUE_HPP #include "ipc/exception.hpp" -#include "ipc/internals/eventfd.hpp" +#include "utils/eventfd.hpp" #include "logger/logger.hpp" #include @@ -82,7 +82,7 @@ private: std::mutex mCommunicationMutex; std::queue mMessages; - EventFD mEventFD; + utils::EventFD mEventFD; }; template diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index 79c1a22..93f721f 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -26,8 +26,8 @@ #include "ipc/exception.hpp" #include "ipc/internals/processor.hpp" -#include "ipc/internals/utils.hpp" #include "utils/signal.hpp" +#include "utils/exception.hpp" #include #include @@ -79,21 +79,21 @@ Processor::~Processor() LOGS(mLogPrefix + "Processor Destructor"); try { stop(); - } catch (IPCException& e) { + } catch (std::exception& e) { LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what()); } } Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd) { - return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&fd](const PeerInfo & peerInfo) { + return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [fd](const PeerInfo & peerInfo) { return fd == peerInfo.socketPtr->getFD(); }); } Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID peerID) { - return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&peerID](const PeerInfo & peerInfo) { + return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [peerID](const PeerInfo & peerInfo) { return peerID == peerInfo.peerID; }); } @@ -411,7 +411,7 @@ bool Processor::handleInput(const FileDescriptor fd) Socket::Guard guard = socket.getGuard(); socket.read(&methodID, sizeof(methodID)); socket.read(&messageID, sizeof(messageID)); - } catch (const IPCException& e) { + } catch (const UtilsException& e) { LOGE(mLogPrefix + "Error during reading the socket"); removePeerInternal(peerIt, std::make_exception_ptr(IPCNaughtyPeerException())); diff --git a/common/ipc/internals/request-queue.hpp b/common/ipc/internals/request-queue.hpp index e1ad46b..44306ee 100644 --- a/common/ipc/internals/request-queue.hpp +++ b/common/ipc/internals/request-queue.hpp @@ -26,7 +26,7 @@ #define COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP #include "ipc/exception.hpp" -#include "ipc/internals/eventfd.hpp" +#include "utils/eventfd.hpp" #include "logger/logger.hpp" #include @@ -115,7 +115,7 @@ private: std::list mRequests; std::mutex mStateMutex; - EventFD mEventFD; + utils::EventFD mEventFD; }; template diff --git a/common/ipc/internals/socket.cpp b/common/ipc/internals/socket.cpp index 2469af6..a66456e 100644 --- a/common/ipc/internals/socket.cpp +++ b/common/ipc/internals/socket.cpp @@ -26,7 +26,7 @@ #include "ipc/exception.hpp" #include "ipc/internals/socket.hpp" -#include "ipc/internals/utils.hpp" +#include "utils/fd-utils.hpp" #include "logger/logger.hpp" #include @@ -60,8 +60,8 @@ Socket::Socket(Socket&& socket) noexcept Socket::~Socket() noexcept { try { - ipc::close(mFD); - } catch (IPCException& e) { + utils::close(mFD); + } catch (std::exception& e) { LOGE("Error in Socket's destructor: " << e.what()); } } @@ -89,13 +89,13 @@ std::shared_ptr Socket::accept() void Socket::write(const void* bufferPtr, const size_t size) const { Guard guard(mCommunicationMutex); - ipc::write(mFD, bufferPtr, size); + utils::write(mFD, bufferPtr, size); } void Socket::read(void* bufferPtr, const size_t size) const { Guard guard(mCommunicationMutex); - ipc::read(mFD, bufferPtr, size); + utils::read(mFD, bufferPtr, size); } int Socket::getSystemdSocket(const std::string& path) @@ -142,7 +142,7 @@ int Socket::createZoneSocket(const std::string& path) reinterpret_cast(&serverAddress), sizeof(struct sockaddr_un))) { std::string message = strerror(errno); - ::close(sockfd); + utils::close(sockfd); LOGE("Error in bind: " << message); IPCException("Error in bind: " + message); } @@ -150,7 +150,7 @@ int Socket::createZoneSocket(const std::string& path) if (-1 == ::listen(sockfd, MAX_QUEUE_LENGTH)) { std::string message = strerror(errno); - ::close(sockfd); + utils::close(sockfd); LOGE("Error in listen: " << message); IPCException("Error in listen: " + message); } @@ -188,7 +188,7 @@ Socket Socket::connectSocket(const std::string& path) if (-1 == connect(fd, reinterpret_cast(&serverAddress), sizeof(struct sockaddr_un))) { - ::close(fd); + utils::close(fd); LOGE("Error in connect: " + std::string(strerror(errno))); throw IPCException("Error in connect: " + std::string(strerror(errno))); } @@ -196,7 +196,7 @@ Socket Socket::connectSocket(const std::string& path) // Nonblock socket int flags = fcntl(fd, F_GETFL, 0); if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK)) { - ::close(fd); + utils::close(fd); LOGE("Error in fcntl: " + std::string(strerror(errno))); throw IPCException("Error in fcntl: " + std::string(strerror(errno))); } diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp index d945f73..442d804 100644 --- a/common/ipc/service.cpp +++ b/common/ipc/service.cpp @@ -50,7 +50,7 @@ Service::~Service() LOGS("Service Destructor"); try { stop(); - } catch (IPCException& e) { + } catch (std::exception& e) { LOGE("Error in Service's destructor: " << e.what()); } } diff --git a/common/utils/event-poll.cpp b/common/utils/event-poll.cpp new file mode 100644 index 0000000..b37d727 --- /dev/null +++ b/common/utils/event-poll.cpp @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief C++ epoll wrapper + */ + +#include "config.hpp" +#include "utils/event-poll.hpp" +#include "utils/fd-utils.hpp" +#include "utils/exception.hpp" +#include "logger/logger.hpp" + +#include +#include +#include +#include + +namespace vasum { +namespace utils { + +EventPoll::EventPoll() + : mPollFD(::epoll_create1(EPOLL_CLOEXEC)) +{ + if (mPollFD == -1) { + LOGE("Failed to create epoll: " << getSystemErrorMessage()); + throw UtilsException("Could not create epoll"); + } +} + +EventPoll::~EventPoll() +{ + if (!mCallbacks.empty()) { + LOGW("Not removed callbacks: " << mCallbacks.size()); + assert(0 && "Not removed callbacks left"); + } + utils::close(mPollFD); +} + +int EventPoll::getPollFD() const +{ + return mPollFD; +} + +void EventPoll::addFD(const int fd, const Events events, Callback&& callback) +{ + std::lock_guard lock(mMutex); + + if (mCallbacks.find(fd) != mCallbacks.end()) { + LOGW("Already added fd: " << fd); + throw UtilsException("FD already added"); + } + + if (!addFDInternal(fd, events)) { + throw UtilsException("Could not add fd"); + } + + mCallbacks.insert({fd, std::make_shared(std::move(callback))}); + LOGT("Callback added for " << fd); +} + +void EventPoll::removeFD(const int fd) +{ + std::lock_guard lock(mMutex); + + auto iter = mCallbacks.find(fd); + if (iter == mCallbacks.end()) { + LOGW("Failed to remove nonexistent fd: " << fd); + throw UtilsException("FD does not exist"); + } + mCallbacks.erase(iter); + removeFDInternal(fd); + LOGT("Callback removed for " << fd); +} + +bool EventPoll::dispatchIteration(const int timeoutMs) +{ + for (;;) { + epoll_event event; + int num = epoll_wait(mPollFD, &event, 1, timeoutMs); + if (num == 0) { + return false; // timeout + } + if (num < 0) { + if (errno == EINTR) { + continue; + } + LOGE("Failed to wait on epoll: " << getSystemErrorMessage()); + throw UtilsException("Could not wait for event"); + } + + // callback could be removed in the meantime, so be careful, find it inside lock + std::lock_guard lock(mMutex); + auto iter = mCallbacks.find(event.data.fd); + if (iter == mCallbacks.end()) { + continue; + } + + // add ref because removeFD(self) can be called inside callback + std::shared_ptr callback(iter->second); + return (*callback)(event.data.fd, event.events); + } +} + +void EventPoll::dispatchLoop() +{ + while (dispatchIteration(-1)) {} +} + +bool EventPoll::addFDInternal(const int fd, const Events events) +{ + epoll_event event; + memset(&event, 0, sizeof(event)); + event.events = events; + event.data.fd = fd; + + if (epoll_ctl(mPollFD, EPOLL_CTL_ADD, fd, &event) == -1) { + LOGE("Failed to add fd to poll: " << getSystemErrorMessage()); + return false; + } + return true; +} + +void EventPoll::removeFDInternal(const int fd) +{ + if (epoll_ctl(mPollFD, EPOLL_CTL_DEL, fd, NULL) == -1) { + assert(errno != EBADF); // always removeFD before closing fd locally! + // this is important because linux will reuse this fd number + LOGE("Failed to remove fd from poll: " << getSystemErrorMessage()); + } +} + +} // namespace utils +} // namespace vasum diff --git a/common/utils/event-poll.hpp b/common/utils/event-poll.hpp new file mode 100644 index 0000000..1a7ae2c --- /dev/null +++ b/common/utils/event-poll.hpp @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief C++ epoll wrapper + */ + +#ifndef COMMON_UTILS_EVENT_POLL_HPP +#define COMMON_UTILS_EVENT_POLL_HPP + +#include +#include +#include +#include + +namespace vasum { +namespace utils { + +class EventPoll { +public: + + typedef unsigned int Events; + typedef std::function Callback; + + EventPoll(); + ~EventPoll(); + + int getPollFD() const; + + void addFD(const int fd, const Events events, Callback&& callback); + void removeFD(const int fd); + + bool dispatchIteration(const int timeoutMs); + void dispatchLoop(); + +private: + typedef std::recursive_mutex Mutex; + + const int mPollFD; + Mutex mMutex; + std::unordered_map> mCallbacks; + + bool addFDInternal(const int fd, const Events events); + void removeFDInternal(const int fd); +}; + + +} // namespace utils +} // namespace vasum + +#endif // COMMON_UTILS_EVENT_POLL_HPP diff --git a/common/ipc/internals/eventfd.cpp b/common/utils/eventfd.cpp similarity index 67% rename from common/ipc/internals/eventfd.cpp rename to common/utils/eventfd.cpp index 729243d..2fd8d62 100644 --- a/common/ipc/internals/eventfd.cpp +++ b/common/utils/eventfd.cpp @@ -24,9 +24,9 @@ #include "config.hpp" -#include "ipc/internals/eventfd.hpp" -#include "ipc/internals/utils.hpp" -#include "ipc/exception.hpp" +#include "utils/eventfd.hpp" +#include "utils/exception.hpp" +#include "utils/fd-utils.hpp" #include "logger/logger.hpp" #include @@ -35,24 +35,20 @@ #include namespace vasum { -namespace ipc { +namespace utils { EventFD::EventFD() { - mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK); + mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC); if (mFD == -1) { - LOGE("Error in eventfd: " << std::string(strerror(errno))); - throw IPCException("Error in eventfd: " + std::string(strerror(errno))); + LOGE("Error in eventfd: " << getSystemErrorMessage()); + throw UtilsException("Error in eventfd: " + getSystemErrorMessage()); } } EventFD::~EventFD() { - try { - ipc::close(mFD); - } catch (IPCException& e) { - LOGE("Error in Event's destructor: " << e.what()); - } + utils::close(mFD); } int EventFD::getFD() const @@ -63,15 +59,15 @@ int EventFD::getFD() const void EventFD::send() { const std::uint64_t toSend = 1; - ipc::write(mFD, &toSend, sizeof(toSend)); + utils::write(mFD, &toSend, sizeof(toSend)); } void EventFD::receive() { std::uint64_t readBuffer; - ipc::read(mFD, &readBuffer, sizeof(readBuffer)); + utils::read(mFD, &readBuffer, sizeof(readBuffer)); } -} // namespace ipc +} // namespace utils } // namespace vasum diff --git a/common/ipc/internals/eventfd.hpp b/common/utils/eventfd.hpp similarity index 88% rename from common/ipc/internals/eventfd.hpp rename to common/utils/eventfd.hpp index cf3094c..b3d5027 100644 --- a/common/ipc/internals/eventfd.hpp +++ b/common/utils/eventfd.hpp @@ -22,17 +22,18 @@ * @brief Eventfd wrapper */ -#ifndef COMMON_IPC_INTERNALS_EVENTFD_HPP -#define COMMON_IPC_INTERNALS_EVENTFD_HPP +#ifndef COMMON_UTILS_EVENTFD_HPP +#define COMMON_UTILS_EVENTFD_HPP namespace vasum { -namespace ipc { +namespace utils { class EventFD { public: EventFD(); ~EventFD(); + EventFD(const EventFD& eventfd) = delete; EventFD& operator=(const EventFD&) = delete; @@ -56,7 +57,7 @@ private: int mFD; }; -} // namespace ipc +} // namespace utils } // namespace vasum -#endif // COMMON_IPC_INTERNALS_EVENTFD_HPP +#endif // COMMON_UTILS_EVENTFD_HPP diff --git a/common/ipc/internals/utils.cpp b/common/utils/fd-utils.cpp similarity index 68% rename from common/ipc/internals/utils.cpp rename to common/utils/fd-utils.cpp index cb9c93c..59bad36 100644 --- a/common/ipc/internals/utils.cpp +++ b/common/utils/fd-utils.cpp @@ -19,13 +19,13 @@ /** * @file * @author Jan Olszak (j.olszak@samsung.com) - * @brief Utility functions + * @brief File descriptor utility functions */ #include "config.hpp" -#include "ipc/exception.hpp" -#include "ipc/internals/utils.hpp" +#include "utils/fd-utils.hpp" +#include "utils/exception.hpp" #include "logger/logger.hpp" #include @@ -40,7 +40,7 @@ namespace fs = boost::filesystem; namespace chr = std::chrono; namespace vasum { -namespace ipc { +namespace utils { namespace { @@ -51,13 +51,14 @@ void waitForEvent(int fd, // Wait for the rest of the data struct pollfd fds[1]; fds[0].fd = fd; - fds[0].events = event | POLLHUP; + fds[0].events = event; for (;;) { chr::milliseconds timeoutMS = chr::duration_cast(deadline - chr::high_resolution_clock::now()); if (timeoutMS.count() < 0) { - LOGE("Timeout in read"); - throw IPCException("Timeout in read"); + LOGE("Timeout while waiting for event: " << std::hex << event << + " on fd: " << std::dec << fd); + throw UtilsException("Timeout"); } int ret = ::poll(fds, 1 /*fds size*/, timeoutMS.count()); @@ -66,13 +67,13 @@ void waitForEvent(int fd, if (errno == EINTR) { continue; } - LOGE("Error in poll: " + std::string(strerror(errno))); - throw IPCException("Error in poll: " + std::string(strerror(errno))); + LOGE("Error in poll: " + getSystemErrorMessage()); + throw UtilsException("Error in poll: " + getSystemErrorMessage()); } if (ret == 0) { LOGE("Timeout in read"); - throw IPCException("Timeout in read"); + throw UtilsException("Timeout in read"); } if (fds[0].revents & event) { @@ -82,7 +83,7 @@ void waitForEvent(int fd, if (fds[0].revents & POLLHUP) { LOGW("Peer disconnected"); - throw IPCException("Peer disconnected"); + throw UtilsException("Peer disconnected"); } } } @@ -98,11 +99,10 @@ void close(int fd) for (;;) { if (-1 == ::close(fd)) { if (errno == EINTR) { - LOGD("Close interrupted by a signal, retrying"); + LOGT("Close interrupted by a signal, retrying"); continue; } - LOGE("Error in close: " << std::string(strerror(errno))); - throw IPCException("Error in close: " + std::string(strerror(errno))); + LOGE("Error in close: " << getSystemErrorMessage()); } break; } @@ -118,22 +118,21 @@ void write(int fd, const void* bufferPtr, const size_t size, int timeoutMS) int n = ::write(fd, reinterpret_cast(bufferPtr) + nTotal, size - nTotal); - if (n > 0) { + if (n >= 0) { nTotal += n; + if (nTotal == size) { + // All data is written, break loop + break; + } } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // Neglected errors LOGD("Retrying write"); } else { - LOGE("Error during writing: " + std::string(strerror(errno))); - throw IPCException("Error during writing: " + std::string(strerror(errno))); + LOGE("Error during writing: " + getSystemErrorMessage()); + throw UtilsException("Error during writing: " + getSystemErrorMessage()); } - if (nTotal >= size) { - // All data is written, break loop - break; - } else { - waitForEvent(fd, POLLOUT, deadline); - } + waitForEvent(fd, POLLOUT, deadline); } } @@ -147,22 +146,25 @@ void read(int fd, void* bufferPtr, const size_t size, int timeoutMS) int n = ::read(fd, reinterpret_cast(bufferPtr) + nTotal, size - nTotal); - if (n > 0) { + if (n >= 0) { nTotal += n; + if (nTotal == size) { + // All data is read, break loop + break; + } + if (n == 0) { + LOGW("Peer disconnected"); + throw UtilsException("Peer disconnected"); + } } else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // Neglected errors LOGD("Retrying read"); } else { - LOGE("Error during reading: " + std::string(strerror(errno))); - throw IPCException("Error during reading: " + std::string(strerror(errno))); + LOGE("Error during reading: " + getSystemErrorMessage()); + throw UtilsException("Error during reading: " + getSystemErrorMessage()); } - if (nTotal >= size) { - // All data is read, break loop - break; - } else { - waitForEvent(fd, POLLIN, deadline); - } + waitForEvent(fd, POLLIN, deadline); } } @@ -170,8 +172,8 @@ unsigned int getMaxFDNumber() { struct rlimit rlim; if (-1 == getrlimit(RLIMIT_NOFILE, &rlim)) { - LOGE("Error during getrlimit: " + std::string(strerror(errno))); - throw IPCException("Error during getrlimit: " + std::string(strerror(errno))); + LOGE("Error during getrlimit: " + getSystemErrorMessage()); + throw UtilsException("Error during getrlimit: " + getSystemErrorMessage()); } return rlim.rlim_cur; } @@ -182,8 +184,8 @@ void setMaxFDNumber(unsigned int limit) rlim.rlim_cur = limit; rlim.rlim_max = limit; if (-1 == setrlimit(RLIMIT_NOFILE, &rlim)) { - LOGE("Error during setrlimit: " + std::string(strerror(errno))); - throw IPCException("Error during setrlimit: " + std::string(strerror(errno))); + LOGE("Error during setrlimit: " + getSystemErrorMessage()); + throw UtilsException("Error during setrlimit: " + getSystemErrorMessage()); } } @@ -194,6 +196,6 @@ unsigned int getFDNumber() fs::directory_iterator()); } -} // namespace ipc +} // namespace utils } // namespace vasum diff --git a/common/ipc/internals/utils.hpp b/common/utils/fd-utils.hpp similarity index 90% rename from common/ipc/internals/utils.hpp rename to common/utils/fd-utils.hpp index c3bcaf1..ac32b0a 100644 --- a/common/ipc/internals/utils.hpp +++ b/common/utils/fd-utils.hpp @@ -19,20 +19,19 @@ /** * @file * @author Jan Olszak (j.olszak@samsung.com) - * @brief Utility functions + * @brief File descriptor utility functions */ -#ifndef COMMON_IPC_INTERNALS_UTILS_HPP -#define COMMON_IPC_INTERNALS_UTILS_HPP +#ifndef COMMON_UTILS_FD_HPP +#define COMMON_UTILS_FD_HPP #include namespace vasum { -namespace ipc { +namespace utils { /** * Close the file descriptor. - * Repeat until */ void close(int fd); @@ -73,7 +72,7 @@ void setMaxFDNumber(unsigned int limit); */ unsigned int getFDNumber(); -} // namespace ipc +} // namespace utils } // namespace vasum -#endif // COMMON_IPC_INTERNALS_UTILS_HPP +#endif // COMMON_UTILS_FD_HPP diff --git a/common/utils/glib-poll-dispatcher.cpp b/common/utils/glib-poll-dispatcher.cpp new file mode 100644 index 0000000..5c66d16 --- /dev/null +++ b/common/utils/glib-poll-dispatcher.cpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief glib epoll dispatcher + */ + +#include "config.hpp" +#include "utils/glib-poll-dispatcher.hpp" +#include "utils/callback-wrapper.hpp" + +namespace vasum { +namespace utils { + +GlibPollDispatcher::GlibPollDispatcher(EventPoll& poll) +{ + mChannel = g_io_channel_unix_new(poll.getPollFD()); + + auto dispatchCallback = [&]() { + poll.dispatchIteration(0); + }; + + auto cCallback = [](GIOChannel*, GIOCondition, gpointer data) -> gboolean { + getCallbackFromPointer(data)(); + return TRUE; + }; + + mWatchId = g_io_add_watch_full(mChannel, + G_PRIORITY_DEFAULT, + G_IO_IN, + cCallback, + createCallbackWrapper(dispatchCallback, mGuard.spawn()), + &deleteCallbackWrapper); +} + +GlibPollDispatcher::~GlibPollDispatcher() +{ + g_source_remove(mWatchId); + g_io_channel_unref(mChannel); + // mGuard destructor will wait for full unregister of dispatchCallback +} + +} // namespace utils +} // namespace vasum diff --git a/common/utils/glib-poll-dispatcher.hpp b/common/utils/glib-poll-dispatcher.hpp new file mode 100644 index 0000000..07da0c3 --- /dev/null +++ b/common/utils/glib-poll-dispatcher.hpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief glib epoll dispatcher + */ + +#ifndef COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP +#define COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP + +#include "utils/event-poll.hpp" +#include "utils/callback-guard.hpp" + +#include + +namespace vasum { +namespace utils { + +/** + * Will dispatch poll events in glib thread + */ +class GlibPollDispatcher { +public: + GlibPollDispatcher(EventPoll& poll); + ~GlibPollDispatcher(); +private: + CallbackGuard mGuard; + GIOChannel* mChannel; + guint mWatchId; +}; + + +} // namespace utils +} // namespace vasum + +#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP diff --git a/common/utils/thread-poll-dispatcher.cpp b/common/utils/thread-poll-dispatcher.cpp new file mode 100644 index 0000000..f350587 --- /dev/null +++ b/common/utils/thread-poll-dispatcher.cpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief Thread epoll dispatcher + */ + +#include "config.hpp" +#include "utils/thread-poll-dispatcher.hpp" + +#include + +namespace vasum { +namespace utils { + +ThreadPollDispatcher::ThreadPollDispatcher(EventPoll& poll) + : mPoll(poll) + , mThread([&]{ poll.dispatchLoop(); }) +{ + auto controlCallback = [this](int, EventPoll::Events) -> bool { + mStopEvent.receive(); + return false; // break the loop + }; + + poll.addFD(mStopEvent.getFD(), EPOLLIN, std::move(controlCallback)); +} + +ThreadPollDispatcher::~ThreadPollDispatcher() +{ + mStopEvent.send(); + mThread.join(); + mPoll.removeFD(mStopEvent.getFD()); +} + +} // namespace utils +} // namespace vasum diff --git a/common/utils/thread-poll-dispatcher.hpp b/common/utils/thread-poll-dispatcher.hpp new file mode 100644 index 0000000..e54cb4e --- /dev/null +++ b/common/utils/thread-poll-dispatcher.hpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief Thread epoll dispatcher + */ + +#ifndef COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP +#define COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP + +#include "utils/event-poll.hpp" +#include "utils/eventfd.hpp" + +#include + +namespace vasum { +namespace utils { + +/** + * Will dispatch poll events in a newly created thread + */ +class ThreadPollDispatcher { +public: + ThreadPollDispatcher(EventPoll& poll); + ~ThreadPollDispatcher(); +private: + EventPoll& mPoll; + EventFD mStopEvent; + std::thread mThread; +}; + +} // namespace utils +} // namespace vasum + +#endif // COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index 193eec9..fa0de22 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -592,7 +592,6 @@ BOOST_AUTO_TEST_CASE(ServiceGSource) l.set(); }; - IPCGSource::Pointer serviceGSource; Service s(SOCKET_PATH); s.setMethodHandler(1, echoCallback); @@ -622,7 +621,6 @@ BOOST_AUTO_TEST_CASE(ClientGSource) Service s(SOCKET_PATH); s.start(); - IPCGSource::Pointer clientGSource; Client c(SOCKET_PATH); c.setMethodHandler(1, echoCallback); c.setSignalHandler(2, signalHandler); diff --git a/tests/unit_tests/utils/ut-event-poll.cpp b/tests/unit_tests/utils/ut-event-poll.cpp new file mode 100644 index 0000000..e387393 --- /dev/null +++ b/tests/unit_tests/utils/ut-event-poll.cpp @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Piotr Bartosiewicz + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + + +/** + * @file + * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com) + * @brief Unit tests of event poll + */ + +#include "config.hpp" +#include "ut.hpp" + +#include "utils/event-poll.hpp" +#include "logger/logger.hpp" +#include "ipc/internals/socket.hpp" +#include "utils/latch.hpp" +#include "utils/glib-loop.hpp" +#include "utils/glib-poll-dispatcher.hpp" +#include "utils/thread-poll-dispatcher.hpp" + +#include +#include + +using namespace vasum::utils; +using namespace vasum::ipc; + +namespace { + +const int unsigned TIMEOUT = 1000; +#define ADD_EVENT(e) {EPOLL##e, #e} +const std::map EVENT_NAMES = { + ADD_EVENT(IN), + ADD_EVENT(OUT), + ADD_EVENT(ERR), + ADD_EVENT(HUP), + ADD_EVENT(RDHUP), +}; +#undef ADD_EVENT + +std::string strEvents(EventPoll::Events events) +{ + if (events == 0) { + return ""; + } + std::ostringstream ss; + for (const auto& p : EVENT_NAMES) { + if (events & p.first) { + ss << p.second << ", "; + events &= ~p.first; + } + } + if (events != 0) { + ss << std::hex << events; + return ss.str(); + } else { + std::string ret = ss.str(); + ret.resize(ret.size() - 2); + return ret; + } +} + +} // namespace + +BOOST_AUTO_TEST_SUITE(EventPollSuite) + +BOOST_AUTO_TEST_CASE(EmptyPoll) +{ + EventPoll poll; + BOOST_CHECK(!poll.dispatchIteration(0)); +} + +BOOST_AUTO_TEST_CASE(ThreadedPoll) +{ + EventPoll poll; + ThreadPollDispatcher dispatcher(poll); +} + +BOOST_AUTO_TEST_CASE(GlibPoll) +{ + ScopedGlibLoop loop; + + EventPoll poll; + GlibPollDispatcher dispatcher(poll); +} + +void doSocketTest(EventPoll& poll, Latch& goodMessage, Latch& remoteClosed) +{ + const std::string PATH = "/tmp/ut-poll.sock"; + const std::string MESSAGE = "This is a test message"; + + Socket listen = Socket::createSocket(PATH); + std::shared_ptr server; + + auto serverCallback = [&](int, EventPoll::Events events) -> bool { + LOGD("Server events: " << strEvents(events)); + + if (events & EPOLLOUT) { + server->write(MESSAGE.data(), MESSAGE.size()); + poll.removeFD(server->getFD()); + server.reset(); + } + return true; + }; + + auto listenCallback = [&](int, EventPoll::Events events) -> bool { + LOGD("Listen events: " << strEvents(events)); + if (events & EPOLLIN) { + server = listen.accept(); + poll.addFD(server->getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLOUT, serverCallback); + } + return true; + }; + + poll.addFD(listen.getFD(), EPOLLIN, listenCallback); + + Socket client = Socket::connectSocket(PATH); + + auto clientCallback = [&](int, EventPoll::Events events) -> bool { + LOGD("Client events: " << strEvents(events)); + + if (events & EPOLLIN) { + std::string ret(MESSAGE.size(), 'x'); + client.read(&ret.front(), ret.size()); + if (ret == MESSAGE) { + goodMessage.set(); + } + } + if (events & EPOLLRDHUP) { + poll.removeFD(client.getFD()); + remoteClosed.set(); + } + return true; + }; + + poll.addFD(client.getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLIN, clientCallback); + + BOOST_CHECK(goodMessage.wait(TIMEOUT)); + BOOST_CHECK(remoteClosed.wait(TIMEOUT)); + + poll.removeFD(listen.getFD()); +} + +BOOST_AUTO_TEST_CASE(ThreadedPollSocket) +{ + Latch goodMessage; + Latch remoteClosed; + + EventPoll poll; + ThreadPollDispatcher dispatcher(poll); + + doSocketTest(poll, goodMessage, remoteClosed); +} + +BOOST_AUTO_TEST_CASE(GlibPollSocket) +{ + Latch goodMessage; + Latch remoteClosed; + + ScopedGlibLoop loop; + + EventPoll poll; + GlibPollDispatcher dispatcher(poll); + + doSocketTest(poll, goodMessage, remoteClosed); +} + +BOOST_AUTO_TEST_CASE(PollStacking) +{ + Latch goodMessage; + Latch remoteClosed; + + EventPoll outer; + EventPoll inner; + + auto dispatchInner = [&](int, EventPoll::Events) -> bool { + inner.dispatchIteration(0); + return true; + }; + + outer.addFD(inner.getPollFD(), EPOLLIN, dispatchInner); + + ThreadPollDispatcher dispatcher(outer); + doSocketTest(inner, goodMessage, remoteClosed); + + outer.removeFD(inner.getPollFD()); +} + +BOOST_AUTO_TEST_SUITE_END() + -- 2.7.4