From 1ea2e87dd7c56e65f918466cdb20ef5fd2baa434 Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Mon, 8 Dec 2014 16:48:21 +0100 Subject: [PATCH] IPC: Support for the external polling loop in Service [Bug/Feature] Using GMainLoop is possible [Cause] N/A [Solution] For glib > v.2.36 [Verification] Build, install, run tests Change-Id: Ic6d74688c322dd79b29195d94658a4f2ffe0aa83 --- common/ipc/internals/acceptor.cpp | 44 +++++++--- common/ipc/internals/acceptor.hpp | 25 ++++++ common/ipc/internals/processor.cpp | 84 +++++++++++------- common/ipc/internals/processor.hpp | 79 ++++++++++------- common/ipc/internals/utils.cpp | 4 +- common/ipc/ipc-gsource.cpp | 174 +++++++++++++++++++++++++++++++++++++ common/ipc/ipc-gsource.hpp | 150 ++++++++++++++++++++++++++++++++ common/ipc/service.cpp | 35 ++++++++ common/ipc/service.hpp | 17 ++++ common/ipc/types.cpp | 2 + tests/unit_tests/ipc/ut-ipc.cpp | 71 ++++++++++++--- 11 files changed, 599 insertions(+), 86 deletions(-) create mode 100644 common/ipc/ipc-gsource.cpp create mode 100644 common/ipc/ipc-gsource.hpp diff --git a/common/ipc/internals/acceptor.cpp b/common/ipc/internals/acceptor.cpp index 3a6c4cd..1eab1c2 100644 --- a/common/ipc/internals/acceptor.cpp +++ b/common/ipc/internals/acceptor.cpp @@ -25,21 +25,20 @@ #include "config.hpp" #include "ipc/exception.hpp" -#include "ipc/internals/utils.hpp" #include "ipc/internals/acceptor.hpp" #include "logger/logger.hpp" #include #include #include -#include #include namespace vasum { namespace ipc { Acceptor::Acceptor(const std::string& socketPath, const NewConnectionCallback& newConnectionCallback) - : mNewConnectionCallback(newConnectionCallback), + : mIsRunning(false), + mNewConnectionCallback(newConnectionCallback), mSocket(Socket::createSocket(socketPath)) { LOGT("Creating Acceptor for socket " << socketPath); @@ -88,9 +87,8 @@ void Acceptor::run() fds[1].fd = mSocket.getFD(); fds[1].events = POLLIN; - // Main loop - bool isRunning = true; - while (isRunning) { + mIsRunning = true; + while (mIsRunning) { LOGT("Waiting for new connections..."); int ret = ::poll(fds.data(), fds.size(), -1 /*blocking call*/); @@ -108,23 +106,41 @@ void Acceptor::run() // Check for incoming connections if (fds[1].revents & POLLIN) { fds[1].revents = 0; - std::shared_ptr tmpSocket = mSocket.accept(); - mNewConnectionCallback(tmpSocket); + handleConnection(); } // Check for incoming events if (fds[0].revents & POLLIN) { fds[0].revents = 0; - - if (mEventQueue.receive() == Event::FINISH) { - LOGD("Event FINISH"); - isRunning = false; - break; - } + handleEvent(); } } LOGT("Exiting run"); } +void Acceptor::handleConnection() +{ + std::shared_ptr tmpSocket = mSocket.accept(); + mNewConnectionCallback(tmpSocket); +} + +void Acceptor::handleEvent() +{ + if (mEventQueue.receive() == Event::FINISH) { + LOGD("Event FINISH"); + mIsRunning = false; + } +} + +FileDescriptor Acceptor::getEventFD() +{ + return mEventQueue.getFD(); +} + +FileDescriptor Acceptor::getConnectionFD() +{ + return mSocket.getFD(); +} + } // namespace ipc } // namespace vasum diff --git a/common/ipc/internals/acceptor.hpp b/common/ipc/internals/acceptor.hpp index 702a161..f87a0bb 100644 --- a/common/ipc/internals/acceptor.hpp +++ b/common/ipc/internals/acceptor.hpp @@ -29,6 +29,7 @@ #include "ipc/internals/socket.hpp" #include "ipc/internals/event-queue.hpp" +#include "ipc/types.hpp" #include #include @@ -67,11 +68,35 @@ public: */ void stop(); + /** + * Handle one incoming connection. + * Used with external polling + */ + void handleConnection(); + + /** + * Handle one event from the internal event's queue + * Used with external polling + */ + void handleEvent(); + + /** + * @return file descriptor of internal event's queue + */ + FileDescriptor getEventFD(); + + /** + * @return file descriptor for the connection socket + */ + FileDescriptor getConnectionFD(); + private: enum class Event : int { FINISH // Shutdown request }; + bool mIsRunning; + NewConnectionCallback mNewConnectionCallback; Socket mSocket; diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index be1060d..72a1788 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -112,6 +112,11 @@ void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback) mRemovedPeerCallback = removedPeerCallback; } +FileDescriptor Processor::getEventFD() +{ + return mEventQueue.getFD(); +} + void Processor::removeMethod(const MethodID methodID) { LOGT("Removing method " << methodID); @@ -128,9 +133,9 @@ FileDescriptor Processor::addPeer(const std::shared_ptr& socketPtr) peerFD = socketPtr->getFD(); SocketInfo socketInfo(peerFD, std::move(socketPtr)); mNewSockets.push(std::move(socketInfo)); + mEventQueue.send(Event::ADD_PEER); } LOGI("New peer added. Id: " << peerFD); - mEventQueue.send(Event::ADD_PEER); return peerFD; } @@ -143,9 +148,9 @@ void Processor::removePeer(const FileDescriptor peerFD) Lock lock(mSocketsMutex); RemovePeerRequest request(peerFD, conditionPtr); mPeersToDelete.push(std::move(request)); + mEventQueue.send(Event::REMOVE_PEER); } - mEventQueue.send(Event::REMOVE_PEER); auto isPeerDeleted = [&peerFD, this]()->bool { Lock lock(mSocketsMutex); @@ -204,6 +209,10 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status) void Processor::resetPolling() { + if (!isStarted()) { + return; + } + LOGI("Resetting polling"); // Setup polling on eventfd and sockets Lock lock(mSocketsMutex); @@ -251,20 +260,22 @@ void Processor::run() } // Check for incoming events - if (handleEvent()) { - // mFDs changed - continue; + if (mFDs[0].revents & POLLIN) { + mFDs[0].revents &= ~(POLLIN); + if (handleEvent()) { + // mFDs changed + continue; + } } + } cleanCommunication(); } - bool Processor::handleLostConnections() { std::vector peersToRemove; - { Lock lock(mSocketsMutex); for (unsigned int i = 1; i < mFDs.size(); ++i) { @@ -283,39 +294,61 @@ bool Processor::handleLostConnections() return !peersToRemove.empty(); } +bool Processor::handleLostConnection(const FileDescriptor peerFD) +{ + removePeerInternal(peerFD, Status::PEER_DISCONNECTED); + return true; +} + bool Processor::handleInputs() { - std::vector> socketsWithInput; + std::vector peersWithInput; { Lock lock(mSocketsMutex); for (unsigned int i = 1; i < mFDs.size(); ++i) { if (mFDs[i].revents & POLLIN) { mFDs[i].revents &= ~(POLLIN); - socketsWithInput.push_back(mSockets[mFDs[i].fd]); + peersWithInput.push_back(mFDs[i].fd); } } } bool pollChanged = false; // Handle input outside the critical section - for (const auto& socketPtr : socketsWithInput) { - pollChanged = pollChanged || handleInput(*socketPtr); + for (const FileDescriptor peerFD : peersWithInput) { + pollChanged = pollChanged || handleInput(peerFD); } return pollChanged; } -bool Processor::handleInput(const Socket& socket) +bool Processor::handleInput(const FileDescriptor peerFD) { LOGT("Handle incoming data"); + + std::shared_ptr socketPtr; + try { + socketPtr = mSockets.at(peerFD); + } catch (const std::out_of_range&) { + LOGE("No such peer: " << peerFD); + return false; + } + MethodID methodID; MessageID messageID; { - Socket::Guard guard = socket.getGuard(); - socket.read(&methodID, sizeof(methodID)); - socket.read(&messageID, sizeof(messageID)); + Socket::Guard guard = socketPtr->getGuard(); + try { + socketPtr->read(&methodID, sizeof(methodID)); + socketPtr->read(&messageID, sizeof(messageID)); + + } catch (const IPCException& e) { + LOGE("Error during reading the socket"); + removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER); + return true; + } if (methodID == RETURN_METHOD_ID) { - return onReturnValue(socket, messageID); + return onReturnValue(*socketPtr, messageID); } else { Lock lock(mCallsMutex); @@ -323,19 +356,19 @@ bool Processor::handleInput(const Socket& socket) // Method std::shared_ptr methodCallbacks = mMethodsCallbacks.at(methodID); lock.unlock(); - return onRemoteCall(socket, methodID, messageID, methodCallbacks); + return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks); } else if (mSignalsCallbacks.count(methodID)) { // Signal std::shared_ptr signalCallbacks = mSignalsCallbacks.at(methodID); lock.unlock(); - return onRemoteSignal(socket, methodID, messageID, signalCallbacks); + return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks); } else { // Nothing lock.unlock(); LOGW("No method or signal callback for methodID: " << methodID); - removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER); + removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER); return true; } } @@ -461,13 +494,6 @@ bool Processor::onRemoteCall(const Socket& socket, bool Processor::handleEvent() { - if (!(mFDs[0].revents & POLLIN)) { - // No event to serve - return false; - } - - mFDs[0].revents &= ~(POLLIN); - switch (mEventQueue.receive()) { case Event::FINISH: { LOGD("Event FINISH"); @@ -518,11 +544,11 @@ bool Processor::onNewPeer() // Broadcast the new signal to peers LOGW("Sending handled signals"); - std::list peersIDs; + std::list peersFDs; { Lock lock(mSocketsMutex); for (const auto kv : mSockets) { - peersIDs.push_back(kv.first); + peersFDs.push_back(kv.first); } } @@ -535,7 +561,7 @@ bool Processor::onNewPeer() } auto data = std::make_shared(ids); - for (const FileDescriptor peerFD : peersIDs) { + for (const FileDescriptor peerFD : peersFDs) { callInternal(REGISTER_SIGNAL_METHOD_ID, peerFD, data, diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index 6ce2688..d33d12b 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -75,6 +75,7 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000; * - new way to generate UIDs * - callbacks for serialization/parsing * - store Sockets in a vector, maybe SocketStore? +* - fix valgrind tests * * */ @@ -240,6 +241,35 @@ public: void signal(const MethodID methodID, const std::shared_ptr& data); + /** + * Removes one peer. + * Handler used in external polling. + * + * @param peerFD file description identifying the peer + * @return should the polling structure be rebuild + */ + bool handleLostConnection(const FileDescriptor peerFD); + + /** + * Handles input from one peer. + * Handler used in external polling. + * + * @param peerFD file description identifying the peer + * @return should the polling structure be rebuild + */ + bool handleInput(const FileDescriptor peerFD); + + /** + * Handle one event from the internal event's queue + * + * @return should the polling structure be rebuild + */ + bool handleEvent(); + + /** + * @return file descriptor for the internal event's queue + */ + FileDescriptor getEventFD(); private: typedef std::function& data)> SerializeCallback; @@ -383,13 +413,12 @@ private: static void discardResultHandler(Status, std::shared_ptr&) {} void run(); - bool handleEvent(); bool onCall(); bool onNewPeer(); bool onRemovePeer(); bool handleLostConnections(); bool handleInputs(); - bool handleInput(const Socket& socket); + bool onReturnValue(const Socket& socket, const MessageID messageID); bool onRemoteCall(const Socket& socket, @@ -494,26 +523,24 @@ void Processor::addSignalHandler(const MethodID methodID, mSignalsCallbacks[methodID] = std::make_shared(std::move(signalCall)); } - if (isStarted()) { - // Broadcast the new signal to peers - std::vector ids {methodID}; - auto data = std::make_shared(ids); + std::vector ids {methodID}; + auto data = std::make_shared(ids); - std::list peersIDs; - { - Lock lock(mSocketsMutex); - for (const auto kv : mSockets) { - peersIDs.push_back(kv.first); - } + std::list peersFDs; + { + Lock lock(mSocketsMutex); + for (const auto kv : mSockets) { + peersFDs.push_back(kv.first); } + } - for (const FileDescriptor peerFD : peersIDs) { - callSync(REGISTER_SIGNAL_METHOD_ID, - peerFD, - data, - DEFAULT_METHOD_TIMEOUT); - } + for (const FileDescriptor peerFD : peersFDs) { + callSync(REGISTER_SIGNAL_METHOD_ID, + peerFD, + data, + DEFAULT_METHOD_TIMEOUT); } + } template @@ -535,11 +562,6 @@ MessageID Processor::callAsync(const MethodID methodID, const std::shared_ptr& data, const typename ResultHandler::type& process) { - if (!isStarted()) { - LOGE("The Processor thread is not started. Can't send any data."); - throw IPCException("The Processor thread is not started. Can't send any data."); - } - return callInternal(methodID, peerFD, data, process); } @@ -600,18 +622,13 @@ template void Processor::signal(const MethodID methodID, const std::shared_ptr& data) { - if (!isStarted()) { - LOGE("The Processor thread is not started. Can't send any data."); - throw IPCException("The Processor thread is not started. Can't send any data."); - } - - std::list peersIDs; + std::list peersFDs; { Lock lock(mSocketsMutex); - peersIDs = mSignalsPeers[methodID]; + peersFDs = mSignalsPeers[methodID]; } - for (const FileDescriptor peerFD : peersIDs) { + for (const FileDescriptor peerFD : peersFDs) { Lock lock(mCallsMutex); mCalls.push(methodID, peerFD, data); mEventQueue.send(Event::CALL); diff --git a/common/ipc/internals/utils.cpp b/common/ipc/internals/utils.cpp index 88f8fc0..bd98c1b 100644 --- a/common/ipc/internals/utils.cpp +++ b/common/ipc/internals/utils.cpp @@ -122,8 +122,8 @@ void write(int fd, const void* bufferPtr, const size_t size, int timeoutMS) // Neglected errors LOGD("Retrying write"); } else { - LOGE("Error during reading: " + std::string(strerror(errno))); - throw IPCException("Error during reading: " + std::string(strerror(errno))); + LOGE("Error during writing: " + std::string(strerror(errno))); + throw IPCException("Error during writing: " + std::string(strerror(errno))); } if (nTotal >= size) { diff --git a/common/ipc/ipc-gsource.cpp b/common/ipc/ipc-gsource.cpp new file mode 100644 index 0000000..f5cdbb5 --- /dev/null +++ b/common/ipc/ipc-gsource.cpp @@ -0,0 +1,174 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Class for creating a dedicated GSource + */ + + +#include "config.hpp" + +#include "ipc/ipc-gsource.hpp" + +#if GLIB_CHECK_VERSION(2,36,0) + +#include "logger/logger.hpp" +#include + +namespace vasum { +namespace ipc { + +namespace { + + +GIOCondition conditions = static_cast(G_IO_IN | + G_IO_ERR | + G_IO_HUP); +} + + +IPCGSource::IPCGSource(const std::vector fds, + const HandlerCallback& handlerCallback) + : mHandlerCallback(handlerCallback) +{ + LOGD("Constructing IPCGSource"); + for (const FileDescriptor fd : fds) { + addFD(fd); + } +} + +IPCGSource::~IPCGSource() +{ + LOGD("Destroying IPCGSource"); + g_source_destroy(&mGSource); + +} + +IPCGSource* IPCGSource::create(const std::vector& fds, + const HandlerCallback& handlerCallback) +{ + LOGD("Creating IPCGSource"); + + static GSourceFuncs funcs = { &IPCGSource::prepare, + &IPCGSource::check, + &IPCGSource::dispatch, + &IPCGSource::finalize, + nullptr, + nullptr + }; + + // New GSource + GSource* gSource = g_source_new(&funcs, sizeof(IPCGSource)); + g_source_set_priority(gSource, G_PRIORITY_HIGH); + + // Fill additional data + IPCGSource* source = reinterpret_cast(gSource); + return new(source) IPCGSource(fds, handlerCallback); +} + + +void IPCGSource::addFD(const FileDescriptor fd) +{ + if (!&mGSource) { + // In case it's called as a callback but the IPCGSource is destroyed + return; + } + + LOGD("Adding fd to glib"); + gpointer tag = g_source_add_unix_fd(&mGSource, + fd, + conditions); + FDInfo fdInfo(tag, fd); + mFDInfos.push_back(std::move(fdInfo)); +} + +void IPCGSource::removeFD(const FileDescriptor fd) +{ + if (!&mGSource) { + // In case it's called as a callback but the IPCGSource is destroyed + return; + } + + LOGD("Removing fd from glib"); + auto it = std::find(mFDInfos.begin(), mFDInfos.end(), fd); + if (it == mFDInfos.end()) { + LOGE("No such fd"); + return; + } + g_source_remove_unix_fd(&mGSource, it->tag); + mFDInfos.erase(it); +} + +guint IPCGSource::attach(GMainContext* context) +{ + LOGD("Attaching to GMainContext"); + return g_source_attach(&mGSource, context); +} + +gboolean IPCGSource::prepare(GSource* gSource, gint* timeout) +{ + if (!gSource) { + return FALSE; + } + + *timeout = -1; + + // TODO: Implement hasEvents() method in Client and Service and use it here as a callback: + // return source->hasEvents(); + return FALSE; +} + +gboolean IPCGSource::check(GSource* gSource) +{ + if (!gSource) { + return FALSE; + } + + return TRUE; +} + +gboolean IPCGSource::dispatch(GSource* gSource, + GSourceFunc /*callback*/, + gpointer /*userData*/) +{ + IPCGSource* source = reinterpret_cast(gSource); + + for (const FDInfo fdInfo : source->mFDInfos) { + GIOCondition cond = g_source_query_unix_fd(gSource, fdInfo.tag); + if (conditions & cond) { + source->mHandlerCallback(fdInfo.fd, cond); + } + } + + return TRUE; // Don't remove the GSource from the GMainContext +} + +void IPCGSource::finalize(GSource* gSource) +{ + if (gSource) { + IPCGSource* source = reinterpret_cast(gSource); + source->~IPCGSource(); + } +} + +} // namespace ipc +} // namespace vasum + +#endif // GLIB_CHECK_VERSION diff --git a/common/ipc/ipc-gsource.hpp b/common/ipc/ipc-gsource.hpp new file mode 100644 index 0000000..96e0a1a --- /dev/null +++ b/common/ipc/ipc-gsource.hpp @@ -0,0 +1,150 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Class for creating a dedicated GSource + */ + +#ifndef COMMON_IPC_IPC_GSOURCE_HPP +#define COMMON_IPC_IPC_GSOURCE_HPP + +#include +#if GLIB_CHECK_VERSION(2,36,0) + +#include "ipc/service.hpp" +#include "ipc/types.hpp" +#include "utils/callback-wrapper.hpp" +#include + + +namespace vasum { +namespace ipc { + +/** + * Class for connecting to the glib's loop. + * Creates a dedicated GSource. + * + * It's supposed to be constructed ONLY with the static create method + * and destructed in a glib callback. + */ +struct IPCGSource { +public: + typedef std::function HandlerCallback; + + IPCGSource() = delete; + IPCGSource(const IPCGSource&) = delete; + IPCGSource& operator=(const IPCGSource&) = delete; + + /** + * New file descriptor to listen on. + * + * @param peerFD file descriptor + */ + void addFD(const FileDescriptor peerFD); + + /** + * Removes the file descriptor from the GSource + * + * @param peerFD file descriptor + */ + void removeFD(const FileDescriptor peerFD); + + /** + * Attach to the glib's GMainContext + * + * @param context where to connect + * @return result of the g_source_attach call + */ + guint attach(GMainContext* context = nullptr); + + /** + * Creates the IPCGSource class in the memory allocated by glib. + * Calls IPCGSource's constructor + * + * @param fds initial set of file descriptors + * @param handlerCallback event handling callback + * + * @return pointer to the IPCGSource + */ + static IPCGSource* create(const std::vector& fds, + const HandlerCallback& handlerCallback); + +private: + + /** + * GSourceFuncs' callback + */ + static gboolean prepare(GSource* source, gint* timeout); + + /** + * GSourceFuncs' callback + */ + static gboolean check(GSource* source); + + /** + * GSourceFuncs' callback + */ + static gboolean dispatch(GSource* source, + GSourceFunc callbacks, + gpointer userData); + + /** + * GSourceFuncs' callback + */ + static void finalize(GSource* source); + + + + // Called only from IPCGSource::create + IPCGSource(const std::vector fds, + const HandlerCallback& handlerCallback); + + // Called only from IPCGSource::finalize + ~IPCGSource(); + + struct FDInfo { + FDInfo(gpointer tag, FileDescriptor fd) + : tag(tag), fd(fd) {} + + bool operator==(const gpointer t) + { + return t == tag; + } + + bool operator==(const FileDescriptor f) + { + return f == fd; + } + + gpointer tag; + FileDescriptor fd; + }; + + GSource mGSource; + HandlerCallback mHandlerCallback; + std::vector mFDInfos; +}; + +} // namespace ipc +} // namespace vasum + +#endif // GLIB_CHECK_VERSION + +#endif // COMMON_IPC_IPC_GSOURCE_HPP diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp index 5ee5fbd..be95cee 100644 --- a/common/ipc/service.cpp +++ b/common/ipc/service.cpp @@ -79,6 +79,41 @@ void Service::stop() LOGD("Stopped"); } +std::vector Service::getFDs() +{ + std::vector fds; + fds.push_back(mAcceptor.getEventFD()); + fds.push_back(mAcceptor.getConnectionFD()); + fds.push_back(mProcessor.getEventFD()); + + return fds; +} + +void Service::handle(const FileDescriptor fd, const short pollEvent) +{ + if (fd == mProcessor.getEventFD() && pollEvent & POLLIN) { + mProcessor.handleEvent(); + return; + + } else if (fd == mAcceptor.getConnectionFD() && pollEvent & POLLIN) { + mAcceptor.handleConnection(); + return; + + } else if (fd == mAcceptor.getEventFD() && pollEvent & POLLIN) { + mAcceptor.handleEvent(); + return; + + } else if (pollEvent & POLLIN) { + mProcessor.handleInput(fd); + return; + + } else if (pollEvent & POLLHUP) { + mProcessor.handleLostConnection(fd); + return; + } +} + + void Service::setNewPeerCallback(const PeerCallback& newPeerCallback) { mProcessor.setNewPeerCallback(newPeerCallback); diff --git a/common/ipc/service.hpp b/common/ipc/service.hpp index 1f9aee3..ed83606 100644 --- a/common/ipc/service.hpp +++ b/common/ipc/service.hpp @@ -75,6 +75,23 @@ public: void stop(); /** + * Used with an external polling loop + * + * @return vector of internal file descriptors + */ + std::vector getFDs(); + + /** + * Used with an external polling loop. + * Handles one event from the file descriptor. + * + * @param fd file descriptor + * @param pollEvent event on the fd. Defined in poll.h + * + */ + void handle(const FileDescriptor fd, const short pollEvent); + + /** * Set the callback called for each new connection to a peer * * @param newPeerCallback the callback diff --git a/common/ipc/types.cpp b/common/ipc/types.cpp index 18c769d..fa57648 100644 --- a/common/ipc/types.cpp +++ b/common/ipc/types.cpp @@ -22,6 +22,8 @@ * @brief Types definitions and helper functions */ +#include "config.hpp" + #include "ipc/types.hpp" #include "logger/logger.hpp" diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index c671db6..9ce131d 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -33,7 +33,9 @@ #include "ipc/service.hpp" #include "ipc/client.hpp" +#include "ipc/ipc-gsource.hpp" #include "ipc/types.hpp" +#include "utils/glib-loop.hpp" #include "config/fields.hpp" #include "logger/logger.hpp" @@ -47,6 +49,8 @@ using namespace vasum; using namespace vasum::ipc; +using namespace vasum::utils; +using namespace std::placeholders; namespace fs = boost::filesystem; namespace { @@ -132,30 +136,48 @@ std::shared_ptr longEchoCallback(const FileDescriptor, std::shared_ptr return data; } -FileDescriptor connect(Service& s, Client& c) +FileDescriptor connect(Service& s, Client& c, bool serviceUsesGlib = false) { // Connects the Client to the Service and returns Clients FileDescriptor - std::mutex mutex; std::condition_variable cv; FileDescriptor peerFD = 0; - auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFileDescriptor) { + auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFD) { std::unique_lock lock(mutex); - peerFD = newFileDescriptor; - cv.notify_one(); + peerFD = newFD; + cv.notify_all(); }; - s.setNewPeerCallback(newPeerCallback); - if (!s.isStarted()) { - s.start(); + if (!serviceUsesGlib) { + s.setNewPeerCallback(newPeerCallback); + + if (!s.isStarted()) { + s.start(); + } + } else { +#if GLIB_CHECK_VERSION(2,36,0) + + IPCGSource* serviceGSourcePtr = IPCGSource::create(s.getFDs(), std::bind(&Service::handle, &s, _1, _2)); + + auto agregateCallback = [&newPeerCallback, &serviceGSourcePtr](const FileDescriptor newFD) { + serviceGSourcePtr->addFD(newFD); + newPeerCallback(newFD); + }; + + s.setNewPeerCallback(agregateCallback); + s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, serviceGSourcePtr, _1)); + + serviceGSourcePtr->attach(); +#endif // GLIB_CHECK_VERSION + } c.start(); std::unique_lock lock(mutex); - BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerFD]() { + BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() { return peerFD != 0; })); @@ -165,7 +187,7 @@ FileDescriptor connect(Service& s, Client& c) void testEcho(Client& c, const MethodID methodID) { std::shared_ptr sentData(new SendData(34)); - std::shared_ptr recvData = c.callSync(methodID, sentData); + std::shared_ptr recvData = c.callSync(methodID, sentData, 1000); BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); } @@ -554,6 +576,35 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline) } +#if GLIB_CHECK_VERSION(2,36,0) + +BOOST_AUTO_TEST_CASE(ServiceGSource) +{ + ScopedGlibLoop loop; + + std::atomic_bool isSignalCalled(false); + auto signalHandler = [&isSignalCalled](const FileDescriptor, std::shared_ptr&) { + isSignalCalled = true; + }; + + Service s(socketPath); + s.addMethodHandler(1, echoCallback); + + Client c(socketPath); + s.addSignalHandler(2, signalHandler); + connect(s, c, true); + + testEcho(c, 1); + + auto data = std::make_shared(1); + c.signal(2, data); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for + BOOST_CHECK(isSignalCalled); +} + +#endif // GLIB_CHECK_VERSION + // BOOST_AUTO_TEST_CASE(ConnectionLimitTest) // { // unsigned oldLimit = ipc::getMaxFDNumber(); -- 2.7.4