mNewSockets.push(std::move(socketInfo));
}
LOGI("New peer added. Id: " << peerID);
- mEventQueue.send(Event::NEW_PEER);
+ mEventQueue.send(Event::ADD_PEER);
return peerID;
}
mPeersToDelete.push(std::move(request));
}
- mEventQueue.send(Event::DELETE_PEER);
+ mEventQueue.send(Event::REMOVE_PEER);
auto isPeerDeleted = [&peerID, this] {
Lock lock(mSocketsMutex);
resetPolling();
}
-void Processor::cleanCommunication()
-{
- while (!mEventQueue.isEmpty()) {
- switch (mEventQueue.receive()) {
- case Event::FINISH: {
- LOGD("Event FINISH after FINISH");
- break;
- }
- case Event::CALL: {
- LOGD("Event CALL after FINISH");
- Call call = getCall();
- IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
- break;
- }
-
- case Event::NEW_PEER: {
- LOGD("Event NEW_PEER after FINISH");
- break;
- }
-
- case Event::DELETE_PEER: {
- LOGD("Event DELETE_PEER after FINISH");
- RemovePeerRequest request;
- {
- Lock lock(mSocketsMutex);
- request = std::move(mPeersToDelete.front());
- mPeersToDelete.pop();
- }
- request.conditionPtr->notify_all();
- break;
- }
- }
- }
-}
-
void Processor::resetPolling()
{
LOGI("Resetting polling");
case Event::CALL: {
LOGD("Event CALL");
- return handleCall();
+ return onCall();
}
- case Event::NEW_PEER: {
- LOGD("Event NEW_PEER");
- SocketInfo socketInfo;
- {
- Lock lock(mSocketsMutex);
+ case Event::ADD_PEER: {
+ LOGD("Event ADD_PEER");
+ return onNewPeer();
+ }
- socketInfo = std::move(mNewSockets.front());
- mNewSockets.pop();
+ case Event::REMOVE_PEER: {
+ LOGD("Event REMOVE_PEER");
+ return onRemovePeer();
+ }
+ }
- if (mSockets.size() > mMaxNumberOfPeers) {
- LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID);
- return false;
- }
- if (mSockets.count(socketInfo.peerID) != 0) {
- LOGE("There already was a socket for peerID: " << socketInfo.peerID);
- return false;
- }
+ return false;
+}
- mSockets[socketInfo.peerID] = std::move(socketInfo.socketPtr);
- }
- resetPolling();
- if (mNewPeerCallback) {
- // Notify about the new user.
- mNewPeerCallback(socketInfo.peerID);
- }
- return true;
- }
+bool Processor::onNewPeer()
+{
+ SocketInfo socketInfo;
+ {
+ Lock lock(mSocketsMutex);
- case Event::DELETE_PEER: {
- LOGD("Event DELETE_PEER");
- RemovePeerRequest request;
- {
- Lock lock(mSocketsMutex);
- request = std::move(mPeersToDelete.front());
- mPeersToDelete.pop();
+ socketInfo = std::move(mNewSockets.front());
+ mNewSockets.pop();
+
+ if (mSockets.size() > mMaxNumberOfPeers) {
+ LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID);
+ return false;
+ }
+ if (mSockets.count(socketInfo.peerID) != 0) {
+ LOGE("There already was a socket for peerID: " << socketInfo.peerID);
+ return false;
}
- removePeerInternal(request.peerID, Status::REMOVED_PEER);
- request.conditionPtr->notify_all();
- return true;
+ mSockets[socketInfo.peerID] = std::move(socketInfo.socketPtr);
}
+ resetPolling();
+ if (mNewPeerCallback) {
+ // Notify about the new user.
+ mNewPeerCallback(socketInfo.peerID);
}
+ return true;
+}
- return false;
+bool Processor::onRemovePeer()
+{
+ RemovePeerRequest request;
+ {
+ Lock lock(mSocketsMutex);
+ request = std::move(mPeersToDelete.front());
+ mPeersToDelete.pop();
+ }
+
+ removePeerInternal(request.peerID, Status::REMOVED_PEER);
+ request.conditionPtr->notify_all();
+ return true;
}
Processor::MessageID Processor::getNextMessageID()
return call;
}
-bool Processor::handleCall()
+bool Processor::onCall()
{
LOGT("Handle call (from another thread) to send a message.");
Call call = getCall();
return false;
}
+void Processor::cleanCommunication()
+{
+ while (!mEventQueue.isEmpty()) {
+ switch (mEventQueue.receive()) {
+ case Event::FINISH: {
+ LOGD("Event FINISH after FINISH");
+ break;
+ }
+ case Event::CALL: {
+ LOGD("Event CALL after FINISH");
+ Call call = getCall();
+ IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
+ break;
+ }
+
+ case Event::ADD_PEER: {
+ LOGD("Event ADD_PEER after FINISH");
+ break;
+ }
+
+ case Event::REMOVE_PEER: {
+ LOGD("Event REMOVE_PEER after FINISH");
+ RemovePeerRequest request;
+ {
+ Lock lock(mSocketsMutex);
+ request = std::move(mPeersToDelete.front());
+ mPeersToDelete.pop();
+ }
+ request.conditionPtr->notify_all();
+ break;
+ }
+ }
+ }
+}
+
} // namespace ipc
} // namespace security_containers
* - Rest: The data written in a callback. One type per method.ReturnCallbacks
*
* TODO:
-* - remove ReturnCallbacks on peer disconnect
-* - on sync timeout erase the return callback
-* - don't throw timeout if the message is already processed
-* - naming convention or methods that just commissions the PROCESS thread to do something
-* - removePeer API function
-* - error handling - special message type
* - some mutexes may not be needed
-* - make addPeer synchronous like removePeer
*/
class Processor {
public:
typedef unsigned int MethodID;
typedef unsigned int MessageID;
-
/**
* Method ID. Used to indicate a message with the return value.
*/
enum class Event : int {
FINISH, // Shutdown request
CALL, // New method call in the queue
- NEW_PEER, // New peer in the queue
- DELETE_PEER // Delete peer
+ ADD_PEER, // New peer in the queue
+ REMOVE_PEER // Remove peer
};
EventQueue<Event> mEventQueue;
void run();
bool handleEvent();
- bool handleCall();
+ bool onCall();
+ bool onNewPeer();
+ bool onRemovePeer();
bool handleLostConnections();
bool handleInputs();
bool handleInput(const PeerID peerID, const Socket& socket);