LOGS(mLogPrefix + "Processor Constructor");
utils::signalBlock(SIGPIPE);
+
using namespace std::placeholders;
setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
std::bind(&Processor::onNewSignals, this, _1, _2));
LOGS(mLogPrefix + "Processor start");
Lock lock(mStateMutex);
- if (!isStarted()) {
+ if (!mIsRunning) {
LOGI(mLogPrefix + "Processor start");
mIsRunning = true;
mUsesExternalPolling = usesExternalPolling;
LOGS(mLogPrefix + "Processor stop");
if (isStarted()) {
- auto conditionPtr = std::make_shared<std::condition_variable_any>();
+ auto conditionPtr = std::make_shared<std::condition_variable>();
{
Lock lock(mStateMutex);
auto request = std::make_shared<FinishRequest>(conditionPtr);
// Wait till the FINISH request is served
Lock lock(mStateMutex);
conditionPtr->wait(lock, [this]() {
- return !isStarted();
+ return !mIsRunning;
});
}
}
return peerFD;
}
-void Processor::removePeer(const FileDescriptor peerFD)
+void Processor::removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock)
{
LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
- {
- Lock lock(mStateMutex);
- mRequestQueue.removeIf([peerFD](Request & request) {
- return request.requestID == Event::ADD_PEER &&
- request.get<AddPeerRequest>()->peerFD == peerFD;
- });
- }
-
- // Remove peer and wait till he's gone
- std::shared_ptr<std::condition_variable_any> conditionPtr(new std::condition_variable_any());
- {
- Lock lock(mStateMutex);
- auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
- mRequestQueue.pushBack(Event::REMOVE_PEER, request);
- }
-
auto isPeerDeleted = [&peerFD, this]()->bool {
return mSockets.count(peerFD) == 0;
};
- Lock lock(mStateMutex);
+ mRequestQueue.removeIf([peerFD](Request & request) {
+ return request.requestID == Event::ADD_PEER &&
+ request.get<AddPeerRequest>()->peerFD == peerFD;
+ });
+
+ // Remove peer and wait till he's gone
+ std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
+
+ auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
+ mRequestQueue.pushBack(Event::REMOVE_PEER, request);
+
conditionPtr->wait(lock, isPeerDeleted);
}
// Notify about the deletion
mRemovedPeerCallback(peerFD);
}
-
- resetPolling();
}
void Processor::resetPolling()
return;
}
- {
- Lock lock(mStateMutex);
- LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
- // Setup polling on eventfd and sockets
- mFDs.resize(mSockets.size() + 1);
+ LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
+ // Setup polling on eventfd and sockets
+ mFDs.resize(mSockets.size() + 1);
- mFDs[0].fd = mRequestQueue.getFD();
- mFDs[0].events = POLLIN;
+ mFDs[0].fd = mRequestQueue.getFD();
+ mFDs[0].events = POLLIN;
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
- mFDs[i].fd = socketIt->second->getFD();
- mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
- ++socketIt;
- // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
- }
+ auto socketIt = mSockets.begin();
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
+ LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
+ mFDs[i].fd = socketIt->second->getFD();
+ mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
+ ++socketIt;
+ // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
}
}
{
LOGS(mLogPrefix + "Processor run");
- resetPolling();
+ {
+ Lock lock(mStateMutex);
+ resetPolling();
+ }
while (isStarted()) {
LOGT(mLogPrefix + "Waiting for communication...");
// Check for lost connections:
if (handleLostConnections()) {
// mFDs changed
+ resetPolling();
continue;
}
// Check for incoming data.
if (handleInputs()) {
// mFDs changed
+ resetPolling();
continue;
}
mFDs[0].revents &= ~(POLLIN);
if (handleEvent()) {
// mFDs changed
+ resetPolling();
continue;
}
}
bool Processor::handleInputs()
{
- Lock lock(mStateMutex);
+ // Lock not needed, mFDs won't be changed by handleInput
bool pollChanged = false;
for (unsigned int i = 1; i < mFDs.size(); ++i) {
bool Processor::handleInput(const FileDescriptor peerFD)
{
LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
+
Lock lock(mStateMutex);
std::shared_ptr<Socket> socketPtr;
if (mMethodsCallbacks.count(methodID)) {
// Method
std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
- return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
+ return onRemoteMethod(*socketPtr, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
{
LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
- // LOGI(mLogPrefix + "Return value for messageID: " << messageID);
ReturnCallbacks returnCallbacks;
try {
LOGT(mLogPrefix + "Getting the return callback");
return true;
}
- // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
ResultBuilder resultBuilder(data);
IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
- // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
return false;
}
{
LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
- // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
-
std::shared_ptr<void> data;
try {
LOGT(mLogPrefix + "Parsing incoming data");
return true;
}
- // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
try {
signalCallbacks->signal(socket.getFD(), data);
} catch (const IPCUserException& e) {
return false;
}
-bool Processor::onRemoteCall(const Socket& socket,
- const MethodID methodID,
- const MessageID messageID,
- std::shared_ptr<MethodHandlers> methodCallbacks)
+bool Processor::onRemoteMethod(const Socket& socket,
+ const MethodID methodID,
+ const MessageID messageID,
+ std::shared_ptr<MethodHandlers> methodCallbacks)
{
- LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
- // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID);
+ LOGS(mLogPrefix + "Processor onRemoteMethod; methodID: " << methodID << " messageID: " << messageID);
std::shared_ptr<void> data;
try {
request.peerFD,
data);
-
- resetPolling();
-
if (mNewPeerCallback) {
// Notify about the new user.
LOGT(mLogPrefix + "Calling NewPeerCallback");
* - no new events added after stop() called
* - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
* there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
-* - remove recursive mutex
*
*/
class Processor {
FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
/**
- * Request removing peer and wait
- *
- * @param peerFD id of the peer
- */
- void removePeer(const FileDescriptor peerFD);
-
- /**
* Saves the callbacks connected to the method id.
* When a message with the given method id is received,
* the data will be passed to the serialization callback through file descriptor.
private:
typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
- typedef std::unique_lock<std::recursive_mutex> Lock;
+ typedef std::unique_lock<std::mutex> Lock;
typedef RequestQueue<Event>::Request Request;
struct EmptyData {
std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
// Mutex for modifying any internal data
- std::recursive_mutex mStateMutex;
+ std::mutex mStateMutex;
PeerCallback mNewPeerCallback;
PeerCallback mRemovedPeerCallback;
bool onReturnValue(const Socket& socket,
const MessageID messageID);
- bool onRemoteCall(const Socket& socket,
- const MethodID methodID,
- const MessageID messageID,
- std::shared_ptr<MethodHandlers> methodCallbacks);
+ bool onRemoteMethod(const Socket& socket,
+ const MethodID methodID,
+ const MessageID messageID,
+ std::shared_ptr<MethodHandlers> methodCallbacks);
bool onRemoteSignal(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
void resetPolling();
FileDescriptor getNextFileDescriptor();
void removePeerInternal(const FileDescriptor peerFD, const std::exception_ptr& exceptionPtr);
+ void removePeerSyncInternal(const FileDescriptor peerFD, Lock& lock);
void onNewSignals(const FileDescriptor peerFD,
std::shared_ptr<RegisterSignalsProtocolMessage>& data);
}
std::shared_ptr<RegisterSignalsProtocolMessage> data;
- std::vector<FileDescriptor> peerFDs;
{
Lock lock(mStateMutex);
data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
for (const auto kv : mSockets) {
- peerFDs.push_back(kv.first);
+ signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
+ kv.first,
+ data);
}
}
-
- for (const auto peerFD : peerFDs) {
- signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
- peerFD,
- data);
- }
}
unsigned int timeoutMS)
{
Result<ReceivedDataType> result;
-
- std::mutex mutex;
std::condition_variable cv;
- auto process = [&result, &mutex, &cv](const Result<ReceivedDataType> && r) {
- std::unique_lock<std::mutex> lock(mutex);
+ auto process = [&result, &cv](const Result<ReceivedDataType> && r) {
+ // This is called under lock(mStateMutex)
result = std::move(r);
cv.notify_all();
};
return result.isValid();
};
- std::unique_lock<std::mutex> lock(mutex);
+ Lock lock(mStateMutex);
LOGT(mLogPrefix + "Waiting for the response...");
if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
- bool isTimeout;
- {
- Lock lock(mStateMutex);
- // Call isn't sent or call is sent but there is no reply
- isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
- return request.requestID == Event::METHOD &&
- request.get<MethodRequest>()->messageID == messageID;
- })
- || mRequestQueue.removeIf([messageID](Request & request) {
- return request.requestID == Event::SIGNAL &&
- request.get<SignalRequest>()->messageID == messageID;
- })
- || 1 == mReturnCallbacks.erase(messageID);
- }
+
+ // Call isn't sent or call is sent but there is no reply
+ bool isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
+ return request.requestID == Event::METHOD &&
+ request.get<MethodRequest>()->messageID == messageID;
+ })
+ || 1 == mReturnCallbacks.erase(messageID);
+
if (isTimeout) {
LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
- removePeer(peerFD);
+ removePeerSyncInternal(peerFD, lock);
throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
} else {
LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
const FileDescriptor peerFD,
const std::shared_ptr<SentDataType>& data)
{
- Lock lock(mStateMutex);
auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
mRequestQueue.pushFront(Event::SIGNAL, request);
}