IPC: CallQueue 23/31223/5
authorJan Olszak <j.olszak@samsung.com>
Tue, 2 Dec 2014 16:27:17 +0000 (17:27 +0100)
committerPiotr Bartosiewicz <p.bartosiewi@partner.samsung.com>
Wed, 3 Dec 2014 15:54:03 +0000 (07:54 -0800)
[Bug/Feature]   CallQueue to wrap the Calls' queue
[Cause]         N/A
[Solution]      N/A
[Verification]  Build, install, run tests

Change-Id: I9519114e1c6f2e4040e956ae54f5093646f2dfaf

common/ipc/internals/call-queue.cpp [new file with mode: 0644]
common/ipc/internals/call-queue.hpp [new file with mode: 0644]
common/ipc/internals/processor.cpp
common/ipc/internals/processor.hpp

diff --git a/common/ipc/internals/call-queue.cpp b/common/ipc/internals/call-queue.cpp
new file mode 100644 (file)
index 0000000..9be7e53
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Managing the queue with calls
+ */
+
+#include "config.hpp"
+
+#include "ipc/internals/call-queue.hpp"
+#include "ipc/exception.hpp"
+#include "logger/logger.hpp"
+
+namespace security_containers {
+namespace ipc {
+
+CallQueue::CallQueue()
+    : mMessageIDCounter(0)
+{
+}
+
+CallQueue::~CallQueue()
+{
+}
+
+bool CallQueue::isEmpty() const
+{
+    return mCalls.empty();
+}
+
+MessageID CallQueue::getNextMessageID()
+{
+    // TODO: This method of generating UIDs is buggy. To be changed.
+    return ++mMessageIDCounter;
+}
+
+CallQueue::Call CallQueue::pop()
+{
+    if (isEmpty()) {
+        LOGE("CallQueue is empty");
+        throw IPCException("CallQueue is empty");
+    }
+    Call call = std::move(mCalls.front());
+    mCalls.pop();
+    return call;
+}
+
+} // namespace ipc
+} // namespace security_containers
diff --git a/common/ipc/internals/call-queue.hpp b/common/ipc/internals/call-queue.hpp
new file mode 100644 (file)
index 0000000..4d1ecf6
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+*  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+*  Contact: Jan Olszak <j.olszak@samsung.com>
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License
+*/
+
+/**
+ * @file
+ * @author  Jan Olszak (j.olszak@samsung.com)
+ * @brief   Managing the queue with calls
+ */
+
+#ifndef COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
+#define COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
+
+#include "ipc/types.hpp"
+#include "config/manager.hpp"
+
+#include <atomic>
+#include <queue>
+
+namespace security_containers {
+namespace ipc {
+
+/**
+* Class for managing a queue of calls in the Processor
+*/
+class CallQueue {
+public:
+    typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
+    typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
+
+    struct Call {
+        Call(const Call& other) = delete;
+        Call& operator=(const Call&) = delete;
+        Call() = default;
+        Call(Call&&) = default;
+
+        PeerID peerID;
+        MethodID methodID;
+        MessageID messageID;
+        std::shared_ptr<void> data;
+        SerializeCallback serialize;
+        ParseCallback parse;
+        ResultHandler<void>::type process;
+    };
+
+    CallQueue();
+    ~CallQueue();
+
+    CallQueue(const CallQueue&) = delete;
+    CallQueue(CallQueue&&) = delete;
+    CallQueue& operator=(const CallQueue&) = delete;
+
+    template<typename SentDataType, typename ReceivedDataType>
+    MessageID push(const MethodID methodID,
+                   const PeerID peerID,
+                   const std::shared_ptr<SentDataType>& data,
+                   const typename ResultHandler<ReceivedDataType>::type& process);
+
+
+    template<typename SentDataType>
+    MessageID push(const MethodID methodID,
+                   const PeerID peerID,
+                   const std::shared_ptr<SentDataType>& data);
+
+    Call pop();
+
+    bool isEmpty() const;
+
+private:
+    std::queue<Call> mCalls;
+    std::atomic<MessageID> mMessageIDCounter;
+
+    MessageID getNextMessageID();
+};
+
+
+template<typename SentDataType, typename ReceivedDataType>
+MessageID CallQueue::push(const MethodID methodID,
+                          const PeerID peerID,
+                          const std::shared_ptr<SentDataType>& data,
+                          const typename ResultHandler<ReceivedDataType>::type& process)
+{
+    Call call;
+    call.methodID = methodID;
+    call.peerID = peerID;
+    call.data = data;
+
+    MessageID messageID = getNextMessageID();
+    call.messageID = messageID;
+
+    call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+        config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+    };
+
+    call.parse = [](const int fd)->std::shared_ptr<void> {
+        std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
+        config::loadFromFD<ReceivedDataType>(fd, *data);
+        return data;
+    };
+
+    call.process = [process](Status status, std::shared_ptr<void>& data)->void {
+        std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
+        return process(status, tmpData);
+    };
+
+    mCalls.push(std::move(call));
+
+    return messageID;
+}
+
+template<typename SentDataType>
+MessageID CallQueue::push(const MethodID methodID,
+                          const PeerID peerID,
+                          const std::shared_ptr<SentDataType>& data)
+{
+    Call call;
+    call.methodID = methodID;
+    call.peerID = peerID;
+    call.data = data;
+
+    MessageID messageID = getNextMessageID();
+    call.messageID = messageID;
+
+    call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+        config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+    };
+
+    mCalls.push(std::move(call));
+
+    return messageID;
+}
+
+} // namespace ipc
+} // namespace security_containers
+
+#endif // COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
index dc7df42..5565124 100644 (file)
@@ -56,7 +56,6 @@ Processor::Processor(const PeerCallback& newPeerCallback,
     : mNewPeerCallback(newPeerCallback),
       mRemovedPeerCallback(removedPeerCallback),
       mMaxNumberOfPeers(maxNumberOfPeers),
-      mMessageIDCounter(0),
       mPeerIDCounter(0)
 {
     LOGT("Creating Processor");
@@ -579,34 +578,22 @@ bool Processor::onRemovePeer()
     return true;
 }
 
-MessageID Processor::getNextMessageID()
-{
-    // TODO: This method of generating UIDs is buggy. To be changed.
-    return ++mMessageIDCounter;
-}
-
 PeerID Processor::getNextPeerID()
 {
     // TODO: This method of generating UIDs is buggy. To be changed.
     return ++mPeerIDCounter;
 }
 
-Processor::Call Processor::getCall()
+CallQueue::Call Processor::getCall()
 {
     Lock lock(mCallsMutex);
-    if (mCalls.empty()) {
-        LOGE("Calls queue empty");
-        throw IPCException("Calls queue empty");
-    }
-    Call call = std::move(mCalls.front());
-    mCalls.pop();
-    return call;
+    return mCalls.pop();
 }
 
 bool Processor::onCall()
 {
     LOGT("Handle call (from another thread) to send a message.");
-    Call call = getCall();
+    CallQueue::Call call = getCall();
 
     std::shared_ptr<Socket> socketPtr;
     try {
@@ -664,7 +651,7 @@ void Processor::cleanCommunication()
         }
         case Event::CALL: {
             LOGD("Event CALL after FINISH");
-            Call call = getCall();
+            CallQueue::Call call = getCall();
             if (call.process) {
                 IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
             }
index e43fe62..8fc17fb 100644 (file)
 
 #include "ipc/internals/socket.hpp"
 #include "ipc/internals/event-queue.hpp"
+#include "ipc/internals/call-queue.hpp"
 #include "ipc/exception.hpp"
 #include "ipc/types.hpp"
 #include "config/manager.hpp"
-#include "config/is-visitable.hpp"
 #include "config/fields.hpp"
 #include "logger/logger.hpp"
 
@@ -70,11 +70,12 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
 * TODO:
 *  - some mutexes may not be needed
 *  - synchronous call to many peers
-*  - implement CallQueue class
 *  - implement HandlerStore class for storing both signals and methods
 *  - API for removing signals
 *  - implement CallbackStore - thread safe calling/setting callbacks
 *  - helper function for removing from unordered map
+*  - new way to generate UIDs
+*  - callbacks for serialization/parsing
 */
 class Processor {
 public:
@@ -261,21 +262,6 @@ private:
         )
     };
 
-    struct Call {
-        Call(const Call& other) = delete;
-        Call& operator=(const Call&) = delete;
-        Call() = default;
-        Call(Call&&) = default;
-
-        PeerID peerID;
-        MethodID methodID;
-        std::shared_ptr<void> data;
-        SerializeCallback serialize;
-        ParseCallback parse;
-        ResultHandler<void>::type process;
-        MessageID messageID;
-    };
-
     struct MethodHandlers {
         MethodHandlers(const MethodHandlers& other) = delete;
         MethodHandlers& operator=(const MethodHandlers&) = delete;
@@ -356,7 +342,7 @@ private:
 
     // Mutex for the Calls queue and the map of methods.
     std::mutex mCallsMutex;
-    std::queue<Call> mCalls;
+    CallQueue mCalls;
     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
     std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
@@ -382,18 +368,12 @@ private:
     std::thread mThread;
     std::vector<struct pollfd> mFDs;
 
-    std::atomic<MessageID> mMessageIDCounter;
     std::atomic<PeerID> mPeerIDCounter;
 
     template<typename SentDataType, typename ReceivedDataType>
     void addMethodHandlerInternal(const MethodID methodID,
                                   const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
 
-    template<typename SentDataType>
-    MessageID callInternal(const MethodID methodID,
-                           const PeerID peerID,
-                           const std::shared_ptr<SentDataType>& data);
-
     template<typename SentDataType, typename ReceivedDataType>
     MessageID callInternal(const MethodID methodID,
                            const PeerID peerID,
@@ -425,9 +405,8 @@ private:
                         const MessageID messageID,
                         std::shared_ptr<SignalHandlers> signalCallbacks);
     void resetPolling();
-    MessageID getNextMessageID();
     PeerID getNextPeerID();
-    Call getCall();
+    CallQueue::Call getCall();
     void removePeerInternal(const PeerID peerID, Status status);
 
     std::shared_ptr<EmptyData> onNewSignals(const PeerID peerID,
@@ -547,34 +526,11 @@ MessageID Processor::callInternal(const MethodID methodID,
                                   const std::shared_ptr<SentDataType>& data,
                                   const typename ResultHandler<ReceivedDataType>::type& process)
 {
-    Call call;
-    call.peerID = peerID;
-    call.methodID = methodID;
-    call.data = data;
-    call.messageID = getNextMessageID();
-
-    call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
-        config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
-    };
-
-    call.parse = [](const int fd)->std::shared_ptr<void> {
-        std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
-        config::loadFromFD<ReceivedDataType>(fd, *data);
-        return data;
-    };
-
-    call.process = [process](Status status, std::shared_ptr<void>& data)->void {
-        std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
-        return process(status, tmpData);
-    };
+    Lock lock(mCallsMutex);
+    MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
+    mEventQueue.send(Event::CALL);
 
-    {
-        Lock lock(mCallsMutex);
-        mCalls.push(std::move(call));
-        mEventQueue.send(Event::CALL);
-    }
-
-    return call.messageID;
+    return messageID;
 }
 
 template<typename SentDataType, typename ReceivedDataType>
@@ -660,24 +616,13 @@ void Processor::signal(const MethodID methodID,
     }
 
     for (const PeerID peerID : peersIDs) {
-        Call call;
-        call.peerID = peerID;
-        call.methodID = methodID;
-        call.data = data;
-        call.messageID = getNextMessageID();
-
-        call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
-            config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
-        };
-
-        {
-            Lock lock(mCallsMutex);
-            mCalls.push(std::move(call));
-            mEventQueue.send(Event::CALL);
-        }
+        Lock lock(mCallsMutex);
+        mCalls.push<SentDataType>(methodID, peerID, data);
+        mEventQueue.send(Event::CALL);
     }
 }
 
+
 } // namespace ipc
 } // namespace security_containers