From 1d2b75e9a25d971eefff29adc5fd4166e43cf827 Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Tue, 16 Dec 2014 14:09:33 +0100 Subject: [PATCH] IPC: External polling loop with a Client [Bug/Feature] Using GMainLoop with a Client is possible. Fixed some buggs [Cause] N/A [Solution] N/A [Verification] Build, install, run tests Change-Id: Iab3350b400739bb951d84e0d6b7de15d0cccf1d3 --- common/ipc/client.cpp | 38 ++++++++-- common/ipc/client.hpp | 24 +++++++ common/ipc/internals/acceptor.cpp | 3 +- common/ipc/internals/processor.cpp | 67 +++++++++++++++-- common/ipc/internals/processor.hpp | 7 ++ common/ipc/ipc-gsource.cpp | 31 +++++--- common/ipc/ipc-gsource.hpp | 16 +++-- common/ipc/service.cpp | 6 +- common/utils/latch.hpp | 8 +-- common/utils/signal.cpp | 63 ++++++++++++++++ common/utils/signal.hpp | 37 ++++++++++ server/CMakeLists.txt | 7 +- tests/unit_tests/CMakeLists.txt | 6 ++ tests/unit_tests/ipc/ut-ipc.cpp | 144 ++++++++++++++++++++++++++++++------- zone-daemon/CMakeLists.txt | 6 ++ 15 files changed, 404 insertions(+), 59 deletions(-) create mode 100644 common/utils/signal.cpp create mode 100644 common/utils/signal.hpp diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp index 835020d..3187f15 100644 --- a/common/ipc/client.cpp +++ b/common/ipc/client.cpp @@ -48,16 +48,21 @@ Client::~Client() LOGD("Destroyed client"); } -void Client::start() +void Client::connect() { - LOGD("Starting client..."); - // Initialize the connection with the server LOGD("Connecting to " + mSocketPath); auto socketPtr = std::make_shared(Socket::connectSocket(mSocketPath)); mServiceFD = mProcessor.addPeer(socketPtr); +} + +void Client::start() +{ + LOGD("Starting client..."); - // Start listening + connect(); + + // Start polling thread mProcessor.start(); LOGD("Started client"); @@ -75,6 +80,31 @@ void Client::stop() LOGD("Stopped"); } +std::vector Client::getFDs() +{ + std::vector fds; + fds.push_back(mProcessor.getEventFD()); + fds.push_back(mServiceFD); + + return fds; +} + +void Client::handle(const FileDescriptor fd, const short pollEvent) +{ + if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) { + mProcessor.handleEvent(); + return; + + } else if (pollEvent & POLLIN) { + mProcessor.handleInput(fd); + return; + + } else if (pollEvent & POLLHUP) { + mProcessor.handleLostConnection(fd); + return; + } +} + void Client::setNewPeerCallback(const PeerCallback& newPeerCallback) { mProcessor.setNewPeerCallback(newPeerCallback); diff --git a/common/ipc/client.hpp b/common/ipc/client.hpp index 5751812..b5b00e5 100644 --- a/common/ipc/client.hpp +++ b/common/ipc/client.hpp @@ -55,6 +55,13 @@ public: Client& operator=(const Client&) = delete; /** + * Places a connection request in the internal event queue. + * + * Used with an external polling loop. + */ + void connect(); + + /** * Starts the worker thread */ void start(); @@ -70,6 +77,23 @@ public: void stop(); /** + * Used with an external polling loop + * + * @return vector of internal file descriptors + */ + std::vector getFDs(); + + /** + * Used with an external polling loop. + * Handles one event from the file descriptor. + * + * @param fd file descriptor + * @param pollEvent event on the fd. Defined in poll.h + * + */ + void handle(const FileDescriptor fd, const short pollEvent); + + /** * Set the callback called for each new connection to a peer * * @param newPeerCallback the callback diff --git a/common/ipc/internals/acceptor.cpp b/common/ipc/internals/acceptor.cpp index 1eab1c2..627e1fe 100644 --- a/common/ipc/internals/acceptor.cpp +++ b/common/ipc/internals/acceptor.cpp @@ -67,12 +67,13 @@ void Acceptor::start() void Acceptor::stop() { LOGT("Stopping Acceptor"); + if (mThread.joinable()) { - LOGT("Event::FINISH -> Acceptor"); mEventQueue.send(Event::FINISH); LOGT("Waiting for Acceptor to finish"); mThread.join(); } + LOGT("Stopped Acceptor"); } diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index 72a1788..22aa38e 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -27,9 +27,11 @@ #include "ipc/exception.hpp" #include "ipc/internals/processor.hpp" #include "ipc/internals/utils.hpp" +#include "utils/signal.hpp" #include #include +#include #include #include @@ -58,8 +60,9 @@ Processor::Processor(const PeerCallback& newPeerCallback, mMaxNumberOfPeers(maxNumberOfPeers) { LOGT("Creating Processor"); - using namespace std::placeholders; + utils::signalBlock(SIGPIPE); + using namespace std::placeholders; addMethodHandlerInternal(REGISTER_SIGNAL_METHOD_ID, std::bind(&Processor::onNewSignals, this, _1, _2)); @@ -73,6 +76,7 @@ Processor::~Processor() } catch (IPCException& e) { LOGE("Error in Processor's destructor: " << e.what()); } + LOGT("Destroyed Processor"); } @@ -93,10 +97,12 @@ void Processor::start() void Processor::stop() { LOGT("Stopping Processor"); + if (isStarted()) { mEventQueue.send(Event::FINISH); mThread.join(); } + LOGT("Stopped Processor"); } @@ -167,7 +173,10 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status) LOGW("Removing peer. ID: " << peerFD); { Lock lock(mSocketsMutex); - mSockets.erase(peerFD); + if (!mSockets.erase(peerFD)) { + LOGW("No such peer. Another thread called removePeerInternal"); + return; + } // Remove from signal addressees for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) { @@ -269,8 +278,6 @@ void Processor::run() } } - - cleanCommunication(); } bool Processor::handleLostConnections() @@ -327,6 +334,8 @@ bool Processor::handleInput(const FileDescriptor peerFD) std::shared_ptr socketPtr; try { + // Get the peer's socket + Lock lock(mSocketsMutex); socketPtr = mSockets.at(peerFD); } catch (const std::out_of_range&) { LOGE("No such peer: " << peerFD); @@ -497,7 +506,10 @@ bool Processor::handleEvent() switch (mEventQueue.receive()) { case Event::FINISH: { LOGD("Event FINISH"); + mIsRunning = false; + cleanCommunication(); + return false; } @@ -607,6 +619,15 @@ bool Processor::onCall() LOGT("Handle call (from another thread) to send a message."); CallQueue::Call call = getCall(); + if (call.parse && call.process) { + return onMethodCall(call); + } else { + return onSignalCall(call); + } +} + +bool Processor::onSignalCall(CallQueue::Call& call) +{ std::shared_ptr socketPtr; try { // Get the peer's socket @@ -614,11 +635,43 @@ bool Processor::onCall() socketPtr = mSockets.at(call.peerFD); } catch (const std::out_of_range&) { LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD); + return false; + } + + try { + // Send the call with the socket + Socket::Guard guard = socketPtr->getGuard(); + socketPtr->write(&call.methodID, sizeof(call.methodID)); + socketPtr->write(&call.messageID, sizeof(call.messageID)); + call.serialize(socketPtr->getFD(), call.data); + } catch (const std::exception& e) { + LOGE("Error during sending a signal: " << e.what()); + + removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR); + return true; + } + + return false; + +} + +bool Processor::onMethodCall(CallQueue::Call& call) +{ + std::shared_ptr socketPtr; + try { + // Get the peer's socket + Lock lock(mSocketsMutex); + socketPtr = mSockets.at(call.peerFD); + } catch (const std::out_of_range&) { + LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD); + + // Pass the error to the processing callback IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data)); + return false; } - if (call.parse && call.process) { + { // Set what to do with the return message, but only if needed Lock lock(mReturnCallbacksMutex); if (mReturnCallbacks.count(call.messageID) != 0) { @@ -636,9 +689,9 @@ bool Processor::onCall() socketPtr->write(&call.messageID, sizeof(call.messageID)); call.serialize(socketPtr->getFD(), call.data); } catch (const std::exception& e) { - LOGE("Error during sending a message: " << e.what()); + LOGE("Error during sending a method: " << e.what()); - // Inform about the error + // Inform about the error, IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data)); { diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index d33d12b..728b8d2 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -76,6 +76,11 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000; * - callbacks for serialization/parsing * - store Sockets in a vector, maybe SocketStore? * - fix valgrind tests +* - poll loop outside. +* - waiting till the EventQueue is empty before leaving stop() +* - no new events added after stop() called +* - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but +* there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :) * * */ @@ -414,6 +419,8 @@ private: void run(); bool onCall(); + bool onSignalCall(CallQueue::Call& call); + bool onMethodCall(CallQueue::Call& call); bool onNewPeer(); bool onRemovePeer(); bool handleLostConnections(); diff --git a/common/ipc/ipc-gsource.cpp b/common/ipc/ipc-gsource.cpp index f5cdbb5..4c098d9 100644 --- a/common/ipc/ipc-gsource.cpp +++ b/common/ipc/ipc-gsource.cpp @@ -57,12 +57,10 @@ IPCGSource::IPCGSource(const std::vector fds, IPCGSource::~IPCGSource() { LOGD("Destroying IPCGSource"); - g_source_destroy(&mGSource); - } -IPCGSource* IPCGSource::create(const std::vector& fds, - const HandlerCallback& handlerCallback) +IPCGSource::Pointer IPCGSource::create(const std::vector& fds, + const HandlerCallback& handlerCallback) { LOGD("Creating IPCGSource"); @@ -80,9 +78,19 @@ IPCGSource* IPCGSource::create(const std::vector& fds, // Fill additional data IPCGSource* source = reinterpret_cast(gSource); - return new(source) IPCGSource(fds, handlerCallback); -} + new(source)IPCGSource(fds, handlerCallback); + + auto deleter = [](IPCGSource * ptr) { + LOGD("Deleter"); + + if (!g_source_is_destroyed(&(ptr->mGSource))) { + // This way finalize method will be run in glib loop's thread + g_source_destroy(&(ptr->mGSource)); + } + }; + return std::shared_ptr(source, deleter); +} void IPCGSource::addFD(const FileDescriptor fd) { @@ -119,7 +127,9 @@ void IPCGSource::removeFD(const FileDescriptor fd) guint IPCGSource::attach(GMainContext* context) { LOGD("Attaching to GMainContext"); - return g_source_attach(&mGSource, context); + guint ret = g_source_attach(&mGSource, context); + g_source_unref(&mGSource); + return ret; } gboolean IPCGSource::prepare(GSource* gSource, gint* timeout) @@ -148,6 +158,11 @@ gboolean IPCGSource::dispatch(GSource* gSource, GSourceFunc /*callback*/, gpointer /*userData*/) { + if (!gSource || g_source_is_destroyed(gSource)) { + // Remove the GSource from the GMainContext + return FALSE; + } + IPCGSource* source = reinterpret_cast(gSource); for (const FDInfo fdInfo : source->mFDInfos) { @@ -157,7 +172,7 @@ gboolean IPCGSource::dispatch(GSource* gSource, } } - return TRUE; // Don't remove the GSource from the GMainContext + return TRUE; } void IPCGSource::finalize(GSource* gSource) diff --git a/common/ipc/ipc-gsource.hpp b/common/ipc/ipc-gsource.hpp index 96e0a1a..bb9a096 100644 --- a/common/ipc/ipc-gsource.hpp +++ b/common/ipc/ipc-gsource.hpp @@ -30,7 +30,7 @@ #include "ipc/service.hpp" #include "ipc/types.hpp" -#include "utils/callback-wrapper.hpp" + #include @@ -43,10 +43,17 @@ namespace ipc { * * It's supposed to be constructed ONLY with the static create method * and destructed in a glib callback. + * + * TODO: + * - waiting till the managed object (Client or Service) is destroyed + * before IPCGSource stops operating. For now programmer has to ensure this. */ struct IPCGSource { public: typedef std::function HandlerCallback; + typedef std::shared_ptr Pointer; + + ~IPCGSource(); IPCGSource() = delete; IPCGSource(const IPCGSource&) = delete; @@ -83,8 +90,8 @@ public: * * @return pointer to the IPCGSource */ - static IPCGSource* create(const std::vector& fds, - const HandlerCallback& handlerCallback); + static Pointer create(const std::vector& fds, + const HandlerCallback& handlerCallback); private: @@ -116,9 +123,6 @@ private: IPCGSource(const std::vector fds, const HandlerCallback& handlerCallback); - // Called only from IPCGSource::finalize - ~IPCGSource(); - struct FDInfo { FDInfo(gpointer tag, FileDescriptor fd) : tag(tag), fd(fd) {} diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp index be95cee..5e720d6 100644 --- a/common/ipc/service.cpp +++ b/common/ipc/service.cpp @@ -91,15 +91,15 @@ std::vector Service::getFDs() void Service::handle(const FileDescriptor fd, const short pollEvent) { - if (fd == mProcessor.getEventFD() && pollEvent & POLLIN) { + if (fd == mProcessor.getEventFD() && (pollEvent & POLLIN)) { mProcessor.handleEvent(); return; - } else if (fd == mAcceptor.getConnectionFD() && pollEvent & POLLIN) { + } else if (fd == mAcceptor.getConnectionFD() && (pollEvent & POLLIN)) { mAcceptor.handleConnection(); return; - } else if (fd == mAcceptor.getEventFD() && pollEvent & POLLIN) { + } else if (fd == mAcceptor.getEventFD() && (pollEvent & POLLIN)) { mAcceptor.handleEvent(); return; diff --git a/common/utils/latch.hpp b/common/utils/latch.hpp index 1fa773b..7ef1dd7 100644 --- a/common/utils/latch.hpp +++ b/common/utils/latch.hpp @@ -54,7 +54,7 @@ public: void wait(); /** - * Waits for a single occurence of event with timeout. + * Waits for a single occurrence of event with timeout. * * @param timeoutMs timeout in ms to wait for * @return false on timeout @@ -64,14 +64,14 @@ public: /** * Waits for @ref n occurrences of event. * - * @param n number of occurences to wait for + * @param n number of occurrences to wait for */ void waitForN(const unsigned int n); /** - * Waits for @ref n occurences of event with timeout. + * Waits for @ref n occurrences of event with timeout. * - * @param n number of occurences to wait for + * @param n number of occurrences to wait for * @param timeoutMs timeout in ms to wait for * @return false on timeout */ diff --git a/common/utils/signal.cpp b/common/utils/signal.cpp new file mode 100644 index 0000000..39e7fca --- /dev/null +++ b/common/utils/signal.cpp @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Jan Olszak + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Signal related functions + */ + +#include "utils/signal.hpp" +#include "utils/exception.hpp" +#include "logger/logger.hpp" + +#include +#include +#include +#include + +namespace vasum { +namespace utils { + +void signalBlock(const int signalToBlock) +{ + ::sigset_t set; + if (-1 == ::sigemptyset(&set)) { + LOGE("Error in sigemptyset: " << std::string(strerror(errno))); + UtilsException("Error in sigemptyset: " + std::string(strerror(errno))); + } + + if (-1 ==::sigaddset(&set, signalToBlock)) { + LOGE("Error in sigaddset: " << std::string(strerror(errno))); + UtilsException("Error in sigaddset: " + std::string(strerror(errno))); + } + + int ret = ::pthread_sigmask(SIG_BLOCK, &set, nullptr /*&oldSet*/); + if (ret != 0) { + LOGE("Error in pthread_sigmask: " << std::to_string(ret)); + UtilsException("Error in pthread_sigmask: " + std::to_string(ret)); + } +} + +} // namespace utils +} // namespace vasum + + + + + diff --git a/common/utils/signal.hpp b/common/utils/signal.hpp new file mode 100644 index 0000000..f26e365 --- /dev/null +++ b/common/utils/signal.hpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved + * + * Contact: Jan Olszak + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Signal related functions + */ + +#ifndef COMMON_UTILS_SIGNAL_HPP +#define COMMON_UTILS_SIGNAL_HPP + +namespace vasum { +namespace utils { + +void signalBlock(const int signalsToBlock); + +} // namespace utils +} // namespace vasum + + +#endif // COMMON_UTILS_SIGNAL_HPP diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index adfef3e..79ced75 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -36,8 +36,13 @@ PKG_CHECK_MODULES(SERVER_DEPS REQUIRED lxc json gio-2.0 libsystemd-journal libsy INCLUDE_DIRECTORIES(${COMMON_FOLDER}) INCLUDE_DIRECTORIES(${CLIENT_FOLDER}) INCLUDE_DIRECTORIES(SYSTEM ${SERVER_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}) -TARGET_LINK_LIBRARIES(${SERVER_CODENAME} ${SERVER_DEPS_LIBRARIES} ${Boost_LIBRARIES}) +SET_TARGET_PROPERTIES(${SERVER_CODENAME} PROPERTIES + COMPILE_FLAGS "-pthread" + LINK_FLAGS "-pthread" +) + +TARGET_LINK_LIBRARIES(${SERVER_CODENAME} ${SERVER_DEPS_LIBRARIES} ${Boost_LIBRARIES}) ## Subdirectories ############################################################## ADD_SUBDIRECTORY(configs) diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index c0cc2a8..8b66b09 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -40,6 +40,12 @@ PKG_CHECK_MODULES(UT_SERVER_DEPS REQUIRED lxc json gio-2.0 libsystemd-daemon libsystemd-journal libcap-ng libLogger libSimpleDbus libConfig) INCLUDE_DIRECTORIES(${COMMON_FOLDER} ${SERVER_FOLDER} ${UNIT_TESTS_FOLDER} ${CLIENT_FOLDER}) INCLUDE_DIRECTORIES(SYSTEM ${UT_SERVER_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}) + +SET_TARGET_PROPERTIES(${UT_SERVER_CODENAME} PROPERTIES + COMPILE_FLAGS "-pthread" + LINK_FLAGS "-pthread" +) + TARGET_LINK_LIBRARIES(${UT_SERVER_CODENAME} ${UT_SERVER_DEPS_LIBRARIES} ${Boost_LIBRARIES}) diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index 9ce131d..a7f8645 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -45,6 +45,7 @@ #include #include #include +#include #include using namespace vasum; @@ -136,7 +137,7 @@ std::shared_ptr longEchoCallback(const FileDescriptor, std::shared_ptr return data; } -FileDescriptor connect(Service& s, Client& c, bool serviceUsesGlib = false) +FileDescriptor connect(Service& s, Client& c) { // Connects the Client to the Service and returns Clients FileDescriptor std::mutex mutex; @@ -149,41 +150,102 @@ FileDescriptor connect(Service& s, Client& c, bool serviceUsesGlib = false) cv.notify_all(); }; + // TODO: On timeout remove the callback + s.setNewPeerCallback(newPeerCallback); + + if (!s.isStarted()) { + s.start(); + } + + c.start(); + + + std::unique_lock lock(mutex); + BOOST_REQUIRE(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() { + return peerFD != 0; + })); + + return peerFD; +} + - if (!serviceUsesGlib) { - s.setNewPeerCallback(newPeerCallback); - if (!s.isStarted()) { - s.start(); - } - } else { #if GLIB_CHECK_VERSION(2,36,0) - IPCGSource* serviceGSourcePtr = IPCGSource::create(s.getFDs(), std::bind(&Service::handle, &s, _1, _2)); +std::pair connectServiceGSource(Service& s, Client& c) +{ + std::mutex mutex; + std::condition_variable cv; - auto agregateCallback = [&newPeerCallback, &serviceGSourcePtr](const FileDescriptor newFD) { - serviceGSourcePtr->addFD(newFD); - newPeerCallback(newFD); - }; + FileDescriptor peerFD = 0; + IPCGSource::Pointer ipcGSourcePtr = IPCGSource::create(s.getFDs(), std::bind(&Service::handle, &s, _1, _2)); - s.setNewPeerCallback(agregateCallback); - s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, serviceGSourcePtr, _1)); + auto newPeerCallback = [&cv, &peerFD, &mutex, ipcGSourcePtr](const FileDescriptor newFD) { + if (ipcGSourcePtr) { + //TODO: Remove this if + ipcGSourcePtr->addFD(newFD); + } + std::unique_lock lock(mutex); + peerFD = newFD; + cv.notify_all(); + }; - serviceGSourcePtr->attach(); -#endif // GLIB_CHECK_VERSION - } + // TODO: On timeout remove the callback + s.setNewPeerCallback(newPeerCallback); + s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, ipcGSourcePtr, _1)); + + // Service starts to process + ipcGSourcePtr->attach(); c.start(); std::unique_lock lock(mutex); - BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() { + BOOST_REQUIRE(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() { return peerFD != 0; })); - return peerFD; + return std::make_pair(peerFD, ipcGSourcePtr); } +std::pair connectClientGSource(Service& s, Client& c) +{ + // Connects the Client to the Service and returns Clients FileDescriptor + std::mutex mutex; + std::condition_variable cv; + + FileDescriptor peerFD = 0; + auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFD) { + std::unique_lock lock(mutex); + peerFD = newFD; + cv.notify_all(); + }; + // TODO: On timeout remove the callback + s.setNewPeerCallback(newPeerCallback); + + if (!s.isStarted()) { + // Service starts to process + s.start(); + } + + + c.connect(); + IPCGSource::Pointer ipcGSourcePtr = IPCGSource::create(c.getFDs(), + std::bind(&Client::handle, &c, _1, _2)); + + ipcGSourcePtr->attach(); + + std::unique_lock lock(mutex); + BOOST_REQUIRE(cv.wait_for(lock, std::chrono::milliseconds(2000), [&peerFD]() { + return peerFD != 0; + })); + + return std::make_pair(peerFD, ipcGSourcePtr); +} + +#endif // GLIB_CHECK_VERSION + + void testEcho(Client& c, const MethodID methodID) { std::shared_ptr sentData(new SendData(34)); @@ -194,7 +256,7 @@ void testEcho(Client& c, const MethodID methodID) void testEcho(Service& s, const MethodID methodID, const FileDescriptor peerFD) { std::shared_ptr sentData(new SendData(56)); - std::shared_ptr recvData = s.callSync(methodID, peerFD, sentData); + std::shared_ptr recvData = s.callSync(methodID, peerFD, sentData, 1000); BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal); } @@ -481,17 +543,16 @@ BOOST_AUTO_TEST_CASE(ReadTimeout) { Service s(socketPath); auto longEchoCallback = [](const FileDescriptor, std::shared_ptr& data) { - return std::shared_ptr(new LongSendData(data->intVal)); + return std::shared_ptr(new LongSendData(data->intVal, 4000 /*ms*/)); }; s.addMethodHandler(1, longEchoCallback); - s.start(); Client c(socketPath); - c.start(); + connect(s, c); // Test timeout on read std::shared_ptr sentData(new SendData(334)); - BOOST_CHECK_THROW((c.callSync(1, sentData, 100)), IPCException); + BOOST_CHECK_THROW((c.callSync(1, sentData, 10)), IPCException); } @@ -587,12 +648,15 @@ BOOST_AUTO_TEST_CASE(ServiceGSource) isSignalCalled = true; }; + IPCGSource::Pointer serviceGSource; Service s(socketPath); s.addMethodHandler(1, echoCallback); Client c(socketPath); s.addSignalHandler(2, signalHandler); - connect(s, c, true); + + auto ret = connectServiceGSource(s, c); + serviceGSource = ret.second; testEcho(c, 1); @@ -603,6 +667,36 @@ BOOST_AUTO_TEST_CASE(ServiceGSource) BOOST_CHECK(isSignalCalled); } +BOOST_AUTO_TEST_CASE(ClientGSource) +{ + ScopedGlibLoop loop; + + std::atomic_bool isSignalCalled(false); + auto signalHandler = [&isSignalCalled](const FileDescriptor, std::shared_ptr&) { + isSignalCalled = true; + }; + + Service s(socketPath); + s.start(); + + IPCGSource::Pointer clientGSource; + Client c(socketPath); + c.addMethodHandler(1, echoCallback); + c.addSignalHandler(2, signalHandler); + + auto ret = connectClientGSource(s, c); + FileDescriptor peerFD = ret.first; + clientGSource = ret.second; + + testEcho(s, 1, peerFD); + + auto data = std::make_shared(1); + s.signal(2, data); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for + BOOST_CHECK(isSignalCalled); +} + #endif // GLIB_CHECK_VERSION // BOOST_AUTO_TEST_CASE(ConnectionLimitTest) diff --git a/zone-daemon/CMakeLists.txt b/zone-daemon/CMakeLists.txt index 50bfa1a..7baf37f 100644 --- a/zone-daemon/CMakeLists.txt +++ b/zone-daemon/CMakeLists.txt @@ -37,6 +37,12 @@ PKG_CHECK_MODULES(ZONE_DAEMON_DEPS REQUIRED gio-2.0 libsystemd-journal libcap-ng libLogger libSimpleDbus libConfig) INCLUDE_DIRECTORIES(${COMMON_FOLDER}) INCLUDE_DIRECTORIES(SYSTEM ${ZONE_DAEMON_DEPS_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}) + +SET_TARGET_PROPERTIES(${ZONE_DAEMON_CODENAME} PROPERTIES + COMPILE_FLAGS "-pthread" + LINK_FLAGS "-pthread" +) + TARGET_LINK_LIBRARIES(${ZONE_DAEMON_CODENAME} ${ZONE_DAEMON_DEPS_LIBRARIES} ${Boost_LIBRARIES}) -- 2.7.4