IPC: Remote error handling 63/34463/13
authorJan Olszak <j.olszak@samsung.com>
Mon, 26 Jan 2015 13:33:01 +0000 (14:33 +0100)
committerJan Olszak <j.olszak@samsung.com>
Fri, 30 Jan 2015 11:51:05 +0000 (12:51 +0100)
[Bug/Feature]  Passing errors to callbacks
[Cause]        N/A
[Solution]     N/A
[Verification] Build, install, run tests, run tests under valgrind

Change-Id: Icbe4df6671144fd34a3bf8b43c4360c2242a6d3e

12 files changed:
common/ipc/client.hpp
common/ipc/exception.hpp
common/ipc/internals/method-request.hpp
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp
common/ipc/internals/request-queue.hpp
common/ipc/internals/result-builder.hpp [new file with mode: 0644]
common/ipc/result.hpp [new file with mode: 0644]
common/ipc/service.hpp
common/ipc/types.cpp
common/ipc/types.hpp
tests/unit_tests/ipc/ut-ipc.cpp

index eedf81b..1ee44bb 100644 (file)
@@ -28,6 +28,7 @@
 #include "ipc/internals/processor.hpp"
 #include "ipc/ipc-gsource.hpp"
 #include "ipc/types.hpp"
+#include "ipc/result.hpp"
 #include "logger/logger.hpp"
 
 #include <string>
index 794cf21..8e9a7df 100644 (file)
 #include "base-exception.hpp"
 
 namespace vasum {
-
+namespace ipc {
 
 /**
  * Base class for exceptions in IPC
  */
 struct IPCException: public VasumException {
-    IPCException(const std::string& error) : VasumException(error) {}
+    IPCException(const std::string& message)
+        : VasumException(message) {}
 };
 
 struct IPCParsingException: public IPCException {
-    IPCParsingException(const std::string& error) : IPCException(error) {}
+    IPCParsingException(const std::string& message = "Exception during reading/parsing data from the socket")
+        : IPCException(message) {}
 };
 
 struct IPCSerializationException: public IPCException {
-    IPCSerializationException(const std::string& error) : IPCException(error) {}
+    IPCSerializationException(const std::string& message = "Exception during writing/serializing data to the socket")
+        : IPCException(message) {}
 };
 
 struct IPCPeerDisconnectedException: public IPCException {
-    IPCPeerDisconnectedException(const std::string& error) : IPCException(error) {}
+    IPCPeerDisconnectedException(const std::string& message = "No such peer. Might got disconnected.")
+        : IPCException(message) {}
 };
 
 struct IPCNaughtyPeerException: public IPCException {
-    IPCNaughtyPeerException(const std::string& error) : IPCException(error) {}
+    IPCNaughtyPeerException(const std::string& message = "Peer performed a forbidden action.")
+        : IPCException(message) {}
+};
+
+struct IPCRemovedPeerException: public IPCException {
+    IPCRemovedPeerException(const std::string& message = "Removing peer")
+        : IPCException(message) {}
+};
+
+struct IPCClosingException: public IPCException {
+    IPCClosingException(const std::string& message = "Closing IPC")
+        : IPCException(message) {}
 };
 
 struct IPCTimeoutException: public IPCException {
-    IPCTimeoutException(const std::string& error) : IPCException(error) {}
+    IPCTimeoutException(const std::string& message)
+        : IPCException(message) {}
+};
+
+struct IPCUserException: public IPCException {
+    IPCUserException(const int code, const std::string& message)
+        : IPCException(message),
+          mCode(code)
+    {}
+
+    int getCode() const
+    {
+        return mCode;
+    }
+
+private:
+    int mCode;
 };
-}
 
+} // namespace ipc
+} // namespace vasum
 
 #endif // COMMON_IPC_EXCEPTION_HPP
index 36d3d7a..8ed17c5 100644 (file)
 #ifndef COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
 #define COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
 
+#include "ipc/internals/result-builder.hpp"
 #include "ipc/types.hpp"
+#include "ipc/result.hpp"
 #include "logger/logger-scope.hpp"
 #include "config/manager.hpp"
