2 * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/event-queue.hpp"
30 #include "ipc/exception.hpp"
31 #include "ipc/types.hpp"
32 #include "config/manager.hpp"
33 #include "config/is-visitable.hpp"
34 #include "logger/logger.hpp"
39 #include <condition_variable>
47 #include <unordered_map>
49 namespace security_containers {
// Default value of the Processor constructor's maxNumberOfPeers argument.
52 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
55 * This class wraps communication via UNIX domain sockets
57 * It's intended to be used both in Client and Service classes.
58 * It uses a serialization mechanism from libConfig.
59 * Library user will only have to pass the types that each call will send and receive
62 * - MethodID - usually an enum value cast to MethodID.
63 * MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
64 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
65 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
68 * - remove ReturnCallbacks on peer disconnect
69 * - on sync timeout erase the return callback
70 * - don't throw timeout if the message is already processed
71 * - naming convention for methods that just commission the PROCESS thread to do something
72 * - removePeer API function
73 * - error handling - special message type
74 * - some mutexes may not be needed
75 * - make addPeer synchronous like removePeer
// Callback taking a single int argument; the constructor accepts two of these
// (the new-peer and the removed-peer notification).
// NOTE(review): the meaning of the int argument is not visible here - confirm.
79 typedef std::function<void(int)> PeerCallback;
// Identifier of a peer; addPeer() returns one and removePeer() takes one.
80 typedef unsigned int PeerID;
// API dependent identifier of a method (see addMethodHandler()).
81 typedef unsigned int MethodID;
// Identifier of a single message exchange; callAsync() returns one.
82 typedef unsigned int MessageID;
86 * Method ID. Used to indicate a message with the return value.
// Reserved value: addMethodHandler() refuses it and throws IPCException.
// Only declared here; the definition (and the value) is not in this header section.
88 static const MethodID RETURN_METHOD_ID;
90 * Constructs the Processor, but doesn't start it.
91 * The object is ready to add methods.
93 * @param newPeerCallback called when a new peer arrives
94 * @param removedPeerCallback called when the Processor stops listening for this peer
// Both callbacks default to nullptr, i.e. to an empty std::function.
// maxNumberOfPeers is not covered by the @param list above; it defaults to
// DEFAULT_MAX_NUMBER_OF_PEERS. Presumably it caps the number of connected
// peers - confirm in the implementation.
96 Processor(const PeerCallback& newPeerCallback = nullptr,
97 const PeerCallback& removedPeerCallback = nullptr,
98 const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
// Copy construction, move construction and copy assignment are explicitly
// deleted: a Processor can neither be copied nor moved.
101 Processor(const Processor&) = delete;
102 Processor(Processor&&) = delete;
103 Processor& operator=(const Processor&) = delete;
106 * Start the processing thread.
107 * Returns immediately after starting the thread.
112 * Stops the processing thread.
113 * No incoming data will be handled after.
118 * From now on socket is owned by the Processor object.
119 * Calls the newPeerCallback.
121 * @param socketPtr pointer to the new socket
122 * @return peerID of the new socket
// NOTE(review): the class TODO list says "make addPeer synchronous like
// removePeer", so the peer is presumably not fully registered yet when this
// call returns - confirm in the implementation.
124 PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
127 * Request removing peer and wait
129 * @param peerID id of the peer
// Documented above as a blocking request. RemovePeerRequest pairs a peer id
// with a std::condition_variable, which is presumably what this call waits
// on - confirm in the implementation.
131 void removePeer(const PeerID peerID);
134 * Saves the callbacks connected to the method id.
135 * When a message with the given method id is received,
136 * the data will be passed to the serialization callback through file descriptor.
138 * Then the process callback will be called with the parsed data.
140 * @param methodID API dependent id of the method
141 * @param process data processing callback
142 * @tparam SentDataType data type to send
143 * @tparam ReceivedDataType data type to receive
// The definition in this header additionally enforces:
//  - config::isVisitable must hold for both SentDataType and ReceivedDataType
//    (static_assert),
//  - methodID must differ from RETURN_METHOD_ID, otherwise IPCException is thrown.
// Registering the same methodID again overwrites the earlier entry
// (the definition assigns through mMethodsCallbacks[methodID]).
145 template<typename SentDataType, typename ReceivedDataType>
146 void addMethodHandler(const MethodID methodID,
147 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
150 * Removes the callback
152 * @param methodID API dependent id of the method
// Presumably erases the mMethodsCallbacks entry created by
// addMethodHandler() for this id - confirm in the implementation.
154 void removeMethod(const MethodID methodID);
157 * Synchronous method call.
159 * @param methodID API dependent id of the method
160 * @param peerID id of the peer
161 * @param data data to send
162 * @param timeoutMS how long (in milliseconds) to wait for the return value before throwing
163 * @tparam SentDataType data type to send
164 * @tparam ReceivedDataType data type to receive
// Blocking call; returns the data received from the peer.
// Per the definition in this header: throws IPCException when the processing
// thread is not running and IPCTimeoutException on timeout.
// NOTE(review): the documented peerID parameter is missing from the lines
// visible here - confirm the full parameter list.
166 template<typename SentDataType, typename ReceivedDataType>
167 std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
169 const std::shared_ptr<SentDataType>& data,
170 unsigned int timeoutMS = 500);
173 * Asynchronous method call
175 * @param methodID API dependent id of the method
176 * @param peerID id of the peer
177 * @param data data to send
178 * @param process callback processing the return data
179 * @tparam SentDataType data type to send
180 * @tparam ReceivedDataType data type to receive
// Non-blocking call; returns the MessageID assigned to the exchange.
// Per the definition in this header: throws IPCException when the processing
// thread is not running.
// NOTE(review): the documented peerID parameter is missing from the lines
// visible here - confirm the full parameter list.
182 template<typename SentDataType, typename ReceivedDataType>
183 MessageID callAsync(const MethodID methodID,
185 const std::shared_ptr<SentDataType>& data,
186 const typename ResultHandler<ReceivedDataType>::type& process);
// Writes the type-erased `data` to the file descriptor `fd`; the instances
// created in this header call config::saveToFD.
190 typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
// Reads an object from `fd` and returns it type-erased; the instances created
// in this header call config::loadFromFD.
191 typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
// Scoped lock used with the std::mutex members of this class.
192 typedef std::lock_guard<std::mutex> Lock;
// Members of the record queued in mCalls (the struct header and some members
// are not among the visible lines).
// Copying is deleted; move construction is defaulted, which is what lets
// callAsync() move an instance into the queue.
195 Call(const Call& other) = delete;
196 Call& operator=(const Call&) = delete;
198 Call(Call&&) = default;
// Type-erased payload of the call.
202 std::shared_ptr<void> data;
// Writes `data` to a file descriptor (set in callAsync()).
203 SerializeCallback serialize;
// Result handler; callAsync() installs a wrapper that casts the type-erased
// data to ReceivedDataType before invoking the caller's handler.
205 ResultHandler<void>::type process;
// Callbacks registered for one method id; instances are stored in
// mMethodsCallbacks behind a std::shared_ptr.
// Move-only: copying is deleted, default construction and moves are defaulted.
// NOTE(review): addMethodHandler() also assigns a `parse` member whose
// declaration is not among the visible lines.
209 struct MethodHandlers {
210 MethodHandlers(const MethodHandlers& other) = delete;
211 MethodHandlers& operator=(const MethodHandlers&) = delete;
212 MethodHandlers() = default;
213 MethodHandlers(MethodHandlers&&) = default;
214 MethodHandlers& operator=(MethodHandlers &&) = default;
// Set by addMethodHandler() to write a SentDataType to a file descriptor.
216 SerializeCallback serialize;
// Set by addMethodHandler() to a wrapper that casts the type-erased argument
// to ReceivedDataType and forwards it to the user-supplied handler.
218 MethodHandler<void, void>::type method;
// Value type of mReturnCallbacks (keyed by MessageID): what is needed to
// handle the reply of one message exchange.
// Move-only: copying is deleted, default construction and moves are defaulted.
221 struct ReturnCallbacks {
222 ReturnCallbacks(const ReturnCallbacks& other) = delete;
223 ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
224 ReturnCallbacks() = default;
225 ReturnCallbacks(ReturnCallbacks&&) = default;
226 ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
// Stores the peer id, the parse callback and the result handler.
// NOTE(review): the peerID and parse member declarations are not among the
// visible lines.
228 ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultHandler<void>::type& process)
229 : peerID(peerID), parse(parse), process(process) {}
// Result handler for the reply.
233 ResultHandler<void>::type process;
// Members of the element type of the mNewSockets queue (the struct header is
// not among the visible lines): a peer id paired with its socket.
// Move-only: copying is deleted, default construction and moves are defaulted.
237 SocketInfo(const SocketInfo& other) = delete;
238 SocketInfo& operator=(const SocketInfo&) = delete;
239 SocketInfo() = default;
240 SocketInfo(SocketInfo&&) = default;
241 SocketInfo& operator=(SocketInfo &&) = default;
// Stores the peer id together with a shared reference to its socket.
243 SocketInfo(const PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
244 : peerID(peerID), socketPtr(socketPtr) {}
// Socket of the peer.
247 std::shared_ptr<Socket> socketPtr;
// Element type of the mPeersToDelete queue: a peer id paired with a
// condition variable.
// Move-only: copying is deleted, default construction and moves are defaulted.
250 struct RemovePeerRequest {
251 RemovePeerRequest(const RemovePeerRequest& other) = delete;
252 RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
253 RemovePeerRequest() = default;
254 RemovePeerRequest(RemovePeerRequest&&) = default;
255 RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
257 RemovePeerRequest(const PeerID peerID,
258 const std::shared_ptr<std::condition_variable>& conditionPtr)
259 : peerID(peerID), conditionPtr(conditionPtr) {}
// Presumably notified once the peer has been removed, so that removePeer()
// can stop waiting - confirm in the implementation.
262 std::shared_ptr<std::condition_variable> conditionPtr;
// Kinds of events carried by mEventQueue; for example callAsync() sends
// Event::CALL after pushing a call onto mCalls.
265 enum class Event : int {
266 FINISH, // Shutdown request
267 CALL, // New method call in the queue
268 NEW_PEER, // New peer in the queue
269 DELETE_PEER // Delete peer
// Queue of Event values; callAsync() signals new work through it.
271 EventQueue<Event> mEventQueue;
276 // Mutex for the Calls queue and the map of methods.
277 std::mutex mCallsMutex;
// Calls pushed by callAsync(), waiting to be handled.
278 std::queue<Call> mCalls;
// Handlers registered with addMethodHandler(), keyed by method id.
279 std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
281 // Mutex for changing mSockets map.
282 // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
283 std::mutex mSocketsMutex;
// Sockets keyed by peer id.
284 std::unordered_map<PeerID, std::shared_ptr<Socket> > mSockets;
// Presumably filled by addPeer()/removePeer() and drained when NEW_PEER /
// DELETE_PEER events are handled - confirm in the implementation.
285 std::queue<SocketInfo> mNewSockets;
286 std::queue<RemovePeerRequest> mPeersToDelete;
288 // Mutex for modifying the map with return callbacks
289 std::mutex mReturnCallbacksMutex;
// Pending reply handlers keyed by message id; callSync() erases its entry
// when the wait for the reply times out.
290 std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
// Presumably the callbacks passed to the constructor - confirm.
293 PeerCallback mNewPeerCallback;
294 PeerCallback mRemovedPeerCallback;
// Presumably the constructor's maxNumberOfPeers argument - confirm.
296 unsigned int mMaxNumberOfPeers;
// pollfd entries; presumably the descriptor set polled by the processing
// thread - confirm in the implementation.
299 std::vector<struct pollfd> mFDs;
// Atomic counters; presumably the source of getNextMessageID() /
// getNextPeerID() values - confirm in the implementation.
301 std::atomic<MessageID> mMessageIDCounter;
302 std::atomic<PeerID> mPeerIDCounter;
// Internal helpers; only their declarations are visible here, so the notes
// below are assumptions based on names and signatures - confirm each one in
// the implementation file.
307 bool handleLostConnections();
// Presumably reads one incoming message from the peer's socket.
309 bool handleInput(const PeerID peerID, const Socket& socket);
// Presumably handles a message carrying RETURN_METHOD_ID (a reply).
310 bool onReturnValue(const PeerID peerID,
311 const Socket& socket,
312 const MessageID messageID);
// Presumably handles a request for a method registered with addMethodHandler().
313 bool onRemoteCall(const PeerID peerID,
314 const Socket& socket,
315 const MethodID methodID,
316 const MessageID messageID);
// Used by callAsync() to obtain the id of a new message exchange.
318 MessageID getNextMessageID();
319 PeerID getNextPeerID();
321 void removePeerInternal(const PeerID peerID, Status status);
322 void cleanCommunication();
325 template<typename SentDataType, typename ReceivedDataType>
326 void Processor::addMethodHandler(const MethodID methodID,
327 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
329 static_assert(config::isVisitable<SentDataType>::value,
330 "Use the libConfig library");
331 static_assert(config::isVisitable<ReceivedDataType>::value,
332 "Use the libConfig library");
334 if (methodID == RETURN_METHOD_ID) {
335 LOGE("Forbidden methodID: " << methodID);
336 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
339 using namespace std::placeholders;
341 MethodHandlers methodCall;
343 methodCall.parse = [](const int fd)->std::shared_ptr<void> {
344 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
345 config::loadFromFD<ReceivedDataType>(fd, *data);
349 methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
350 config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
353 methodCall.method = [method](std::shared_ptr<void>& data)->std::shared_ptr<void> {
354 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
355 return method(tmpData);
359 Lock lock(mCallsMutex);
360 mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
364 template<typename SentDataType, typename ReceivedDataType>
365 Processor::MessageID Processor::callAsync(const MethodID methodID,
367 const std::shared_ptr<SentDataType>& data,
368 const typename ResultHandler<ReceivedDataType>::type& process)
370 static_assert(config::isVisitable<SentDataType>::value,
371 "Use the libConfig library");
372 static_assert(config::isVisitable<ReceivedDataType>::value,
373 "Use the libConfig library");
375 if (!mThread.joinable()) {
376 LOGE("The Processor thread is not started. Can't send any data.");
377 throw IPCException("The Processor thread is not started. Can't send any data.");
380 using namespace std::placeholders;
383 call.peerID = peerID;
384 call.methodID = methodID;
386 call.messageID = getNextMessageID();
388 call.parse = [](const int fd)->std::shared_ptr<void> {
389 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
390 config::loadFromFD<ReceivedDataType>(fd, *data);
394 call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
395 config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
398 call.process = [process](Status status, std::shared_ptr<void>& data)->void {
399 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
400 return process(status, tmpData);
404 Lock lock(mCallsMutex);
405 mCalls.push(std::move(call));
408 mEventQueue.send(Event::CALL);
410 return call.messageID;
// Synchronous method call: issues callAsync() with a result handler that
// stores the reply in local variables, then waits on a condition variable
// until returnStatus has been set or timeoutMS milliseconds have passed.
//
// Both data types must satisfy config::isVisitable (static_assert).
// Throws IPCException when mThread is not joinable and IPCTimeoutException
// on the timeout path; throwOnError(returnStatus) is called once a status
// has been received.
//
// NOTE(review): several short lines are absent from this copy of the
// definition (for example the declaration of `mutex`, the peerID parameter,
// the remaining callAsync arguments, the statements between the erase()
// check and the timeout log, and the final return). Confirm the exact
// control flow against the complete file.
414 template<typename SentDataType, typename ReceivedDataType>
415 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
417 const std::shared_ptr<SentDataType>& data,
418 unsigned int timeoutMS)
420 static_assert(config::isVisitable<SentDataType>::value,
421 "Use the libConfig library");
422 static_assert(config::isVisitable<ReceivedDataType>::value,
423 "Use the libConfig library");
425 if (!mThread.joinable()) {
426 LOGE("The Processor thread is not started. Can't send any data.");
427 throw IPCException("The Processor thread is not started. Can't send any data.");
430 std::shared_ptr<ReceivedDataType> result;
433 std::condition_variable cv;
// UNDEFINED marks "no reply yet"; see isResultInitialized below.
434 Status returnStatus = ipc::Status::UNDEFINED;
// Result handler passed to callAsync(): records status and data while
// holding `mutex`. All captures are references to locals of this function.
436 auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
437 std::unique_lock<std::mutex> lock(mutex);
438 returnStatus = status;
439 result = returnedData;
443 MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
// Wait predicate: true once the handler has stored a status.
448 auto isResultInitialized = [&returnStatus]() {
449 return returnStatus != ipc::Status::UNDEFINED;
452 std::unique_lock<std::mutex> lock(mutex);
// wait_for() returns false when the predicate is still false after the timeout.
453 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
454 bool isTimeout = false;
// This Lock shadows the outer unique_lock that is also named `lock`.
456 Lock lock(mReturnCallbacksMutex);
// erase() returns 1 when the return callback was still registered;
// presumably that is what marks a genuine timeout - confirm.
457 if (1 == mReturnCallbacks.erase(messageID)) {
463 LOGE("Function call timeout; methodID: " << methodID);
464 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
466 // Timeout started during the return value processing, so wait for it to finish
467 cv.wait(lock, isResultInitialized);
471 throwOnError(returnStatus);
478 } // namespace security_containers
480 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP