2 * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/event-queue.hpp"
30 #include "ipc/exception.hpp"
31 #include "ipc/types.hpp"
32 #include "config/manager.hpp"
33 #include "config/is-visitable.hpp"
34 #include "logger/logger.hpp"
39 #include <condition_variable>
47 #include <unordered_map>
49 namespace security_containers {
52 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
55 * This class wraps communication via UX sockets
57 * It's intended to be used both in Client and Service classes.
58 * It uses a serialization mechanism from libConfig.
59 * Library user will only have to pass the types that each call will send and receive
62 * - MethodID - typically an enum value cast to MethodID.
63 * MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
64 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
65 * - Rest: The data written in a callback. One type per method.
68 * - error codes passed to async callbacks
69 * - remove ReturnCallbacks on peer disconnect
70 * - on sync timeout erase the return callback
71 * - don't throw timeout if the message is already processed
72 * - naming convention for methods that just commission the PROCESS thread to do something
73 * - removePeer API function
77 typedef std::function<void(int)> PeerCallback;
78 typedef unsigned int PeerID;
79 typedef unsigned int MethodID;
82 * Method ID. Used to indicate a message with the return value.
84 static const MethodID RETURN_METHOD_ID;
86 * Constructs the Processor, but doesn't start it.
87 * The object is ready to add methods.
89 * @param newPeerCallback called when a new peer arrives
90 * @param removedPeerCallback called when the Processor stops listening for this peer
92 Processor(const PeerCallback& newPeerCallback = nullptr,
93 const PeerCallback& removedPeerCallback = nullptr,
94 const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
97 Processor(const Processor&) = delete;
98 Processor(Processor&&) = delete;
99 Processor& operator=(const Processor&) = delete;
102 * Start the processing thread.
103 * Quits immediately after starting the thread.
108 * Stops the processing thread.
109 * No incoming data will be handled after.
114 * From now on socket is owned by the Processor object.
115 * Calls the newPeerCallback.
117 * @param socketPtr pointer to the new socket
118 * @return peerID of the new socket
120 PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
123 * Saves the callbacks connected to the method id.
124 * When a message with the given method id is received,
125 * the data will be passed to the serialization callback through file descriptor.
127 * Then the process callback will be called with the parsed data.
129 * @param methodID API dependent id of the method
130 * @param process data processing callback
131 * @tparam SentDataType data type to send
132 * @tparam ReceivedDataType data type to receive
134 template<typename SentDataType, typename ReceivedDataType>
135 void addMethodHandler(const MethodID methodID,
136 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
139 * Removes the callback
141 * @param methodID API dependent id of the method
143 void removeMethod(const MethodID methodID);
146 * Synchronous method call.
148 * @param methodID API dependent id of the method
149 * @param peerID id of the peer
150 * @param data data to send
151 * @param timeoutMS how long to wait for the return value before throwing
152 * @tparam SentDataType data type to send
153 * @tparam ReceivedDataType data type to receive
155 template<typename SentDataType, typename ReceivedDataType>
156 std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
158 const std::shared_ptr<SentDataType>& data,
159 unsigned int timeoutMS = 500);
162 * Asynchronous method call
164 * @param methodID API dependent id of the method
165 * @param peerID id of the peer
166 * @param data data to send
167 * @param process callback processing the return data
168 * @tparam SentDataType data type to send
169 * @tparam ReceivedDataType data type to receive
171 template<typename SentDataType, typename ReceivedDataType>
172 void callAsync(const MethodID methodID,
174 const std::shared_ptr<SentDataType>& data,
175 const typename ResultHandler<ReceivedDataType>::type& process);
179 typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
180 typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
181 typedef std::lock_guard<std::mutex> Lock;
182 typedef unsigned int MessageID;
185 Call(const Call& other) = delete;
186 Call& operator=(const Call&) = delete;
188 Call(Call&&) = default;
192 std::shared_ptr<void> data;
193 SerializeCallback serialize;
195 ResultHandler<void>::type process;
// Callbacks registered for one method ID. Instances are built by
// addMethodHandler() and stored in mMethodsCallbacks behind a shared_ptr.
// The type is move-only: copying is deleted, default construction and moves
// are defaulted.
// NOTE(review): addMethodHandler() also assigns a `parse` member, which is not
// visible in this excerpt - confirm against the full struct.
198 struct MethodHandlers {
199 MethodHandlers(const MethodHandlers& other) = delete;
200 MethodHandlers& operator=(const MethodHandlers&) = delete;
201 MethodHandlers() = default;
202 MethodHandlers(MethodHandlers&&) = default;
203 MethodHandlers& operator=(MethodHandlers &&) = default;
// Writes the type-erased data object to the given file descriptor.
205 SerializeCallback serialize;
// Type-erased wrapper around the user-supplied method handler.
207 MethodHandler<void, void>::type method;
// Callbacks stored in mReturnCallbacks under a MessageID key.
// Presumably kept for a pending call until its reply message arrives - confirm
// against the implementation file.
// The type is move-only: copying is deleted, default construction and moves
// are defaulted.
// NOTE(review): only the `process` member is visible in this excerpt; other
// members may exist in the full struct.
210 struct ReturnCallbacks {
211 ReturnCallbacks(const ReturnCallbacks& other) = delete;
212 ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
213 ReturnCallbacks() = default;
214 ReturnCallbacks(ReturnCallbacks&&) = default;
215 ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
// Type-erased handler for the returned data.
218 ResultHandler<void>::type process;
222 SocketInfo(const SocketInfo& other) = delete;
223 SocketInfo& operator=(const SocketInfo&) = delete;
224 SocketInfo() = default;
225 SocketInfo(SocketInfo&&) = default;
226 SocketInfo& operator=(SocketInfo &&) = default;
228 std::shared_ptr<Socket> socketPtr;
// Kinds of events sent through mEventQueue. callAsync() sends Event::CALL
// after pushing a new call onto mCalls.
232 enum class Event : int {
233 FINISH, // Shutdown request
234 CALL, // New method call in the queue
235 NEW_PEER // New peer in the queue
237 EventQueue<Event> mEventQueue;
242 // Mutex for the Calls queue and the map of methods.
243 std::mutex mCallsMutex;
244 std::queue<Call> mCalls;
245 std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
247 // Mutex for changing mSockets map.
248 // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
249 std::mutex mSocketsMutex;
250 std::unordered_map<PeerID, std::shared_ptr<Socket> > mSockets;
251 std::queue<SocketInfo> mNewSockets;
253 // Mutex for modifying the map with return callbacks
254 std::mutex mReturnCallbacksMutex;
255 std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
258 PeerCallback mNewPeerCallback;
259 PeerCallback mRemovedPeerCallback;
261 unsigned int mMaxNumberOfPeers;
264 std::vector<struct pollfd> mFDs;
266 std::atomic<MessageID> mMessageIDCounter;
267 std::atomic<PeerID> mPeerIDCounter;
272 bool handleLostConnections();
274 bool handleInput(const PeerID peerID, const Socket& socket);
276 MessageID getNextMessageID();
277 PeerID getNextPeerID();
279 void removePeer(PeerID peerID);
283 template<typename SentDataType, typename ReceivedDataType>
284 void Processor::addMethodHandler(const MethodID methodID,
285 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
287 static_assert(config::isVisitable<SentDataType>::value,
288 "Use the libConfig library");
289 static_assert(config::isVisitable<ReceivedDataType>::value,
290 "Use the libConfig library");
292 if (methodID == RETURN_METHOD_ID) {
293 LOGE("Forbidden methodID: " << methodID);
294 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
297 using namespace std::placeholders;
299 MethodHandlers methodCall;
301 methodCall.parse = [](const int fd)->std::shared_ptr<void> {
302 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
303 config::loadFromFD<ReceivedDataType>(fd, *data);
307 methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
308 config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
311 methodCall.method = [method](std::shared_ptr<void>& data)->std::shared_ptr<void> {
312 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
313 return method(tmpData);
317 Lock lock(mCallsMutex);
318 mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
322 template<typename SentDataType, typename ReceivedDataType>
323 void Processor::callAsync(const MethodID methodID,
325 const std::shared_ptr<SentDataType>& data,
326 const typename ResultHandler<ReceivedDataType>::type& process)
328 static_assert(config::isVisitable<SentDataType>::value,
329 "Use the libConfig library");
330 static_assert(config::isVisitable<ReceivedDataType>::value,
331 "Use the libConfig library");
333 if (!mThread.joinable()) {
334 LOGE("The Processor thread is not started. Can't send any data.");
335 throw IPCException("The Processor thread is not started. Can't send any data.");
338 using namespace std::placeholders;
341 call.peerID = peerID;
342 call.methodID = methodID;
345 call.parse = [](const int fd)->std::shared_ptr<void> {
346 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
347 config::loadFromFD<ReceivedDataType>(fd, *data);
351 call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
352 config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
355 call.process = [process](std::shared_ptr<void>& data)->void {
356 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
357 return process(tmpData);
361 Lock lock(mCallsMutex);
362 mCalls.push(std::move(call));
365 mEventQueue.send(Event::CALL);
369 template<typename SentDataType, typename ReceivedDataType>
370 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
372 const std::shared_ptr<SentDataType>& data,
373 unsigned int timeoutMS)
375 static_assert(config::isVisitable<SentDataType>::value,
376 "Use the libConfig library");
377 static_assert(config::isVisitable<ReceivedDataType>::value,
378 "Use the libConfig library");
380 if (!mThread.joinable()) {
381 LOGE("The Processor thread is not started. Can't send any data.");
382 throw IPCException("The Processor thread is not started. Can't send any data.");
385 std::shared_ptr<ReceivedDataType> result;
388 std::unique_lock<std::mutex> lck(mtx);
389 std::condition_variable cv;
391 auto process = [&result, &cv](std::shared_ptr<ReceivedDataType> returnedData) {
392 result = returnedData;
396 callAsync<SentDataType,
397 ReceivedDataType>(methodID,
402 auto isResultInitialized = [&result]() {
403 return static_cast<bool>(result);
406 if (!cv.wait_for(lck, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
407 LOGE("Function call timeout; methodID: " << methodID);
408 throw IPCException("Function call timeout; methodID: " + std::to_string(methodID));
416 } // namespace security_containers
418 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP