const unsigned int maxNumberOfPeers)
: mNewPeerCallback(newPeerCallback),
mRemovedPeerCallback(removedPeerCallback),
- mMaxNumberOfPeers(maxNumberOfPeers),
- mPeerIDCounter(0)
+ mMaxNumberOfPeers(maxNumberOfPeers)
{
LOGT("Creating Processor");
using namespace std::placeholders;
mMethodsCallbacks.erase(methodID);
}
-PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
+FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
{
LOGT("Adding socket");
- PeerID peerID;
+ FileDescriptor peerFD;
{
Lock lock(mSocketsMutex);
- peerID = getNextPeerID();
- SocketInfo socketInfo(peerID, std::move(socketPtr));
+ peerFD = socketPtr->getFD();
+ SocketInfo socketInfo(peerFD, std::move(socketPtr));
mNewSockets.push(std::move(socketInfo));
}
- LOGI("New peer added. Id: " << peerID);
+ LOGI("New peer added. Id: " << peerFD);
mEventQueue.send(Event::ADD_PEER);
- return peerID;
+ return peerFD;
}
-void Processor::removePeer(const PeerID peerID)
+void Processor::removePeer(const FileDescriptor peerFD)
{
std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
{
Lock lock(mSocketsMutex);
- RemovePeerRequest request(peerID, conditionPtr);
+ RemovePeerRequest request(peerFD, conditionPtr);
mPeersToDelete.push(std::move(request));
}
mEventQueue.send(Event::REMOVE_PEER);
- auto isPeerDeleted = [&peerID, this]()->bool {
+ auto isPeerDeleted = [&peerFD, this]()->bool {
Lock lock(mSocketsMutex);
- return mSockets.count(peerID) == 0;
+ return mSockets.count(peerFD) == 0;
};
std::mutex mutex;
conditionPtr->wait(lock, isPeerDeleted);
}
-void Processor::removePeerInternal(const PeerID peerID, Status status)
+void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
{
- LOGW("Removing peer. ID: " << peerID);
+ LOGW("Removing peer. ID: " << peerFD);
{
Lock lock(mSocketsMutex);
- mSockets.erase(peerID);
+ mSockets.erase(peerFD);
// Remove from signal addressees
for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
- it->second.remove(peerID);
+ it->second.remove(peerFD);
if (it->second.empty()) {
it = mSignalsPeers.erase(it);
} else {
std::shared_ptr<void> data;
for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
- if (it->second.peerID == peerID) {
+ if (it->second.peerFD == peerFD) {
IGNORE_EXCEPTIONS(it->second.process(status, data));
it = mReturnCallbacks.erase(it);
} else {
Lock lock(mCallbacksMutex);
if (mRemovedPeerCallback) {
// Notify about the deletion
- mRemovedPeerCallback(peerID);
+ mRemovedPeerCallback(peerFD);
}
}
bool Processor::handleLostConnections()
{
- std::list<PeerID> peersToRemove;
+ std::vector<FileDescriptor> peersToRemove;
{
Lock lock(mSocketsMutex);
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLHUP) {
- LOGI("Lost connection to peer: " << socketIt->first);
+ LOGI("Lost connection to peer: " << mFDs[i].fd);
mFDs[i].revents &= ~(POLLHUP);
- peersToRemove.push_back(socketIt->first);
+ peersToRemove.push_back(mFDs[i].fd);
}
}
}
- for (const PeerID peerID : peersToRemove) {
- removePeerInternal(peerID, Status::PEER_DISCONNECTED);
+ for (const FileDescriptor peerFD : peersToRemove) {
+ removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
}
return !peersToRemove.empty();
bool Processor::handleInputs()
{
- std::list<std::pair<PeerID, std::shared_ptr<Socket>> > peersWithInput;
+ std::vector<std::shared_ptr<Socket>> socketsWithInput;
{
Lock lock(mSocketsMutex);
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLIN) {
mFDs[i].revents &= ~(POLLIN);
- peersWithInput.push_back(*socketIt);
+ socketsWithInput.push_back(mSockets[mFDs[i].fd]);
}
}
}
bool pollChanged = false;
// Handle input outside the critical section
- for (const auto& peer : peersWithInput) {
- pollChanged = pollChanged || handleInput(peer.first, *peer.second);
+ for (const auto& socketPtr : socketsWithInput) {
+ pollChanged = pollChanged || handleInput(*socketPtr);
}
return pollChanged;
}
-bool Processor::handleInput(const PeerID peerID, const Socket& socket)
+bool Processor::handleInput(const Socket& socket)
{
LOGT("Handle incoming data");
MethodID methodID;
socket.read(&messageID, sizeof(messageID));
if (methodID == RETURN_METHOD_ID) {
- return onReturnValue(peerID, socket, messageID);
+ return onReturnValue(socket, messageID);
} else {
Lock lock(mCallsMutex);
// Method
std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
lock.unlock();
- return onRemoteCall(peerID, socket, methodID, messageID, methodCallbacks);
+ return onRemoteCall(socket, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
lock.unlock();
- return onRemoteSignal(peerID, socket, methodID, messageID, signalCallbacks);
+ return onRemoteSignal(socket, methodID, messageID, signalCallbacks);
} else {
// Nothing
lock.unlock();
LOGW("No method or signal callback for methodID: " << methodID);
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
}
return false;
}
-std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const PeerID peerID,
+std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
std::shared_ptr<RegisterSignalsMessage>& data)
{
- LOGD("New signals for peer: " << peerID);
+ LOGD("New signals for peer: " << peerFD);
Lock lock(mSocketsMutex);
for (MethodID methodID : data->ids) {
- mSignalsPeers[methodID].push_back(peerID);
+ mSignalsPeers[methodID].push_back(peerFD);
}
return std::make_shared<EmptyData>();
}
-bool Processor::onReturnValue(const PeerID peerID,
- const Socket& socket,
+bool Processor::onReturnValue(const Socket& socket,
const MessageID messageID)
{
LOGI("Return value for messageID: " << messageID);
mReturnCallbacks.erase(messageID);
} catch (const std::out_of_range&) {
LOGW("No return callback for messageID: " << messageID);
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
return false;
}
-bool Processor::onRemoteSignal(const PeerID peerID,
- const Socket& socket,
+bool Processor::onRemoteSignal(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks)
data = signalCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
try {
- signalCallbacks->signal(peerID, data);
+ signalCallbacks->signal(socket.getFD(), data);
} catch (const std::exception& e) {
LOGE("Exception in method handler: " << e.what());
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
return false;
}
-bool Processor::onRemoteCall(const PeerID peerID,
- const Socket& socket,
+bool Processor::onRemoteCall(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr<MethodHandlers> methodCallbacks)
data = methodCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
std::shared_ptr<void> returnData;
try {
- returnData = methodCallbacks->method(peerID, data);
+ returnData = methodCallbacks->method(socket.getFD(), data);
} catch (const std::exception& e) {
LOGE("Exception in method handler: " << e.what());
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
methodCallbacks->serialize(socket.getFD(), returnData);
} catch (const std::exception& e) {
LOGE("Exception during serialization: " << e.what());
- removePeerInternal(peerID, Status::SERIALIZATION_ERROR);
+ removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
return true;
}
mNewSockets.pop();
if (mSockets.size() > mMaxNumberOfPeers) {
- LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID);
+ LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
return false;
}
- if (mSockets.count(socketInfo.peerID) != 0) {
- LOGE("There already was a socket for peerID: " << socketInfo.peerID);
+ if (mSockets.count(socketInfo.peerFD) != 0) {
+ LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
return false;
}
- mSockets[socketInfo.peerID] = std::move(socketInfo.socketPtr);
+ mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
}
// Broadcast the new signal to peers
LOGW("Sending handled signals");
- std::list<PeerID> peersIDs;
+ std::list<FileDescriptor> peersIDs;
{
Lock lock(mSocketsMutex);
for (const auto kv : mSockets) {
}
auto data = std::make_shared<RegisterSignalsMessage>(ids);
- for (const PeerID peerID : peersIDs) {
+ for (const FileDescriptor peerFD : peersIDs) {
callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- peerID,
+ peerFD,
data,
discardResultHandler<EmptyData>);
}
Lock lock(mCallbacksMutex);
if (mNewPeerCallback) {
// Notify about the new user.
- mNewPeerCallback(socketInfo.peerID);
+ mNewPeerCallback(socketInfo.peerFD);
}
}
return true;
mPeersToDelete.pop();
}
- removePeerInternal(request.peerID, Status::REMOVED_PEER);
+ removePeerInternal(request.peerFD, Status::REMOVED_PEER);
request.conditionPtr->notify_all();
return true;
}
-PeerID Processor::getNextPeerID()
-{
- // TODO: This method of generating UIDs is buggy. To be changed.
- return ++mPeerIDCounter;
-}
-
CallQueue::Call Processor::getCall()
{
Lock lock(mCallsMutex);
try {
// Get the peer's socket
Lock lock(mSocketsMutex);
- socketPtr = mSockets.at(call.peerID);
+ socketPtr = mSockets.at(call.peerFD);
} catch (const std::out_of_range&) {
- LOGE("Peer disconnected. No socket with a peerID: " << call.peerID);
+ LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
return false;
}
if (mReturnCallbacks.count(call.messageID) != 0) {
LOGE("There already was a return callback for messageID: " << call.messageID);
}
- mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerID,
+ mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
std::move(call.parse),
std::move(call.process)));
}
mReturnCallbacks.erase(call.messageID);
}
- removePeerInternal(call.peerID, Status::SERIALIZATION_ERROR);
+ removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
return true;
}