+#include <utility>
 
 namespace vasum {
 namespace ipc {
@@ -49,7 +52,7 @@ public:
     std::shared_ptr<void> data;
     SerializeCallback serialize;
     ParseCallback parse;
-    ResultHandler<void>::type process;
+    ResultBuilderHandler process;
 
 private:
     MethodRequest(const MethodID methodID, const FileDescriptor peerFD)
@@ -82,10 +85,9 @@ std::shared_ptr<MethodRequest> MethodRequest::create(const MethodID methodID,
         return data;
     };
 
-    request->process = [process](Status status, std::shared_ptr<void>& data)->void {
-        LOGS("Method process, status: " << toString(status));
-        std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-        return process(status, tmpData);
+    request->process = [process](ResultBuilder & resultBuilder) {
+        LOGS("Method process");
+        process(resultBuilder.build<ReceivedDataType>());
     };
 
     return request;
index bdc8a8d..58beca8 100644 (file)
@@ -51,6 +51,7 @@ namespace ipc {
 
 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
+const MethodID Processor::ERROR_METHOD_ID = std::numeric_limits<MethodID>::max() - 2;
 
 Processor::Processor(const std::string& logName,
                      const PeerCallback& newPeerCallback,
@@ -66,8 +67,10 @@ Processor::Processor(const std::string& logName,
 
     utils::signalBlock(SIGPIPE);
     using namespace std::placeholders;
-    setMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
-                                                                std::bind(&Processor::onNewSignals, this, _1, _2));
+    setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+                                                             std::bind(&Processor::onNewSignals, this, _1, _2));
+
+    setSignalHandlerInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, std::bind(&Processor::onErrorSignal, this, _1, _2));
 }
 
 Processor::~Processor()
@@ -110,7 +113,7 @@ void Processor::stop()
         {
             Lock lock(mStateMutex);
             auto request = std::make_shared<FinishRequest>(conditionPtr);
-            mRequestQueue.push(Event::FINISH, request);
+            mRequestQueue.pushBack(Event::FINISH, request);
         }
 
         LOGD(mLogPrefix + "Waiting for the Processor to stop");
@@ -158,7 +161,7 @@ FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
 
     FileDescriptor peerFD = socketPtr->getFD();
     auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
-    mRequestQueue.push(Event::ADD_PEER, request);
+    mRequestQueue.pushBack(Event::ADD_PEER, request);
 
     LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
 
@@ -182,7 +185,7 @@ void Processor::removePeer(const FileDescriptor peerFD)
     {
         Lock lock(mStateMutex);
         auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
-        mRequestQueue.push(Event::REMOVE_PEER, request);
+        mRequestQueue.pushBack(Event::REMOVE_PEER, request);
     }
 
     auto isPeerDeleted = [&peerFD, this]()->bool {
@@ -193,7 +196,7 @@ void Processor::removePeer(const FileDescriptor peerFD)
     conditionPtr->wait(lock, isPeerDeleted);
 }
 
-void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
+void Processor::removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr)
 {
     LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
     LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
@@ -214,10 +217,10 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
     }
 
     // Erase associated return value callbacks
-    std::shared_ptr<void> data;
     for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
         if (it->second.peerFD == peerFD) {
-            IGNORE_EXCEPTIONS(it->second.process(status, data));
+            ResultBuilder resultBuilder(exceptionPtr);
+            IGNORE_EXCEPTIONS(it->second.process(resultBuilder));
             it = mReturnCallbacks.erase(it);
         } else {
             ++it;
@@ -312,7 +315,8 @@ bool Processor::handleLostConnections()
             if (mFDs[i].revents & POLLHUP) {
                 LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
                 mFDs[i].revents &= ~(POLLHUP);
-                removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
+                removePeerInternal(mFDs[i].fd,
+                                   std::make_exception_ptr(IPCPeerDisconnectedException()));
                 isPeerRemoved = true;
             }
         }
@@ -324,7 +328,8 @@ bool Processor::handleLostConnections()
 bool Processor::handleLostConnection(const FileDescriptor peerFD)
 {
     Lock lock(mStateMutex);
-    removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
+    removePeerInternal(peerFD,
+                       std::make_exception_ptr(IPCPeerDisconnectedException()));
     return true;
 }
 
@@ -367,7 +372,8 @@ bool Processor::handleInput(const FileDescriptor peerFD)
 
         } catch (const IPCException& e) {
             LOGE(mLogPrefix + "Error during reading the socket");
-            removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
+            removePeerInternal(socketPtr->getFD(),
+                               std::make_exception_ptr(IPCNaughtyPeerException()));
             return true;
         }
 
@@ -388,23 +394,33 @@ bool Processor::handleInput(const FileDescriptor peerFD)
             } else {
                 // Nothing
                 LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
-                removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
+                removePeerInternal(socketPtr->getFD(),
+                                   std::make_exception_ptr(IPCNaughtyPeerException()));
                 return true;
             }
         }
     }
 }
 
