From e5af6bc538db0cfc3995169f801430df9fb6fc2a Mon Sep 17 00:00:00 2001 From: Jan Olszak Date: Thu, 15 Jan 2015 16:37:06 +0100 Subject: [PATCH] IPC: Single request queue [Bug/Feature] Single queue for passing data between threads Prefixes in loggs inside Processor Destructor always waits till Processor ends [Cause] N/A [Solution] N/A [Verification] Build, install, run tests, run tests under valgrind Change-Id: Idc31496559b46e836528843dfc411cbdeaf259e0 --- common/ipc/client.cpp | 14 +- common/ipc/client.hpp | 11 +- common/ipc/internals/add-peer-request.hpp | 52 ++++ common/ipc/internals/call-queue.cpp | 81 ----- common/ipc/internals/call-queue.hpp | 163 ---------- common/ipc/internals/finish-request.hpp | 48 +++ common/ipc/internals/method-request.hpp | 97 ++++++ common/ipc/internals/processor.cpp | 448 ++++++++++++++------------- common/ipc/internals/processor.hpp | 133 ++++---- common/ipc/internals/remove-peer-request.hpp | 55 ++++ common/ipc/internals/request-queue.hpp | 161 ++++++++++ common/ipc/internals/signal-request.hpp | 82 +++++ common/ipc/service.cpp | 10 +- common/ipc/service.hpp | 4 +- common/ipc/types.cpp | 10 + common/ipc/types.hpp | 4 + tests/unit_tests/ipc/ut-ipc.cpp | 6 +- 17 files changed, 821 insertions(+), 558 deletions(-) create mode 100644 common/ipc/internals/add-peer-request.hpp delete mode 100644 common/ipc/internals/call-queue.cpp delete mode 100644 common/ipc/internals/call-queue.hpp create mode 100644 common/ipc/internals/finish-request.hpp create mode 100644 common/ipc/internals/method-request.hpp create mode 100644 common/ipc/internals/remove-peer-request.hpp create mode 100644 common/ipc/internals/request-queue.hpp create mode 100644 common/ipc/internals/signal-request.hpp diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp index 8455b19..16f77e6 100644 --- a/common/ipc/client.cpp +++ b/common/ipc/client.cpp @@ -32,7 +32,8 @@ namespace vasum { namespace ipc { Client::Client(const std::string& socketPath) - : mSocketPath(socketPath) + : mProcessor("[CLIENT] "), + mSocketPath(socketPath) { LOGS("Client Constructor"); } @@ -47,19 +48,14 @@ Client::~Client() } } -void Client::connect() +void Client::start(const bool usesExternalPolling) { + LOGS("Client start"); // Initialize the connection with the server LOGD("Connecting to " + mSocketPath); auto socketPtr = std::make_shared(Socket::connectSocket(mSocketPath)); mServiceFD = mProcessor.addPeer(socketPtr); -} - -void Client::start() -{ - LOGS("Client start"); - connect(); - mProcessor.start(); + mProcessor.start(usesExternalPolling); } bool Client::isStarted() diff --git a/common/ipc/client.hpp b/common/ipc/client.hpp index de847a9..7b86198 100644 --- a/common/ipc/client.hpp +++ b/common/ipc/client.hpp @@ -55,16 +55,11 @@ public: Client& operator=(const Client&) = delete; /** - * Places a connection request in the internal event queue. - * - * Used with an external polling loop. - */ - void connect(); - - /** * Starts the worker thread + * + * @param usesExternalPolling internal or external polling is used */ - void start(); + void start(const bool usesExternalPolling = false); /** * @return is the communication thread running diff --git a/common/ipc/internals/add-peer-request.hpp b/common/ipc/internals/add-peer-request.hpp new file mode 100644 index 0000000..05c5524 --- /dev/null +++ b/common/ipc/internals/add-peer-request.hpp @@ -0,0 +1,52 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Processor's request to add a peer + */ + +#ifndef COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP +#define COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP + +#include "ipc/types.hpp" +#include "ipc/internals/socket.hpp" + +namespace vasum { +namespace ipc { + +class AddPeerRequest { +public: + AddPeerRequest(const AddPeerRequest&) = delete; + AddPeerRequest& operator=(const AddPeerRequest&) = delete; + + AddPeerRequest(const FileDescriptor peerFD, const std::shared_ptr& socketPtr) + : peerFD(peerFD), + socketPtr(socketPtr) + { + } + + FileDescriptor peerFD; + std::shared_ptr socketPtr; +}; + +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_INTERNALS_ADD_PEER_REQUEST_HPP diff --git a/common/ipc/internals/call-queue.cpp b/common/ipc/internals/call-queue.cpp deleted file mode 100644 index 70871e5..0000000 --- a/common/ipc/internals/call-queue.cpp +++ /dev/null @@ -1,81 +0,0 @@ -/* -* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved -* -* Contact: Jan Olszak -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License -*/ - -/** - * @file - * @author Jan Olszak (j.olszak@samsung.com) - * @brief Managing the queue with calls - */ - -#include "config.hpp" - -#include "ipc/internals/call-queue.hpp" -#include "ipc/exception.hpp" -#include "logger/logger.hpp" -#include - -namespace vasum { -namespace ipc { - -CallQueue::CallQueue() - : mMessageIDCounter(0) -{ -} - -CallQueue::~CallQueue() -{ -} - -bool CallQueue::isEmpty() const -{ - return mCalls.empty(); -} - -MessageID CallQueue::getNextMessageID() -{ - // TODO: This method of generating UIDs is buggy. To be changed. - return ++mMessageIDCounter; -} - -bool CallQueue::erase(const MessageID messageID) -{ - LOGT("Erase messgeID: " << messageID); - auto it = std::find(mCalls.begin(), mCalls.end(), messageID); - if (it == mCalls.end()) { - LOGT("No such messgeID"); - return false; - } - - mCalls.erase(it); - LOGT("Erased"); - return true; -} - -CallQueue::Call CallQueue::pop() -{ - if (isEmpty()) { - LOGE("CallQueue is empty"); - throw IPCException("CallQueue is empty"); - } - Call call = std::move(mCalls.front()); - mCalls.pop_front(); - return call; -} - -} // namespace ipc -} // namespace vasum diff --git a/common/ipc/internals/call-queue.hpp b/common/ipc/internals/call-queue.hpp deleted file mode 100644 index a6e45ed..0000000 --- a/common/ipc/internals/call-queue.hpp +++ /dev/null @@ -1,163 +0,0 @@ -/* -* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved -* -* Contact: Jan Olszak -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License -*/ - -/** - * @file - * @author Jan Olszak (j.olszak@samsung.com) - * @brief Managing the queue with calls - */ - -#ifndef COMMON_IPC_INTERNALS_CALL_QUEUE_HPP -#define COMMON_IPC_INTERNALS_CALL_QUEUE_HPP - -#include "ipc/types.hpp" -#include "config/manager.hpp" -#include "logger/logger-scope.hpp" - -#include -#include - -namespace vasum { -namespace ipc { - -/** -* Class for managing a queue of calls in the Processor -*/ -class CallQueue { -public: - typedef std::function& data)> SerializeCallback; - typedef std::function(int fd)> ParseCallback; - - struct Call { - Call(const Call& other) = delete; - Call& operator=(const Call&) = delete; - Call& operator=(Call&&) = default; - Call() = default; - Call(Call&&) = default; - - bool operator==(const MessageID m) - { - return m == messageID; - } - - FileDescriptor peerFD; - MethodID methodID; - MessageID messageID; - std::shared_ptr data; - SerializeCallback serialize; - ParseCallback parse; - ResultHandler::type process; - }; - - CallQueue(); - ~CallQueue(); - - CallQueue(const CallQueue&) = delete; - CallQueue(CallQueue&&) = delete; - CallQueue& operator=(const CallQueue&) = delete; - - template - MessageID push(const MethodID methodID, - const FileDescriptor peerFD, - const std::shared_ptr& data, - const typename ResultHandler::type& process); - - - template - MessageID push(const MethodID methodID, - const FileDescriptor peerFD, - const std::shared_ptr& data); - - Call pop(); - - bool erase(const MessageID messageID); - - bool isEmpty() const; - -private: - std::list mCalls; - std::atomic mMessageIDCounter; - - MessageID getNextMessageID(); -}; - - -template -MessageID CallQueue::push(const MethodID methodID, - const FileDescriptor peerFD, - const std::shared_ptr& data, - const typename ResultHandler::type& process) -{ - Call call; - call.methodID = methodID; - call.peerFD = peerFD; - call.data = data; - - MessageID messageID = getNextMessageID(); - call.messageID = messageID; - - call.serialize = [](const int fd, std::shared_ptr& data)->void { - LOGS("Method serialize, peerFD: " << fd); - config::saveToFD(fd, *std::static_pointer_cast(data)); - }; - - call.parse = [](const int fd)->std::shared_ptr { - LOGS("Method parse, peerFD: " << fd); - std::shared_ptr data(new ReceivedDataType()); - config::loadFromFD(fd, *data); - return data; - }; - - call.process = [process](Status status, std::shared_ptr& data)->void { - LOGS("Method process, status: " << toString(status)); - std::shared_ptr tmpData = std::static_pointer_cast(data); - return process(status, tmpData); - }; - - mCalls.push_back(std::move(call)); - - return messageID; -} - -template -MessageID CallQueue::push(const MethodID methodID, - const FileDescriptor peerFD, - const std::shared_ptr& data) -{ - Call call; - call.methodID = methodID; - call.peerFD = peerFD; - call.data = data; - - MessageID messageID = getNextMessageID(); - call.messageID = messageID; - - call.serialize = [](const int fd, std::shared_ptr& data)->void { - LOGS("Signal serialize, peerFD: " << fd); - config::saveToFD(fd, *std::static_pointer_cast(data)); - }; - - mCalls.push_back(std::move(call)); - - return messageID; -} - -} // namespace ipc -} // namespace vasum - -#endif // COMMON_IPC_INTERNALS_CALL_QUEUE_HPP diff --git a/common/ipc/internals/finish-request.hpp b/common/ipc/internals/finish-request.hpp new file mode 100644 index 0000000..3019475 --- /dev/null +++ b/common/ipc/internals/finish-request.hpp @@ -0,0 +1,48 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Managing the queue with requests + */ + +#ifndef COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP +#define COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP + +#include + +namespace vasum { +namespace ipc { + +class FinishRequest { +public: + FinishRequest(const FinishRequest&) = delete; + FinishRequest& operator=(const FinishRequest&) = delete; + + FinishRequest(const std::shared_ptr& conditionPtr) + : conditionPtr(conditionPtr) + {} + + std::shared_ptr conditionPtr; +}; + +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_INTERNALS_FINISH_REQUEST_HPP diff --git a/common/ipc/internals/method-request.hpp b/common/ipc/internals/method-request.hpp new file mode 100644 index 0000000..f9860f7 --- /dev/null +++ b/common/ipc/internals/method-request.hpp @@ -0,0 +1,97 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Processor's request to call a method + */ + +#ifndef COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP +#define COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP + +#include "ipc/types.hpp" +#include "logger/logger-scope.hpp" +#include "config/manager.hpp" + +namespace vasum { +namespace ipc { + +class MethodRequest { +public: + MethodRequest(const MethodRequest&) = delete; + MethodRequest& operator=(const MethodRequest&) = delete; + + template + static std::shared_ptr create(const MethodID methodID, + const FileDescriptor peerFD, + const std::shared_ptr& data, + const typename ResultHandler::type& process); + + MethodID methodID; + FileDescriptor peerFD; + MessageID messageID; + std::shared_ptr data; + SerializeCallback serialize; + ParseCallback parse; + ResultHandler::type process; + +private: + MethodRequest(const MethodID methodID, const FileDescriptor peerFD) + : methodID(methodID), + peerFD(peerFD), + messageID(getNextMessageID()) + {} +}; + + +template +std::shared_ptr MethodRequest::create(const MethodID methodID, + const FileDescriptor peerFD, + const std::shared_ptr& data, + const typename ResultHandler::type& process) +{ + std::shared_ptr request(new MethodRequest(methodID, peerFD)); + + request->data = data; + + request->serialize = [](const int fd, std::shared_ptr& data)->void { + LOGS("Method serialize, peerFD: " << fd); + config::saveToFD(fd, *std::static_pointer_cast(data)); + }; + + request->parse = [](const int fd)->std::shared_ptr { + LOGS("Method parse, peerFD: " << fd); + std::shared_ptr data(new ReceivedDataType()); + config::loadFromFD(fd, *data); + return data; + }; + + request->process = [process](Status status, std::shared_ptr& data)->void { + LOGS("Method process, status: " << toString(status)); + std::shared_ptr tmpData = std::static_pointer_cast(data); + return process(status, tmpData); + }; + + return request; +} + +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_INTERNALS_METHOD_REQUEST_HPP diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index d1e829e..05c97aa 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -46,20 +46,23 @@ namespace ipc { expr; \ } \ catch (const std::exception& e){ \ - LOGE("Callback threw an error: " << e.what()); \ + LOGE(mLogPrefix + "Callback threw an error: " << e.what()); \ } const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits::max(); const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits::max() - 1; -Processor::Processor(const PeerCallback& newPeerCallback, +Processor::Processor(const std::string& logName, + const PeerCallback& newPeerCallback, const PeerCallback& removedPeerCallback, const unsigned int maxNumberOfPeers) - : mNewPeerCallback(newPeerCallback), + : mLogPrefix(logName), + mIsRunning(false), + mNewPeerCallback(newPeerCallback), mRemovedPeerCallback(removedPeerCallback), mMaxNumberOfPeers(maxNumberOfPeers) { - LOGS("Processor Constructor"); + LOGS(mLogPrefix + "Processor Constructor"); utils::signalBlock(SIGPIPE); using namespace std::placeholders; @@ -69,39 +72,57 @@ Processor::Processor(const PeerCallback& newPeerCallback, Processor::~Processor() { - LOGS("Processor Destructor"); + LOGS(mLogPrefix + "Processor Destructor"); try { stop(); } catch (IPCException& e) { - LOGE("Error in Processor's destructor: " << e.what()); + LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what()); } } bool Processor::isStarted() { - return mThread.joinable(); + Lock lock(mStateMutex); + return mIsRunning; } -void Processor::start() +void Processor::start(bool usesExternalPolling) { - LOGS("Processor start"); + LOGS(mLogPrefix + "Processor start"); + Lock lock(mStateMutex); if (!isStarted()) { - mThread = std::thread(&Processor::run, this); + LOGI(mLogPrefix + "Processor start"); + mIsRunning = true; + if (!usesExternalPolling) { + mThread = std::thread(&Processor::run, this); + } } } void Processor::stop() { - LOGS("Processor stop"); + LOGS(mLogPrefix + "Processor stop"); if (isStarted()) { + auto conditionPtr = std::make_shared(); { Lock lock(mStateMutex); - mEventQueue.send(Event::FINISH); + auto request = std::make_shared(conditionPtr); + mRequestQueue.push(Event::FINISH, request); + } + + LOGD(mLogPrefix + "Waiting for the Processor to stop"); + + if (mThread.joinable()) { + mThread.join(); + } else { + // Wait till the FINISH request is served + Lock lock(mStateMutex); + conditionPtr->wait(lock, [this]() { + return !isStarted(); + }); } - LOGT("Waiting for the Processor to stop"); - mThread.join(); } } @@ -120,7 +141,7 @@ void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback) FileDescriptor Processor::getEventFD() { Lock lock(mStateMutex); - return mEventQueue.getFD(); + return mRequestQueue.getFD(); } void Processor::removeMethod(const MethodID methodID) @@ -131,51 +152,53 @@ void Processor::removeMethod(const MethodID methodID) FileDescriptor Processor::addPeer(const std::shared_ptr& socketPtr) { - LOGS("Processor addPeer"); + LOGS(mLogPrefix + "Processor addPeer"); Lock lock(mStateMutex); + FileDescriptor peerFD = socketPtr->getFD(); - SocketInfo socketInfo(peerFD, std::move(socketPtr)); - mNewSockets.push(std::move(socketInfo)); - mEventQueue.send(Event::ADD_PEER); + auto request = std::make_shared(peerFD, socketPtr); + mRequestQueue.push(Event::ADD_PEER, request); - LOGI("New peer added. Id: " << peerFD); + LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD); return peerFD; } void Processor::removePeer(const FileDescriptor peerFD) { - LOGS("Processor removePeer peerFD: " << peerFD); - - // TODO: Remove ADD_PEER event if it's not processed + LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD); + { + Lock lock(mStateMutex); + mRequestQueue.removeIf([peerFD](Request & request) { + return request.requestID == Event::ADD_PEER && + request.get()->peerFD == peerFD; + }); + } // Remove peer and wait till he's gone - std::shared_ptr conditionPtr(new std::condition_variable()); + std::shared_ptr conditionPtr(new std::condition_variable_any()); { Lock lock(mStateMutex); - RemovePeerRequest request(peerFD, conditionPtr); - mPeersToDelete.push(std::move(request)); - mEventQueue.send(Event::REMOVE_PEER); + auto request = std::make_shared(peerFD, conditionPtr); + mRequestQueue.push(Event::REMOVE_PEER, request); } auto isPeerDeleted = [&peerFD, this]()->bool { - Lock lock(mStateMutex); return mSockets.count(peerFD) == 0; }; - std::mutex mutex; - std::unique_lock lock(mutex); + Lock lock(mStateMutex); conditionPtr->wait(lock, isPeerDeleted); } void Processor::removePeerInternal(const FileDescriptor peerFD, Status status) { - LOGS("Processor removePeerInternal peerFD: " << peerFD); - LOGI("Removing peer. peerFD: " << peerFD); + LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD); + LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD); if (!mSockets.erase(peerFD)) { - LOGW("No such peer. Another thread called removePeerInternal"); + LOGW(mLogPrefix + "No such peer. Another thread called removePeerInternal"); return; } @@ -211,21 +234,25 @@ void Processor::removePeerInternal(const FileDescriptor peerFD, Status status) void Processor::resetPolling() { + LOGS(mLogPrefix + "Processor resetPolling"); + if (!isStarted()) { + LOGW(mLogPrefix + "Processor not started! Polling not reset!"); return; } { Lock lock(mStateMutex); - + LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size()); // Setup polling on eventfd and sockets mFDs.resize(mSockets.size() + 1); - mFDs[0].fd = mEventQueue.getFD(); + mFDs[0].fd = mRequestQueue.getFD(); mFDs[0].events = POLLIN; auto socketIt = mSockets.begin(); for (unsigned int i = 1; i < mFDs.size(); ++i) { + LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD()); mFDs[i].fd = socketIt->second->getFD(); mFDs[i].events = POLLIN | POLLHUP; // Listen for input events ++socketIt; @@ -236,20 +263,19 @@ void Processor::resetPolling() void Processor::run() { - LOGS("Processor run"); + LOGS(mLogPrefix + "Processor run"); resetPolling(); - mIsRunning = true; - while (mIsRunning) { - LOGT("Waiting for communication..."); + while (isStarted()) { + LOGT(mLogPrefix + "Waiting for communication..."); int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/); - LOGT("... incoming communication!"); + LOGT(mLogPrefix + "... incoming communication!"); if (ret == -1 || ret == 0) { if (errno == EINTR) { continue; } - LOGE("Error in poll: " << std::string(strerror(errno))); + LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno))); throw IPCException("Error in poll: " + std::string(strerror(errno))); } @@ -285,7 +311,7 @@ bool Processor::handleLostConnections() { for (unsigned int i = 1; i < mFDs.size(); ++i) { if (mFDs[i].revents & POLLHUP) { - LOGI("Lost connection to peer: " << mFDs[i].fd); + LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd); mFDs[i].revents &= ~(POLLHUP); removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED); isPeerRemoved = true; @@ -320,7 +346,7 @@ bool Processor::handleInputs() bool Processor::handleInput(const FileDescriptor peerFD) { - LOGS("Processor handleInput peerFD: " << peerFD); + LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD); Lock lock(mStateMutex); std::shared_ptr socketPtr; @@ -328,7 +354,7 @@ bool Processor::handleInput(const FileDescriptor peerFD) // Get the peer's socket socketPtr = mSockets.at(peerFD); } catch (const std::out_of_range&) { - LOGE("No such peer: " << peerFD); + LOGE(mLogPrefix + "No such peer: " << peerFD); return false; } @@ -341,7 +367,7 @@ bool Processor::handleInput(const FileDescriptor peerFD) socketPtr->read(&messageID, sizeof(messageID)); } catch (const IPCException& e) { - LOGE("Error during reading the socket"); + LOGE(mLogPrefix + "Error during reading the socket"); removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER); return true; } @@ -362,7 +388,7 @@ bool Processor::handleInput(const FileDescriptor peerFD) } else { // Nothing - LOGW("No method or signal callback for methodID: " << methodID); + LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID); removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER); return true; } @@ -373,7 +399,7 @@ bool Processor::handleInput(const FileDescriptor peerFD) std::shared_ptr Processor::onNewSignals(const FileDescriptor peerFD, std::shared_ptr& data) { - LOGS("Processor onNewSignals peerFD: " << peerFD); + LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD); for (const MethodID methodID : data->ids) { mSignalsPeers[methodID].push_back(peerFD); @@ -385,35 +411,35 @@ std::shared_ptr Processor::onNewSignals(const FileDescript bool Processor::onReturnValue(const Socket& socket, const MessageID messageID) { - LOGS("Processor onReturnValue messageID: " << messageID); + LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID); - // LOGI("Return value for messageID: " << messageID); + // LOGI(mLogPrefix + "Return value for messageID: " << messageID); ReturnCallbacks returnCallbacks; try { - LOGT("Getting the return callback"); + LOGT(mLogPrefix + "Getting the return callback"); returnCallbacks = std::move(mReturnCallbacks.at(messageID)); mReturnCallbacks.erase(messageID); } catch (const std::out_of_range&) { - LOGW("No return callback for messageID: " << messageID); + LOGW(mLogPrefix + "No return callback for messageID: " << messageID); removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER); return true; } std::shared_ptr data; try { - LOGT("Parsing incoming return data"); + LOGT(mLogPrefix + "Parsing incoming return data"); data = returnCallbacks.parse(socket.getFD()); } catch (const std::exception& e) { - LOGE("Exception during parsing: " << e.what()); + LOGE(mLogPrefix + "Exception during parsing: " << e.what()); IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data)); removePeerInternal(socket.getFD(), Status::PARSING_ERROR); return true; } - // LOGT("Process return value callback for messageID: " << messageID); + // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID); IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data)); - // LOGT("Return value for messageID: " << messageID << " processed"); + // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed"); return false; } @@ -422,25 +448,25 @@ bool Processor::onRemoteSignal(const Socket& socket, const MessageID messageID, std::shared_ptr signalCallbacks) { - LOGS("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID); + LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID); - // LOGI("Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID); + // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID); std::shared_ptr data; try { - LOGT("Parsing incoming data"); + LOGT(mLogPrefix + "Parsing incoming data"); data = signalCallbacks->parse(socket.getFD()); } catch (const std::exception& e) { - LOGE("Exception during parsing: " << e.what()); + LOGE(mLogPrefix + "Exception during parsing: " << e.what()); removePeerInternal(socket.getFD(), Status::PARSING_ERROR); return true; } - // LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID); + // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID); try { signalCallbacks->signal(socket.getFD(), data); } catch (const std::exception& e) { - LOGE("Exception in method handler: " << e.what()); + LOGE(mLogPrefix + "Exception in method handler: " << e.what()); removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER); return true; } @@ -453,30 +479,30 @@ bool Processor::onRemoteCall(const Socket& socket, const MessageID messageID, std::shared_ptr methodCallbacks) { - LOGS("Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID); - // LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID); + LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID); + // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID); std::shared_ptr data; try { - LOGT("Parsing incoming data"); + LOGT(mLogPrefix + "Parsing incoming data"); data = methodCallbacks->parse(socket.getFD()); } catch (const std::exception& e) { - LOGE("Exception during parsing: " << e.what()); + LOGE(mLogPrefix + "Exception during parsing: " << e.what()); removePeerInternal(socket.getFD(), Status::PARSING_ERROR); return true; } - LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID); + LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID); std::shared_ptr returnData; try { returnData = methodCallbacks->method(socket.getFD(), data); } catch (const std::exception& e) { - LOGE("Exception in method handler: " << e.what()); + LOGE(mLogPrefix + "Exception in method handler: " << e.what()); removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER); return true; } - LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID); + LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID); try { // Send the call with the socket Socket::Guard guard = socket.getGuard(); @@ -484,7 +510,7 @@ bool Processor::onRemoteCall(const Socket& socket, socket.write(&messageID, sizeof(messageID)); methodCallbacks->serialize(socket.getFD(), returnData); } catch (const std::exception& e) { - LOGE("Exception during serialization: " << e.what()); + LOGE(mLogPrefix + "Exception during serialization: " << e.what()); removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR); return true; } @@ -494,225 +520,213 @@ bool Processor::onRemoteCall(const Socket& socket, bool Processor::handleEvent() { - LOGS("Processor handleEvent"); + LOGS(mLogPrefix + "Processor handleEvent"); Lock lock(mStateMutex); - switch (mEventQueue.receive()) { - - case Event::FINISH: { - LOGD("Event FINISH"); - mIsRunning = false; - cleanCommunication(); - return false; - } - - case Event::CALL: { - LOGD("Event CALL"); - return onCall(); - } + auto request = mRequestQueue.pop(); + LOGD(mLogPrefix + "Got: " << request.requestID); - case Event::ADD_PEER: { - LOGD("Event ADD_PEER"); - return onNewPeer(); - } - - case Event::REMOVE_PEER: { - LOGD("Event REMOVE_PEER"); - return onRemovePeer(); - } + switch (request.requestID) { + case Event::METHOD: return onMethodRequest(*request.get()); + case Event::SIGNAL: return onSignalRequest(*request.get()); + case Event::ADD_PEER: return onAddPeerRequest(*request.get()); + case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get()); + case Event::FINISH: return onFinishRequest(*request.get()); } return false; } -bool Processor::onNewPeer() +bool Processor::onMethodRequest(MethodRequest& request) { - LOGS("Processor onNewPeer"); + LOGS(mLogPrefix + "Processor onMethodRequest"); + std::shared_ptr socketPtr; - // TODO: What if there is no newSocket? (request removed in the mean time) - // Add new socket of the peer - SocketInfo socketInfo = std::move(mNewSockets.front()); - mNewSockets.pop(); + try { + // Get the peer's socket + socketPtr = mSockets.at(request.peerFD); + } catch (const std::out_of_range&) { + LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD); + + // Pass the error to the processing callback + IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data)); - if (mSockets.size() > mMaxNumberOfPeers) { - LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD); - return false; - } - if (mSockets.count(socketInfo.peerFD) != 0) { - LOGE("There already was a socket for peerFD: " << socketInfo.peerFD); return false; } - mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr); - - - // LOGW("Sending handled signals"); - std::vector ids; - for (const auto kv : mSignalsCallbacks) { - ids.push_back(kv.first); + if (mReturnCallbacks.count(request.messageID) != 0) { + LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID); } - auto data = std::make_shared(ids); - callAsync(REGISTER_SIGNAL_METHOD_ID, - socketInfo.peerFD, - data, - discardResultHandler); - // LOGW("Sent handled signals"); + mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(request.peerFD, + std::move(request.parse), + std::move(request.process))); - resetPolling(); - - if (mNewPeerCallback) { - // Notify about the new user. - LOGT("Calling NewPeerCallback"); - mNewPeerCallback(socketInfo.peerFD); - } + try { + // Send the call with the socket + Socket::Guard guard = socketPtr->getGuard(); + socketPtr->write(&request.methodID, sizeof(request.methodID)); + socketPtr->write(&request.messageID, sizeof(request.messageID)); + LOGT(mLogPrefix + "Serializing the message"); + request.serialize(socketPtr->getFD(), request.data); + } catch (const std::exception& e) { + LOGE(mLogPrefix + "Error during sending a method: " << e.what()); - return true; -} + // Inform about the error, + IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data)); -bool Processor::onRemovePeer() -{ - LOGS("Processor onRemovePeer"); - removePeerInternal(mPeersToDelete.front().peerFD, Status::REMOVED_PEER); + mReturnCallbacks.erase(request.messageID); + removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR); - mPeersToDelete.front().conditionPtr->notify_all(); - mPeersToDelete.pop(); - return true; -} + return true; -bool Processor::onCall() -{ - LOGS("Processor onCall"); - CallQueue::Call call; - try { - call = std::move(mCalls.pop()); - } catch (const IPCException&) { - LOGE("No calls to serve, but got an EVENT::CALL. Event got removed before serving"); - return false; } - if (call.parse && call.process) { - return onMethodCall(call); - } else { - return onSignalCall(call); - } + return false; } -bool Processor::onSignalCall(CallQueue::Call& call) +bool Processor::onSignalRequest(SignalRequest& request) { - LOGS("Processor onSignalCall"); + LOGS(mLogPrefix + "Processor onSignalRequest"); std::shared_ptr socketPtr; try { // Get the peer's socket - socketPtr = mSockets.at(call.peerFD); + socketPtr = mSockets.at(request.peerFD); } catch (const std::out_of_range&) { - LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD); + LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD); return false; } try { // Send the call with the socket Socket::Guard guard = socketPtr->getGuard(); - socketPtr->write(&call.methodID, sizeof(call.methodID)); - socketPtr->write(&call.messageID, sizeof(call.messageID)); - call.serialize(socketPtr->getFD(), call.data); + socketPtr->write(&request.methodID, sizeof(request.methodID)); + socketPtr->write(&request.messageID, sizeof(request.messageID)); + request.serialize(socketPtr->getFD(), request.data); } catch (const std::exception& e) { - LOGE("Error during sending a signal: " << e.what()); + LOGE(mLogPrefix + "Error during sending a signal: " << e.what()); - removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR); + removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR); return true; } return false; } -bool Processor::onMethodCall(CallQueue::Call& call) +bool Processor::onAddPeerRequest(AddPeerRequest& request) { - LOGS("Processor onMethodCall"); - std::shared_ptr socketPtr; - - - try { - // Get the peer's socket - socketPtr = mSockets.at(call.peerFD); - } catch (const std::out_of_range&) { - LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD); - - // Pass the error to the processing callback - IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data)); + LOGS(mLogPrefix + "Processor onAddPeerRequest"); + if (mSockets.size() > mMaxNumberOfPeers) { + LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerFD); return false; } - - if (mReturnCallbacks.count(call.messageID) != 0) { - LOGE("There already was a return callback for messageID: " << call.messageID); + if (mSockets.count(request.peerFD) != 0) { + LOGE(mLogPrefix + "There already was a socket for peerFD: " << request.peerFD); + return false; } - mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD, - std::move(call.parse), - std::move(call.process))); - try { - // Send the call with the socket - Socket::Guard guard = socketPtr->getGuard(); - socketPtr->write(&call.methodID, sizeof(call.methodID)); - socketPtr->write(&call.messageID, sizeof(call.messageID)); - LOGT("Serializing the message"); - call.serialize(socketPtr->getFD(), call.data); - } catch (const std::exception& e) { - LOGE("Error during sending a method: " << e.what()); + mSockets[request.peerFD] = std::move(request.socketPtr); - // Inform about the error, - IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data)); + // Sending handled signals + std::vector ids; + for (const auto kv : mSignalsCallbacks) { + ids.push_back(kv.first); + } + auto data = std::make_shared(ids); + callAsync(REGISTER_SIGNAL_METHOD_ID, + request.peerFD, + data, + discardResultHandler); - mReturnCallbacks.erase(call.messageID); - removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR); - return true; + resetPolling(); + if (mNewPeerCallback) { + // Notify about the new user. + LOGT(mLogPrefix + "Calling NewPeerCallback"); + mNewPeerCallback(request.peerFD); } - return false; + LOGI(mLogPrefix + "New peer: " << request.peerFD); + return true; } -void Processor::cleanCommunication() +bool Processor::onRemovePeerRequest(RemovePeerRequest& request) { - LOGS("Processor cleanCommunication"); + LOGS(mLogPrefix + "Processor onRemovePeer"); - while (!mEventQueue.isEmpty()) { - switch (mEventQueue.receive()) { - case Event::FINISH: { - LOGE("Event FINISH after FINISH"); - break; - } - case Event::CALL: { - LOGW("Event CALL after FINISH"); - try { - CallQueue::Call call = mCalls.pop(); - if (call.process) { - IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data)); - } - } catch (const IPCException&) { - // No more calls - } - break; - } + removePeerInternal(request.peerFD, Status::REMOVED_PEER); + request.conditionPtr->notify_all(); + + return true; +} + +bool Processor::onFinishRequest(FinishRequest& request) +{ + LOGS(mLogPrefix + "Processor onFinishRequest"); + + // Clean the mRequestQueue + while (!mRequestQueue.isEmpty()) { + auto request = mRequestQueue.pop(); + LOGE(mLogPrefix + "Got: " << request.requestID << " after FINISH"); - case Event::ADD_PEER: { - LOGW("Event ADD_PEER after FINISH"); + switch (request.requestID) { + case Event::METHOD: { + auto requestPtr = request.get(); + IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data)); break; } - case Event::REMOVE_PEER: { - LOGW("Event REMOVE_PEER after FINISH"); - mPeersToDelete.front().conditionPtr->notify_all(); - mPeersToDelete.pop(); + request.get()->conditionPtr->notify_all(); break; } + case Event::SIGNAL: + case Event::ADD_PEER: + case Event::FINISH: + break; } } + + mIsRunning = false; + request.conditionPtr->notify_all(); + return true; +} + +std::ostream& operator<<(std::ostream& os, const Processor::Event& event) +{ + switch (event) { + + case Processor::Event::FINISH: { + os << "Event::FINISH"; + break; + } + + case Processor::Event::METHOD: { + os << "Event::METHOD"; + break; + } + + case Processor::Event::SIGNAL: { + os << "Event::SIGNAL"; + break; + } + + case Processor::Event::ADD_PEER: { + os << "Event::ADD_PEER"; + break; + } + + case Processor::Event::REMOVE_PEER: { + os << "Event::REMOVE_PEER"; + break; + } + } + + return os; } } // namespace ipc diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index b0f7ea0..157f39c 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -26,8 +26,12 @@ #define COMMON_IPC_INTERNALS_PROCESSOR_HPP #include "ipc/internals/socket.hpp" -#include "ipc/internals/event-queue.hpp" -#include "ipc/internals/call-queue.hpp" +#include "ipc/internals/request-queue.hpp" +#include "ipc/internals/method-request.hpp" +#include "ipc/internals/signal-request.hpp" +#include "ipc/internals/add-peer-request.hpp" +#include "ipc/internals/remove-peer-request.hpp" +#include "ipc/internals/finish-request.hpp" #include "ipc/exception.hpp" #include "ipc/types.hpp" #include "config/manager.hpp" @@ -35,9 +39,9 @@ #include "logger/logger.hpp" #include "logger/logger-scope.hpp" +#include #include #include -#include #include #include #include @@ -67,7 +71,6 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000; * - Rest: The data written in a callback. One type per method.ReturnCallbacks * * TODO: -* - some mutexes may not be needed * - synchronous call to many peers * - implement HandlerStore class for storing both signals and methods * - API for removing signals @@ -81,12 +84,22 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000; * - no new events added after stop() called * - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but * there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :) -* - EventQueue should store std::shared_ptr and it should be the only queue to the Processor thread. -* It should have an API for removing events from the middle of the queue * */ class Processor { +private: + enum class Event { + FINISH, // Shutdown request + METHOD, // New method call in the queue + SIGNAL, // New signal call in the queue + ADD_PEER, // New peer in the queue + REMOVE_PEER // Remove peer + }; + public: + + friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event); + /** * Used to indicate a message with the return value. */ @@ -104,7 +117,8 @@ public: * @param newPeerCallback called when a new peer arrives * @param removedPeerCallback called when the Processor stops listening for this peer */ - Processor(const PeerCallback& newPeerCallback = nullptr, + Processor(const std::string& logName = "", + const PeerCallback& newPeerCallback = nullptr, const PeerCallback& removedPeerCallback = nullptr, const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS); ~Processor(); @@ -113,11 +127,14 @@ public: Processor(Processor&&) = delete; Processor& operator=(const Processor&) = delete; + /** * Start the processing thread. * Quits immediately after starting the thread. + * + * @param usesExternalPolling internal or external polling is used */ - void start(); + void start(const bool usesExternalPolling); /** * @return is processor running @@ -281,6 +298,7 @@ private: typedef std::function& data)> SerializeCallback; typedef std::function(int fd)> ParseCallback; typedef std::unique_lock Lock; + typedef RequestQueue::Request Request; struct EmptyData { CONFIG_REGISTER_EMPTY @@ -337,56 +355,18 @@ private: ResultHandler::type process; }; - struct SocketInfo { - SocketInfo(const SocketInfo& other) = delete; - SocketInfo& operator=(const SocketInfo&) = delete; - SocketInfo() = default; - SocketInfo(SocketInfo&&) = default; - SocketInfo& operator=(SocketInfo &&) = default; - - SocketInfo(const FileDescriptor peerFD, const std::shared_ptr& socketPtr) - : peerFD(peerFD), socketPtr(socketPtr) {} - - FileDescriptor peerFD; - std::shared_ptr socketPtr; - }; - - struct RemovePeerRequest { - RemovePeerRequest(const RemovePeerRequest& other) = delete; - RemovePeerRequest& operator=(const RemovePeerRequest&) = delete; - RemovePeerRequest() = default; - RemovePeerRequest(RemovePeerRequest&&) = default; - RemovePeerRequest& operator=(RemovePeerRequest &&) = default; - - RemovePeerRequest(const FileDescriptor peerFD, - const std::shared_ptr& conditionPtr) - : peerFD(peerFD), conditionPtr(conditionPtr) {} - - FileDescriptor peerFD; - std::shared_ptr conditionPtr; - }; - - enum class Event : int { - FINISH, // Shutdown request - CALL, // New method call in the queue - ADD_PEER, // New peer in the queue - REMOVE_PEER // Remove peer - }; - EventQueue mEventQueue; + std::string mLogPrefix; + RequestQueue mRequestQueue; bool mIsRunning; - - CallQueue mCalls; std::unordered_map> mMethodsCallbacks; std::unordered_map> mSignalsCallbacks; std::unordered_map> mSignalsPeers; std::unordered_map > mSockets; std::vector mFDs; - std::queue mNewSockets; - std::queue mPeersToDelete; std::unordered_map mReturnCallbacks; @@ -408,11 +388,14 @@ private: static void discardResultHandler(Status, std::shared_ptr&) {} void run(); - bool onCall(); - bool onSignalCall(CallQueue::Call& call); - bool onMethodCall(CallQueue::Call& call); - bool onNewPeer(); - bool onRemovePeer(); + + // Request handlers + bool onMethodRequest(MethodRequest& request); + bool onSignalRequest(SignalRequest& request); + bool onAddPeerRequest(AddPeerRequest& request); + bool onRemovePeerRequest(RemovePeerRequest& request); + bool onFinishRequest(FinishRequest& request); + bool handleLostConnections(); bool handleInputs(); @@ -434,7 +417,6 @@ private: std::shared_ptr& data); - void cleanCommunication(); }; template @@ -469,7 +451,7 @@ void Processor::addMethodHandler(const MethodID methodID, const typename MethodHandler::type& method) { if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) { - LOGE("Forbidden methodID: " << methodID); + LOGE(mLogPrefix + "Forbidden methodID: " << methodID); throw IPCException("Forbidden methodID: " + std::to_string(methodID)); } @@ -477,7 +459,7 @@ void Processor::addMethodHandler(const MethodID methodID, Lock lock(mStateMutex); if (mSignalsCallbacks.count(methodID)) { - LOGE("MethodID used by a signal: " << methodID); + LOGE(mLogPrefix + "MethodID used by a signal: " << methodID); throw IPCException("MethodID used by a signal: " + std::to_string(methodID)); } @@ -491,7 +473,7 @@ void Processor::addSignalHandler(const MethodID methodID, const typename SignalHandler::type& handler) { if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) { - LOGE("Forbidden methodID: " << methodID); + LOGE(mLogPrefix + "Forbidden methodID: " << methodID); throw IPCException("Forbidden methodID: " + std::to_string(methodID)); } @@ -502,7 +484,7 @@ void Processor::addSignalHandler(const MethodID methodID, // Andd the signal handler: if (mMethodsCallbacks.count(methodID)) { - LOGE("MethodID used by a method: " << methodID); + LOGE(mLogPrefix + "MethodID used by a method: " << methodID); throw IPCException("MethodID used by a method: " + std::to_string(methodID)); } @@ -546,10 +528,9 @@ MessageID Processor::callAsync(const MethodID methodID, const typename ResultHandler::type& process) { Lock lock(mStateMutex); - MessageID messageID = mCalls.push(methodID, peerFD, data, process); - mEventQueue.send(Event::CALL); - - return messageID; + auto request = MethodRequest::create(methodID, peerFD, data, process); + mRequestQueue.push(Event::METHOD, request); + return request->messageID; } @@ -581,24 +562,31 @@ std::shared_ptr Processor::callSync(const MethodID methodID, }; std::unique_lock lock(mutex); - LOGT("Waiting for the response..."); + LOGT(mLogPrefix + "Waiting for the response..."); if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) { - LOGW("Probably a timeout in callSync. Checking..."); + LOGW(mLogPrefix + "Probably a timeout in callSync. Checking..."); bool isTimeout; { Lock lock(mStateMutex); // Call isn't sent or call is sent but there is no reply - isTimeout = mCalls.erase(messageID) || 1 == mReturnCallbacks.erase(messageID); + isTimeout = mRequestQueue.removeIf([messageID](Request & request) { + return request.requestID == Event::METHOD && + request.get()->messageID == messageID; + }) + || mRequestQueue.removeIf([messageID](Request & request) { + return request.requestID == Event::SIGNAL && + request.get()->messageID == messageID; + }) + || 1 == mReturnCallbacks.erase(messageID); } - if (isTimeout) { - LOGE("Function call timeout; methodID: " << methodID); + LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID); removePeer(peerFD); throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID)); } else { - LOGW("Timeout started during the return value processing, so wait for it to finish"); + LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish"); if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) { - LOGE("Function call timeout; methodID: " << methodID); + LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID); throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID)); } } @@ -616,16 +604,17 @@ void Processor::signal(const MethodID methodID, Lock lock(mStateMutex); const auto it = mSignalsPeers.find(methodID); if (it == mSignalsPeers.end()) { - LOGW("No peer is handling signal with methodID: " << methodID); + LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID); return; } for (const FileDescriptor peerFD : it->second) { - mCalls.push(methodID, peerFD, data); - mEventQueue.send(Event::CALL); + auto request = SignalRequest::create(methodID, peerFD, data); + mRequestQueue.push(Event::SIGNAL, request); } } + } // namespace ipc } // namespace vasum diff --git a/common/ipc/internals/remove-peer-request.hpp b/common/ipc/internals/remove-peer-request.hpp new file mode 100644 index 0000000..ec01ac4 --- /dev/null +++ b/common/ipc/internals/remove-peer-request.hpp @@ -0,0 +1,55 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Processor's request to remove a peer + */ + +#ifndef COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP +#define COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP + +#include "ipc/types.hpp" +#include "ipc/internals/socket.hpp" +#include + + +namespace vasum { +namespace ipc { + +class RemovePeerRequest { +public: + RemovePeerRequest(const RemovePeerRequest&) = delete; + RemovePeerRequest& operator=(const RemovePeerRequest&) = delete; + + RemovePeerRequest(const FileDescriptor peerFD, + const std::shared_ptr& conditionPtr) + : peerFD(peerFD), + conditionPtr(conditionPtr) + { + } + + FileDescriptor peerFD; + std::shared_ptr conditionPtr; +}; + +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_INTERNALS_REMOVE_PEER_REQUEST_HPP diff --git a/common/ipc/internals/request-queue.hpp b/common/ipc/internals/request-queue.hpp new file mode 100644 index 0000000..35b5120 --- /dev/null +++ b/common/ipc/internals/request-queue.hpp @@ -0,0 +1,161 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Managing the queue of messages carrying any kind of data + */ + +#ifndef COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP +#define COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP + +#include "ipc/exception.hpp" +#include "ipc/internals/eventfd.hpp" +#include "logger/logger.hpp" + +#include +#include +#include + +namespace vasum { +namespace ipc { + +/** +* Class for managing a queue of Requests carrying any data +*/ +template +class RequestQueue { +public: + RequestQueue() = default; + + RequestQueue(const RequestQueue&) = delete; + RequestQueue& operator=(const RequestQueue&) = delete; + + struct Request { + Request(const Request& other) = delete; + Request& operator=(const Request&) = delete; + + Request(Request&&) = default; + Request(const RequestIdType requestID, const std::shared_ptr& data) + : requestID(requestID), + data(data) + {} + + template + std::shared_ptr get() + { + return std::static_pointer_cast(data); + } + + RequestIdType requestID; + std::shared_ptr data; + }; + + /** + * @return event's file descriptor + */ + int getFD() const; + + /** + * @return is the queue empty + */ + bool isEmpty() const; + + /** + * Push data to the queue + * + * @param requestID request type + * @param data data corresponding to the request + */ + void push(const RequestIdType requestID, + const std::shared_ptr& data = nullptr); + + /** + * @return get the data from the next request + */ + Request pop(); + + /** + * Remove elements from the queue when the predicate returns true + * + * @param predicate condition + * @return was anything removed + */ + template + bool removeIf(Predicate predicate); + +private: + std::list mRequests; + EventFD mEventFD; +}; + +template +int RequestQueue::getFD() const +{ + return mEventFD.getFD(); +} + +template +bool RequestQueue::isEmpty() const +{ + return mRequests.empty(); +} + +template +void RequestQueue::push(const RequestIdType requestID, + const std::shared_ptr& data) +{ + Request request(requestID, data); + mRequests.push_back(std::move(request)); + mEventFD.send(); +} + +template +typename RequestQueue::Request RequestQueue::pop() +{ + mEventFD.receive(); + if (mRequests.empty()) { + LOGE("Request queue is empty"); + throw IPCException("Request queue is empty"); + } + Request request = std::move(mRequests.front()); + mRequests.pop_front(); + return request; +} + +template +template +bool RequestQueue::removeIf(Predicate predicate) +{ + auto it = std::find_if(mRequests.begin(), mRequests.end(), predicate); + if (it == mRequests.end()) { + return false; + } + + do { + it = mRequests.erase(it); + it = std::find_if(it, mRequests.end(), predicate); + } while (it != mRequests.end()); + + return true; +} +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_INTERNALS_MESSAGE_QUEUE_HPP diff --git a/common/ipc/internals/signal-request.hpp b/common/ipc/internals/signal-request.hpp new file mode 100644 index 0000000..4cf62c2 --- /dev/null +++ b/common/ipc/internals/signal-request.hpp @@ -0,0 +1,82 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Processor's request to send a signal + */ + +#ifndef COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP +#define COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP + +#include "ipc/types.hpp" +#include "config/manager.hpp" +#include "logger/logger-scope.hpp" + +namespace vasum { +namespace ipc { + +class SignalRequest { +public: + SignalRequest(const SignalRequest&) = delete; + SignalRequest& operator=(const SignalRequest&) = delete; + + + + template + static std::shared_ptr create(const MethodID methodID, + const FileDescriptor peerFD, + const std::shared_ptr& data); + + MethodID methodID; + FileDescriptor peerFD; + MessageID messageID; + std::shared_ptr data; + SerializeCallback serialize; + +private: + SignalRequest(const MethodID methodID, const FileDescriptor peerFD) + : methodID(methodID), + peerFD(peerFD), + messageID(getNextMessageID()) + {} + +}; + +template +std::shared_ptr SignalRequest::create(const MethodID methodID, + const FileDescriptor peerFD, + const std::shared_ptr& data) +{ + std::shared_ptr request(new SignalRequest(methodID, peerFD)); + + request->data = data; + + request->serialize = [](const int fd, std::shared_ptr& data)->void { + LOGS("Signal serialize, peerFD: " << fd); + config::saveToFD(fd, *std::static_pointer_cast(data)); + }; + + return request; +} + +} // namespace ipc +} // namespace vasum + +#endif // COMMON_IPC_INTERNALS_SIGNAL_REQUEST_HPP diff --git a/common/ipc/service.cpp b/common/ipc/service.cpp index ef46346..b96bcd4 100644 --- a/common/ipc/service.cpp +++ b/common/ipc/service.cpp @@ -36,7 +36,7 @@ namespace ipc { Service::Service(const std::string& socketPath, const PeerCallback& addPeerCallback, const PeerCallback& removePeerCallback) - : mProcessor(addPeerCallback, removePeerCallback), + : mProcessor("[SERVICE] ", addPeerCallback, removePeerCallback), mAcceptor(socketPath, std::bind(&Processor::addPeer, &mProcessor, _1)) { @@ -53,14 +53,16 @@ Service::~Service() } } -void Service::start() +void Service::start(const bool usesExternalPolling) { LOGS("Service start"); - mProcessor.start(); + mProcessor.start(usesExternalPolling); // There can be an incoming connection from mAcceptor before mProcessor is listening, // but it's OK. It will handle the connection when ready. So no need to wait for mProcessor. - mAcceptor.start(); + if (!usesExternalPolling) { + mAcceptor.start(); + } } bool Service::isStarted() diff --git a/common/ipc/service.hpp b/common/ipc/service.hpp index fa12e30..9392a42 100644 --- a/common/ipc/service.hpp +++ b/common/ipc/service.hpp @@ -61,8 +61,10 @@ public: /** * Starts the worker and acceptor threads + * + * @param usesExternalPolling internal or external polling is used */ - void start(); + void start(const bool usesExternalPolling = false); /** * @return is the communication thread running diff --git a/common/ipc/types.cpp b/common/ipc/types.cpp index fa57648..ba4c1c4 100644 --- a/common/ipc/types.cpp +++ b/common/ipc/types.cpp @@ -27,10 +27,20 @@ #include "ipc/types.hpp" #include "logger/logger.hpp" +#include namespace vasum { namespace ipc { +namespace { +std::atomic gLastMessageID(0); +} // namespace + +MessageID getNextMessageID() +{ + return ++gLastMessageID; +} + std::string toString(const Status status) { switch (status) { diff --git a/common/ipc/types.hpp b/common/ipc/types.hpp index 5132911..10b87df 100644 --- a/common/ipc/types.hpp +++ b/common/ipc/types.hpp @@ -39,6 +39,8 @@ typedef unsigned int MethodID; typedef unsigned int MessageID; typedef std::function PeerCallback; +typedef std::function& data)> SerializeCallback; +typedef std::function(int fd)> ParseCallback; enum class Status : int { OK = 0, @@ -53,6 +55,8 @@ enum class Status : int { std::string toString(const Status status); void throwOnError(const Status status); +MessageID getNextMessageID(); + template struct MethodHandler { diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp index 7c9df6c..caf59d2 100644 --- a/tests/unit_tests/ipc/ut-ipc.cpp +++ b/tests/unit_tests/ipc/ut-ipc.cpp @@ -63,7 +63,7 @@ const int TIMEOUT = 1000 /*ms*/; const int SHORT_OPERATION_TIME = TIMEOUT / 100; // Time that will cause "TIMEOUT" methods to throw -const int LONG_OPERATION_TIME = 500 + TIMEOUT; +const int LONG_OPERATION_TIME = 1000 + TIMEOUT; struct Fixture { std::string socketPath; @@ -204,7 +204,7 @@ std::pair connectServiceGSource(Service& s, // TODO: On timeout remove the callback s.setNewPeerCallback(newPeerCallback); s.setRemovedPeerCallback(std::bind(&IPCGSource::removeFD, ipcGSourcePtr, _1)); - + s.start(true); // Service starts to process ipcGSourcePtr->attach(); @@ -239,7 +239,7 @@ std::pair connectClientGSource(Service& s, } - c.connect(); + c.start(true); IPCGSource::Pointer ipcGSourcePtr = IPCGSource::create(c.getFDs(), std::bind(&Client::handle, &c, _1, _2)); -- 2.7.4