IPC: NONBLOCK sockets
[platform/core/security/vasum.git] / common / ipc / internals / processor.hpp
1 /*
2 *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
27
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/event-queue.hpp"
30 #include "ipc/exception.hpp"
31 #include "ipc/types.hpp"
32 #include "config/manager.hpp"
33 #include "config/is-visitable.hpp"
34 #include "logger/logger.hpp"
35
36 #include <poll.h>
37
38 #include <atomic>
39 #include <condition_variable>
40 #include <queue>
41 #include <mutex>
42 #include <chrono>
43 #include <vector>
44 #include <thread>
45 #include <string>
46 #include <functional>
47 #include <unordered_map>
48
49 namespace security_containers {
50 namespace ipc {
51 namespace {
52 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
53 }
54 /**
55 * This class wraps communication via UX sockets
56 *
57 * It's intended to be used both in Client and Service classes.
58 * It uses a serialization mechanism from libConfig.
59 * Library user will only have to pass the types that each call will send and receive
60 *
61 * Message format:
62 * - MethodID  - probably casted enum.
63 *               MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
64 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
65 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
66 *
67 * TODO:
68 *  - remove ReturnCallbacks on peer disconnect
69 *  - on sync timeout erase the return callback
70 *  - don't throw timeout if the message is already processed
71 *  - naming convention or methods that just commissions the PROCESS thread to do something
72 *  - removePeer API function
73 *  - error handling - special message type
74 *  - some mutexes may not be needed
75 *  - make addPeer synchronous like removePeer
76 */
77 class Processor {
78 public:
79     typedef std::function<void(int)> PeerCallback;
80     typedef unsigned int PeerID;
81     typedef unsigned int MethodID;
82     typedef unsigned int MessageID;
83
84
85     /**
86      * Method ID. Used to indicate a message with the return value.
87      */
88     static const MethodID RETURN_METHOD_ID;
89     /**
90      * Constructs the Processor, but doesn't start it.
91      * The object is ready to add methods.
92      *
93      * @param newPeerCallback called when a new peer arrives
94      * @param removedPeerCallback called when the Processor stops listening for this peer
95      */
96     Processor(const PeerCallback& newPeerCallback = nullptr,
97               const PeerCallback& removedPeerCallback = nullptr,
98               const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
99     ~Processor();
100
101     Processor(const Processor&) = delete;
102     Processor(Processor&&) = delete;
103     Processor& operator=(const Processor&) = delete;
104
105     /**
106      * Start the processing thread.
107      * Quits immediately after starting the thread.
108      */
109     void start();
110
111     /**
112      * Stops the processing thread.
113      * No incoming data will be handled after.
114      */
115     void stop();
116
117     /**
118      * From now on socket is owned by the Processor object.
119      * Calls the newPeerCallback.
120      *
121      * @param socketPtr pointer to the new socket
122      * @return peerID of the new socket
123      */
124     PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
125
126     /**
127      * Request removing peer and wait
128      *
129      * @param peerID id of the peer
130      */
131     void removePeer(const PeerID peerID);
132
133     /**
134      * Saves the callbacks connected to the method id.
135      * When a message with the given method id is received,
136      * the data will be passed to the serialization callback through file descriptor.
137      *
138      * Then the process callback will be called with the parsed data.
139      *
140      * @param methodID API dependent id of the method
141      * @param process data processing callback
142      * @tparam SentDataType data type to send
143      * @tparam ReceivedDataType data type to receive
144      */
145     template<typename SentDataType, typename ReceivedDataType>
146     void addMethodHandler(const MethodID methodID,
147                           const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
148
149     /**
150      * Removes the callback
151      *
152      * @param methodID API dependent id of the method
153      */
154     void removeMethod(const MethodID methodID);
155
156     /**
157      * Synchronous method call.
158      *
159      * @param methodID API dependent id of the method
160      * @param peerID id of the peer
161      * @param data data to sent
162      * @param timeoutMS how long to wait for the return value before throw
163      * @tparam SentDataType data type to send
164      * @tparam ReceivedDataType data type to receive
165      */
166     template<typename SentDataType, typename ReceivedDataType>
167     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
168                                                const PeerID peerID,
169                                                const std::shared_ptr<SentDataType>& data,
170                                                unsigned int timeoutMS = 500);
171
172     /**
173      * Asynchronous method call
174      *
175      * @param methodID API dependent id of the method
176      * @param peerID id of the peer
177      * @param data data to sent
178      * @param process callback processing the return data
179      * @tparam SentDataType data type to send
180      * @tparam ReceivedDataType data type to receive
181      */
182     template<typename SentDataType, typename ReceivedDataType>
183     MessageID callAsync(const MethodID methodID,
184                         const PeerID peerID,
185                         const std::shared_ptr<SentDataType>& data,
186                         const typename ResultHandler<ReceivedDataType>::type& process);
187
188
189 private:
190     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
191     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
192     typedef std::lock_guard<std::mutex> Lock;
193
194     struct Call {
195         Call(const Call& other) = delete;
196         Call& operator=(const Call&) = delete;
197         Call() = default;
198         Call(Call&&) = default;
199
200         PeerID peerID;
201         MethodID methodID;
202         std::shared_ptr<void> data;
203         SerializeCallback serialize;
204         ParseCallback parse;
205         ResultHandler<void>::type process;
206         MessageID messageID;
207     };
208
209     struct MethodHandlers {
210         MethodHandlers(const MethodHandlers& other) = delete;
211         MethodHandlers& operator=(const MethodHandlers&) = delete;
212         MethodHandlers() = default;
213         MethodHandlers(MethodHandlers&&) = default;
214         MethodHandlers& operator=(MethodHandlers &&) = default;
215
216         SerializeCallback serialize;
217         ParseCallback parse;
218         MethodHandler<void, void>::type method;
219     };
220
221     struct ReturnCallbacks {
222         ReturnCallbacks(const ReturnCallbacks& other) = delete;
223         ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
224         ReturnCallbacks() = default;
225         ReturnCallbacks(ReturnCallbacks&&) = default;
226         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
227
228         ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultHandler<void>::type& process)
229             : peerID(peerID), parse(parse), process(process) {}
230
231         PeerID peerID;
232         ParseCallback parse;
233         ResultHandler<void>::type process;
234     };
235
236     struct SocketInfo {
237         SocketInfo(const SocketInfo& other) = delete;
238         SocketInfo& operator=(const SocketInfo&) = delete;
239         SocketInfo() = default;
240         SocketInfo(SocketInfo&&) = default;
241         SocketInfo& operator=(SocketInfo &&) = default;
242
243         SocketInfo(const PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
244             : peerID(peerID), socketPtr(socketPtr) {}
245
246         PeerID peerID;
247         std::shared_ptr<Socket> socketPtr;
248     };
249
250     struct RemovePeerRequest {
251         RemovePeerRequest(const RemovePeerRequest& other) = delete;
252         RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
253         RemovePeerRequest() = default;
254         RemovePeerRequest(RemovePeerRequest&&) = default;
255         RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
256
257         RemovePeerRequest(const PeerID peerID,
258                           const std::shared_ptr<std::condition_variable>& conditionPtr)
259             : peerID(peerID), conditionPtr(conditionPtr) {}
260
261         PeerID peerID;
262         std::shared_ptr<std::condition_variable> conditionPtr;
263     };
264
265     enum class Event : int {
266         FINISH,     // Shutdown request
267         CALL,       // New method call in the queue
268         NEW_PEER,   // New peer in the queue
269         DELETE_PEER // Delete peer
270     };
271     EventQueue<Event> mEventQueue;
272
273
274     bool mIsRunning;
275
276     // Mutex for the Calls queue and the map of methods.
277     std::mutex mCallsMutex;
278     std::queue<Call> mCalls;
279     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
280
281     // Mutex for changing mSockets map.
282     // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
283     std::mutex mSocketsMutex;
284     std::unordered_map<PeerID, std::shared_ptr<Socket> > mSockets;
285     std::queue<SocketInfo> mNewSockets;
286     std::queue<RemovePeerRequest> mPeersToDelete;
287
288     // Mutex for modifying the map with return callbacks
289     std::mutex mReturnCallbacksMutex;
290     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
291
292
293     PeerCallback mNewPeerCallback;
294     PeerCallback mRemovedPeerCallback;
295
296     unsigned int mMaxNumberOfPeers;
297
298     std::thread mThread;
299     std::vector<struct pollfd> mFDs;
300
301     std::atomic<MessageID> mMessageIDCounter;
302     std::atomic<PeerID> mPeerIDCounter;
303
304     void run();
305     bool handleEvent();
306     bool handleCall();
307     bool handleLostConnections();
308     bool handleInputs();
309     bool handleInput(const PeerID peerID, const Socket& socket);
310     bool onReturnValue(const PeerID peerID,
311                        const Socket& socket,
312                        const MessageID messageID);
313     bool onRemoteCall(const PeerID peerID,
314                       const Socket& socket,
315                       const MethodID methodID,
316                       const MessageID messageID);
317     void resetPolling();
318     MessageID getNextMessageID();
319     PeerID getNextPeerID();
320     Call getCall();
321     void removePeerInternal(const PeerID peerID, Status status);
322     void cleanCommunication();
323 };
324
325 template<typename SentDataType, typename ReceivedDataType>
326 void Processor::addMethodHandler(const MethodID methodID,
327                                  const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
328 {
329     static_assert(config::isVisitable<SentDataType>::value,
330                   "Use the libConfig library");
331     static_assert(config::isVisitable<ReceivedDataType>::value,
332                   "Use the libConfig library");
333
334     if (methodID == RETURN_METHOD_ID) {
335         LOGE("Forbidden methodID: " << methodID);
336         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
337     }
338
339     using namespace std::placeholders;
340
341     MethodHandlers methodCall;
342
343     methodCall.parse = [](const int fd)->std::shared_ptr<void> {
344         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
345         config::loadFromFD<ReceivedDataType>(fd, *data);
346         return data;
347     };
348
349     methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
350         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
351     };
352
353     methodCall.method = [method](std::shared_ptr<void>& data)->std::shared_ptr<void> {
354         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
355         return method(tmpData);
356     };
357
358     {
359         Lock lock(mCallsMutex);
360         mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
361     }
362 }
363
364 template<typename SentDataType, typename ReceivedDataType>
365 Processor::MessageID Processor::callAsync(const MethodID methodID,
366                                           const PeerID peerID,
367                                           const std::shared_ptr<SentDataType>& data,
368                                           const typename ResultHandler<ReceivedDataType>::type& process)
369 {
370     static_assert(config::isVisitable<SentDataType>::value,
371                   "Use the libConfig library");
372     static_assert(config::isVisitable<ReceivedDataType>::value,
373                   "Use the libConfig library");
374
375     if (!mThread.joinable()) {
376         LOGE("The Processor thread is not started. Can't send any data.");
377         throw IPCException("The Processor thread is not started. Can't send any data.");
378     }
379
380     using namespace std::placeholders;
381
382     Call call;
383     call.peerID = peerID;
384     call.methodID = methodID;
385     call.data = data;
386     call.messageID = getNextMessageID();
387
388     call.parse = [](const int fd)->std::shared_ptr<void> {
389         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
390         config::loadFromFD<ReceivedDataType>(fd, *data);
391         return data;
392     };
393
394     call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
395         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
396     };
397
398     call.process = [process](Status status, std::shared_ptr<void>& data)->void {
399         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
400         return process(status, tmpData);
401     };
402
403     {
404         Lock lock(mCallsMutex);
405         mCalls.push(std::move(call));
406     }
407
408     mEventQueue.send(Event::CALL);
409
410     return call.messageID;
411 }
412
413
414 template<typename SentDataType, typename ReceivedDataType>
415 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
416                                                       const PeerID peerID,
417                                                       const std::shared_ptr<SentDataType>& data,
418                                                       unsigned int timeoutMS)
419 {
420     static_assert(config::isVisitable<SentDataType>::value,
421                   "Use the libConfig library");
422     static_assert(config::isVisitable<ReceivedDataType>::value,
423                   "Use the libConfig library");
424
425     if (!mThread.joinable()) {
426         LOGE("The Processor thread is not started. Can't send any data.");
427         throw IPCException("The Processor thread is not started. Can't send any data.");
428     }
429
430     std::shared_ptr<ReceivedDataType> result;
431
432     std::mutex mutex;
433     std::condition_variable cv;
434     Status returnStatus = ipc::Status::UNDEFINED;
435
436     auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
437         std::unique_lock<std::mutex> lock(mutex);
438         returnStatus = status;
439         result = returnedData;
440         cv.notify_all();
441     };
442
443     MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
444                                                                     peerID,
445                                                                     data,
446                                                                     process);
447
448     auto isResultInitialized = [&returnStatus]() {
449         return returnStatus != ipc::Status::UNDEFINED;
450     };
451
452     std::unique_lock<std::mutex> lock(mutex);
453     if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
454         bool isTimeout = false;
455         {
456             Lock lock(mReturnCallbacksMutex);
457             if (1 == mReturnCallbacks.erase(messageID)) {
458                 isTimeout = true;
459             }
460         }
461         if (isTimeout) {
462             removePeer(peerID);
463             LOGE("Function call timeout; methodID: " << methodID);
464             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
465         } else {
466             // Timeout started during the return value processing, so wait for it to finish
467             cv.wait(lock, isResultInitialized);
468         }
469     }
470
471     throwOnError(returnStatus);
472
473     return result;
474 }
475
476
477 } // namespace ipc
478 } // namespace security_containers
479
480 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP