From 633f086ac82e192354b061ffed1682f4fd1d2a1e Mon Sep 17 00:00:00 2001
From: Piotr Bartosiewicz
Date: Mon, 23 Feb 2015 18:03:32 +0100
Subject: [PATCH] Epoll wrapper, epoll dispatchers (glib and threaded)
[Bug/Feature] N/A
[Cause] N/A
[Solution] Epoll is the only way, using no additional threads, to
aggregate descriptors to one epoll descriptor.
[Verification] Run tests
Change-Id: I3863129a8b947c467615b2e9d352fce3bd1cda9a
---
common/ipc/client.cpp | 2 +-
common/ipc/internals/acceptor.cpp | 2 +-
common/ipc/internals/event-queue.hpp | 4 +-
common/ipc/internals/processor.cpp | 10 +-
common/ipc/internals/request-queue.hpp | 4 +-
common/ipc/internals/socket.cpp | 18 +-
common/ipc/service.cpp | 2 +-
common/utils/event-poll.cpp | 151 +++++++++++++++
common/utils/event-poll.hpp | 68 +++++++
common/{ipc/internals => utils}/eventfd.cpp | 26 ++-
common/{ipc/internals => utils}/eventfd.hpp | 11 +-
.../internals/utils.cpp => utils/fd-utils.cpp} | 76 ++++----
.../internals/utils.hpp => utils/fd-utils.hpp} | 13 +-
common/utils/glib-poll-dispatcher.cpp | 61 ++++++
common/utils/glib-poll-dispatcher.hpp | 53 ++++++
common/utils/thread-poll-dispatcher.cpp | 53 ++++++
common/utils/thread-poll-dispatcher.hpp | 52 ++++++
tests/unit_tests/ipc/ut-ipc.cpp | 2 -
tests/unit_tests/utils/ut-event-poll.cpp | 205 +++++++++++++++++++++
19 files changed, 726 insertions(+), 87 deletions(-)
create mode 100644 common/utils/event-poll.cpp
create mode 100644 common/utils/event-poll.hpp
rename common/{ipc/internals => utils}/eventfd.cpp (67%)
rename common/{ipc/internals => utils}/eventfd.hpp (88%)
rename common/{ipc/internals/utils.cpp => utils/fd-utils.cpp} (68%)
rename common/{ipc/internals/utils.hpp => utils/fd-utils.hpp} (90%)
create mode 100644 common/utils/glib-poll-dispatcher.cpp
create mode 100644 common/utils/glib-poll-dispatcher.hpp
create mode 100644 common/utils/thread-poll-dispatcher.cpp
create mode 100644 common/utils/thread-poll-dispatcher.hpp
create mode 100644 tests/unit_tests/utils/ut-event-poll.cpp
diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp
index 51e59f8..6669d8a 100644
--- a/common/ipc/client.cpp
+++ b/common/ipc/client.cpp
@@ -45,7 +45,7 @@ Client::~Client()
LOGS("Client Destructor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE("Error in Client's destructor: " << e.what());
}
}
diff --git a/common/ipc/internals/acceptor.cpp b/common/ipc/internals/acceptor.cpp
index 627e1fe..ecb2210 100644
--- a/common/ipc/internals/acceptor.cpp
+++ b/common/ipc/internals/acceptor.cpp
@@ -49,7 +49,7 @@ Acceptor::~Acceptor()
LOGT("Destroying Acceptor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE("Error in destructor: " << e.what());
}
LOGT("Destroyed Acceptor");
diff --git a/common/ipc/internals/event-queue.hpp b/common/ipc/internals/event-queue.hpp
index 2c591f7..b4532fd 100644
--- a/common/ipc/internals/event-queue.hpp
+++ b/common/ipc/internals/event-queue.hpp
@@ -26,7 +26,7 @@
#define COMMON_IPC_INTERNALS_EVENT_QUEUE_HPP
#include "ipc/exception.hpp"
-#include "ipc/internals/eventfd.hpp"
+#include "utils/eventfd.hpp"
#include "logger/logger.hpp"
#include
@@ -82,7 +82,7 @@ private:
std::mutex mCommunicationMutex;
std::queue mMessages;
- EventFD mEventFD;
+ utils::EventFD mEventFD;
};
template
diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp
index 79c1a22..93f721f 100644
--- a/common/ipc/internals/processor.cpp
+++ b/common/ipc/internals/processor.cpp
@@ -26,8 +26,8 @@
#include "ipc/exception.hpp"
#include "ipc/internals/processor.hpp"
-#include "ipc/internals/utils.hpp"
#include "utils/signal.hpp"
+#include "utils/exception.hpp"
#include
#include
@@ -79,21 +79,21 @@ Processor::~Processor()
LOGS(mLogPrefix + "Processor Destructor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
}
}
Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd)
{
- return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&fd](const PeerInfo & peerInfo) {
+ return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [fd](const PeerInfo & peerInfo) {
return fd == peerInfo.socketPtr->getFD();
});
}
Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID peerID)
{
- return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&peerID](const PeerInfo & peerInfo) {
+ return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [peerID](const PeerInfo & peerInfo) {
return peerID == peerInfo.peerID;
});
}
@@ -411,7 +411,7 @@ bool Processor::handleInput(const FileDescriptor fd)
Socket::Guard guard = socket.getGuard();
socket.read(&methodID, sizeof(methodID));
socket.read(&messageID, sizeof(messageID));
- } catch (const IPCException& e) {
+ } catch (const UtilsException& e) {
LOGE(mLogPrefix + "Error during reading the socket");
removePeerInternal(peerIt,
std::make_exception_ptr(IPCNaughtyPeerException()));
diff --git a/common/ipc/internals/request-queue.hpp b/common/ipc/internals/request-queue.hpp
index e1ad46b..44306ee 100644
--- a/common/ipc/internals/request-queue.hpp
+++ b/common/ipc/internals/request-queue.hpp
@@ -26,7 +26,7 @@
#define COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
#include "ipc/exception.hpp"
-#include "ipc/internals/eventfd.hpp"
+#include "utils/eventfd.hpp"
#include "logger/logger.hpp"
#include
@@ -115,7 +115,7 @@ private:
std::list mRequests;
std::mutex mStateMutex;
- EventFD mEventFD;
+ utils::EventFD mEventFD;
};
template
diff --git a/common/ipc/internals/socket.cpp b/common/ipc/internals/socket.cpp
index 2469af6..a66456e 100644
--- a/common/ipc/internals/socket.cpp
+++ b/common/ipc/internals/socket.cpp
@@ -26,7 +26,7 @@
#include "ipc/exception.hpp"
#include "ipc/internals/socket.hpp"
-#include "ipc/internals/utils.hpp"
+#include "utils/fd-utils.hpp"
#include "logger/logger.hpp"
#include
@@ -60,8 +60,8 @@ Socket::Socket(Socket&& socket) noexcept
Socket::~Socket() noexcept
{
try {
- ipc::close(mFD);
- } catch (IPCException& e) {
+ utils::close(mFD);
+ } catch (std::exception& e) {
LOGE("Error in Socket's destructor: " << e.what());
}
}
@@ -89,13 +89,13 @@ std::shared_ptr Socket::accept()
void Socket::write(const void* bufferPtr, const size_t size) const
{
Guard guard(mCommunicationMutex);
- ipc::write(mFD, bufferPtr, size);
+ utils::write(mFD, bufferPtr, size);
}
void Socket::read(void* bufferPtr, const size_t size) const
{
Guard guard(mCommunicationMutex);
- ipc::read(mFD, bufferPtr, size);
+ utils::read(mFD, bufferPtr, size);
}
int Socket::getSystemdSocket(const std::string& path)
@@ -142,7 +142,7 @@ int Socket::createZoneSocket(const std::string& path)
reinterpret_cast(&serverAddress),
sizeof(struct sockaddr_un))) {
std::string message = strerror(errno);
- ::close(sockfd);
+ utils::close(sockfd);
LOGE("Error in bind: " << message);
IPCException("Error in bind: " + message);
}
@@ -150,7 +150,7 @@ int Socket::createZoneSocket(const std::string& path)
if (-1 == ::listen(sockfd,
MAX_QUEUE_LENGTH)) {
std::string message = strerror(errno);
- ::close(sockfd);
+ utils::close(sockfd);
LOGE("Error in listen: " << message);
IPCException("Error in listen: " + message);
}
@@ -188,7 +188,7 @@ Socket Socket::connectSocket(const std::string& path)
if (-1 == connect(fd,
reinterpret_cast(&serverAddress),
sizeof(struct sockaddr_un))) {
- ::close(fd);
+ utils::close(fd);
LOGE("Error in connect: " + std::string(strerror(errno)));
throw IPCException("Error in connect: " + std::string(strerror(errno)));
}
@@ -196,7 +196,7 @@ Socket Socket::connectSocket(const std::string& path)
// Nonblock socket
int flags = fcntl(fd, F_GETFL, 0);
if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
- ::close(fd);
+ utils::close(fd);
LOGE("Error in fcntl: " + std::string(strerror(errno)));
throw IPCException("Error in fcntl: " + std::string(strerror(errno)));
}
diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp
index d945f73..442d804 100644
--- a/common/ipc/service.cpp
+++ b/common/ipc/service.cpp
@@ -50,7 +50,7 @@ Service::~Service()
LOGS("Service Destructor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE("Error in Service's destructor: " << e.what());
}
}
diff --git a/common/utils/event-poll.cpp b/common/utils/event-poll.cpp
new file mode 100644
index 0000000..b37d727
--- /dev/null
+++ b/common/utils/event-poll.cpp
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief C++ epoll wrapper
+ */
+
+#include "config.hpp"
+#include "utils/event-poll.hpp"
+#include "utils/fd-utils.hpp"
+#include "utils/exception.hpp"
+#include "logger/logger.hpp"
+
+#include
+#include
+#include
+#include
+
+namespace vasum {
+namespace utils {
+
+EventPoll::EventPoll()
+ : mPollFD(::epoll_create1(EPOLL_CLOEXEC))
+{
+ if (mPollFD == -1) {
+ LOGE("Failed to create epoll: " << getSystemErrorMessage());
+ throw UtilsException("Could not create epoll");
+ }
+}
+
+EventPoll::~EventPoll()
+{
+ if (!mCallbacks.empty()) {
+ LOGW("Not removed callbacks: " << mCallbacks.size());
+ assert(0 && "Not removed callbacks left");
+ }
+ utils::close(mPollFD);
+}
+
+int EventPoll::getPollFD() const
+{
+ return mPollFD;
+}
+
+void EventPoll::addFD(const int fd, const Events events, Callback&& callback)
+{
+ std::lock_guard lock(mMutex);
+
+ if (mCallbacks.find(fd) != mCallbacks.end()) {
+ LOGW("Already added fd: " << fd);
+ throw UtilsException("FD already added");
+ }
+
+ if (!addFDInternal(fd, events)) {
+ throw UtilsException("Could not add fd");
+ }
+
+ mCallbacks.insert({fd, std::make_shared(std::move(callback))});
+ LOGT("Callback added for " << fd);
+}
+
+void EventPoll::removeFD(const int fd)
+{
+ std::lock_guard lock(mMutex);
+
+ auto iter = mCallbacks.find(fd);
+ if (iter == mCallbacks.end()) {
+ LOGW("Failed to remove nonexistent fd: " << fd);
+ throw UtilsException("FD does not exist");
+ }
+ mCallbacks.erase(iter);
+ removeFDInternal(fd);
+ LOGT("Callback removed for " << fd);
+}
+
+bool EventPoll::dispatchIteration(const int timeoutMs)
+{
+ for (;;) {
+ epoll_event event;
+ int num = epoll_wait(mPollFD, &event, 1, timeoutMs);
+ if (num == 0) {
+ return false; // timeout
+ }
+ if (num < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ LOGE("Failed to wait on epoll: " << getSystemErrorMessage());
+ throw UtilsException("Could not wait for event");
+ }
+
+ // callback could be removed in the meantime, so be careful, find it inside lock
+ std::lock_guard lock(mMutex);
+ auto iter = mCallbacks.find(event.data.fd);
+ if (iter == mCallbacks.end()) {
+ continue;
+ }
+
+ // add ref because removeFD(self) can be called inside callback
+ std::shared_ptr callback(iter->second);
+ return (*callback)(event.data.fd, event.events);
+ }
+}
+
+void EventPoll::dispatchLoop()
+{
+ while (dispatchIteration(-1)) {}
+}
+
+bool EventPoll::addFDInternal(const int fd, const Events events)
+{
+ epoll_event event;
+ memset(&event, 0, sizeof(event));
+ event.events = events;
+ event.data.fd = fd;
+
+ if (epoll_ctl(mPollFD, EPOLL_CTL_ADD, fd, &event) == -1) {
+ LOGE("Failed to add fd to poll: " << getSystemErrorMessage());
+ return false;
+ }
+ return true;
+}
+
+void EventPoll::removeFDInternal(const int fd)
+{
+ if (epoll_ctl(mPollFD, EPOLL_CTL_DEL, fd, NULL) == -1) {
+ assert(errno != EBADF); // always removeFD before closing fd locally!
+ // this is important because linux will reuse this fd number
+ LOGE("Failed to remove fd from poll: " << getSystemErrorMessage());
+ }
+}
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/event-poll.hpp b/common/utils/event-poll.hpp
new file mode 100644
index 0000000..1a7ae2c
--- /dev/null
+++ b/common/utils/event-poll.hpp
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief C++ epoll wrapper
+ */
+
+#ifndef COMMON_UTILS_EVENT_POLL_HPP
+#define COMMON_UTILS_EVENT_POLL_HPP
+
+#include
+#include
+#include
+#include
+
+namespace vasum {
+namespace utils {
+
+class EventPoll {
+public:
+
+ typedef unsigned int Events;
+ typedef std::function Callback;
+
+ EventPoll();
+ ~EventPoll();
+
+ int getPollFD() const;
+
+ void addFD(const int fd, const Events events, Callback&& callback);
+ void removeFD(const int fd);
+
+ bool dispatchIteration(const int timeoutMs);
+ void dispatchLoop();
+
+private:
+ typedef std::recursive_mutex Mutex;
+
+ const int mPollFD;
+ Mutex mMutex;
+ std::unordered_map> mCallbacks;
+
+ bool addFDInternal(const int fd, const Events events);
+ void removeFDInternal(const int fd);
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_EVENT_POLL_HPP
diff --git a/common/ipc/internals/eventfd.cpp b/common/utils/eventfd.cpp
similarity index 67%
rename from common/ipc/internals/eventfd.cpp
rename to common/utils/eventfd.cpp
index 729243d..2fd8d62 100644
--- a/common/ipc/internals/eventfd.cpp
+++ b/common/utils/eventfd.cpp
@@ -24,9 +24,9 @@
#include "config.hpp"
-#include "ipc/internals/eventfd.hpp"
-#include "ipc/internals/utils.hpp"
-#include "ipc/exception.hpp"
+#include "utils/eventfd.hpp"
+#include "utils/exception.hpp"
+#include "utils/fd-utils.hpp"
#include "logger/logger.hpp"
#include
@@ -35,24 +35,20 @@
#include
namespace vasum {
-namespace ipc {
+namespace utils {
EventFD::EventFD()
{
- mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
+ mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC);
if (mFD == -1) {
- LOGE("Error in eventfd: " << std::string(strerror(errno)));
- throw IPCException("Error in eventfd: " + std::string(strerror(errno)));
+ LOGE("Error in eventfd: " << getSystemErrorMessage());
+ throw UtilsException("Error in eventfd: " + getSystemErrorMessage());
}
}
EventFD::~EventFD()
{
- try {
- ipc::close(mFD);
- } catch (IPCException& e) {
- LOGE("Error in Event's destructor: " << e.what());
- }
+ utils::close(mFD);
}
int EventFD::getFD() const
@@ -63,15 +59,15 @@ int EventFD::getFD() const
void EventFD::send()
{
const std::uint64_t toSend = 1;
- ipc::write(mFD, &toSend, sizeof(toSend));
+ utils::write(mFD, &toSend, sizeof(toSend));
}
void EventFD::receive()
{
std::uint64_t readBuffer;
- ipc::read(mFD, &readBuffer, sizeof(readBuffer));
+ utils::read(mFD, &readBuffer, sizeof(readBuffer));
}
-} // namespace ipc
+} // namespace utils
} // namespace vasum
diff --git a/common/ipc/internals/eventfd.hpp b/common/utils/eventfd.hpp
similarity index 88%
rename from common/ipc/internals/eventfd.hpp
rename to common/utils/eventfd.hpp
index cf3094c..b3d5027 100644
--- a/common/ipc/internals/eventfd.hpp
+++ b/common/utils/eventfd.hpp
@@ -22,17 +22,18 @@
* @brief Eventfd wrapper
*/
-#ifndef COMMON_IPC_INTERNALS_EVENTFD_HPP
-#define COMMON_IPC_INTERNALS_EVENTFD_HPP
+#ifndef COMMON_UTILS_EVENTFD_HPP
+#define COMMON_UTILS_EVENTFD_HPP
namespace vasum {
-namespace ipc {
+namespace utils {
class EventFD {
public:
EventFD();
~EventFD();
+
EventFD(const EventFD& eventfd) = delete;
EventFD& operator=(const EventFD&) = delete;
@@ -56,7 +57,7 @@ private:
int mFD;
};
-} // namespace ipc
+} // namespace utils
} // namespace vasum
-#endif // COMMON_IPC_INTERNALS_EVENTFD_HPP
+#endif // COMMON_UTILS_EVENTFD_HPP
diff --git a/common/ipc/internals/utils.cpp b/common/utils/fd-utils.cpp
similarity index 68%
rename from common/ipc/internals/utils.cpp
rename to common/utils/fd-utils.cpp
index cb9c93c..59bad36 100644
--- a/common/ipc/internals/utils.cpp
+++ b/common/utils/fd-utils.cpp
@@ -19,13 +19,13 @@
/**
* @file
* @author Jan Olszak (j.olszak@samsung.com)
- * @brief Utility functions
+ * @brief File descriptor utility functions
*/
#include "config.hpp"
-#include "ipc/exception.hpp"
-#include "ipc/internals/utils.hpp"
+#include "utils/fd-utils.hpp"
+#include "utils/exception.hpp"
#include "logger/logger.hpp"
#include
@@ -40,7 +40,7 @@ namespace fs = boost::filesystem;
namespace chr = std::chrono;
namespace vasum {
-namespace ipc {
+namespace utils {
namespace {
@@ -51,13 +51,14 @@ void waitForEvent(int fd,
// Wait for the rest of the data
struct pollfd fds[1];
fds[0].fd = fd;
- fds[0].events = event | POLLHUP;
+ fds[0].events = event;
for (;;) {
chr::milliseconds timeoutMS = chr::duration_cast(deadline - chr::high_resolution_clock::now());
if (timeoutMS.count() < 0) {
- LOGE("Timeout in read");
- throw IPCException("Timeout in read");
+ LOGE("Timeout while waiting for event: " << std::hex << event <<
+ " on fd: " << std::dec << fd);
+ throw UtilsException("Timeout");
}
int ret = ::poll(fds, 1 /*fds size*/, timeoutMS.count());
@@ -66,13 +67,13 @@ void waitForEvent(int fd,
if (errno == EINTR) {
continue;
}
- LOGE("Error in poll: " + std::string(strerror(errno)));
- throw IPCException("Error in poll: " + std::string(strerror(errno)));
+ LOGE("Error in poll: " + getSystemErrorMessage());
+ throw UtilsException("Error in poll: " + getSystemErrorMessage());
}
if (ret == 0) {
LOGE("Timeout in read");
- throw IPCException("Timeout in read");
+ throw UtilsException("Timeout in read");
}
if (fds[0].revents & event) {
@@ -82,7 +83,7 @@ void waitForEvent(int fd,
if (fds[0].revents & POLLHUP) {
LOGW("Peer disconnected");
- throw IPCException("Peer disconnected");
+ throw UtilsException("Peer disconnected");
}
}
}
@@ -98,11 +99,10 @@ void close(int fd)
for (;;) {
if (-1 == ::close(fd)) {
if (errno == EINTR) {
- LOGD("Close interrupted by a signal, retrying");
+ LOGT("Close interrupted by a signal, retrying");
continue;
}
- LOGE("Error in close: " << std::string(strerror(errno)));
- throw IPCException("Error in close: " + std::string(strerror(errno)));
+ LOGE("Error in close: " << getSystemErrorMessage());
}
break;
}
@@ -118,22 +118,21 @@ void write(int fd, const void* bufferPtr, const size_t size, int timeoutMS)
int n = ::write(fd,
reinterpret_cast(bufferPtr) + nTotal,
size - nTotal);
- if (n > 0) {
+ if (n >= 0) {
nTotal += n;
+ if (nTotal == size) {
+ // All data is written, break loop
+ break;
+ }
} else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
// Neglected errors
LOGD("Retrying write");
} else {
- LOGE("Error during writing: " + std::string(strerror(errno)));
- throw IPCException("Error during writing: " + std::string(strerror(errno)));
+ LOGE("Error during writing: " + getSystemErrorMessage());
+ throw UtilsException("Error during writing: " + getSystemErrorMessage());
}
- if (nTotal >= size) {
- // All data is written, break loop
- break;
- } else {
- waitForEvent(fd, POLLOUT, deadline);
- }
+ waitForEvent(fd, POLLOUT, deadline);
}
}
@@ -147,22 +146,25 @@ void read(int fd, void* bufferPtr, const size_t size, int timeoutMS)
int n = ::read(fd,
reinterpret_cast(bufferPtr) + nTotal,
size - nTotal);
- if (n > 0) {
+ if (n >= 0) {
nTotal += n;
+ if (nTotal == size) {
+ // All data is read, break loop
+ break;
+ }
+ if (n == 0) {
+ LOGW("Peer disconnected");
+ throw UtilsException("Peer disconnected");
+ }
} else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
// Neglected errors
LOGD("Retrying read");
} else {
- LOGE("Error during reading: " + std::string(strerror(errno)));
- throw IPCException("Error during reading: " + std::string(strerror(errno)));
+ LOGE("Error during reading: " + getSystemErrorMessage());
+ throw UtilsException("Error during reading: " + getSystemErrorMessage());
}
- if (nTotal >= size) {
- // All data is read, break loop
- break;
- } else {
- waitForEvent(fd, POLLIN, deadline);
- }
+ waitForEvent(fd, POLLIN, deadline);
}
}
@@ -170,8 +172,8 @@ unsigned int getMaxFDNumber()
{
struct rlimit rlim;
if (-1 == getrlimit(RLIMIT_NOFILE, &rlim)) {
- LOGE("Error during getrlimit: " + std::string(strerror(errno)));
- throw IPCException("Error during getrlimit: " + std::string(strerror(errno)));
+ LOGE("Error during getrlimit: " + getSystemErrorMessage());
+ throw UtilsException("Error during getrlimit: " + getSystemErrorMessage());
}
return rlim.rlim_cur;
}
@@ -182,8 +184,8 @@ void setMaxFDNumber(unsigned int limit)
rlim.rlim_cur = limit;
rlim.rlim_max = limit;
if (-1 == setrlimit(RLIMIT_NOFILE, &rlim)) {
- LOGE("Error during setrlimit: " + std::string(strerror(errno)));
- throw IPCException("Error during setrlimit: " + std::string(strerror(errno)));
+ LOGE("Error during setrlimit: " + getSystemErrorMessage());
+ throw UtilsException("Error during setrlimit: " + getSystemErrorMessage());
}
}
@@ -194,6 +196,6 @@ unsigned int getFDNumber()
fs::directory_iterator());
}
-} // namespace ipc
+} // namespace utils
} // namespace vasum
diff --git a/common/ipc/internals/utils.hpp b/common/utils/fd-utils.hpp
similarity index 90%
rename from common/ipc/internals/utils.hpp
rename to common/utils/fd-utils.hpp
index c3bcaf1..ac32b0a 100644
--- a/common/ipc/internals/utils.hpp
+++ b/common/utils/fd-utils.hpp
@@ -19,20 +19,19 @@
/**
* @file
* @author Jan Olszak (j.olszak@samsung.com)
- * @brief Utility functions
+ * @brief File descriptor utility functions
*/
-#ifndef COMMON_IPC_INTERNALS_UTILS_HPP
-#define COMMON_IPC_INTERNALS_UTILS_HPP
+#ifndef COMMON_UTILS_FD_HPP
+#define COMMON_UTILS_FD_HPP
#include
namespace vasum {
-namespace ipc {
+namespace utils {
/**
* Close the file descriptor.
- * Repeat until
*/
void close(int fd);
@@ -73,7 +72,7 @@ void setMaxFDNumber(unsigned int limit);
*/
unsigned int getFDNumber();
-} // namespace ipc
+} // namespace utils
} // namespace vasum
-#endif // COMMON_IPC_INTERNALS_UTILS_HPP
+#endif // COMMON_UTILS_FD_HPP
diff --git a/common/utils/glib-poll-dispatcher.cpp b/common/utils/glib-poll-dispatcher.cpp
new file mode 100644
index 0000000..5c66d16
--- /dev/null
+++ b/common/utils/glib-poll-dispatcher.cpp
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief glib epoll dispatcher
+ */
+
+#include "config.hpp"
+#include "utils/glib-poll-dispatcher.hpp"
+#include "utils/callback-wrapper.hpp"
+
+namespace vasum {
+namespace utils {
+
+GlibPollDispatcher::GlibPollDispatcher(EventPoll& poll)
+{
+ mChannel = g_io_channel_unix_new(poll.getPollFD());
+
+ auto dispatchCallback = [&]() {
+ poll.dispatchIteration(0);
+ };
+
+ auto cCallback = [](GIOChannel*, GIOCondition, gpointer data) -> gboolean {
+ getCallbackFromPointer(data)();
+ return TRUE;
+ };
+
+ mWatchId = g_io_add_watch_full(mChannel,
+ G_PRIORITY_DEFAULT,
+ G_IO_IN,
+ cCallback,
+ createCallbackWrapper(dispatchCallback, mGuard.spawn()),
+ &deleteCallbackWrapper);
+}
+
+GlibPollDispatcher::~GlibPollDispatcher()
+{
+ g_source_remove(mWatchId);
+ g_io_channel_unref(mChannel);
+ // mGuard destructor will wait for full unregister of dispatchCallback
+}
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/glib-poll-dispatcher.hpp b/common/utils/glib-poll-dispatcher.hpp
new file mode 100644
index 0000000..07da0c3
--- /dev/null
+++ b/common/utils/glib-poll-dispatcher.hpp
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief glib epoll dispatcher
+ */
+
+#ifndef COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+#define COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+
+#include "utils/event-poll.hpp"
+#include "utils/callback-guard.hpp"
+
+#include
+
+namespace vasum {
+namespace utils {
+
+/**
+ * Will dispatch poll events in glib thread
+ */
+class GlibPollDispatcher {
+public:
+ GlibPollDispatcher(EventPoll& poll);
+ ~GlibPollDispatcher();
+private:
+ CallbackGuard mGuard;
+ GIOChannel* mChannel;
+ guint mWatchId;
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
diff --git a/common/utils/thread-poll-dispatcher.cpp b/common/utils/thread-poll-dispatcher.cpp
new file mode 100644
index 0000000..f350587
--- /dev/null
+++ b/common/utils/thread-poll-dispatcher.cpp
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Thread epoll dispatcher
+ */
+
+#include "config.hpp"
+#include "utils/thread-poll-dispatcher.hpp"
+
+#include
+
+namespace vasum {
+namespace utils {
+
+ThreadPollDispatcher::ThreadPollDispatcher(EventPoll& poll)
+ : mPoll(poll)
+ , mThread([&]{ poll.dispatchLoop(); })
+{
+ auto controlCallback = [this](int, EventPoll::Events) -> bool {
+ mStopEvent.receive();
+ return false; // break the loop
+ };
+
+ poll.addFD(mStopEvent.getFD(), EPOLLIN, std::move(controlCallback));
+}
+
+ThreadPollDispatcher::~ThreadPollDispatcher()
+{
+ mStopEvent.send();
+ mThread.join();
+ mPoll.removeFD(mStopEvent.getFD());
+}
+
+} // namespace utils
+} // namespace vasum
diff --git a/common/utils/thread-poll-dispatcher.hpp b/common/utils/thread-poll-dispatcher.hpp
new file mode 100644
index 0000000..e54cb4e
--- /dev/null
+++ b/common/utils/thread-poll-dispatcher.hpp
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Thread epoll dispatcher
+ */
+
+#ifndef COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
+#define COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
+
+#include "utils/event-poll.hpp"
+#include "utils/eventfd.hpp"
+
+#include
+
+namespace vasum {
+namespace utils {
+
+/**
+ * Will dispatch poll events in a newly created thread
+ */
+class ThreadPollDispatcher {
+public:
+ ThreadPollDispatcher(EventPoll& poll);
+ ~ThreadPollDispatcher();
+private:
+ EventPoll& mPoll;
+ EventFD mStopEvent;
+ std::thread mThread;
+};
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp
index 193eec9..fa0de22 100644
--- a/tests/unit_tests/ipc/ut-ipc.cpp
+++ b/tests/unit_tests/ipc/ut-ipc.cpp
@@ -592,7 +592,6 @@ BOOST_AUTO_TEST_CASE(ServiceGSource)
l.set();
};
- IPCGSource::Pointer serviceGSource;
Service s(SOCKET_PATH);
s.setMethodHandler(1, echoCallback);
@@ -622,7 +621,6 @@ BOOST_AUTO_TEST_CASE(ClientGSource)
Service s(SOCKET_PATH);
s.start();
- IPCGSource::Pointer clientGSource;
Client c(SOCKET_PATH);
c.setMethodHandler(1, echoCallback);
c.setSignalHandler(2, signalHandler);
diff --git a/tests/unit_tests/utils/ut-event-poll.cpp b/tests/unit_tests/utils/ut-event-poll.cpp
new file mode 100644
index 0000000..e387393
--- /dev/null
+++ b/tests/unit_tests/utils/ut-event-poll.cpp
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Unit tests of event poll
+ */
+
+#include "config.hpp"
+#include "ut.hpp"
+
+#include "utils/event-poll.hpp"
+#include "logger/logger.hpp"
+#include "ipc/internals/socket.hpp"
+#include "utils/latch.hpp"
+#include "utils/glib-loop.hpp"
+#include "utils/glib-poll-dispatcher.hpp"
+#include "utils/thread-poll-dispatcher.hpp"
+
+#include