-std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
-                                                              std::shared_ptr<RegisterSignalsMessage>& data)
+void Processor::onNewSignals(const FileDescriptor peerFD,
+                             std::shared_ptr<RegisterSignalsProtocolMessage>& data)
 {
     LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
 
     for (const MethodID methodID : data->ids) {
         mSignalsPeers[methodID].push_back(peerFD);
     }
+}
 
-    return std::make_shared<EmptyData>();
+void Processor::onErrorSignal(const FileDescriptor, std::shared_ptr<ErrorProtocolMessage>& data)
+{
+    LOGS(mLogPrefix + "Processor onErrorSignal messageID: " << data->messageID);
+
+    ReturnCallbacks returnCallbacks = std::move(mReturnCallbacks.at(data->messageID));
+    mReturnCallbacks.erase(data->messageID);
+
+    ResultBuilder resultBuilder(std::make_exception_ptr(IPCUserException(data->code, data->message)));
+    IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
 }
 
 bool Processor::onReturnValue(const Socket& socket,
@@ -420,7 +436,8 @@ bool Processor::onReturnValue(const Socket& socket,
         mReturnCallbacks.erase(messageID);
     } catch (const std::out_of_range&) {
         LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
-        removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+        removePeerInternal(socket.getFD(),
+                           std::make_exception_ptr(IPCNaughtyPeerException()));
         return true;
     }
 
@@ -430,13 +447,16 @@ bool Processor::onReturnValue(const Socket& socket,
         data = returnCallbacks.parse(socket.getFD());
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
-        IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
-        removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
+        ResultBuilder resultBuilder(std::make_exception_ptr(IPCParsingException()));
+        IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
+        removePeerInternal(socket.getFD(),
+                           std::make_exception_ptr(IPCParsingException()));
         return true;
     }
 
     // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
-    IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
+    ResultBuilder resultBuilder(data);
+    IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
 
     // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
     return false;
@@ -457,16 +477,22 @@ bool Processor::onRemoteSignal(const Socket& socket,
         data = signalCallbacks->parse(socket.getFD());
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
-        removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
+        removePeerInternal(socket.getFD(),
+                           std::make_exception_ptr(IPCParsingException()));
         return true;
     }
 
     // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
     try {
         signalCallbacks->signal(socket.getFD(), data);
+    } catch (const IPCUserException& e) {
+        LOGW("Discarded user's exception");
+        return false;
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
-        removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+        removePeerInternal(socket.getFD(),
+                           std::make_exception_ptr(IPCNaughtyPeerException()));
+
         return true;
     }
 
@@ -487,7 +513,8 @@ bool Processor::onRemoteCall(const Socket& socket,
         data = methodCallbacks->parse(socket.getFD());
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
-        removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
+        removePeerInternal(socket.getFD(),
+                           std::make_exception_ptr(IPCParsingException()));
         return true;
     }
 
@@ -495,9 +522,15 @@ bool Processor::onRemoteCall(const Socket& socket,
     std::shared_ptr<void> returnData;
     try {
         returnData = methodCallbacks->method(socket.getFD(), data);
+    } catch (const IPCUserException& e) {
+        LOGW("User's exception");
+        auto data = std::make_shared<ErrorProtocolMessage>(messageID, e.getCode(), e.what());
+        signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, socket.getFD(), data);
+        return false;
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
-        removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+        removePeerInternal(socket.getFD(),
+                           std::make_exception_ptr(IPCNaughtyPeerException()));
         return true;
     }
 
@@ -510,7 +543,9 @@ bool Processor::onRemoteCall(const Socket& socket,
         methodCallbacks->serialize(socket.getFD(), returnData);
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Exception during serialization: " << e.what());
-        removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
+        removePeerInternal(socket.getFD(),
+                           std::make_exception_ptr(IPCSerializationException()));
+
         return true;
     }
 
@@ -549,7 +584,8 @@ bool Processor::onMethodRequest(MethodRequest& request)
         LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
 
         // Pass the error to the processing callback
-        IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data));
+        ResultBuilder resultBuilder(std::make_exception_ptr(IPCPeerDisconnectedException()));
+        IGNORE_EXCEPTIONS(request.process(resultBuilder));
 
         return false;
     }
@@ -571,12 +607,15 @@ bool Processor::onMethodRequest(MethodRequest& request)
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Error during sending a method: " << e.what());
 
-        // Inform about the error,
-        IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data));
+        // Inform about the error
+        ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException()));
+        IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder));
 
 
         mReturnCallbacks.erase(request.messageID);
-        removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
+        removePeerInternal(request.peerFD,
+                           std::make_exception_ptr(IPCSerializationException()));
+
 
         return true;
 
