From: Piotr Bartosiewicz Date: Tue, 10 Mar 2015 11:01:48 +0000 (+0100) Subject: IPC works on epoll X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=99314f1553605b796e6ae5b90466113f98e79360;p=platform%2Fcore%2Fsecurity%2Fvasum.git IPC works on epoll [Bug/Feature] N/A [Cause] N/A [Solution] N/A [Verification] Run tests Change-Id: I6f6b5a7a70cecbecbdf0c502d0f8618577892a48 --- diff --git a/common/epoll/event-poll.cpp b/common/epoll/event-poll.cpp index dc1c1bc..9eb9fa0 100644 --- a/common/epoll/event-poll.cpp +++ b/common/epoll/event-poll.cpp @@ -49,6 +49,9 @@ EventPoll::~EventPoll() { if (!mCallbacks.empty()) { LOGW("Not removed callbacks: " << mCallbacks.size()); + for (const auto& item : mCallbacks) { + LOGT("Not removed fd: " << item.first); + } assert(0 && "Not removed callbacks left"); } utils::close(mPollFD); @@ -73,7 +76,7 @@ void EventPoll::addFD(const int fd, const Events events, Callback&& callback) } mCallbacks.insert({fd, std::make_shared(std::move(callback))}); - LOGT("Callback added for " << fd); + LOGT("Callback added for fd: " << fd); } void EventPoll::removeFD(const int fd) @@ -87,7 +90,7 @@ void EventPoll::removeFD(const int fd) } mCallbacks.erase(iter); removeFDInternal(fd); - LOGT("Callback removed for " << fd); + LOGT("Callback removed for fd: " << fd); } bool EventPoll::dispatchIteration(const int timeoutMs) @@ -115,7 +118,13 @@ bool EventPoll::dispatchIteration(const int timeoutMs) // add ref because removeFD(self) can be called inside callback std::shared_ptr callback(iter->second); - return (*callback)(event.data.fd, event.events); + try { + LOGT("Dispatch fd: " << event.data.fd << ", events: " << eventsToString(event.events)); + return (*callback)(event.data.fd, event.events); + } catch (std::exception& e) { + LOGE("Got unexpected exception: " << e.what()); + assert(0 && "Callback should not throw any exceptions"); + } } } diff --git a/common/epoll/glib-poll-dispatcher.cpp b/common/epoll/glib-dispatcher.cpp similarity index 91% rename from common/epoll/glib-poll-dispatcher.cpp rename to common/epoll/glib-dispatcher.cpp index cde535d..9567abb 100644 --- a/common/epoll/glib-poll-dispatcher.cpp +++ b/common/epoll/glib-dispatcher.cpp @@ -23,13 +23,13 @@ */ #include "config.hpp" -#include "epoll/glib-poll-dispatcher.hpp" +#include "epoll/glib-dispatcher.hpp" #include "utils/callback-wrapper.hpp" namespace vasum { namespace epoll { -GlibPollDispatcher::GlibPollDispatcher() +GlibDispatcher::GlibDispatcher() { mChannel = g_io_channel_unix_new(mPoll.getPollFD()); @@ -50,14 +50,14 @@ GlibPollDispatcher::GlibPollDispatcher() &utils::deleteCallbackWrapper); } -GlibPollDispatcher::~GlibPollDispatcher() +GlibDispatcher::~GlibDispatcher() { g_source_remove(mWatchId); g_io_channel_unref(mChannel); // mGuard destructor will wait for full unregister of dispatchCallback } -EventPoll& GlibPollDispatcher::getPoll() +EventPoll& GlibDispatcher::getPoll() { return mPoll; } diff --git a/common/epoll/glib-poll-dispatcher.hpp b/common/epoll/glib-dispatcher.hpp similarity index 84% rename from common/epoll/glib-poll-dispatcher.hpp rename to common/epoll/glib-dispatcher.hpp index cf300bb..0fa7b3a 100644 --- a/common/epoll/glib-poll-dispatcher.hpp +++ b/common/epoll/glib-dispatcher.hpp @@ -22,8 +22,8 @@ * @brief glib epoll dispatcher */ -#ifndef COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP -#define COMMON_EPOLL_GLIB_POLL_DISPATCHER_HPP +#ifndef COMMON_EPOLL_GLIB_DISPATCHER_HPP +#define COMMON_EPOLL_GLIB_DISPATCHER_HPP #include "epoll/event-poll.hpp" #include "utils/callback-guard.hpp" @@ -36,10 +36,10 @@ namespace epoll { /** * Will dispatch poll events in glib thread */ -class GlibPollDispatcher { +class GlibDispatcher { public: - GlibPollDispatcher(); - ~GlibPollDispatcher(); + GlibDispatcher(); + ~GlibDispatcher(); EventPoll& getPoll(); private: @@ -53,4 +53,4 @@ private: } // namespace epoll } // namespace vasum -#endif // COMMON_UTILS_GLIB_POLL_DISPATCHER_HPP +#endif // COMMON_UTILS_GLIB_DISPATCHER_HPP diff --git a/common/epoll/thread-poll-dispatcher.cpp b/common/epoll/thread-dispatcher.cpp similarity index 88% rename from common/epoll/thread-poll-dispatcher.cpp rename to common/epoll/thread-dispatcher.cpp index 797a8c4..a82cad8 100644 --- a/common/epoll/thread-poll-dispatcher.cpp +++ b/common/epoll/thread-dispatcher.cpp @@ -23,12 +23,12 @@ */ #include "config.hpp" -#include "epoll/thread-poll-dispatcher.hpp" +#include "epoll/thread-dispatcher.hpp" namespace vasum { namespace epoll { -ThreadPollDispatcher::ThreadPollDispatcher() +ThreadDispatcher::ThreadDispatcher() { auto controlCallback = [this](int, Events) -> bool { mStopEvent.receive(); @@ -41,14 +41,14 @@ ThreadPollDispatcher::ThreadPollDispatcher() }); } -ThreadPollDispatcher::~ThreadPollDispatcher() +ThreadDispatcher::~ThreadDispatcher() { mStopEvent.send(); mThread.join(); mPoll.removeFD(mStopEvent.getFD()); } -EventPoll& ThreadPollDispatcher::getPoll() +EventPoll& ThreadDispatcher::getPoll() { return mPoll; } diff --git a/common/epoll/thread-poll-dispatcher.hpp b/common/epoll/thread-dispatcher.hpp similarity index 83% rename from common/epoll/thread-poll-dispatcher.hpp rename to common/epoll/thread-dispatcher.hpp index af6b278..7d0f30d 100644 --- a/common/epoll/thread-poll-dispatcher.hpp +++ b/common/epoll/thread-dispatcher.hpp @@ -22,8 +22,8 @@ * @brief Thread epoll dispatcher */ -#ifndef COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP -#define COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP +#ifndef COMMON_EPOLL_THREAD_DISPATCHER_HPP +#define COMMON_EPOLL_THREAD_DISPATCHER_HPP #include "epoll/event-poll.hpp" #include "utils/eventfd.hpp" @@ -36,10 +36,10 @@ namespace epoll { /** * Will dispatch poll events in a newly created thread */ -class ThreadPollDispatcher { +class ThreadDispatcher { public: - ThreadPollDispatcher(); - ~ThreadPollDispatcher(); + ThreadDispatcher(); + ~ThreadDispatcher(); EventPoll& getPoll(); private: @@ -51,4 +51,4 @@ private: } // namespace epoll } // namespace vasum -#endif // COMMON_EPOLL_THREAD_POLL_DISPATCHER_HPP +#endif // COMMON_EPOLL_THREAD_DISPATCHER_HPP diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp index 6669d8a..abecb71 100644 --- a/common/ipc/client.cpp +++ b/common/ipc/client.cpp @@ -31,8 +31,9 @@ namespace vasum { namespace ipc { -Client::Client(const std::string& socketPath) - : mProcessor("[CLIENT] "), +Client::Client(epoll::EventPoll& eventPoll, const std::string& socketPath) + : mEventPoll(eventPoll), + mProcessor("[CLIENT] "), mSocketPath(socketPath) { LOGS("Client Constructor"); @@ -50,14 +51,19 @@ Client::~Client() } } -void Client::start(const bool usesExternalPolling) +void Client::start() { + if (mProcessor.isStarted()) { + return; + } LOGS("Client start"); // Initialize the connection with the server - if (usesExternalPolling) { - startPoll(); - } - mProcessor.start(usesExternalPolling); + auto handleEvent = [&](int, epoll::Events) -> bool { + mProcessor.handleEvent(); + return true; + }; + mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleEvent); + mProcessor.start(); LOGD("Connecting to " + mSocketPath); auto socketPtr = std::make_shared(Socket::connectSocket(mSocketPath)); @@ -71,34 +77,18 @@ bool Client::isStarted() void Client::stop() { + if (!mProcessor.isStarted()) { + return; + } LOGS("Client stop"); mProcessor.stop(); - if (mIPCGSourcePtr) { - stopPoll(); - } -} - -void Client::startPoll() -{ - LOGS("Client startPoll"); - using namespace std::placeholders; - mIPCGSourcePtr = IPCGSource::create(std::bind(&Client::handle, this, _1, _2)); - mIPCGSourcePtr->addFD(mProcessor.getEventFD()); - mIPCGSourcePtr->attach(); + mEventPoll.removeFD(mProcessor.getEventFD()); } -void Client::stopPoll() -{ - LOGS("Client stopPoll"); - - mIPCGSourcePtr->removeFD(mProcessor.getEventFD()); - mIPCGSourcePtr->detach(); - mIPCGSourcePtr.reset(); -} - -void Client::handle(const FileDescriptor fd, const short pollEvent) +void Client::handle(const FileDescriptor fd, const epoll::Events pollEvents) { + //TODO remove handle method LOGS("Client handle"); if (!isStarted()) { @@ -106,17 +96,12 @@ void Client::handle(const FileDescriptor fd, const short pollEvent) return; } - if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) { - mProcessor.handleEvent(); - return; - - } else if (pollEvent & POLLIN) { + if (pollEvents & EPOLLIN) { mProcessor.handleInput(fd); - return; + } - } else if (pollEvent & POLLHUP) { + if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) { mProcessor.handleLostConnection(fd); - return; } } @@ -124,9 +109,11 @@ void Client::setNewPeerCallback(const PeerCallback& newPeerCallback) { LOGS("Client setNewPeerCallback"); auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) { - if (mIPCGSourcePtr) { - mIPCGSourcePtr->addFD(fd); - } + auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool { + handle(fd, events); + return true; + }; + mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd); if (newPeerCallback) { newPeerCallback(peerID, fd); } @@ -138,9 +125,7 @@ void Client::setRemovedPeerCallback(const PeerCallback& removedPeerCallback) { LOGS("Client setRemovedPeerCallback"); auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) { - if (mIPCGSourcePtr) { - mIPCGSourcePtr->removeFD(fd); - } + mEventPoll.removeFD(fd); if (removedPeerCallback) { removedPeerCallback(peerID, fd); } diff --git a/common/ipc/client.hpp b/common/ipc/client.hpp index 321b4da..a3342b6 100644 --- a/common/ipc/client.hpp +++ b/common/ipc/client.hpp @@ -26,9 +26,9 @@ #define COMMON_IPC_CLIENT_HPP #include "ipc/internals/processor.hpp" -#include "ipc/ipc-gsource.hpp" #include "ipc/types.hpp" #include "ipc/result.hpp" +#include "epoll/event-poll.hpp" #include "logger/logger.hpp" #include @@ -40,28 +40,24 @@ namespace ipc { * This class wraps communication via UX sockets for client applications. * It uses serialization mechanism from libConfig. * - * There is one additional thread: - * - PROCESSOR is responsible for the communication and calling the callbacks - * * For message format @see ipc::Processor */ class Client { public: /** + * @param eventPoll event poll * @param serverPath path to the server's socket */ - Client(const std::string& serverPath); + Client(epoll::EventPoll& eventPoll, const std::string& serverPath); ~Client(); Client(const Client&) = delete; Client& operator=(const Client&) = delete; /** - * Starts the worker thread - * - * @param usesExternalPolling internal or external polling is used + * Starts processing */ - void start(const bool usesExternalPolling = false); + void start(); /** * @return is the communication thread running @@ -69,21 +65,11 @@ public: bool isStarted(); /** - * Stops all worker thread + * Stops processing */ void stop(); /** - * Used with an external polling loop. - * Handles one event from the file descriptor. - * - * @param fd file descriptor - * @param pollEvent event on the fd. Defined in poll.h - * - */ - void handle(const FileDescriptor fd, const short pollEvent); - - /** * Set the callback called for each new connection to a peer * * @param newPeerCallback the callback @@ -170,14 +156,13 @@ public: const std::shared_ptr& data); private: - - void startPoll(); - void stopPoll(); - + epoll::EventPoll& mEventPoll; PeerID mServiceID; Processor mProcessor; std::string mSocketPath; - IPCGSource::Pointer mIPCGSourcePtr; + + void handle(const FileDescriptor fd, const epoll::Events pollEvents); + }; template diff --git a/common/ipc/internals/acceptor.cpp b/common/ipc/internals/acceptor.cpp index ecb2210..15b9ad8 100644 --- a/common/ipc/internals/acceptor.cpp +++ b/common/ipc/internals/acceptor.cpp @@ -24,21 +24,14 @@ #include "config.hpp" -#include "ipc/exception.hpp" #include "ipc/internals/acceptor.hpp" #include "logger/logger.hpp" -#include -#include -#include -#include - namespace vasum { namespace ipc { Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback) - : mIsRunning(false), - mNewConnectionCallback(newConnectionCallback), + : mNewConnectionCallback(newConnectionCallback), mSocket(Socket::createSocket(socketPath)) { LOGT("Creating Acceptor for socket " << socketPath); @@ -46,98 +39,15 @@ Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& n Acceptor::~Acceptor() { - LOGT("Destroying Acceptor"); - try { - stop(); - } catch (std::exception& e) { - LOGE("Error in destructor: " << e.what()); - } LOGT("Destroyed Acceptor"); } -void Acceptor::start() -{ - LOGT("Starting Acceptor"); - if (!mThread.joinable()) { - mThread = std::thread(&Acceptor::run, this); - } - LOGT("Started Acceptor"); -} - -void Acceptor::stop() -{ - LOGT("Stopping Acceptor"); - - if (mThread.joinable()) { - mEventQueue.send(Event::FINISH); - LOGT("Waiting for Acceptor to finish"); - mThread.join(); - } - - LOGT("Stopped Acceptor"); -} - -void Acceptor::run() -{ - // Setup polling structure - std::vector fds(2); - - fds[0].fd = mEventQueue.getFD(); - fds[0].events = POLLIN; - - fds[1].fd = mSocket.getFD(); - fds[1].events = POLLIN; - - mIsRunning = true; - while (mIsRunning) { - LOGT("Waiting for new connections..."); - - int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/); - - LOGT("...Incoming connection!"); - - if (ret == -1 || ret == 0) { - if (errno == EINTR) { - continue; - } - LOGE("Error in poll: " << std::string(strerror(errno))); - throw IPCException("Error in poll: " + std::string(strerror(errno))); - } - - // Check for incoming connections - if (fds[1].revents & POLLIN) { - fds[1].revents = 0; - handleConnection(); - } - - // Check for incoming events - if (fds[0].revents & POLLIN) { - fds[0].revents = 0; - handleEvent(); - } - } - LOGT("Exiting run"); -} - void Acceptor::handleConnection() { std::shared_ptr tmpSocket = mSocket.accept(); mNewConnectionCallback(tmpSocket); } -void Acceptor::handleEvent() -{ - if (mEventQueue.receive() == Event::FINISH) { - LOGD("Event FINISH"); - mIsRunning = false; - } -} - -FileDescriptor Acceptor::getEventFD() -{ - return mEventQueue.getFD(); -} - FileDescriptor Acceptor::getConnectionFD() { return mSocket.getFD(); diff --git a/common/ipc/internals/acceptor.hpp b/common/ipc/internals/acceptor.hpp index f87a0bb..a6e9ec8 100644 --- a/common/ipc/internals/acceptor.hpp +++ b/common/ipc/internals/acceptor.hpp @@ -28,11 +28,9 @@ #include "config.hpp" #include "ipc/internals/socket.hpp" -#include "ipc/internals/event-queue.hpp" #include "ipc/types.hpp" #include -#include namespace vasum { namespace ipc { @@ -59,51 +57,19 @@ public: Acceptor& operator=(const Acceptor&) = delete; /** - * Starts the thread accepting the new connections. - */ - void start(); - - /** - * Stops the accepting thread. - */ - void stop(); - - /** * Handle one incoming connection. * Used with external polling */ void handleConnection(); /** - * Handle one event from the internal event's queue - * Used with external polling - */ - void handleEvent(); - - /** - * @return file descriptor of internal event's queue - */ - FileDescriptor getEventFD(); - - /** * @return file descriptor for the connection socket */ FileDescriptor getConnectionFD(); private: - enum class Event : int { - FINISH // Shutdown request - }; - - bool mIsRunning; - NewConnectionCallback mNewConnectionCallback; Socket mSocket; - - EventQueue mEventQueue; - std::thread mThread; - - void run(); }; } // namespace ipc diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index 93f721f..587723c 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -26,7 +26,6 @@ #include "ipc/exception.hpp" #include "ipc/internals/processor.hpp" -#include "utils/signal.hpp" #include "utils/exception.hpp" #include @@ -65,8 +64,6 @@ Processor::Processor(const std::string& logName, { LOGS(mLogPrefix + "Processor Constructor"); - utils::signalBlock(SIGPIPE); - using namespace std::placeholders; setSignalHandlerInternal(REGISTER_SIGNAL_METHOD_ID, std::bind(&Processor::onNewSignals, this, _1, _2)); @@ -104,7 +101,7 @@ bool Processor::isStarted() return mIsRunning; } -void Processor::start(bool usesExternalPolling) +void Processor::start() { LOGS(mLogPrefix + "Processor start"); @@ -112,10 +109,6 @@ void Processor::start(bool usesExternalPolling) if (!mIsRunning) { LOGI(mLogPrefix + "Processor start"); mIsRunning = true; - mUsesExternalPolling = usesExternalPolling; - if (!usesExternalPolling) { - mThread = std::thread(&Processor::run, this); - } } } @@ -133,15 +126,12 @@ void Processor::stop() LOGD(mLogPrefix + "Waiting for the Processor to stop"); - if (mThread.joinable()) { - mThread.join(); - } else { - // Wait till the FINISH request is served - Lock lock(mStateMutex); - conditionPtr->wait(lock, [this]() { - return !mIsRunning; - }); - } + // Wait till the FINISH request is served + Lock lock(mStateMutex); + conditionPtr->wait(lock, [this]() { + return !mIsRunning; + }); + assert(mPeerInfo.empty()); } } @@ -204,7 +194,8 @@ PeerID Processor::addPeer(const std::shared_ptr& socketPtr) auto requestPtr = std::make_shared(socketPtr); mRequestQueue.pushBack(Event::ADD_PEER, requestPtr); - LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID); + LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID + << ", fd: " << socketPtr->getFD()); return requestPtr->peerID; } @@ -233,14 +224,14 @@ void Processor::removePeerSyncInternal(const PeerID peerID, Lock& lock) void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr) { - LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID); - LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID); - if (peerIt == mPeerInfo.end()) { LOGW("Peer already removed"); return; } + LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID); + LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID); + // Remove from signal addressees for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) { it->second.remove(peerIt->peerID); @@ -270,100 +261,6 @@ void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ mPeerInfo.erase(peerIt); } -void Processor::resetPolling() -{ - LOGS(mLogPrefix + "Processor resetPolling"); - - if (mUsesExternalPolling) { - return; - } - - // Setup polling on eventfd and sockets - mFDs.resize(mPeerInfo.size() + 1); - LOGI(mLogPrefix + "Reseting mFDS.size: " << mFDs.size()); - - mFDs[0].fd = mRequestQueue.getFD(); - mFDs[0].events = POLLIN; - - for (unsigned int i = 1; i < mFDs.size(); ++i) { - auto fd = mPeerInfo[i - 1].socketPtr->getFD(); - - LOGI(mLogPrefix + "Reseting fd: " << fd); - - mFDs[i].fd = fd; - mFDs[i].events = POLLIN | POLLHUP; // Listen for input events - // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too? - } -} - -void Processor::run() -{ - LOGS(mLogPrefix + "Processor run"); - - { - Lock lock(mStateMutex); - resetPolling(); - } - - while (isStarted()) { - LOGT(mLogPrefix + "Waiting for communication..."); - int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/); - LOGT(mLogPrefix + "... incoming communication!"); - if (ret == -1 || ret == 0) { - if (errno == EINTR) { - continue; - } - LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno))); - throw IPCException("Error in poll: " + std::string(strerror(errno))); - } - - // Check for lost connections: - if (handleLostConnections()) { - // mFDs changed - resetPolling(); - continue; - } - - // Check for incoming data. - if (handleInputs()) { - // mFDs changed - resetPolling(); - continue; - } - - // Check for incoming events - if (mFDs[0].revents & POLLIN) { - mFDs[0].revents &= ~(POLLIN); - if (handleEvent()) { - // mFDs changed - resetPolling(); - continue; - } - } - - } -} - -bool Processor::handleLostConnections() -{ - Lock lock(mStateMutex); - - bool isPeerRemoved = false; - - for (unsigned int i = 1; i < mFDs.size(); ++i) { - if (mFDs[i].revents & POLLHUP) { - auto peerIt = getPeerInfoIterator(mFDs[i].fd); - LOGI(mLogPrefix + "Lost connection to peer: " << peerIt->peerID); - mFDs[i].revents &= ~(POLLHUP); - removePeerInternal(peerIt, - std::make_exception_ptr(IPCPeerDisconnectedException())); - isPeerRemoved = true; - } - } - - return isPeerRemoved; -} - bool Processor::handleLostConnection(const FileDescriptor fd) { Lock lock(mStateMutex); @@ -373,21 +270,6 @@ bool Processor::handleLostConnection(const FileDescriptor fd) return true; } -bool Processor::handleInputs() -{ - // Lock not needed, mFDs won't be changed by handleInput - - bool pollChanged = false; - for (unsigned int i = 1; i < mFDs.size(); ++i) { - if (mFDs[i].revents & POLLIN) { - mFDs[i].revents &= ~(POLLIN); - pollChanged = pollChanged || handleInput(mFDs[i].fd); - } - } - - return pollChanged; -} - bool Processor::handleInput(const FileDescriptor fd) { LOGS(mLogPrefix + "Processor handleInput fd: " << fd); @@ -795,6 +677,12 @@ bool Processor::onFinishRequest(FinishRequest& request) } } + // Close peers + while (!mPeerInfo.empty()) { + removePeerInternal(--mPeerInfo.end(), + std::make_exception_ptr(IPCClosingException())); + } + mIsRunning = false; request.conditionPtr->notify_all(); diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index 86a3f42..6e0ae81 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -43,7 +43,6 @@ #include "logger/logger-scope.hpp" #include -#include #include #include #include @@ -136,12 +135,9 @@ public: /** - * Start the processing thread. - * Quits immediately after starting the thread. - * - * @param usesExternalPolling internal or external polling is used + * Start processing. */ - void start(const bool usesExternalPolling); + void start(); /** * @return is processor running @@ -432,14 +428,12 @@ private: RequestQueue mRequestQueue; bool mIsRunning; - bool mUsesExternalPolling; std::unordered_map> mMethodsCallbacks; std::unordered_map> mSignalsCallbacks; std::unordered_map> mSignalsPeers; Peers mPeerInfo; - std::vector mFDs; std::unordered_map mReturnCallbacks; @@ -451,8 +445,6 @@ private: unsigned int mMaxNumberOfPeers; - std::thread mThread; - template void setMethodHandlerInternal(const MethodID methodID, const typename MethodHandler::type& process); @@ -466,8 +458,6 @@ private: const PeerID peerID, const std::shared_ptr& data); - void run(); - // Request handlers bool onMethodRequest(MethodRequest& request); bool onSignalRequest(SignalRequest& request); @@ -476,9 +466,6 @@ private: bool onSendResultRequest(SendResultRequest& request); bool onFinishRequest(FinishRequest& request); - bool handleLostConnections(); - bool handleInputs(); - bool onReturnValue(Peers::iterator& peerIt, const MessageID messageID); bool onRemoteMethod(Peers::iterator& peerIt, @@ -489,7 +476,6 @@ private: const MethodID methodID, const MessageID messageID, std::shared_ptr signalCallbacks); - void resetPolling(); void removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr); diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp index 442d804..c436726 100644 --- a/common/ipc/service.cpp +++ b/common/ipc/service.cpp @@ -33,10 +33,12 @@ using namespace std::placeholders; namespace vasum { namespace ipc { -Service::Service(const std::string& socketPath, +Service::Service(epoll::EventPoll& eventPoll, + const std::string& socketPath, const PeerCallback& addPeerCallback, const PeerCallback& removePeerCallback) - : mProcessor("[SERVICE] "), + : mEventPoll(eventPoll), + mProcessor("[SERVICE] "), mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1)) { @@ -55,19 +57,23 @@ Service::~Service() } } -void Service::start(const bool usesExternalPolling) +void Service::start() { - LOGS("Service start"); - if (usesExternalPolling) { - startPoll(); - } - mProcessor.start(usesExternalPolling); - - // There can be an incoming connection from mAcceptor before mProcessor is listening, - // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor. - if (!usesExternalPolling) { - mAcceptor.start(); + if (mProcessor.isStarted()) { + return; } + LOGS("Service start"); + auto handleConnection = [&](int, epoll::Events) -> bool { + mAcceptor.handleConnection(); + return true; + }; + auto handleProcessorEvent = [&](int, epoll::Events) -> bool { + mProcessor.handleEvent(); + return true; + }; + mEventPoll.addFD(mAcceptor.getConnectionFD(), EPOLLIN, handleConnection); + mEventPoll.addFD(mProcessor.getEventFD(), EPOLLIN, handleProcessorEvent); + mProcessor.start(); } bool Service::isStarted() @@ -77,39 +83,19 @@ bool Service::isStarted() void Service::stop() { + if (!mProcessor.isStarted()) { + return; + } LOGS("Service stop"); - mAcceptor.stop(); mProcessor.stop(); - if (mIPCGSourcePtr) { - stopPoll(); - } -} - -void Service::startPoll() -{ - LOGS("Service startPoll"); - - mIPCGSourcePtr = IPCGSource::create(std::bind(&Service::handle, this, _1, _2)); - mIPCGSourcePtr->addFD(mAcceptor.getEventFD()); - mIPCGSourcePtr->addFD(mAcceptor.getConnectionFD()); - mIPCGSourcePtr->addFD(mProcessor.getEventFD()); - mIPCGSourcePtr->attach(); + mEventPoll.removeFD(mAcceptor.getConnectionFD()); + mEventPoll.removeFD(mProcessor.getEventFD()); } -void Service::stopPoll() -{ - LOGS("Service stopPoll"); - - mIPCGSourcePtr->removeFD(mAcceptor.getEventFD()); - mIPCGSourcePtr->removeFD(mAcceptor.getConnectionFD()); - mIPCGSourcePtr->removeFD(mProcessor.getEventFD()); - mIPCGSourcePtr->detach(); - mIPCGSourcePtr.reset(); -} - -void Service::handle(const FileDescriptor fd, const short pollEvent) +void Service::handle(const FileDescriptor fd, const epoll::Events pollEvents) { + //TODO remove handle method LOGS("Service handle"); if (!isStarted()) { @@ -117,25 +103,12 @@ void Service::handle(const FileDescriptor fd, const short pollEvent) return; } - if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) { - mProcessor.handleEvent(); - return; - - } else if (fd == mAcceptor.getConnectionFD() && (pollEvent & POLLIN)) { - mAcceptor.handleConnection(); - return; - - } else if (fd == mAcceptor.getEventFD() && (pollEvent & POLLIN)) { - mAcceptor.handleEvent(); - return; - - } else if (pollEvent & POLLIN) { + if (pollEvents & EPOLLIN) { mProcessor.handleInput(fd); - return; + } - } else if (pollEvent & POLLHUP) { + if ((pollEvents & EPOLLHUP) || (pollEvents & EPOLLRDHUP)) { mProcessor.handleLostConnection(fd); - return; } } @@ -143,9 +116,11 @@ void Service::setNewPeerCallback(const PeerCallback& newPeerCallback) { LOGS("Service setNewPeerCallback"); auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) { - if (mIPCGSourcePtr) { - mIPCGSourcePtr->addFD(fd); - } + auto handleFd = [&](FileDescriptor fd, epoll::Events events) -> bool { + handle(fd, events); + return true; + }; + mEventPoll.addFD(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP, handleFd); if (newPeerCallback) { newPeerCallback(peerID, fd); } @@ -157,9 +132,7 @@ void Service::setRemovedPeerCallback(const PeerCallback& removedPeerCallback) { LOGS("Service setRemovedPeerCallback"); auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) { - if (mIPCGSourcePtr) { - mIPCGSourcePtr->removeFD(fd); - } + mEventPoll.removeFD(fd); if (removedPeerCallback) { removedPeerCallback(peerID, fd); } diff --git a/common/ipc/service.hpp b/common/ipc/service.hpp index 022c9c9..880c137 100644 --- a/common/ipc/service.hpp +++ b/common/ipc/service.hpp @@ -27,9 +27,9 @@ #include "ipc/internals/processor.hpp" #include "ipc/internals/acceptor.hpp" -#include "ipc/ipc-gsource.hpp" #include "ipc/types.hpp" #include "ipc/result.hpp" +#include "epoll/event-poll.hpp" #include "logger/logger.hpp" #include @@ -42,18 +42,16 @@ namespace ipc { * This class wraps communication via UX sockets. * It uses serialization mechanism from libConfig. * - * There are two working threads: - * - ACCEPTOR accepts incoming connections and passes them to PROCESSOR - * - PROCESSOR is responsible for the communication and calling the callbacks - * * For message format @see ipc::Processor */ class Service { public: /** + * @param eventPoll event poll * @param path path to the socket */ - Service(const std::string& path, + Service(epoll::EventPoll& eventPoll, + const std::string& path, const PeerCallback& addPeerCallback = nullptr, const PeerCallback& removePeerCallback = nullptr); ~Service(); @@ -62,11 +60,9 @@ public: Service& operator=(const Service&) = delete; /** - * Starts the worker and acceptor threads - * - * @param usesExternalPolling internal or external polling is used + * Starts processing */ - void start(const bool usesExternalPolling = false); + void start(); /** * @return is the communication thread running @@ -79,16 +75,6 @@ public: void stop(); /** - * Used with an external polling loop. - * Handles one event from the file descriptor. - * - * @param fd file descriptor - * @param pollEvent event on the fd. Defined in poll.h - * - */ - void handle(const FileDescriptor fd, const short pollEvent); - - /** * Set the callback called for each new connection to a peer * * @param newPeerCallback the callback @@ -175,14 +161,11 @@ public: void signal(const MethodID methodID, const std::shared_ptr& data); private: - - void startPoll(); - void stopPoll(); - - typedef std::lock_guard Lock; + epoll::EventPoll& mEventPoll; Processor mProcessor; Acceptor mAcceptor; - IPCGSource::Pointer mIPCGSourcePtr; + + void handle(const FileDescriptor fd, const epoll::Events pollEvents); }; diff --git a/tests/scripts/vsm_test_parser.py b/tests/scripts/vsm_test_parser.py index 41dd495..d0d4fa7 100644 --- a/tests/scripts/vsm_test_parser.py +++ b/tests/scripts/vsm_test_parser.py @@ -35,7 +35,7 @@ class Logger(object): __indentChar = " " def testCaseSummary(self, testSuite, testName, testResult, recLevel): - msg = self.__indentChar * recLevel + BOLD + "{:<50}".format(testName) + msg = self.__indentChar * recLevel + BOLD + "{:<50} ".format(testName) if testResult == "passed": msg += GREEN diff --git a/tests/unit_tests/epoll/ut-event-poll.cpp b/tests/unit_tests/epoll/ut-event-poll.cpp index 0bcbe81..00cb385 100644 --- a/tests/unit_tests/epoll/ut-event-poll.cpp +++ b/tests/unit_tests/epoll/ut-event-poll.cpp @@ -31,8 +31,8 @@ #include "ipc/internals/socket.hpp" #include "utils/latch.hpp" #include "utils/glib-loop.hpp" -#include "epoll/glib-poll-dispatcher.hpp" -#include "epoll/thread-poll-dispatcher.hpp" +#include "epoll/glib-dispatcher.hpp" +#include "epoll/thread-dispatcher.hpp" using namespace vasum::utils; using namespace vasum::epoll; @@ -54,14 +54,14 @@ BOOST_AUTO_TEST_CASE(EmptyPoll) BOOST_AUTO_TEST_CASE(ThreadedPoll) { - ThreadPollDispatcher dispatcher; + ThreadDispatcher dispatcher; } BOOST_AUTO_TEST_CASE(GlibPoll) { ScopedGlibLoop loop; - GlibPollDispatcher dispatcher; + GlibDispatcher dispatcher; } void doSocketTest(EventPoll& poll) @@ -126,7 +126,7 @@ void doSocketTest(EventPoll& poll) BOOST_AUTO_TEST_CASE(ThreadedPollSocket) { - ThreadPollDispatcher dispatcher; + ThreadDispatcher dispatcher; doSocketTest(dispatcher.getPoll()); } @@ -135,14 +135,14 @@ BOOST_AUTO_TEST_CASE(GlibPollSocket) { ScopedGlibLoop loop; - GlibPollDispatcher dispatcher; + GlibDispatcher dispatcher; doSocketTest(dispatcher.getPoll()); } BOOST_AUTO_TEST_CASE(PollStacking) { - ThreadPollDispatcher dispatcher; + ThreadDispatcher dispatcher; EventPoll innerPoll; diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index 088f576..8f47a65 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -34,6 +34,8 @@ #include "ipc/client.hpp" #include "ipc/types.hpp" #include "ipc/result.hpp" +#include "epoll/thread-dispatcher.hpp" +#include "epoll/glib-dispatcher.hpp" #include "utils/glib-loop.hpp" #include "utils/latch.hpp" #include "utils/value-latch.hpp" @@ -51,11 +53,10 @@ using namespace vasum; using namespace vasum::ipc; +using namespace vasum::epoll; using namespace vasum::utils; using namespace std::placeholders; -namespace { - // Timeout for sending one message const int TIMEOUT = 1000 /*ms*/; @@ -68,15 +69,28 @@ const int LONG_OPERATION_TIME = 1000 + TIMEOUT; const std::string TEST_DIR = "/tmp/ut-ipc"; const std::string SOCKET_PATH = TEST_DIR + "/test.socket"; -struct Fixture { +struct FixtureBase { ScopedDir mTestPathGuard; - Fixture() + FixtureBase() : mTestPathGuard(TEST_DIR) { } }; +struct ThreadedFixture : FixtureBase { + ThreadDispatcher dispatcher; + + EventPoll& getPoll() { return dispatcher.getPoll(); } +}; + +struct GlibFixture : FixtureBase { + ScopedGlibLoop glibLoop; + GlibDispatcher dispatcher; + + EventPoll& getPoll() { return dispatcher.getPoll(); } +}; + struct SendData { int intVal; SendData(int i): intVal(i) {} @@ -167,7 +181,7 @@ void longEchoCallback(const PeerID, methodResult->set(returnData); } -PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false) +PeerID connect(Service& s, Client& c) { // Connects the Client to the Service and returns Clients PeerID ValueLatch peerIDLatch; @@ -178,10 +192,10 @@ PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientG s.setNewPeerCallback(newPeerCallback); if (!s.isStarted()) { - s.start(isServiceGlib); + s.start(); } - c.start(isClientGlib); + c.start(); PeerID peerID = peerIDLatch.get(TIMEOUT); s.setNewPeerCallback(nullptr); @@ -189,16 +203,6 @@ PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientG return peerID; } -PeerID connectServiceGSource(Service& s, Client& c) -{ - return connect(s, c, true, false); -} - -PeerID connectClientGSource(Service& s, Client& c) -{ - return connect(s, c, false, true); -} - void testEcho(Client& c, const MethodID methodID) { std::shared_ptr sentData(new SendData(34)); @@ -215,20 +219,17 @@ void testEcho(Service& s, const MethodID methodID, const PeerID peerID) BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); } -} // namespace - - -BOOST_FIXTURE_TEST_SUITE(IPCSuite, Fixture) +BOOST_AUTO_TEST_SUITE(IPCSuite) -BOOST_AUTO_TEST_CASE(ConstructorDestructor) +MULTI_FIXTURE_TEST_CASE(ConstructorDestructor, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); } -BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod) +MULTI_FIXTURE_TEST_CASE(ServiceAddRemoveMethod, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, returnEmptyCallback); s.setMethodHandler(1, returnDataCallback); @@ -237,7 +238,7 @@ BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod) s.setMethodHandler(1, echoCallback); s.setMethodHandler(2, returnDataCallback); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); connect(s, c); testEcho(c, 1); @@ -247,10 +248,10 @@ BOOST_AUTO_TEST_CASE(ServiceAddRemoveMethod) BOOST_CHECK_THROW(testEcho(c, 2), IPCException); } -BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod) +MULTI_FIXTURE_TEST_CASE(ClientAddRemoveMethod, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.setMethodHandler(1, returnEmptyCallback); c.setMethodHandler(1, returnDataCallback); @@ -267,9 +268,9 @@ BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod) BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException); } -BOOST_AUTO_TEST_CASE(ServiceStartStop) +MULTI_FIXTURE_TEST_CASE(ServiceStartStop, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, returnDataCallback); @@ -282,10 +283,10 @@ BOOST_AUTO_TEST_CASE(ServiceStartStop) s.start(); } -BOOST_AUTO_TEST_CASE(ClientStartStop) +MULTI_FIXTURE_TEST_CASE(ClientStartStop, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.setMethodHandler(1, returnDataCallback); c.start(); @@ -300,27 +301,27 @@ BOOST_AUTO_TEST_CASE(ClientStartStop) c.stop(); } -BOOST_AUTO_TEST_CASE(SyncClientToServiceEcho) +MULTI_FIXTURE_TEST_CASE(SyncClientToServiceEcho, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, echoCallback); s.setMethodHandler(2, echoCallback); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); connect(s, c); testEcho(c, 1); testEcho(c, 2); } -BOOST_AUTO_TEST_CASE(Restart) +MULTI_FIXTURE_TEST_CASE(Restart, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, echoCallback); s.start(); s.setMethodHandler(2, echoCallback); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.start(); testEcho(c, 1); testEcho(c, 2); @@ -334,14 +335,19 @@ BOOST_AUTO_TEST_CASE(Restart) s.stop(); s.start(); + BOOST_CHECK_THROW(testEcho(c, 2), IPCException); + + c.stop(); + c.start(); + testEcho(c, 1); testEcho(c, 2); } -BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho) +MULTI_FIXTURE_TEST_CASE(SyncServiceToClientEcho, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.setMethodHandler(1, echoCallback); PeerID peerID = connect(s, c); @@ -351,16 +357,16 @@ BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho) BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); } -BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho) +MULTI_FIXTURE_TEST_CASE(AsyncClientToServiceEcho, F, ThreadedFixture, GlibFixture) { std::shared_ptr sentData(new SendData(34)); ValueLatch> recvDataLatch; // Setup Service and Client - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, echoCallback); s.start(); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.start(); //Async call @@ -374,13 +380,13 @@ BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho) BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); } -BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho) +MULTI_FIXTURE_TEST_CASE(AsyncServiceToClientEcho, F, ThreadedFixture, GlibFixture) { std::shared_ptr sentData(new SendData(56)); ValueLatch> recvDataLatch; - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.setMethodHandler(1, echoCallback); PeerID peerID = connect(s, c); @@ -397,24 +403,24 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho) } -BOOST_AUTO_TEST_CASE(SyncTimeout) +MULTI_FIXTURE_TEST_CASE(SyncTimeout, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, longEchoCallback); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); connect(s, c); std::shared_ptr sentData(new SendData(78)); BOOST_REQUIRE_THROW((c.callSync(1, sentData, TIMEOUT)), IPCException); } -BOOST_AUTO_TEST_CASE(SerializationError) +MULTI_FIXTURE_TEST_CASE(SerializationError, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, echoCallback); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); connect(s, c); std::shared_ptr throwingData(new ThrowOnAcceptData()); @@ -423,23 +429,24 @@ BOOST_AUTO_TEST_CASE(SerializationError) } -BOOST_AUTO_TEST_CASE(ParseError) +MULTI_FIXTURE_TEST_CASE(ParseError, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, echoCallback); s.start(); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.start(); std::shared_ptr sentData(new SendData(78)); BOOST_CHECK_THROW((c.callSync(1, sentData, 10000)), IPCParsingException); } -BOOST_AUTO_TEST_CASE(DisconnectedPeerError) +MULTI_FIXTURE_TEST_CASE(DisconnectedPeerError, F, ThreadedFixture, GlibFixture) { ValueLatch> retStatusLatch; - Service s(SOCKET_PATH); + + Service s(F::getPoll(), SOCKET_PATH); auto method = [](const PeerID, std::shared_ptr&, MethodResult::Pointer methodResult) { auto resultData = std::make_shared(1); @@ -450,7 +457,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError) s.setMethodHandler(1, method); s.start(); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.start(); auto dataBack = [&retStatusLatch](Result && r) { @@ -470,16 +477,16 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError) } -BOOST_AUTO_TEST_CASE(ReadTimeout) +MULTI_FIXTURE_TEST_CASE(ReadTimeout, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); auto longEchoCallback = [](const PeerID, std::shared_ptr& data, MethodResult::Pointer methodResult) { auto resultData = std::make_shared(data->intVal, LONG_OPERATION_TIME); methodResult->set(resultData); }; s.setMethodHandler(1, longEchoCallback); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); connect(s, c); // Test timeout on read @@ -488,13 +495,13 @@ BOOST_AUTO_TEST_CASE(ReadTimeout) } -BOOST_AUTO_TEST_CASE(WriteTimeout) +MULTI_FIXTURE_TEST_CASE(WriteTimeout, F, ThreadedFixture, GlibFixture) { - Service s(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); s.setMethodHandler(1, echoCallback); s.start(); - Client c(SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); c.start(); // Test echo with a minimal timeout @@ -509,13 +516,13 @@ BOOST_AUTO_TEST_CASE(WriteTimeout) } -BOOST_AUTO_TEST_CASE(AddSignalInRuntime) +MULTI_FIXTURE_TEST_CASE(AddSignalInRuntime, F, ThreadedFixture, GlibFixture) { ValueLatch> recvDataLatchA; ValueLatch> recvDataLatchB; - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); connect(s, c); auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr& data) { @@ -545,13 +552,13 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime) } -BOOST_AUTO_TEST_CASE(AddSignalOffline) +MULTI_FIXTURE_TEST_CASE(AddSignalOffline, F, ThreadedFixture, GlibFixture) { ValueLatch> recvDataLatchA; ValueLatch> recvDataLatchB; - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr& data) { recvDataLatchA.set(data); @@ -581,66 +588,13 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline) BOOST_CHECK_EQUAL(recvDataB->intVal, sendDataB->intVal); } - -BOOST_AUTO_TEST_CASE(ServiceGSource) -{ - utils::Latch l; - ScopedGlibLoop loop; - - auto signalHandler = [&l](const PeerID, std::shared_ptr&) { - l.set(); - }; - - Service s(SOCKET_PATH); - s.setMethodHandler(1, echoCallback); - - Client c(SOCKET_PATH); - s.setSignalHandler(2, signalHandler); - - connectServiceGSource(s, c); - - testEcho(c, 1); - - auto data = std::make_shared(1); - c.signal(2, data); - - BOOST_CHECK(l.wait(TIMEOUT)); -} - - -BOOST_AUTO_TEST_CASE(ClientGSource) -{ - utils::Latch l; - ScopedGlibLoop loop; - - auto signalHandler = [&l](const PeerID, std::shared_ptr&) { - l.set(); - }; - - Service s(SOCKET_PATH); - s.start(); - - Client c(SOCKET_PATH); - c.setMethodHandler(1, echoCallback); - c.setSignalHandler(2, signalHandler); - - PeerID peerID = connectClientGSource(s, c); - - testEcho(s, 1, peerID); - - auto data = std::make_shared(1); - s.signal(2, data); - - BOOST_CHECK(l.wait(TIMEOUT)); -} - -BOOST_AUTO_TEST_CASE(UsersError) +MULTI_FIXTURE_TEST_CASE(UsersError, F, ThreadedFixture, GlibFixture) { const int TEST_ERROR_CODE = -234; const std::string TEST_ERROR_MESSAGE = "Ay, caramba!"; - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); auto clientID = connect(s, c); auto throwingMethodHandler = [&](const PeerID, std::shared_ptr&, MethodResult::Pointer) { @@ -668,13 +622,13 @@ BOOST_AUTO_TEST_CASE(UsersError) BOOST_CHECK_EXCEPTION((s.callSync(2, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData); } -BOOST_AUTO_TEST_CASE(AsyncResult) +MULTI_FIXTURE_TEST_CASE(AsyncResult, F, ThreadedFixture, GlibFixture) { const int TEST_ERROR_CODE = -567; const std::string TEST_ERROR_MESSAGE = "Ooo jooo!"; - Service s(SOCKET_PATH); - Client c(SOCKET_PATH); + Service s(F::getPoll(), SOCKET_PATH); + Client c(F::getPoll(), SOCKET_PATH); auto clientID = connect(s, c); auto errorMethodHandler = [&](const PeerID, std::shared_ptr&, MethodResult::Pointer methodResult) { @@ -724,20 +678,44 @@ BOOST_AUTO_TEST_CASE(AsyncResult) BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); } -// BOOST_AUTO_TEST_CASE(ConnectionLimitTest) +MULTI_FIXTURE_TEST_CASE(MixOperations, F, ThreadedFixture, GlibFixture) +{ + utils::Latch l; + + auto signalHandler = [&l](const PeerID, std::shared_ptr&) { + l.set(); + }; + + Service s(F::getPoll(), SOCKET_PATH); + s.setMethodHandler(1, echoCallback); + + Client c(F::getPoll(), SOCKET_PATH); + s.setSignalHandler(2, signalHandler); + + connect(s, c); + + testEcho(c, 1); + + auto data = std::make_shared(1); + c.signal(2, data); + + BOOST_CHECK(l.wait(TIMEOUT)); +} + +// MULTI_FIXTURE_TEST_CASE(ConnectionLimitTest, F, ThreadedFixture, GlibFixture) // { // unsigned oldLimit = ipc::getMaxFDNumber(); // ipc::setMaxFDNumber(50); // // Setup Service and many Clients -// Service s(SOCKET_PATH); +// Service s(F::getPoll(), SOCKET_PATH); // s.setMethodHandler(1, echoCallback); // s.start(); // std::list clients; // for (int i = 0; i < 100; ++i) { // try { -// clients.push_back(Client(SOCKET_PATH)); +// clients.push_back(Client(F::getPoll(), SOCKET_PATH)); // clients.back().start(); // } catch (...) {} // } diff --git a/tests/unit_tests/ut.hpp b/tests/unit_tests/ut.hpp index d8b4b94..e96d363 100644 --- a/tests/unit_tests/ut.hpp +++ b/tests/unit_tests/ut.hpp @@ -28,9 +28,23 @@ #define BOOST_TEST_DYN_LINK #include +#include + #include /** + * Usage example: + * + * MULTI_FIXTURE_TEST_CASE(Test, T, Fixture1, Fixture2, Fixture3) { + * std::cout << T::i << "\n"; + * } + */ +#define MULTI_FIXTURE_TEST_CASE(NAME, TPARAM, ...) \ + typedef boost::mpl::vector<__VA_ARGS__> NAME##_fixtures; \ + BOOST_FIXTURE_TEST_CASE_TEMPLATE(NAME, TPARAM, NAME##_fixtures, TPARAM) + + +/** * An exception message checker * * Usage example: