// Initialize the connection with the server
LOGD("Connecting to " + mSocketPath);
auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
- mServiceID = mProcessor.addPeer(socketPtr);
+ mServiceFD = mProcessor.addPeer(socketPtr);
// Start listening
mProcessor.start();
const std::shared_ptr<SentDataType>& data);
private:
- PeerID mServiceID;
+ FileDescriptor mServiceFD;
Processor mProcessor;
std::string mSocketPath;
};
unsigned int timeoutMS)
{
LOGD("Sync calling method: " << methodID);
- return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceID, data, timeoutMS);
+ return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceFD, data, timeoutMS);
}
template<typename SentDataType, typename ReceivedDataType>
LOGD("Async calling method: " << methodID);
mProcessor.callAsync<SentDataType,
ReceivedDataType>(methodID,
- mServiceID,
+ mServiceFD,
data,
resultCallback);
LOGD("Async called method: " << methodID);
Call() = default;
Call(Call&&) = default;
- PeerID peerID;
+ FileDescriptor peerFD;
MethodID methodID;
MessageID messageID;
std::shared_ptr<void> data;
template<typename SentDataType, typename ReceivedDataType>
MessageID push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process);
template<typename SentDataType>
MessageID push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data);
Call pop();
template<typename SentDataType, typename ReceivedDataType>
MessageID CallQueue::push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
Call call;
call.methodID = methodID;
- call.peerID = peerID;
+ call.peerFD = peerFD;
call.data = data;
MessageID messageID = getNextMessageID();
template<typename SentDataType>
MessageID CallQueue::push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data)
{
Call call;
call.methodID = methodID;
- call.peerID = peerID;
+ call.peerFD = peerFD;
call.data = data;
MessageID messageID = getNextMessageID();
const unsigned int maxNumberOfPeers)
: mNewPeerCallback(newPeerCallback),
mRemovedPeerCallback(removedPeerCallback),
- mMaxNumberOfPeers(maxNumberOfPeers),
- mPeerIDCounter(0)
+ mMaxNumberOfPeers(maxNumberOfPeers)
{
LOGT("Creating Processor");
using namespace std::placeholders;
mMethodsCallbacks.erase(methodID);
}
-PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
+FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
{
LOGT("Adding socket");
- PeerID peerID;
+ FileDescriptor peerFD;
{
Lock lock(mSocketsMutex);
- peerID = getNextPeerID();
- SocketInfo socketInfo(peerID, std::move(socketPtr));
+ peerFD = socketPtr->getFD();
+ SocketInfo socketInfo(peerFD, std::move(socketPtr));
mNewSockets.push(std::move(socketInfo));
}
- LOGI("New peer added. Id: " << peerID);
+ LOGI("New peer added. Id: " << peerFD);
mEventQueue.send(Event::ADD_PEER);
- return peerID;
+ return peerFD;
}
-void Processor::removePeer(const PeerID peerID)
+void Processor::removePeer(const FileDescriptor peerFD)
{
std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
{
Lock lock(mSocketsMutex);
- RemovePeerRequest request(peerID, conditionPtr);
+ RemovePeerRequest request(peerFD, conditionPtr);
mPeersToDelete.push(std::move(request));
}
mEventQueue.send(Event::REMOVE_PEER);
- auto isPeerDeleted = [&peerID, this]()->bool {
+ auto isPeerDeleted = [&peerFD, this]()->bool {
Lock lock(mSocketsMutex);
- return mSockets.count(peerID) == 0;
+ return mSockets.count(peerFD) == 0;
};
std::mutex mutex;
conditionPtr->wait(lock, isPeerDeleted);
}
-void Processor::removePeerInternal(const PeerID peerID, Status status)
+void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
{
- LOGW("Removing peer. ID: " << peerID);
+ LOGW("Removing peer. ID: " << peerFD);
{
Lock lock(mSocketsMutex);
- mSockets.erase(peerID);
+ mSockets.erase(peerFD);
// Remove from signal addressees
for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
- it->second.remove(peerID);
+ it->second.remove(peerFD);
if (it->second.empty()) {
it = mSignalsPeers.erase(it);
} else {
std::shared_ptr<void> data;
for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
- if (it->second.peerID == peerID) {
+ if (it->second.peerFD == peerFD) {
IGNORE_EXCEPTIONS(it->second.process(status, data));
it = mReturnCallbacks.erase(it);
} else {
Lock lock(mCallbacksMutex);
if (mRemovedPeerCallback) {
// Notify about the deletion
- mRemovedPeerCallback(peerID);
+ mRemovedPeerCallback(peerFD);
}
}
bool Processor::handleLostConnections()
{
- std::list<PeerID> peersToRemove;
+ std::vector<FileDescriptor> peersToRemove;
{
Lock lock(mSocketsMutex);
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLHUP) {
- LOGI("Lost connection to peer: " << socketIt->first);
+ LOGI("Lost connection to peer: " << mFDs[i].fd);
mFDs[i].revents &= ~(POLLHUP);
- peersToRemove.push_back(socketIt->first);
+ peersToRemove.push_back(mFDs[i].fd);
}
}
}
- for (const PeerID peerID : peersToRemove) {
- removePeerInternal(peerID, Status::PEER_DISCONNECTED);
+ for (const FileDescriptor peerFD : peersToRemove) {
+ removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
}
return !peersToRemove.empty();
bool Processor::handleInputs()
{
- std::list<std::pair<PeerID, std::shared_ptr<Socket>> > peersWithInput;
+ std::vector<std::shared_ptr<Socket>> socketsWithInput;
{
Lock lock(mSocketsMutex);
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLIN) {
mFDs[i].revents &= ~(POLLIN);
- peersWithInput.push_back(*socketIt);
+ socketsWithInput.push_back(mSockets[mFDs[i].fd]);
}
}
}
bool pollChanged = false;
// Handle input outside the critical section
- for (const auto& peer : peersWithInput) {
- pollChanged = pollChanged || handleInput(peer.first, *peer.second);
+ for (const auto& socketPtr : socketsWithInput) {
+ pollChanged = pollChanged || handleInput(*socketPtr);
}
return pollChanged;
}
-bool Processor::handleInput(const PeerID peerID, const Socket& socket)
+bool Processor::handleInput(const Socket& socket)
{
LOGT("Handle incoming data");
MethodID methodID;
socket.read(&messageID, sizeof(messageID));
if (methodID == RETURN_METHOD_ID) {
- return onReturnValue(peerID, socket, messageID);
+ return onReturnValue(socket, messageID);
} else {
Lock lock(mCallsMutex);
// Method
std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
lock.unlock();
- return onRemoteCall(peerID, socket, methodID, messageID, methodCallbacks);
+ return onRemoteCall(socket, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
lock.unlock();
- return onRemoteSignal(peerID, socket, methodID, messageID, signalCallbacks);
+ return onRemoteSignal(socket, methodID, messageID, signalCallbacks);
} else {
// Nothing
lock.unlock();
LOGW("No method or signal callback for methodID: " << methodID);
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
}
return false;
}
-std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const PeerID peerID,
+std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
std::shared_ptr<RegisterSignalsMessage>& data)
{
- LOGD("New signals for peer: " << peerID);
+ LOGD("New signals for peer: " << peerFD);
Lock lock(mSocketsMutex);
for (MethodID methodID : data->ids) {
- mSignalsPeers[methodID].push_back(peerID);
+ mSignalsPeers[methodID].push_back(peerFD);
}
return std::make_shared<EmptyData>();
}
-bool Processor::onReturnValue(const PeerID peerID,
- const Socket& socket,
+bool Processor::onReturnValue(const Socket& socket,
const MessageID messageID)
{
LOGI("Return value for messageID: " << messageID);
mReturnCallbacks.erase(messageID);
} catch (const std::out_of_range&) {
LOGW("No return callback for messageID: " << messageID);
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
return false;
}
-bool Processor::onRemoteSignal(const PeerID peerID,
- const Socket& socket,
+bool Processor::onRemoteSignal(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks)
data = signalCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
try {
- signalCallbacks->signal(peerID, data);
+ signalCallbacks->signal(socket.getFD(), data);
} catch (const std::exception& e) {
LOGE("Exception in method handler: " << e.what());
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
return false;
}
-bool Processor::onRemoteCall(const PeerID peerID,
- const Socket& socket,
+bool Processor::onRemoteCall(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<MethodHandlers> methodCallbacks)
data = methodCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
std::shared_ptr<void> returnData;
try {
- returnData = methodCallbacks->method(peerID, data);
+ returnData = methodCallbacks->method(socket.getFD(), data);
} catch (const std::exception& e) {
LOGE("Exception in method handler: " << e.what());
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
methodCallbacks->serialize(socket.getFD(), returnData);
} catch (const std::exception& e) {
LOGE("Exception during serialization: " << e.what());
- removePeerInternal(peerID, Status::SERIALIZATION_ERROR);
+ removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
return true;
}
mNewSockets.pop();
if (mSockets.size() > mMaxNumberOfPeers) {
- LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID);
+ LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
return false;
}
- if (mSockets.count(socketInfo.peerID) != 0) {
- LOGE("There already was a socket for peerID: " << socketInfo.peerID);
+ if (mSockets.count(socketInfo.peerFD) != 0) {
+ LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
return false;
}
- mSockets[socketInfo.peerID] = std::move(socketInfo.socketPtr);
+ mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
}
// Broadcast the new signal to peers
LOGW("Sending handled signals");
- std::list<PeerID> peersIDs;
+ std::list<FileDescriptor> peersIDs;
{
Lock lock(mSocketsMutex);
for (const auto kv : mSockets) {
}
auto data = std::make_shared<RegisterSignalsMessage>(ids);
- for (const PeerID peerID : peersIDs) {
+ for (const FileDescriptor peerFD : peersIDs) {
callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- peerID,
+ peerFD,
data,
discardResultHandler<EmptyData>);
}
Lock lock(mCallbacksMutex);
if (mNewPeerCallback) {
// Notify about the new user.
- mNewPeerCallback(socketInfo.peerID);
+ mNewPeerCallback(socketInfo.peerFD);
}
}
return true;
mPeersToDelete.pop();
}
- removePeerInternal(request.peerID, Status::REMOVED_PEER);
+ removePeerInternal(request.peerFD, Status::REMOVED_PEER);
request.conditionPtr->notify_all();
return true;
}
-PeerID Processor::getNextPeerID()
-{
- // TODO: This method of generating UIDs is buggy. To be changed.
- return ++mPeerIDCounter;
-}
-
CallQueue::Call Processor::getCall()
{
Lock lock(mCallsMutex);
try {
// Get the peer's socket
Lock lock(mSocketsMutex);
- socketPtr = mSockets.at(call.peerID);
+ socketPtr = mSockets.at(call.peerFD);
} catch (const std::out_of_range&) {
- LOGE("Peer disconnected. No socket with a peerID: " << call.peerID);
+ LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
return false;
}
if (mReturnCallbacks.count(call.messageID) != 0) {
LOGE("There already was a return callback for messageID: " << call.messageID);
}
- mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerID,
+ mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
std::move(call.parse),
std::move(call.process)));
}
mReturnCallbacks.erase(call.messageID);
}
- removePeerInternal(call.peerID, Status::SERIALIZATION_ERROR);
+ removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
return true;
}
#include "logger/logger.hpp"
#include <poll.h>
-
-#include <atomic>
#include <condition_variable>
#include <queue>
#include <mutex>
* - helper function for removing from unordered map
* - new way to generate UIDs
* - callbacks for serialization/parsing
+* - store Sockets in a vector, maybe SocketStore?
+*
+*
*/
class Processor {
public:
* Calls the newPeerCallback.
*
* @param socketPtr pointer to the new socket
- * @return peerID of the new socket
+ * @return peerFD of the new socket
*/
- PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
+ FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
/**
* Request removing peer and wait
*
- * @param peerID id of the peer
+ * @param peerFD id of the peer
*/
- void removePeer(const PeerID peerID);
+ void removePeer(const FileDescriptor peerFD);
/**
* Saves the callbacks connected to the method id.
* Synchronous method call.
*
* @param methodID API dependent id of the method
- * @param peerID id of the peer
+ * @param peerFD id of the peer
* @param data data to sent
* @param timeoutMS how long to wait for the return value before throw
* @tparam SentDataType data type to send
*/
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS = 500);
* Asynchronous method call
*
* @param methodID API dependent id of the method
- * @param peerID id of the peer
+ * @param peerFD id of the peer
* @param data data to sent
* @param process callback processing the return data
* @tparam SentDataType data type to send
*/
template<typename SentDataType, typename ReceivedDataType>
MessageID callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process);
ReturnCallbacks(ReturnCallbacks&&) = default;
ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
- ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultHandler<void>::type& process)
- : peerID(peerID), parse(parse), process(process) {}
+ ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
+ : peerFD(peerFD), parse(parse), process(process) {}
- PeerID peerID;
+ FileDescriptor peerFD;
ParseCallback parse;
ResultHandler<void>::type process;
};
SocketInfo(SocketInfo&&) = default;
SocketInfo& operator=(SocketInfo &&) = default;
- SocketInfo(const PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
- : peerID(peerID), socketPtr(socketPtr) {}
+ SocketInfo(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
+ : peerFD(peerFD), socketPtr(socketPtr) {}
- PeerID peerID;
+ FileDescriptor peerFD;
std::shared_ptr<Socket> socketPtr;
};
RemovePeerRequest(RemovePeerRequest&&) = default;
RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
- RemovePeerRequest(const PeerID peerID,
+ RemovePeerRequest(const FileDescriptor peerFD,
const std::shared_ptr<std::condition_variable>& conditionPtr)
- : peerID(peerID), conditionPtr(conditionPtr) {}
+ : peerFD(peerFD), conditionPtr(conditionPtr) {}
- PeerID peerID;
+ FileDescriptor peerFD;
std::shared_ptr<std::condition_variable> conditionPtr;
};
CallQueue mCalls;
std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
- std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
+ std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
// Mutex for changing mSockets map.
// Shouldn't be locked on any read/write, that could block. Just copy the ptr.
std::mutex mSocketsMutex;
- std::unordered_map<PeerID, std::shared_ptr<Socket> > mSockets;
+ std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
+ std::vector<struct pollfd> mFDs;
std::queue<SocketInfo> mNewSockets;
std::queue<RemovePeerRequest> mPeersToDelete;
unsigned int mMaxNumberOfPeers;
std::thread mThread;
- std::vector<struct pollfd> mFDs;
-
- std::atomic<PeerID> mPeerIDCounter;
template<typename SentDataType, typename ReceivedDataType>
void addMethodHandlerInternal(const MethodID methodID,
template<typename SentDataType, typename ReceivedDataType>
MessageID callInternal(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process);
bool onRemovePeer();
bool handleLostConnections();
bool handleInputs();
- bool handleInput(const PeerID peerID, const Socket& socket);
- bool onReturnValue(const PeerID peerID,
- const Socket& socket,
+ bool handleInput(const Socket& socket);
+ bool onReturnValue(const Socket& socket,
const MessageID messageID);
- bool onRemoteCall(const PeerID peerID,
- const Socket& socket,
+ bool onRemoteCall(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<MethodHandlers> methodCallbacks);
- bool onRemoteSignal(const PeerID peerID,
- const Socket& socket,
+ bool onRemoteSignal(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks);
void resetPolling();
- PeerID getNextPeerID();
+ FileDescriptor getNextFileDescriptor();
CallQueue::Call getCall();
- void removePeerInternal(const PeerID peerID, Status status);
+ void removePeerInternal(const FileDescriptor peerFD, Status status);
- std::shared_ptr<EmptyData> onNewSignals(const PeerID peerID,
+ std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
std::shared_ptr<RegisterSignalsMessage>& data);
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
- methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data)->std::shared_ptr<void> {
+ methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- return method(peerID, tmpData);
+ return method(peerFD, tmpData);
};
{
return data;
};
- signalCall.signal = [handler](const PeerID peerID, std::shared_ptr<void>& data) {
+ signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- handler(peerID, tmpData);
+ handler(peerFD, tmpData);
};
{
std::vector<MethodID> ids {methodID};
auto data = std::make_shared<RegisterSignalsMessage>(ids);
- std::list<PeerID> peersIDs;
+ std::list<FileDescriptor> peersIDs;
{
Lock lock(mSocketsMutex);
for (const auto kv : mSockets) {
}
}
- for (const PeerID peerID : peersIDs) {
+ for (const FileDescriptor peerFD : peersIDs) {
callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- peerID,
+ peerFD,
data,
DEFAULT_METHOD_TIMEOUT);
}
template<typename SentDataType, typename ReceivedDataType>
MessageID Processor::callInternal(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
Lock lock(mCallsMutex);
- MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
+ MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
mEventQueue.send(Event::CALL);
return messageID;
template<typename SentDataType, typename ReceivedDataType>
MessageID Processor::callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
throw IPCException("The Processor thread is not started. Can't send any data.");
}
- return callInternal<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
+ return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
}
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS)
{
};
MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
- peerID,
+ peerFD,
data,
process);
}
}
if (isTimeout) {
- removePeer(peerID);
+ removePeer(peerFD);
LOGE("Function call timeout; methodID: " << methodID);
throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
} else {
throw IPCException("The Processor thread is not started. Can't send any data.");
}
- std::list<PeerID> peersIDs;
+ std::list<FileDescriptor> peersIDs;
{
Lock lock(mSocketsMutex);
peersIDs = mSignalsPeers[methodID];
}
- for (const PeerID peerID : peersIDs) {
+ for (const FileDescriptor peerFD : peersIDs) {
Lock lock(mCallsMutex);
- mCalls.push<SentDataType>(methodID, peerID, data);
+ mCalls.push<SentDataType>(methodID, peerFD, data);
mEventQueue.send(Event::CALL);
}
}
*/
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS = 500);
*/
template<typename SentDataType, typename ReceivedDataType>
void callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& resultCallback);
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> Service::callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS)
{
- LOGD("Sync calling method: " << methodID << " for user: " << peerID);
- return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerID, data, timeoutMS);
+ LOGD("Sync calling method: " << methodID << " for user: " << peerFD);
+ return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerFD, data, timeoutMS);
}
template<typename SentDataType, typename ReceivedDataType>
void Service::callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& resultCallback)
{
- LOGD("Async calling method: " << methodID << " for user: " << peerID);
+ LOGD("Async calling method: " << methodID << " for user: " << peerFD);
mProcessor.callAsync<SentDataType,
ReceivedDataType>(methodID,
- peerID,
+ peerFD,
data,
resultCallback);
- LOGD("Async called method: " << methodID << "for user: " << peerID);
+ LOGD("Async called method: " << methodID << "for user: " << peerFD);
}
template<typename SentDataType>
namespace security_containers {
namespace ipc {
-typedef std::function<void(int)> PeerCallback;
-typedef unsigned int PeerID;
+typedef int FileDescriptor;
typedef unsigned int MethodID;
typedef unsigned int MessageID;
+typedef std::function<void(FileDescriptor)> PeerCallback;
+
enum class Status : int {
OK = 0,
PARSING_ERROR,
template<typename SentDataType, typename ReceivedDataType>
struct MethodHandler {
- typedef std::function<std::shared_ptr<SentDataType>(PeerID, std::shared_ptr<ReceivedDataType>&)> type;
+ typedef std::function<std::shared_ptr<SentDataType>(FileDescriptor peerFD,
+ std::shared_ptr<ReceivedDataType>& data)> type;
};
template<typename ReceivedDataType>
struct SignalHandler {
- typedef std::function<void(PeerID, std::shared_ptr<ReceivedDataType>&)> type;
+ typedef std::function<void(FileDescriptor peerFD,
+ std::shared_ptr<ReceivedDataType>& data)> type;
};
template <typename ReceivedDataType>
struct ResultHandler {
- typedef std::function<void(Status, std::shared_ptr<ReceivedDataType>&)> type;
+ typedef std::function<void(Status status,
+ std::shared_ptr<ReceivedDataType>& resultData)> type;
};
} // namespace ipc
}
};
-std::shared_ptr<EmptyData> returnEmptyCallback(const PeerID, std::shared_ptr<EmptyData>&)
+std::shared_ptr<EmptyData> returnEmptyCallback(const FileDescriptor, std::shared_ptr<EmptyData>&)
{
return std::shared_ptr<EmptyData>(new EmptyData());
}
-std::shared_ptr<SendData> returnDataCallback(const PeerID, std::shared_ptr<SendData>&)
+std::shared_ptr<SendData> returnDataCallback(const FileDescriptor, std::shared_ptr<SendData>&)
{
return std::shared_ptr<SendData>(new SendData(1));
}
-std::shared_ptr<SendData> echoCallback(const PeerID, std::shared_ptr<SendData>& data)
+std::shared_ptr<SendData> echoCallback(const FileDescriptor, std::shared_ptr<SendData>& data)
{
return data;
}
-std::shared_ptr<SendData> longEchoCallback(const PeerID, std::shared_ptr<SendData>& data)
+std::shared_ptr<SendData> longEchoCallback(const FileDescriptor, std::shared_ptr<SendData>& data)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
return data;
}
-PeerID connect(Service& s, Client& c)
+FileDescriptor connect(Service& s, Client& c)
{
- // Connects the Client to the Service and returns Clients PeerID
+ // Connects the Client to the Service and returns Clients FileDescriptor
std::mutex mutex;
std::condition_variable cv;
- PeerID peerID = 0;
- auto newPeerCallback = [&cv, &peerID, &mutex](const PeerID newPeerID) {
+ FileDescriptor peerFD = 0;
+ auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFileDescriptor) {
std::unique_lock<std::mutex> lock(mutex);
- peerID = newPeerID;
+ peerFD = newFileDescriptor;
cv.notify_one();
};
c.start();
std::unique_lock<std::mutex> lock(mutex);
- BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerID]() {
- return peerID != 0;
+ BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerFD]() {
+ return peerFD != 0;
}));
- return peerID;
+ return peerFD;
}
void testEcho(Client& c, const MethodID methodID)
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-void testEcho(Service& s, const MethodID methodID, const PeerID peerID)
+void testEcho(Service& s, const MethodID methodID, const FileDescriptor peerFD)
{
std::shared_ptr<SendData> sentData(new SendData(56));
- std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(methodID, peerID, sentData);
+ std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(methodID, peerFD, sentData);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
c.addMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
c.addMethodHandler<SendData, SendData>(1, returnDataCallback);
- PeerID peerID = connect(s, c);
+ FileDescriptor peerFD = connect(s, c);
c.addMethodHandler<SendData, SendData>(1, echoCallback);
c.addMethodHandler<SendData, SendData>(2, returnDataCallback);
- testEcho(s, 1, peerID);
+ testEcho(s, 1, peerFD);
c.removeMethod(1);
c.removeMethod(2);
- BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
+ BOOST_CHECK_THROW(testEcho(s, 1, peerFD), IPCException);
}
BOOST_AUTO_TEST_CASE(ServiceStartStop)
Service s(socketPath);
Client c(socketPath);
c.addMethodHandler<SendData, SendData>(1, echoCallback);
- PeerID peerID = connect(s, c);
+ FileDescriptor peerFD = connect(s, c);
std::shared_ptr<SendData> sentData(new SendData(56));
- std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(1, peerID, sentData);
+ std::shared_ptr<SendData> recvData = s.callSync<SendData, SendData>(1, peerFD, sentData);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
Service s(socketPath);
Client c(socketPath);
c.addMethodHandler<SendData, SendData>(1, echoCallback);
- PeerID peerID = connect(s, c);
+ FileDescriptor peerFD = connect(s, c);
// Async call
std::shared_ptr<SendData> sentData(new SendData(56));
cv.notify_one();
};
- s.callAsync<SendData, SendData>(1, peerID, sentData, dataBack);
+ s.callAsync<SendData, SendData>(1, peerFD, sentData, dataBack);
// Wait for the response
std::unique_lock<std::mutex> lock(mutex);
{
Service s(socketPath);
- auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&) {
+ auto method = [](const FileDescriptor, std::shared_ptr<ThrowOnAcceptData>&) {
return std::shared_ptr<SendData>(new SendData(1));
};
BOOST_AUTO_TEST_CASE(ReadTimeout)
{
Service s(socketPath);
- auto longEchoCallback = [](const PeerID, std::shared_ptr<SendData>& data) {
+ auto longEchoCallback = [](const FileDescriptor, std::shared_ptr<SendData>& data) {
return std::shared_ptr<LongSendData>(new LongSendData(data->intVal));
};
s.addMethodHandler<LongSendData, SendData>(1, longEchoCallback);
connect(s, c);
std::atomic_bool isHandlerACalled(false);
- auto handlerA = [&isHandlerACalled](const PeerID, std::shared_ptr<SendData>&) {
+ auto handlerA = [&isHandlerACalled](const FileDescriptor, std::shared_ptr<SendData>&) {
isHandlerACalled = true;
};
std::atomic_bool isHandlerBCalled(false);
- auto handlerB = [&isHandlerBCalled](const PeerID, std::shared_ptr<SendData>&) {
+ auto handlerB = [&isHandlerBCalled](const FileDescriptor, std::shared_ptr<SendData>&) {
isHandlerBCalled = true;
};
Client c(socketPath);
std::atomic_bool isHandlerACalled(false);
- auto handlerA = [&isHandlerACalled](const PeerID, std::shared_ptr<SendData>&) {
+ auto handlerA = [&isHandlerACalled](const FileDescriptor, std::shared_ptr<SendData>&) {
isHandlerACalled = true;
};
std::atomic_bool isHandlerBCalled(false);
- auto handlerB = [&isHandlerBCalled](const PeerID, std::shared_ptr<SendData>&) {
+ auto handlerB = [&isHandlerBCalled](const FileDescriptor, std::shared_ptr<SendData>&) {
isHandlerBCalled = true;
};