LOGS("Client Destructor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE("Error in Client's destructor: " << e.what());
}
}
LOGT("Destroying Acceptor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE("Error in destructor: " << e.what());
}
LOGT("Destroyed Acceptor");
#define COMMON_IPC_INTERNALS_EVENT_QUEUE_HPP
#include "ipc/exception.hpp"
-#include "ipc/internals/eventfd.hpp"
+#include "utils/eventfd.hpp"
#include "logger/logger.hpp"
#include <string>
std::mutex mCommunicationMutex;
std::queue<MessageType> mMessages;
- EventFD mEventFD;
+ utils::EventFD mEventFD;
};
template<typename MessageType>
#include "ipc/exception.hpp"
#include "ipc/internals/processor.hpp"
-#include "ipc/internals/utils.hpp"
#include "utils/signal.hpp"
+#include "utils/exception.hpp"
#include <cerrno>
#include <cstring>
LOGS(mLogPrefix + "Processor Destructor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
}
}
Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd)
{
- return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&fd](const PeerInfo & peerInfo) {
+ return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [fd](const PeerInfo & peerInfo) {
return fd == peerInfo.socketPtr->getFD();
});
}
Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID peerID)
{
- return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&peerID](const PeerInfo & peerInfo) {
+ return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [peerID](const PeerInfo & peerInfo) {
return peerID == peerInfo.peerID;
});
}
Socket::Guard guard = socket.getGuard();
socket.read(&methodID, sizeof(methodID));
socket.read(&messageID, sizeof(messageID));
- } catch (const IPCException& e) {
+ } catch (const UtilsException& e) {
LOGE(mLogPrefix + "Error during reading the socket");
removePeerInternal(peerIt,
std::make_exception_ptr(IPCNaughtyPeerException()));
#define COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
#include "ipc/exception.hpp"
-#include "ipc/internals/eventfd.hpp"
+#include "utils/eventfd.hpp"
#include "logger/logger.hpp"
#include <list>
std::list<Request> mRequests;
std::mutex mStateMutex;
- EventFD mEventFD;
+ utils::EventFD mEventFD;
};
template<typename RequestIdType>
#include "ipc/exception.hpp"
#include "ipc/internals/socket.hpp"
-#include "ipc/internals/utils.hpp"
+#include "utils/fd-utils.hpp"
#include "logger/logger.hpp"
#include <systemd/sd-daemon.h>
Socket::~Socket() noexcept
{
try {
- ipc::close(mFD);
- } catch (IPCException& e) {
+ utils::close(mFD);
+ } catch (std::exception& e) {
LOGE("Error in Socket's destructor: " << e.what());
}
}
void Socket::write(const void* bufferPtr, const size_t size) const
{
Guard guard(mCommunicationMutex);
- ipc::write(mFD, bufferPtr, size);
+ utils::write(mFD, bufferPtr, size);
}
void Socket::read(void* bufferPtr, const size_t size) const
{
Guard guard(mCommunicationMutex);
- ipc::read(mFD, bufferPtr, size);
+ utils::read(mFD, bufferPtr, size);
}
int Socket::getSystemdSocket(const std::string& path)
reinterpret_cast<struct sockaddr*>(&serverAddress),
sizeof(struct sockaddr_un))) {
std::string message = strerror(errno);
- ::close(sockfd);
+ utils::close(sockfd);
LOGE("Error in bind: " << message);
IPCException("Error in bind: " + message);
}
if (-1 == ::listen(sockfd,
MAX_QUEUE_LENGTH)) {
std::string message = strerror(errno);
- ::close(sockfd);
+ utils::close(sockfd);
LOGE("Error in listen: " << message);
IPCException("Error in listen: " + message);
}
if (-1 == connect(fd,
reinterpret_cast<struct sockaddr*>(&serverAddress),
sizeof(struct sockaddr_un))) {
- ::close(fd);
+ utils::close(fd);
LOGE("Error in connect: " + std::string(strerror(errno)));
throw IPCException("Error in connect: " + std::string(strerror(errno)));
}
// Nonblock socket
int flags = fcntl(fd, F_GETFL, 0);
if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
- ::close(fd);
+ utils::close(fd);
LOGE("Error in fcntl: " + std::string(strerror(errno)));
throw IPCException("Error in fcntl: " + std::string(strerror(errno)));
}
LOGS("Service Destructor");
try {
stop();
- } catch (IPCException& e) {
+ } catch (std::exception& e) {
LOGE("Error in Service's destructor: " << e.what());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief C++ epoll wrapper
+ */
+
+#include "config.hpp"
+#include "utils/event-poll.hpp"
+#include "utils/fd-utils.hpp"
+#include "utils/exception.hpp"
+#include "logger/logger.hpp"
+
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <string.h>
+#include <assert.h>
+
+namespace vasum {
+namespace utils {
+
+EventPoll::EventPoll()
+ : mPollFD(::epoll_create1(EPOLL_CLOEXEC))
+{
+ if (mPollFD == -1) {
+ LOGE("Failed to create epoll: " << getSystemErrorMessage());
+ throw UtilsException("Could not create epoll");
+ }
+}
+
+EventPoll::~EventPoll()
+{
+ if (!mCallbacks.empty()) {
+ LOGW("Not removed callbacks: " << mCallbacks.size());
+ assert(0 && "Not removed callbacks left");
+ }
+ utils::close(mPollFD);
+}
+
+int EventPoll::getPollFD() const
+{
+ return mPollFD;
+}
+
+void EventPoll::addFD(const int fd, const Events events, Callback&& callback)
+{
+ std::lock_guard<Mutex> lock(mMutex);
+
+ if (mCallbacks.find(fd) != mCallbacks.end()) {
+ LOGW("Already added fd: " << fd);
+ throw UtilsException("FD already added");
+ }
+
+ if (!addFDInternal(fd, events)) {
+ throw UtilsException("Could not add fd");
+ }
+
+ mCallbacks.insert({fd, std::make_shared<Callback>(std::move(callback))});
+ LOGT("Callback added for " << fd);
+}
+
+void EventPoll::removeFD(const int fd)
+{
+ std::lock_guard<Mutex> lock(mMutex);
+
+ auto iter = mCallbacks.find(fd);
+ if (iter == mCallbacks.end()) {
+ LOGW("Failed to remove nonexistent fd: " << fd);
+ throw UtilsException("FD does not exist");
+ }
+ mCallbacks.erase(iter);
+ removeFDInternal(fd);
+ LOGT("Callback removed for " << fd);
+}
+
+bool EventPoll::dispatchIteration(const int timeoutMs)
+{
+ for (;;) {
+ epoll_event event;
+ int num = epoll_wait(mPollFD, &event, 1, timeoutMs);
+ if (num == 0) {
+ return false; // timeout
+ }
+ if (num < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ LOGE("Failed to wait on epoll: " << getSystemErrorMessage());
+ throw UtilsException("Could not wait for event");
+ }
+
+ // callback could be removed in the meantime, so be careful, find it inside lock
+ std::lock_guard<Mutex> lock(mMutex);
+ auto iter = mCallbacks.find(event.data.fd);
+ if (iter == mCallbacks.end()) {
+ continue;
+ }
+
+ // add ref because removeFD(self) can be called inside callback
+ std::shared_ptr<Callback> callback(iter->second);
+ return (*callback)(event.data.fd, event.events);
+ }
+}
+
+void EventPoll::dispatchLoop()
+{
+ while (dispatchIteration(-1)) {}
+}
+
+bool EventPoll::addFDInternal(const int fd, const Events events)
+{
+ epoll_event event;
+ memset(&event, 0, sizeof(event));
+ event.events = events;
+ event.data.fd = fd;
+
+ if (epoll_ctl(mPollFD, EPOLL_CTL_ADD, fd, &event) == -1) {
+ LOGE("Failed to add fd to poll: " << getSystemErrorMessage());
+ return false;
+ }
+ return true;
+}
+
+void EventPoll::removeFDInternal(const int fd)
+{
+ if (epoll_ctl(mPollFD, EPOLL_CTL_DEL, fd, NULL) == -1) {
+ assert(errno != EBADF); // always removeFD before closing fd locally!
+ // this is important because linux will reuse this fd number
+ LOGE("Failed to remove fd from poll: " << getSystemErrorMessage());
+ }
+}
+
+} // namespace utils
+} // namespace vasum
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief C++ epoll wrapper
+ */
+
+#ifndef COMMON_UTILS_EVENT_POLL_HPP
+#define COMMON_UTILS_EVENT_POLL_HPP
+
+#include <functional>
+#include <mutex>
+#include <unordered_map>
+#include <memory>
+
+namespace vasum {
+namespace utils {
+
+class EventPoll {
+public:
+
+ typedef unsigned int Events;
+ typedef std::function<bool(int fd, Events events)> Callback;
+
+ EventPoll();
+ ~EventPoll();
+
+ int getPollFD() const;
+
+ void addFD(const int fd, const Events events, Callback&& callback);
+ void removeFD(const int fd);
+
+ bool dispatchIteration(const int timeoutMs);
+ void dispatchLoop();
+
+private:
+ typedef std::recursive_mutex Mutex;
+
+ const int mPollFD;
+ Mutex mMutex;
+ std::unordered_map<int, std::shared_ptr<Callback>> mCallbacks;
+
+ bool addFDInternal(const int fd, const Events events);
+ void removeFDInternal(const int fd);
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_EVENT_POLL_HPP
#include "config.hpp"
-#include "ipc/internals/eventfd.hpp"
-#include "ipc/internals/utils.hpp"
-#include "ipc/exception.hpp"
+#include "utils/eventfd.hpp"
+#include "utils/exception.hpp"
+#include "utils/fd-utils.hpp"
#include "logger/logger.hpp"
#include <sys/eventfd.h>
#include <cstdint>
namespace vasum {
-namespace ipc {
+namespace utils {
EventFD::EventFD()
{
- mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
+ mFD = ::eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC);
if (mFD == -1) {
- LOGE("Error in eventfd: " << std::string(strerror(errno)));
- throw IPCException("Error in eventfd: " + std::string(strerror(errno)));
+ LOGE("Error in eventfd: " << getSystemErrorMessage());
+ throw UtilsException("Error in eventfd: " + getSystemErrorMessage());
}
}
EventFD::~EventFD()
{
- try {
- ipc::close(mFD);
- } catch (IPCException& e) {
- LOGE("Error in Event's destructor: " << e.what());
- }
+ utils::close(mFD);
}
int EventFD::getFD() const
void EventFD::send()
{
const std::uint64_t toSend = 1;
- ipc::write(mFD, &toSend, sizeof(toSend));
+ utils::write(mFD, &toSend, sizeof(toSend));
}
void EventFD::receive()
{
std::uint64_t readBuffer;
- ipc::read(mFD, &readBuffer, sizeof(readBuffer));
+ utils::read(mFD, &readBuffer, sizeof(readBuffer));
}
-} // namespace ipc
+} // namespace utils
} // namespace vasum
* @brief Eventfd wrapper
*/
-#ifndef COMMON_IPC_INTERNALS_EVENTFD_HPP
-#define COMMON_IPC_INTERNALS_EVENTFD_HPP
+#ifndef COMMON_UTILS_EVENTFD_HPP
+#define COMMON_UTILS_EVENTFD_HPP
namespace vasum {
-namespace ipc {
+namespace utils {
class EventFD {
public:
EventFD();
~EventFD();
+
EventFD(const EventFD& eventfd) = delete;
EventFD& operator=(const EventFD&) = delete;
int mFD;
};
-} // namespace ipc
+} // namespace utils
} // namespace vasum
-#endif // COMMON_IPC_INTERNALS_EVENTFD_HPP
+#endif // COMMON_UTILS_EVENTFD_HPP
/**
* @file
* @author Jan Olszak (j.olszak@samsung.com)
- * @brief Utility functions
+ * @brief File descriptor utility functions
*/
#include "config.hpp"
-#include "ipc/exception.hpp"
-#include "ipc/internals/utils.hpp"
+#include "utils/fd-utils.hpp"
+#include "utils/exception.hpp"
#include "logger/logger.hpp"
#include <cerrno>
namespace chr = std::chrono;
namespace vasum {
-namespace ipc {
+namespace utils {
namespace {
// Wait for the rest of the data
struct pollfd fds[1];
fds[0].fd = fd;
- fds[0].events = event | POLLHUP;
+ fds[0].events = event;
for (;;) {
chr::milliseconds timeoutMS = chr::duration_cast<chr::milliseconds>(deadline - chr::high_resolution_clock::now());
if (timeoutMS.count() < 0) {
- LOGE("Timeout in read");
- throw IPCException("Timeout in read");
+ LOGE("Timeout while waiting for event: " << std::hex << event <<
+ " on fd: " << std::dec << fd);
+ throw UtilsException("Timeout");
}
int ret = ::poll(fds, 1 /*fds size*/, timeoutMS.count());
if (errno == EINTR) {
continue;
}
- LOGE("Error in poll: " + std::string(strerror(errno)));
- throw IPCException("Error in poll: " + std::string(strerror(errno)));
+ LOGE("Error in poll: " + getSystemErrorMessage());
+ throw UtilsException("Error in poll: " + getSystemErrorMessage());
}
if (ret == 0) {
LOGE("Timeout in read");
- throw IPCException("Timeout in read");
+ throw UtilsException("Timeout in read");
}
if (fds[0].revents & event) {
if (fds[0].revents & POLLHUP) {
LOGW("Peer disconnected");
- throw IPCException("Peer disconnected");
+ throw UtilsException("Peer disconnected");
}
}
}
for (;;) {
if (-1 == ::close(fd)) {
if (errno == EINTR) {
- LOGD("Close interrupted by a signal, retrying");
+ LOGT("Close interrupted by a signal, retrying");
continue;
}
- LOGE("Error in close: " << std::string(strerror(errno)));
- throw IPCException("Error in close: " + std::string(strerror(errno)));
+ LOGE("Error in close: " << getSystemErrorMessage());
}
break;
}
int n = ::write(fd,
reinterpret_cast<const char*>(bufferPtr) + nTotal,
size - nTotal);
- if (n > 0) {
+ if (n >= 0) {
nTotal += n;
+ if (nTotal == size) {
+ // All data is written, break loop
+ break;
+ }
} else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
// Neglected errors
LOGD("Retrying write");
} else {
- LOGE("Error during writing: " + std::string(strerror(errno)));
- throw IPCException("Error during writing: " + std::string(strerror(errno)));
+ LOGE("Error during writing: " + getSystemErrorMessage());
+ throw UtilsException("Error during writing: " + getSystemErrorMessage());
}
- if (nTotal >= size) {
- // All data is written, break loop
- break;
- } else {
- waitForEvent(fd, POLLOUT, deadline);
- }
+ waitForEvent(fd, POLLOUT, deadline);
}
}
int n = ::read(fd,
reinterpret_cast<char*>(bufferPtr) + nTotal,
size - nTotal);
- if (n > 0) {
+ if (n >= 0) {
nTotal += n;
+ if (nTotal == size) {
+ // All data is read, break loop
+ break;
+ }
+ if (n == 0) {
+ LOGW("Peer disconnected");
+ throw UtilsException("Peer disconnected");
+ }
} else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
// Neglected errors
LOGD("Retrying read");
} else {
- LOGE("Error during reading: " + std::string(strerror(errno)));
- throw IPCException("Error during reading: " + std::string(strerror(errno)));
+ LOGE("Error during reading: " + getSystemErrorMessage());
+ throw UtilsException("Error during reading: " + getSystemErrorMessage());
}
- if (nTotal >= size) {
- // All data is read, break loop
- break;
- } else {
- waitForEvent(fd, POLLIN, deadline);
- }
+ waitForEvent(fd, POLLIN, deadline);
}
}
{
struct rlimit rlim;
if (-1 == getrlimit(RLIMIT_NOFILE, &rlim)) {
- LOGE("Error during getrlimit: " + std::string(strerror(errno)));
- throw IPCException("Error during getrlimit: " + std::string(strerror(errno)));
+ LOGE("Error during getrlimit: " + getSystemErrorMessage());
+ throw UtilsException("Error during getrlimit: " + getSystemErrorMessage());
}
return rlim.rlim_cur;
}
rlim.rlim_cur = limit;
rlim.rlim_max = limit;
if (-1 == setrlimit(RLIMIT_NOFILE, &rlim)) {
- LOGE("Error during setrlimit: " + std::string(strerror(errno)));
- throw IPCException("Error during setrlimit: " + std::string(strerror(errno)));
+ LOGE("Error during setrlimit: " + getSystemErrorMessage());
+ throw UtilsException("Error during setrlimit: " + getSystemErrorMessage());
}
}
fs::directory_iterator());
}
-} // namespace ipc
+} // namespace utils
} // namespace vasum
/**
* @file
* @author Jan Olszak (j.olszak@samsung.com)
- * @brief Utility functions
+ * @brief File descriptor utility functions
*/
-#ifndef COMMON_IPC_INTERNALS_UTILS_HPP
-#define COMMON_IPC_INTERNALS_UTILS_HPP
+#ifndef COMMON_UTILS_FD_HPP
+#define COMMON_UTILS_FD_HPP
#include <cstddef>
namespace vasum {
-namespace ipc {
+namespace utils {
/**
* Close the file descriptor.
- * Repeat until
*/
void close(int fd);
*/
unsigned int getFDNumber();
-} // namespace ipc
+} // namespace utils
} // namespace vasum
-#endif // COMMON_IPC_INTERNALS_UTILS_HPP
+#endif // COMMON_UTILS_FD_HPP
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief glib epoll dispatcher
+ */
+
+#include "config.hpp"
+#include "utils/glib-poll-dispatcher.hpp"
+#include "utils/callback-wrapper.hpp"
+
+namespace vasum {
+namespace utils {
+
+GlibPollDispatcher::GlibPollDispatcher(EventPoll& poll)
+{
+ mChannel = g_io_channel_unix_new(poll.getPollFD());
+
+ auto dispatchCallback = [&]() {
+ poll.dispatchIteration(0);
+ };
+
+ auto cCallback = [](GIOChannel*, GIOCondition, gpointer data) -> gboolean {
+ getCallbackFromPointer<decltype(dispatchCallback)>(data)();
+ return TRUE;
+ };
+
+ mWatchId = g_io_add_watch_full(mChannel,
+ G_PRIORITY_DEFAULT,
+ G_IO_IN,
+ cCallback,
+ createCallbackWrapper(dispatchCallback, mGuard.spawn()),
+ &deleteCallbackWrapper<decltype(dispatchCallback)>);
+}
+
+GlibPollDispatcher::~GlibPollDispatcher()
+{
+ g_source_remove(mWatchId);
+ g_io_channel_unref(mChannel);
+ // mGuard destructor will wait for full unregister of dispatchCallback
+}
+
+} // namespace utils
+} // namespace vasum
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief glib epoll dispatcher
+ */
+
+#ifndef COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+#define COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+
+#include "utils/event-poll.hpp"
+#include "utils/callback-guard.hpp"
+
+#include <gio/gio.h>
+
+namespace vasum {
+namespace utils {
+
+/**
+ * Will dispatch poll events in glib thread
+ */
+class GlibPollDispatcher {
+public:
+ GlibPollDispatcher(EventPoll& poll);
+ ~GlibPollDispatcher();
+private:
+ CallbackGuard mGuard;
+ GIOChannel* mChannel;
+ guint mWatchId;
+};
+
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Thread epoll dispatcher
+ */
+
+#include "config.hpp"
+#include "utils/thread-poll-dispatcher.hpp"
+
+#include <sys/epoll.h>
+
+namespace vasum {
+namespace utils {
+
+ThreadPollDispatcher::ThreadPollDispatcher(EventPoll& poll)
+ : mPoll(poll)
+ , mThread([&]{ poll.dispatchLoop(); })
+{
+ auto controlCallback = [this](int, EventPoll::Events) -> bool {
+ mStopEvent.receive();
+ return false; // break the loop
+ };
+
+ poll.addFD(mStopEvent.getFD(), EPOLLIN, std::move(controlCallback));
+}
+
+ThreadPollDispatcher::~ThreadPollDispatcher()
+{
+ mStopEvent.send();
+ mThread.join();
+ mPoll.removeFD(mStopEvent.getFD());
+}
+
+} // namespace utils
+} // namespace vasum
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Thread epoll dispatcher
+ */
+
+#ifndef COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
+#define COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
+
+#include "utils/event-poll.hpp"
+#include "utils/eventfd.hpp"
+
+#include <thread>
+
+namespace vasum {
+namespace utils {
+
+/**
+ * Will dispatch poll events in a newly created thread
+ */
+class ThreadPollDispatcher {
+public:
+ ThreadPollDispatcher(EventPoll& poll);
+ ~ThreadPollDispatcher();
+private:
+ EventPoll& mPoll;
+ EventFD mStopEvent;
+ std::thread mThread;
+};
+
+} // namespace utils
+} // namespace vasum
+
+#endif // COMMON_UTILS_THREAD_POLL_DISPATCHER_HPP
l.set();
};
- IPCGSource::Pointer serviceGSource;
Service s(SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
Service s(SOCKET_PATH);
s.start();
- IPCGSource::Pointer clientGSource;
Client c(SOCKET_PATH);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
c.setSignalHandler<RecvData>(2, signalHandler);
--- /dev/null
+/*
+ * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Contact: Piotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+/**
+ * @file
+ * @author Piotr Bartosiewicz (p.bartosiewi@partner.samsung.com)
+ * @brief Unit tests of event poll
+ */
+
+#include "config.hpp"
+#include "ut.hpp"
+
+#include "utils/event-poll.hpp"
+#include "logger/logger.hpp"
+#include "ipc/internals/socket.hpp"
+#include "utils/latch.hpp"
+#include "utils/glib-loop.hpp"
+#include "utils/glib-poll-dispatcher.hpp"
+#include "utils/thread-poll-dispatcher.hpp"
+
+#include <map>
+#include <sys/epoll.h>
+
+using namespace vasum::utils;
+using namespace vasum::ipc;
+
+namespace {
+
+const int unsigned TIMEOUT = 1000;
+#define ADD_EVENT(e) {EPOLL##e, #e}
+const std::map<EventPoll::Events, std::string> EVENT_NAMES = {
+ ADD_EVENT(IN),
+ ADD_EVENT(OUT),
+ ADD_EVENT(ERR),
+ ADD_EVENT(HUP),
+ ADD_EVENT(RDHUP),
+};
+#undef ADD_EVENT
+
+std::string strEvents(EventPoll::Events events)
+{
+ if (events == 0) {
+ return "<none>";
+ }
+ std::ostringstream ss;
+ for (const auto& p : EVENT_NAMES) {
+ if (events & p.first) {
+ ss << p.second << ", ";
+ events &= ~p.first;
+ }
+ }
+ if (events != 0) {
+ ss << std::hex << events;
+ return ss.str();
+ } else {
+ std::string ret = ss.str();
+ ret.resize(ret.size() - 2);
+ return ret;
+ }
+}
+
+} // namespace
+
+BOOST_AUTO_TEST_SUITE(EventPollSuite)
+
+BOOST_AUTO_TEST_CASE(EmptyPoll)
+{
+ EventPoll poll;
+ BOOST_CHECK(!poll.dispatchIteration(0));
+}
+
+BOOST_AUTO_TEST_CASE(ThreadedPoll)
+{
+ EventPoll poll;
+ ThreadPollDispatcher dispatcher(poll);
+}
+
+BOOST_AUTO_TEST_CASE(GlibPoll)
+{
+ ScopedGlibLoop loop;
+
+ EventPoll poll;
+ GlibPollDispatcher dispatcher(poll);
+}
+
+void doSocketTest(EventPoll& poll, Latch& goodMessage, Latch& remoteClosed)
+{
+ const std::string PATH = "/tmp/ut-poll.sock";
+ const std::string MESSAGE = "This is a test message";
+
+ Socket listen = Socket::createSocket(PATH);
+ std::shared_ptr<Socket> server;
+
+ auto serverCallback = [&](int, EventPoll::Events events) -> bool {
+ LOGD("Server events: " << strEvents(events));
+
+ if (events & EPOLLOUT) {
+ server->write(MESSAGE.data(), MESSAGE.size());
+ poll.removeFD(server->getFD());
+ server.reset();
+ }
+ return true;
+ };
+
+ auto listenCallback = [&](int, EventPoll::Events events) -> bool {
+ LOGD("Listen events: " << strEvents(events));
+ if (events & EPOLLIN) {
+ server = listen.accept();
+ poll.addFD(server->getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLOUT, serverCallback);
+ }
+ return true;
+ };
+
+ poll.addFD(listen.getFD(), EPOLLIN, listenCallback);
+
+ Socket client = Socket::connectSocket(PATH);
+
+ auto clientCallback = [&](int, EventPoll::Events events) -> bool {
+ LOGD("Client events: " << strEvents(events));
+
+ if (events & EPOLLIN) {
+ std::string ret(MESSAGE.size(), 'x');
+ client.read(&ret.front(), ret.size());
+ if (ret == MESSAGE) {
+ goodMessage.set();
+ }
+ }
+ if (events & EPOLLRDHUP) {
+ poll.removeFD(client.getFD());
+ remoteClosed.set();
+ }
+ return true;
+ };
+
+ poll.addFD(client.getFD(), EPOLLHUP | EPOLLRDHUP | EPOLLIN, clientCallback);
+
+ BOOST_CHECK(goodMessage.wait(TIMEOUT));
+ BOOST_CHECK(remoteClosed.wait(TIMEOUT));
+
+ poll.removeFD(listen.getFD());
+}
+
+BOOST_AUTO_TEST_CASE(ThreadedPollSocket)
+{
+ Latch goodMessage;
+ Latch remoteClosed;
+
+ EventPoll poll;
+ ThreadPollDispatcher dispatcher(poll);
+
+ doSocketTest(poll, goodMessage, remoteClosed);
+}
+
+BOOST_AUTO_TEST_CASE(GlibPollSocket)
+{
+ Latch goodMessage;
+ Latch remoteClosed;
+
+ ScopedGlibLoop loop;
+
+ EventPoll poll;
+ GlibPollDispatcher dispatcher(poll);
+
+ doSocketTest(poll, goodMessage, remoteClosed);
+}
+
+BOOST_AUTO_TEST_CASE(PollStacking)
+{
+ Latch goodMessage;
+ Latch remoteClosed;
+
+ EventPoll outer;
+ EventPoll inner;
+
+ auto dispatchInner = [&](int, EventPoll::Events) -> bool {
+ inner.dispatchIteration(0);
+ return true;
+ };
+
+ outer.addFD(inner.getPollFD(), EPOLLIN, dispatchInner);
+
+ ThreadPollDispatcher dispatcher(outer);
+ doSocketTest(inner, goodMessage, remoteClosed);
+
+ outer.removeFD(inner.getPollFD());
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+