@@ -607,7 +646,9 @@ bool Processor::onSignalRequest(SignalRequest& request)
     } catch (const std::exception& e) {
         LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
 
-        removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
+        removePeerInternal(request.peerFD,
+                           std::make_exception_ptr(IPCSerializationException()));
+
         return true;
     }
 
@@ -635,11 +676,10 @@ bool Processor::onAddPeerRequest(AddPeerRequest& request)
     for (const auto kv : mSignalsCallbacks) {
         ids.push_back(kv.first);
     }
-    auto data = std::make_shared<RegisterSignalsMessage>(ids);
-    callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
-                                                 request.peerFD,
-                                                 data,
-                                                 discardResultHandler<EmptyData>);
+    auto data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
+    signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+                                                   request.peerFD,
+                                                   data);
 
 
     resetPolling();
@@ -658,7 +698,9 @@ bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
 {
     LOGS(mLogPrefix + "Processor onRemovePeer");
 
-    removePeerInternal(request.peerFD, Status::REMOVED_PEER);
+    removePeerInternal(request.peerFD,
+                       std::make_exception_ptr(IPCRemovedPeerException()));
+
     request.conditionPtr->notify_all();
 
     return true;
@@ -676,7 +718,8 @@ bool Processor::onFinishRequest(FinishRequest& request)
         switch (request.requestID) {
         case Event::METHOD: {
             auto requestPtr = request.get<MethodRequest>();
-            IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data));
+            ResultBuilder resultBuilder(std::make_exception_ptr(IPCClosingException()));
+            IGNORE_EXCEPTIONS(requestPtr->process(resultBuilder));
             break;
         }
         case Event::REMOVE_PEER: {
index c3bcc6b..b6bd615 100644 (file)
@@ -25,6 +25,7 @@
 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
 
+#include "ipc/internals/result-builder.hpp"
 #include "ipc/internals/socket.hpp"
 #include "ipc/internals/request-queue.hpp"
 #include "ipc/internals/method-request.hpp"
@@ -55,7 +56,6 @@ namespace vasum {
 namespace ipc {
 
 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
-const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 
 /**
 * This class wraps communication via UX sockets
@@ -84,6 +84,7 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 *  - no new events added after stop() called
 *  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
 *    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
+*  - remove recursive mutex
 *
 */
 class Processor {
@@ -97,7 +98,6 @@ private:
     };
 
 public:
-
     friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
 
     /**
@@ -111,6 +111,11 @@ public:
     static const MethodID REGISTER_SIGNAL_METHOD_ID;
 
     /**
+    * Error return message
+    */
+    static const MethodID ERROR_METHOD_ID;
+
+    /**
      * Constructs the Processor, but doesn't start it.
      * The object is ready to add methods.
      *
@@ -304,9 +309,9 @@ private:
         CONFIG_REGISTER_EMPTY
     };
 
-    struct RegisterSignalsMessage {
-        RegisterSignalsMessage() = default;
-        RegisterSignalsMessage(const std::vector<MethodID> ids)
+    struct RegisterSignalsProtocolMessage {
+        RegisterSignalsProtocolMessage() = default;
+        RegisterSignalsProtocolMessage(const std::vector<MethodID> ids)
             : ids(ids) {}
 
         std::vector<MethodID> ids;
@@ -317,6 +322,23 @@ private:
         )
     };
 
+    struct ErrorProtocolMessage {
+        ErrorProtocolMessage() = default;
+        ErrorProtocolMessage(const MessageID messageID, const int code, const std::string& message)
+            : messageID(messageID), code(code), message(message) {}
+
+        MessageID messageID;
+        int code;
+        std::string message;
+
+        CONFIG_REGISTER
+        (
+            messageID,
+            code,
+            message
+        )
+    };
+
     struct MethodHandlers {
         MethodHandlers(const MethodHandlers& other) = delete;
         MethodHandlers& operator=(const MethodHandlers&) = delete;
@@ -347,12 +369,12 @@ private:
         ReturnCallbacks(ReturnCallbacks&&) = default;
         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
 
-        ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
+        ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultBuilderHandler& process)
             : peerFD(peerFD), parse(parse), process(process) {}
 
         FileDescriptor peerFD;
         ParseCallback parse;
-        ResultHandler<void>::type process;
+        ResultBuilderHandler process;
     };
 
     std::string mLogPrefix;
@@ -386,7 +408,13 @@ private:
                                   const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
 
     template<typename ReceivedDataType>
-    static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
+    void setSignalHandlerInternal(const MethodID methodID,
+                                  const typename SignalHandler<ReceivedDataType>::type& handler);
+
+    template<typename SentDataType>
+    void signalInternal(const MethodID methodID,
+                        const FileDescriptor peerFD,
+                        const std::shared_ptr<SentDataType>& data);
 
     void run();
 
@@ -412,10 +440,13 @@ private:
                         std::shared_ptr<SignalHandlers> signalCallbacks);
     void resetPolling();
     FileDescriptor getNextFileDescriptor();
-    void removePeerInternal(const FileDescriptor peerFD, Status status);
+    void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr);
+
+    void onNewSignals(const FileDescriptor peerFD,
+                      std::shared_ptr<RegisterSignalsProtocolMessage>& data);
 
-    std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
-                                            std::shared_ptr<RegisterSignalsMessage>& data);
+    void onErrorSignal(const FileDescriptor peerFD,
+                       std::shared_ptr<ErrorProtocolMessage>& data);
 
 
 };
@@ -441,10 +472,7 @@ void Processor::setMethodHandlerInternal(const MethodID methodID,
         return method(peerFD, tmpData);
     };
 
-    {
-        Lock lock(mStateMutex);
-        mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
-    }
+    mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
 }
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -464,12 +492,33 @@ void Processor::setMethodHandler(const MethodID methodID,
             throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
         }
 
-        setMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
+        setMethodHandlerInternal<SentDataType, ReceivedDataType>(methodID, method);
     }
 
 }
 
 template<typename ReceivedDataType>
+void Processor::setSignalHandlerInternal(const MethodID methodID,
+                                         const typename SignalHandler<ReceivedDataType>::type& handler)
+{
+    SignalHandlers signalCall;
+
+    signalCall.parse = [](const int fd)->std::shared_ptr<void> {
+        std::shared_ptr<ReceivedDataType> dataToFill(new ReceivedDataType());
+        config::loadFromFD<ReceivedDataType>(fd, *dataToFill);
+        return dataToFill;
+    };
+
+    signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& dataReceived) {
+        std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
+        handler(peerFD, tmpData);
+    };
+
+    mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
+}
+
+
+template<typename ReceivedDataType>
 void Processor::setSignalHandler(const MethodID methodID,
                                  const typename SignalHandler<ReceivedDataType>::type& handler)
 {
@@ -478,8 +527,9 @@ void Processor::setSignalHandler(const MethodID methodID,
         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
     }
 
-    std::shared_ptr<RegisterSignalsMessage> data;
+    std::shared_ptr<RegisterSignalsProtocolMessage> data;
     std::vector<FileDescriptor> peerFDs;
+
     {
         Lock lock(mStateMutex);
 
@@ -489,24 +539,11 @@ void Processor::setSignalHandler(const MethodID methodID,
             throw IPCException("MethodID used by a method: " + std::to_string(methodID));
         }
 
-        SignalHandlers signalCall;
-
-        signalCall.parse = [](const int fd)->std::shared_ptr<void> {
-            std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
-            config::loadFromFD<ReceivedDataType>(fd, *data);
-            return data;
-        };
-
-        signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
-            std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-            handler(peerFD, tmpData);
-        };
-
-        mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
+        setSignalHandlerInternal<ReceivedDataType>(methodID, handler);
 
         // Broadcast the new signal:
         std::vector<MethodID> ids {methodID};
-        data = std::make_shared<RegisterSignalsMessage>(ids);
+        data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
 
         for (const auto kv : mSockets) {
             peerFDs.push_back(kv.first);
@@ -514,10 +551,9 @@ void Processor::setSignalHandler(const MethodID methodID,
     }
 
     for (const auto peerFD : peerFDs) {
-        callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
-                                                    peerFD,
-                                                    data,
-                                                    DEFAULT_METHOD_TIMEOUT);
+        signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+                                                       peerFD,
+                                                       data);
     }
 }
 
@@ -530,7 +566,7 @@ MessageID Processor::callAsync(const MethodID methodID,
 {
     Lock lock(mStateMutex);
     auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
-    mRequestQueue.push(Event::METHOD, request);
+    mRequestQueue.pushBack(Event::METHOD, request);
     return request->messageID;
 }
 
@@ -541,16 +577,14 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
                                                       const std::shared_ptr<SentDataType>& data,
                                                       unsigned int timeoutMS)
 {
-    std::shared_ptr<ReceivedDataType> result;
+    Result<ReceivedDataType> result;
 
     std::mutex mutex;
     std::condition_variable cv;
-    Status returnStatus = ipc::Status::UNDEFINED;
 
-    auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
+    auto process = [&result, &mutex, &cv](const Result<ReceivedDataType> && r) {
         std::unique_lock<std::mutex> lock(mutex);
-        returnStatus = status;
-        result = returnedData;
+        result = std::move(r);
         cv.notify_all();
     };
 
@@ -559,8 +593,8 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
                                                                     data,
                                                                     process);
 
-    auto isResultInitialized = [&returnStatus]() {
-        return returnStatus != ipc::Status::UNDEFINED;
+    auto isResultInitialized = [&result]() {
+        return result.isValid();
     };
 
     std::unique_lock<std::mutex> lock(mutex);
@@ -594,9 +628,17 @@ std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
         }
     }
 
-    throwOnError(returnStatus);
+    return result.get();
+}
 
-    return result;
+template<typename SentDataType>
+void Processor::signalInternal(const MethodID methodID,
+                               const FileDescriptor peerFD,
+                               const std::shared_ptr<SentDataType>& data)
+{
+    Lock lock(mStateMutex);
+    auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
+    mRequestQueue.pushFront(Event::SIGNAL, request);
 }
 
 template<typename SentDataType>
@@ -611,7 +653,7 @@ void Processor::signal(const MethodID methodID,
     }
     for (const FileDescriptor peerFD : it->second) {
         auto request =  SignalRequest::create<SentDataType>(methodID, peerFD, data);
-        mRequestQueue.push(Event::SIGNAL, request);
+        mRequestQueue.pushBack(Event::SIGNAL, request);
     }
 }
 
index 82ba606..f648345 100644 (file)
@@ -78,13 +78,22 @@ public:
     bool isEmpty() const;
 
     /**
-     * Push data to the queue
+     * Push data to back of the queue
      *
      * @param requestID request type
      * @param data data corresponding to the request
      */
-    void push(const RequestIdType requestID,
-              const std::shared_ptr<void>& data = nullptr);
+    void pushBack(const RequestIdType requestID,
+                  const std::shared_ptr<void>& data = nullptr);
+
+    /**
+     * Push data to back of the queue
+     *
+     * @param requestID request type
+     * @param data data corresponding to the request
+     */
+    void pushFront(const RequestIdType requestID,
+                   const std::shared_ptr<void>& data = nullptr);
 
     /**
      * @return get the data from the next request
@@ -118,8 +127,8 @@ bool RequestQueue<RequestIdType>::isEmpty() const
 }
 
 template<typename RequestIdType>
-void RequestQueue<RequestIdType>::push(const RequestIdType requestID,
-                                       const std::shared_ptr<void>& data)
+void RequestQueue<RequestIdType>::pushBack(const RequestIdType requestID,
+                                           const std::shared_ptr<void>& data)
 {
     Request request(requestID, data);
     mRequests.push_back(std::move(request));
@@ -127,6 +136,15 @@ void RequestQueue<RequestIdType>::push(const RequestIdType requestID,
 }
 
 template<typename RequestIdType>
+void RequestQueue<RequestIdType>::pushFront(const RequestIdType requestID,
+                                            const std::shared_ptr<void>& data)
+{
+    Request request(requestID, data);
+    mRequests.push_front(std::move(request));
+    mEventFD.send();
+}
+
+template<typename RequestIdType>
 typename RequestQueue<RequestIdType>::Request RequestQueue<RequestIdType>::pop()
 {
     mEventFD.receive();
diff --git a/common/ipc/internals/result-builder.hpp b/common/ipc/internals/result-builder.hpp
new file mode 100644 (file)
index 0000000..3fe3c49
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+*  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Class for storing result of a method - data or exception
+ */
+
+#ifndef COMMON_IPC_RESULT_BUILDER_HPP
+#define COMMON_IPC_RESULT_BUILDER_HPP
+
+#include "ipc/result.hpp"
+#include <functional>
+#include <exception>
+#include <memory>
+
+namespace vasum {
+namespace ipc {
+
+class ResultBuilder {
+public:
+    ResultBuilder()
+        : mData(nullptr),
+          mExceptionPtr(nullptr)
+    {}
+
+    ResultBuilder(const std::exception_ptr& exceptionPtr)
+        : mData(nullptr),
+          mExceptionPtr(exceptionPtr)
+    {}
+
+    ResultBuilder(const std::shared_ptr<void>& data)
+        : mData(data),
+          mExceptionPtr(nullptr)
+
+    {}
+
+    template<typename Data>
+    Result<Data> build()
+    {
+        return Result<Data>(std::move(std::static_pointer_cast<Data>(mData)),
+                            std::move(mExceptionPtr));
+    }
+
+private:
+    std::shared_ptr<void> mData;
+    std::exception_ptr mExceptionPtr;
+};
+
+typedef std::function<void(ResultBuilder&)> ResultBuilderHandler;
+
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_RESULT_BUILDER_HPP
+
+
+
+
+
+
diff --git a/common/ipc/result.hpp b/common/ipc/result.hpp
new file mode 100644 (file)
index 0000000..0edf172
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+*  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Class for storing result of a method - data or exception
+ */
+
+#ifndef COMMON_IPC_RESULT_HPP
+#define COMMON_IPC_RESULT_HPP
+
+#include <functional>
+#include <exception>
+#include <memory>
+
+namespace vasum {
+namespace ipc {
+
+template<typename Data>
+class Result {
+public:
+    Result()
+        : mData(nullptr),
+          mExceptionPtr(nullptr)
+    {}
+
+    Result(std::shared_ptr<Data>&& data, std::exception_ptr&& exceptionPtr)
+        : mData(std::move(data)),
+          mExceptionPtr(std::move(exceptionPtr))
+    {}
+
+    std::shared_ptr<Data> get() const
+    {
+        if (mExceptionPtr) {
+            std::rethrow_exception(mExceptionPtr);
+        }
+        return mData;
+    }
+
+    bool isValid() const
+    {
+        return (bool)mExceptionPtr || (bool)mData;
+    }
+
+private:
+    std::shared_ptr<Data> mData;
+    std::exception_ptr mExceptionPtr;
+};
+
+template<typename Data>
+struct ResultHandler {
+    typedef std::function < void(Result<Data>&&) > type;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_RESULT_HPP
index 34b73fd..383c71d 100644 (file)
@@ -29,6 +29,7 @@
 #include "ipc/internals/acceptor.hpp"
 #include "ipc/ipc-gsource.hpp"
 #include "ipc/types.hpp"
+#include "ipc/result.hpp"
 #include "logger/logger.hpp"
 
 #include <string>
index 5d7dab6..a73a612 100644 (file)
@@ -41,40 +41,7 @@ MessageID getNextMessageID()
     return ++gLastMessageID;
 }
 
-std::string toString(const Status status)
-{
-    switch (status) {
-    case Status::OK: return "No error, everything is OK";
-    case Status::PARSING_ERROR: return "Exception during reading/parsing data from the socket";
-    case Status::SERIALIZATION_ERROR: return "Exception during writing/serializing data to the socket";
-    case Status::PEER_DISCONNECTED: return "No such peer. Might got disconnected.";
-    case Status::NAUGHTY_PEER: return "Peer performed a forbidden action.";
-    case Status::REMOVED_PEER: return "Removing peer";
-    case Status::CLOSING: return "Closing IPC";
-    case Status::UNDEFINED: return "Undefined state";
-    default: return "Unknown status";
-    }
-}
 
-void throwOnError(const Status status)
-{
-    if (status == Status::OK) {
-        return;
-    }
 
-    std::string message = toString(status);
-    LOGE(message);
-
-    switch (status) {
-    case Status::PARSING_ERROR: throw IPCParsingException(message);
-    case Status::SERIALIZATION_ERROR: throw IPCSerializationException(message);
-    case Status::PEER_DISCONNECTED: throw IPCPeerDisconnectedException(message);
-    case Status::NAUGHTY_PEER: throw IPCNaughtyPeerException(message);
-    case Status::REMOVED_PEER: throw IPCException(message);
-    case Status::CLOSING: throw IPCException(message);
-    case Status::UNDEFINED: throw IPCException(message);
-    default: return throw IPCException(message);
-    }
-}
 } // namespace ipc
 } // namespace vasum
index 6186f65..b5411b3 100644 (file)
@@ -42,22 +42,8 @@ typedef std::function<void(FileDescriptor)> PeerCallback;
 typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
 typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
 
-enum class Status : int {
-    OK = 0,
-    PARSING_ERROR,
-    SERIALIZATION_ERROR,
-    PEER_DISCONNECTED,
-    NAUGHTY_PEER,
-    REMOVED_PEER,
-    CLOSING,
-    UNDEFINED
-};
-
-std::string toString(const Status status);
-void throwOnError(const Status status);
 MessageID getNextMessageID();
 
-
 template<typename SentDataType, typename ReceivedDataType>
 struct MethodHandler {
     typedef std::function<std::shared_ptr<SentDataType>(FileDescriptor peerFD,
@@ -70,12 +56,6 @@ struct SignalHandler {
                                std::shared_ptr<ReceivedDataType>& data)> type;
 };
 
-template <typename ReceivedDataType>
-struct ResultHandler {
-    typedef std::function<void(Status status,
-                               std::shared_ptr<ReceivedDataType>& resultData)> type;
-};
-
 } // namespace ipc
 } // namespace vasum
 
index e5609f7..5408735 100644 (file)
@@ -33,6 +33,7 @@
 #include "ipc/service.hpp"
 #include "ipc/client.hpp"
 #include "ipc/types.hpp"
+#include "ipc/result.hpp"
 #include "utils/glib-loop.hpp"
 #include "utils/latch.hpp"
 #include "utils/value-latch.hpp"
@@ -353,10 +354,8 @@ BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho)
     c.start();
 
     //Async call
-    auto dataBack = [&recvDataLatch](ipc::Status status, std::shared_ptr<RecvData>& data) {
-        if (status == ipc::Status::OK) {
-            recvDataLatch.set(data);
-        }
+    auto dataBack = [&recvDataLatch](Result<RecvData> && r) {
+        recvDataLatch.set(r.get());
     };
     c.callAsync<SendData, RecvData>(1, sentData, dataBack);
 
@@ -376,10 +375,8 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
     FileDescriptor peerFD = connect(s, c);
 
     // Async call
-    auto dataBack = [&recvDataLatch](ipc::Status status, std::shared_ptr<RecvData>& data) {
-        if (status == ipc::Status::OK) {
-            recvDataLatch.set(data);
-        }
+    auto dataBack = [&recvDataLatch](Result<RecvData> && r) {
+        recvDataLatch.set(r.get());
     };
 
     s.callAsync<SendData, RecvData>(1, peerFD, sentData, dataBack);
@@ -431,7 +428,7 @@ BOOST_AUTO_TEST_CASE(ParseError)
 
 BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
 {
-    ValueLatch<ipc::Status> retStatusLatch;
+    ValueLatch<Result<RecvData>> retStatusLatch;
     Service s(socketPath);
 
     auto method = [](const FileDescriptor, std::shared_ptr<ThrowOnAcceptData>&) {
@@ -445,20 +442,20 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
     Client c(socketPath);
     c.start();
 
-    auto dataBack = [&retStatusLatch](ipc::Status status, std::shared_ptr<RecvData>&) {
-        retStatusLatch.set(status);
+    auto dataBack = [&retStatusLatch](Result<RecvData> && r) {
+        retStatusLatch.set(std::move(r));
     };
 
     std::shared_ptr<SendData> sentData(new SendData(78));
     c.callAsync<SendData, RecvData>(1, sentData, dataBack);
 
     // Wait for the response
-    ipc::Status retStatus = retStatusLatch.get(TIMEOUT);
+    Result<RecvData> result = retStatusLatch.get(TIMEOUT);
 
     // The disconnection might have happened:
     // - after sending the message (PEER_DISCONNECTED)
     // - during external serialization (SERIALIZATION_ERROR)
-    BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED || retStatus == ipc::Status::SERIALIZATION_ERROR);
+    BOOST_CHECK_THROW(result.get(), IPCException);
 }
 
 
@@ -627,6 +624,32 @@ BOOST_AUTO_TEST_CASE(ClientGSource)
     BOOST_CHECK(l.wait(TIMEOUT));
 }
 
+BOOST_AUTO_TEST_CASE(UsersError)
+{
+    const int TEST_ERROR_CODE = -234;
+    const std::string TEST_ERROR_MESSAGE = "Ay, caramba!";
+
+    Service s(socketPath);
+    Client c(socketPath);
+    auto clientID = connect(s, c);
+
+    auto throwingMethodHandler = [&](const FileDescriptor, std::shared_ptr<RecvData>&) -> std::shared_ptr<SendData> {
+        throw IPCUserException(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
+    };
+
+    s.setMethodHandler<SendData, RecvData>(1, throwingMethodHandler);
+    c.setMethodHandler<SendData, RecvData>(1, throwingMethodHandler);
+
+    std::shared_ptr<SendData> sentData(new SendData(78));
+
+    auto hasProperData = [&](const IPCUserException & e) {
+        return e.getCode() == TEST_ERROR_CODE && e.what() == TEST_ERROR_MESSAGE;
+    };
+
+    BOOST_CHECK_EXCEPTION((c.callSync<SendData, RecvData>(1, sentData, TIMEOUT)), IPCUserException, hasProperData);
+    BOOST_CHECK_EXCEPTION((s.callSync<SendData, RecvData>(1, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
+
+}
 // BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
 // {
 //     unsigned oldLimit = ipc::getMaxFDNumber();