[Bug/Feature] Fixed a bug in callSync.
Replaced all mutexes in Processor with only one.
Added LOGS loggs
[Cause] N/A
[Solution] N/A
[Verification] Build, install, run tests, run tests under valgrind
Change-Id: I6b6ec26df5f5d7ba8b930e4321f766146fa556f0
Client::Client(const std::string& socketPath)
: mSocketPath(socketPath)
{
- LOGD("Creating client");
+ LOGS("Client Constructor");
}
Client::~Client()
{
- LOGD("Destroying client...");
+ LOGS("Client Destructor");
try {
stop();
} catch (IPCException& e) {
LOGE("Error in Client's destructor: " << e.what());
}
- LOGD("Destroyed client");
}
void Client::connect()
void Client::start()
{
- LOGD("Starting client...");
-
+ LOGS("Client start");
connect();
-
- // Start polling thread
mProcessor.start();
-
- LOGD("Started client");
}
bool Client::isStarted()
void Client::stop()
{
- LOGD("Stopping client...");
+ LOGS("Client Destructor");
mProcessor.stop();
- LOGD("Stopped");
}
std::vector<FileDescriptor> Client::getFDs()
void Client::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
+ LOGS("Client setNewPeerCallback");
mProcessor.setNewPeerCallback(newPeerCallback);
}
void Client::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
{
+ LOGS("Client setRemovedPeerCallback");
mProcessor.setRemovedPeerCallback(removedPeerCallback);
}
void Client::removeMethod(const MethodID methodID)
{
- LOGD("Removing method id: " << methodID);
+ LOGS("Client removeMethod methodID: " << methodID);
mProcessor.removeMethod(methodID);
}
void Client::addMethodHandler(const MethodID methodID,
const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
{
- LOGD("Adding method with id " << methodID);
+ LOGS("Client addMethodHandler, methodID: " << methodID);
mProcessor.addMethodHandler<SentDataType, ReceivedDataType>(methodID, method);
- LOGD("Added method with id " << methodID);
}
template<typename ReceivedDataType>
void Client::addSignalHandler(const MethodID methodID,
const typename SignalHandler<ReceivedDataType>::type& handler)
{
- LOGD("Adding signal with id " << methodID);
+ LOGS("Client addSignalHandler, methodID: " << methodID);
mProcessor.addSignalHandler<ReceivedDataType>(methodID, handler);
- LOGD("Added signal with id " << methodID);
}
template<typename SentDataType, typename ReceivedDataType>
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS)
{
- LOGD("Sync calling method: " << methodID);
+ LOGS("Client callSync, methodID: " << methodID << ", timeoutMS: " << timeoutMS);
return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, mServiceFD, data, timeoutMS);
}
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& resultCallback)
{
- LOGD("Async calling method: " << methodID);
+ LOGS("Client callAsync, methodID: " << methodID);
mProcessor.callAsync<SentDataType,
ReceivedDataType>(methodID,
mServiceFD,
data,
resultCallback);
- LOGD("Async called method: " << methodID);
}
template<typename SentDataType>
void Client::signal(const MethodID methodID,
const std::shared_ptr<SentDataType>& data)
{
- LOGD("Signaling: " << methodID);
+ LOGS("Client signal, methodID: " << methodID);
mProcessor.signal<SentDataType>(methodID, data);
- LOGD("Signaled: " << methodID);
}
} // namespace ipc
#include "ipc/internals/call-queue.hpp"
#include "ipc/exception.hpp"
#include "logger/logger.hpp"
+#include <algorithm>
namespace vasum {
namespace ipc {
return ++mMessageIDCounter;
}
+bool CallQueue::erase(const MessageID messageID)
+{
+ LOGT("Erase messgeID: " << messageID);
+ auto it = std::find(mCalls.begin(), mCalls.end(), messageID);
+ if (it == mCalls.end()) {
+ LOGT("No such messgeID");
+ return false;
+ }
+
+ mCalls.erase(it);
+ LOGT("Erased");
+ return true;
+}
+
CallQueue::Call CallQueue::pop()
{
if (isEmpty()) {
throw IPCException("CallQueue is empty");
}
Call call = std::move(mCalls.front());
- mCalls.pop();
+ mCalls.pop_front();
return call;
}
#include "ipc/types.hpp"
#include "config/manager.hpp"
+#include "logger/logger-scope.hpp"
#include <atomic>
-#include <queue>
+#include <list>
namespace vasum {
namespace ipc {
struct Call {
Call(const Call& other) = delete;
Call& operator=(const Call&) = delete;
+ Call& operator=(Call&&) = default;
Call() = default;
Call(Call&&) = default;
+ bool operator==(const MessageID m)
+ {
+ return m == messageID;
+ }
+
FileDescriptor peerFD;
MethodID methodID;
MessageID messageID;
Call pop();
+ bool erase(const MessageID messageID);
+
bool isEmpty() const;
private:
- std::queue<Call> mCalls;
+ std::list<Call> mCalls;
std::atomic<MessageID> mMessageIDCounter;
MessageID getNextMessageID();
call.messageID = messageID;
call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+ LOGS("Method serialize, peerFD: " << fd);
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
call.parse = [](const int fd)->std::shared_ptr<void> {
+ LOGS("Method parse, peerFD: " << fd);
std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
config::loadFromFD<ReceivedDataType>(fd, *data);
return data;
};
call.process = [process](Status status, std::shared_ptr<void>& data)->void {
+ LOGS("Method process, status: " << toString(status));
std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
return process(status, tmpData);
};
- mCalls.push(std::move(call));
+ mCalls.push_back(std::move(call));
return messageID;
}
call.messageID = messageID;
call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+ LOGS("Signal serialize, peerFD: " << fd);
config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
};
- mCalls.push(std::move(call));
+ mCalls.push_back(std::move(call));
return messageID;
}
mRemovedPeerCallback(removedPeerCallback),
mMaxNumberOfPeers(maxNumberOfPeers)
{
- LOGT("Creating Processor");
+ LOGS("Processor Constructor");
utils::signalBlock(SIGPIPE);
using namespace std::placeholders;
addMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
std::bind(&Processor::onNewSignals, this, _1, _2));
-
}
Processor::~Processor()
{
- LOGT("Destroying Processor");
+ LOGS("Processor Destructor");
try {
stop();
} catch (IPCException& e) {
LOGE("Error in Processor's destructor: " << e.what());
}
-
- LOGT("Destroyed Processor");
}
bool Processor::isStarted()
void Processor::start()
{
- LOGT("Starting Processor");
+ LOGS("Processor start");
+
if (!isStarted()) {
mThread = std::thread(&Processor::run, this);
}
- LOGT("Started Processor");
}
void Processor::stop()
{
- LOGT("Stopping Processor");
+ LOGS("Processor stop");
if (isStarted()) {
- mEventQueue.send(Event::FINISH);
+ {
+ Lock lock(mStateMutex);
+ mEventQueue.send(Event::FINISH);
+ }
+ LOGT("Waiting for the Processor to stop");
mThread.join();
}
-
- LOGT("Stopped Processor");
}
void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
- Lock lock(mCallbacksMutex);
+ Lock lock(mStateMutex);
mNewPeerCallback = newPeerCallback;
}
void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
{
- Lock lock(mCallbacksMutex);
+ Lock lock(mStateMutex);
mRemovedPeerCallback = removedPeerCallback;
}
FileDescriptor Processor::getEventFD()
{
+ Lock lock(mStateMutex);
return mEventQueue.getFD();
}
void Processor::removeMethod(const MethodID methodID)
{
- LOGT("Removing method " << methodID);
- Lock lock(mCallsMutex);
+ Lock lock(mStateMutex);
mMethodsCallbacks.erase(methodID);
}
FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
{
- LOGT("Adding socket");
- FileDescriptor peerFD;
- {
- Lock lock(mSocketsMutex);
- peerFD = socketPtr->getFD();
- SocketInfo socketInfo(peerFD, std::move(socketPtr));
- mNewSockets.push(std::move(socketInfo));
- mEventQueue.send(Event::ADD_PEER);
- }
+ LOGS("Processor addPeer");
+ Lock lock(mStateMutex);
+ FileDescriptor peerFD = socketPtr->getFD();
+ SocketInfo socketInfo(peerFD, std::move(socketPtr));
+ mNewSockets.push(std::move(socketInfo));
+ mEventQueue.send(Event::ADD_PEER);
+
LOGI("New peer added. Id: " << peerFD);
return peerFD;
void Processor::removePeer(const FileDescriptor peerFD)
{
- std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
+ LOGS("Processor removePeer peerFD: " << peerFD);
+
+ // TODO: Remove ADD_PEER event if it's not processed
+
+ // Remove peer and wait till he's gone
+ std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
{
- Lock lock(mSocketsMutex);
+ Lock lock(mStateMutex);
RemovePeerRequest request(peerFD, conditionPtr);
mPeersToDelete.push(std::move(request));
mEventQueue.send(Event::REMOVE_PEER);
}
-
auto isPeerDeleted = [&peerFD, this]()->bool {
- Lock lock(mSocketsMutex);
+ Lock lock(mStateMutex);
return mSockets.count(peerFD) == 0;
};
void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
{
- LOGW("Removing peer. ID: " << peerFD);
- {
- Lock lock(mSocketsMutex);
- if (!mSockets.erase(peerFD)) {
- LOGW("No such peer. Another thread called removePeerInternal");
- return;
- }
+ LOGS("Processor removePeerInternal peerFD: " << peerFD);
+ LOGI("Removing peer. peerFD: " << peerFD);
- // Remove from signal addressees
- for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
- it->second.remove(peerFD);
- if (it->second.empty()) {
- it = mSignalsPeers.erase(it);
- } else {
- ++it;
- }
- }
+ if (!mSockets.erase(peerFD)) {
+ LOGW("No such peer. Another thread called removePeerInternal");
+ return;
}
- {
- // Erase associated return value callbacks
- Lock lock(mReturnCallbacksMutex);
-
- std::shared_ptr<void> data;
- for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
- if (it->second.peerFD == peerFD) {
- IGNORE_EXCEPTIONS(it->second.process(status, data));
- it = mReturnCallbacks.erase(it);
- } else {
- ++it;
- }
+ // Remove from signal addressees
+ for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
+ it->second.remove(peerFD);
+ if (it->second.empty()) {
+ it = mSignalsPeers.erase(it);
+ } else {
+ ++it;
}
}
-
- {
- Lock lock(mCallbacksMutex);
- if (mRemovedPeerCallback) {
- // Notify about the deletion
- mRemovedPeerCallback(peerFD);
+ // Erase associated return value callbacks
+ std::shared_ptr<void> data;
+ for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
+ if (it->second.peerFD == peerFD) {
+ IGNORE_EXCEPTIONS(it->second.process(status, data));
+ it = mReturnCallbacks.erase(it);
+ } else {
+ ++it;
}
}
+ if (mRemovedPeerCallback) {
+ // Notify about the deletion
+ mRemovedPeerCallback(peerFD);
+ }
+
+
resetPolling();
}
return;
}
- LOGI("Resetting polling");
- // Setup polling on eventfd and sockets
- Lock lock(mSocketsMutex);
- mFDs.resize(mSockets.size() + 1);
+ {
+ Lock lock(mStateMutex);
- mFDs[0].fd = mEventQueue.getFD();
- mFDs[0].events = POLLIN;
+ // Setup polling on eventfd and sockets
+ mFDs.resize(mSockets.size() + 1);
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- mFDs[i].fd = socketIt->second->getFD();
- mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
- ++socketIt;
- // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
+ mFDs[0].fd = mEventQueue.getFD();
+ mFDs[0].events = POLLIN;
+
+ auto socketIt = mSockets.begin();
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
+ mFDs[i].fd = socketIt->second->getFD();
+ mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
+ ++socketIt;
+ // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
+ }
}
}
void Processor::run()
{
+ LOGS("Processor run");
+
resetPolling();
mIsRunning = true;
bool Processor::handleLostConnections()
{
- std::vector<FileDescriptor> peersToRemove;
+ Lock lock(mStateMutex);
+
+ bool isPeerRemoved = false;
{
- Lock lock(mSocketsMutex);
for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLHUP) {
LOGI("Lost connection to peer: " << mFDs[i].fd);
mFDs[i].revents &= ~(POLLHUP);
- peersToRemove.push_back(mFDs[i].fd);
+ removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
+ isPeerRemoved = true;
}
}
}
- for (const FileDescriptor peerFD : peersToRemove) {
- removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
- }
-
- return !peersToRemove.empty();
+ return isPeerRemoved;
}
bool Processor::handleLostConnection(const FileDescriptor peerFD)
{
+ Lock lock(mStateMutex);
removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
return true;
}
bool Processor::handleInputs()
{
- std::vector<FileDescriptor> peersWithInput;
- {
- Lock lock(mSocketsMutex);
- for (unsigned int i = 1; i < mFDs.size(); ++i) {
- if (mFDs[i].revents & POLLIN) {
- mFDs[i].revents &= ~(POLLIN);
- peersWithInput.push_back(mFDs[i].fd);
- }
- }
- }
+ Lock lock(mStateMutex);
bool pollChanged = false;
- // Handle input outside the critical section
- for (const FileDescriptor peerFD : peersWithInput) {
- pollChanged = pollChanged || handleInput(peerFD);
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
+ if (mFDs[i].revents & POLLIN) {
+ mFDs[i].revents &= ~(POLLIN);
+ pollChanged = pollChanged || handleInput(mFDs[i].fd);
+ }
}
+
return pollChanged;
}
bool Processor::handleInput(const FileDescriptor peerFD)
{
- LOGT("Handle incoming data");
+ LOGS("Processor handleInput peerFD: " << peerFD);
+ Lock lock(mStateMutex);
std::shared_ptr<Socket> socketPtr;
try {
// Get the peer's socket
- Lock lock(mSocketsMutex);
socketPtr = mSockets.at(peerFD);
} catch (const std::out_of_range&) {
LOGE("No such peer: " << peerFD);
return onReturnValue(*socketPtr, messageID);
} else {
- Lock lock(mCallsMutex);
if (mMethodsCallbacks.count(methodID)) {
// Method
std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
- lock.unlock();
return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
- lock.unlock();
return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
} else {
// Nothing
- lock.unlock();
LOGW("No method or signal callback for methodID: " << methodID);
removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
return true;
std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
std::shared_ptr<RegisterSignalsMessage>& data)
{
- LOGD("New signals for peer: " << peerFD);
- Lock lock(mSocketsMutex);
- for (MethodID methodID : data->ids) {
+ LOGS("Processor onNewSignals peerFD: " << peerFD);
+
+ for (const MethodID methodID : data->ids) {
mSignalsPeers[methodID].push_back(peerFD);
}
bool Processor::onReturnValue(const Socket& socket,
const MessageID messageID)
{
- LOGI("Return value for messageID: " << messageID);
+ LOGS("Processor onReturnValue messageID: " << messageID);
+
+ // LOGI("Return value for messageID: " << messageID);
ReturnCallbacks returnCallbacks;
try {
- Lock lock(mReturnCallbacksMutex);
LOGT("Getting the return callback");
returnCallbacks = std::move(mReturnCallbacks.at(messageID));
mReturnCallbacks.erase(messageID);
return true;
}
- LOGT("Process return value callback for messageID: " << messageID);
+ // LOGT("Process return value callback for messageID: " << messageID);
IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
+ // LOGT("Return value for messageID: " << messageID << " processed");
return false;
}
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks)
{
- LOGI("Remote signal; methodID: " << methodID << " messageID: " << messageID);
+ LOGS("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
+
+ // LOGI("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
std::shared_ptr<void> data;
try {
return true;
}
- LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
+ // LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
try {
signalCallbacks->signal(socket.getFD(), data);
} catch (const std::exception& e) {
const MessageID messageID,
std::shared_ptr<MethodHandlers> methodCallbacks)
{
- LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
+ LOGS("Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
+ // LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
std::shared_ptr<void> data;
try {
bool Processor::handleEvent()
{
+ LOGS("Processor handleEvent");
+
+ Lock lock(mStateMutex);
+
switch (mEventQueue.receive()) {
+
case Event::FINISH: {
LOGD("Event FINISH");
-
mIsRunning = false;
cleanCommunication();
-
return false;
}
bool Processor::onNewPeer()
{
- SocketInfo socketInfo;
- {
- Lock lock(mSocketsMutex);
+ LOGS("Processor onNewPeer");
- socketInfo = std::move(mNewSockets.front());
- mNewSockets.pop();
-
- if (mSockets.size() > mMaxNumberOfPeers) {
- LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
- return false;
- }
- if (mSockets.count(socketInfo.peerFD) != 0) {
- LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
- return false;
- }
+ // TODO: What if there is no newSocket? (request removed in the mean time)
+ // Add new socket of the peer
+ SocketInfo socketInfo = std::move(mNewSockets.front());
+ mNewSockets.pop();
- mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
+ if (mSockets.size() > mMaxNumberOfPeers) {
+ LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
+ return false;
+ }
+ if (mSockets.count(socketInfo.peerFD) != 0) {
+ LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
+ return false;
}
+ mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
- // Broadcast the new signal to peers
- LOGW("Sending handled signals");
- std::list<FileDescriptor> peersFDs;
- {
- Lock lock(mSocketsMutex);
- for (const auto kv : mSockets) {
- peersFDs.push_back(kv.first);
- }
- }
+ // LOGW("Sending handled signals");
std::vector<MethodID> ids;
- {
- Lock lock(mSocketsMutex);
- for (const auto kv : mSignalsCallbacks) {
- ids.push_back(kv.first);
- }
+ for (const auto kv : mSignalsCallbacks) {
+ ids.push_back(kv.first);
}
auto data = std::make_shared<RegisterSignalsMessage>(ids);
-
- for (const FileDescriptor peerFD : peersFDs) {
- callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- peerFD,
- data,
- discardResultHandler<EmptyData>);
- }
- LOGW("Sent handled signals");
-
+ callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
+ socketInfo.peerFD,
+ data,
+ discardResultHandler<EmptyData>);
+ // LOGW("Sent handled signals");
resetPolling();
- {
- Lock lock(mCallbacksMutex);
- if (mNewPeerCallback) {
- // Notify about the new user.
- LOGT("Calling NewPeerCallback");
- mNewPeerCallback(socketInfo.peerFD);
- }
+ if (mNewPeerCallback) {
+ // Notify about the new user.
+ LOGT("Calling NewPeerCallback");
+ mNewPeerCallback(socketInfo.peerFD);
}
+
return true;
}
bool Processor::onRemovePeer()
{
- RemovePeerRequest request;
- {
- Lock lock(mSocketsMutex);
- request = std::move(mPeersToDelete.front());
- mPeersToDelete.pop();
- }
+ LOGS("Processor onRemovePeer");
- removePeerInternal(request.peerFD, Status::REMOVED_PEER);
- request.conditionPtr->notify_all();
- return true;
-}
+ removePeerInternal(mPeersToDelete.front().peerFD, Status::REMOVED_PEER);
-CallQueue::Call Processor::getCall()
-{
- Lock lock(mCallsMutex);
- return mCalls.pop();
+ mPeersToDelete.front().conditionPtr->notify_all();
+ mPeersToDelete.pop();
+ return true;
}
bool Processor::onCall()
{
- LOGT("Handle call (from another thread) to send a message.");
- CallQueue::Call call = getCall();
+ LOGS("Processor onCall");
+ CallQueue::Call call;
+ try {
+ call = std::move(mCalls.pop());
+ } catch (const IPCException&) {
+ LOGE("No calls to serve, but got an EVENT::CALL. Event got removed before serving");
+ return false;
+ }
if (call.parse && call.process) {
return onMethodCall(call);
bool Processor::onSignalCall(CallQueue::Call& call)
{
+ LOGS("Processor onSignalCall");
+
std::shared_ptr<Socket> socketPtr;
try {
// Get the peer's socket
- Lock lock(mSocketsMutex);
socketPtr = mSockets.at(call.peerFD);
} catch (const std::out_of_range&) {
LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
}
return false;
-
}
bool Processor::onMethodCall(CallQueue::Call& call)
{
+ LOGS("Processor onMethodCall");
std::shared_ptr<Socket> socketPtr;
+
+
try {
// Get the peer's socket
- Lock lock(mSocketsMutex);
socketPtr = mSockets.at(call.peerFD);
} catch (const std::out_of_range&) {
LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
return false;
}
- {
- // Set what to do with the return message, but only if needed
- Lock lock(mReturnCallbacksMutex);
- if (mReturnCallbacks.count(call.messageID) != 0) {
- LOGE("There already was a return callback for messageID: " << call.messageID);
- }
- mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
- std::move(call.parse),
- std::move(call.process)));
+ if (mReturnCallbacks.count(call.messageID) != 0) {
+ LOGE("There already was a return callback for messageID: " << call.messageID);
}
+ mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
+ std::move(call.parse),
+ std::move(call.process)));
try {
// Send the call with the socket
Socket::Guard guard = socketPtr->getGuard();
socketPtr->write(&call.methodID, sizeof(call.methodID));
socketPtr->write(&call.messageID, sizeof(call.messageID));
+ LOGT("Serializing the message");
call.serialize(socketPtr->getFD(), call.data);
} catch (const std::exception& e) {
LOGE("Error during sending a method: " << e.what());
// Inform about the error,
IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
- {
- Lock lock(mReturnCallbacksMutex);
- mReturnCallbacks.erase(call.messageID);
- }
+ mReturnCallbacks.erase(call.messageID);
removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
+
return true;
+
}
return false;
void Processor::cleanCommunication()
{
+ LOGS("Processor cleanCommunication");
+
while (!mEventQueue.isEmpty()) {
switch (mEventQueue.receive()) {
case Event::FINISH: {
- LOGD("Event FINISH after FINISH");
+ LOGE("Event FINISH after FINISH");
break;
}
case Event::CALL: {
- LOGD("Event CALL after FINISH");
- CallQueue::Call call = getCall();
- if (call.process) {
- IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
+ LOGW("Event CALL after FINISH");
+ try {
+ CallQueue::Call call = mCalls.pop();
+ if (call.process) {
+ IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
+ }
+ } catch (const IPCException&) {
+ // No more calls
}
break;
}
case Event::ADD_PEER: {
- LOGD("Event ADD_PEER after FINISH");
+ LOGW("Event ADD_PEER after FINISH");
break;
}
case Event::REMOVE_PEER: {
- LOGD("Event REMOVE_PEER after FINISH");
- RemovePeerRequest request;
- {
- Lock lock(mSocketsMutex);
- request = std::move(mPeersToDelete.front());
- mPeersToDelete.pop();
- }
- request.conditionPtr->notify_all();
+ LOGW("Event REMOVE_PEER after FINISH");
+ mPeersToDelete.front().conditionPtr->notify_all();
+ mPeersToDelete.pop();
break;
}
}
#include "config/manager.hpp"
#include "config/fields.hpp"
#include "logger/logger.hpp"
+#include "logger/logger-scope.hpp"
#include <poll.h>
#include <condition_variable>
* - new way to generate UIDs
* - callbacks for serialization/parsing
* - store Sockets in a vector, maybe SocketStore?
-* - fix valgrind tests
* - poll loop outside.
* - waiting till the EventQueue is empty before leaving stop()
* - no new events added after stop() called
* - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
* there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
-*
+* - EventQueue should store std::shared_ptr<void> and it should be the only queue to the Processor thread.
+* It should have an API for removing events from the middle of the queue
*
*/
class Processor {
private:
typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
- typedef std::unique_lock<std::mutex> Lock;
+ typedef std::unique_lock<std::recursive_mutex> Lock;
struct EmptyData {
CONFIG_REGISTER_EMPTY
bool mIsRunning;
- // Mutex for the Calls queue and the map of methods.
- std::mutex mCallsMutex;
+
CallQueue mCalls;
std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
- // Mutex for changing mSockets map.
- // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
- std::mutex mSocketsMutex;
std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
std::vector<struct pollfd> mFDs;
std::queue<SocketInfo> mNewSockets;
std::queue<RemovePeerRequest> mPeersToDelete;
- // Mutex for modifying the map with return callbacks
- std::mutex mReturnCallbacksMutex;
std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
- // Mutex for setting callbacks
- std::mutex mCallbacksMutex;
+ // Mutex for modifying any internal data
+ std::recursive_mutex mStateMutex;
+
PeerCallback mNewPeerCallback;
PeerCallback mRemovedPeerCallback;
void addMethodHandlerInternal(const MethodID methodID,
const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
- template<typename SentDataType, typename ReceivedDataType>
- MessageID callInternal(const MethodID methodID,
- const FileDescriptor peerFD,
- const std::shared_ptr<SentDataType>& data,
- const typename ResultHandler<ReceivedDataType>::type& process);
-
template<typename ReceivedDataType>
static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
std::shared_ptr<SignalHandlers> signalCallbacks);
void resetPolling();
FileDescriptor getNextFileDescriptor();
- CallQueue::Call getCall();
void removePeerInternal(const FileDescriptor peerFD, Status status);
std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
};
{
- Lock lock(mCallsMutex);
+ Lock lock(mStateMutex);
mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
}
}
}
{
- Lock lock(mCallsMutex);
+ Lock lock(mStateMutex);
+
if (mSignalsCallbacks.count(methodID)) {
LOGE("MethodID used by a signal: " << methodID);
throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
}
+
+ addMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
}
- addMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
}
template<typename ReceivedDataType>
throw IPCException("Forbidden methodID: " + std::to_string(methodID));
}
+ std::shared_ptr<RegisterSignalsMessage> data;
+ std::vector<FileDescriptor> peerFDs;
{
- Lock lock(mCallsMutex);
+ Lock lock(mStateMutex);
+
+ // Andd the signal handler:
if (mMethodsCallbacks.count(methodID)) {
LOGE("MethodID used by a method: " << methodID);
throw IPCException("MethodID used by a method: " + std::to_string(methodID));
}
- }
- SignalHandlers signalCall;
+ SignalHandlers signalCall;
- signalCall.parse = [](const int fd)->std::shared_ptr<void> {
- std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
- config::loadFromFD<ReceivedDataType>(fd, *data);
- return data;
- };
+ signalCall.parse = [](const int fd)->std::shared_ptr<void> {
+ std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
+ config::loadFromFD<ReceivedDataType>(fd, *data);
+ return data;
+ };
- signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
- std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- handler(peerFD, tmpData);
- };
+ signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
+ std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
+ handler(peerFD, tmpData);
+ };
- {
- Lock lock(mCallsMutex);
mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
- }
- std::vector<MethodID> ids {methodID};
- auto data = std::make_shared<RegisterSignalsMessage>(ids);
+ // Broadcast the new signal:
+ std::vector<MethodID> ids {methodID};
+ data = std::make_shared<RegisterSignalsMessage>(ids);
- std::list<FileDescriptor> peersFDs;
- {
- Lock lock(mSocketsMutex);
for (const auto kv : mSockets) {
- peersFDs.push_back(kv.first);
+ peerFDs.push_back(kv.first);
}
}
- for (const FileDescriptor peerFD : peersFDs) {
+ for (const auto peerFD : peerFDs) {
callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
peerFD,
data,
DEFAULT_METHOD_TIMEOUT);
}
-
}
-template<typename SentDataType, typename ReceivedDataType>
-MessageID Processor::callInternal(const MethodID methodID,
- const FileDescriptor peerFD,
- const std::shared_ptr<SentDataType>& data,
- const typename ResultHandler<ReceivedDataType>::type& process)
-{
- Lock lock(mCallsMutex);
- MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
- mEventQueue.send(Event::CALL);
-
- return messageID;
-}
template<typename SentDataType, typename ReceivedDataType>
MessageID Processor::callAsync(const MethodID methodID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
- return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+ Lock lock(mStateMutex);
+ MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+ mEventQueue.send(Event::CALL);
+
+ return messageID;
}
Status returnStatus = ipc::Status::UNDEFINED;
auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
- std::unique_lock<std::mutex> lock(mutex);
returnStatus = status;
result = returnedData;
cv.notify_all();
std::unique_lock<std::mutex> lock(mutex);
LOGT("Waiting for the response...");
- // TODO: There is a race here. mReturnCallbacks were used to indicate if the return call was served or not,
- // but if the timeout occurs before the call is even sent, then this method is broken.
if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
- // Timeout occurred:
- // - call isn't sent => delete it
- // - call is sent and no reply => throw IPCTimeoutError
- // - call is being serviced => wait for it with the same timeout
- LOGT("Probably a timeout in callSync. Checking...");
-
- bool isTimeout = false;
+ LOGW("Probably a timeout in callSync. Checking...");
+ bool isTimeout;
{
- Lock lock(mReturnCallbacksMutex);
- if (1 == mReturnCallbacks.erase(messageID)) {
- // Return callback was present, so there was a timeout
- isTimeout = true;
- }
+ Lock lock(mStateMutex);
+ // Call isn't sent or call is sent but there is no reply
+ isTimeout = mCalls.erase(messageID) || 1 == mReturnCallbacks.erase(messageID);
}
+
if (isTimeout) {
- removePeer(peerFD);
LOGE("Function call timeout; methodID: " << methodID);
+ removePeer(peerFD);
throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
} else {
- //Timeout started during the return value processing, so wait for it to finish
- cv.wait(lock, isResultInitialized);
+ LOGW("Timeout started during the return value processing, so wait for it to finish");
+ if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
+ LOGE("Function call timeout; methodID: " << methodID);
+ throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
+ }
}
}
void Processor::signal(const MethodID methodID,
const std::shared_ptr<SentDataType>& data)
{
- std::list<FileDescriptor> peersFDs;
- {
- Lock lock(mSocketsMutex);
- peersFDs = mSignalsPeers[methodID];
+ Lock lock(mStateMutex);
+ const auto it = mSignalsPeers.find(methodID);
+ if (it == mSignalsPeers.end()) {
+ LOGW("No peer is handling signal with methodID: " << methodID);
+ return;
}
-
- for (const FileDescriptor peerFD : peersFDs) {
- Lock lock(mCallsMutex);
+ for (const FileDescriptor peerFD : it->second) {
mCalls.push<SentDataType>(methodID, peerFD, data);
mEventQueue.send(Event::CALL);
}
const HandlerCallback& handlerCallback)
: mHandlerCallback(handlerCallback)
{
- LOGD("Constructing IPCGSource");
+ LOGS("IPCGSource constructor");
+
for (const FileDescriptor fd : fds) {
addFD(fd);
}
IPCGSource::~IPCGSource()
{
- LOGD("Destroying IPCGSource");
+ LOGS("~IPCGSource");
}
IPCGSource::Pointer IPCGSource::create(const std::vector<FileDescriptor>& fds,
const HandlerCallback& handlerCallback)
{
- LOGD("Creating IPCGSource");
+ LOGS("Creating IPCGSource");
static GSourceFuncs funcs = { &IPCGSource::prepare,
&IPCGSource::check,
void IPCGSource::addFD(const FileDescriptor fd)
{
+
if (!&mGSource) {
// In case it's called as a callback but the IPCGSource is destroyed
return;
}
+ LOGS("Adding fd to glib");
- LOGD("Adding fd to glib");
gpointer tag = g_source_add_unix_fd(&mGSource,
fd,
conditions);
return;
}
- LOGD("Removing fd from glib");
+ LOGS("Removing fd from glib");
auto it = std::find(mFDInfos.begin(), mFDInfos.end(), fd);
if (it == mFDInfos.end()) {
LOGE("No such fd");
guint IPCGSource::attach(GMainContext* context)
{
- LOGD("Attaching to GMainContext");
+ LOGS("Attaching to GMainContext");
guint ret = g_source_attach(&mGSource, context);
g_source_unref(&mGSource);
return ret;
void IPCGSource::finalize(GSource* gSource)
{
+ LOGS("IPCGSource Finalize");
+
if (gSource) {
IPCGSource* source = reinterpret_cast<IPCGSource*>(gSource);
source->~IPCGSource();
mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
{
- LOGD("Creating server");
+ LOGS("Service Constructor");
}
Service::~Service()
{
- LOGD("Destroying server...");
+ LOGS("Service Destructor");
try {
stop();
} catch (IPCException& e) {
LOGE("Error in Service's destructor: " << e.what());
}
- LOGD("Destroyed");
}
void Service::start()
{
- LOGD("Starting server");
+ LOGS("Service start");
mProcessor.start();
// There can be an incoming connection from mAcceptor before mProcessor is listening,
// but it's OK. It will handle the connection when ready. So no need to wait for mProcessor.
mAcceptor.start();
-
- LOGD("Started server");
}
bool Service::isStarted()
void Service::stop()
{
- LOGD("Stopping server..");
+ LOGS("Service stop");
mAcceptor.stop();
mProcessor.stop();
- LOGD("Stopped");
}
std::vector<FileDescriptor> Service::getFDs()
void Service::setNewPeerCallback(const PeerCallback& newPeerCallback)
{
+ LOGS("Service setNewPeerCallback");
mProcessor.setNewPeerCallback(newPeerCallback);
}
void Service::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
{
+ LOGS("Service setRemovedPeerCallback");
mProcessor.setRemovedPeerCallback(removedPeerCallback);
}
void Service::removeMethod(const MethodID methodID)
{
- LOGD("Removing method " << methodID);
+ LOGS("Service removeMethod methodID: " << methodID);
mProcessor.removeMethod(methodID);
- LOGD("Removed " << methodID);
}
void Service::addMethodHandler(const MethodID methodID,
const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
{
- LOGD("Adding method with id " << methodID);
+ LOGS("Service addMethodHandler, methodID " << methodID);
mProcessor.addMethodHandler<SentDataType, ReceivedDataType>(methodID, method);
- LOGD("Added method with id " << methodID);
}
template<typename ReceivedDataType>
void Service::addSignalHandler(const MethodID methodID,
const typename SignalHandler<ReceivedDataType>::type& handler)
{
- LOGD("Adding signal with id " << methodID);
+ LOGS("Service addSignalHandler, methodID " << methodID);
mProcessor.addSignalHandler<ReceivedDataType>(methodID, handler);
- LOGD("Added signal with id " << methodID);
}
template<typename SentDataType, typename ReceivedDataType>
const std::shared_ptr<SentDataType>& data,
unsigned int timeoutMS)
{
- LOGD("Sync calling method: " << methodID << " for user: " << peerFD);
+ LOGS("Service callSync, methodID: " << methodID
+ << ", peerFD: " << peerFD
+ << ", timeoutMS: " << timeoutMS);
return mProcessor.callSync<SentDataType, ReceivedDataType>(methodID, peerFD, data, timeoutMS);
}
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& resultCallback)
{
- LOGD("Async calling method: " << methodID << " for user: " << peerFD);
+ LOGS("Service callAsync, methodID: " << methodID << ", peerFD: " << peerFD);
mProcessor.callAsync<SentDataType,
ReceivedDataType>(methodID,
peerFD,
data,
resultCallback);
- LOGD("Async called method: " << methodID << "for user: " << peerFD);
}
template<typename SentDataType>
void Service::signal(const MethodID methodID,
const std::shared_ptr<SentDataType>& data)
{
- LOGD("Signaling: " << methodID);
+ LOGS("Service signal, methodID: " << methodID);
mProcessor.signal<SentDataType>(methodID, data);
- LOGD("Signaled: " << methodID);
}
} // namespace ipc
const int SHORT_OPERATION_TIME = TIMEOUT / 100;
// Time that will cause "TIMEOUT" methods to throw
-const int LONG_OPERATION_TIME = 3 * TIMEOUT;
+const int LONG_OPERATION_TIME = 500 + TIMEOUT;
struct Fixture {
std::string socketPath;
};
struct LongSendData {
- LongSendData(int i = 0, int waitTime = 1000): mSendData(i), mWaitTime(waitTime), intVal(i) {}
+ LongSendData(int i, int waitTime): mSendData(i), mWaitTime(waitTime), intVal(i) {}
template<typename Visitor>
void accept(Visitor visitor)
c.addSignalHandler<SendData>(1, handlerA);
c.addSignalHandler<SendData>(2, handlerB);
+ // Wait for the signals to propagate to the Service
+ std::this_thread::sleep_for(std::chrono::milliseconds(2 * TIMEOUT));
+
auto data = std::make_shared<SendData>(1);
s.signal<SendData>(2, data);
s.signal<SendData>(1, data);