--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Managing the queue with calls
+ */
+
+#include "config.hpp"
+
+#include "ipc/internals/call-queue.hpp"
+#include "ipc/exception.hpp"
+#include "logger/logger.hpp"
+
+namespace security_containers {
+namespace ipc {
+
+CallQueue::CallQueue()
+ : mMessageIDCounter(0)
+{
+}
+
+CallQueue::~CallQueue()
+{
+}
+
+bool CallQueue::isEmpty() const
+{
+ return mCalls.empty();
+}
+
+MessageID CallQueue::getNextMessageID()
+{
+ // TODO: This method of generating UIDs is buggy. To be changed.
+ return ++mMessageIDCounter;
+}
+
+CallQueue::Call CallQueue::pop()
+{
+ if (isEmpty()) {
+ LOGE("CallQueue is empty");
+ throw IPCException("CallQueue is empty");
+ }
+ Call call = std::move(mCalls.front());
+ mCalls.pop();
+ return call;
+}
+
+} // namespace ipc
+} // namespace security_containers
--- /dev/null
+/*
+* Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
+*
+* Contact: Jan Olszak <j.olszak@samsung.com>
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+/**
+ * @file
+ * @author Jan Olszak (j.olszak@samsung.com)
+ * @brief Managing the queue with calls
+ */
+
+#ifndef COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
+#define COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
+
+#include "ipc/types.hpp"
+#include "config/manager.hpp"
+
+#include <atomic>
+#include <queue>
+
+namespace security_containers {
+namespace ipc {
+
+/**
+* Class for managing a queue of calls in the Processor
+*/
+class CallQueue {
+public:
+ typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
+ typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
+
+ struct Call {
+ Call(const Call& other) = delete;
+ Call& operator=(const Call&) = delete;
+ Call() = default;
+ Call(Call&&) = default;
+
+ PeerID peerID;
+ MethodID methodID;
+ MessageID messageID;
+ std::shared_ptr<void> data;
+ SerializeCallback serialize;
+ ParseCallback parse;
+ ResultHandler<void>::type process;
+ };
+
+ CallQueue();
+ ~CallQueue();
+
+ CallQueue(const CallQueue&) = delete;
+ CallQueue(CallQueue&&) = delete;
+ CallQueue& operator=(const CallQueue&) = delete;
+
+ template<typename SentDataType, typename ReceivedDataType>
+ MessageID push(const MethodID methodID,
+ const PeerID peerID,
+ const std::shared_ptr<SentDataType>& data,
+ const typename ResultHandler<ReceivedDataType>::type& process);
+
+
+ template<typename SentDataType>
+ MessageID push(const MethodID methodID,
+ const PeerID peerID,
+ const std::shared_ptr<SentDataType>& data);
+
+ Call pop();
+
+ bool isEmpty() const;
+
+private:
+ std::queue<Call> mCalls;
+ std::atomic<MessageID> mMessageIDCounter;
+
+ MessageID getNextMessageID();
+};
+
+
+template<typename SentDataType, typename ReceivedDataType>
+MessageID CallQueue::push(const MethodID methodID,
+ const PeerID peerID,
+ const std::shared_ptr<SentDataType>& data,
+ const typename ResultHandler<ReceivedDataType>::type& process)
+{
+ Call call;
+ call.methodID = methodID;
+ call.peerID = peerID;
+ call.data = data;
+
+ MessageID messageID = getNextMessageID();
+ call.messageID = messageID;
+
+ call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+ config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+ };
+
+ call.parse = [](const int fd)->std::shared_ptr<void> {
+ std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
+ config::loadFromFD<ReceivedDataType>(fd, *data);
+ return data;
+ };
+
+ call.process = [process](Status status, std::shared_ptr<void>& data)->void {
+ std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
+ return process(status, tmpData);
+ };
+
+ mCalls.push(std::move(call));
+
+ return messageID;
+}
+
+template<typename SentDataType>
+MessageID CallQueue::push(const MethodID methodID,
+ const PeerID peerID,
+ const std::shared_ptr<SentDataType>& data)
+{
+ Call call;
+ call.methodID = methodID;
+ call.peerID = peerID;
+ call.data = data;
+
+ MessageID messageID = getNextMessageID();
+ call.messageID = messageID;
+
+ call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
+ config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
+ };
+
+ mCalls.push(std::move(call));
+
+ return messageID;
+}
+
+} // namespace ipc
+} // namespace security_containers
+
+#endif // COMMON_IPC_INTERNALS_CALL_QUEUE_HPP
: mNewPeerCallback(newPeerCallback),
mRemovedPeerCallback(removedPeerCallback),
mMaxNumberOfPeers(maxNumberOfPeers),
- mMessageIDCounter(0),
mPeerIDCounter(0)
{
LOGT("Creating Processor");
return true;
}
-MessageID Processor::getNextMessageID()
-{
- // TODO: This method of generating UIDs is buggy. To be changed.
- return ++mMessageIDCounter;
-}
-
PeerID Processor::getNextPeerID()
{
// TODO: This method of generating UIDs is buggy. To be changed.
return ++mPeerIDCounter;
}
-Processor::Call Processor::getCall()
+CallQueue::Call Processor::getCall()
{
Lock lock(mCallsMutex);
- if (mCalls.empty()) {
- LOGE("Calls queue empty");
- throw IPCException("Calls queue empty");
- }
- Call call = std::move(mCalls.front());
- mCalls.pop();
- return call;
+ return mCalls.pop();
}
bool Processor::onCall()
{
LOGT("Handle call (from another thread) to send a message.");
- Call call = getCall();
+ CallQueue::Call call = getCall();
std::shared_ptr<Socket> socketPtr;
try {
}
case Event::CALL: {
LOGD("Event CALL after FINISH");
- Call call = getCall();
+ CallQueue::Call call = getCall();
if (call.process) {
IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
}
#include "ipc/internals/socket.hpp"
#include "ipc/internals/event-queue.hpp"
+#include "ipc/internals/call-queue.hpp"
#include "ipc/exception.hpp"
#include "ipc/types.hpp"
#include "config/manager.hpp"
-#include "config/is-visitable.hpp"
#include "config/fields.hpp"
#include "logger/logger.hpp"
* TODO:
* - some mutexes may not be needed
* - synchronous call to many peers
-* - implement CallQueue class
* - implement HandlerStore class for storing both signals and methods
* - API for removing signals
* - implement CallbackStore - thread safe calling/setting callbacks
* - helper function for removing from unordered map
+* - new way to generate UIDs
+* - callbacks for serialization/parsing
*/
class Processor {
public:
)
};
- struct Call {
- Call(const Call& other) = delete;
- Call& operator=(const Call&) = delete;
- Call() = default;
- Call(Call&&) = default;
-
- PeerID peerID;
- MethodID methodID;
- std::shared_ptr<void> data;
- SerializeCallback serialize;
- ParseCallback parse;
- ResultHandler<void>::type process;
- MessageID messageID;
- };
-
struct MethodHandlers {
MethodHandlers(const MethodHandlers& other) = delete;
MethodHandlers& operator=(const MethodHandlers&) = delete;
// Mutex for the Calls queue and the map of methods.
std::mutex mCallsMutex;
- std::queue<Call> mCalls;
+ CallQueue mCalls;
std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
std::thread mThread;
std::vector<struct pollfd> mFDs;
- std::atomic<MessageID> mMessageIDCounter;
std::atomic<PeerID> mPeerIDCounter;
template<typename SentDataType, typename ReceivedDataType>
void addMethodHandlerInternal(const MethodID methodID,
const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
- template<typename SentDataType>
- MessageID callInternal(const MethodID methodID,
- const PeerID peerID,
- const std::shared_ptr<SentDataType>& data);
-
template<typename SentDataType, typename ReceivedDataType>
MessageID callInternal(const MethodID methodID,
const PeerID peerID,
const MessageID messageID,
std::shared_ptr<SignalHandlers> signalCallbacks);
void resetPolling();
- MessageID getNextMessageID();
PeerID getNextPeerID();
- Call getCall();
+ CallQueue::Call getCall();
void removePeerInternal(const PeerID peerID, Status status);
std::shared_ptr<EmptyData> onNewSignals(const PeerID peerID,
const std::shared_ptr<SentDataType>& data,
const typename ResultHandler<ReceivedDataType>::type& process)
{
- Call call;
- call.peerID = peerID;
- call.methodID = methodID;
- call.data = data;
- call.messageID = getNextMessageID();
-
- call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
- config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
- };
-
- call.parse = [](const int fd)->std::shared_ptr<void> {
- std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
- config::loadFromFD<ReceivedDataType>(fd, *data);
- return data;
- };
-
- call.process = [process](Status status, std::shared_ptr<void>& data)->void {
- std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
- return process(status, tmpData);
- };
+ Lock lock(mCallsMutex);
+ MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
+ mEventQueue.send(Event::CALL);
- {
- Lock lock(mCallsMutex);
- mCalls.push(std::move(call));
- mEventQueue.send(Event::CALL);
- }
-
- return call.messageID;
+ return messageID;
}
template<typename SentDataType, typename ReceivedDataType>
}
for (const PeerID peerID : peersIDs) {
- Call call;
- call.peerID = peerID;
- call.methodID = methodID;
- call.data = data;
- call.messageID = getNextMessageID();
-
- call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
- config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
- };
-
- {
- Lock lock(mCallsMutex);
- mCalls.push(std::move(call));
- mEventQueue.send(Event::CALL);
- }
+ Lock lock(mCallsMutex);
+ mCalls.push<SentDataType>(methodID, peerID, data);
+ mEventQueue.send(Event::CALL);
}
}
+
} // namespace ipc
} // namespace security_containers