namespace ipc {
Client::Client(const std::string& socketPath)
- : mSocketPath(socketPath)
+ : mProcessor("[CLIENT] "),
+ mSocketPath(socketPath)
{
LOGS("Client Constructor");
}
}
}
-void Client::connect()
+void Client::start(const bool usesExternalPolling)
{
+ LOGS("Client start");
// Initialize the connection with the server
LOGD("Connecting to " + mSocketPath);
auto socketPtr = std::make_shared<Socket>(Socket::connectSocket(mSocketPath));
mServiceFD = mProcessor.addPeer(socketPtr);
-}
-
-void Client::start()
-{
- LOGS("Client start");
- connect();
- mProcessor.start();
+ mProcessor.start(usesExternalPolling);
}
bool Client::isStarted()
Client& operator=(const Client&) = delete;
/**
- * Places a connection request in the internal event queue.
- *
- * Used with an external polling loop.
- */
- void connect();
-
- /**
* Starts the worker thread
+ *
+ * @param usesExternalPolling internal or external polling is used
*/
- void start();
+ void start(const bool usesExternalPolling = false);
/**
* @return is the communication thread running
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Processor's request to add a peer
+ */
+
+#ifndef COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "ipc/internals/socket.hpp"
+
+namespace vasum {
+namespace ipc {
+
+class AddPeerRequest {
+public:
+ AddPeerRequest(const AddPeerRequest&) = delete;
+ AddPeerRequest& operator=(const AddPeerRequest&) = delete;
+
+ AddPeerRequest(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
+ : peerFD(peerFD),
+ socketPtr(socketPtr)
+ {
+ }
+
+ FileDescriptor peerFD;
+ std::shared_ptr<Socket> socketPtr;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP
+++ /dev/null
-/*
-* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
-*
-* Contact: Jan Olszak <j.olszak@samsung.com>
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License
-*/
-
-/**
- * @file
- * @author Jan Olszak (j.olszak@samsung.com)
- * @brief Managing the queue with calls
- */
-
-#include "config.hpp"
-
-#include "ipc/internals/call-queue.hpp"
-#include "ipc/exception.hpp"
-#include "logger/logger.hpp"
-#include <algorithm>
-
-namespace vasum {
-namespace ipc {
-
-CallQueue::CallQueue()
- : mMessageIDCounter(0)
-{
-}
-
-CallQueue::~CallQueue()
-{
-}
-
-bool CallQueue::isEmpty() const
-{
- return mCalls.empty();
-}
-
-MessageID CallQueue::getNextMessageID()
-{
- // TODO: This method of generating UIDs is buggy. To be changed.
- return ++mMessageIDCounter;
-}
-
-bool CallQueue::erase(const MessageID messageID)
-{
- LOGT("Erase messgeID: " << messageID);
- auto it = std::find(mCalls.begin(), mCalls.end(), messageID);
- if (it == mCalls.end()) {
- LOGT("No such messgeID");
- return false;
- }
-
- mCalls.erase(it);
- LOGT("Erased");
- return true;
-}
-
-CallQueue::Call CallQueue::pop()
-{
- if (isEmpty()) {
- LOGE("CallQueue is empty");
- throw IPCException("CallQueue is empty");
- }
- Call call = std::move(mCalls.front());
- mCalls.pop_front();
- return call;
-}
-
-} // namespace ipc
-} // namespace vasum
+++ /dev/null
-/*
-* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
-*
-* Contact: Jan Olszak <j.olszak@samsung.com>
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License
-*/
-
-/**
- * @file
- * @author Jan Olszak (j.olszak@samsung.com)
- * @brief Managing the queue with calls
- */
-
-#ifndef COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
-#define COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
-
-#include "ipc/types.hpp"
-#include "config/manager.hpp"
-#include "logger/logger-scope.hpp"
-
-#include <atomic>
-#include <list>
-
-namespace vasum {
-namespace ipc {
-
-/**
-* Class for managing a queue of calls in the Processor
-*/
-class CallQueue {
-public:
- typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
- typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
-
- struct Call {
- Call(const Call& other) = delete;
- Call& operator=(const Call&) = delete;
- Call& operator=(Call&&) = default;
- Call() = default;
- Call(Call&&) = default;
-
- bool operator==(const MessageID m)
- {
- return m == messageID;
- }
-
- FileDescriptor peerFD;
- MethodID methodID;
- MessageID messageID;
- std::shared_ptr<void> data;
- SerializeCallback serialize;
- ParseCallback parse;
- ResultHandler<void>::type process;
- };
-
- CallQueue();
- ~CallQueue();
-
- CallQueue(const CallQueue&) = delete;
- CallQueue(CallQueue&&) = delete;
- CallQueue& operator=(const CallQueue&) = delete;
-
- template<typename SentDataType, typename ReceivedDataType>
- MessageID push(const MethodID methodID,
- const FileDescriptor peerFD,
- const std::shared_ptr<SentDataType>& data,
- const typename ResultHandler<ReceivedDataType>::type& process);
-
-
- template<typename SentDataType>
- MessageID push(const MethodID methodID,
- const FileDescriptor peerFD,
- const std::shared_ptr<SentDataType>& data);
-
- Call pop();
-
- bool erase(const MessageID messageID);
-
- bool isEmpty() const;
-
-private:
- std::list<Call> mCalls;
- std::atomic<MessageID> mMessageIDCounter;
-
- MessageID getNextMessageID();
-};
-
-
-template<typename SentDataType, typename ReceivedDataType>
-MessageID CallQueue::push(const MethodID methodID,
- const FileDescriptor peerFD,
- const std::shared_ptr<SentDataType>& data,
- const typename ResultHandler<ReceivedDataType>::type& process)
-{
- Call call;
- call.methodID = methodID;
- call.peerFD = peerFD;
- call.data = data;
-
- MessageID messageID = getNextMessageID();
- call.messageID = messageID;
-
- call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
- LOGS("Method serialize, peerFD: " << fd);
- config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
- };
-
- call.parse = [](const int fd)->std::shared_ptr<void> {
- LOGS("Method parse, peerFD: " << fd);
- std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
- config::loadFromFD<ReceivedDataType>(fd, *data);
- return data;
- };
-
- call.process = [process](Status status, std::shared_ptr<void>& data)->void {
- LOGS("Method process, status: " << toString(status));
- std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- return process(status, tmpData);
- };
-
- mCalls.push_back(std::move(call));
-
- return messageID;
-}
-
-template<typename SentDataType>
-MessageID CallQueue::push(const MethodID methodID,
- const FileDescriptor peerFD,
- const std::shared_ptr<SentDataType>& data)
-{
- Call call;
- call.methodID = methodID;
- call.peerFD = peerFD;
- call.data = data;
-
- MessageID messageID = getNextMessageID();
- call.messageID = messageID;
-
- call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
- LOGS("Signal serialize, peerFD: " << fd);
- config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
- };
-
- mCalls.push_back(std::move(call));
-
- return messageID;
-}
-
-} // namespace ipc
-} // namespace vasum
-
-#endif // COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Managing the queue with requests
+ */
+
+#ifndef COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP
+
+#include <condition_variable>
+
+namespace vasum {
+namespace ipc {
+
+class FinishRequest {
+public:
+ FinishRequest(const FinishRequest&) = delete;
+ FinishRequest& operator=(const FinishRequest&) = delete;
+
+ FinishRequest(const std::shared_ptr<std::condition_variable_any>& conditionPtr)
+ : conditionPtr(conditionPtr)
+ {}
+
+ std::shared_ptr<std::condition_variable_any> conditionPtr;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Processor's request to call a method
+ */
+
+#ifndef COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "logger/logger-scope.hpp"
+#include "config/manager.hpp"
+
+namespace vasum {
+namespace ipc {
+
+class MethodRequest {
+public:
+ MethodRequest(const MethodRequest&) = delete;
+ MethodRequest& operator=(const MethodRequest&) = delete;
+
+ template<typename SentDataType, typename ReceivedDataType>
+ static std::shared_ptr<MethodRequest> create(const MethodID methodID,
+ const FileDescriptor peerFD,
+ const std::shared_ptr<SentDataType>& data,
+ const typename ResultHandler<ReceivedDataType>::type& process);
+
+ MethodID methodID;
+ FileDescriptor peerFD;
+ MessageID messageID;
+ std::shared_ptr<void> data;
+ SerializeCallback serialize;
+ ParseCallback parse;
+ ResultHandler<void>::type process;
+
+private:
+ MethodRequest(const MethodID methodID, const FileDescriptor peerFD)
+ : methodID(methodID),
+ peerFD(peerFD),
+ messageID(getNextMessageID())
+ {}
+};
+
+
+template<typename SentDataType, typename ReceivedDataType>
+std::shared_ptr<MethodRequest> MethodRequest::create(const MethodID methodID,
+ const FileDescriptor peerFD,
+ const std::shared_ptr<SentDataType>& data,
+ const typename ResultHandler<ReceivedDataType>::type& process)
+{
+ std::shared_ptr<MethodRequest> request(new MethodRequest(methodID, peerFD));
+
+ request->data = data;
+
+ request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+ LOGS("Method serialize, peerFD: " << fd);
+ config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+ };
+
+ request->parse = [](const int fd)->std::shared_ptr<void> {
+ LOGS("Method parse, peerFD: " << fd);
+ std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
+ config::loadFromFD<ReceivedDataType>(fd, *data);
+ return data;
+ };
+
+ request->process = [process](Status status, std::shared_ptr<void>& data)->void {
+ LOGS("Method process, status: " << toString(status));
+ std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
+ return process(status, tmpData);
+ };
+
+ return request;
+}
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP
expr; \
} \
catch (const std::exception& e){ \
- LOGE("Callback threw an error: " << e.what()); \
+ LOGE(mLogPrefix + "Callback threw an error: " << e.what()); \
}
const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
-Processor::Processor(const PeerCallback& newPeerCallback,
+Processor::Processor(const std::string& logName,
+ const PeerCallback& newPeerCallback,
const PeerCallback& removedPeerCallback,
const unsigned int maxNumberOfPeers)
- : mNewPeerCallback(newPeerCallback),
+ : mLogPrefix(logName),
+ mIsRunning(false),
+ mNewPeerCallback(newPeerCallback),
mRemovedPeerCallback(removedPeerCallback),
mMaxNumberOfPeers(maxNumberOfPeers)
{
- LOGS("Processor Constructor");
+ LOGS(mLogPrefix + "Processor Constructor");
utils::signalBlock(SIGPIPE);
using namespace std::placeholders;
Processor::~Processor()
{
- LOGS("Processor Destructor");
+ LOGS(mLogPrefix + "Processor Destructor");
try {
stop();
} catch (IPCException& e) {
- LOGE("Error in Processor's destructor: " << e.what());
+ LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
}
}
bool Processor::isStarted()
{
- return mThread.joinable();
+ Lock lock(mStateMutex);
+ return mIsRunning;
}
-void Processor::start()
+void Processor::start(bool usesExternalPolling)
{
- LOGS("Processor start");
+ LOGS(mLogPrefix + "Processor start");
+ Lock lock(mStateMutex);
if (!isStarted()) {
- mThread = std::thread(&Processor::run, this);
+ LOGI(mLogPrefix + "Processor start");
+ mIsRunning = true;
+ if (!usesExternalPolling) {
+ mThread = std::thread(&Processor::run, this);
+ }
}
}
void Processor::stop()
{
- LOGS("Processor stop");
+ LOGS(mLogPrefix + "Processor stop");
if (isStarted()) {
+ auto conditionPtr = std::make_shared<std::condition_variable_any>();
{
Lock lock(mStateMutex);
- mEventQueue.send(Event::FINISH);
+ auto request = std::make_shared<FinishRequest>(conditionPtr);
+ mRequestQueue.push(Event::FINISH, request);
+ }
+
+ LOGD(mLogPrefix + "Waiting for the Processor to stop");
+
+ if (mThread.joinable()) {
+ mThread.join();
+ } else {
+ // Wait till the FINISH request is served
+ Lock lock(mStateMutex);
+ conditionPtr->wait(lock, [this]() {
+ return !isStarted();
+ });
}
- LOGT("Waiting for the Processor to stop");
- mThread.join();
}
}
FileDescriptor Processor::getEventFD()
{
Lock lock(mStateMutex);
- return mEventQueue.getFD();
+ return mRequestQueue.getFD();
}
void Processor::removeMethod(const MethodID methodID)
FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
{
- LOGS("Processor addPeer");
+ LOGS(mLogPrefix + "Processor addPeer");
Lock lock(mStateMutex);
+
FileDescriptor peerFD = socketPtr->getFD();
- SocketInfo socketInfo(peerFD, std::move(socketPtr));
- mNewSockets.push(std::move(socketInfo));
- mEventQueue.send(Event::ADD_PEER);
+ auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
+ mRequestQueue.push(Event::ADD_PEER, request);
- LOGI("New peer added. Id: " << peerFD);
+ LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
return peerFD;
}
void Processor::removePeer(const FileDescriptor peerFD)
{
- LOGS("Processor removePeer peerFD: " << peerFD);
-
- // TODO: Remove ADD_PEER event if it's not processed
+ LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
+ {
+ Lock lock(mStateMutex);
+ mRequestQueue.removeIf([peerFD](Request & request) {
+ return request.requestID == Event::ADD_PEER &&
+ request.get<AddPeerRequest>()->peerFD == peerFD;
+ });
+ }
// Remove peer and wait till he's gone
- std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
+ std::shared_ptr<std::condition_variable_any> conditionPtr(new std::condition_variable_any());
{
Lock lock(mStateMutex);
- RemovePeerRequest request(peerFD, conditionPtr);
- mPeersToDelete.push(std::move(request));
- mEventQueue.send(Event::REMOVE_PEER);
+ auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
+ mRequestQueue.push(Event::REMOVE_PEER, request);
}
auto isPeerDeleted = [&peerFD, this]()->bool {
- Lock lock(mStateMutex);
return mSockets.count(peerFD) == 0;
};
- std::mutex mutex;
- std::unique_lock<std::mutex> lock(mutex);
+ Lock lock(mStateMutex);
conditionPtr->wait(lock, isPeerDeleted);
}
void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
{
- LOGS("Processor removePeerInternal peerFD: " << peerFD);
- LOGI("Removing peer. peerFD: " << peerFD);
+ LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
+ LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
if (!mSockets.erase(peerFD)) {
- LOGW("No such peer. Another thread called removePeerInternal");
+ LOGW(mLogPrefix + "No such peer. Another thread called removePeerInternal");
return;
}
void Processor::resetPolling()
{
+ LOGS(mLogPrefix + "Processor resetPolling");
+
if (!isStarted()) {
+ LOGW(mLogPrefix + "Processor not started! Polling not reset!");
return;
}
{
Lock lock(mStateMutex);
-
+ LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
// Setup polling on eventfd and sockets
mFDs.resize(mSockets.size() + 1);
- mFDs[0].fd = mEventQueue.getFD();
+ mFDs[0].fd = mRequestQueue.getFD();
mFDs[0].events = POLLIN;
auto socketIt = mSockets.begin();
for (unsigned int i = 1; i < mFDs.size(); ++i) {
+ LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
mFDs[i].fd = socketIt->second->getFD();
mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
++socketIt;
void Processor::run()
{
- LOGS("Processor run");
+ LOGS(mLogPrefix + "Processor run");
resetPolling();
- mIsRunning = true;
- while (mIsRunning) {
- LOGT("Waiting for communication...");
+ while (isStarted()) {
+ LOGT(mLogPrefix + "Waiting for communication...");
int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
- LOGT("... incoming communication!");
+ LOGT(mLogPrefix + "... incoming communication!");
if (ret == -1 || ret == 0) {
if (errno == EINTR) {
continue;
}
- LOGE("Error in poll: " << std::string(strerror(errno)));
+ LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno)));
throw IPCException("Error in poll: " + std::string(strerror(errno)));
}
{
for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLHUP) {
- LOGI("Lost connection to peer: " << mFDs[i].fd);
+ LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
mFDs[i].revents &= ~(POLLHUP);
removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
isPeerRemoved = true;
bool Processor::handleInput(const FileDescriptor peerFD)
{
- LOGS("Processor handleInput peerFD: " << peerFD);
+ LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
Lock lock(mStateMutex);
std::shared_ptr<Socket> socketPtr;
// Get the peer's socket
socketPtr = mSockets.at(peerFD);
} catch (const std::out_of_range&) {
- LOGE("No such peer: " << peerFD);
+ LOGE(mLogPrefix + "No such peer: " << peerFD);
return false;
}
socketPtr->read(&messageID, sizeof(messageID));
} catch (const IPCException& e) {
- LOGE("Error during reading the socket");
+ LOGE(mLogPrefix + "Error during reading the socket");
removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
return true;
}
} else {
// Nothing
- LOGW("No method or signal callback for methodID: " << methodID);
+ LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
return true;
}
std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
std::shared_ptr<RegisterSignalsMessage>& data)
{
- LOGS("Processor onNewSignals peerFD: " << peerFD);
+ LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
for (const MethodID methodID : data->ids) {
mSignalsPeers[methodID].push_back(peerFD);
bool Processor::onReturnValue(const Socket& socket,
const MessageID messageID)
{
- LOGS("Processor onReturnValue messageID: " << messageID);
+ LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
- // LOGI("Return value for messageID: " << messageID);
+ // LOGI(mLogPrefix + "Return value for messageID: " << messageID);
ReturnCallbacks returnCallbacks;
try {
- LOGT("Getting the return callback");
+ LOGT(mLogPrefix + "Getting the return callback");
returnCallbacks = std::move(mReturnCallbacks.at(messageID));
mReturnCallbacks.erase(messageID);
} catch (const std::out_of_range&) {
- LOGW("No return callback for messageID: " << messageID);
+ LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
std::shared_ptr<void> data;
try {
- LOGT("Parsing incoming return data");
+ LOGT(mLogPrefix + "Parsing incoming return data");
data = returnCallbacks.parse(socket.getFD());
} catch (const std::exception& e) {
- LOGE("Exception during parsing: " << e.what());
+ LOGE(mLogPrefix + "Exception during parsing: " << e.what());
IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
- // LOGT("Process return value callback for messageID: " << messageID);
+ // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
- // LOGT("Return value for messageID: " << messageID << " processed");
+ // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
return false;
}
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks)
{
- LOGS("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
+ LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
- // LOGI("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
+ // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
std::shared_ptr<void> data;
try {
- LOGT("Parsing incoming data");
+ LOGT(mLogPrefix + "Parsing incoming data");
data = signalCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
- LOGE("Exception during parsing: " << e.what());
+ LOGE(mLogPrefix + "Exception during parsing: " << e.what());
removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
- // LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
+ // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
try {
signalCallbacks->signal(socket.getFD(), data);
} catch (const std::exception& e) {
- LOGE("Exception in method handler: " << e.what());
+ LOGE(mLogPrefix + "Exception in method handler: " << e.what());
removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
const MessageID messageID,
std::shared_ptr<MethodHandlers> methodCallbacks)
{
- LOGS("Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
- // LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
+ LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
+ // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID);
std::shared_ptr<void> data;
try {
- LOGT("Parsing incoming data");
+ LOGT(mLogPrefix + "Parsing incoming data");
data = methodCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
- LOGE("Exception during parsing: " << e.what());
+ LOGE(mLogPrefix + "Exception during parsing: " << e.what());
removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
- LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
+ LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
std::shared_ptr<void> returnData;
try {
returnData = methodCallbacks->method(socket.getFD(), data);
} catch (const std::exception& e) {
- LOGE("Exception in method handler: " << e.what());
+ LOGE(mLogPrefix + "Exception in method handler: " << e.what());
removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
- LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID);
+ LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID);
try {
// Send the call with the socket
Socket::Guard guard = socket.getGuard();
socket.write(&messageID, sizeof(messageID));
methodCallbacks->serialize(socket.getFD(), returnData);
} catch (const std::exception& e) {
- LOGE("Exception during serialization: " << e.what());
+ LOGE(mLogPrefix + "Exception during serialization: " << e.what());
removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
return true;
}
bool Processor::handleEvent()
{
- LOGS("Processor handleEvent");
+ LOGS(mLogPrefix + "Processor handleEvent");
Lock lock(mStateMutex);
- switch (mEventQueue.receive()) {
-
- case Event::FINISH: {
- LOGD("Event FINISH");
- mIsRunning = false;
- cleanCommunication();
- return false;
- }
-
- case Event::CALL: {
- LOGD("Event CALL");
- return onCall();
- }
+ auto request = mRequestQueue.pop();
+ LOGD(mLogPrefix + "Got: " << request.requestID);
- case Event::ADD_PEER: {
- LOGD("Event ADD_PEER");
- return onNewPeer();
- }
-
- case Event::REMOVE_PEER: {
- LOGD("Event REMOVE_PEER");
- return onRemovePeer();
- }
+ switch (request.requestID) {
+ case Event::METHOD: return onMethodRequest(*request.get<MethodRequest>());
+ case Event::SIGNAL: return onSignalRequest(*request.get<SignalRequest>());
+ case Event::ADD_PEER: return onAddPeerRequest(*request.get<AddPeerRequest>());
+ case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get<RemovePeerRequest>());
+ case Event::FINISH: return onFinishRequest(*request.get<FinishRequest>());
}
return false;
}
-bool Processor::onNewPeer()
+bool Processor::onMethodRequest(MethodRequest& request)
{
- LOGS("Processor onNewPeer");
+ LOGS(mLogPrefix + "Processor onMethodRequest");
+ std::shared_ptr<Socket> socketPtr;
- // TODO: What if there is no newSocket? (request removed in the mean time)
- // Add new socket of the peer
- SocketInfo socketInfo = std::move(mNewSockets.front());
- mNewSockets.pop();
+ try {
+ // Get the peer's socket
+ socketPtr = mSockets.at(request.peerFD);
+ } catch (const std::out_of_range&) {
+ LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
+
+ // Pass the error to the processing callback
+ IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data));
- if (mSockets.size() > mMaxNumberOfPeers) {
- LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
- return false;
- }
- if (mSockets.count(socketInfo.peerFD) != 0) {
- LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
return false;
}
- mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
-
-
- // LOGW("Sending handled signals");
- std::vector<MethodID> ids;
- for (const auto kv : mSignalsCallbacks) {
- ids.push_back(kv.first);
+ if (mReturnCallbacks.count(request.messageID) != 0) {
+ LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
}
- auto data = std::make_shared<RegisterSignalsMessage>(ids);
- callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
- socketInfo.peerFD,
- data,
- discardResultHandler<EmptyData>);
- // LOGW("Sent handled signals");
+ mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(request.peerFD,
+ std::move(request.parse),
+ std::move(request.process)));
- resetPolling();
-
- if (mNewPeerCallback) {
- // Notify about the new user.
- LOGT("Calling NewPeerCallback");
- mNewPeerCallback(socketInfo.peerFD);
- }
+ try {
+ // Send the call with the socket
+ Socket::Guard guard = socketPtr->getGuard();
+ socketPtr->write(&request.methodID, sizeof(request.methodID));
+ socketPtr->write(&request.messageID, sizeof(request.messageID));
+ LOGT(mLogPrefix + "Serializing the message");
+ request.serialize(socketPtr->getFD(), request.data);
+ } catch (const std::exception& e) {
+ LOGE(mLogPrefix + "Error during sending a method: " << e.what());
- return true;
-}
+ // Inform about the error,
+ IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data));
-bool Processor::onRemovePeer()
-{
- LOGS("Processor onRemovePeer");
- removePeerInternal(mPeersToDelete.front().peerFD, Status::REMOVED_PEER);
+ mReturnCallbacks.erase(request.messageID);
+ removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
- mPeersToDelete.front().conditionPtr->notify_all();
- mPeersToDelete.pop();
- return true;
-}
+ return true;
-bool Processor::onCall()
-{
- LOGS("Processor onCall");
- CallQueue::Call call;
- try {
- call = std::move(mCalls.pop());
- } catch (const IPCException&) {
- LOGE("No calls to serve, but got an EVENT::CALL. Event got removed before serving");
- return false;
}
- if (call.parse && call.process) {
- return onMethodCall(call);
- } else {
- return onSignalCall(call);
- }
+ return false;
}
-bool Processor::onSignalCall(CallQueue::Call& call)
+bool Processor::onSignalRequest(SignalRequest& request)
{
- LOGS("Processor onSignalCall");
+ LOGS(mLogPrefix + "Processor onSignalRequest");
std::shared_ptr<Socket> socketPtr;
try {
// Get the peer's socket
- socketPtr = mSockets.at(call.peerFD);
+ socketPtr = mSockets.at(request.peerFD);
} catch (const std::out_of_range&) {
- LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
+ LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
return false;
}
try {
// Send the call with the socket
Socket::Guard guard = socketPtr->getGuard();
- socketPtr->write(&call.methodID, sizeof(call.methodID));
- socketPtr->write(&call.messageID, sizeof(call.messageID));
- call.serialize(socketPtr->getFD(), call.data);
+ socketPtr->write(&request.methodID, sizeof(request.methodID));
+ socketPtr->write(&request.messageID, sizeof(request.messageID));
+ request.serialize(socketPtr->getFD(), request.data);
} catch (const std::exception& e) {
- LOGE("Error during sending a signal: " << e.what());
+ LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
- removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
+ removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
return true;
}
return false;
}
-bool Processor::onMethodCall(CallQueue::Call& call)
+bool Processor::onAddPeerRequest(AddPeerRequest& request)
{
- LOGS("Processor onMethodCall");
- std::shared_ptr<Socket> socketPtr;
-
-
- try {
- // Get the peer's socket
- socketPtr = mSockets.at(call.peerFD);
- } catch (const std::out_of_range&) {
- LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
-
- // Pass the error to the processing callback
- IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
+ LOGS(mLogPrefix + "Processor onAddPeerRequest");
+ if (mSockets.size() > mMaxNumberOfPeers) {
+ LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerFD);
return false;
}
-
- if (mReturnCallbacks.count(call.messageID) != 0) {
- LOGE("There already was a return callback for messageID: " << call.messageID);
+ if (mSockets.count(request.peerFD) != 0) {
+ LOGE(mLogPrefix + "There already was a socket for peerFD: " << request.peerFD);
+ return false;
}
- mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
- std::move(call.parse),
- std::move(call.process)));
- try {
- // Send the call with the socket
- Socket::Guard guard = socketPtr->getGuard();
- socketPtr->write(&call.methodID, sizeof(call.methodID));
- socketPtr->write(&call.messageID, sizeof(call.messageID));
- LOGT("Serializing the message");
- call.serialize(socketPtr->getFD(), call.data);
- } catch (const std::exception& e) {
- LOGE("Error during sending a method: " << e.what());
+ mSockets[request.peerFD] = std::move(request.socketPtr);
- // Inform about the error,
- IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
+ // Sending handled signals
+ std::vector<MethodID> ids;
+ for (const auto kv : mSignalsCallbacks) {
+ ids.push_back(kv.first);
+ }
+ auto data = std::make_shared<RegisterSignalsMessage>(ids);
+ callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
+ request.peerFD,
+ data,
+ discardResultHandler<EmptyData>);
- mReturnCallbacks.erase(call.messageID);
- removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
- return true;
+ resetPolling();
+ if (mNewPeerCallback) {
+ // Notify about the new user.
+ LOGT(mLogPrefix + "Calling NewPeerCallback");
+ mNewPeerCallback(request.peerFD);
}
- return false;
+ LOGI(mLogPrefix + "New peer: " << request.peerFD);
+ return true;
}
-void Processor::cleanCommunication()
+bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
{
- LOGS("Processor cleanCommunication");
+ LOGS(mLogPrefix + "Processor onRemovePeer");
- while (!mEventQueue.isEmpty()) {
- switch (mEventQueue.receive()) {
- case Event::FINISH: {
- LOGE("Event FINISH after FINISH");
- break;
- }
- case Event::CALL: {
- LOGW("Event CALL after FINISH");
- try {
- CallQueue::Call call = mCalls.pop();
- if (call.process) {
- IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
- }
- } catch (const IPCException&) {
- // No more calls
- }
- break;
- }
+ removePeerInternal(request.peerFD, Status::REMOVED_PEER);
+ request.conditionPtr->notify_all();
+
+ return true;
+}
+
+bool Processor::onFinishRequest(FinishRequest& request)
+{
+ LOGS(mLogPrefix + "Processor onFinishRequest");
+
+ // Clean the mRequestQueue
+ while (!mRequestQueue.isEmpty()) {
+ auto request = mRequestQueue.pop();
+ LOGE(mLogPrefix + "Got: " << request.requestID << " after FINISH");
- case Event::ADD_PEER: {
- LOGW("Event ADD_PEER after FINISH");
+ switch (request.requestID) {
+ case Event::METHOD: {
+ auto requestPtr = request.get<MethodRequest>();
+ IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data));
break;
}
-
case Event::REMOVE_PEER: {
- LOGW("Event REMOVE_PEER after FINISH");
- mPeersToDelete.front().conditionPtr->notify_all();
- mPeersToDelete.pop();
+ request.get<RemovePeerRequest>()->conditionPtr->notify_all();
break;
}
+ case Event::SIGNAL:
+ case Event::ADD_PEER:
+ case Event::FINISH:
+ break;
}
}
+
+ mIsRunning = false;
+ request.conditionPtr->notify_all();
+ return true;
+}
+
+std::ostream& operator<<(std::ostream& os, const Processor::Event& event)
+{
+ switch (event) {
+
+ case Processor::Event::FINISH: {
+ os << "Event::FINISH";
+ break;
+ }
+
+ case Processor::Event::METHOD: {
+ os << "Event::METHOD";
+ break;
+ }
+
+ case Processor::Event::SIGNAL: {
+ os << "Event::SIGNAL";
+ break;
+ }
+
+ case Processor::Event::ADD_PEER: {
+ os << "Event::ADD_PEER";
+ break;
+ }
+
+ case Processor::Event::REMOVE_PEER: {
+ os << "Event::REMOVE_PEER";
+ break;
+ }
+ }
+
+ return os;
}
} // namespace ipc
#define COMMON_IPC_INTERNALS_PROCESSOR_HPP
#include "ipc/internals/socket.hpp"
-#include "ipc/internals/event-queue.hpp"
-#include "ipc/internals/call-queue.hpp"
+#include "ipc/internals/request-queue.hpp"
+#include "ipc/internals/method-request.hpp"
+#include "ipc/internals/signal-request.hpp"
+#include "ipc/internals/add-peer-request.hpp"
+#include "ipc/internals/remove-peer-request.hpp"
+#include "ipc/internals/finish-request.hpp"
#include "ipc/exception.hpp"
#include "ipc/types.hpp"
#include "config/manager.hpp"
#include "logger/logger.hpp"
#include "logger/logger-scope.hpp"
+#include <ostream>
#include <poll.h>
#include <condition_variable>
-#include <queue>
#include <mutex>
#include <chrono>
#include <vector>
* - Rest: The data written in a callback. One type per method.ReturnCallbacks
*
* TODO:
-* - some mutexes may not be needed
* - synchronous call to many peers
* - implement HandlerStore class for storing both signals and methods
* - API for removing signals
* - no new events added after stop() called
* - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
* there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
-* - EventQueue should store std::shared_ptr<void> and it should be the only queue to the Processor thread.
-* It should have an API for removing events from the middle of the queue
*
*/
class Processor {
+private:
+ enum class Event {
+ FINISH, // Shutdown request
+ METHOD, // New method call in the queue
+ SIGNAL, // New signal call in the queue
+ ADD_PEER, // New peer in the queue
+ REMOVE_PEER // Remove peer
+ };
+
public:
+
+ friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
+
/**
* Used to indicate a message with the return value.
*/
* @param newPeerCallback called when a new peer arrives
* @param removedPeerCallback called when the Processor stops listening for this peer
*/
- Processor(const PeerCallback& newPeerCallback = nullptr,
+ Processor(const std::string& logName = "",
+ const PeerCallback& newPeerCallback = nullptr,
const PeerCallback& removedPeerCallback = nullptr,
const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
~Processor();
Processor(Processor&&) = delete;
Processor& operator=(const Processor&) = delete;
+
/**
* Start the processing thread.
* Quits immediately after starting the thread.
+ *
+ * @param usesExternalPolling internal or external polling is used
*/
- void start();
+ void start(const bool usesExternalPolling);
/**
* @return is processor running
typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
typedef std::unique_lock<std::recursive_mutex> Lock;
+ typedef RequestQueue<Event>::Request Request;
struct EmptyData {
CONFIG_REGISTER_EMPTY
ResultHandler<void>::type process;
};
- struct SocketInfo {
- SocketInfo(const SocketInfo& other) = delete;
- SocketInfo& operator=(const SocketInfo&) = delete;
- SocketInfo() = default;
- SocketInfo(SocketInfo&&) = default;
- SocketInfo& operator=(SocketInfo &&) = default;
-
- SocketInfo(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
- : peerFD(peerFD), socketPtr(socketPtr) {}
-
- FileDescriptor peerFD;
- std::shared_ptr<Socket> socketPtr;
- };
-
- struct RemovePeerRequest {
- RemovePeerRequest(const RemovePeerRequest& other) = delete;
- RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
- RemovePeerRequest() = default;
- RemovePeerRequest(RemovePeerRequest&&) = default;
- RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
-
- RemovePeerRequest(const FileDescriptor peerFD,
- const std::shared_ptr<std::condition_variable>& conditionPtr)
- : peerFD(peerFD), conditionPtr(conditionPtr) {}
-
- FileDescriptor peerFD;
- std::shared_ptr<std::condition_variable> conditionPtr;
- };
-
- enum class Event : int {
- FINISH, // Shutdown request
- CALL, // New method call in the queue
- ADD_PEER, // New peer in the queue
- REMOVE_PEER // Remove peer
- };
- EventQueue<Event> mEventQueue;
+ std::string mLogPrefix;
+ RequestQueue<Event> mRequestQueue;
bool mIsRunning;
-
- CallQueue mCalls;
std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
std::vector<struct pollfd> mFDs;
- std::queue<SocketInfo> mNewSockets;
- std::queue<RemovePeerRequest> mPeersToDelete;
std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
void run();
- bool onCall();
- bool onSignalCall(CallQueue::Call& call);
- bool onMethodCall(CallQueue::Call& call);
- bool onNewPeer();
- bool onRemovePeer();
+
+ // Request handlers
+ bool onMethodRequest(MethodRequest& request);
+ bool onSignalRequest(SignalRequest& request);
+ bool onAddPeerRequest(AddPeerRequest& request);
+ bool onRemovePeerRequest(RemovePeerRequest& request);
+ bool onFinishRequest(FinishRequest& request);
+
bool handleLostConnections();
bool handleInputs();
std::shared_ptr<RegisterSignalsMessage>& data);
- void cleanCommunication();
};
template<typename SentDataType, typename ReceivedDataType>
const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
{
if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
- LOGE("Forbidden methodID: " << methodID);
+ LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
throw IPCException("Forbidden methodID: " + std::to_string(methodID));
}
Lock lock(mStateMutex);
if (mSignalsCallbacks.count(methodID)) {
- LOGE("MethodID used by a signal: " << methodID);
+ LOGE(mLogPrefix + "MethodID used by a signal: " << methodID);
throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
}
const typename SignalHandler<ReceivedDataType>::type& handler)
{
if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
- LOGE("Forbidden methodID: " << methodID);
+ LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
throw IPCException("Forbidden methodID: " + std::to_string(methodID));
}
// Andd the signal handler:
if (mMethodsCallbacks.count(methodID)) {
- LOGE("MethodID used by a method: " << methodID);
+ LOGE(mLogPrefix + "MethodID used by a method: " << methodID);
throw IPCException("MethodID used by a method: " + std::to_string(methodID));
}
const typename ResultHandler<ReceivedDataType>::type& process)
{
Lock lock(mStateMutex);
- MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
- mEventQueue.send(Event::CALL);
-
- return messageID;
+ auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
+ mRequestQueue.push(Event::METHOD, request);
+ return request->messageID;
}
};
std::unique_lock<std::mutex> lock(mutex);
- LOGT("Waiting for the response...");
+ LOGT(mLogPrefix + "Waiting for the response...");
if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
- LOGW("Probably a timeout in callSync. Checking...");
+ LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
bool isTimeout;
{
Lock lock(mStateMutex);
// Call isn't sent or call is sent but there is no reply
- isTimeout = mCalls.erase(messageID) || 1 == mReturnCallbacks.erase(messageID);
+ isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
+ return request.requestID == Event::METHOD &&
+ request.get<MethodRequest>()->messageID == messageID;
+ })
+ || mRequestQueue.removeIf([messageID](Request & request) {
+ return request.requestID == Event::SIGNAL &&
+ request.get<SignalRequest>()->messageID == messageID;
+ })
+ || 1 == mReturnCallbacks.erase(messageID);
}
-
if (isTimeout) {
- LOGE("Function call timeout; methodID: " << methodID);
+ LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
removePeer(peerFD);
throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
} else {
- LOGW("Timeout started during the return value processing, so wait for it to finish");
+ LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
- LOGE("Function call timeout; methodID: " << methodID);
+ LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
}
}
Lock lock(mStateMutex);
const auto it = mSignalsPeers.find(methodID);
if (it == mSignalsPeers.end()) {
- LOGW("No peer is handling signal with methodID: " << methodID);
+ LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
return;
}
for (const FileDescriptor peerFD : it->second) {
- mCalls.push<SentDataType>(methodID, peerFD, data);
- mEventQueue.send(Event::CALL);
+ auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
+ mRequestQueue.push(Event::SIGNAL, request);
}
}
+
} // namespace ipc
} // namespace vasum
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Processor's request to remove a peer
+ */
+
+#ifndef COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "ipc/internals/socket.hpp"
+#include <condition_variable>
+
+
+namespace vasum {
+namespace ipc {
+
+class RemovePeerRequest {
+public:
+ RemovePeerRequest(const RemovePeerRequest&) = delete;
+ RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
+
+ RemovePeerRequest(const FileDescriptor peerFD,
+ const std::shared_ptr<std::condition_variable_any>& conditionPtr)
+ : peerFD(peerFD),
+ conditionPtr(conditionPtr)
+ {
+ }
+
+ FileDescriptor peerFD;
+ std::shared_ptr<std::condition_variable_any> conditionPtr;
+};
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Managing the queue of messages carrying any kind of data
+ */
+
+#ifndef COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
+#define COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
+
+#include "ipc/exception.hpp"
+#include "ipc/internals/eventfd.hpp"
+#include "logger/logger.hpp"
+
+#include <list>
+#include <memory>
+#include <algorithm>
+
+namespace vasum {
+namespace ipc {
+
+/**
+* Class for managing a queue of Requests carrying any data
+*/
+template<typename RequestIdType>
+class RequestQueue {
+public:
+ RequestQueue() = default;
+
+ RequestQueue(const RequestQueue&) = delete;
+ RequestQueue& operator=(const RequestQueue&) = delete;
+
+ struct Request {
+ Request(const Request& other) = delete;
+ Request& operator=(const Request&) = delete;
+
+ Request(Request&&) = default;
+ Request(const RequestIdType requestID, const std::shared_ptr<void>& data)
+ : requestID(requestID),
+ data(data)
+ {}
+
+ template<typename DataType>
+ std::shared_ptr<DataType> get()
+ {
+ return std::static_pointer_cast<DataType>(data);
+ }
+
+ RequestIdType requestID;
+ std::shared_ptr<void> data;
+ };
+
+ /**
+ * @return event's file descriptor
+ */
+ int getFD() const;
+
+ /**
+ * @return is the queue empty
+ */
+ bool isEmpty() const;
+
+ /**
+ * Push data to the queue
+ *
+ * @param requestID request type
+ * @param data data corresponding to the request
+ */
+ void push(const RequestIdType requestID,
+ const std::shared_ptr<void>& data = nullptr);
+
+ /**
+ * @return get the data from the next request
+ */
+ Request pop();
+
+ /**
+ * Remove elements from the queue when the predicate returns true
+ *
+ * @param predicate condition
+ * @return was anything removed
+ */
+ template<typename Predicate>
+ bool removeIf(Predicate predicate);
+
+private:
+ std::list<Request> mRequests;
+ EventFD mEventFD;
+};
+
+template<typename RequestIdType>
+int RequestQueue<RequestIdType>::getFD() const
+{
+ return mEventFD.getFD();
+}
+
+template<typename RequestIdType>
+bool RequestQueue<RequestIdType>::isEmpty() const
+{
+ return mRequests.empty();
+}
+
+template<typename RequestIdType>
+void RequestQueue<RequestIdType>::push(const RequestIdType requestID,
+ const std::shared_ptr<void>& data)
+{
+ Request request(requestID, data);
+ mRequests.push_back(std::move(request));
+ mEventFD.send();
+}
+
+template<typename RequestIdType>
+typename RequestQueue<RequestIdType>::Request RequestQueue<RequestIdType>::pop()
+{
+ mEventFD.receive();
+ if (mRequests.empty()) {
+ LOGE("Request queue is empty");
+ throw IPCException("Request queue is empty");
+ }
+ Request request = std::move(mRequests.front());
+ mRequests.pop_front();
+ return request;
+}
+
+template<typename RequestIdType>
+template<typename Predicate>
+bool RequestQueue<RequestIdType>::removeIf(Predicate predicate)
+{
+ auto it = std::find_if(mRequests.begin(), mRequests.end(), predicate);
+ if (it == mRequests.end()) {
+ return false;
+ }
+
+ do {
+ it = mRequests.erase(it);
+ it = std::find_if(it, mRequests.end(), predicate);
+ } while (it != mRequests.end());
+
+ return true;
+}
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Processor's request to send a signal
+ */
+
+#ifndef COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP
+#define COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP
+
+#include "ipc/types.hpp"
+#include "config/manager.hpp"
+#include "logger/logger-scope.hpp"
+
+namespace vasum {
+namespace ipc {
+
+class SignalRequest {
+public:
+ SignalRequest(const SignalRequest&) = delete;
+ SignalRequest& operator=(const SignalRequest&) = delete;
+
+
+
+ template<typename SentDataType>
+ static std::shared_ptr<SignalRequest> create(const MethodID methodID,
+ const FileDescriptor peerFD,
+ const std::shared_ptr<SentDataType>& data);
+
+ MethodID methodID;
+ FileDescriptor peerFD;
+ MessageID messageID;
+ std::shared_ptr<void> data;
+ SerializeCallback serialize;
+
+private:
+ SignalRequest(const MethodID methodID, const FileDescriptor peerFD)
+ : methodID(methodID),
+ peerFD(peerFD),
+ messageID(getNextMessageID())
+ {}
+
+};
+
+template<typename SentDataType>
+std::shared_ptr<SignalRequest> SignalRequest::create(const MethodID methodID,
+ const FileDescriptor peerFD,
+ const std::shared_ptr<SentDataType>& data)
+{
+ std::shared_ptr<SignalRequest> request(new SignalRequest(methodID, peerFD));
+
+ request->data = data;
+
+ request->serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+ LOGS("Signal serialize, peerFD: " << fd);
+ config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+ };
+
+ return request;
+}
+
+} // namespace ipc
+} // namespace vasum
+
+#endif // COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP
Service::Service(const std::string& socketPath,
const PeerCallback& addPeerCallback,
const PeerCallback& removePeerCallback)
- : mProcessor(addPeerCallback, removePeerCallback),
+ : mProcessor("[SERVICE] ", addPeerCallback, removePeerCallback),
mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1))
{
}
}
-void Service::start()
+void Service::start(const bool usesExternalPolling)
{
LOGS("Service start");
- mProcessor.start();
+ mProcessor.start(usesExternalPolling);
// There can be an incoming connection from mAcceptor before mProcessor is listening,
// but it's OK. It will handle the connection when ready. So no need to wait for mProcessor.
- mAcceptor.start();
+ if (!usesExternalPolling) {
+ mAcceptor.start();
+ }
}
bool Service::isStarted()
/**
* Starts the worker and acceptor threads
+ *
+ * @param usesExternalPolling internal or external polling is used
*/
- void start();
+ void start(const bool usesExternalPolling = false);
/**
* @return is the communication thread running
#include "ipc/types.hpp"
#include "logger/logger.hpp"
+#include <atomic>
namespace vasum {
namespace ipc {
+namespace {
+std::atomic<MessageID> gLastMessageID(0);
+} // namespace
+
+MessageID getNextMessageID()
+{
+ return ++gLastMessageID;
+}
+
std::string toString(const Status status)
{
switch (status) {
typedef unsigned int MessageID;
typedef std::function<void(FileDescriptor)> PeerCallback;
+typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
+typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
enum class Status : int {
OK = 0,
std::string toString(const Status status);
void throwOnError(const Status status);
+MessageID getNextMessageID();
+
template<typename SentDataType, typename ReceivedDataType>
struct MethodHandler {
const int SHORT_OPERATION_TIME = TIMEOUT / 100;
// Time that will cause "TIMEOUT" methods to throw
-const int LONG_OPERATION_TIME = 500 + TIMEOUT;
+const int LONG_OPERATION_TIME = 1000 + TIMEOUT;
struct Fixture {
std::string socketPath;
// TODO: On timeout remove the callback
s.setNewPeerCallback(newPeerCallback);
s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, ipcGSourcePtr, _1));
-
+ s.start(true);
// Service starts to process
ipcGSourcePtr->attach();
}
- c.connect();
+ c.start(true);
IPCGSource::Pointer ipcGSourcePtr = IPCGSource::create(c.getFDs(),
std::bind(&Client::handle, &c, _1, _2));