From: Jan Olszak Date: Tue, 2 Dec 2014 16:27:17 +0000 (+0100) Subject: IPC: CallQueue X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=161b6dc2bb991737ee46df3bbe1714f1e190144c;p=platform%2Fcore%2Fsecurity%2Fvasum.git IPC: CallQueue [Bug/Feature] CallQueue to wrap the Calls' queue [Cause] N/A [Solution] N/A [Verification] Build, install, run tests Change-Id: I9519114e1c6f2e4040e956ae54f5093646f2dfaf --- diff --git a/common/ipc/internals/call-queue.cpp b/common/ipc/internals/call-queue.cpp new file mode 100644 index 0000000..9be7e53 --- /dev/null +++ b/common/ipc/internals/call-queue.cpp @@ -0,0 +1,66 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Managing the queue with calls + */ + +#include "config.hpp" + +#include "ipc/internals/call-queue.hpp" +#include "ipc/exception.hpp" +#include "logger/logger.hpp" + +namespace security_containers { +namespace ipc { + +CallQueue::CallQueue() + : mMessageIDCounter(0) +{ +} + +CallQueue::~CallQueue() +{ +} + +bool CallQueue::isEmpty() const +{ + return mCalls.empty(); +} + +MessageID CallQueue::getNextMessageID() +{ + // TODO: This method of generating UIDs is buggy. To be changed. + return ++mMessageIDCounter; +} + +CallQueue::Call CallQueue::pop() +{ + if (isEmpty()) { + LOGE("CallQueue is empty"); + throw IPCException("CallQueue is empty"); + } + Call call = std::move(mCalls.front()); + mCalls.pop(); + return call; +} + +} // namespace ipc +} // namespace security_containers diff --git a/common/ipc/internals/call-queue.hpp b/common/ipc/internals/call-queue.hpp new file mode 100644 index 0000000..4d1ecf6 --- /dev/null +++ b/common/ipc/internals/call-queue.hpp @@ -0,0 +1,150 @@ +/* +* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved +* +* Contact: Jan Olszak +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License +*/ + +/** + * @file + * @author Jan Olszak (j.olszak@samsung.com) + * @brief Managing the queue with calls + */ + +#ifndef COMMON_IPC_INTERNALS_CALL_QUEUE_HPP +#define COMMON_IPC_INTERNALS_CALL_QUEUE_HPP + +#include "ipc/types.hpp" +#include "config/manager.hpp" + +#include +#include + +namespace security_containers { +namespace ipc { + +/** +* Class for managing a queue of calls in the Processor +*/ +class CallQueue { +public: + typedef std::function& data)> SerializeCallback; + typedef std::function(int fd)> ParseCallback; + + struct Call { + Call(const Call& other) = delete; + Call& operator=(const Call&) = delete; + Call() = default; + Call(Call&&) = default; + + PeerID peerID; + MethodID methodID; + MessageID messageID; + std::shared_ptr data; + SerializeCallback serialize; + ParseCallback parse; + ResultHandler::type process; + }; + + CallQueue(); + ~CallQueue(); + + CallQueue(const CallQueue&) = delete; + CallQueue(CallQueue&&) = delete; + CallQueue& operator=(const CallQueue&) = delete; + + template + MessageID push(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& process); + + + template + MessageID push(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data); + + Call pop(); + + bool isEmpty() const; + +private: + std::queue mCalls; + std::atomic mMessageIDCounter; + + MessageID getNextMessageID(); +}; + + +template +MessageID CallQueue::push(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data, + const typename ResultHandler::type& process) +{ + Call call; + call.methodID = methodID; + call.peerID = peerID; + call.data = data; + + MessageID messageID = getNextMessageID(); + call.messageID = messageID; + + call.serialize = [](const int fd, std::shared_ptr& data)->void { + config::saveToFD(fd, *std::static_pointer_cast(data)); + }; + + call.parse = [](const int fd)->std::shared_ptr { + std::shared_ptr data(new ReceivedDataType()); + config::loadFromFD(fd, *data); + return data; + }; + + call.process = [process](Status status, std::shared_ptr& data)->void { + std::shared_ptr tmpData = std::static_pointer_cast(data); + return process(status, tmpData); + }; + + mCalls.push(std::move(call)); + + return messageID; +} + +template +MessageID CallQueue::push(const MethodID methodID, + const PeerID peerID, + const std::shared_ptr& data) +{ + Call call; + call.methodID = methodID; + call.peerID = peerID; + call.data = data; + + MessageID messageID = getNextMessageID(); + call.messageID = messageID; + + call.serialize = [](const int fd, std::shared_ptr& data)->void { + config::saveToFD(fd, *std::static_pointer_cast(data)); + }; + + mCalls.push(std::move(call)); + + return messageID; +} + +} // namespace ipc +} // namespace security_containers + +#endif // COMMON_IPC_INTERNALS_CALL_QUEUE_HPP diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp index dc7df42..5565124 100644 --- a/common/ipc/internals/processor.cpp +++ b/common/ipc/internals/processor.cpp @@ -56,7 +56,6 @@ Processor::Processor(const PeerCallback& newPeerCallback, : mNewPeerCallback(newPeerCallback), mRemovedPeerCallback(removedPeerCallback), mMaxNumberOfPeers(maxNumberOfPeers), - mMessageIDCounter(0), mPeerIDCounter(0) { LOGT("Creating Processor"); @@ -579,34 +578,22 @@ bool Processor::onRemovePeer() return true; } -MessageID Processor::getNextMessageID() -{ - // TODO: This method of generating UIDs is buggy. To be changed. - return ++mMessageIDCounter; -} - PeerID Processor::getNextPeerID() { // TODO: This method of generating UIDs is buggy. To be changed. return ++mPeerIDCounter; } -Processor::Call Processor::getCall() +CallQueue::Call Processor::getCall() { Lock lock(mCallsMutex); - if (mCalls.empty()) { - LOGE("Calls queue empty"); - throw IPCException("Calls queue empty"); - } - Call call = std::move(mCalls.front()); - mCalls.pop(); - return call; + return mCalls.pop(); } bool Processor::onCall() { LOGT("Handle call (from another thread) to send a message."); - Call call = getCall(); + CallQueue::Call call = getCall(); std::shared_ptr socketPtr; try { @@ -664,7 +651,7 @@ void Processor::cleanCommunication() } case Event::CALL: { LOGD("Event CALL after FINISH"); - Call call = getCall(); + CallQueue::Call call = getCall(); if (call.process) { IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data)); } diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp index e43fe62..8fc17fb 100644 --- a/common/ipc/internals/processor.hpp +++ b/common/ipc/internals/processor.hpp @@ -27,10 +27,10 @@ #include "ipc/internals/socket.hpp" #include "ipc/internals/event-queue.hpp" +#include "ipc/internals/call-queue.hpp" #include "ipc/exception.hpp" #include "ipc/types.hpp" #include "config/manager.hpp" -#include "config/is-visitable.hpp" #include "config/fields.hpp" #include "logger/logger.hpp" @@ -70,11 +70,12 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000; * TODO: * - some mutexes may not be needed * - synchronous call to many peers -* - implement CallQueue class * - implement HandlerStore class for storing both signals and methods * - API for removing signals * - implement CallbackStore - thread safe calling/setting callbacks * - helper function for removing from unordered map +* - new way to generate UIDs +* - callbacks for serialization/parsing */ class Processor { public: @@ -261,21 +262,6 @@ private: ) }; - struct Call { - Call(const Call& other) = delete; - Call& operator=(const Call&) = delete; - Call() = default; - Call(Call&&) = default; - - PeerID peerID; - MethodID methodID; - std::shared_ptr data; - SerializeCallback serialize; - ParseCallback parse; - ResultHandler::type process; - MessageID messageID; - }; - struct MethodHandlers { MethodHandlers(const MethodHandlers& other) = delete; MethodHandlers& operator=(const MethodHandlers&) = delete; @@ -356,7 +342,7 @@ private: // Mutex for the Calls queue and the map of methods. std::mutex mCallsMutex; - std::queue mCalls; + CallQueue mCalls; std::unordered_map> mMethodsCallbacks; std::unordered_map> mSignalsCallbacks; std::unordered_map> mSignalsPeers; @@ -382,18 +368,12 @@ private: std::thread mThread; std::vector mFDs; - std::atomic mMessageIDCounter; std::atomic mPeerIDCounter; template void addMethodHandlerInternal(const MethodID methodID, const typename MethodHandler::type& process); - template - MessageID callInternal(const MethodID methodID, - const PeerID peerID, - const std::shared_ptr& data); - template MessageID callInternal(const MethodID methodID, const PeerID peerID, @@ -425,9 +405,8 @@ private: const MessageID messageID, std::shared_ptr signalCallbacks); void resetPolling(); - MessageID getNextMessageID(); PeerID getNextPeerID(); - Call getCall(); + CallQueue::Call getCall(); void removePeerInternal(const PeerID peerID, Status status); std::shared_ptr onNewSignals(const PeerID peerID, @@ -547,34 +526,11 @@ MessageID Processor::callInternal(const MethodID methodID, const std::shared_ptr& data, const typename ResultHandler::type& process) { - Call call; - call.peerID = peerID; - call.methodID = methodID; - call.data = data; - call.messageID = getNextMessageID(); - - call.serialize = [](const int fd, std::shared_ptr& data)->void { - config::saveToFD(fd, *std::static_pointer_cast(data)); - }; - - call.parse = [](const int fd)->std::shared_ptr { - std::shared_ptr data(new ReceivedDataType()); - config::loadFromFD(fd, *data); - return data; - }; - - call.process = [process](Status status, std::shared_ptr& data)->void { - std::shared_ptr tmpData = std::static_pointer_cast(data); - return process(status, tmpData); - }; + Lock lock(mCallsMutex); + MessageID messageID = mCalls.push(methodID, peerID, data, process); + mEventQueue.send(Event::CALL); - { - Lock lock(mCallsMutex); - mCalls.push(std::move(call)); - mEventQueue.send(Event::CALL); - } - - return call.messageID; + return messageID; } template @@ -660,24 +616,13 @@ void Processor::signal(const MethodID methodID, } for (const PeerID peerID : peersIDs) { - Call call; - call.peerID = peerID; - call.methodID = methodID; - call.data = data; - call.messageID = getNextMessageID(); - - call.serialize = [](const int fd, std::shared_ptr& data)->void { - config::saveToFD(fd, *std::static_pointer_cast(data)); - }; - - { - Lock lock(mCallsMutex); - mCalls.push(std::move(call)); - mEventQueue.send(Event::CALL); - } + Lock lock(mCallsMutex); + mCalls.push(methodID, peerID, data); + mEventQueue.send(Event::CALL); } } + } // namespace ipc } // namespace security_containers