#include "ipc/internals/processor.hpp"
#include "ipc/ipc-gsource.hpp"
#include "ipc/types.hpp"
+#include "ipc/result.hpp"
#include "logger/logger.hpp"
#include <string>
#include "base-exception.hpp"
namespace vasum {
-
+namespace ipc {
/**
* Base class for exceptions in IPC
*/
struct IPCException: public VasumException {
- IPCException(const std::string& error) : VasumException(error) {}
+ IPCException(const std::string& message)
+ : VasumException(message) {}
};
struct IPCParsingException: public IPCException {
- IPCParsingException(const std::string& error) : IPCException(error) {}
+ IPCParsingException(const std::string& message = "Exception during reading/parsing data from the socket")
+ : IPCException(message) {}
};
struct IPCSerializationException: public IPCException {
- IPCSerializationException(const std::string& error) : IPCException(error) {}
+ IPCSerializationException(const std::string& message = "Exception during writing/serializing data to the socket")
+ : IPCException(message) {}
};
struct IPCPeerDisconnectedException: public IPCException {
- IPCPeerDisconnectedException(const std::string& error) : IPCException(error) {}
+ IPCPeerDisconnectedException(const std::string& message = "No such peer. Might got disconnected.")
+ : IPCException(message) {}
};
struct IPCNaughtyPeerException: public IPCException {
- IPCNaughtyPeerException(const std::string& error) : IPCException(error) {}
+ IPCNaughtyPeerException(const std::string& message = "Peer performed a forbidden action.")
+ : IPCException(message) {}
+};
+
+struct IPCRemovedPeerException: public IPCException {
+ IPCRemovedPeerException(const std::string& message = "Removing peer")
+ : IPCException(message) {}
+};
+
+struct IPCClosingException: public IPCException {
+ IPCClosingException(const std::string& message = "Closing IPC")
+ : IPCException(message) {}
};
struct IPCTimeoutException: public IPCException {
- IPCTimeoutException(const std::string& error) : IPCException(error) {}
+ IPCTimeoutException(const std::string& message)
+ : IPCException(message) {}
+};
+
+struct IPCUserException: public IPCException {
+ IPCUserException(const int code, const std::string& message)
+ : IPCException(message),
+ mCode(code)
+ {}
+
+ int getCode() const
+ {
+ return mCode;
+ }
+
+private:
+ int mCode;
};
-}
+} // namespace ipc
+} // namespace vasum
#endif // COMMON_IPC_EXCEPTION_HPP
#ifndef COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
#define COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
+#include "ipc/internals/result-builder.hpp"
#include "ipc/types.hpp"
+#include "ipc/result.hpp"
#include "logger/logger-scope.hpp"
#include "config/manager.hpp"
+#include <utility>
namespace vasum {
namespace ipc {
std::shared_ptr<void> data;
SerializeCallback serialize;
ParseCallback parse;
- ResultHandler<void>::type process;
+ ResultBuilderHandler process;
private:
MethodRequest(const MethodID methodID, const FileDescriptor peerFD)
return data;
};
- request->process = [process](Status status, std::shared_ptr<void>& data)->void {
- LOGS("Method process, status: " << toString(status));
- std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- return process(status, tmpData);
+ request->process = [process](ResultBuilder & resultBuilder) {
+ LOGS("Method process");
+ process(resultBuilder.build<ReceivedDataType>());
};
return request;
const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
+const MethodID Processor::ERROR_METHOD_ID = std::numeric_limits<MethodID>::max() - 2;
Processor::Processor(const std::string& logName,
const PeerCallback& newPeerCallback,
utils::signalBlock(SIGPIPE);
using namespace std::placeholders;
- setMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
- std::bind(&Processor::onNewSignals, this, _1, _2));
+ setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+ std::bind(&Processor::onNewSignals, this, _1, _2));
+
+ setSignalHandlerInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, std::bind(&Processor::onErrorSignal, this, _1, _2));
}
Processor::~Processor()
{
Lock lock(mStateMutex);
auto request = std::make_shared<FinishRequest>(conditionPtr);
- mRequestQueue.push(Event::FINISH, request);
+ mRequestQueue.pushBack(Event::FINISH, request);
}
LOGD(mLogPrefix + "Waiting for the Processor to stop");
FileDescriptor peerFD = socketPtr->getFD();
auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
- mRequestQueue.push(Event::ADD_PEER, request);
+ mRequestQueue.pushBack(Event::ADD_PEER, request);
LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
{
Lock lock(mStateMutex);
auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
- mRequestQueue.push(Event::REMOVE_PEER, request);
+ mRequestQueue.pushBack(Event::REMOVE_PEER, request);
}
auto isPeerDeleted = [&peerFD, this]()->bool {
conditionPtr->wait(lock, isPeerDeleted);
}
-void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
+void Processor::removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr)
{
LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
}
// Erase associated return value callbacks
- std::shared_ptr<void> data;
for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
if (it->second.peerFD == peerFD) {
- IGNORE_EXCEPTIONS(it->second.process(status, data));
+ ResultBuilder resultBuilder(exceptionPtr);
+ IGNORE_EXCEPTIONS(it->second.process(resultBuilder));
it = mReturnCallbacks.erase(it);
} else {
++it;
if (mFDs[i].revents & POLLHUP) {
LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
mFDs[i].revents &= ~(POLLHUP);
- removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
+ removePeerInternal(mFDs[i].fd,
+ std::make_exception_ptr(IPCPeerDisconnectedException()));
isPeerRemoved = true;
}
}
bool Processor::handleLostConnection(const FileDescriptor peerFD)
{
Lock lock(mStateMutex);
- removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
+ removePeerInternal(peerFD,
+ std::make_exception_ptr(IPCPeerDisconnectedException()));
return true;
}
} catch (const IPCException& e) {
LOGE(mLogPrefix + "Error during reading the socket");
- removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
+ removePeerInternal(socketPtr->getFD(),
+ std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
} else {
// Nothing
LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
- removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
+ removePeerInternal(socketPtr->getFD(),
+ std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
}
}
}
-std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
- std::shared_ptr<RegisterSignalsMessage>& data)
+void Processor::onNewSignals(const FileDescriptor peerFD,
+ std::shared_ptr<RegisterSignalsProtocolMessage>& data)
{
LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
for (const MethodID methodID : data->ids) {
mSignalsPeers[methodID].push_back(peerFD);
}
+}
- return std::make_shared<EmptyData>();
+void Processor::onErrorSignal(const FileDescriptor, std::shared_ptr<ErrorProtocolMessage>& data)
+{
+ LOGS(mLogPrefix + "Processor onErrorSignal messageID: " << data->messageID);
+
+ ReturnCallbacks returnCallbacks = std::move(mReturnCallbacks.at(data->messageID));
+ mReturnCallbacks.erase(data->messageID);
+
+ ResultBuilder resultBuilder(std::make_exception_ptr(IPCUserException(data->code, data->message)));
+ IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
}
bool Processor::onReturnValue(const Socket& socket,
mReturnCallbacks.erase(messageID);
} catch (const std::out_of_range&) {
LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
- removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(),
+ std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
data = returnCallbacks.parse(socket.getFD());
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during parsing: " << e.what());
- IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
- removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
+ ResultBuilder resultBuilder(std::make_exception_ptr(IPCParsingException()));
+ IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
+ removePeerInternal(socket.getFD(),
+ std::make_exception_ptr(IPCParsingException()));
return true;
}
// LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
- IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
+ ResultBuilder resultBuilder(data);
+ IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
// LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
return false;
data = signalCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during parsing: " << e.what());
- removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(),
+ std::make_exception_ptr(IPCParsingException()));
return true;
}
// LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
try {
signalCallbacks->signal(socket.getFD(), data);
+ } catch (const IPCUserException& e) {
+ LOGW("Discarded user's exception");
+ return false;
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception in method handler: " << e.what());
- removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(),
+ std::make_exception_ptr(IPCNaughtyPeerException()));
+
return true;
}
data = methodCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during parsing: " << e.what());
- removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(),
+ std::make_exception_ptr(IPCParsingException()));
return true;
}
std::shared_ptr<void> returnData;
try {
returnData = methodCallbacks->method(socket.getFD(), data);
+ } catch (const IPCUserException& e) {
+ LOGW("User's exception");
+ auto data = std::make_shared<ErrorProtocolMessage>(messageID, e.getCode(), e.what());
+ signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, socket.getFD(), data);
+ return false;
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception in method handler: " << e.what());
- removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(),
+ std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
methodCallbacks->serialize(socket.getFD(), returnData);
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during serialization: " << e.what());
- removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
+ removePeerInternal(socket.getFD(),
+ std::make_exception_ptr(IPCSerializationException()));
+
return true;
}
LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
// Pass the error to the processing callback
- IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data));
+ ResultBuilder resultBuilder(std::make_exception_ptr(IPCPeerDisconnectedException()));
+ IGNORE_EXCEPTIONS(request.process(resultBuilder));
return false;
}
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Error during sending a method: " << e.what());
- // Inform about the error,
- IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data));
+ // Inform about the error
+ ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException()));
+ IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder));
mReturnCallbacks.erase(request.messageID);
- removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
+ removePeerInternal(request.peerFD,
+ std::make_exception_ptr(IPCSerializationException()));
+
return true;
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
- removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
+ removePeerInternal(request.peerFD,
+ std::make_exception_ptr(IPCSerializationException()));
+
return true;
}
for (const auto kv : mSignalsCallbacks) {
ids.push_back(kv.first);
}
- auto data = std::make_shared<RegisterSignalsMessage>(ids);
- callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- request.peerFD,
- data,
- discardResultHandler<EmptyData>);
+ auto data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
+ signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+ request.peerFD,
+ data);
resetPolling();
{
LOGS(mLogPrefix + "Processor onRemovePeer");
- removePeerInternal(request.peerFD, Status::REMOVED_PEER);
+ removePeerInternal(request.peerFD,
+ std::make_exception_ptr(IPCRemovedPeerException()));
+
request.conditionPtr->notify_all();
return true;
switch (request.requestID) {
case Event::METHOD: {
auto requestPtr = request.get<MethodRequest>();
- IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data));
+ ResultBuilder resultBuilder(std::make_exception_ptr(IPCClosingException()));
+ IGNORE_EXCEPTIONS(requestPtr->process(resultBuilder));
break;
}
case Event::REMOVE_PEER: {
#ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
#define COMMON_IPC_INTERNALS_PROCESSOR_HPP
+#include "ipc/internals/result-builder.hpp"
#include "ipc/internals/socket.hpp"
#include "ipc/internals/request-queue.hpp"
#include "ipc/internals/method-request.hpp"
namespace ipc {
const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
-const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
/**
* This class wraps communication via UX sockets
* - no new events added after stop() called
* - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
* there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
+* - remove recursive mutex
*
*/
class Processor {
};
public:
-
friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
/**
static const MethodID REGISTER_SIGNAL_METHOD_ID;
/**
+ * Error return message
+ */
+ static const MethodID ERROR_METHOD_ID;
+
+ /**
* Constructs the Processor, but doesn't start it.
* The object is ready to add methods.
*
CONFIG_REGISTER_EMPTY
};
- struct RegisterSignalsMessage {
- RegisterSignalsMessage() = default;
- RegisterSignalsMessage(const std::vector<MethodID> ids)
+ struct RegisterSignalsProtocolMessage {
+ RegisterSignalsProtocolMessage() = default;
+ RegisterSignalsProtocolMessage(const std::vector<MethodID> ids)
: ids(ids) {}
std::vector<MethodID> ids;
)
};
+ struct ErrorProtocolMessage {
+ ErrorProtocolMessage() = default;
+ ErrorProtocolMessage(const MessageID messageID, const int code, const std::string& message)
+ : messageID(messageID), code(code), message(message) {}
+
+ MessageID messageID;
+ int code;
+ std::string message;
+
+ CONFIG_REGISTER
+ (
+ messageID,
+ code,
+ message
+ )
+ };
+
struct MethodHandlers {
MethodHandlers(const MethodHandlers& other) = delete;
MethodHandlers& operator=(const MethodHandlers&) = delete;
ReturnCallbacks(ReturnCallbacks&&) = default;
ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
- ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
+ ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultBuilderHandler& process)
: peerFD(peerFD), parse(parse), process(process) {}
FileDescriptor peerFD;
ParseCallback parse;
- ResultHandler<void>::type process;
+ ResultBuilderHandler process;
};
std::string mLogPrefix;
const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
template<typename ReceivedDataType>
- static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
+ void setSignalHandlerInternal(const MethodID methodID,
+ const typename SignalHandler<ReceivedDataType>::type& handler);
+
+ template<typename SentDataType>
+ void signalInternal(const MethodID methodID,
+ const FileDescriptor peerFD,
+ const std::shared_ptr<SentDataType>& data);
void run();
std::shared_ptr<SignalHandlers> signalCallbacks);
void resetPolling();
FileDescriptor getNextFileDescriptor();
- void removePeerInternal(const FileDescriptor peerFD, Status status);
+ void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr);
+
+ void onNewSignals(const FileDescriptor peerFD,
+ std::shared_ptr<RegisterSignalsProtocolMessage>& data);
- std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
- std::shared_ptr<RegisterSignalsMessage>& data);
+ void onErrorSignal(const FileDescriptor peerFD,
+ std::shared_ptr<ErrorProtocolMessage>& data);
};
return method(peerFD, tmpData);
};
- {
- Lock lock(mStateMutex);
- mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
- }
+ mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
}
template<typename SentDataType, typename ReceivedDataType>
throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
}
- setMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
+ setMethodHandlerInternal<SentDataType, ReceivedDataType>(methodID, method);
}
}
template<typename ReceivedDataType>
+void Processor::setSignalHandlerInternal(const MethodID methodID,
+ const typename SignalHandler<ReceivedDataType>::type& handler)
+{
+ SignalHandlers signalCall;
+
+ signalCall.parse = [](const int fd)->std::shared_ptr<void> {
+ std::shared_ptr<ReceivedDataType> dataToFill(new ReceivedDataType());
+ config::loadFromFD<ReceivedDataType>(fd, *dataToFill);
+ return dataToFill;
+ };
+
+ signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& dataReceived) {
+ std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
+ handler(peerFD, tmpData);
+ };
+
+ mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
+}
+
+
+template<typename ReceivedDataType>
void Processor::setSignalHandler(const MethodID methodID,
const typename SignalHandler<ReceivedDataType>::type& handler)
{
throw IPCException("Forbidden methodID: " + std::to_string(methodID));
}
- std::shared_ptr<RegisterSignalsMessage> data;
+ std::shared_ptr<RegisterSignalsProtocolMessage> data;
std::vector<FileDescriptor> peerFDs;
+
{
Lock lock(mStateMutex);
throw IPCException("MethodID used by a method: " + std::to_string(methodID));
}
- SignalHandlers signalCall;
-
- signalCall.parse = [](const int fd)->std::shared_ptr<void> {
- std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
- config::loadFromFD<ReceivedDataType>(fd, *data);
- return data;
- };
-
- signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
- std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- handler(peerFD, tmpData);
- };
-
- mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
+ setSignalHandlerInternal<ReceivedDataType>(methodID, handler);
// Broadcast the new signal:
std::vector<MethodID> ids {methodID};
- data = std::make_shared<RegisterSignalsMessage>(ids);
+ data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
for (const auto kv : mSockets) {
peerFDs.push_back(kv.first);
}
for (const auto peerFD : peerFDs) {
- callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- peerFD,
- data,
- DEFAULT_METHOD_TIMEOUT);
+ signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+ peerFD,
+ data);
}
}
{
Lock lock(mStateMutex);
auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
- mRequestQueue.push(Event::METHOD, request);
+ mRequestQueue.pushBack(Event::METHOD, request);
return request->messageID;
}
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS)
{
- std::shared_ptr<ReceivedDataType> result;
+ Result<ReceivedDataType> result;
std::mutex mutex;
std::condition_variable cv;
- Status returnStatus = ipc::Status::UNDEFINED;
- auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
+ auto process = [&result, &mutex, &cv](const Result<ReceivedDataType> && r) {
std::unique_lock<std::mutex> lock(mutex);
- returnStatus = status;
- result = returnedData;
+ result = std::move(r);
cv.notify_all();
};
data,
process);
- auto isResultInitialized = [&returnStatus]() {
- return returnStatus != ipc::Status::UNDEFINED;
+ auto isResultInitialized = [&result]() {
+ return result.isValid();
};
std::unique_lock<std::mutex> lock(mutex);
}
}
- throwOnError(returnStatus);
+ return result.get();
+}
- return result;
+template<typename SentDataType>
+void Processor::signalInternal(const MethodID methodID,
+ const FileDescriptor peerFD,
+ const std::shared_ptr<SentDataType>& data)
+{
+ Lock lock(mStateMutex);
+ auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
+ mRequestQueue.pushFront(Event::SIGNAL, request);
}
template<typename SentDataType>
}
for (const FileDescriptor peerFD : it->second) {
auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
- mRequestQueue.push(Event::SIGNAL, request);
+ mRequestQueue.pushBack(Event::SIGNAL, request);
}
}
bool isEmpty() const;
/**
- * Push data to the queue
+ * Push data to back of the queue
*
* @param requestID request type
* @param data data corresponding to the request
*/
- void push(const RequestIdType requestID,
- const std::shared_ptr<void>& data = nullptr);
+ void pushBack(const RequestIdType requestID,
+ const std::shared_ptr<void>& data = nullptr);
+
+ /**
+ * Push data to back of the queue
+ *
+ * @param requestID request type
+ * @param data data corresponding to the request
+ */
+ void pushFront(const RequestIdType requestID,
+ const std::shared_ptr<void>& data = nullptr);
/**
* @return get the data from the next request
}
template<typename RequestIdType>
-void RequestQueue<RequestIdType>::push(const RequestIdType requestID,
- const std::shared_ptr<void>& data)
+void RequestQueue<RequestIdType>::pushBack(const RequestIdType requestID,
+ const std::shared_ptr<void>& data)
{
Request request(requestID, data);
mRequests.push_back(std::move(request));
}
template<typename RequestIdType>
+void RequestQueue<RequestIdType>::pushFront(const RequestIdType requestID,
+ const std::shared_ptr<void>& data)
+{
+ Request request(requestID, data);
+ mRequests.push_front(std::move(request));
+ mEventFD.send();
+}
+
+template<typename RequestIdType>
typename RequestQueue<RequestIdType>::Request RequestQueue<RequestIdType>::pop()
{
mEventFD.receive();
--- /dev/null
+/*
+* Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Class for storing result of a method - data or exception
+ */
+
+#ifndef COMMON_IPC_RESULT_BUILDER_HPP
+#define COMMON_IPC_RESULT_BUILDER_HPP
+
+#include "ipc/result.hpp"
+#include <functional>
+#include <exception>
+#include <memory>
+
+namespace vasum {
+namespace ipc {
+
+class ResultBuilder {
+public:
+ ResultBuilder()
+ : mData(nullptr),
+ mExceptionPtr(nullptr)
+ {}
+
+ ResultBuilder(const std::exception_ptr& exceptionPtr)
+ : mData(nullptr),
+ mExceptionPtr(exceptionPtr)
+ {}
+
+ ResultBuilder(const std::shared_ptr<void>& data)
+ : mData(data),
+ mExceptionPtr(nullptr)
+
+ {}
+
+ template<typename Data>
+ Result<Data> build()
+ {
+ return Result<Data>(std::move(std::static_pointer_cast<Data>(mData)),
+ std::move(mExceptionPtr));
+ }
+
+private:
+ std::shared_ptr<void> mData;
+ std::exception_ptr mExceptionPtr;
+};
+
+typedef std::function<void(ResultBuilder&)> ResultBuilderHandler;
+
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_RESULT_BUILDER_HPP
+
+
+
+
+
+
--- /dev/null
+/*
+* Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Class for storing result of a method - data or exception
+ */
+
+#ifndef COMMON_IPC_RESULT_HPP
+#define COMMON_IPC_RESULT_HPP
+
+#include <functional>
+#include <exception>
+#include <memory>
+
+namespace vasum {
+namespace ipc {
+
+template<typename Data>
+class Result {
+public:
+ Result()
+ : mData(nullptr),
+ mExceptionPtr(nullptr)
+ {}
+
+ Result(std::shared_ptr<Data>&& data, std::exception_ptr&& exceptionPtr)
+ : mData(std::move(data)),
+ mExceptionPtr(std::move(exceptionPtr))
+ {}
+
+ std::shared_ptr<Data> get() const
+ {
+ if (mExceptionPtr) {
+ std::rethrow_exception(mExceptionPtr);
+ }
+ return mData;
+ }
+
+ bool isValid() const
+ {
+ return (bool)mExceptionPtr || (bool)mData;
+ }
+
+private:
+ std::shared_ptr<Data> mData;
+ std::exception_ptr mExceptionPtr;
+};
+
+template<typename Data>
+struct ResultHandler {
+ typedef std::function < void(Result<Data>&&) > type;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_RESULT_HPP
#include "ipc/internals/acceptor.hpp"
#include "ipc/ipc-gsource.hpp"
#include "ipc/types.hpp"
+#include "ipc/result.hpp"
#include "logger/logger.hpp"
#include <string>
return ++gLastMessageID;
}
-std::string toString(const Status status)
-{
- switch (status) {
- case Status::OK: return "No error, everything is OK";
- case Status::PARSING_ERROR: return "Exception during reading/parsing data from the socket";
- case Status::SERIALIZATION_ERROR: return "Exception during writing/serializing data to the socket";
- case Status::PEER_DISCONNECTED: return "No such peer. Might got disconnected.";
- case Status::NAUGHTY_PEER: return "Peer performed a forbidden action.";
- case Status::REMOVED_PEER: return "Removing peer";
- case Status::CLOSING: return "Closing IPC";
- case Status::UNDEFINED: return "Undefined state";
- default: return "Unknown status";
- }
-}
-void throwOnError(const Status status)
-{
- if (status == Status::OK) {
- return;
- }
- std::string message = toString(status);
- LOGE(message);
-
- switch (status) {
- case Status::PARSING_ERROR: throw IPCParsingException(message);
- case Status::SERIALIZATION_ERROR: throw IPCSerializationException(message);
- case Status::PEER_DISCONNECTED: throw IPCPeerDisconnectedException(message);
- case Status::NAUGHTY_PEER: throw IPCNaughtyPeerException(message);
- case Status::REMOVED_PEER: throw IPCException(message);
- case Status::CLOSING: throw IPCException(message);
- case Status::UNDEFINED: throw IPCException(message);
- default: return throw IPCException(message);
- }
-}
} // namespace ipc
} // namespace vasum
typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
-enum class Status : int {
- OK = 0,
- PARSING_ERROR,
- SERIALIZATION_ERROR,
- PEER_DISCONNECTED,
- NAUGHTY_PEER,
- REMOVED_PEER,
- CLOSING,
- UNDEFINED
-};
-
-std::string toString(const Status status);
-void throwOnError(const Status status);
MessageID getNextMessageID();
-
template<typename SentDataType, typename ReceivedDataType>
struct MethodHandler {
typedef std::function<std::shared_ptr<SentDataType>(FileDescriptor peerFD,
std::shared_ptr<ReceivedDataType>& data)> type;
};
-template <typename ReceivedDataType>
-struct ResultHandler {
- typedef std::function<void(Status status,
- std::shared_ptr<ReceivedDataType>& resultData)> type;
-};
-
} // namespace ipc
} // namespace vasum
#include "ipc/service.hpp"
#include "ipc/client.hpp"
#include "ipc/types.hpp"
+#include "ipc/result.hpp"
#include "utils/glib-loop.hpp"
#include "utils/latch.hpp"
#include "utils/value-latch.hpp"
c.start();
//Async call
- auto dataBack = [&recvDataLatch](ipc::Status status, std::shared_ptr<RecvData>& data) {
- if (status == ipc::Status::OK) {
- recvDataLatch.set(data);
- }
+ auto dataBack = [&recvDataLatch](Result<RecvData> && r) {
+ recvDataLatch.set(r.get());
};
c.callAsync<SendData, RecvData>(1, sentData, dataBack);
FileDescriptor peerFD = connect(s, c);
// Async call
- auto dataBack = [&recvDataLatch](ipc::Status status, std::shared_ptr<RecvData>& data) {
- if (status == ipc::Status::OK) {
- recvDataLatch.set(data);
- }
+ auto dataBack = [&recvDataLatch](Result<RecvData> && r) {
+ recvDataLatch.set(r.get());
};
s.callAsync<SendData, RecvData>(1, peerFD, sentData, dataBack);
BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
{
- ValueLatch<ipc::Status> retStatusLatch;
+ ValueLatch<Result<RecvData>> retStatusLatch;
Service s(socketPath);
auto method = [](const FileDescriptor, std::shared_ptr<ThrowOnAcceptData>&) {
Client c(socketPath);
c.start();
- auto dataBack = [&retStatusLatch](ipc::Status status, std::shared_ptr<RecvData>&) {
- retStatusLatch.set(status);
+ auto dataBack = [&retStatusLatch](Result<RecvData> && r) {
+ retStatusLatch.set(std::move(r));
};
std::shared_ptr<SendData> sentData(new SendData(78));
c.callAsync<SendData, RecvData>(1, sentData, dataBack);
// Wait for the response
- ipc::Status retStatus = retStatusLatch.get(TIMEOUT);
+ Result<RecvData> result = retStatusLatch.get(TIMEOUT);
// The disconnection might have happened:
// - after sending the message (PEER_DISCONNECTED)
// - during external serialization (SERIALIZATION_ERROR)
- BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED || retStatus == ipc::Status::SERIALIZATION_ERROR);
+ BOOST_CHECK_THROW(result.get(), IPCException);
}
BOOST_CHECK(l.wait(TIMEOUT));
}
+BOOST_AUTO_TEST_CASE(UsersError)
+{
+ const int TEST_ERROR_CODE = -234;
+ const std::string TEST_ERROR_MESSAGE = "Ay, caramba!";
+
+ Service s(socketPath);
+ Client c(socketPath);
+ auto clientID = connect(s, c);
+
+ auto throwingMethodHandler = [&](const FileDescriptor, std::shared_ptr<RecvData>&) -> std::shared_ptr<SendData> {
+ throw IPCUserException(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
+ };
+
+ s.setMethodHandler<SendData, RecvData>(1, throwingMethodHandler);
+ c.setMethodHandler<SendData, RecvData>(1, throwingMethodHandler);
+
+ std::shared_ptr<SendData> sentData(new SendData(78));
+
+ auto hasProperData = [&](const IPCUserException & e) {
+ return e.getCode() == TEST_ERROR_CODE && e.what() == TEST_ERROR_MESSAGE;
+ };
+
+ BOOST_CHECK_EXCEPTION((c.callSync<SendData, RecvData>(1, sentData, TIMEOUT)), IPCUserException, hasProperData);
+ BOOST_CHECK_EXCEPTION((s.callSync<SendData, RecvData>(1, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
+
+}
// BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
// {
// unsigned oldLimit = ipc::getMaxFDNumber();