{
if (!mCallbacks.empty()) {
LOGW("Not removed callbacks: " << mCallbacks.size());
+ for (const auto& item : mCallbacks) {
+ LOGT("Not removed fd: " << item.first);
+ }
assert(0 && "Not removed callbacks left");
}
utils::close(mPollFD);
}
mCallbacks.insert({fd, std::make_shared<Callback>(std::move(callback))});
- LOGT("Callback added for " << fd);
+ LOGT("Callback added for fd: " << fd);
}
void EventPoll::removeFD(const int fd)
}
mCallbacks.erase(iter);
removeFDInternal(fd);
- LOGT("Callback removed for " << fd);
+ LOGT("Callback removed for fd: " << fd);
}
bool EventPoll::dispatchIteration(const int timeoutMs)
// add ref because removeFD(self) can be called inside callback
std::shared_ptr<Callback> callback(iter->second);
- return (*callback)(event.data.fd, event.events);
+ try {
+ LOGT("Dispatch fd: " << event.data.fd << ", events: " << eventsToString(event.events));
+ return (*callback)(event.data.fd, event.events);
+ } catch (std::exception& e) {
+ LOGE("Got unexpected exception: " << e.what());
+ assert(0 && "Callback should not throw any exceptions");
+ }
}
}
*/
#include "config.hpp"
-#include "epoll/glib-poll-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
#include "utils/callback-wrapper.hpp"
namespace vasum {
namespace epoll {
-GlibPollDispatcher::GlibPollDispatcher()
+GlibDispatcher::GlibDispatcher()
{
mChannel = g_io_channel_unix_new(mPoll.getPollFD());
&utils::deleteCallbackWrapper<decltype(dispatchCallback)>);
}
-GlibPollDispatcher::~GlibPollDispatcher()
+GlibDispatcher::~GlibDispatcher()
{
g_source_remove(mWatchId);
g_io_channel_unref(mChannel);
// mGuard destructor will wait for full unregister of dispatchCallback
}
-EventPoll& GlibPollDispatcher::getPoll()
+EventPoll& GlibDispatcher::getPoll()
{
return mPoll;
}
* @brief glib epoll dispatcher
*/
-#ifndef COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP
-#define COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP
+#ifndef COMMON_EPOLL_GLIB_DISPATCHER_HPP
+#define COMMON_EPOLL_GLIB_DISPATCHER_HPP
#include "epoll/event-poll.hpp"
#include "utils/callback-guard.hpp"
/**
* Will dispatch poll events in glib thread
*/
-class GlibPollDispatcher {
+class GlibDispatcher {
public:
- GlibPollDispatcher();
- ~GlibPollDispatcher();
+ GlibDispatcher();
+ ~GlibDispatcher();
EventPoll& getPoll();
private:
} // namespace epoll
} // namespace vasum
-#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP
+#endif // COMMON_UTILS_GLIB_DISPATCHER_HPP
*/
#include "config.hpp"
-#include "epoll/thread-poll-dispatcher.hpp"
+#include "epoll/thread-dispatcher.hpp"
namespace vasum {
namespace epoll {
-ThreadPollDispatcher::ThreadPollDispatcher()
+ThreadDispatcher::ThreadDispatcher()
{
auto controlCallback = [this](int, Events) -> bool {
mStopEvent.receive();
});
}
-ThreadPollDispatcher::~ThreadPollDispatcher()
+ThreadDispatcher::~ThreadDispatcher()
{
mStopEvent.send();
mThread.join();
mPoll.removeFD(mStopEvent.getFD());
}
-EventPoll& ThreadPollDispatcher::getPoll()
+EventPoll& ThreadDispatcher::getPoll()
{
return mPoll;
}
* @brief Thread epoll dispatcher
*/
-#ifndef COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
-#define COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
+#ifndef COMMON_EPOLL_THREAD_DISPATCHER_HPP
+#define COMMON_EPOLL_THREAD_DISPATCHER_HPP
#include "epoll/event-poll.hpp"
#include "utils/eventfd.hpp"
/**
* Will dispatch poll events in a newly created thread
*/
-class ThreadPollDispatcher {
+class ThreadDispatcher {
public:
- ThreadPollDispatcher();
- ~ThreadPollDispatcher();
+ ThreadDispatcher();
+ ~ThreadDispatcher();
EventPoll& getPoll();
private:
} // namespace epoll
} // namespace vasum
-#endif // COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP
+#endif // COMMON_EPOLL_THREAD_DISPATCHER_HPP
namespace vasum {
namespace ipc {
-Client::Client(const std::string& socketPath)
- : mProcessor("[CLIENT] "),
+Client::Client(epoll::EventPoll& eventPoll, const std::string& socketPath)
+ : mEventPoll(eventPoll),
+ mProcessor("[CLIENT] "),
mSocketPath(socketPath)
{
LOGS("Client Constructor");
}
}
-void Client::start(const bool usesExternalPolling)
+void Client::start()
{
+ if (mProcessor.isStarted()) {
+ return;
+ }
LOGS("Client start");
// Initialize the connection with the server
- if (usesExternalPolling) {
- startPoll();
- }
- mProcessor.start(usesExternalPolling);
+ auto handleEvent = [&](int, epoll::Events) -> bool {
+ mProcessor.handleEvent();
+ return true;
+ };
+ mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent);
+ mProcessor.start();
LOGD("Connecting to " + mSocketPath);
auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
void Client::stop()
{
+ if (!mProcessor.isStarted()) {
+ return;
+ }
LOGS("Client stop");
mProcessor.stop();
- if (mIPCGSourcePtr) {
- stopPoll();
- }
-}
-
-void Client::startPoll()
-{
- LOGS("Client startPoll");
- using namespace std::placeholders;
- mIPCGSourcePtr = IPCGSource::create(std::bind(&Client::handle, this, _1, _2));
- mIPCGSourcePtr->addFD(mProcessor.getEventFD());
- mIPCGSourcePtr->attach();
+ mEventPoll.removeFD(mProcessor.getEventFD());
}
-void Client::stopPoll()
-{
- LOGS("Client stopPoll");
-
- mIPCGSourcePtr->removeFD(mProcessor.getEventFD());
- mIPCGSourcePtr->detach();
- mIPCGSourcePtr.reset();
-}
-
-void Client::handle(const FileDescriptor fd, const short pollEvent)
+void Client::handle(const FileDescriptor fd, const epoll::Events pollEvents)
{
+ //TODO remove handle method
LOGS("Client handle");
if (!isStarted()) {
return;
}
- if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
- mProcessor.handleEvent();
- return;
-
- } else if (pollEvent & POLLIN) {
+ if (pollEvents & EPOLLIN) {
mProcessor.handleInput(fd);
- return;
+ }
- } else if (pollEvent & POLLHUP) {
+ if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
mProcessor.handleLostConnection(fd);
- return;
}
}
{
LOGS("Client setNewPeerCallback");
auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->addFD(fd);
- }
+ auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+ handle(fd, events);
+ return true;
+ };
+ mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
if (newPeerCallback) {
newPeerCallback(peerID, fd);
}
{
LOGS("Client setRemovedPeerCallback");
auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->removeFD(fd);
- }
+ mEventPoll.removeFD(fd);
if (removedPeerCallback) {
removedPeerCallback(peerID, fd);
}
#define COMMON_IPC_CLIENT_HPP
#include "ipc/internals/processor.hpp"
-#include "ipc/ipc-gsource.hpp"
#include "ipc/types.hpp"
#include "ipc/result.hpp"
+#include "epoll/event-poll.hpp"
#include "logger/logger.hpp"
#include <string>
* This class wraps communication via UX sockets for client applications.
* It uses serialization mechanism from libConfig.
*
- * There is one additional thread:
- * - PROCESSOR is responsible for the communication and calling the callbacks
- *
* For message format @see ipc::Processor
*/
class Client {
public:
/**
+ * @param eventPoll event poll
* @param serverPath path to the server's socket
*/
- Client(const std::string& serverPath);
+ Client(epoll::EventPoll& eventPoll, const std::string& serverPath);
~Client();
Client(const Client&) = delete;
Client& operator=(const Client&) = delete;
/**
- * Starts the worker thread
- *
- * @param usesExternalPolling internal or external polling is used
+ * Starts processing
*/
- void start(const bool usesExternalPolling = false);
+ void start();
/**
* @return is the communication thread running
bool isStarted();
/**
- * Stops all worker thread
+ * Stops processing
*/
void stop();
/**
- * Used with an external polling loop.
- * Handles one event from the file descriptor.
- *
- * @param fd file descriptor
- * @param pollEvent event on the fd. Defined in poll.h
- *
- */
- void handle(const FileDescriptor fd, const short pollEvent);
-
- /**
* Set the callback called for each new connection to a peer
*
* @param newPeerCallback the callback
const std::shared_ptr<SentDataType>& data);
private:
-
- void startPoll();
- void stopPoll();
-
+ epoll::EventPoll& mEventPoll;
PeerID mServiceID;
Processor mProcessor;
std::string mSocketPath;
- IPCGSource::Pointer mIPCGSourcePtr;
+
+ void handle(const FileDescriptor fd, const epoll::Events pollEvents);
+
};
template<typename SentDataType, typename ReceivedDataType>
#include "config.hpp"
-#include "ipc/exception.hpp"
#include "ipc/internals/acceptor.hpp"
#include "logger/logger.hpp"
-#include <poll.h>
-#include <cerrno>
-#include <cstring>
-#include <vector>
-
namespace vasum {
namespace ipc {
Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback)
- : mIsRunning(false),
- mNewConnectionCallback(newConnectionCallback),
+ : mNewConnectionCallback(newConnectionCallback),
mSocket(Socket::createSocket(socketPath))
{
LOGT("Creating Acceptor for socket " << socketPath);
Acceptor::~Acceptor()
{
- LOGT("Destroying Acceptor");
- try {
- stop();
- } catch (std::exception& e) {
- LOGE("Error in destructor: " << e.what());
- }
LOGT("Destroyed Acceptor");
}
-void Acceptor::start()
-{
- LOGT("Starting Acceptor");
- if (!mThread.joinable()) {
- mThread = std::thread(&Acceptor::run, this);
- }
- LOGT("Started Acceptor");
-}
-
-void Acceptor::stop()
-{
- LOGT("Stopping Acceptor");
-
- if (mThread.joinable()) {
- mEventQueue.send(Event::FINISH);
- LOGT("Waiting for Acceptor to finish");
- mThread.join();
- }
-
- LOGT("Stopped Acceptor");
-}
-
-void Acceptor::run()
-{
- // Setup polling structure
- std::vector<struct pollfd> fds(2);
-
- fds[0].fd = mEventQueue.getFD();
- fds[0].events = POLLIN;
-
- fds[1].fd = mSocket.getFD();
- fds[1].events = POLLIN;
-
- mIsRunning = true;
- while (mIsRunning) {
- LOGT("Waiting for new connections...");
-
- int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/);
-
- LOGT("...Incoming connection!");
-
- if (ret == -1 || ret == 0) {
- if (errno == EINTR) {
- continue;
- }
- LOGE("Error in poll: " << std::string(strerror(errno)));
- throw IPCException("Error in poll: " + std::string(strerror(errno)));
- }
-
- // Check for incoming connections
- if (fds[1].revents & POLLIN) {
- fds[1].revents = 0;
- handleConnection();
- }
-
- // Check for incoming events
- if (fds[0].revents & POLLIN) {
- fds[0].revents = 0;
- handleEvent();
- }
- }
- LOGT("Exiting run");
-}
-
void Acceptor::handleConnection()
{
std::shared_ptr<Socket> tmpSocket = mSocket.accept();
mNewConnectionCallback(tmpSocket);
}
-void Acceptor::handleEvent()
-{
- if (mEventQueue.receive() == Event::FINISH) {
- LOGD("Event FINISH");
- mIsRunning = false;
- }
-}
-
-FileDescriptor Acceptor::getEventFD()
-{
- return mEventQueue.getFD();
-}
-
FileDescriptor Acceptor::getConnectionFD()
{
return mSocket.getFD();
#include "config.hpp"
#include "ipc/internals/socket.hpp"
-#include "ipc/internals/event-queue.hpp"
#include "ipc/types.hpp"
#include <string>
-#include <thread>
namespace vasum {
namespace ipc {
Acceptor& operator=(const Acceptor&) = delete;
/**
- * Starts the thread accepting the new connections.
- */
- void start();
-
- /**
- * Stops the accepting thread.
- */
- void stop();
-
- /**
* Handle one incoming connection.
* Used with external polling
*/
void handleConnection();
/**
- * Handle one event from the internal event's queue
- * Used with external polling
- */
- void handleEvent();
-
- /**
- * @return file descriptor of internal event's queue
- */
- FileDescriptor getEventFD();
-
- /**
* @return file descriptor for the connection socket
*/
FileDescriptor getConnectionFD();
private:
- enum class Event : int {
- FINISH // Shutdown request
- };
-
- bool mIsRunning;
-
NewConnectionCallback mNewConnectionCallback;
Socket mSocket;
-
- EventQueue<Event> mEventQueue;
- std::thread mThread;
-
- void run();
};
} // namespace ipc
#include "ipc/exception.hpp"
#include "ipc/internals/processor.hpp"
-#include "utils/signal.hpp"
#include "utils/exception.hpp"
#include <cerrno>
{
LOGS(mLogPrefix + "Processor Constructor");
- utils::signalBlock(SIGPIPE);
-
using namespace std::placeholders;
setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
std::bind(&Processor::onNewSignals, this, _1, _2));
return mIsRunning;
}
-void Processor::start(bool usesExternalPolling)
+void Processor::start()
{
LOGS(mLogPrefix + "Processor start");
if (!mIsRunning) {
LOGI(mLogPrefix + "Processor start");
mIsRunning = true;
- mUsesExternalPolling = usesExternalPolling;
- if (!usesExternalPolling) {
- mThread = std::thread(&Processor::run, this);
- }
}
}
LOGD(mLogPrefix + "Waiting for the Processor to stop");
- if (mThread.joinable()) {
- mThread.join();
- } else {
- // Wait till the FINISH request is served
- Lock lock(mStateMutex);
- conditionPtr->wait(lock, [this]() {
- return !mIsRunning;
- });
- }
+ // Wait till the FINISH request is served
+ Lock lock(mStateMutex);
+ conditionPtr->wait(lock, [this]() {
+ return !mIsRunning;
+ });
+ assert(mPeerInfo.empty());
}
}
auto requestPtr = std::make_shared<AddPeerRequest>(socketPtr);
mRequestQueue.pushBack(Event::ADD_PEER, requestPtr);
- LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID);
+ LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID
+ << ", fd: " << socketPtr->getFD());
return requestPtr->peerID;
}
void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr)
{
- LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
- LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
-
if (peerIt == mPeerInfo.end()) {
LOGW("Peer already removed");
return;
}
+ LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
+ LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
+
// Remove from signal addressees
for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
it->second.remove(peerIt->peerID);
mPeerInfo.erase(peerIt);
}
-void Processor::resetPolling()
-{
- LOGS(mLogPrefix + "Processor resetPolling");
-
- if (mUsesExternalPolling) {
- return;
- }
-
- // Setup polling on eventfd and sockets
- mFDs.resize(mPeerInfo.size() + 1);
- LOGI(mLogPrefix + "Reseting mFDS.size: " << mFDs.size());
-
- mFDs[0].fd = mRequestQueue.getFD();
- mFDs[0].events = POLLIN;
-
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- auto fd = mPeerInfo[i - 1].socketPtr->getFD();
-
- LOGI(mLogPrefix + "Reseting fd: " << fd);
-
- mFDs[i].fd = fd;
- mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
- // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
- }
-}
-
-void Processor::run()
-{
- LOGS(mLogPrefix + "Processor run");
-
- {
- Lock lock(mStateMutex);
- resetPolling();
- }
-
- while (isStarted()) {
- LOGT(mLogPrefix + "Waiting for communication...");
- int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
- LOGT(mLogPrefix + "... incoming communication!");
- if (ret == -1 || ret == 0) {
- if (errno == EINTR) {
- continue;
- }
- LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno)));
- throw IPCException("Error in poll: " + std::string(strerror(errno)));
- }
-
- // Check for lost connections:
- if (handleLostConnections()) {
- // mFDs changed
- resetPolling();
- continue;
- }
-
- // Check for incoming data.
- if (handleInputs()) {
- // mFDs changed
- resetPolling();
- continue;
- }
-
- // Check for incoming events
- if (mFDs[0].revents & POLLIN) {
- mFDs[0].revents &= ~(POLLIN);
- if (handleEvent()) {
- // mFDs changed
- resetPolling();
- continue;
- }
- }
-
- }
-}
-
-bool Processor::handleLostConnections()
-{
- Lock lock(mStateMutex);
-
- bool isPeerRemoved = false;
-
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- if (mFDs[i].revents & POLLHUP) {
- auto peerIt = getPeerInfoIterator(mFDs[i].fd);
- LOGI(mLogPrefix + "Lost connection to peer: " << peerIt->peerID);
- mFDs[i].revents &= ~(POLLHUP);
- removePeerInternal(peerIt,
- std::make_exception_ptr(IPCPeerDisconnectedException()));
- isPeerRemoved = true;
- }
- }
-
- return isPeerRemoved;
-}
-
bool Processor::handleLostConnection(const FileDescriptor fd)
{
Lock lock(mStateMutex);
return true;
}
-bool Processor::handleInputs()
-{
- // Lock not needed, mFDs won't be changed by handleInput
-
- bool pollChanged = false;
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- if (mFDs[i].revents & POLLIN) {
- mFDs[i].revents &= ~(POLLIN);
- pollChanged = pollChanged || handleInput(mFDs[i].fd);
- }
- }
-
- return pollChanged;
-}
-
bool Processor::handleInput(const FileDescriptor fd)
{
LOGS(mLogPrefix + "Processor handleInput fd: " << fd);
}
}
+ // Close peers
+ while (!mPeerInfo.empty()) {
+ removePeerInternal(--mPeerInfo.end(),
+ std::make_exception_ptr(IPCClosingException()));
+ }
+
mIsRunning = false;
request.conditionPtr->notify_all();
#include "logger/logger-scope.hpp"
#include <ostream>
-#include <poll.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/**
- * Start the processing thread.
- * Quits immediately after starting the thread.
- *
- * @param usesExternalPolling internal or external polling is used
+ * Start processing.
*/
- void start(const bool usesExternalPolling);
+ void start();
/**
* @return is processor running
RequestQueue<Event> mRequestQueue;
bool mIsRunning;
- bool mUsesExternalPolling;
std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
Peers mPeerInfo;
- std::vector<struct pollfd> mFDs;
std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
unsigned int mMaxNumberOfPeers;
- std::thread mThread;
-
template<typename SentDataType, typename ReceivedDataType>
void setMethodHandlerInternal(const MethodID methodID,
const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
const PeerID peerID,
const std::shared_ptr<SentDataType>& data);
- void run();
-
// Request handlers
bool onMethodRequest(MethodRequest& request);
bool onSignalRequest(SignalRequest& request);
bool onSendResultRequest(SendResultRequest& request);
bool onFinishRequest(FinishRequest& request);
- bool handleLostConnections();
- bool handleInputs();
-
bool onReturnValue(Peers::iterator& peerIt,
const MessageID messageID);
bool onRemoteMethod(Peers::iterator& peerIt,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks);
- void resetPolling();
void removePeerInternal(Peers::iterator peerIt,
const std::exception_ptr& exceptionPtr);
namespace vasum {
namespace ipc {
-Service::Service(const std::string& socketPath,
+Service::Service(epoll::EventPoll& eventPoll,
+ const std::string& socketPath,
const PeerCallback& addPeerCallback,
const PeerCallback& removePeerCallback)
- : mProcessor("[SERVICE] "),
+ : mEventPoll(eventPoll),
+ mProcessor("[SERVICE] "),
mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
{
}
}
-void Service::start(const bool usesExternalPolling)
+void Service::start()
{
- LOGS("Service start");
- if (usesExternalPolling) {
- startPoll();
- }
- mProcessor.start(usesExternalPolling);
-
- // There can be an incoming connection from mAcceptor before mProcessor is listening,
- // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor.
- if (!usesExternalPolling) {
- mAcceptor.start();
+ if (mProcessor.isStarted()) {
+ return;
}
+ LOGS("Service start");
+ auto handleConnection = [&](int, epoll::Events) -> bool {
+ mAcceptor.handleConnection();
+ return true;
+ };
+ auto handleProcessorEvent = [&](int, epoll::Events) -> bool {
+ mProcessor.handleEvent();
+ return true;
+ };
+ mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection);
+ mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent);
+ mProcessor.start();
}
bool Service::isStarted()
void Service::stop()
{
+ if (!mProcessor.isStarted()) {
+ return;
+ }
LOGS("Service stop");
- mAcceptor.stop();
mProcessor.stop();
- if (mIPCGSourcePtr) {
- stopPoll();
- }
-}
-
-void Service::startPoll()
-{
- LOGS("Service startPoll");
-
- mIPCGSourcePtr = IPCGSource::create(std::bind(&Service::handle, this, _1, _2));
- mIPCGSourcePtr->addFD(mAcceptor.getEventFD());
- mIPCGSourcePtr->addFD(mAcceptor.getConnectionFD());
- mIPCGSourcePtr->addFD(mProcessor.getEventFD());
- mIPCGSourcePtr->attach();
+ mEventPoll.removeFD(mAcceptor.getConnectionFD());
+ mEventPoll.removeFD(mProcessor.getEventFD());
}
-void Service::stopPoll()
-{
- LOGS("Service stopPoll");
-
- mIPCGSourcePtr->removeFD(mAcceptor.getEventFD());
- mIPCGSourcePtr->removeFD(mAcceptor.getConnectionFD());
- mIPCGSourcePtr->removeFD(mProcessor.getEventFD());
- mIPCGSourcePtr->detach();
- mIPCGSourcePtr.reset();
-}
-
-void Service::handle(const FileDescriptor fd, const short pollEvent)
+void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents)
{
+ //TODO remove handle method
LOGS("Service handle");
if (!isStarted()) {
return;
}
- if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) {
- mProcessor.handleEvent();
- return;
-
- } else if (fd == mAcceptor.getConnectionFD() && (pollEvent & POLLIN)) {
- mAcceptor.handleConnection();
- return;
-
- } else if (fd == mAcceptor.getEventFD() && (pollEvent & POLLIN)) {
- mAcceptor.handleEvent();
- return;
-
- } else if (pollEvent & POLLIN) {
+ if (pollEvents & EPOLLIN) {
mProcessor.handleInput(fd);
- return;
+ }
- } else if (pollEvent & POLLHUP) {
+ if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) {
mProcessor.handleLostConnection(fd);
- return;
}
}
{
LOGS("Service setNewPeerCallback");
auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->addFD(fd);
- }
+ auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool {
+ handle(fd, events);
+ return true;
+ };
+ mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd);
if (newPeerCallback) {
newPeerCallback(peerID, fd);
}
{
LOGS("Service setRemovedPeerCallback");
auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
- if (mIPCGSourcePtr) {
- mIPCGSourcePtr->removeFD(fd);
- }
+ mEventPoll.removeFD(fd);
if (removedPeerCallback) {
removedPeerCallback(peerID, fd);
}
#include "ipc/internals/processor.hpp"
#include "ipc/internals/acceptor.hpp"
-#include "ipc/ipc-gsource.hpp"
#include "ipc/types.hpp"
#include "ipc/result.hpp"
+#include "epoll/event-poll.hpp"
#include "logger/logger.hpp"
#include <string>
* This class wraps communication via UX sockets.
* It uses serialization mechanism from libConfig.
*
- * There are two working threads:
- * - ACCEPTOR accepts incoming connections and passes them to PROCESSOR
- * - PROCESSOR is responsible for the communication and calling the callbacks
- *
* For message format @see ipc::Processor
*/
class Service {
public:
/**
+ * @param eventPoll event poll
* @param path path to the socket
*/
- Service(const std::string& path,
+ Service(epoll::EventPoll& eventPoll,
+ const std::string& path,
const PeerCallback& addPeerCallback = nullptr,
const PeerCallback& removePeerCallback = nullptr);
~Service();
Service& operator=(const Service&) = delete;
/**
- * Starts the worker and acceptor threads
- *
- * @param usesExternalPolling internal or external polling is used
+ * Starts processing
*/
- void start(const bool usesExternalPolling = false);
+ void start();
/**
* @return is the communication thread running
void stop();
/**
- * Used with an external polling loop.
- * Handles one event from the file descriptor.
- *
- * @param fd file descriptor
- * @param pollEvent event on the fd. Defined in poll.h
- *
- */
- void handle(const FileDescriptor fd, const short pollEvent);
-
- /**
* Set the callback called for each new connection to a peer
*
* @param newPeerCallback the callback
void signal(const MethodID methodID,
const std::shared_ptr<SentDataType>& data);
private:
-
- void startPoll();
- void stopPoll();
-
- typedef std::lock_guard<std::mutex> Lock;
+ epoll::EventPoll& mEventPoll;
Processor mProcessor;
Acceptor mAcceptor;
- IPCGSource::Pointer mIPCGSourcePtr;
+
+ void handle(const FileDescriptor fd, const epoll::Events pollEvents);
};
__indentChar = " "
def testCaseSummary(self, testSuite, testName, testResult, recLevel):
- msg = self.__indentChar * recLevel + BOLD + "{:<50}".format(testName)
+ msg = self.__indentChar * recLevel + BOLD + "{:<50} ".format(testName)
if testResult == "passed":
msg += GREEN
#include "ipc/internals/socket.hpp"
#include "utils/latch.hpp"
#include "utils/glib-loop.hpp"
-#include "epoll/glib-poll-dispatcher.hpp"
-#include "epoll/thread-poll-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
+#include "epoll/thread-dispatcher.hpp"
using namespace vasum::utils;
using namespace vasum::epoll;
BOOST_AUTO_TEST_CASE(ThreadedPoll)
{
- ThreadPollDispatcher dispatcher;
+ ThreadDispatcher dispatcher;
}
BOOST_AUTO_TEST_CASE(GlibPoll)
{
ScopedGlibLoop loop;
- GlibPollDispatcher dispatcher;
+ GlibDispatcher dispatcher;
}
void doSocketTest(EventPoll& poll)
BOOST_AUTO_TEST_CASE(ThreadedPollSocket)
{
- ThreadPollDispatcher dispatcher;
+ ThreadDispatcher dispatcher;
doSocketTest(dispatcher.getPoll());
}
{
ScopedGlibLoop loop;
- GlibPollDispatcher dispatcher;
+ GlibDispatcher dispatcher;
doSocketTest(dispatcher.getPoll());
}
BOOST_AUTO_TEST_CASE(PollStacking)
{
- ThreadPollDispatcher dispatcher;
+ ThreadDispatcher dispatcher;
EventPoll innerPoll;
#include "ipc/client.hpp"
#include "ipc/types.hpp"
#include "ipc/result.hpp"
+#include "epoll/thread-dispatcher.hpp"
+#include "epoll/glib-dispatcher.hpp"
#include "utils/glib-loop.hpp"
#include "utils/latch.hpp"
#include "utils/value-latch.hpp"
using namespace vasum;
using namespace vasum::ipc;
+using namespace vasum::epoll;
using namespace vasum::utils;
using namespace std::placeholders;
-namespace {
-
// Timeout for sending one message
const int TIMEOUT = 1000 /*ms*/;
const std::string TEST_DIR = "/tmp/ut-ipc";
const std::string SOCKET_PATH = TEST_DIR + "/test.socket";
-struct Fixture {
+struct FixtureBase {
ScopedDir mTestPathGuard;
- Fixture()
+ FixtureBase()
: mTestPathGuard(TEST_DIR)
{
}
};
+struct ThreadedFixture : FixtureBase {
+ ThreadDispatcher dispatcher;
+
+ EventPoll& getPoll() { return dispatcher.getPoll(); }
+};
+
+struct GlibFixture : FixtureBase {
+ ScopedGlibLoop glibLoop;
+ GlibDispatcher dispatcher;
+
+ EventPoll& getPoll() { return dispatcher.getPoll(); }
+};
+
struct SendData {
int intVal;
SendData(int i): intVal(i) {}
methodResult->set(returnData);
}
-PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
+PeerID connect(Service& s, Client& c)
{
// Connects the Client to the Service and returns Clients PeerID
ValueLatch<PeerID> peerIDLatch;
s.setNewPeerCallback(newPeerCallback);
if (!s.isStarted()) {
- s.start(isServiceGlib);
+ s.start();
}
- c.start(isClientGlib);
+ c.start();
PeerID peerID = peerIDLatch.get(TIMEOUT);
s.setNewPeerCallback(nullptr);
return peerID;
}
-PeerID connectServiceGSource(Service& s, Client& c)
-{
- return connect(s, c, true, false);
-}
-
-PeerID connectClientGSource(Service& s, Client& c)
-{
- return connect(s, c, false, true);
-}
-
void testEcho(Client& c, const MethodID methodID)
{
std::shared_ptr<SendData> sentData(new SendData(34));
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-} // namespace
-
-
-BOOST_FIXTURE_TEST_SUITE(IPCSuite, Fixture)
+BOOST_AUTO_TEST_SUITE(IPCSuite)
-BOOST_AUTO_TEST_CASE(ConstructorDestructor)
+MULTI_FIXTURE_TEST_CASE(ConstructorDestructor, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
}
-BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod)
+MULTI_FIXTURE_TEST_CASE(ServiceAddRemoveMethod, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
s.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
s.setMethodHandler<SendData, RecvData>(2, returnDataCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
testEcho(c, 1);
BOOST_CHECK_THROW(testEcho(c, 2), IPCException);
}
-BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
+MULTI_FIXTURE_TEST_CASE(ClientAddRemoveMethod, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
c.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
}
-BOOST_AUTO_TEST_CASE(ServiceStartStop)
+MULTI_FIXTURE_TEST_CASE(ServiceStartStop, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
s.start();
}
-BOOST_AUTO_TEST_CASE(ClientStartStop)
+MULTI_FIXTURE_TEST_CASE(ClientStartStop, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
c.start();
c.stop();
}
-BOOST_AUTO_TEST_CASE(SyncClientToServiceEcho)
+MULTI_FIXTURE_TEST_CASE(SyncClientToServiceEcho, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
s.setMethodHandler<SendData, RecvData>(2, echoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
testEcho(c, 1);
testEcho(c, 2);
}
-BOOST_AUTO_TEST_CASE(Restart)
+MULTI_FIXTURE_TEST_CASE(Restart, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
s.start();
s.setMethodHandler<SendData, RecvData>(2, echoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
testEcho(c, 1);
testEcho(c, 2);
s.stop();
s.start();
+ BOOST_CHECK_THROW(testEcho(c, 2), IPCException);
+
+ c.stop();
+ c.start();
+
testEcho(c, 1);
testEcho(c, 2);
}
-BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
+MULTI_FIXTURE_TEST_CASE(SyncServiceToClientEcho, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
PeerID peerID = connect(s, c);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho)
+MULTI_FIXTURE_TEST_CASE(AsyncClientToServiceEcho, F, ThreadedFixture, GlibFixture)
{
std::shared_ptr<SendData> sentData(new SendData(34));
ValueLatch<std::shared_ptr<RecvData>> recvDataLatch;
// Setup Service and Client
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
//Async call
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
+MULTI_FIXTURE_TEST_CASE(AsyncServiceToClientEcho, F, ThreadedFixture, GlibFixture)
{
std::shared_ptr<SendData> sentData(new SendData(56));
ValueLatch<std::shared_ptr<RecvData>> recvDataLatch;
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
PeerID peerID = connect(s, c);
}
-BOOST_AUTO_TEST_CASE(SyncTimeout)
+MULTI_FIXTURE_TEST_CASE(SyncTimeout, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, longEchoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
std::shared_ptr<SendData> sentData(new SendData(78));
BOOST_REQUIRE_THROW((c.callSync<SendData, RecvData>(1, sentData, TIMEOUT)), IPCException);
}
-BOOST_AUTO_TEST_CASE(SerializationError)
+MULTI_FIXTURE_TEST_CASE(SerializationError, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
std::shared_ptr<ThrowOnAcceptData> throwingData(new ThrowOnAcceptData());
}
-BOOST_AUTO_TEST_CASE(ParseError)
+MULTI_FIXTURE_TEST_CASE(ParseError, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
std::shared_ptr<SendData> sentData(new SendData(78));
BOOST_CHECK_THROW((c.callSync<SendData, ThrowOnAcceptData>(1, sentData, 10000)), IPCParsingException);
}
-BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
+MULTI_FIXTURE_TEST_CASE(DisconnectedPeerError, F, ThreadedFixture, GlibFixture)
{
ValueLatch<Result<RecvData>> retStatusLatch;
- Service s(SOCKET_PATH);
+
+ Service s(F::getPoll(), SOCKET_PATH);
auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&, MethodResult::Pointer methodResult) {
auto resultData = std::make_shared<SendData>(1);
s.setMethodHandler<SendData, ThrowOnAcceptData>(1, method);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
auto dataBack = [&retStatusLatch](Result<RecvData> && r) {
}
-BOOST_AUTO_TEST_CASE(ReadTimeout)
+MULTI_FIXTURE_TEST_CASE(ReadTimeout, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
auto longEchoCallback = [](const PeerID, std::shared_ptr<RecvData>& data, MethodResult::Pointer methodResult) {
auto resultData = std::make_shared<LongSendData>(data->intVal, LONG_OPERATION_TIME);
methodResult->set<LongSendData>(resultData);
};
s.setMethodHandler<LongSendData, RecvData>(1, longEchoCallback);
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
// Test timeout on read
}
-BOOST_AUTO_TEST_CASE(WriteTimeout)
+MULTI_FIXTURE_TEST_CASE(WriteTimeout, F, ThreadedFixture, GlibFixture)
{
- Service s(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
s.setMethodHandler<SendData, RecvData>(1, echoCallback);
s.start();
- Client c(SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
c.start();
// Test echo with a minimal timeout
}
-BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
+MULTI_FIXTURE_TEST_CASE(AddSignalInRuntime, F, ThreadedFixture, GlibFixture)
{
ValueLatch<std::shared_ptr<RecvData>> recvDataLatchA;
ValueLatch<std::shared_ptr<RecvData>> recvDataLatchB;
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
connect(s, c);
auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
}
-BOOST_AUTO_TEST_CASE(AddSignalOffline)
+MULTI_FIXTURE_TEST_CASE(AddSignalOffline, F, ThreadedFixture, GlibFixture)
{
ValueLatch<std::shared_ptr<RecvData>> recvDataLatchA;
ValueLatch<std::shared_ptr<RecvData>> recvDataLatchB;
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
recvDataLatchA.set(data);
BOOST_CHECK_EQUAL(recvDataB->intVal, sendDataB->intVal);
}
-
-BOOST_AUTO_TEST_CASE(ServiceGSource)
-{
- utils::Latch l;
- ScopedGlibLoop loop;
-
- auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
- l.set();
- };
-
- Service s(SOCKET_PATH);
- s.setMethodHandler<SendData, RecvData>(1, echoCallback);
-
- Client c(SOCKET_PATH);
- s.setSignalHandler<RecvData>(2, signalHandler);
-
- connectServiceGSource(s, c);
-
- testEcho(c, 1);
-
- auto data = std::make_shared<SendData>(1);
- c.signal<SendData>(2, data);
-
- BOOST_CHECK(l.wait(TIMEOUT));
-}
-
-
-BOOST_AUTO_TEST_CASE(ClientGSource)
-{
- utils::Latch l;
- ScopedGlibLoop loop;
-
- auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
- l.set();
- };
-
- Service s(SOCKET_PATH);
- s.start();
-
- Client c(SOCKET_PATH);
- c.setMethodHandler<SendData, RecvData>(1, echoCallback);
- c.setSignalHandler<RecvData>(2, signalHandler);
-
- PeerID peerID = connectClientGSource(s, c);
-
- testEcho(s, 1, peerID);
-
- auto data = std::make_shared<SendData>(1);
- s.signal<SendData>(2, data);
-
- BOOST_CHECK(l.wait(TIMEOUT));
-}
-
-BOOST_AUTO_TEST_CASE(UsersError)
+MULTI_FIXTURE_TEST_CASE(UsersError, F, ThreadedFixture, GlibFixture)
{
const int TEST_ERROR_CODE = -234;
const std::string TEST_ERROR_MESSAGE = "Ay, caramba!";
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
auto clientID = connect(s, c);
auto throwingMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer) {
BOOST_CHECK_EXCEPTION((s.callSync<SendData, RecvData>(2, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
}
-BOOST_AUTO_TEST_CASE(AsyncResult)
+MULTI_FIXTURE_TEST_CASE(AsyncResult, F, ThreadedFixture, GlibFixture)
{
const int TEST_ERROR_CODE = -567;
const std::string TEST_ERROR_MESSAGE = "Ooo jooo!";
- Service s(SOCKET_PATH);
- Client c(SOCKET_PATH);
+ Service s(F::getPoll(), SOCKET_PATH);
+ Client c(F::getPoll(), SOCKET_PATH);
auto clientID = connect(s, c);
auto errorMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer methodResult) {
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-// BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
+MULTI_FIXTURE_TEST_CASE(MixOperations, F, ThreadedFixture, GlibFixture)
+{
+ utils::Latch l;
+
+ auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
+ l.set();
+ };
+
+ Service s(F::getPoll(), SOCKET_PATH);
+ s.setMethodHandler<SendData, RecvData>(1, echoCallback);
+
+ Client c(F::getPoll(), SOCKET_PATH);
+ s.setSignalHandler<RecvData>(2, signalHandler);
+
+ connect(s, c);
+
+ testEcho(c, 1);
+
+ auto data = std::make_shared<SendData>(1);
+ c.signal<SendData>(2, data);
+
+ BOOST_CHECK(l.wait(TIMEOUT));
+}
+
+// MULTI_FIXTURE_TEST_CASE(ConnectionLimitTest, F, ThreadedFixture, GlibFixture)
// {
// unsigned oldLimit = ipc::getMaxFDNumber();
// ipc::setMaxFDNumber(50);
// // Setup Service and many Clients
-// Service s(SOCKET_PATH);
+// Service s(F::getPoll(), SOCKET_PATH);
// s.setMethodHandler<SendData, RecvData>(1, echoCallback);
// s.start();
// std::list<Client> clients;
// for (int i = 0; i < 100; ++i) {
// try {
-// clients.push_back(Client(SOCKET_PATH));
+// clients.push_back(Client(F::getPoll(), SOCKET_PATH));
// clients.back().start();
// } catch (...) {}
// }
#define BOOST_TEST_DYN_LINK
#include <boost/test/unit_test.hpp>
+#include <boost/mpl/vector.hpp>
+
#include <string>
/**
+ * Usage example:
+ *
+ * MULTI_FIXTURE_TEST_CASE(Test, T, Fixture1, Fixture2, Fixture3) {
+ * std::cout << T::i << "\n";
+ * }
+ */
+#define MULTI_FIXTURE_TEST_CASE(NAME, TPARAM, ...) \
+ typedef boost::mpl::vector<__VA_ARGS__> NAME##_fixtures; \
+ BOOST_FIXTURE_TEST_CASE_TEMPLATE(NAME, TPARAM, NAME##_fixtures, TPARAM)
+
+
+/**
* An exception message checker
*
* Usage example: