IPCException(const std::string& error) : SecurityContainersException(error) {}
};
+struct IPCParsingException: public IPCException {
+ IPCParsingException(const std::string& error) : IPCException(error) {}
+};
+
+struct IPCSerializationException: public IPCException {
+ IPCSerializationException(const std::string& error) : IPCException(error) {}
+};
+
+struct IPCPeerDisconnectedException: public IPCException {
+ IPCPeerDisconnectedException(const std::string& error) : IPCException(error) {}
+};
+
+struct IPCNaughtyPeerException: public IPCException {
+ IPCNaughtyPeerException(const std::string& error) : IPCException(error) {}
+};
+struct IPCTimeoutException: public IPCException {
+ IPCTimeoutException(const std::string& error) : IPCException(error) {}
+};
}
}
LOGE("Error in poll: " << std::string(strerror(errno)));
throw IPCException("Error in poll: " + std::string(strerror(errno)));
- break;
}
// Check for incoming connections
#include <sys/socket.h>
#include <limits>
-
namespace security_containers {
namespace ipc {
+#define IGNORE_EXCEPTIONS(expr) \
+ try \
+ { \
+ expr; \
+ } \
+ catch (const std::exception& e){ \
+ LOGE("Callback threw an error: " << e.what()); \
+ }
+
+
+
+
const Processor::MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
Processor::Processor(const PeerCallback& newPeerCallback,
{
Lock lock(mSocketsMutex);
peerID = getNextPeerID();
- SocketInfo socketInfo;
- socketInfo.peerID = peerID;
- socketInfo.socketPtr = std::move(socketPtr);
- mNewSockets.push(std::move(socketInfo));
+ mNewSockets.emplace(peerID, std::move(socketPtr));
}
LOGI("New peer added. Id: " << peerID);
mEventQueue.send(Event::NEW_PEER);
return peerID;
}
-void Processor::removePeer(PeerID peerID)
+void Processor::removePeer(const PeerID peerID, Status status)
{
LOGW("Removing naughty peer. ID: " << peerID);
{
Lock lock(mSocketsMutex);
mSockets.erase(peerID);
}
+
+ {
+ // Erase associated return value callbacks
+ Lock lock(mReturnCallbacksMutex);
+
+ std::shared_ptr<void> data;
+ for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
+ if (it->second.peerID == peerID) {
+ IGNORE_EXCEPTIONS(it->second.process(status, data));
+ it = mReturnCallbacks.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
resetPolling();
}
}
}
+
bool Processor::handleLostConnections()
{
std::list<PeerID> peersToRemove;
}
}
- for (const auto peerID : peersToRemove) {
- LOGT("Removing peer. ID: " << peerID);
- mSockets.erase(peerID);
- }
}
- if (!peersToRemove.empty()) {
- resetPolling();
+ for (const PeerID peerID : peersToRemove) {
+ removePeer(peerID, Status::PEER_DISCONNECTED);
}
return !peersToRemove.empty();
MethodID methodID;
MessageID messageID;
{
- LOGI("Locking");
Socket::Guard guard = socket.getGuard();
socket.read(&methodID, sizeof(methodID));
socket.read(&messageID, sizeof(messageID));
- LOGI("Locked");
if (methodID == RETURN_METHOD_ID) {
- LOGI("Return value for messageID: " << messageID);
- ReturnCallbacks returnCallbacks;
- try {
- Lock lock(mReturnCallbacksMutex);
- LOGT("Getting the return callback");
- returnCallbacks = std::move(mReturnCallbacks.at(messageID));
- mReturnCallbacks.erase(messageID);
- } catch (const std::out_of_range&) {
- LOGW("No return callback for messageID: " << messageID);
- return false;
- }
+ return onReturnValue(peerID, socket, messageID);
+ } else {
+ return onRemoteCall(peerID, socket, methodID, messageID);
+ }
+ }
- std::shared_ptr<void> data;
- try {
- LOGT("Parsing incoming return data");
- data = returnCallbacks.parse(socket.getFD());
- } catch (const IPCException&) {
- removePeer(peerID);
- return true;
- }
+ return false;
+}
- guard.unlock();
+bool Processor::onReturnValue(const PeerID peerID,
+ const Socket& socket,
+ const MessageID messageID)
+{
+ LOGI("Return value for messageID: " << messageID);
+ ReturnCallbacks returnCallbacks;
+ try {
+ Lock lock(mReturnCallbacksMutex);
+ LOGT("Getting the return callback");
+ returnCallbacks = std::move(mReturnCallbacks.at(messageID));
+ mReturnCallbacks.erase(messageID);
+ } catch (const std::out_of_range&) {
+ LOGW("No return callback for messageID: " << messageID);
+ removePeer(peerID, Status::NAUGHTY_PEER);
+ return true;
+ }
- LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
- returnCallbacks.process(data);
+ std::shared_ptr<void> data;
+ try {
+ LOGT("Parsing incoming return data");
+ data = returnCallbacks.parse(socket.getFD());
+ } catch (const std::exception& e) {
+ LOGE("Exception during parsing: " << e.what());
+ IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
+ removePeer(peerID, Status::PARSING_ERROR);
+ return true;
+ }
- } else {
- LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
- std::shared_ptr<MethodHandlers> methodCallbacks;
- try {
- Lock lock(mCallsMutex);
- methodCallbacks = mMethodsCallbacks.at(methodID);
- } catch (const std::out_of_range&) {
- LOGW("No method callback for methodID: " << methodID);
- removePeer(peerID);
- return true;
- }
+ LOGT("Process return value callback for messageID: " << messageID);
+ IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
- std::shared_ptr<void> data;
- try {
- LOGT("Parsing incoming data");
- data = methodCallbacks->parse(socket.getFD());
- } catch (const IPCException&) {
- removePeer(peerID);
- return true;
- }
+ return false;
+}
- guard.unlock();
-
- LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
- std::shared_ptr<void> returnData = methodCallbacks->method(data);
-
- LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID);
- try {
- // Send the call with the socket
- Socket::Guard guard = socket.getGuard();
- socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
- socket.write(&messageID, sizeof(messageID));
- methodCallbacks->serialize(socket.getFD(), returnData);
- } catch (const IPCException&) {
- removePeer(peerID);
- return true;
- }
- }
+bool Processor::onRemoteCall(const PeerID peerID,
+ const Socket& socket,
+ const MethodID methodID,
+ const MessageID messageID)
+{
+ LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
+
+ std::shared_ptr<MethodHandlers> methodCallbacks;
+ try {
+ Lock lock(mCallsMutex);
+ methodCallbacks = mMethodsCallbacks.at(methodID);
+ } catch (const std::out_of_range&) {
+ LOGW("No method callback for methodID: " << methodID);
+ removePeer(peerID, Status::NAUGHTY_PEER);
+ return true;
+ }
+
+ std::shared_ptr<void> data;
+ try {
+ LOGT("Parsing incoming data");
+ data = methodCallbacks->parse(socket.getFD());
+ } catch (const std::exception& e) {
+ LOGE("Exception during parsing: " << e.what());
+ removePeer(peerID, Status::PARSING_ERROR);
+ return true;
+ }
+
+ LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
+ std::shared_ptr<void> returnData;
+ try {
+ returnData = methodCallbacks->method(data);
+ } catch (const std::exception& e) {
+ LOGE("Exception in method handler: " << e.what());
+ removePeer(peerID, Status::NAUGHTY_PEER);
+ return true;
+ }
+
+ LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID);
+ try {
+ // Send the call with the socket
+ Socket::Guard guard = socket.getGuard();
+ socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
+ socket.write(&messageID, sizeof(messageID));
+ methodCallbacks->serialize(socket.getFD(), returnData);
+ } catch (const std::exception& e) {
+ LOGE("Exception during serialization: " << e.what());
+ removePeer(peerID, Status::SERIALIZATION_ERROR);
+ return true;
}
return false;
case Event::CALL: {
LOGD("Event CALL");
- handleCall();
- return false;
+ return handleCall();
}
case Event::NEW_PEER: {
if (mSockets.size() > mMaxNumberOfPeers) {
LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID);
-
+ return false;
}
if (mSockets.count(socketInfo.peerID) != 0) {
LOGE("There already was a socket for peerID: " << socketInfo.peerID);
+ return false;
}
- mSockets[socketInfo.peerID] = socketInfo.socketPtr;
+
+ mSockets.emplace(socketInfo.peerID, std::move(socketInfo.socketPtr));
}
resetPolling();
-
if (mNewPeerCallback) {
// Notify about the new user.
mNewPeerCallback(socketInfo.peerID);
return call;
}
-void Processor::handleCall()
+bool Processor::handleCall()
{
- LOGT("Handle call from another thread");
+ LOGT("Handle call (from another thread) to send a message.");
Call call = getCall();
- ReturnCallbacks returnCallbacks;
- returnCallbacks.parse = call.parse;
- returnCallbacks.process = call.process;
-
std::shared_ptr<Socket> socketPtr;
try {
- // Get the addressee's socket
+ // Get the peer's socket
Lock lock(mSocketsMutex);
socketPtr = mSockets.at(call.peerID);
} catch (const std::out_of_range&) {
LOGE("Peer disconnected. No socket with a peerID: " << call.peerID);
- return;
+ IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
+ return false;
}
MessageID messageID = getNextMessageID();
if (mReturnCallbacks.count(messageID) != 0) {
LOGE("There already was a return callback for messageID: " << messageID);
}
- mReturnCallbacks[messageID] = std::move(returnCallbacks);
+ mReturnCallbacks.emplace(messageID, ReturnCallbacks(call.peerID,
+ std::move(call.parse),
+ std::move(call.process)));
}
try {
call.serialize(socketPtr->getFD(), call.data);
} catch (const std::exception& e) {
LOGE("Error during sending a message: " << e.what());
+
+ // Inform about the error
+ IGNORE_EXCEPTIONS(mReturnCallbacks[messageID].process(Status::SERIALIZATION_ERROR, call.data));
+
{
Lock lock(mReturnCallbacksMutex);
mReturnCallbacks.erase(messageID);
}
- // TODO: User should get the error code.
+
+ removePeer(call.peerID, Status::SERIALIZATION_ERROR);
+ return true;
}
+
+ return false;
}
} // namespace ipc
* - don't throw timeout if the message is already processed
* - naming convention or methods that just commissions the PROCESS thread to do something
* - removePeer API function
+* - error handling - special message type
+* - some mutexes may not be needed
*/
class Processor {
public:
ReturnCallbacks(ReturnCallbacks&&) = default;
ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
+ ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultHandler<void>::type& process)
+ : peerID(peerID), parse(parse), process(process) {}
+
+ PeerID peerID;
ParseCallback parse;
ResultHandler<void>::type process;
};
SocketInfo(SocketInfo&&) = default;
SocketInfo& operator=(SocketInfo &&) = default;
- std::shared_ptr<Socket> socketPtr;
+ SocketInfo(const PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
+ : peerID(peerID), socketPtr(socketPtr) {}
+
PeerID peerID;
+ std::shared_ptr<Socket> socketPtr;
};
enum class Event : int {
void run();
bool handleEvent();
- void handleCall();
+ bool handleCall();
bool handleLostConnections();
bool handleInputs();
bool handleInput(const PeerID peerID, const Socket& socket);
+ bool onReturnValue(const PeerID peerID,
+ const Socket& socket,
+ const MessageID messageID);
+ bool onRemoteCall(const PeerID peerID,
+ const Socket& socket,
+ const MethodID methodID,
+ const MessageID messageID);
void resetPolling();
MessageID getNextMessageID();
PeerID getNextPeerID();
Call getCall();
- void removePeer(PeerID peerID);
+ void removePeer(const PeerID peerID, Status status);
};
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
- call.process = [process](std::shared_ptr<void>& data)->void {
+ call.process = [process](Status status, std::shared_ptr<void>& data)->void {
std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- return process(tmpData);
+ return process(status, tmpData);
};
{
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
std::condition_variable cv;
+ Status returnStatus = ipc::Status::UNDEFINED;
- auto process = [&result, &cv](std::shared_ptr<ReceivedDataType> returnedData) {
+ auto process = [&result, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
+ returnStatus = status;
result = returnedData;
cv.notify_one();
};
data,
process);
- auto isResultInitialized = [&result]() {
- return static_cast<bool>(result);
+ auto isResultInitialized = [&returnStatus]() {
+ return returnStatus != ipc::Status::UNDEFINED;
};
if (!cv.wait_for(lck, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
LOGE("Function call timeout; methodID: " << methodID);
- throw IPCException("Function call timeout; methodID: " + std::to_string(methodID));
+ throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
}
+ throwOnError(returnStatus);
+
return result;
}
std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
- unsigned int timeoutMS);
+ unsigned int timeoutMS = 500);
/**
* Asynchronous method call. The return callback will be called on
std::shared_ptr<ReceivedDataType> Service::callSync(const MethodID methodID,
const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
- unsigned int timeoutMS = 500)
+ unsigned int timeoutMS)
{
LOGD("Sync calling method: " << methodID << " for user: " << peerID);
return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerID, data, timeoutMS);
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Types definitions and helper functions
+ */
+
+#include "ipc/types.hpp"
+#include "logger/logger.hpp"
+
+
+namespace security_containers {
+namespace ipc {
+
+std::string toString(const Status status)
+{
+ switch (status) {
+ case Status::OK: return "No error, everything is OK";
+ case Status::PARSING_ERROR: return "Exception during reading/parsing data from the socket";
+ case Status::SERIALIZATION_ERROR: return "Exception during writing/serializing data to the socket";
+ case Status::PEER_DISCONNECTED: return "No such peer. Might got disconnected.";
+ case Status::NAUGHTY_PEER: return "Peer performed a forbidden action.";
+ case Status::UNDEFINED: return "Undefined state";
+ default: return "Unknown status";
+ }
+}
+
+void throwOnError(const Status status)
+{
+ if (status == Status::OK) {
+ return;
+ }
+
+ std::string message = toString(status);
+ LOGE(message);
+
+ switch (status) {
+ case Status::PARSING_ERROR: throw IPCParsingException(message);
+ case Status::SERIALIZATION_ERROR: throw IPCSerializationException(message);
+ case Status::PEER_DISCONNECTED: throw IPCPeerDisconnectedException(message);
+ case Status::NAUGHTY_PEER: throw IPCNaughtyPeerException(message);
+ case Status::UNDEFINED: throw IPCException(message);
+ default: return throw IPCException(message);
+ }
+}
+} // namespace ipc
+} // namespace security_containers
/**
* @file
* @author Jan Olszak (j.olszak@samsung.com)
- * @brief Handler types definitions
+ * @brief Types definitions
*/
#ifndef COMMON_IPC_HANDLERS_HPP
#define COMMON_IPC_HANDLERS_HPP
+#include "ipc/exception.hpp"
+
#include <functional>
#include <memory>
+#include <string>
namespace security_containers {
namespace ipc {
+enum class Status : int {
+ OK = 0,
+ PARSING_ERROR,
+ SERIALIZATION_ERROR,
+ PEER_DISCONNECTED,
+ NAUGHTY_PEER,
+ UNDEFINED
+};
+
+std::string toString(const Status status);
+void throwOnError(const Status status);
+
template<typename SentDataType, typename ReceivedDataType>
struct MethodHandler {
typedef std::function<std::shared_ptr<SentDataType>(std::shared_ptr<ReceivedDataType>&)> type;
template <typename ReceivedDataType>
struct ResultHandler {
- typedef std::function<void(std::shared_ptr<ReceivedDataType>&)> type;
+ typedef std::function<void(Status, std::shared_ptr<ReceivedDataType>&)> type;
};
} // namespace ipc
CONFIG_REGISTER_EMPTY
};
+struct ThrowOnAcceptData {
+ template<typename Visitor>
+ void accept(Visitor)
+ {
+ LOGE("Serialization and parsing failed");
+ throw std::exception();
+ }
+ template<typename Visitor>
+ void accept(Visitor) const
+ {
+ LOGE("Const Serialization and parsing failed");
+ throw std::exception();
+ }
+};
+
std::shared_ptr<EmptyData> returnEmptyCallback(std::shared_ptr<EmptyData>&)
{
return std::shared_ptr<EmptyData>(new EmptyData());
return data;
}
+std::shared_ptr<SendData> longEchoCallback(std::shared_ptr<SendData>& data)
+{
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ return data;
+}
+
void testEcho(Client& c, const Client::MethodID methodID)
{
std::shared_ptr<SendData> sentData(new SendData(34));
//Async call
std::shared_ptr<SendData> sentData(new SendData(34));
std::shared_ptr<SendData> recvData;
- auto dataBack = [&cv, &recvData](std::shared_ptr<SendData>& data) {
+ auto dataBack = [&cv, &recvData](ipc::Status status, std::shared_ptr<SendData>& data) {
+ BOOST_CHECK(status == ipc::Status::OK);
recvData = data;
cv.notify_one();
};
std::shared_ptr<SendData> sentData(new SendData(56));
std::shared_ptr<SendData> recvData;
- auto dataBack = [&cv, &recvData](std::shared_ptr<SendData>& data) {
+ auto dataBack = [&cv, &recvData](ipc::Status status, std::shared_ptr<SendData>& data) {
+ BOOST_CHECK(status == ipc::Status::OK);
recvData = data;
cv.notify_one();
};
BOOST_AUTO_TEST_CASE(SyncTimeoutTest)
{
Service s(socketPath);
+ s.addMethodHandler<SendData, SendData>(1, longEchoCallback);
+
+ s.start();
+ Client c(socketPath);
+ c.start();
+
+ std::shared_ptr<SendData> sentData(new SendData(78));
+
+ BOOST_CHECK_THROW((c.callSync<SendData, SendData>(1, sentData, 10)), IPCException);
+}
+
+BOOST_AUTO_TEST_CASE(SerializationErrorTest)
+{
+ Service s(socketPath);
+ s.addMethodHandler<SendData, SendData>(1, echoCallback);
+ s.start();
+
+ Client c(socketPath);
+ c.start();
+
+ std::shared_ptr<ThrowOnAcceptData> throwingData(new ThrowOnAcceptData());
+
+ BOOST_CHECK_THROW((c.callSync<ThrowOnAcceptData, SendData>(1, throwingData)), IPCSerializationException);
+
+}
+
+BOOST_AUTO_TEST_CASE(ParseErrorTest)
+{
+ Service s(socketPath);
s.addMethodHandler<SendData, SendData>(1, echoCallback);
+ s.start();
+
+ Client c(socketPath);
+ c.start();
+
+ std::shared_ptr<SendData> sentData(new SendData(78));
+ BOOST_CHECK_THROW((c.callSync<SendData, ThrowOnAcceptData>(1, sentData, 10000)), IPCParsingException);
+}
+
+BOOST_AUTO_TEST_CASE(DisconnectedPeerErrorTest)
+{
+ Service s(socketPath);
+
+ auto method = [](std::shared_ptr<ThrowOnAcceptData>&) {
+ return std::shared_ptr<SendData>(new SendData(1));
+ };
+ // Method will throw during serialization and disconnect automatically
+ s.addMethodHandler<SendData, ThrowOnAcceptData>(1, method);
s.start();
+
Client c(socketPath);
c.start();
+ std::mutex mtx;
+ std::unique_lock<std::mutex> lck(mtx);
+ std::condition_variable cv;
+ ipc::Status retStatus = ipc::Status::UNDEFINED;
+
+ auto dataBack = [&cv, &retStatus](ipc::Status status, std::shared_ptr<SendData>&) {
+ retStatus = status;
+ cv.notify_one();
+ };
+
std::shared_ptr<SendData> sentData(new SendData(78));
+ c.callAsync<SendData, SendData>(1, sentData, dataBack);
- BOOST_CHECK_THROW((c.callSync<SendData, SendData>(1, sentData, 1)), IPCException);
+ // Wait for the response
+ BOOST_CHECK(cv.wait_for(lck, std::chrono::seconds(10), [&retStatus]() {
+ return retStatus != ipc::Status::UNDEFINED;
+ }));
+ BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED);
}