From 8fe7189dc8d8789e7abd8f80fec3b0fe34a4c069 Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Mon, 26 Jan 2015 14:33:01 +0100 Subject: [PATCH] IPC: Remote error handling [Bug/Feature] Passing errors to callbacks [Cause] N/A [Solution] N/A [Verification] Build, install, run tests, run tests under valgrind Change-Id: Icbe4df6671144fd34a3bf8b43c4360c2242a6d3e --- common/ipc/client.hpp | 1 + common/ipc/exception.hpp | 48 +++++++++-- common/ipc/internals/method-request.hpp | 12 +-- common/ipc/internals/processor.cpp | 115 ++++++++++++++++++--------- common/ipc/internals/processor.hpp | 136 +++++++++++++++++++++----------- common/ipc/internals/request-queue.hpp | 28 +++++-- common/ipc/internals/result-builder.hpp | 78 ++++++++++++++++++ common/ipc/result.hpp | 74 +++++++++++++++++ common/ipc/service.hpp | 1 + common/ipc/types.cpp | 33 -------- common/ipc/types.hpp | 20 ----- tests/unit_tests/ipc/ut-ipc.cpp | 49 +++++++++--- 12 files changed, 428 insertions(+), 167 deletions(-) create mode 100644 common/ipc/internals/result-builder.hpp create mode 100644 common/ipc/result.hpp diff --git a/common/ipc/client.hpp b/common/ipc/client.hpp index eedf81b..1ee44bb 100644 --- a/common/ipc/client.hpp +++ b/common/ipc/client.hpp @@ -28,6 +28,7 @@ #include "ipc/internals/processor.hpp" #include "ipc/ipc-gsource.hpp" #include "ipc/types.hpp" +#include "ipc/result.hpp" #include "logger/logger.hpp" #include diff --git a/common/ipc/exception.hpp b/common/ipc/exception.hpp index 794cf21..8e9a7df 100644 --- a/common/ipc/exception.hpp +++ b/common/ipc/exception.hpp @@ -29,35 +29,67 @@ #include "base-exception.hpp" namespace vasum { - +namespace ipc { /** * Base class for exceptions in IPC */ struct IPCException: public VasumException { - IPCException(const std::string& error) : VasumException(error) {} + IPCException(const std::string& message) + : VasumException(message) {} }; struct IPCParsingException: public IPCException { - IPCParsingException(const std::string& error) : IPCException(error) {} + IPCParsingException(const std::string& message = "Exception during reading/parsing data from the socket") + : IPCException(message) {} }; struct IPCSerializationException: public IPCException { - IPCSerializationException(const std::string& error) : IPCException(error) {} + IPCSerializationException(const std::string& message = "Exception during writing/serializing data to the socket") + : IPCException(message) {} }; struct IPCPeerDisconnectedException: public IPCException { - IPCPeerDisconnectedException(const std::string& error) : IPCException(error) {} + IPCPeerDisconnectedException(const std::string& message = "No such peer. Might got disconnected.") + : IPCException(message) {} }; struct IPCNaughtyPeerException: public IPCException { - IPCNaughtyPeerException(const std::string& error) : IPCException(error) {} + IPCNaughtyPeerException(const std::string& message = "Peer performed a forbidden action.") + : IPCException(message) {} +}; + +struct IPCRemovedPeerException: public IPCException { + IPCRemovedPeerException(const std::string& message = "Removing peer") + : IPCException(message) {} +}; + +struct IPCClosingException: public IPCException { + IPCClosingException(const std::string& message = "Closing IPC") + : IPCException(message) {} }; struct IPCTimeoutException: public IPCException { - IPCTimeoutException(const std::string& error) : IPCException(error) {} + IPCTimeoutException(const std::string& message) + : IPCException(message) {} +}; + +struct IPCUserException: public IPCException { + IPCUserException(const int code, const std::string& message) + : IPCException(message), + mCode(code) + {} + + int getCode() const + { + return mCode; + } + +private: + int mCode; }; -} +} // namespace ipc +} // namespace vasum #endif // COMMON_IPC_EXCEPTION_HPP diff --git a/common/ipc/internals/method-request.hpp b/common/ipc/internals/method-request.hpp index 36d3d7a..8ed17c5 100644 --- a/common/ipc/internals/method-request.hpp +++ b/common/ipc/internals/method-request.hpp @@ -25,9 +25,12 @@ #ifndef COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP #define COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP +#include "ipc/internals/result-builder.hpp" #include "ipc/types.hpp" +#include "ipc/result.hpp" #include "logger/logger-scope.hpp" #include "config/manager.hpp" +#include namespace vasum { namespace ipc { @@ -49,7 +52,7 @@ public: std::shared_ptr data; SerializeCallback serialize; ParseCallback parse; - ResultHandler::type process; + ResultBuilderHandler process; private: MethodRequest(const MethodID methodID, const FileDescriptor peerFD) @@ -82,10 +85,9 @@ std::shared_ptr MethodRequest::create(const MethodID methodID, return data; }; - request->process = [process](Status status, std::shared_ptr& data)->void { - LOGS("Method process, status: " << toString(status)); - std::shared_ptr tmpData = std::static_pointer_cast(data); - return process(status, tmpData); + request->process = [process](ResultBuilder & resultBuilder) { + LOGS("Method process"); + process(resultBuilder.build()); }; return request; diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index bdc8a8d..58beca8 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -51,6 +51,7 @@ namespace ipc { const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits::max(); const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits::max() - 1; +const MethodID Processor::ERROR_METHOD_ID = std::numeric_limits::max() - 2; Processor::Processor(const std::string& logName, const PeerCallback& newPeerCallback, @@ -66,8 +67,10 @@ Processor::Processor(const std::string& logName, utils::signalBlock(SIGPIPE); using namespace std::placeholders; - setMethodHandlerInternal(REGISTER_SIGNAL_METHOD_ID, - std::bind(&Processor::onNewSignals, this, _1, _2)); + setSignalHandlerInternal(REGISTER_SIGNAL_METHOD_ID, + std::bind(&Processor::onNewSignals, this, _1, _2)); + + setSignalHandlerInternal(ERROR_METHOD_ID, std::bind(&Processor::onErrorSignal, this, _1, _2)); } Processor::~Processor() @@ -110,7 +113,7 @@ void Processor::stop() { Lock lock(mStateMutex); auto request = std::make_shared(conditionPtr); - mRequestQueue.push(Event::FINISH, request); + mRequestQueue.pushBack(Event::FINISH, request); } LOGD(mLogPrefix + "Waiting for the Processor to stop"); @@ -158,7 +161,7 @@ FileDescriptor Processor::addPeer(const std::shared_ptr& socketPtr) FileDescriptor peerFD = socketPtr->getFD(); auto request = std::make_shared(peerFD, socketPtr); - mRequestQueue.push(Event::ADD_PEER, request); + mRequestQueue.pushBack(Event::ADD_PEER, request); LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD); @@ -182,7 +185,7 @@ void Processor::removePeer(const FileDescriptor peerFD) { Lock lock(mStateMutex); auto request = std::make_shared(peerFD, conditionPtr); - mRequestQueue.push(Event::REMOVE_PEER, request); + mRequestQueue.pushBack(Event::REMOVE_PEER, request); } auto isPeerDeleted = [&peerFD, this]()->bool { @@ -193,7 +196,7 @@ void Processor::removePeer(const FileDescriptor peerFD) conditionPtr->wait(lock, isPeerDeleted); } -void Processor::removePeerInternal(const FileDescriptor peerFD, Status status) +void Processor::removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr) { LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD); LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD); @@ -214,10 +217,10 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status) } // Erase associated return value callbacks - std::shared_ptr data; for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) { if (it->second.peerFD == peerFD) { - IGNORE_EXCEPTIONS(it->second.process(status, data)); + ResultBuilder resultBuilder(exceptionPtr); + IGNORE_EXCEPTIONS(it->second.process(resultBuilder)); it = mReturnCallbacks.erase(it); } else { ++it; @@ -312,7 +315,8 @@ bool Processor::handleLostConnections() if (mFDs[i].revents & POLLHUP) { LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd); mFDs[i].revents &= ~(POLLHUP); - removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED); + removePeerInternal(mFDs[i].fd, + std::make_exception_ptr(IPCPeerDisconnectedException())); isPeerRemoved = true; } } @@ -324,7 +328,8 @@ bool Processor::handleLostConnections() bool Processor::handleLostConnection(const FileDescriptor peerFD) { Lock lock(mStateMutex); - removePeerInternal(peerFD, Status::PEER_DISCONNECTED); + removePeerInternal(peerFD, + std::make_exception_ptr(IPCPeerDisconnectedException())); return true; } @@ -367,7 +372,8 @@ bool Processor::handleInput(const FileDescriptor peerFD) } catch (const IPCException& e) { LOGE(mLogPrefix + "Error during reading the socket"); - removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER); + removePeerInternal(socketPtr->getFD(), + std::make_exception_ptr(IPCNaughtyPeerException())); return true; } @@ -388,23 +394,33 @@ bool Processor::handleInput(const FileDescriptor peerFD) } else { // Nothing LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID); - removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER); + removePeerInternal(socketPtr->getFD(), + std::make_exception_ptr(IPCNaughtyPeerException())); return true; } } } } -std::shared_ptr Processor::onNewSignals(const FileDescriptor peerFD, - std::shared_ptr& data) +void Processor::onNewSignals(const FileDescriptor peerFD, + std::shared_ptr& data) { LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD); for (const MethodID methodID : data->ids) { mSignalsPeers[methodID].push_back(peerFD); } +} - return std::make_shared(); +void Processor::onErrorSignal(const FileDescriptor, std::shared_ptr& data) +{ + LOGS(mLogPrefix + "Processor onErrorSignal messageID: " << data->messageID); + + ReturnCallbacks returnCallbacks = std::move(mReturnCallbacks.at(data->messageID)); + mReturnCallbacks.erase(data->messageID); + + ResultBuilder resultBuilder(std::make_exception_ptr(IPCUserException(data->code, data->message))); + IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder)); } bool Processor::onReturnValue(const Socket& socket, @@ -420,7 +436,8 @@ bool Processor::onReturnValue(const Socket& socket, mReturnCallbacks.erase(messageID); } catch (const std::out_of_range&) { LOGW(mLogPrefix + "No return callback for messageID: " << messageID); - removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER); + removePeerInternal(socket.getFD(), + std::make_exception_ptr(IPCNaughtyPeerException())); return true; } @@ -430,13 +447,16 @@ bool Processor::onReturnValue(const Socket& socket, data = returnCallbacks.parse(socket.getFD()); } catch (const std::exception& e) { LOGE(mLogPrefix + "Exception during parsing: " << e.what()); - IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data)); - removePeerInternal(socket.getFD(), Status::PARSING_ERROR); + ResultBuilder resultBuilder(std::make_exception_ptr(IPCParsingException())); + IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder)); + removePeerInternal(socket.getFD(), + std::make_exception_ptr(IPCParsingException())); return true; } // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID); - IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data)); + ResultBuilder resultBuilder(data); + IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder)); // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed"); return false; @@ -457,16 +477,22 @@ bool Processor::onRemoteSignal(const Socket& socket, data = signalCallbacks->parse(socket.getFD()); } catch (const std::exception& e) { LOGE(mLogPrefix + "Exception during parsing: " << e.what()); - removePeerInternal(socket.getFD(), Status::PARSING_ERROR); + removePeerInternal(socket.getFD(), + std::make_exception_ptr(IPCParsingException())); return true; } // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID); try { signalCallbacks->signal(socket.getFD(), data); + } catch (const IPCUserException& e) { + LOGW("Discarded user's exception"); + return false; } catch (const std::exception& e) { LOGE(mLogPrefix + "Exception in method handler: " << e.what()); - removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER); + removePeerInternal(socket.getFD(), + std::make_exception_ptr(IPCNaughtyPeerException())); + return true; } @@ -487,7 +513,8 @@ bool Processor::onRemoteCall(const Socket& socket, data = methodCallbacks->parse(socket.getFD()); } catch (const std::exception& e) { LOGE(mLogPrefix + "Exception during parsing: " << e.what()); - removePeerInternal(socket.getFD(), Status::PARSING_ERROR); + removePeerInternal(socket.getFD(), + std::make_exception_ptr(IPCParsingException())); return true; } @@ -495,9 +522,15 @@ bool Processor::onRemoteCall(const Socket& socket, std::shared_ptr returnData; try { returnData = methodCallbacks->method(socket.getFD(), data); + } catch (const IPCUserException& e) { + LOGW("User's exception"); + auto data = std::make_shared(messageID, e.getCode(), e.what()); + signalInternal(ERROR_METHOD_ID, socket.getFD(), data); + return false; } catch (const std::exception& e) { LOGE(mLogPrefix + "Exception in method handler: " << e.what()); - removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER); + removePeerInternal(socket.getFD(), + std::make_exception_ptr(IPCNaughtyPeerException())); return true; } @@ -510,7 +543,9 @@ bool Processor::onRemoteCall(const Socket& socket, methodCallbacks->serialize(socket.getFD(), returnData); } catch (const std::exception& e) { LOGE(mLogPrefix + "Exception during serialization: " << e.what()); - removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR); + removePeerInternal(socket.getFD(), + std::make_exception_ptr(IPCSerializationException())); + return true; } @@ -549,7 +584,8 @@ bool Processor::onMethodRequest(MethodRequest& request) LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD); // Pass the error to the processing callback - IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data)); + ResultBuilder resultBuilder(std::make_exception_ptr(IPCPeerDisconnectedException())); + IGNORE_EXCEPTIONS(request.process(resultBuilder)); return false; } @@ -571,12 +607,15 @@ bool Processor::onMethodRequest(MethodRequest& request) } catch (const std::exception& e) { LOGE(mLogPrefix + "Error during sending a method: " << e.what()); - // Inform about the error, - IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data)); + // Inform about the error + ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException())); + IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder)); mReturnCallbacks.erase(request.messageID); - removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR); + removePeerInternal(request.peerFD, + std::make_exception_ptr(IPCSerializationException())); + return true; @@ -607,7 +646,9 @@ bool Processor::onSignalRequest(SignalRequest& request) } catch (const std::exception& e) { LOGE(mLogPrefix + "Error during sending a signal: " << e.what()); - removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR); + removePeerInternal(request.peerFD, + std::make_exception_ptr(IPCSerializationException())); + return true; } @@ -635,11 +676,10 @@ bool Processor::onAddPeerRequest(AddPeerRequest& request) for (const auto kv : mSignalsCallbacks) { ids.push_back(kv.first); } - auto data = std::make_shared(ids); - callAsync(REGISTER_SIGNAL_METHOD_ID, - request.peerFD, - data, - discardResultHandler); + auto data = std::make_shared(ids); + signalInternal(REGISTER_SIGNAL_METHOD_ID, + request.peerFD, + data); resetPolling(); @@ -658,7 +698,9 @@ bool Processor::onRemovePeerRequest(RemovePeerRequest& request) { LOGS(mLogPrefix + "Processor onRemovePeer"); - removePeerInternal(request.peerFD, Status::REMOVED_PEER); + removePeerInternal(request.peerFD, + std::make_exception_ptr(IPCRemovedPeerException())); + request.conditionPtr->notify_all(); return true; @@ -676,7 +718,8 @@ bool Processor::onFinishRequest(FinishRequest& request) switch (request.requestID) { case Event::METHOD: { auto requestPtr = request.get(); - IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data)); + ResultBuilder resultBuilder(std::make_exception_ptr(IPCClosingException())); + IGNORE_EXCEPTIONS(requestPtr->process(resultBuilder)); break; } case Event::REMOVE_PEER: { diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index c3bcc6b..b6bd615 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -25,6 +25,7 @@ #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP #define COMMON_IPC_INTERNALS_PROCESSOR_HPP +#include "ipc/internals/result-builder.hpp" #include "ipc/internals/socket.hpp" #include "ipc/internals/request-queue.hpp" #include "ipc/internals/method-request.hpp" @@ -55,7 +56,6 @@ namespace vasum { namespace ipc { const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500; -const unsigned int DEFAULT_METHOD_TIMEOUT = 1000; /** * This class wraps communication via UX sockets @@ -84,6 +84,7 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000; * - no new events added after stop() called * - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but * there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :) +* - remove recursive mutex * */ class Processor { @@ -97,7 +98,6 @@ private: }; public: - friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event); /** @@ -111,6 +111,11 @@ public: static const MethodID REGISTER_SIGNAL_METHOD_ID; /** + * Error return message + */ + static const MethodID ERROR_METHOD_ID; + + /** * Constructs the Processor, but doesn't start it. * The object is ready to add methods. * @@ -304,9 +309,9 @@ private: CONFIG_REGISTER_EMPTY }; - struct RegisterSignalsMessage { - RegisterSignalsMessage() = default; - RegisterSignalsMessage(const std::vector ids) + struct RegisterSignalsProtocolMessage { + RegisterSignalsProtocolMessage() = default; + RegisterSignalsProtocolMessage(const std::vector ids) : ids(ids) {} std::vector ids; @@ -317,6 +322,23 @@ private: ) }; + struct ErrorProtocolMessage { + ErrorProtocolMessage() = default; + ErrorProtocolMessage(const MessageID messageID, const int code, const std::string& message) + : messageID(messageID), code(code), message(message) {} + + MessageID messageID; + int code; + std::string message; + + CONFIG_REGISTER + ( + messageID, + code, + message + ) + }; + struct MethodHandlers { MethodHandlers(const MethodHandlers& other) = delete; MethodHandlers& operator=(const MethodHandlers&) = delete; @@ -347,12 +369,12 @@ private: ReturnCallbacks(ReturnCallbacks&&) = default; ReturnCallbacks& operator=(ReturnCallbacks &&) = default; - ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler::type& process) + ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultBuilderHandler& process) : peerFD(peerFD), parse(parse), process(process) {} FileDescriptor peerFD; ParseCallback parse; - ResultHandler::type process; + ResultBuilderHandler process; }; std::string mLogPrefix; @@ -386,7 +408,13 @@ private: const typename MethodHandler::type& process); template - static void discardResultHandler(Status, std::shared_ptr&) {} + void setSignalHandlerInternal(const MethodID methodID, + const typename SignalHandler::type& handler); + + template + void signalInternal(const MethodID methodID, + const FileDescriptor peerFD, + const std::shared_ptr& data); void run(); @@ -412,10 +440,13 @@ private: std::shared_ptr signalCallbacks); void resetPolling(); FileDescriptor getNextFileDescriptor(); - void removePeerInternal(const FileDescriptor peerFD, Status status); + void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr); + + void onNewSignals(const FileDescriptor peerFD, + std::shared_ptr& data); - std::shared_ptr onNewSignals(const FileDescriptor peerFD, - std::shared_ptr& data); + void onErrorSignal(const FileDescriptor peerFD, + std::shared_ptr& data); }; @@ -441,10 +472,7 @@ void Processor::setMethodHandlerInternal(const MethodID methodID, return method(peerFD, tmpData); }; - { - Lock lock(mStateMutex); - mMethodsCallbacks[methodID] = std::make_shared(std::move(methodCall)); - } + mMethodsCallbacks[methodID] = std::make_shared(std::move(methodCall)); } template @@ -464,12 +492,33 @@ void Processor::setMethodHandler(const MethodID methodID, throw IPCException("MethodID used by a signal: " + std::to_string(methodID)); } - setMethodHandlerInternal(methodID, method); + setMethodHandlerInternal(methodID, method); } } template +void Processor::setSignalHandlerInternal(const MethodID methodID, + const typename SignalHandler::type& handler) +{ + SignalHandlers signalCall; + + signalCall.parse = [](const int fd)->std::shared_ptr { + std::shared_ptr dataToFill(new ReceivedDataType()); + config::loadFromFD(fd, *dataToFill); + return dataToFill; + }; + + signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr& dataReceived) { + std::shared_ptr tmpData = std::static_pointer_cast(dataReceived); + handler(peerFD, tmpData); + }; + + mSignalsCallbacks[methodID] = std::make_shared(std::move(signalCall)); +} + + +template void Processor::setSignalHandler(const MethodID methodID, const typename SignalHandler::type& handler) { @@ -478,8 +527,9 @@ void Processor::setSignalHandler(const MethodID methodID, throw IPCException("Forbidden methodID: " + std::to_string(methodID)); } - std::shared_ptr data; + std::shared_ptr data; std::vector peerFDs; + { Lock lock(mStateMutex); @@ -489,24 +539,11 @@ void Processor::setSignalHandler(const MethodID methodID, throw IPCException("MethodID used by a method: " + std::to_string(methodID)); } - SignalHandlers signalCall; - - signalCall.parse = [](const int fd)->std::shared_ptr { - std::shared_ptr data(new ReceivedDataType()); - config::loadFromFD(fd, *data); - return data; - }; - - signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr& data) { - std::shared_ptr tmpData = std::static_pointer_cast(data); - handler(peerFD, tmpData); - }; - - mSignalsCallbacks[methodID] = std::make_shared(std::move(signalCall)); + setSignalHandlerInternal(methodID, handler); // Broadcast the new signal: std::vector ids {methodID}; - data = std::make_shared(ids); + data = std::make_shared(ids); for (const auto kv : mSockets) { peerFDs.push_back(kv.first); @@ -514,10 +551,9 @@ void Processor::setSignalHandler(const MethodID methodID, } for (const auto peerFD : peerFDs) { - callSync(REGISTER_SIGNAL_METHOD_ID, - peerFD, - data, - DEFAULT_METHOD_TIMEOUT); + signalInternal(REGISTER_SIGNAL_METHOD_ID, + peerFD, + data); } } @@ -530,7 +566,7 @@ MessageID Processor::callAsync(const MethodID methodID, { Lock lock(mStateMutex); auto request = MethodRequest::create(methodID, peerFD, data, process); - mRequestQueue.push(Event::METHOD, request); + mRequestQueue.pushBack(Event::METHOD, request); return request->messageID; } @@ -541,16 +577,14 @@ std::shared_ptr Processor::callSync(const MethodID methodID, const std::shared_ptr& data, unsigned int timeoutMS) { - std::shared_ptr result; + Result result; std::mutex mutex; std::condition_variable cv; - Status returnStatus = ipc::Status::UNDEFINED; - auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr returnedData) { + auto process = [&result, &mutex, &cv](const Result && r) { std::unique_lock lock(mutex); - returnStatus = status; - result = returnedData; + result = std::move(r); cv.notify_all(); }; @@ -559,8 +593,8 @@ std::shared_ptr Processor::callSync(const MethodID methodID, data, process); - auto isResultInitialized = [&returnStatus]() { - return returnStatus != ipc::Status::UNDEFINED; + auto isResultInitialized = [&result]() { + return result.isValid(); }; std::unique_lock lock(mutex); @@ -594,9 +628,17 @@ std::shared_ptr Processor::callSync(const MethodID methodID, } } - throwOnError(returnStatus); + return result.get(); +} - return result; +template +void Processor::signalInternal(const MethodID methodID, + const FileDescriptor peerFD, + const std::shared_ptr& data) +{ + Lock lock(mStateMutex); + auto request = SignalRequest::create(methodID, peerFD, data); + mRequestQueue.pushFront(Event::SIGNAL, request); } template @@ -611,7 +653,7 @@ void Processor::signal(const MethodID methodID, } for (const FileDescriptor peerFD : it->second) { auto request = SignalRequest::create(methodID, peerFD, data); - mRequestQueue.push(Event::SIGNAL, request); + mRequestQueue.pushBack(Event::SIGNAL, request); } } diff --git a/common/ipc/internals/request-queue.hpp b/common/ipc/internals/request-queue.hpp index 82ba606..f648345 100644 --- a/common/ipc/internals/request-queue.hpp +++ b/common/ipc/internals/request-queue.hpp @@ -78,13 +78,22 @@ public: bool isEmpty() const; /** - * Push data to the queue + * Push data to back of the queue * * @param requestID request type * @param data data corresponding to the request */ - void push(const RequestIdType requestID, - const std::shared_ptr& data = nullptr); + void pushBack(const RequestIdType requestID, + const std::shared_ptr& data = nullptr); + + /** + * Push data to back of the queue + * + * @param requestID request type + * @param data data corresponding to the request + */ + void pushFront(const RequestIdType requestID, + const std::shared_ptr& data = nullptr); /** * @return get the data from the next request @@ -118,8 +127,8 @@ bool RequestQueue::isEmpty() const } template -void RequestQueue::push(const RequestIdType requestID, - const std::shared_ptr& data) +void RequestQueue::pushBack(const RequestIdType requestID, + const std::shared_ptr& data) { Request request(requestID, data); mRequests.push_back(std::move(request)); @@ -127,6 +136,15 @@ void RequestQueue::push(const RequestIdType requestID, } template +void RequestQueue::pushFront(const RequestIdType requestID, + const std::shared_ptr& data) +{ + Request request(requestID, data); + mRequests.push_front(std::move(request)); + mEventFD.send(); +} + +template typename RequestQueue::Request RequestQueue::pop() { mEventFD.receive(); diff --git a/common/ipc/internals/result-builder.hpp b/common/ipc/internals/result-builder.hpp new file mode 100644 index 0000000..3fe3c49 --- /dev/null +++ b/common/ipc/internals/result-builder.hpp @@ -0,0 +1,78 @@ +/* +* Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Class for storing result of a method - data or exception + */ + +#ifndef COMMON_IPC_RESULT_BUILDER_HPP +#define COMMON_IPC_RESULT_BUILDER_HPP + +#include "ipc/result.hpp" +#include +#include +#include + +namespace vasum { +namespace ipc { + +class ResultBuilder { +public: + ResultBuilder() + : mData(nullptr), + mExceptionPtr(nullptr) + {} + + ResultBuilder(const std::exception_ptr& exceptionPtr) + : mData(nullptr), + mExceptionPtr(exceptionPtr) + {} + + ResultBuilder(const std::shared_ptr& data) + : mData(data), + mExceptionPtr(nullptr) + + {} + + template + Result build() + { + return Result(std::move(std::static_pointer_cast(mData)), + std::move(mExceptionPtr)); + } + +private: + std::shared_ptr mData; + std::exception_ptr mExceptionPtr; +}; + +typedef std::function ResultBuilderHandler; + + +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_RESULT_BUILDER_HPP + + + + + + diff --git a/common/ipc/result.hpp b/common/ipc/result.hpp new file mode 100644 index 0000000..0edf172 --- /dev/null +++ b/common/ipc/result.hpp @@ -0,0 +1,74 @@ +/* +* Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Class for storing result of a method - data or exception + */ + +#ifndef COMMON_IPC_RESULT_HPP +#define COMMON_IPC_RESULT_HPP + +#include +#include +#include + +namespace vasum { +namespace ipc { + +template +class Result { +public: + Result() + : mData(nullptr), + mExceptionPtr(nullptr) + {} + + Result(std::shared_ptr&& data, std::exception_ptr&& exceptionPtr) + : mData(std::move(data)), + mExceptionPtr(std::move(exceptionPtr)) + {} + + std::shared_ptr get() const + { + if (mExceptionPtr) { + std::rethrow_exception(mExceptionPtr); + } + return mData; + } + + bool isValid() const + { + return (bool)mExceptionPtr || (bool)mData; + } + +private: + std::shared_ptr mData; + std::exception_ptr mExceptionPtr; +}; + +template +struct ResultHandler { + typedef std::function < void(Result&&) > type; +}; + +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_RESULT_HPP diff --git a/common/ipc/service.hpp b/common/ipc/service.hpp index 34b73fd..383c71d 100644 --- a/common/ipc/service.hpp +++ b/common/ipc/service.hpp @@ -29,6 +29,7 @@ #include "ipc/internals/acceptor.hpp" #include "ipc/ipc-gsource.hpp" #include "ipc/types.hpp" +#include "ipc/result.hpp" #include "logger/logger.hpp" #include diff --git a/common/ipc/types.cpp b/common/ipc/types.cpp index 5d7dab6..a73a612 100644 --- a/common/ipc/types.cpp +++ b/common/ipc/types.cpp @@ -41,40 +41,7 @@ MessageID getNextMessageID() return ++gLastMessageID; } -std::string toString(const Status status) -{ - switch (status) { - case Status::OK: return "No error, everything is OK"; - case Status::PARSING_ERROR: return "Exception during reading/parsing data from the socket"; - case Status::SERIALIZATION_ERROR: return "Exception during writing/serializing data to the socket"; - case Status::PEER_DISCONNECTED: return "No such peer. Might got disconnected."; - case Status::NAUGHTY_PEER: return "Peer performed a forbidden action."; - case Status::REMOVED_PEER: return "Removing peer"; - case Status::CLOSING: return "Closing IPC"; - case Status::UNDEFINED: return "Undefined state"; - default: return "Unknown status"; - } -} -void throwOnError(const Status status) -{ - if (status == Status::OK) { - return; - } - std::string message = toString(status); - LOGE(message); - - switch (status) { - case Status::PARSING_ERROR: throw IPCParsingException(message); - case Status::SERIALIZATION_ERROR: throw IPCSerializationException(message); - case Status::PEER_DISCONNECTED: throw IPCPeerDisconnectedException(message); - case Status::NAUGHTY_PEER: throw IPCNaughtyPeerException(message); - case Status::REMOVED_PEER: throw IPCException(message); - case Status::CLOSING: throw IPCException(message); - case Status::UNDEFINED: throw IPCException(message); - default: return throw IPCException(message); - } -} } // namespace ipc } // namespace vasum diff --git a/common/ipc/types.hpp b/common/ipc/types.hpp index 6186f65..b5411b3 100644 --- a/common/ipc/types.hpp +++ b/common/ipc/types.hpp @@ -42,22 +42,8 @@ typedef std::function PeerCallback; typedef std::function& data)> SerializeCallback; typedef std::function(int fd)> ParseCallback; -enum class Status : int { - OK = 0, - PARSING_ERROR, - SERIALIZATION_ERROR, - PEER_DISCONNECTED, - NAUGHTY_PEER, - REMOVED_PEER, - CLOSING, - UNDEFINED -}; - -std::string toString(const Status status); -void throwOnError(const Status status); MessageID getNextMessageID(); - template struct MethodHandler { typedef std::function(FileDescriptor peerFD, @@ -70,12 +56,6 @@ struct SignalHandler { std::shared_ptr& data)> type; }; -template -struct ResultHandler { - typedef std::function& resultData)> type; -}; - } // namespace ipc } // namespace vasum diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index e5609f7..5408735 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -33,6 +33,7 @@ #include "ipc/service.hpp" #include "ipc/client.hpp" #include "ipc/types.hpp" +#include "ipc/result.hpp" #include "utils/glib-loop.hpp" #include "utils/latch.hpp" #include "utils/value-latch.hpp" @@ -353,10 +354,8 @@ BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho) c.start(); //Async call - auto dataBack = [&recvDataLatch](ipc::Status status, std::shared_ptr& data) { - if (status == ipc::Status::OK) { - recvDataLatch.set(data); - } + auto dataBack = [&recvDataLatch](Result && r) { + recvDataLatch.set(r.get()); }; c.callAsync(1, sentData, dataBack); @@ -376,10 +375,8 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho) FileDescriptor peerFD = connect(s, c); // Async call - auto dataBack = [&recvDataLatch](ipc::Status status, std::shared_ptr& data) { - if (status == ipc::Status::OK) { - recvDataLatch.set(data); - } + auto dataBack = [&recvDataLatch](Result && r) { + recvDataLatch.set(r.get()); }; s.callAsync(1, peerFD, sentData, dataBack); @@ -431,7 +428,7 @@ BOOST_AUTO_TEST_CASE(ParseError) BOOST_AUTO_TEST_CASE(DisconnectedPeerError) { - ValueLatch retStatusLatch; + ValueLatch> retStatusLatch; Service s(socketPath); auto method = [](const FileDescriptor, std::shared_ptr&) { @@ -445,20 +442,20 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError) Client c(socketPath); c.start(); - auto dataBack = [&retStatusLatch](ipc::Status status, std::shared_ptr&) { - retStatusLatch.set(status); + auto dataBack = [&retStatusLatch](Result && r) { + retStatusLatch.set(std::move(r)); }; std::shared_ptr sentData(new SendData(78)); c.callAsync(1, sentData, dataBack); // Wait for the response - ipc::Status retStatus = retStatusLatch.get(TIMEOUT); + Result result = retStatusLatch.get(TIMEOUT); // The disconnection might have happened: // - after sending the message (PEER_DISCONNECTED) // - during external serialization (SERIALIZATION_ERROR) - BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED || retStatus == ipc::Status::SERIALIZATION_ERROR); + BOOST_CHECK_THROW(result.get(), IPCException); } @@ -627,6 +624,32 @@ BOOST_AUTO_TEST_CASE(ClientGSource) BOOST_CHECK(l.wait(TIMEOUT)); } +BOOST_AUTO_TEST_CASE(UsersError) +{ + const int TEST_ERROR_CODE = -234; + const std::string TEST_ERROR_MESSAGE = "Ay, caramba!"; + + Service s(socketPath); + Client c(socketPath); + auto clientID = connect(s, c); + + auto throwingMethodHandler = [&](const FileDescriptor, std::shared_ptr&) -> std::shared_ptr { + throw IPCUserException(TEST_ERROR_CODE, TEST_ERROR_MESSAGE); + }; + + s.setMethodHandler(1, throwingMethodHandler); + c.setMethodHandler(1, throwingMethodHandler); + + std::shared_ptr sentData(new SendData(78)); + + auto hasProperData = [&](const IPCUserException & e) { + return e.getCode() == TEST_ERROR_CODE && e.what() == TEST_ERROR_MESSAGE; + }; + + BOOST_CHECK_EXCEPTION((c.callSync(1, sentData, TIMEOUT)), IPCUserException, hasProperData); + BOOST_CHECK_EXCEPTION((s.callSync(1, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData); + +} // BOOST_AUTO_TEST_CASE(ConnectionLimitTest) // { // unsigned oldLimit = ipc::getMaxFDNumber(); -- 2.7.4