From: Piotr Bartosiewicz
Date: Tue, 10 Mar 2015 11:01:48 +0000 (+0100)
Subject: IPC works on epoll
X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=99314f1553605b796e6ae5b90466113f98e79360;p=platform%2Fcore%2Fsecurity%2Fvasum.git
IPC works on epoll
[Bug/Feature] N/A
[Cause] N/A
[Solution] N/A
[Verification] Run tests
Change-Id: I6f6b5a7a70cecbecbdf0c502d0f8618577892a48
---
diff --git a/common/epoll/event-poll.cpp b/common/epoll/event-poll.cpp
index dc1c1bc..9eb9fa0 100644
--- a/common/epoll/event-poll.cpp
+++ b/common/epoll/event-poll.cpp
@@ -49,6 +49,9 @@ EventPoll::~EventPoll()
{
if (!mCallbacks.empty()) {
LOGW("Not removed callbacks: " << mCallbacks.size());
+ for (const auto& item : mCallbacks) {
+ LOGT("Not removed fd: " << item.first);
+ }
assert(0 && "Not removed callbacks left");
}
utils::close(mPollFD);
@@ -73,7 +76,7 @@ void EventPoll::addFD(const int fd, const Events events, Callback&& callback)
}
mCallbacks.insert({fd, std::make_shared(std::move(callback))});
- LOGT("Callback added for " << fd);
+ LOGT("Callback added for fd: " << fd);
}
void EventPoll::removeFD(const int fd)
@@ -87,7 +90,7 @@ void EventPoll::removeFD(const int fd)
}
mCallbacks.erase(iter);
removeFDInternal(fd);
- LOGT("Callback removed for " << fd);
+ LOGT("Callback removed for fd: " << fd);
}
bool EventPoll::dispatchIteration(const int timeoutMs)
@@ -115,7 +118,13 @@ bool EventPoll::dispatchIteration(const int timeoutMs)
// add ref because removeFD(self) can be called inside callback
std::shared_ptr callback(iter->second);
- return (*callback)(event.data.fd, event.events);
+ try {
+ LOGT("Dispatch fd: " << event.data.fd << ", events: " << eventsToString(event.events));
+ return (*callback)(event.data.fd, event.events);
+ } catch (std::exception& e) {
+ LOGE("Got unexpected exception: " << e.what());
+ assert(0 && "Callback should not throw any exceptions");
+ }
}
}
diff --git a/common/epoll/glib-poll-dispatcher.cpp b/common/epoll/glib-dispatcher.cpp
similarity index 91%
rename from common/epoll/glib-poll-dispatcher.cpp
rename to common/epoll/glib-dispatcher.cpp
index cde535d..9567abb 100644
--- a/common/epoll/glib-poll-dispatcher.cpp
+++ b/common/epoll/glib-dispatcher.cpp
@@ -23,13 +23,13 @@
*/
#include "config.hpp"
-#include "epoll/glib-poll-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
#include "utils/callback-wrapper.hpp"
namespace vasum {
namespace epoll {
-GlibPollDispatcher::GlibPollDispatcher()
+GlibDispatcher::GlibDispatcher()
{
mChannel = g_io_channel_unix_new(mPoll.getPollFD());
@@ -50,14 +50,14 @@ GlibPollDispatcher::GlibPollDispatcher()
&utils::deleteCallbackWrapper);
}
-GlibPollDispatcher::~GlibPollDispatcher()
+GlibDispatcher::~GlibDispatcher()
{
g_source_remove(mWatchId);
g_io_channel_unref(mChannel);
// mGuard destructor will wait for full unregister of dispatchCallback
}
-EventPoll& GlibPollDispatcher::getPoll()
+EventPoll& GlibDispatcher::getPoll()
{
return mPoll;
}
diff --git a/common/epoll/glib-poll-dispatcher.hpp b/common/epoll/glib-dispatcher.hpp
similarity index 84%
rename from common/epoll/glib-poll-dispatcher.hpp
rename to common/epoll/glib-dispatcher.hpp
index cf300bb..0fa7b3a 100644
--- a/common/epoll/glib-poll-dispatcher.hpp
+++ b/common/epoll/glib-dispatcher.hpp
@@ -22,8 +22,8 @@
* @brief glib epoll dispatcher
*/
-#ifndef COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP
-#define COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP
+#ifndef COMMON_EPOLL_GLIB_DISPATCHER_HPP
+#define COMMON_EPOLL_GLIB_DISPATCHER_HPP
#include "epoll/event-poll.hpp"
#include "utils/callback-guard.hpp"
@@ -36,10 +36,10 @@ namespace epoll {
/**
* Will dispatch poll events in glib thread
*/
-class GlibPollDispatcher {
+class GlibDispatcher {
public:
- GlibPollDispatcher();
- ~GlibPollDispatcher();
+ GlibDispatcher();
+ ~GlibDispatcher();
EventPoll& getPoll();
private:
@@ -53,4 +53,4 @@ private:
} // namespace epoll
} // namespace vasum
-#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+#endif // COMMON_UTILS_GLIB_DISPATCHER_HPP
diff --git a/common/epoll/thread-poll-dispatcher.cpp b/common/epoll/thread-dispatcher.cpp
similarity index 88%
rename from common/epoll/thread-poll-dispatcher.cpp
rename to common/epoll/thread-dispatcher.cpp
index 797a8c4..a82cad8 100644
--- a/common/epoll/thread-poll-dispatcher.cpp
+++ b/common/epoll/thread-dispatcher.cpp
@@ -23,12 +23,12 @@
*/
#include "config.hpp"
-#include "epoll/thread-poll-dispatcher.hpp"
+#include "epoll/thread-dispatcher.hpp"
namespace vasum {
namespace epoll {
-ThreadPollDispatcher::ThreadPollDispatcher()
+ThreadDispatcher::ThreadDispatcher()
{
auto controlCallback = [this](int, Events) -> bool {
mStopEvent.receive();
@@ -41,14 +41,14 @@ ThreadPollDispatcher::ThreadPollDispatcher()
});
}
-ThreadPollDispatcher::~ThreadPollDispatcher()
+ThreadDispatcher::~ThreadDispatcher()
{
mStopEvent.send();
mThread.join();
mPoll.removeFD(mStopEvent.getFD());
}
-EventPoll& ThreadPollDispatcher::getPoll()
+EventPoll& ThreadDispatcher::getPoll()
{
return mPoll;
}
diff --git a/common/epoll/thread-poll-dispatcher.hpp b/common/epoll/thread-dispatcher.hpp
similarity index 83%
rename from common/epoll/thread-poll-dispatcher.hpp
rename to common/epoll/thread-dispatcher.hpp
index af6b278..7d0f30d 100644
--- a/common/epoll/thread-poll-dispatcher.hpp
+++ b/common/epoll/thread-dispatcher.hpp
@@ -22,8 +22,8 @@
* @brief Thread epoll dispatcher
*/
-#ifndef COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
-#define COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
+#ifndef COMMON_EPOLL_THREAD_DISPATCHER_HPP
+#define COMMON_EPOLL_THREAD_DISPATCHER_HPP
#include "epoll/event-poll.hpp"
#include "utils/eventfd.hpp"
@@ -36,10 +36,10 @@ namespace epoll {
/**
* Will dispatch poll events in a newly created thread
*/
-class ThreadPollDispatcher {
+class ThreadDispatcher {
public:
- ThreadPollDispatcher();
- ~ThreadPollDispatcher();
+ ThreadDispatcher();
+ ~ThreadDispatcher();
EventPoll& getPoll();
private:
@@ -51,4 +51,4 @@ private:
} // namespace epoll
} // namespace vasum
-#endif // COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
+#endif // COMMON_EPOLL_THREAD_DISPATCHER_HPP
diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp
index 6669d8a..abecb71 100644
--- a/common/ipc/client.cpp
+++ b/common/ipc/client.cpp
@@ -31,8 +31,9 @@
namespace vasum {
namespace ipc {
-Client::Client(const std::string& socketPath)
- : mProcessor("[CLIENT] "),
+Client::Client(epoll::EventPoll& eventPoll, const std::string& socketPath)
+ : mEventPoll(eventPoll),
+ mProcessor("[CLIENT] "),
mSocketPath(socketPath)
{
LOGS("Client Constructor");
@@ -50,14 +51,19 @@ Client::~Client()
}
}
-void Client::start(const bool usesExternalPolling)
+void Client::start()
{
+ if (mProcessor.isStarted()) {
+ return;
+ }
LOGS("Client start");
// Initialize the connection with the server
- if (usesExternalPolling) {
- startPoll();
- }
- mProcessor.start(usesExternalPolling);
+ auto handleEvent = [&](int, epoll::Events) -> bool {
+ mProcessor.handleEvent();
+ return true;
+ };
+ mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent);
+ mProcessor.start();
LOGD("Connecting to " + mSocketPath);
auto socketPtr = std::make_shared(Socket::connectSocket(mSocketPath));
@@ -71,34 +77,18 @@ bool Client::isStarted()
void Client::stop()
{
+ if (!mProcessor.isStarted()) {
+ return;
+ }
LOGS("Client stop");
mProcessor.stop();
- if (mIPCGSourcePtr) {
- stopPoll();
- }
-}
-
-void Client::startPoll()
-{
- LOGS("Client startPoll");
- using namespace std::placeholders;
- mIPCGSourcePtr = IPCGSource::create(std::bind(&Client::handle, this, _1, _2));
- mIPCGSourcePtr->addFD(mProcessor.getEventFD());
- mIPCGSourcePtr->attach();
+ mEventPoll.removeFD(mProcessor.getEventFD());
}
-void Client::stopPoll()
-{
- LOGS("Client stopPoll");
-
- mIPCGSourcePtr->removeFD(mProcessor.getEventFD());
- mIPCGSourcePtr->detach();
- mIPCGSourcePtr.reset();
-}
-
-void Client::handle(const FileDescriptor fd, const short pollEvent)
+void Client::handle(const FileDescriptor fd, const epoll::Events pollEvents)
{
+ //TODO remove handle method
LOGS("Client handle");
if (!isStarted()) {
@@ -106,17 +96,12 @@ void Client::handle(const FileDescriptor fd, const short pollEvent)
return;
}
- if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
- mProcessor.handleEvent();
- return;
-
- } else if (pollEvent & POLLIN) {
+ if (pollEvents & EPOLLIN) {
mProcessor.handleInput(fd);
- return;
+ }
- } else if (pollEvent & POLLHUP) {
+ if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
mProcessor.handleLostConnection(fd);
- return;
}
}
@@ -124,9 +109,11 @@ void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
LOGS("Client setNewPeerCallback");
auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->addFD(fd);
- }
+ auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+ handle(fd, events);
+ return true;
+ };
+ mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
if (newPeerCallback) {
newPeerCallback(peerID, fd);
}
@@ -138,9 +125,7 @@ void Client::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
{
LOGS("Client setRemovedPeerCallback");
auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->removeFD(fd);
- }
+ mEventPoll.removeFD(fd);
if (removedPeerCallback) {
removedPeerCallback(peerID, fd);
}
diff --git a/common/ipc/client.hpp b/common/ipc/client.hpp
index 321b4da..a3342b6 100644
--- a/common/ipc/client.hpp
+++ b/common/ipc/client.hpp
@@ -26,9 +26,9 @@
#define COMMON_IPC_CLIENT_HPP
#include "ipc/internals/processor.hpp"
-#include "ipc/ipc-gsource.hpp"
#include "ipc/types.hpp"
#include "ipc/result.hpp"
+#include "epoll/event-poll.hpp"
#include "logger/logger.hpp"
#include
@@ -40,28 +40,24 @@ namespace ipc {
* This class wraps communication via UX sockets for client applications.
* It uses serialization mechanism from libConfig.
*
- * There is one additional thread:
- * - PROCESSOR is responsible for the communication and calling the callbacks
- *
* For message format @see ipc::Processor
*/
class Client {
public:
/**
+ * @param eventPoll event poll
* @param serverPath path to the server's socket
*/
- Client(const std::string& serverPath);
+ Client(epoll::EventPoll& eventPoll, const std::string& serverPath);
~Client();
Client(const Client&) = delete;
Client& operator=(const Client&) = delete;
/**
- * Starts the worker thread
- *
- * @param usesExternalPolling internal or external polling is used
+ * Starts processing
*/
- void start(const bool usesExternalPolling = false);
+ void start();
/**
* @return is the communication thread running
@@ -69,21 +65,11 @@ public:
bool isStarted();
/**
- * Stops all worker thread
+ * Stops processing
*/
void stop();
/**
- * Used with an external polling loop.
- * Handles one event from the file descriptor.
- *
- * @param fd file descriptor
- * @param pollEvent event on the fd. Defined in poll.h
- *
- */
- void handle(const FileDescriptor fd, const short pollEvent);
-
- /**
* Set the callback called for each new connection to a peer
*
* @param newPeerCallback the callback
@@ -170,14 +156,13 @@ public:
const std::shared_ptr& data);
private:
-
- void startPoll();
- void stopPoll();
-
+ epoll::EventPoll& mEventPoll;
PeerID mServiceID;
Processor mProcessor;
std::string mSocketPath;
- IPCGSource::Pointer mIPCGSourcePtr;
+
+ void handle(const FileDescriptor fd, const epoll::Events pollEvents);
+
};
template
diff --git a/common/ipc/internals/acceptor.cpp b/common/ipc/internals/acceptor.cpp
index ecb2210..15b9ad8 100644
--- a/common/ipc/internals/acceptor.cpp
+++ b/common/ipc/internals/acceptor.cpp
@@ -24,21 +24,14 @@
#include "config.hpp"
-#include "ipc/exception.hpp"
#include "ipc/internals/acceptor.hpp"
#include "logger/logger.hpp"
-#include
-#include
-#include
-#include
-
namespace vasum {
namespace ipc {
Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback)
- : mIsRunning(false),
- mNewConnectionCallback(newConnectionCallback),
+ : mNewConnectionCallback(newConnectionCallback),
mSocket(Socket::createSocket(socketPath))
{
LOGT("Creating Acceptor for socket " << socketPath);
@@ -46,98 +39,15 @@ Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& n
Acceptor::~Acceptor()
{
- LOGT("Destroying Acceptor");
- try {
- stop();
- } catch (std::exception& e) {
- LOGE("Error in destructor: " << e.what());
- }
LOGT("Destroyed Acceptor");
}
-void Acceptor::start()
-{
- LOGT("Starting Acceptor");
- if (!mThread.joinable()) {
- mThread = std::thread(&Acceptor::run, this);
- }
- LOGT("Started Acceptor");
-}
-
-void Acceptor::stop()
-{
- LOGT("Stopping Acceptor");
-
- if (mThread.joinable()) {
- mEventQueue.send(Event::FINISH);
- LOGT("Waiting for Acceptor to finish");
- mThread.join();
- }
-
- LOGT("Stopped Acceptor");
-}
-
-void Acceptor::run()
-{
- // Setup polling structure
- std::vector fds(2);
-
- fds[0].fd = mEventQueue.getFD();
- fds[0].events = POLLIN;
-
- fds[1].fd = mSocket.getFD();
- fds[1].events = POLLIN;
-
- mIsRunning = true;
- while (mIsRunning) {
- LOGT("Waiting for new connections...");
-
- int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/);
-
- LOGT("...Incoming connection!");
-
- if (ret == -1 || ret == 0) {
- if (errno == EINTR) {
- continue;
- }
- LOGE("Error in poll: " << std::string(strerror(errno)));
- throw IPCException("Error in poll: " + std::string(strerror(errno)));
- }
-
- // Check for incoming connections
- if (fds[1].revents & POLLIN) {
- fds[1].revents = 0;
- handleConnection();
- }
-
- // Check for incoming events
- if (fds[0].revents & POLLIN) {
- fds[0].revents = 0;
- handleEvent();
- }
- }
- LOGT("Exiting run");
-}
-
void Acceptor::handleConnection()
{
std::shared_ptr tmpSocket = mSocket.accept();
mNewConnectionCallback(tmpSocket);
}
-void Acceptor::handleEvent()
-{
- if (mEventQueue.receive() == Event::FINISH) {
- LOGD("Event FINISH");
- mIsRunning = false;
- }
-}
-
-FileDescriptor Acceptor::getEventFD()
-{
- return mEventQueue.getFD();
-}
-
FileDescriptor Acceptor::getConnectionFD()
{
return mSocket.getFD();
diff --git a/common/ipc/internals/acceptor.hpp b/common/ipc/internals/acceptor.hpp
index f87a0bb..a6e9ec8 100644
--- a/common/ipc/internals/acceptor.hpp
+++ b/common/ipc/internals/acceptor.hpp
@@ -28,11 +28,9 @@
#include "config.hpp"
#include "ipc/internals/socket.hpp"
-#include "ipc/internals/event-queue.hpp"
#include "ipc/types.hpp"
#include
-#include
namespace vasum {
namespace ipc {
@@ -59,51 +57,19 @@ public:
Acceptor& operator=(const Acceptor&) = delete;
/**
- * Starts the thread accepting the new connections.
- */
- void start();
-
- /**
- * Stops the accepting thread.
- */
- void stop();
-
- /**
* Handle one incoming connection.
* Used with external polling
*/
void handleConnection();
/**
- * Handle one event from the internal event's queue
- * Used with external polling
- */
- void handleEvent();
-
- /**
- * @return file descriptor of internal event's queue
- */
- FileDescriptor getEventFD();
-
- /**
* @return file descriptor for the connection socket
*/
FileDescriptor getConnectionFD();
private:
- enum class Event : int {
- FINISH // Shutdown request
- };
-
- bool mIsRunning;
-
NewConnectionCallback mNewConnectionCallback;
Socket mSocket;
-
- EventQueue mEventQueue;
- std::thread mThread;
-
- void run();
};
} // namespace ipc
diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp
index 93f721f..587723c 100644
--- a/common/ipc/internals/processor.cpp
+++ b/common/ipc/internals/processor.cpp
@@ -26,7 +26,6 @@
#include "ipc/exception.hpp"
#include "ipc/internals/processor.hpp"
-#include "utils/signal.hpp"
#include "utils/exception.hpp"
#include
@@ -65,8 +64,6 @@ Processor::Processor(const std::string& logName,
{
LOGS(mLogPrefix + "Processor Constructor");
- utils::signalBlock(SIGPIPE);
-
using namespace std::placeholders;
setSignalHandlerInternal(REGISTER_SIGNAL_METHOD_ID,
std::bind(&Processor::onNewSignals, this, _1, _2));
@@ -104,7 +101,7 @@ bool Processor::isStarted()
return mIsRunning;
}
-void Processor::start(bool usesExternalPolling)
+void Processor::start()
{
LOGS(mLogPrefix + "Processor start");
@@ -112,10 +109,6 @@ void Processor::start(bool usesExternalPolling)
if (!mIsRunning) {
LOGI(mLogPrefix + "Processor start");
mIsRunning = true;
- mUsesExternalPolling = usesExternalPolling;
- if (!usesExternalPolling) {
- mThread = std::thread(&Processor::run, this);
- }
}
}
@@ -133,15 +126,12 @@ void Processor::stop()
LOGD(mLogPrefix + "Waiting for the Processor to stop");
- if (mThread.joinable()) {
- mThread.join();
- } else {
- // Wait till the FINISH request is served
- Lock lock(mStateMutex);
- conditionPtr->wait(lock, [this]() {
- return !mIsRunning;
- });
- }
+ // Wait till the FINISH request is served
+ Lock lock(mStateMutex);
+ conditionPtr->wait(lock, [this]() {
+ return !mIsRunning;
+ });
+ assert(mPeerInfo.empty());
}
}
@@ -204,7 +194,8 @@ PeerID Processor::addPeer(const std::shared_ptr& socketPtr)
auto requestPtr = std::make_shared(socketPtr);
mRequestQueue.pushBack(Event::ADD_PEER, requestPtr);
- LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID);
+ LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID
+ << ", fd: " << socketPtr->getFD());
return requestPtr->peerID;
}
@@ -233,14 +224,14 @@ void Processor::removePeerSyncInternal(const PeerID peerID, Lock& lock)
void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr)
{
- LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
- LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
-
if (peerIt == mPeerInfo.end()) {
LOGW("Peer already removed");
return;
}
+ LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
+ LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
+
// Remove from signal addressees
for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
it->second.remove(peerIt->peerID);
@@ -270,100 +261,6 @@ void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_
mPeerInfo.erase(peerIt);
}
-void Processor::resetPolling()
-{
- LOGS(mLogPrefix + "Processor resetPolling");
-
- if (mUsesExternalPolling) {
- return;
- }
-
- // Setup polling on eventfd and sockets
- mFDs.resize(mPeerInfo.size() + 1);
- LOGI(mLogPrefix + "Reseting mFDS.size: " << mFDs.size());
-
- mFDs[0].fd = mRequestQueue.getFD();
- mFDs[0].events = POLLIN;
-
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- auto fd = mPeerInfo[i - 1].socketPtr->getFD();
-
- LOGI(mLogPrefix + "Reseting fd: " << fd);
-
- mFDs[i].fd = fd;
- mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
- // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
- }
-}
-
-void Processor::run()
-{
- LOGS(mLogPrefix + "Processor run");
-
- {
- Lock lock(mStateMutex);
- resetPolling();
- }
-
- while (isStarted()) {
- LOGT(mLogPrefix + "Waiting for communication...");
- int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
- LOGT(mLogPrefix + "... incoming communication!");
- if (ret == -1 || ret == 0) {
- if (errno == EINTR) {
- continue;
- }
- LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno)));
- throw IPCException("Error in poll: " + std::string(strerror(errno)));
- }
-
- // Check for lost connections:
- if (handleLostConnections()) {
- // mFDs changed
- resetPolling();
- continue;
- }
-
- // Check for incoming data.
- if (handleInputs()) {
- // mFDs changed
- resetPolling();
- continue;
- }
-
- // Check for incoming events
- if (mFDs[0].revents & POLLIN) {
- mFDs[0].revents &= ~(POLLIN);
- if (handleEvent()) {
- // mFDs changed
- resetPolling();
- continue;
- }
- }
-
- }
-}
-
-bool Processor::handleLostConnections()
-{
- Lock lock(mStateMutex);
-
- bool isPeerRemoved = false;
-
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- if (mFDs[i].revents & POLLHUP) {
- auto peerIt = getPeerInfoIterator(mFDs[i].fd);
- LOGI(mLogPrefix + "Lost connection to peer: " << peerIt->peerID);
- mFDs[i].revents &= ~(POLLHUP);
- removePeerInternal(peerIt,
- std::make_exception_ptr(IPCPeerDisconnectedException()));
- isPeerRemoved = true;
- }
- }
-
- return isPeerRemoved;
-}
-
bool Processor::handleLostConnection(const FileDescriptor fd)
{
Lock lock(mStateMutex);
@@ -373,21 +270,6 @@ bool Processor::handleLostConnection(const FileDescriptor fd)
return true;
}
-bool Processor::handleInputs()
-{
- // Lock not needed, mFDs won't be changed by handleInput
-
- bool pollChanged = false;
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- if (mFDs[i].revents & POLLIN) {
- mFDs[i].revents &= ~(POLLIN);
- pollChanged = pollChanged || handleInput(mFDs[i].fd);
- }
- }
-
- return pollChanged;
-}
-
bool Processor::handleInput(const FileDescriptor fd)
{
LOGS(mLogPrefix + "Processor handleInput fd: " << fd);
@@ -795,6 +677,12 @@ bool Processor::onFinishRequest(FinishRequest& request)
}
}
+ // Close peers
+ while (!mPeerInfo.empty()) {
+ removePeerInternal(--mPeerInfo.end(),
+ std::make_exception_ptr(IPCClosingException()));
+ }
+
mIsRunning = false;
request.conditionPtr->notify_all();
diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp
index 86a3f42..6e0ae81 100644
--- a/common/ipc/internals/processor.hpp
+++ b/common/ipc/internals/processor.hpp
@@ -43,7 +43,6 @@
#include "logger/logger-scope.hpp"
#include
-#include
#include
#include
#include
@@ -136,12 +135,9 @@ public:
/**
- * Start the processing thread.
- * Quits immediately after starting the thread.
- *
- * @param usesExternalPolling internal or external polling is used
+ * Start processing.
*/
- void start(const bool usesExternalPolling);
+ void start();
/**
* @return is processor running
@@ -432,14 +428,12 @@ private:
RequestQueue mRequestQueue;
bool mIsRunning;
- bool mUsesExternalPolling;
std::unordered_map> mMethodsCallbacks;
std::unordered_map> mSignalsCallbacks;
std::unordered_map> mSignalsPeers;
Peers mPeerInfo;
- std::vector mFDs;
std::unordered_map mReturnCallbacks;
@@ -451,8 +445,6 @@ private:
unsigned int mMaxNumberOfPeers;
- std::thread mThread;
-
template
void setMethodHandlerInternal(const MethodID methodID,
const typename MethodHandler::type& process);
@@ -466,8 +458,6 @@ private:
const PeerID peerID,
const std::shared_ptr& data);
- void run();
-
// Request handlers
bool onMethodRequest(MethodRequest& request);
bool onSignalRequest(SignalRequest& request);
@@ -476,9 +466,6 @@ private:
bool onSendResultRequest(SendResultRequest& request);
bool onFinishRequest(FinishRequest& request);
- bool handleLostConnections();
- bool handleInputs();
-
bool onReturnValue(Peers::iterator& peerIt,
const MessageID messageID);
bool onRemoteMethod(Peers::iterator& peerIt,
@@ -489,7 +476,6 @@ private:
const MethodID methodID,
const MessageID messageID,
std::shared_ptr signalCallbacks);
- void resetPolling();
void removePeerInternal(Peers::iterator peerIt,
const std::exception_ptr& exceptionPtr);
diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp
index 442d804..c436726 100644
--- a/common/ipc/service.cpp
+++ b/common/ipc/service.cpp
@@ -33,10 +33,12 @@ using namespace std::placeholders;
namespace vasum {
namespace ipc {
-Service::Service(const std::string& socketPath,
+Service::Service(epoll::EventPoll& eventPoll,
+ const std::string& socketPath,
const PeerCallback& addPeerCallback,
const PeerCallback& removePeerCallback)
- : mProcessor("[SERVICE] "),
+ : mEventPoll(eventPoll),
+ mProcessor("[SERVICE] "),
mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
{
@@ -55,19 +57,23 @@ Service::~Service()
}
}
-void Service::start(const bool usesExternalPolling)
+void Service::start()
{
- LOGS("Service start");
- if (usesExternalPolling) {
- startPoll();
- }
- mProcessor.start(usesExternalPolling);
-
- // There can be an incoming connection from mAcceptor before mProcessor is listening,
- // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor.
- if (!usesExternalPolling) {
- mAcceptor.start();
+ if (mProcessor.isStarted()) {
+ return;
}
+ LOGS("Service start");
+ auto handleConnection = [&](int, epoll::Events) -> bool {
+ mAcceptor.handleConnection();
+ return true;
+ };
+ auto handleProcessorEvent = [&](int, epoll::Events) -> bool {
+ mProcessor.handleEvent();
+ return true;
+ };
+ mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
+ mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent);
+ mProcessor.start();
}
bool Service::isStarted()
@@ -77,39 +83,19 @@ bool Service::isStarted()
void Service::stop()
{
+ if (!mProcessor.isStarted()) {
+ return;
+ }
LOGS("Service stop");
- mAcceptor.stop();
mProcessor.stop();
- if (mIPCGSourcePtr) {
- stopPoll();
- }
-}
-
-void Service::startPoll()
-{
- LOGS("Service startPoll");
-
- mIPCGSourcePtr = IPCGSource::create(std::bind(&Service::handle, this, _1, _2));
- mIPCGSourcePtr->addFD(mAcceptor.getEventFD());
- mIPCGSourcePtr->addFD(mAcceptor.getConnectionFD());
- mIPCGSourcePtr->addFD(mProcessor.getEventFD());
- mIPCGSourcePtr->attach();
+ mEventPoll.removeFD(mAcceptor.getConnectionFD());
+ mEventPoll.removeFD(mProcessor.getEventFD());
}
-void Service::stopPoll()
-{
- LOGS("Service stopPoll");
-
- mIPCGSourcePtr->removeFD(mAcceptor.getEventFD());
- mIPCGSourcePtr->removeFD(mAcceptor.getConnectionFD());
- mIPCGSourcePtr->removeFD(mProcessor.getEventFD());
- mIPCGSourcePtr->detach();
- mIPCGSourcePtr.reset();
-}
-
-void Service::handle(const FileDescriptor fd, const short pollEvent)
+void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents)
{
+ //TODO remove handle method
LOGS("Service handle");
if (!isStarted()) {
@@ -117,25 +103,12 @@ void Service::handle(const FileDescriptor fd, const short pollEvent)
return;
}
- if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
- mProcessor.handleEvent();
- return;
-
- } else if (fd == mAcceptor.getConnectionFD() && (pollEvent & POLLIN)) {
- mAcceptor.handleConnection();
- return;
-
- } else if (fd == mAcceptor.getEventFD() && (pollEvent & POLLIN)) {
- mAcceptor.handleEvent();
- return;
-
- } else if (pollEvent & POLLIN) {
+ if (pollEvents & EPOLLIN) {
mProcessor.handleInput(fd);
- return;
+ }
- } else if (pollEvent & POLLHUP) {
+ if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
mProcessor.handleLostConnection(fd);
- return;
}
}
@@ -143,9 +116,11 @@ void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
LOGS("Service setNewPeerCallback");
auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->addFD(fd);
- }
+ auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+ handle(fd, events);
+ return true;
+ };
+ mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
if (newPeerCallback) {
newPeerCallback(peerID, fd);
}
@@ -157,9 +132,7 @@ void Service::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
{
LOGS("Service setRemovedPeerCallback");
auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->removeFD(fd);
- }
+ mEventPoll.removeFD(fd);
if (removedPeerCallback) {
removedPeerCallback(peerID, fd);
}
diff --git a/common/ipc/service.hpp b/common/ipc/service.hpp
index 022c9c9..880c137 100644
--- a/common/ipc/service.hpp
+++ b/common/ipc/service.hpp
@@ -27,9 +27,9 @@
#include "ipc/internals/processor.hpp"
#include "ipc/internals/acceptor.hpp"
-#include "ipc/ipc-gsource.hpp"
#include "ipc/types.hpp"
#include "ipc/result.hpp"
+#include "epoll/event-poll.hpp"
#include "logger/logger.hpp"
#include
@@ -42,18 +42,16 @@ namespace ipc {
* This class wraps communication via UX sockets.
* It uses serialization mechanism from libConfig.
*
- * There are two working threads:
- * - ACCEPTOR accepts incoming connections and passes them to PROCESSOR
- * - PROCESSOR is responsible for the communication and calling the callbacks
- *
* For message format @see ipc::Processor
*/
class Service {
public:
/**
+ * @param eventPoll event poll
* @param path path to the socket
*/
- Service(const std::string& path,
+ Service(epoll::EventPoll& eventPoll,
+ const std::string& path,
const PeerCallback& addPeerCallback = nullptr,
const PeerCallback& removePeerCallback = nullptr);
~Service();
@@ -62,11 +60,9 @@ public:
Service& operator=(const Service&) = delete;
/**
- * Starts the worker and acceptor threads
- *
- * @param usesExternalPolling internal or external polling is used
+ * Starts processing
*/
- void start(const bool usesExternalPolling = false);
+ void start();
/**
* @return is the communication thread running
@@ -79,16 +75,6 @@ public:
void stop();
/**
- * Used with an external polling loop.
- * Handles one event from the file descriptor.
- *
- * @param fd file descriptor
- * @param pollEvent event on the fd. Defined in poll.h
- *
- */
- void handle(const FileDescriptor fd, const short pollEvent);
-
- /**
* Set the callback called for each new connection to a peer
*
* @param newPeerCallback the callback
@@ -175,14 +161,11 @@ public:
void signal(const MethodID methodID,
const std::shared_ptr& data);
private:
-
- void startPoll();
- void stopPoll();
-
- typedef std::lock_guard Lock;
+ epoll::EventPoll& mEventPoll;
Processor mProcessor;
Acceptor mAcceptor;
- IPCGSource::Pointer mIPCGSourcePtr;
+
+ void handle(const FileDescriptor fd, const epoll::Events pollEvents);
};
diff --git a/tests/scripts/vsm_test_parser.py b/tests/scripts/vsm_test_parser.py
index 41dd495..d0d4fa7 100644
--- a/tests/scripts/vsm_test_parser.py
+++ b/tests/scripts/vsm_test_parser.py
@@ -35,7 +35,7 @@ class Logger(object):
__indentChar = " "
def testCaseSummary(self, testSuite, testName, testResult, recLevel):
- msg = self.__indentChar * recLevel + BOLD + "{:<50}".format(testName)
+ msg = self.__indentChar * recLevel + BOLD + "{:<50} ".format(testName)
if testResult == "passed":
msg += GREEN
diff --git a/tests/unit_tests/epoll/ut-event-poll.cpp b/tests/unit_tests/epoll/ut-event-poll.cpp
index 0bcbe81..00cb385 100644
--- a/tests/unit_tests/epoll/ut-event-poll.cpp
+++ b/tests/unit_tests/epoll/ut-event-poll.cpp
@@ -31,8 +31,8 @@
#include "ipc/internals/socket.hpp"
#include "utils/latch.hpp"
#include "utils/glib-loop.hpp"
-#include "epoll/glib-poll-dispatcher.hpp"
-#include "epoll/thread-poll-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
+#include "epoll/thread-dispatcher.hpp"
using namespace vasum::utils;
using namespace vasum::epoll;
@@ -54,14 +54,14 @@ BOOST_AUTO_TEST_CASE(EmptyPoll)
BOOST_AUTO_TEST_CASE(ThreadedPoll)
{
- ThreadPollDispatcher dispatcher;
+ ThreadDispatcher dispatcher;
}
BOOST_AUTO_TEST_CASE(GlibPoll)
{
ScopedGlibLoop loop;
- GlibPollDispatcher dispatcher;
+ GlibDispatcher dispatcher;
}
void doSocketTest(EventPoll& poll)
@@ -126,7 +126,7 @@ void doSocketTest(EventPoll& poll)
BOOST_AUTO_TEST_CASE(ThreadedPollSocket)
{
- ThreadPollDispatcher dispatcher;
+ ThreadDispatcher dispatcher;
doSocketTest(dispatcher.getPoll());
}
@@ -135,14 +135,14 @@ BOOST_AUTO_TEST_CASE(GlibPollSocket)
{
ScopedGlibLoop loop;
- GlibPollDispatcher dispatcher;
+ GlibDispatcher dispatcher;
doSocketTest(dispatcher.getPoll());
}
BOOST_AUTO_TEST_CASE(PollStacking)
{
- ThreadPollDispatcher dispatcher;
+ ThreadDispatcher dispatcher;
EventPoll innerPoll;
diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp
index 088f576..8f47a65 100644
--- a/tests/unit_tests/ipc/ut-ipc.cpp
+++ b/tests/unit_tests/ipc/ut-ipc.cpp
@@ -34,6 +34,8 @@
#include "ipc/client.hpp"
#include "ipc/types.hpp"
#include "ipc/result.hpp"
+#include "epoll/thread-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
#include "utils/glib-loop.hpp"
#include "utils/latch.hpp"
#include "utils/value-latch.hpp"
@@ -51,11 +53,10 @@
using namespace vasum;
using namespace vasum::ipc;
+using namespace vasum::epoll;
using namespace vasum::utils;
using namespace std::placeholders;
-namespace {
-
// Timeout for sending one message
const int TIMEOUT = 1000 /*ms*/;
@@ -68,15 +69,28 @@ const int LONG_OPERATION_TIME = 1000 + TIMEOUT;
const std::string TEST_DIR = "/tmp/ut-ipc";
const std::string SOCKET_PATH = TEST_DIR + "/test.socket";
-struct Fixture {
+struct FixtureBase {
ScopedDir mTestPathGuard;
- Fixture()
+ FixtureBase()
: mTestPathGuard(TEST_DIR)
{
}
};
+struct ThreadedFixture : FixtureBase {
+ ThreadDispatcher dispatcher;
+
+ EventPoll& getPoll() { return dispatcher.getPoll(); }
+};
+
+struct GlibFixture : FixtureBase {
+ ScopedGlibLoop glibLoop;
+ GlibDispatcher dispatcher;
+
+ EventPoll& getPoll() { return dispatcher.getPoll(); }
+};
+
struct SendData {
int intVal;
SendData(int i): intVal(i) {}
@@ -167,7 +181,7 @@ void longEchoCallback(const PeerID,
methodResult->set(returnData);
}
-PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
+PeerID connect(Service& s, Client& c)
{
// Connects the Client to the Service and returns Clients PeerID
ValueLatch peerIDLatch;
@@ -178,10 +192,10 @@ PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientG
s.setNewPeerCallback(newPeerCallback);
if (!s.isStarted()) {
- s.start(isServiceGlib);
+ s.start();
}
- c.start(isClientGlib);
+ c.start();
PeerID peerID = peerIDLatch.get(TIMEOUT);
s.setNewPeerCallback(nullptr);
@@ -189,16 +203,6 @@ PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientG
return peerID;
}
-PeerID connectServiceGSource(Service& s, Client& c)
-{
- return connect(s, c, true, false);
-}
-
-PeerID connectClientGSource(Service& s, Client& c)
-{
- return connect(s, c, false, true);
-}
-
void testEcho(Client& c, const MethodID methodID)
{
std::shared_ptr sentData(new SendData(34));
@@ -215,20 +219,17 @@ void testEcho(Service& s, const MethodID methodID, const PeerID peerID)
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-} // namespace
-
-
-BOOST_FIXTURE_TEST_SUITE(IPCSuite, Fixture)
+BOOST_AUTO_TEST_SUITE(IPCSuite)
-BOOST_AUTO_TEST_CASE(ConstructorDestructor)
+MULTI_FIXTURE_TEST_CASE(ConstructorDestructor, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
}
-BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod)
+MULTI_FIXTURE_TEST_CASE(ServiceAddRemoveMethod, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, returnEmptyCallback);
s.setMethodHandler(1, returnDataCallback);
@@ -237,7 +238,7 @@ BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod)
s.setMethodHandler(1, echoCallback);
s.setMethodHandler(2, returnDataCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
testEcho(c, 1);
@@ -247,10 +248,10 @@ BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod)
BOOST_CHECK_THROW(testEcho(c, 2), IPCException);
}
-BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
+MULTI_FIXTURE_TEST_CASE(ClientAddRemoveMethod, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler(1, returnEmptyCallback);
c.setMethodHandler(1, returnDataCallback);
@@ -267,9 +268,9 @@ BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
}
-BOOST_AUTO_TEST_CASE(ServiceStartStop)
+MULTI_FIXTURE_TEST_CASE(ServiceStartStop, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, returnDataCallback);
@@ -282,10 +283,10 @@ BOOST_AUTO_TEST_CASE(ServiceStartStop)
s.start();
}
-BOOST_AUTO_TEST_CASE(ClientStartStop)
+MULTI_FIXTURE_TEST_CASE(ClientStartStop, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler(1, returnDataCallback);
c.start();
@@ -300,27 +301,27 @@ BOOST_AUTO_TEST_CASE(ClientStartStop)
c.stop();
}
-BOOST_AUTO_TEST_CASE(SyncClientToServiceEcho)
+MULTI_FIXTURE_TEST_CASE(SyncClientToServiceEcho, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, echoCallback);
s.setMethodHandler(2, echoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
testEcho(c, 1);
testEcho(c, 2);
}
-BOOST_AUTO_TEST_CASE(Restart)
+MULTI_FIXTURE_TEST_CASE(Restart, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, echoCallback);
s.start();
s.setMethodHandler(2, echoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
testEcho(c, 1);
testEcho(c, 2);
@@ -334,14 +335,19 @@ BOOST_AUTO_TEST_CASE(Restart)
s.stop();
s.start();
+ BOOST_CHECK_THROW(testEcho(c, 2), IPCException);
+
+ c.stop();
+ c.start();
+
testEcho(c, 1);
testEcho(c, 2);
}
-BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
+MULTI_FIXTURE_TEST_CASE(SyncServiceToClientEcho, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler(1, echoCallback);
PeerID peerID = connect(s, c);
@@ -351,16 +357,16 @@ BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho)
+MULTI_FIXTURE_TEST_CASE(AsyncClientToServiceEcho, F, ThreadedFixture, GlibFixture)
{
std::shared_ptr sentData(new SendData(34));
ValueLatch> recvDataLatch;
// Setup Service and Client
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, echoCallback);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
//Async call
@@ -374,13 +380,13 @@ BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho)
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
+MULTI_FIXTURE_TEST_CASE(AsyncServiceToClientEcho, F, ThreadedFixture, GlibFixture)
{
std::shared_ptr sentData(new SendData(56));
ValueLatch> recvDataLatch;
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler(1, echoCallback);
PeerID peerID = connect(s, c);
@@ -397,24 +403,24 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
}
-BOOST_AUTO_TEST_CASE(SyncTimeout)
+MULTI_FIXTURE_TEST_CASE(SyncTimeout, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, longEchoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
std::shared_ptr sentData(new SendData(78));
BOOST_REQUIRE_THROW((c.callSync(1, sentData, TIMEOUT)), IPCException);
}
-BOOST_AUTO_TEST_CASE(SerializationError)
+MULTI_FIXTURE_TEST_CASE(SerializationError, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, echoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
std::shared_ptr throwingData(new ThrowOnAcceptData());
@@ -423,23 +429,24 @@ BOOST_AUTO_TEST_CASE(SerializationError)
}
-BOOST_AUTO_TEST_CASE(ParseError)
+MULTI_FIXTURE_TEST_CASE(ParseError, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, echoCallback);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
std::shared_ptr sentData(new SendData(78));
BOOST_CHECK_THROW((c.callSync(1, sentData, 10000)), IPCParsingException);
}
-BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
+MULTI_FIXTURE_TEST_CASE(DisconnectedPeerError, F, ThreadedFixture, GlibFixture)
{
ValueLatch> retStatusLatch;
- Service s(SOCKET_PATH);
+
+ Service s(F::getPoll(), SOCKET_PATH);
auto method = [](const PeerID, std::shared_ptr&, MethodResult::Pointer methodResult) {
auto resultData = std::make_shared(1);
@@ -450,7 +457,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
s.setMethodHandler(1, method);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
auto dataBack = [&retStatusLatch](Result && r) {
@@ -470,16 +477,16 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
}
-BOOST_AUTO_TEST_CASE(ReadTimeout)
+MULTI_FIXTURE_TEST_CASE(ReadTimeout, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
auto longEchoCallback = [](const PeerID, std::shared_ptr& data, MethodResult::Pointer methodResult) {
auto resultData = std::make_shared(data->intVal, LONG_OPERATION_TIME);
methodResult->set(resultData);
};
s.setMethodHandler(1, longEchoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
// Test timeout on read
@@ -488,13 +495,13 @@ BOOST_AUTO_TEST_CASE(ReadTimeout)
}
-BOOST_AUTO_TEST_CASE(WriteTimeout)
+MULTI_FIXTURE_TEST_CASE(WriteTimeout, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler(1, echoCallback);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
// Test echo with a minimal timeout
@@ -509,13 +516,13 @@ BOOST_AUTO_TEST_CASE(WriteTimeout)
}
-BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
+MULTI_FIXTURE_TEST_CASE(AddSignalInRuntime, F, ThreadedFixture, GlibFixture)
{
ValueLatch> recvDataLatchA;
ValueLatch> recvDataLatchB;
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr& data) {
@@ -545,13 +552,13 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
}
-BOOST_AUTO_TEST_CASE(AddSignalOffline)
+MULTI_FIXTURE_TEST_CASE(AddSignalOffline, F, ThreadedFixture, GlibFixture)
{
ValueLatch> recvDataLatchA;
ValueLatch> recvDataLatchB;
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr& data) {
recvDataLatchA.set(data);
@@ -581,66 +588,13 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline)
BOOST_CHECK_EQUAL(recvDataB->intVal, sendDataB->intVal);
}
-
-BOOST_AUTO_TEST_CASE(ServiceGSource)
-{
- utils::Latch l;
- ScopedGlibLoop loop;
-
- auto signalHandler = [&l](const PeerID, std::shared_ptr&) {
- l.set();
- };
-
- Service s(SOCKET_PATH);
- s.setMethodHandler(1, echoCallback);
-
- Client c(SOCKET_PATH);
- s.setSignalHandler(2, signalHandler);
-
- connectServiceGSource(s, c);
-
- testEcho(c, 1);
-
- auto data = std::make_shared(1);
- c.signal(2, data);
-
- BOOST_CHECK(l.wait(TIMEOUT));
-}
-
-
-BOOST_AUTO_TEST_CASE(ClientGSource)
-{
- utils::Latch l;
- ScopedGlibLoop loop;
-
- auto signalHandler = [&l](const PeerID, std::shared_ptr&) {
- l.set();
- };
-
- Service s(SOCKET_PATH);
- s.start();
-
- Client c(SOCKET_PATH);
- c.setMethodHandler(1, echoCallback);
- c.setSignalHandler(2, signalHandler);
-
- PeerID peerID = connectClientGSource(s, c);
-
- testEcho(s, 1, peerID);
-
- auto data = std::make_shared(1);
- s.signal(2, data);
-
- BOOST_CHECK(l.wait(TIMEOUT));
-}
-
-BOOST_AUTO_TEST_CASE(UsersError)
+MULTI_FIXTURE_TEST_CASE(UsersError, F, ThreadedFixture, GlibFixture)
{
const int TEST_ERROR_CODE = -234;
const std::string TEST_ERROR_MESSAGE = "Ay, caramba!";
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
auto clientID = connect(s, c);
auto throwingMethodHandler = [&](const PeerID, std::shared_ptr&, MethodResult::Pointer) {
@@ -668,13 +622,13 @@ BOOST_AUTO_TEST_CASE(UsersError)
BOOST_CHECK_EXCEPTION((s.callSync(2, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
}
-BOOST_AUTO_TEST_CASE(AsyncResult)
+MULTI_FIXTURE_TEST_CASE(AsyncResult, F, ThreadedFixture, GlibFixture)
{
const int TEST_ERROR_CODE = -567;
const std::string TEST_ERROR_MESSAGE = "Ooo jooo!";
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
auto clientID = connect(s, c);
auto errorMethodHandler = [&](const PeerID, std::shared_ptr&, MethodResult::Pointer methodResult) {
@@ -724,20 +678,44 @@ BOOST_AUTO_TEST_CASE(AsyncResult)
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-// BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
+MULTI_FIXTURE_TEST_CASE(MixOperations, F, ThreadedFixture, GlibFixture)
+{
+ utils::Latch l;
+
+ auto signalHandler = [&l](const PeerID, std::shared_ptr&) {
+ l.set();
+ };
+
+ Service s(F::getPoll(), SOCKET_PATH);
+ s.setMethodHandler(1, echoCallback);
+
+ Client c(F::getPoll(), SOCKET_PATH);
+ s.setSignalHandler(2, signalHandler);
+
+ connect(s, c);
+
+ testEcho(c, 1);
+
+ auto data = std::make_shared(1);
+ c.signal(2, data);
+
+ BOOST_CHECK(l.wait(TIMEOUT));
+}
+
+// MULTI_FIXTURE_TEST_CASE(ConnectionLimitTest, F, ThreadedFixture, GlibFixture)
// {
// unsigned oldLimit = ipc::getMaxFDNumber();
// ipc::setMaxFDNumber(50);
// // Setup Service and many Clients
-// Service s(SOCKET_PATH);
+// Service s(F::getPoll(), SOCKET_PATH);
// s.setMethodHandler(1, echoCallback);
// s.start();
// std::list clients;
// for (int i = 0; i < 100; ++i) {
// try {
-// clients.push_back(Client(SOCKET_PATH));
+// clients.push_back(Client(F::getPoll(), SOCKET_PATH));
// clients.back().start();
// } catch (...) {}
// }
diff --git a/tests/unit_tests/ut.hpp b/tests/unit_tests/ut.hpp
index d8b4b94..e96d363 100644
--- a/tests/unit_tests/ut.hpp
+++ b/tests/unit_tests/ut.hpp
@@ -28,9 +28,23 @@
#define BOOST_TEST_DYN_LINK
#include
+#include
+
#include
/**
+ * Usage example:
+ *
+ * MULTI_FIXTURE_TEST_CASE(Test, T, Fixture1, Fixture2, Fixture3) {
+ * std::cout << T::i << "\n";
+ * }
+ */
+#define MULTI_FIXTURE_TEST_CASE(NAME, TPARAM, ...) \
+ typedef boost::mpl::vector<__VA_ARGS__> NAME##_fixtures; \
+ BOOST_FIXTURE_TEST_CASE_TEMPLATE(NAME, TPARAM, NAME##_fixtures, TPARAM)
+
+
+/**
* An exception message checker
*
* Usage example: