return mRequestQueue.getFD();
}
+void Processor::sendResult(const MethodID methodID,
+ const PeerID peerID,
+ const MessageID messageID,
+ const std::shared_ptr<void>& data)
+{
+ auto requestPtr = std::make_shared<SendResultRequest>(methodID, peerID, messageID, data);
+ mRequestQueue.pushFront(Event::SEND_RESULT, requestPtr);
+}
+
+void Processor::sendError(const PeerID peerID,
+ const MessageID messageID,
+ const int errorCode,
+ const std::string& message)
+{
+ auto data = std::make_shared<ErrorProtocolMessage>(messageID, errorCode, message);
+ signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, peerID , data);
+}
+
+void Processor::sendVoid(const MethodID methodID,
+ const PeerID peerID,
+ const MessageID messageID)
+{
+ auto data = std::make_shared<EmptyData>();
+ auto requestPtr = std::make_shared<SendResultRequest>(methodID, peerID, messageID, data);
+ mRequestQueue.pushFront(Event::SEND_RESULT, requestPtr);
+}
+
void Processor::removeMethod(const MethodID methodID)
{
Lock lock(mStateMutex);
return false;
}
- Socket& socket = *peerIt->socketPtr;
MethodID methodID;
MessageID messageID;
{
- Socket::Guard guard = socket.getGuard();
try {
+ // Read information about the incoming data
+ Socket& socket = *peerIt->socketPtr;
+ Socket::Guard guard = socket.getGuard();
socket.read(&methodID, sizeof(methodID));
socket.read(&messageID, sizeof(messageID));
-
} catch (const IPCException& e) {
LOGE(mLogPrefix + "Error during reading the socket");
removePeerInternal(peerIt,
}
LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
- std::shared_ptr<void> returnData;
try {
- returnData = methodCallbacks->method(peerIt->peerID, data);
+ methodCallbacks->method(peerIt->peerID,
+ data,
+ std::make_shared<MethodResult>(*this, methodID, messageID, peerIt->peerID));
} catch (const IPCUserException& e) {
LOGW("User's exception");
- auto data = std::make_shared<ErrorProtocolMessage>(messageID, e.getCode(), e.what());
- signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, peerIt->peerID, data);
+ sendError(peerIt->peerID, messageID, e.getCode(), e.what());
return false;
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception in method handler: " << e.what());
return true;
}
- LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID);
- try {
- // Send the call with the socket
- Socket& socket = *peerIt->socketPtr;
- Socket::Guard guard = socket.getGuard();
- socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
- socket.write(&messageID, sizeof(messageID));
- methodCallbacks->serialize(socket.getFD(), returnData);
- } catch (const std::exception& e) {
- LOGE(mLogPrefix + "Exception during serialization: " << e.what());
- removePeerInternal(peerIt,
- std::make_exception_ptr(IPCSerializationException()));
-
- return true;
- }
-
return false;
}
case Event::SIGNAL: return onSignalRequest(*request.get<SignalRequest>());
case Event::ADD_PEER: return onAddPeerRequest(*request.get<AddPeerRequest>());
case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get<RemovePeerRequest>());
+ case Event::SEND_RESULT: return onSendResultRequest(*request.get<SendResultRequest>());
case Event::FINISH: return onFinishRequest(*request.get<FinishRequest>());
}
std::move(request.parse),
std::move(request.process)));
- Socket& socket = *peerIt->socketPtr;
try {
// Send the call with the socket
+ Socket& socket = *peerIt->socketPtr;
Socket::Guard guard = socket.getGuard();
socket.write(&request.methodID, sizeof(request.methodID));
socket.write(&request.messageID, sizeof(request.messageID));
return false;
}
- Socket& socket = *peerIt->socketPtr;
try {
// Send the call with the socket
+ Socket& socket = *peerIt->socketPtr;
Socket::Guard guard = socket.getGuard();
socket.write(&request.methodID, sizeof(request.methodID));
socket.write(&request.messageID, sizeof(request.messageID));
return true;
}
+bool Processor::onSendResultRequest(SendResultRequest& request)
+{
+ LOGS(mLogPrefix + "Processor onMethodRequest");
+
+ auto peerIt = getPeerInfoIterator(request.peerID);
+
+ if (peerIt == mPeerInfo.end()) {
+ LOGE(mLogPrefix + "Peer disconnected, no result is sent. No user with a peerID: " << request.peerID);
+ return false;
+ }
+
+ std::shared_ptr<MethodHandlers> methodCallbacks;
+ try {
+ methodCallbacks = mMethodsCallbacks.at(request.methodID);
+ } catch (const std::out_of_range&) {
+ LOGW(mLogPrefix + "No method, might have been deleted. methodID: " << request.methodID);
+ return true;
+ }
+
+ try {
+ // Send the call with the socket
+ Socket& socket = *peerIt->socketPtr;
+ Socket::Guard guard = socket.getGuard();
+ socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
+ socket.write(&request.messageID, sizeof(request.messageID));
+ LOGT(mLogPrefix + "Serializing the message");
+ methodCallbacks->serialize(socket.getFD(), request.data);
+ } catch (const std::exception& e) {
+ LOGE(mLogPrefix + "Error during sending a method: " << e.what());
+
+ // Inform about the error
+ ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException()));
+ IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder));
+
+
+ mReturnCallbacks.erase(request.messageID);
+ removePeerInternal(peerIt,
+ std::make_exception_ptr(IPCSerializationException()));
+ return true;
+
+ }
+
+ return false;
+}
+
bool Processor::onFinishRequest(FinishRequest& request)
{
LOGS(mLogPrefix + "Processor onFinishRequest");
onRemovePeerRequest(*request.get<RemovePeerRequest>());
break;
}
+ case Event::SEND_RESULT: {
+ onSendResultRequest(*request.get<SendResultRequest>());
+ break;
+ }
case Event::SIGNAL:
case Event::ADD_PEER:
case Event::FINISH:
os << "Event::REMOVE_PEER";
break;
}
+
+ case Processor::Event::SEND_RESULT: {
+ os << "Event::SEND_RESULT";
+ break;
+ }
}
return os;
#include "ipc/internals/signal-request.hpp"
#include "ipc/internals/add-peer-request.hpp"
#include "ipc/internals/remove-peer-request.hpp"
+#include "ipc/internals/send-result-request.hpp"
#include "ipc/internals/finish-request.hpp"
#include "ipc/exception.hpp"
+#include "ipc/method-result.hpp"
#include "ipc/types.hpp"
#include "config/manager.hpp"
#include "config/fields.hpp"
#include <list>
#include <functional>
#include <unordered_map>
+#include <utility>
namespace vasum {
namespace ipc {
const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
-
/**
* This class wraps communication via UX sockets
*
class Processor {
private:
enum class Event {
- FINISH, // Shutdown request
- METHOD, // New method call in the queue
- SIGNAL, // New signal call in the queue
- ADD_PEER, // New peer in the queue
- REMOVE_PEER // Remove peer
+ FINISH, // Shutdown request
+ METHOD, // New method call in the queue
+ SIGNAL, // New signal call in the queue
+ ADD_PEER, // New peer in the queue
+ REMOVE_PEER, // Remove peer
+ SEND_RESULT // Send the result of a method's call
};
public:
const typename SignalHandler<ReceivedDataType>::type& process);
/**
+ * Send result of the method.
+ * Used for asynchronous communication, only internally.
+ *
+ * @param methodID API dependent id of the method
+ * @param peerID id of the peer
+ * @param messageID id of the message to which it replies
+ * @param data data to send
+ */
+ void sendResult(const MethodID methodID,
+ const PeerID peerID,
+ const MessageID messageID,
+ const std::shared_ptr<void>& data);
+
+ /**
+ * Send error result of the method
+ *
+ * @param peerID id of the peer
+ * @param messageID id of the message to which it replies
+ * @param errorCode code of the error
+ * @param message description of the error
+ */
+ void sendError(const PeerID peerID,
+ const MessageID messageID,
+ const int errorCode,
+ const std::string& message);
+
+ /**
+ * Indicate that the method handler finished
+ *
+ * @param methodID API dependent id of the method
+ * @param peerID id of the peer
+ * @param messageID id of the message to which it replies
+ */
+ void sendVoid(const MethodID methodID,
+ const PeerID peerID,
+ const MessageID messageID);
+
+ /**
* Removes the callback
*
* @param methodID API dependent id of the method
*
* @param methodID API dependent id of the method
* @param peerID id of the peer
- * @param data data to sent
+ * @param data data to send
* @param timeoutMS how long to wait for the return value before throw
* @tparam SentDataType data type to send
* @tparam ReceivedDataType data type to receive
bool onSignalRequest(SignalRequest& request);
bool onAddPeerRequest(AddPeerRequest& request);
bool onRemovePeerRequest(RemovePeerRequest& request);
+ bool onSendResultRequest(SendResultRequest& request);
bool onFinishRequest(FinishRequest& request);
bool handleLostConnections();
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
- methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data)->std::shared_ptr<void> {
+ methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data, MethodResult::Pointer && methodResult) {
std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- return method(peerID, tmpData);
+ method(peerID, tmpData, std::forward<MethodResult::Pointer>(methodResult));
};
mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
const PeerID peerID,
const std::shared_ptr<SentDataType>& data)
{
- auto request = SignalRequest::create<SentDataType>(methodID, peerID, data);
- mRequestQueue.pushFront(Event::SIGNAL, request);
+ auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID, data);
+ mRequestQueue.pushFront(Event::SIGNAL, requestPtr);
}
template<typename SentDataType>
return;
}
for (const PeerID peerID : it->second) {
- auto request = SignalRequest::create<SentDataType>(methodID, peerID, data);
- mRequestQueue.pushBack(Event::SIGNAL, request);
+ auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID, data);
+ mRequestQueue.pushBack(Event::SIGNAL, requestPtr);
}
}
#include <list>
#include <memory>
+#include <mutex>
#include <algorithm>
namespace vasum {
/**
* @return event's file descriptor
*/
- int getFD() const;
+ int getFD();
/**
* @return is the queue empty
*/
- bool isEmpty() const;
+ bool isEmpty();
/**
* Push data to back of the queue
bool removeIf(Predicate predicate);
private:
+ typedef std::unique_lock<std::mutex> Lock;
+
std::list<Request> mRequests;
+ std::mutex mStateMutex;
EventFD mEventFD;
};
template<typename RequestIdType>
-int RequestQueue<RequestIdType>::getFD() const
+int RequestQueue<RequestIdType>::getFD()
{
+ Lock lock(mStateMutex);
return mEventFD.getFD();
}
template<typename RequestIdType>
-bool RequestQueue<RequestIdType>::isEmpty() const
+bool RequestQueue<RequestIdType>::isEmpty()
{
+ Lock lock(mStateMutex);
return mRequests.empty();
}
void RequestQueue<RequestIdType>::pushBack(const RequestIdType requestID,
const std::shared_ptr<void>& data)
{
+ Lock lock(mStateMutex);
Request request(requestID, data);
mRequests.push_back(std::move(request));
mEventFD.send();
void RequestQueue<RequestIdType>::pushFront(const RequestIdType requestID,
const std::shared_ptr<void>& data)
{
+ Lock lock(mStateMutex);
Request request(requestID, data);
mRequests.push_front(std::move(request));
mEventFD.send();
template<typename RequestIdType>
typename RequestQueue<RequestIdType>::Request RequestQueue<RequestIdType>::pop()
{
+ Lock lock(mStateMutex);
mEventFD.receive();
if (mRequests.empty()) {
LOGE("Request queue is empty");
template<typename Predicate>
bool RequestQueue<RequestIdType>::removeIf(Predicate predicate)
{
+ Lock lock(mStateMutex);
auto it = std::find_if(mRequests.begin(), mRequests.end(), predicate);
if (it == mRequests.end()) {
return false;
--- /dev/null
+/*
+* Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Processor's request to send the result of a method
+ */
+
+#ifndef COMMON_IPC_INTERNALS_SEND_RESULT_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_SEND_RESULT_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "logger/logger-scope.hpp"
+
+namespace vasum {
+namespace ipc {
+
+class SendResultRequest {
+public:
+ SendResultRequest(const SendResultRequest&) = delete;
+ SendResultRequest& operator=(const SendResultRequest&) = delete;
+
+ SendResultRequest(const MethodID methodID,
+ const PeerID peerID,
+ const MessageID messageID,
+ const std::shared_ptr<void>& data)
+ : methodID(methodID),
+ peerID(peerID),
+ messageID(messageID),
+ data(data)
+ {}
+
+ MethodID methodID;
+ PeerID peerID;
+ MessageID messageID;
+ std::shared_ptr<void> data;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_SEND_RESULT_REQUEST_HPP
SignalRequest(const SignalRequest&) = delete;
SignalRequest& operator=(const SignalRequest&) = delete;
-
-
template<typename SentDataType>
static std::shared_ptr<SignalRequest> create(const MethodID methodID,
const PeerID peerID,
--- /dev/null
+/*
+* Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Class for sending the result of a method
+ */
+
+#include "config.hpp"
+
+#include "ipc/method-result.hpp"
+#include "ipc/internals/processor.hpp"
+
+namespace vasum {
+namespace ipc {
+
+MethodResult::MethodResult(Processor& processor,
+ const MethodID methodID,
+ const MessageID messageID,
+ const PeerID peerID)
+ : mProcessor(processor),
+ mMethodID(methodID),
+ mPeerID(peerID),
+ mMessageID(messageID)
+{}
+
+void MethodResult::setInternal(const std::shared_ptr<void>& data)
+{
+ mProcessor.sendResult(mMethodID, mPeerID, mMessageID, data);
+}
+
+void MethodResult::setVoid()
+{
+ mProcessor.sendVoid(mMethodID, mPeerID, mMessageID);
+}
+
+void MethodResult::setError(const int code, const std::string& message)
+{
+ mProcessor.sendError(mPeerID, mMessageID, code, message);
+}
+
+} // namespace ipc
+} // namespace vasum
--- /dev/null
+/*
+* Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Class for sending the result of a method
+ */
+
+#ifndef COMMON_IPC_METHOD_RESULT_HPP
+#define COMMON_IPC_METHOD_RESULT_HPP
+
+#include "ipc/types.hpp"
+#include "logger/logger.hpp"
+#include <memory>
+
+namespace vasum {
+namespace ipc {
+
+class Processor;
+
+class MethodResult {
+public:
+ typedef std::shared_ptr<MethodResult> Pointer;
+
+ MethodResult(Processor& processor,
+ const MethodID methodID,
+ const MessageID messageID,
+ const PeerID peerID);
+
+
+ template<typename Data>
+ void set(const std::shared_ptr<Data>& data)
+ {
+ setInternal(data);
+ }
+
+ void setVoid();
+ void setError(const int code, const std::string& message);
+
+private:
+ Processor& mProcessor;
+ MethodID mMethodID;
+ PeerID mPeerID;
+ MessageID mMessageID;
+
+ void setInternal(const std::shared_ptr<void>& data);
+};
+
+template<typename SentDataType, typename ReceivedDataType>
+struct MethodHandler {
+ typedef std::function < void(PeerID peerID,
+ std::shared_ptr<ReceivedDataType>& data,
+ MethodResult::Pointer&& methodResult) > type;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_METHOD_RESULT_HPP
#ifndef COMMON_IPC_TYPES_HPP
#define COMMON_IPC_TYPES_HPP
-#include "ipc/exception.hpp"
#include <functional>
#include <memory>
#include <string>
MessageID getNextMessageID();
PeerID getNextPeerID();
-template<typename SentDataType, typename ReceivedDataType>
-struct MethodHandler {
- typedef std::function<std::shared_ptr<SentDataType>(PeerID peerID,
- std::shared_ptr<ReceivedDataType>& data)> type;
-};
template<typename ReceivedDataType>
struct SignalHandler {
#include <thread>
#include <chrono>
#include <utility>
+#include <future>
#include <boost/filesystem.hpp>
using namespace vasum;
}
};
-std::shared_ptr<EmptyData> returnEmptyCallback(const PeerID, std::shared_ptr<EmptyData>&)
+void returnEmptyCallback(const PeerID,
+ std::shared_ptr<EmptyData>&,
+ MethodResult::Pointer methodResult)
{
- return std::make_shared<EmptyData>();
+ methodResult->setVoid();
}
-std::shared_ptr<SendData> returnDataCallback(const PeerID, std::shared_ptr<RecvData>&)
+void returnDataCallback(const PeerID,
+ std::shared_ptr<RecvData>&,
+ MethodResult::Pointer methodResult)
{
- return std::make_shared<SendData>(1);
+ auto returnData = std::make_shared<SendData>(1);
+ methodResult->set(returnData);
}
-std::shared_ptr<SendData> echoCallback(const PeerID, std::shared_ptr<RecvData>& data)
+void echoCallback(const PeerID,
+ std::shared_ptr<RecvData>& data,
+ MethodResult::Pointer methodResult)
{
- return std::make_shared<SendData>(data->intVal);
+ auto returnData = std::make_shared<SendData>(data->intVal);
+ methodResult->set(returnData);
}
-std::shared_ptr<SendData> longEchoCallback(const PeerID, std::shared_ptr<RecvData>& data)
+void longEchoCallback(const PeerID,
+ std::shared_ptr<RecvData>& data,
+ MethodResult::Pointer methodResult)
{
std::this_thread::sleep_for(std::chrono::milliseconds(LONG_OPERATION_TIME));
- return std::make_shared<SendData>(data->intVal);
+ auto returnData = std::make_shared<SendData>(data->intVal);
+ methodResult->set(returnData);
}
PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
ValueLatch<Result<RecvData>> retStatusLatch;
Service s(socketPath);
- auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&) {
- return std::shared_ptr<SendData>(new SendData(1));
+ auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&, MethodResult::Pointer methodResult) {
+ auto resultData = std::make_shared<SendData>(1);
+ methodResult->set<SendData>(resultData);
};
// Method will throw during serialization and disconnect automatically
BOOST_AUTO_TEST_CASE(ReadTimeout)
{
Service s(socketPath);
- auto longEchoCallback = [](const PeerID, std::shared_ptr<RecvData>& data) {
- return std::shared_ptr<LongSendData>(new LongSendData(data->intVal, LONG_OPERATION_TIME));
+ auto longEchoCallback = [](const PeerID, std::shared_ptr<RecvData>& data, MethodResult::Pointer methodResult) {
+ auto resultData = std::make_shared<LongSendData>(data->intVal, LONG_OPERATION_TIME);
+ methodResult->set<LongSendData>(resultData);
};
s.setMethodHandler<LongSendData, RecvData>(1, longEchoCallback);
recvDataLatchB.set(data);
};
- LOGH("SETTING SIGNAAALS");
c.setSignalHandler<RecvData>(1, handlerA);
c.setSignalHandler<RecvData>(2, handlerB);
Client c(socketPath);
auto clientID = connect(s, c);
- auto throwingMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&) -> std::shared_ptr<SendData> {
+ auto throwingMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer) {
throw IPCUserException(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
};
+ auto sendErrorMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer methodResult) {
+ methodResult->setError(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
+ };
+
s.setMethodHandler<SendData, RecvData>(1, throwingMethodHandler);
+ s.setMethodHandler<SendData, RecvData>(2, sendErrorMethodHandler);
c.setMethodHandler<SendData, RecvData>(1, throwingMethodHandler);
+ c.setMethodHandler<SendData, RecvData>(2, sendErrorMethodHandler);
std::shared_ptr<SendData> sentData(new SendData(78));
BOOST_CHECK_EXCEPTION((c.callSync<SendData, RecvData>(1, sentData, TIMEOUT)), IPCUserException, hasProperData);
BOOST_CHECK_EXCEPTION((s.callSync<SendData, RecvData>(1, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
+ BOOST_CHECK_EXCEPTION((c.callSync<SendData, RecvData>(2, sentData, TIMEOUT)), IPCUserException, hasProperData);
+ BOOST_CHECK_EXCEPTION((s.callSync<SendData, RecvData>(2, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
+}
+
+BOOST_AUTO_TEST_CASE(AsyncResult)
+{
+ const int TEST_ERROR_CODE = -567;
+ const std::string TEST_ERROR_MESSAGE = "Ooo jooo!";
+
+ Service s(socketPath);
+ Client c(socketPath);
+ auto clientID = connect(s, c);
+
+ auto errorMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer methodResult) {
+ std::async(std::launch::async, [&, methodResult] {
+ std::this_thread::sleep_for(std::chrono::milliseconds(SHORT_OPERATION_TIME));
+ methodResult->setError(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
+ });
+ };
+
+ auto voidMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&, MethodResult::Pointer methodResult) {
+ std::async(std::launch::async, [methodResult] {
+ std::this_thread::sleep_for(std::chrono::milliseconds(SHORT_OPERATION_TIME));
+ methodResult->setVoid();
+ });
+ };
+
+ auto dataMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>& data, MethodResult::Pointer methodResult) {
+ std::async(std::launch::async, [data, methodResult] {
+ std::this_thread::sleep_for(std::chrono::milliseconds(SHORT_OPERATION_TIME));
+ methodResult->set(data);
+ });
+ };
+
+ s.setMethodHandler<SendData, RecvData>(1, errorMethodHandler);
+ s.setMethodHandler<EmptyData, RecvData>(2, voidMethodHandler);
+ s.setMethodHandler<SendData, RecvData>(3, dataMethodHandler);
+ c.setMethodHandler<SendData, RecvData>(1, errorMethodHandler);
+ c.setMethodHandler<EmptyData, RecvData>(2, voidMethodHandler);
+ c.setMethodHandler<SendData, RecvData>(3, dataMethodHandler);
+
+ std::shared_ptr<SendData> sentData(new SendData(90));
+
+ auto hasProperData = [&](const IPCUserException & e) {
+ return e.getCode() == TEST_ERROR_CODE && e.what() == TEST_ERROR_MESSAGE;
+ };
+
+ BOOST_CHECK_EXCEPTION((s.callSync<SendData, RecvData>(1, clientID, sentData, TIMEOUT)), IPCUserException, hasProperData);
+ BOOST_CHECK_EXCEPTION((c.callSync<SendData, RecvData>(1, sentData, TIMEOUT)), IPCUserException, hasProperData);
+
+ BOOST_CHECK_NO_THROW((s.callSync<SendData, EmptyData>(2, clientID, sentData, TIMEOUT)));
+ BOOST_CHECK_NO_THROW((c.callSync<SendData, EmptyData>(2, sentData, TIMEOUT)));
+ std::shared_ptr<RecvData> recvData;
+ recvData = s.callSync<SendData, RecvData>(3, clientID, sentData, TIMEOUT);
+ BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
+ recvData = c.callSync<SendData, RecvData>(3, sentData, TIMEOUT);
+ BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
+
// BOOST_AUTO_TEST_CASE(ConnectionLimitTest)
// {
// unsigned oldLimit = ipc::getMaxFDNumber();