LOGD("Connecting to " + mSocketPath);
auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
- mServiceFD = mProcessor.addPeer(socketPtr);
+ mServiceID = mProcessor.addPeer(socketPtr);
}
bool Client::isStarted()
void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
LOGS("Client setNewPeerCallback");
- auto callback = [newPeerCallback, this](FileDescriptor fd) {
+ auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
if (mIPCGSourcePtr) {
mIPCGSourcePtr->addFD(fd);
}
if (newPeerCallback) {
- newPeerCallback(fd);
+ newPeerCallback(peerID, fd);
}
};
mProcessor.setNewPeerCallback(callback);
void Client::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
{
LOGS("Client setRemovedPeerCallback");
- auto callback = [removedPeerCallback, this](FileDescriptor fd) {
+ auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
if (mIPCGSourcePtr) {
mIPCGSourcePtr->removeFD(fd);
}
if (removedPeerCallback) {
- removedPeerCallback(fd);
+ removedPeerCallback(peerID, fd);
}
};
mProcessor.setRemovedPeerCallback(callback);
void startPoll();
void stopPoll();
- FileDescriptor mServiceFD;
+ PeerID mServiceID;
Processor mProcessor;
std::string mSocketPath;
IPCGSource::Pointer mIPCGSourcePtr;
unsigned int timeoutMS)
{
LOGS("Client callSync, methodID: " << methodID << ", timeoutMS: " << timeoutMS);
- return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceFD, data, timeoutMS);
+ return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceID, data, timeoutMS);
}
template<typename SentDataType, typename ReceivedDataType>
LOGS("Client callAsync, methodID: " << methodID);
mProcessor.callAsync<SentDataType,
ReceivedDataType>(methodID,
- mServiceFD,
+ mServiceID,
data,
resultCallback);
}
AddPeerRequest(const AddPeerRequest&) = delete;
AddPeerRequest& operator=(const AddPeerRequest&) = delete;
- AddPeerRequest(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
- : peerFD(peerFD),
- socketPtr(socketPtr)
+ AddPeerRequest(const std::shared_ptr<Socket>& socketPtr)
+ : socketPtr(socketPtr),
+ peerID(getNextPeerID())
{
}
- FileDescriptor peerFD;
std::shared_ptr<Socket> socketPtr;
+ PeerID peerID;
};
} // namespace ipc
template<typename SentDataType, typename ReceivedDataType>
static std::shared_ptr<MethodRequest> create(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process);
MethodID methodID;
- FileDescriptor peerFD;
+ PeerID peerID;
MessageID messageID;
std::shared_ptr<void> data;
SerializeCallback serialize;
ResultBuilderHandler process;
private:
- MethodRequest(const MethodID methodID, const FileDescriptor peerFD)
+ MethodRequest(const MethodID methodID, const PeerID peerID)
: methodID(methodID),
- peerFD(peerFD),
+ peerID(peerID),
messageID(getNextMessageID())
{}
};
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<MethodRequest> MethodRequest::create(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
- std::shared_ptr<MethodRequest> request(new MethodRequest(methodID, peerFD));
+ std::shared_ptr<MethodRequest> request(new MethodRequest(methodID, peerID));
request->data = data;
request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
- LOGS("Method serialize, peerFD: " << fd);
+ LOGS("Method serialize, peerID: " << fd);
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
request->parse = [](const int fd)->std::shared_ptr<void> {
- LOGS("Method parse, peerFD: " << fd);
+ LOGS("Method parse, peerID: " << fd);
std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
config::loadFromFD<ReceivedDataType>(fd, *data);
return data;
}
}
+Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd)
+{
+ return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&fd](const PeerInfo & peerInfo) {
+ return fd == peerInfo.socketPtr->getFD();
+ });
+}
+
+Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID peerID)
+{
+ return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [&peerID](const PeerInfo & peerInfo) {
+ return peerID == peerInfo.peerID;
+ });
+}
+
bool Processor::isStarted()
{
Lock lock(mStateMutex);
mMethodsCallbacks.erase(methodID);
}
-FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
+PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
{
LOGS(mLogPrefix + "Processor addPeer");
Lock lock(mStateMutex);
- FileDescriptor peerFD = socketPtr->getFD();
- auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
- mRequestQueue.pushBack(Event::ADD_PEER, request);
+ auto requestPtr = std::make_shared<AddPeerRequest>(socketPtr);
+ mRequestQueue.pushBack(Event::ADD_PEER, requestPtr);
- LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
+ LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID);
- return peerFD;
+ return requestPtr->peerID;
}
-void Processor::removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock)
+void Processor::removePeerSyncInternal(const PeerID peerID, Lock& lock)
{
- LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
+ LOGS(mLogPrefix + "Processor removePeer peerID: " << peerID);
- auto isPeerDeleted = [&peerFD, this]()->bool {
- return mSockets.count(peerFD) == 0;
+ auto isPeerDeleted = [&peerID, this]()->bool {
+ return getPeerInfoIterator(peerID) == mPeerInfo.end();
};
- mRequestQueue.removeIf([peerFD](Request & request) {
+ mRequestQueue.removeIf([peerID](Request & request) {
return request.requestID == Event::ADD_PEER &&
- request.get<AddPeerRequest>()->peerFD == peerFD;
+ request.get<AddPeerRequest>()->peerID == peerID;
});
// Remove peer and wait till he's gone
std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
- auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
+ auto request = std::make_shared<RemovePeerRequest>(peerID, conditionPtr);
mRequestQueue.pushBack(Event::REMOVE_PEER, request);
conditionPtr->wait(lock, isPeerDeleted);
}
-void Processor::removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr)
+void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr)
{
- LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
- LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
+ LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
+ LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
- if (!mSockets.erase(peerFD)) {
- LOGW(mLogPrefix + "No such peer. Another thread called removePeerInternal");
+ if (peerIt == mPeerInfo.end()) {
+ LOGW("Peer already removed");
return;
}
// Remove from signal addressees
for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
- it->second.remove(peerFD);
+ it->second.remove(peerIt->peerID);
if (it->second.empty()) {
it = mSignalsPeers.erase(it);
} else {
// Erase associated return value callbacks
for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
- if (it->second.peerFD == peerFD) {
+ if (it->second.peerID == peerIt->peerID) {
ResultBuilder resultBuilder(exceptionPtr);
IGNORE_EXCEPTIONS(it->second.process(resultBuilder));
it = mReturnCallbacks.erase(it);
if (mRemovedPeerCallback) {
// Notify about the deletion
- mRemovedPeerCallback(peerFD);
+ mRemovedPeerCallback(peerIt->peerID, peerIt->socketPtr->getFD());
}
+
+ mPeerInfo.erase(peerIt);
}
void Processor::resetPolling()
return;
}
- LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
// Setup polling on eventfd and sockets
- mFDs.resize(mSockets.size() + 1);
+ mFDs.resize(mPeerInfo.size() + 1);
+ LOGI(mLogPrefix + "Reseting mFDS.size: " << mFDs.size());
mFDs[0].fd = mRequestQueue.getFD();
mFDs[0].events = POLLIN;
- auto socketIt = mSockets.begin();
for (unsigned int i = 1; i < mFDs.size(); ++i) {
- LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
- mFDs[i].fd = socketIt->second->getFD();
+ auto fd = mPeerInfo[i - 1].socketPtr->getFD();
+
+ LOGI(mLogPrefix + "Reseting fd: " << fd);
+
+ mFDs[i].fd = fd;
mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
- ++socketIt;
// TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
}
}
Lock lock(mStateMutex);
bool isPeerRemoved = false;
- {
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- if (mFDs[i].revents & POLLHUP) {
- LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
- mFDs[i].revents &= ~(POLLHUP);
- removePeerInternal(mFDs[i].fd,
- std::make_exception_ptr(IPCPeerDisconnectedException()));
- isPeerRemoved = true;
- }
+
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
+ if (mFDs[i].revents & POLLHUP) {
+ auto peerIt = getPeerInfoIterator(mFDs[i].fd);
+ LOGI(mLogPrefix + "Lost connection to peer: " << peerIt->peerID);
+ mFDs[i].revents &= ~(POLLHUP);
+ removePeerInternal(peerIt,
+ std::make_exception_ptr(IPCPeerDisconnectedException()));
+ isPeerRemoved = true;
}
}
return isPeerRemoved;
}
-bool Processor::handleLostConnection(const FileDescriptor peerFD)
+bool Processor::handleLostConnection(const FileDescriptor fd)
{
Lock lock(mStateMutex);
- removePeerInternal(peerFD,
+ auto peerIt = getPeerInfoIterator(fd);
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCPeerDisconnectedException()));
return true;
}
return pollChanged;
}
-bool Processor::handleInput(const FileDescriptor peerFD)
+bool Processor::handleInput(const FileDescriptor fd)
{
- LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
+ LOGS(mLogPrefix + "Processor handleInput fd: " << fd);
Lock lock(mStateMutex);
- std::shared_ptr<Socket> socketPtr;
- try {
- // Get the peer's socket
- socketPtr = mSockets.at(peerFD);
- } catch (const std::out_of_range&) {
- LOGE(mLogPrefix + "No such peer: " << peerFD);
+ auto peerIt = getPeerInfoIterator(fd);
+
+ if (peerIt == mPeerInfo.end()) {
+ LOGE(mLogPrefix + "No peer for fd: " << fd);
return false;
}
+ Socket& socket = *peerIt->socketPtr;
+
MethodID methodID;
MessageID messageID;
{
- Socket::Guard guard = socketPtr->getGuard();
+ Socket::Guard guard = socket.getGuard();
try {
- socketPtr->read(&methodID, sizeof(methodID));
- socketPtr->read(&messageID, sizeof(messageID));
+ socket.read(&methodID, sizeof(methodID));
+ socket.read(&messageID, sizeof(messageID));
} catch (const IPCException& e) {
LOGE(mLogPrefix + "Error during reading the socket");
- removePeerInternal(socketPtr->getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
if (methodID == RETURN_METHOD_ID) {
- return onReturnValue(*socketPtr, messageID);
+ return onReturnValue(peerIt, messageID);
} else {
if (mMethodsCallbacks.count(methodID)) {
// Method
std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
- return onRemoteMethod(*socketPtr, methodID, messageID, methodCallbacks);
+ return onRemoteMethod(peerIt, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
- return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
+ return onRemoteSignal(peerIt, methodID, messageID, signalCallbacks);
} else {
- // Nothing
LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
- removePeerInternal(socketPtr->getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
}
}
-void Processor::onNewSignals(const FileDescriptor peerFD,
- std::shared_ptr<RegisterSignalsProtocolMessage>& data)
+void Processor::onNewSignals(const PeerID peerID, std::shared_ptr<RegisterSignalsProtocolMessage>& data)
{
- LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
+ LOGS(mLogPrefix + "Processor onNewSignals peerID: " << peerID);
for (const MethodID methodID : data->ids) {
- mSignalsPeers[methodID].push_back(peerFD);
+ mSignalsPeers[methodID].push_back(peerID);
}
}
-void Processor::onErrorSignal(const FileDescriptor, std::shared_ptr<ErrorProtocolMessage>& data)
+void Processor::onErrorSignal(const PeerID, std::shared_ptr<ErrorProtocolMessage>& data)
{
LOGS(mLogPrefix + "Processor onErrorSignal messageID: " << data->messageID);
+ // If there is no return callback an out_of_range error will be thrown and peer will be removed
ReturnCallbacks returnCallbacks = std::move(mReturnCallbacks.at(data->messageID));
mReturnCallbacks.erase(data->messageID);
IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
}
-bool Processor::onReturnValue(const Socket& socket,
+bool Processor::onReturnValue(Peers::iterator& peerIt,
const MessageID messageID)
{
LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
mReturnCallbacks.erase(messageID);
} catch (const std::out_of_range&) {
LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
- removePeerInternal(socket.getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
std::shared_ptr<void> data;
try {
LOGT(mLogPrefix + "Parsing incoming return data");
- data = returnCallbacks.parse(socket.getFD());
+ data = returnCallbacks.parse(peerIt->socketPtr->getFD());
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during parsing: " << e.what());
ResultBuilder resultBuilder(std::make_exception_ptr(IPCParsingException()));
IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
- removePeerInternal(socket.getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCParsingException()));
return true;
}
return false;
}
-bool Processor::onRemoteSignal(const Socket& socket,
+bool Processor::onRemoteSignal(Peers::iterator& peerIt,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks)
std::shared_ptr<void> data;
try {
LOGT(mLogPrefix + "Parsing incoming data");
- data = signalCallbacks->parse(socket.getFD());
+ data = signalCallbacks->parse(peerIt->socketPtr->getFD());
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during parsing: " << e.what());
- removePeerInternal(socket.getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCParsingException()));
return true;
}
try {
- signalCallbacks->signal(socket.getFD(), data);
+ signalCallbacks->signal(peerIt->peerID, data);
} catch (const IPCUserException& e) {
LOGW("Discarded user's exception");
return false;
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception in method handler: " << e.what());
- removePeerInternal(socket.getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
return false;
}
-bool Processor::onRemoteMethod(const Socket& socket,
+bool Processor::onRemoteMethod(Peers::iterator& peerIt,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<MethodHandlers> methodCallbacks)
std::shared_ptr<void> data;
try {
LOGT(mLogPrefix + "Parsing incoming data");
- data = methodCallbacks->parse(socket.getFD());
+ data = methodCallbacks->parse(peerIt->socketPtr->getFD());
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during parsing: " << e.what());
- removePeerInternal(socket.getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCParsingException()));
return true;
}
LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
std::shared_ptr<void> returnData;
try {
- returnData = methodCallbacks->method(socket.getFD(), data);
+ returnData = methodCallbacks->method(peerIt->peerID, data);
} catch (const IPCUserException& e) {
LOGW("User's exception");
auto data = std::make_shared<ErrorProtocolMessage>(messageID, e.getCode(), e.what());
- signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, socket.getFD(), data);
+ signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, peerIt->peerID, data);
return false;
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception in method handler: " << e.what());
- removePeerInternal(socket.getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCNaughtyPeerException()));
return true;
}
LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID);
try {
// Send the call with the socket
+ Socket& socket = *peerIt->socketPtr;
Socket::Guard guard = socket.getGuard();
socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
socket.write(&messageID, sizeof(messageID));
methodCallbacks->serialize(socket.getFD(), returnData);
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Exception during serialization: " << e.what());
- removePeerInternal(socket.getFD(),
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCSerializationException()));
return true;
bool Processor::onMethodRequest(MethodRequest& request)
{
LOGS(mLogPrefix + "Processor onMethodRequest");
- std::shared_ptr<Socket> socketPtr;
- try {
- // Get the peer's socket
- socketPtr = mSockets.at(request.peerFD);
- } catch (const std::out_of_range&) {
- LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
+ auto peerIt = getPeerInfoIterator(request.peerID);
+
+ if (peerIt == mPeerInfo.end()) {
+ LOGE(mLogPrefix + "Peer disconnected. No user with a peerID: " << request.peerID);
// Pass the error to the processing callback
ResultBuilder resultBuilder(std::make_exception_ptr(IPCPeerDisconnectedException()));
if (mReturnCallbacks.count(request.messageID) != 0) {
LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
}
- mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(request.peerFD,
+ mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(peerIt->peerID,
std::move(request.parse),
std::move(request.process)));
+ Socket& socket = *peerIt->socketPtr;
try {
// Send the call with the socket
- Socket::Guard guard = socketPtr->getGuard();
- socketPtr->write(&request.methodID, sizeof(request.methodID));
- socketPtr->write(&request.messageID, sizeof(request.messageID));
+ Socket::Guard guard = socket.getGuard();
+ socket.write(&request.methodID, sizeof(request.methodID));
+ socket.write(&request.messageID, sizeof(request.messageID));
LOGT(mLogPrefix + "Serializing the message");
- request.serialize(socketPtr->getFD(), request.data);
+ request.serialize(socket.getFD(), request.data);
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Error during sending a method: " << e.what());
mReturnCallbacks.erase(request.messageID);
- removePeerInternal(request.peerFD,
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCSerializationException()));
-
-
return true;
}
{
LOGS(mLogPrefix + "Processor onSignalRequest");
- std::shared_ptr<Socket> socketPtr;
- try {
- // Get the peer's socket
- socketPtr = mSockets.at(request.peerFD);
- } catch (const std::out_of_range&) {
- LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
+ auto peerIt = getPeerInfoIterator(request.peerID);
+
+ if (peerIt == mPeerInfo.end()) {
+ LOGE(mLogPrefix + "Peer disconnected. No user for peerID: " << request.peerID);
return false;
}
+ Socket& socket = *peerIt->socketPtr;
try {
// Send the call with the socket
- Socket::Guard guard = socketPtr->getGuard();
- socketPtr->write(&request.methodID, sizeof(request.methodID));
- socketPtr->write(&request.messageID, sizeof(request.messageID));
- request.serialize(socketPtr->getFD(), request.data);
+ Socket::Guard guard = socket.getGuard();
+ socket.write(&request.methodID, sizeof(request.methodID));
+ socket.write(&request.messageID, sizeof(request.messageID));
+ request.serialize(socket.getFD(), request.data);
} catch (const std::exception& e) {
LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
- removePeerInternal(request.peerFD,
+ removePeerInternal(peerIt,
std::make_exception_ptr(IPCSerializationException()));
-
return true;
}
{
LOGS(mLogPrefix + "Processor onAddPeerRequest");
- if (mSockets.size() > mMaxNumberOfPeers) {
- LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerFD);
+ if (mPeerInfo.size() > mMaxNumberOfPeers) {
+ LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerID);
return false;
}
- if (mSockets.count(request.peerFD) != 0) {
- LOGE(mLogPrefix + "There already was a socket for peerFD: " << request.peerFD);
+
+ if (getPeerInfoIterator(request.peerID) != mPeerInfo.end()) {
+ LOGE(mLogPrefix + "There already was a socket for peerID: " << request.peerID);
return false;
}
- mSockets[request.peerFD] = std::move(request.socketPtr);
+ PeerInfo peerInfo(request.peerID, request.socketPtr);
+ mPeerInfo.push_back(std::move(peerInfo));
// Sending handled signals
}
auto data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
- request.peerFD,
+ request.peerID,
data);
if (mNewPeerCallback) {
// Notify about the new user.
LOGT(mLogPrefix + "Calling NewPeerCallback");
- mNewPeerCallback(request.peerFD);
+ mNewPeerCallback(request.peerID, request.socketPtr->getFD());
}
- LOGI(mLogPrefix + "New peer: " << request.peerFD);
+ LOGI(mLogPrefix + "New peerID: " << request.peerID);
return true;
}
{
LOGS(mLogPrefix + "Processor onRemovePeer");
- removePeerInternal(request.peerFD,
+ removePeerInternal(getPeerInfoIterator(request.peerID),
std::make_exception_ptr(IPCRemovedPeerException()));
request.conditionPtr->notify_all();
* Calls the newPeerCallback.
*
* @param socketPtr pointer to the new socket
- * @return peerFD of the new socket
+ * @return peerID of the new user
*/
- FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
+ PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
/**
* Saves the callbacks connected to the method id.
* Synchronous method call.
*
* @param methodID API dependent id of the method
- * @param peerFD id of the peer
+ * @param peerD id of the peer
* @param data data to sent
* @param timeoutMS how long to wait for the return value before throw
* @tparam SentDataType data type to send
*/
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS = 500);
* Asynchronous method call
*
* @param methodID API dependent id of the method
- * @param peerFD id of the peer
+ * @param peerID id of the peer
* @param data data to sent
* @param process callback processing the return data
* @tparam SentDataType data type to send
*/
template<typename SentDataType, typename ReceivedDataType>
MessageID callAsync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process);
* Removes one peer.
* Handler used in external polling.
*
- * @param peerFD file description identifying the peer
+ * @param fd file description identifying the peer
* @return should the polling structure be rebuild
*/
- bool handleLostConnection(const FileDescriptor peerFD);
+ bool handleLostConnection(const FileDescriptor fd);
/**
* Handles input from one peer.
* Handler used in external polling.
*
- * @param peerFD file description identifying the peer
+ * @param fd file description identifying the peer
* @return should the polling structure be rebuild
*/
- bool handleInput(const FileDescriptor peerFD);
+ bool handleInput(const FileDescriptor fd);
/**
* Handle one event from the internal event's queue
ReturnCallbacks(ReturnCallbacks&&) = default;
ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
- ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultBuilderHandler& process)
- : peerFD(peerFD), parse(parse), process(process) {}
+ ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultBuilderHandler& process)
+ : peerID(peerID), parse(parse), process(process) {}
- FileDescriptor peerFD;
+ PeerID peerID;
ParseCallback parse;
ResultBuilderHandler process;
};
+ struct PeerInfo {
+ PeerInfo(const PeerInfo& other) = delete;
+ PeerInfo& operator=(const PeerInfo&) = delete;
+ PeerInfo() = delete;
+
+ PeerInfo(PeerInfo&&) = default;
+ PeerInfo& operator=(PeerInfo &&) = default;
+
+ PeerInfo(PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
+ : peerID(peerID), socketPtr(socketPtr) {}
+
+ PeerID peerID;
+ std::shared_ptr<Socket> socketPtr;
+ };
+
+ typedef std::vector<PeerInfo> Peers;
+
std::string mLogPrefix;
RequestQueue<Event> mRequestQueue;
std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
- std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
+ std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
- std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
+ Peers mPeerInfo;
std::vector<struct pollfd> mFDs;
std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
template<typename SentDataType>
void signalInternal(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data);
void run();
bool handleLostConnections();
bool handleInputs();
- bool onReturnValue(const Socket& socket,
+ bool onReturnValue(Peers::iterator& peerIt,
const MessageID messageID);
- bool onRemoteMethod(const Socket& socket,
+ bool onRemoteMethod(Peers::iterator& peerIt,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<MethodHandlers> methodCallbacks);
- bool onRemoteSignal(const Socket& socket,
+ bool onRemoteSignal(Peers::iterator& peerIt,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks);
void resetPolling();
- FileDescriptor getNextFileDescriptor();
- void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr);
- void removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock);
- void onNewSignals(const FileDescriptor peerFD,
+ void removePeerInternal(Peers::iterator peerIt,
+ const std::exception_ptr& exceptionPtr);
+ void removePeerSyncInternal(const PeerID peerID, Lock& lock);
+
+ void onNewSignals(const PeerID peerID,
std::shared_ptr<RegisterSignalsProtocolMessage>& data);
- void onErrorSignal(const FileDescriptor peerFD,
+ void onErrorSignal(const PeerID peerID,
std::shared_ptr<ErrorProtocolMessage>& data);
+ Peers::iterator getPeerInfoIterator(const FileDescriptor fd);
+ Peers::iterator getPeerInfoIterator(const PeerID peerID);
};
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
- methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
+ methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data)->std::shared_ptr<void> {
std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- return method(peerFD, tmpData);
+ return method(peerID, tmpData);
};
mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
return dataToFill;
};
- signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& dataReceived) {
+ signalCall.signal = [handler](const PeerID peerID, std::shared_ptr<void>& dataReceived) {
std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
- handler(peerFD, tmpData);
+ handler(peerID, tmpData);
};
mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
std::vector<MethodID> ids {methodID};
data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
- for (const auto kv : mSockets) {
+ for (const PeerInfo& peerInfo : mPeerInfo) {
signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
- kv.first,
+ peerInfo.peerID,
data);
}
}
template<typename SentDataType, typename ReceivedDataType>
MessageID Processor::callAsync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
Lock lock(mStateMutex);
- auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+ auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
mRequestQueue.pushBack(Event::METHOD, request);
return request->messageID;
}
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS)
{
};
MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
- peerFD,
+ peerID,
data,
process);
if (isTimeout) {
LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
- removePeerSyncInternal(peerFD, lock);
+ removePeerSyncInternal(peerID, lock);
throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
} else {
LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
template<typename SentDataType>
void Processor::signalInternal(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data)
{
- auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
+ auto request = SignalRequest::create<SentDataType>(methodID, peerID, data);
mRequestQueue.pushFront(Event::SIGNAL, request);
}
LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
return;
}
- for (const FileDescriptor peerFD : it->second) {
- auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
+ for (const PeerID peerID : it->second) {
+ auto request = SignalRequest::create<SentDataType>(methodID, peerID, data);
mRequestQueue.pushBack(Event::SIGNAL, request);
}
}
RemovePeerRequest(const RemovePeerRequest&) = delete;
RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
- RemovePeerRequest(const FileDescriptor peerFD,
+ RemovePeerRequest(const PeerID peerID,
const std::shared_ptr<std::condition_variable>& conditionPtr)
- : peerFD(peerFD),
+ : peerID(peerID),
conditionPtr(conditionPtr)
{
}
- FileDescriptor peerFD;
+ PeerID peerID;
std::shared_ptr<std::condition_variable> conditionPtr;
};
template<typename SentDataType>
static std::shared_ptr<SignalRequest> create(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data);
MethodID methodID;
- FileDescriptor peerFD;
+ PeerID peerID;
MessageID messageID;
std::shared_ptr<void> data;
SerializeCallback serialize;
private:
- SignalRequest(const MethodID methodID, const FileDescriptor peerFD)
+ SignalRequest(const MethodID methodID, const PeerID peerID)
: methodID(methodID),
- peerFD(peerFD),
+ peerID(peerID),
messageID(getNextMessageID())
{}
template<typename SentDataType>
std::shared_ptr<SignalRequest> SignalRequest::create(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data)
{
- std::shared_ptr<SignalRequest> request(new SignalRequest(methodID, peerFD));
+ std::shared_ptr<SignalRequest> request(new SignalRequest(methodID, peerID));
request->data = data;
request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
- LOGS("Signal serialize, peerFD: " << fd);
+ LOGS("Signal serialize, peerID: " << fd);
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
LOGS("Service setNewPeerCallback");
- auto callback = [newPeerCallback, this](FileDescriptor fd) {
+ auto callback = [newPeerCallback, this](PeerID peerID, FileDescriptor fd) {
if (mIPCGSourcePtr) {
mIPCGSourcePtr->addFD(fd);
}
if (newPeerCallback) {
- newPeerCallback(fd);
+ newPeerCallback(peerID, fd);
}
};
mProcessor.setNewPeerCallback(callback);
void Service::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
{
LOGS("Service setRemovedPeerCallback");
- auto callback = [removedPeerCallback, this](FileDescriptor fd) {
+ auto callback = [removedPeerCallback, this](PeerID peerID, FileDescriptor fd) {
if (mIPCGSourcePtr) {
mIPCGSourcePtr->removeFD(fd);
}
if (removedPeerCallback) {
- removedPeerCallback(fd);
+ removedPeerCallback(peerID, fd);
}
};
mProcessor.setRemovedPeerCallback(callback);
*/
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS = 500);
*/
template<typename SentDataType, typename ReceivedDataType>
void callAsync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& resultCallback);
template<typename SentDataType, typename ReceivedDataType>
std::shared_ptr<ReceivedDataType> Service::callSync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS)
{
LOGS("Service callSync, methodID: " << methodID
- << ", peerFD: " << peerFD
+ << ", peerID: " << peerID
<< ", timeoutMS: " << timeoutMS);
- return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerFD, data, timeoutMS);
+ return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerID, data, timeoutMS);
}
template<typename SentDataType, typename ReceivedDataType>
void Service::callAsync(const MethodID methodID,
- const FileDescriptor peerFD,
+ const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& resultCallback)
{
- LOGS("Service callAsync, methodID: " << methodID << ", peerFD: " << peerFD);
+ LOGS("Service callAsync, methodID: " << methodID << ", peerID: " << peerID);
mProcessor.callAsync<SentDataType,
ReceivedDataType>(methodID,
- peerFD,
+ peerID,
data,
resultCallback);
}
namespace {
std::atomic<MessageID> gLastMessageID(0);
+std::atomic<PeerID> gLastPeerID(0);
} // namespace
MessageID getNextMessageID()
return ++gLastMessageID;
}
+PeerID getNextPeerID()
+{
+ return ++gLastPeerID;
+}
} // namespace ipc
* @brief Types definitions
*/
-#ifndef COMMON_IPC_HANDLERS_HPP
-#define COMMON_IPC_HANDLERS_HPP
+#ifndef COMMON_IPC_TYPES_HPP
+#define COMMON_IPC_TYPES_HPP
#include "ipc/exception.hpp"
-
#include <functional>
#include <memory>
#include <string>
typedef int FileDescriptor;
typedef unsigned int MethodID;
typedef unsigned int MessageID;
+typedef unsigned int PeerID;
-typedef std::function<void(FileDescriptor)> PeerCallback;
+typedef std::function<void(const PeerID, const FileDescriptor)> PeerCallback;
typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
MessageID getNextMessageID();
+PeerID getNextPeerID();
template<typename SentDataType, typename ReceivedDataType>
struct MethodHandler {
- typedef std::function<std::shared_ptr<SentDataType>(FileDescriptor peerFD,
+ typedef std::function<std::shared_ptr<SentDataType>(PeerID peerID,
std::shared_ptr<ReceivedDataType>& data)> type;
};
template<typename ReceivedDataType>
struct SignalHandler {
- typedef std::function<void(FileDescriptor peerFD,
+ typedef std::function<void(PeerID peerID,
std::shared_ptr<ReceivedDataType>& data)> type;
};
} // namespace ipc
} // namespace vasum
-#endif // COMMON_IPC_HANDLERS_HPP
+#endif // COMMON_IPC_TYPES_HPP
}
};
-std::shared_ptr<EmptyData> returnEmptyCallback(const FileDescriptor, std::shared_ptr<EmptyData>&)
+std::shared_ptr<EmptyData> returnEmptyCallback(const PeerID, std::shared_ptr<EmptyData>&)
{
return std::make_shared<EmptyData>();
}
-std::shared_ptr<SendData> returnDataCallback(const FileDescriptor, std::shared_ptr<RecvData>&)
+std::shared_ptr<SendData> returnDataCallback(const PeerID, std::shared_ptr<RecvData>&)
{
return std::make_shared<SendData>(1);
}
-std::shared_ptr<SendData> echoCallback(const FileDescriptor, std::shared_ptr<RecvData>& data)
+std::shared_ptr<SendData> echoCallback(const PeerID, std::shared_ptr<RecvData>& data)
{
return std::make_shared<SendData>(data->intVal);
}
-std::shared_ptr<SendData> longEchoCallback(const FileDescriptor, std::shared_ptr<RecvData>& data)
+std::shared_ptr<SendData> longEchoCallback(const PeerID, std::shared_ptr<RecvData>& data)
{
std::this_thread::sleep_for(std::chrono::milliseconds(LONG_OPERATION_TIME));
return std::make_shared<SendData>(data->intVal);
}
-FileDescriptor connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
+PeerID connect(Service& s, Client& c, bool isServiceGlib = false, bool isClientGlib = false)
{
- // Connects the Client to the Service and returns Clients FileDescriptor
- ValueLatch<FileDescriptor> peerFDLatch;
- auto newPeerCallback = [&peerFDLatch](const FileDescriptor newFD) {
- peerFDLatch.set(newFD);
+ // Connects the Client to the Service and returns Clients PeerID
+ ValueLatch<PeerID> peerIDLatch;
+ auto newPeerCallback = [&peerIDLatch](const PeerID newID, const FileDescriptor) {
+ peerIDLatch.set(newID);
};
s.setNewPeerCallback(newPeerCallback);
c.start(isClientGlib);
- FileDescriptor peerFD = peerFDLatch.get(TIMEOUT);
+ PeerID peerID = peerIDLatch.get(TIMEOUT);
s.setNewPeerCallback(nullptr);
- BOOST_REQUIRE_NE(peerFD, 0);
- return peerFD;
+ BOOST_REQUIRE_NE(peerID, 0);
+ return peerID;
}
-FileDescriptor connectServiceGSource(Service& s, Client& c)
+PeerID connectServiceGSource(Service& s, Client& c)
{
return connect(s, c, true, false);
}
-FileDescriptor connectClientGSource(Service& s, Client& c)
+PeerID connectClientGSource(Service& s, Client& c)
{
return connect(s, c, false, true);
}
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-void testEcho(Service& s, const MethodID methodID, const FileDescriptor peerFD)
+void testEcho(Service& s, const MethodID methodID, const PeerID peerID)
{
std::shared_ptr<SendData> sentData(new SendData(56));
- std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(methodID, peerFD, sentData, TIMEOUT);
+ std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(methodID, peerID, sentData, TIMEOUT);
BOOST_REQUIRE(recvData);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
c.setMethodHandler<EmptyData, EmptyData>(1, returnEmptyCallback);
c.setMethodHandler<SendData, RecvData>(1, returnDataCallback);
- FileDescriptor peerFD = connect(s, c);
+ PeerID peerID = connect(s, c);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
c.setMethodHandler<SendData, RecvData>(2, returnDataCallback);
- testEcho(s, 1, peerFD);
+ testEcho(s, 1, peerID);
c.removeMethod(1);
c.removeMethod(2);
- BOOST_CHECK_THROW(testEcho(s, 1, peerFD), IPCException);
+ BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
}
BOOST_AUTO_TEST_CASE(ServiceStartStop)
Service s(socketPath);
Client c(socketPath);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
- FileDescriptor peerFD = connect(s, c);
+ PeerID peerID = connect(s, c);
std::shared_ptr<SendData> sentData(new SendData(56));
- std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(1, peerFD, sentData);
+ std::shared_ptr<RecvData> recvData = s.callSync<SendData, RecvData>(1, peerID, sentData);
BOOST_REQUIRE(recvData);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
Service s(socketPath);
Client c(socketPath);
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
- FileDescriptor peerFD = connect(s, c);
+ PeerID peerID = connect(s, c);
// Async call
auto dataBack = [&recvDataLatch](Result<RecvData> && r) {
recvDataLatch.set(r.get());
};
- s.callAsync<SendData, RecvData>(1, peerFD, sentData, dataBack);
+ s.callAsync<SendData, RecvData>(1, peerID, sentData, dataBack);
// Wait for the response
std::shared_ptr<RecvData> recvData(recvDataLatch.get(TIMEOUT));
ValueLatch<Result<RecvData>> retStatusLatch;
Service s(socketPath);
- auto method = [](const FileDescriptor, std::shared_ptr<ThrowOnAcceptData>&) {
+ auto method = [](const PeerID, std::shared_ptr<ThrowOnAcceptData>&) {
return std::shared_ptr<SendData>(new SendData(1));
};
BOOST_AUTO_TEST_CASE(ReadTimeout)
{
Service s(socketPath);
- auto longEchoCallback = [](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+ auto longEchoCallback = [](const PeerID, std::shared_ptr<RecvData>& data) {
return std::shared_ptr<LongSendData>(new LongSendData(data->intVal, LONG_OPERATION_TIME));
};
s.setMethodHandler<LongSendData, RecvData>(1, longEchoCallback);
Client c(socketPath);
connect(s, c);
- auto handlerA = [&recvDataLatchA](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+ auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
recvDataLatchA.set(data);
};
- auto handlerB = [&recvDataLatchB](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+ auto handlerB = [&recvDataLatchB](const PeerID, std::shared_ptr<RecvData>& data) {
recvDataLatchB.set(data);
};
+ LOGH("SETTING SIGNAAALS");
c.setSignalHandler<RecvData>(1, handlerA);
c.setSignalHandler<RecvData>(2, handlerB);
Service s(socketPath);
Client c(socketPath);
- auto handlerA = [&recvDataLatchA](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+ auto handlerA = [&recvDataLatchA](const PeerID, std::shared_ptr<RecvData>& data) {
recvDataLatchA.set(data);
};
- auto handlerB = [&recvDataLatchB](const FileDescriptor, std::shared_ptr<RecvData>& data) {
+ auto handlerB = [&recvDataLatchB](const PeerID, std::shared_ptr<RecvData>& data) {
recvDataLatchB.set(data);
};
utils::Latch l;
ScopedGlibLoop loop;
- auto signalHandler = [&l](const FileDescriptor, std::shared_ptr<RecvData>&) {
+ auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
l.set();
};
utils::Latch l;
ScopedGlibLoop loop;
- auto signalHandler = [&l](const FileDescriptor, std::shared_ptr<RecvData>&) {
+ auto signalHandler = [&l](const PeerID, std::shared_ptr<RecvData>&) {
l.set();
};
c.setMethodHandler<SendData, RecvData>(1, echoCallback);
c.setSignalHandler<RecvData>(2, signalHandler);
- FileDescriptor peerFD = connectClientGSource(s, c);
+ PeerID peerID = connectClientGSource(s, c);
- testEcho(s, 1, peerFD);
+ testEcho(s, 1, peerID);
auto data = std::make_shared<SendData>(1);
s.signal<SendData>(2, data);
Client c(socketPath);
auto clientID = connect(s, c);
- auto throwingMethodHandler = [&](const FileDescriptor, std::shared_ptr<RecvData>&) -> std::shared_ptr<SendData> {
+ auto throwingMethodHandler = [&](const PeerID, std::shared_ptr<RecvData>&) -> std::shared_ptr<SendData> {
throw IPCUserException(TEST_ERROR_CODE, TEST_ERROR_MESSAGE);
};