IPC: External polling loop with a Client
[platform/core/security/vasum.git] / common / ipc / internals / processor.hpp
1 /*
2 *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
27
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/event-queue.hpp"
30 #include "ipc/internals/call-queue.hpp"
31 #include "ipc/exception.hpp"
32 #include "ipc/types.hpp"
33 #include "config/manager.hpp"
34 #include "config/fields.hpp"
35 #include "logger/logger.hpp"
36
37 #include <poll.h>
38 #include <condition_variable>
39 #include <queue>
40 #include <mutex>
41 #include <chrono>
42 #include <vector>
43 #include <thread>
44 #include <string>
45 #include <list>
46 #include <functional>
47 #include <unordered_map>
48
49 namespace vasum {
50 namespace ipc {
51
52 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
53 const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
54
55 /**
56 * This class wraps communication via UX sockets
57 *
58 * It's intended to be used both in Client and Service classes.
59 * It uses a serialization mechanism from libConfig.
60 * Library user will only have to pass the types that each call will send and receive
61 *
62 * Message format:
63 * - MethodID  - probably casted enum.
64 *               MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
65 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
66 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
67 *
68 * TODO:
69 *  - some mutexes may not be needed
70 *  - synchronous call to many peers
71 *  - implement HandlerStore class for storing both signals and methods
72 *  - API for removing signals
73 *  - implement CallbackStore - thread safe calling/setting callbacks
74 *  - helper function for removing from unordered map
75 *  - new way to generate UIDs
76 *  - callbacks for serialization/parsing
77 *  - store Sockets in a vector, maybe SocketStore?
78 *  - fix valgrind tests
79 *  - poll loop outside.
80 *  - waiting till the EventQueue is empty before leaving stop()
81 *  - no new events added after stop() called
82 *  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
83 *    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
84 *
85 *
86 */
87 class Processor {
88 public:
89     /**
90      * Used to indicate a message with the return value.
91      */
92     static const MethodID RETURN_METHOD_ID;
93
94     /**
95      * Indicates an Processor's internal request/broadcast to register a Signal
96      */
97     static const MethodID REGISTER_SIGNAL_METHOD_ID;
98
99     /**
100      * Constructs the Processor, but doesn't start it.
101      * The object is ready to add methods.
102      *
103      * @param newPeerCallback called when a new peer arrives
104      * @param removedPeerCallback called when the Processor stops listening for this peer
105      */
106     Processor(const PeerCallback& newPeerCallback = nullptr,
107               const PeerCallback& removedPeerCallback = nullptr,
108               const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
109     ~Processor();
110
111     Processor(const Processor&) = delete;
112     Processor(Processor&&) = delete;
113     Processor& operator=(const Processor&) = delete;
114
115     /**
116      * Start the processing thread.
117      * Quits immediately after starting the thread.
118      */
119     void start();
120
121     /**
122      * @return is processor running
123      */
124     bool isStarted();
125
126     /**
127      * Stops the processing thread.
128      * No incoming data will be handled after.
129      */
130     void stop();
131
132     /**
133      * Set the callback called for each new connection to a peer
134      *
135      * @param newPeerCallback the callback
136      */
137     void setNewPeerCallback(const PeerCallback& newPeerCallback);
138
139     /**
140      * Set the callback called when connection to a peer is lost
141      *
142      * @param removedPeerCallback the callback
143      */
144     void setRemovedPeerCallback(const PeerCallback& removedPeerCallback);
145
146     /**
147      * From now on socket is owned by the Processor object.
148      * Calls the newPeerCallback.
149      *
150      * @param socketPtr pointer to the new socket
151      * @return peerFD of the new socket
152      */
153     FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
154
155     /**
156      * Request removing peer and wait
157      *
158      * @param peerFD id of the peer
159      */
160     void removePeer(const FileDescriptor peerFD);
161
162     /**
163      * Saves the callbacks connected to the method id.
164      * When a message with the given method id is received,
165      * the data will be passed to the serialization callback through file descriptor.
166      *
167      * Then the process callback will be called with the parsed data.
168      *
169      * @param methodID API dependent id of the method
170      * @param process data processing callback
171      * @tparam SentDataType data type to send
172      * @tparam ReceivedDataType data type to receive
173      */
174     template<typename SentDataType, typename ReceivedDataType>
175     void addMethodHandler(const MethodID methodID,
176                           const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
177
178     /**
179      * Saves the callbacks connected to the method id.
180      * When a message with the given method id is received,
181      * the data will be passed to the serialization callback through file descriptor.
182      *
183      * Then the process callback will be called with the parsed data.
184      * There is no return data to send back.
185      *
186      * Adding signal sends a registering message to all peers
187      *
188      * @param methodID API dependent id of the method
189      * @param process data processing callback
190      * @tparam ReceivedDataType data type to receive
191      */
192     template<typename ReceivedDataType>
193     void addSignalHandler(const MethodID methodID,
194                           const typename SignalHandler<ReceivedDataType>::type& process);
195
196     /**
197      * Removes the callback
198      *
199      * @param methodID API dependent id of the method
200      */
201     void removeMethod(const MethodID methodID);
202
203     /**
204      * Synchronous method call.
205      *
206      * @param methodID API dependent id of the method
207      * @param peerFD id of the peer
208      * @param data data to sent
209      * @param timeoutMS how long to wait for the return value before throw
210      * @tparam SentDataType data type to send
211      * @tparam ReceivedDataType data type to receive
212      */
213     template<typename SentDataType, typename ReceivedDataType>
214     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
215                                                const FileDescriptor peerFD,
216                                                const std::shared_ptr<SentDataType>& data,
217                                                unsigned int timeoutMS = 500);
218
219     /**
220      * Asynchronous method call
221      *
222      * @param methodID API dependent id of the method
223      * @param peerFD id of the peer
224      * @param data data to sent
225      * @param process callback processing the return data
226      * @tparam SentDataType data type to send
227      * @tparam ReceivedDataType data type to receive
228      */
229     template<typename SentDataType, typename ReceivedDataType>
230     MessageID callAsync(const MethodID methodID,
231                         const FileDescriptor peerFD,
232                         const std::shared_ptr<SentDataType>& data,
233                         const typename ResultHandler<ReceivedDataType>::type& process);
234
235
236     /**
237      * Send a signal to the peer.
238      * There is no return value from the peer
239      * Sends any data only if a peer registered this a signal
240      *
241      * @param methodID API dependent id of the method
242      * @param data data to sent
243      * @tparam SentDataType data type to send
244      */
245     template<typename SentDataType>
246     void signal(const MethodID methodID,
247                 const std::shared_ptr<SentDataType>& data);
248
249     /**
250      * Removes one peer.
251      * Handler used in external polling.
252      *
253      * @param peerFD file description identifying the peer
254      * @return should the polling structure be rebuild
255      */
256     bool handleLostConnection(const FileDescriptor peerFD);
257
258     /**
259      * Handles input from one peer.
260      * Handler used in external polling.
261      *
262      * @param peerFD file description identifying the peer
263      * @return should the polling structure be rebuild
264      */
265     bool handleInput(const FileDescriptor peerFD);
266
267     /**
268      * Handle one event from the internal event's queue
269      *
270      * @return should the polling structure be rebuild
271      */
272     bool handleEvent();
273
274     /**
275      * @return file descriptor for the internal event's queue
276      */
277     FileDescriptor getEventFD();
278
279 private:
280     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
281     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
282     typedef std::unique_lock<std::mutex> Lock;
283
284     struct EmptyData {
285         CONFIG_REGISTER_EMPTY
286     };
287
288     struct RegisterSignalsMessage {
289         RegisterSignalsMessage() = default;
290         RegisterSignalsMessage(const std::vector<MethodID> ids)
291             : ids(ids) {}
292
293         std::vector<MethodID> ids;
294
295         CONFIG_REGISTER
296         (
297             ids
298         )
299     };
300
301     struct MethodHandlers {
302         MethodHandlers(const MethodHandlers& other) = delete;
303         MethodHandlers& operator=(const MethodHandlers&) = delete;
304         MethodHandlers() = default;
305         MethodHandlers(MethodHandlers&&) = default;
306         MethodHandlers& operator=(MethodHandlers &&) = default;
307
308         SerializeCallback serialize;
309         ParseCallback parse;
310         MethodHandler<void, void>::type method;
311     };
312
313     struct SignalHandlers {
314         SignalHandlers(const SignalHandlers& other) = delete;
315         SignalHandlers& operator=(const SignalHandlers&) = delete;
316         SignalHandlers() = default;
317         SignalHandlers(SignalHandlers&&) = default;
318         SignalHandlers& operator=(SignalHandlers &&) = default;
319
320         ParseCallback parse;
321         SignalHandler<void>::type signal;
322     };
323
324     struct ReturnCallbacks {
325         ReturnCallbacks(const ReturnCallbacks& other) = delete;
326         ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
327         ReturnCallbacks() = default;
328         ReturnCallbacks(ReturnCallbacks&&) = default;
329         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
330
331         ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
332             : peerFD(peerFD), parse(parse), process(process) {}
333
334         FileDescriptor peerFD;
335         ParseCallback parse;
336         ResultHandler<void>::type process;
337     };
338
339     struct SocketInfo {
340         SocketInfo(const SocketInfo& other) = delete;
341         SocketInfo& operator=(const SocketInfo&) = delete;
342         SocketInfo() = default;
343         SocketInfo(SocketInfo&&) = default;
344         SocketInfo& operator=(SocketInfo &&) = default;
345
346         SocketInfo(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
347             : peerFD(peerFD), socketPtr(socketPtr) {}
348
349         FileDescriptor peerFD;
350         std::shared_ptr<Socket> socketPtr;
351     };
352
353     struct RemovePeerRequest {
354         RemovePeerRequest(const RemovePeerRequest& other) = delete;
355         RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
356         RemovePeerRequest() = default;
357         RemovePeerRequest(RemovePeerRequest&&) = default;
358         RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
359
360         RemovePeerRequest(const FileDescriptor peerFD,
361                           const std::shared_ptr<std::condition_variable>& conditionPtr)
362             : peerFD(peerFD), conditionPtr(conditionPtr) {}
363
364         FileDescriptor peerFD;
365         std::shared_ptr<std::condition_variable> conditionPtr;
366     };
367
368     enum class Event : int {
369         FINISH,     // Shutdown request
370         CALL,       // New method call in the queue
371         ADD_PEER,   // New peer in the queue
372         REMOVE_PEER // Remove peer
373     };
374     EventQueue<Event> mEventQueue;
375
376
377     bool mIsRunning;
378
379     // Mutex for the Calls queue and the map of methods.
380     std::mutex mCallsMutex;
381     CallQueue mCalls;
382     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
383     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
384     std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
385
386     // Mutex for changing mSockets map.
387     // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
388     std::mutex mSocketsMutex;
389     std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
390     std::vector<struct pollfd> mFDs;
391     std::queue<SocketInfo> mNewSockets;
392     std::queue<RemovePeerRequest> mPeersToDelete;
393
394     // Mutex for modifying the map with return callbacks
395     std::mutex mReturnCallbacksMutex;
396     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
397
398     // Mutex for setting callbacks
399     std::mutex mCallbacksMutex;
400     PeerCallback mNewPeerCallback;
401     PeerCallback mRemovedPeerCallback;
402
403     unsigned int mMaxNumberOfPeers;
404
405     std::thread mThread;
406
407     template<typename SentDataType, typename ReceivedDataType>
408     void addMethodHandlerInternal(const MethodID methodID,
409                                   const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
410
411     template<typename SentDataType, typename ReceivedDataType>
412     MessageID callInternal(const MethodID methodID,
413                            const FileDescriptor peerFD,
414                            const std::shared_ptr<SentDataType>& data,
415                            const typename ResultHandler<ReceivedDataType>::type& process);
416
417     template<typename ReceivedDataType>
418     static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
419
420     void run();
421     bool onCall();
422     bool onSignalCall(CallQueue::Call& call);
423     bool onMethodCall(CallQueue::Call& call);
424     bool onNewPeer();
425     bool onRemovePeer();
426     bool handleLostConnections();
427     bool handleInputs();
428
429     bool onReturnValue(const Socket& socket,
430                        const MessageID messageID);
431     bool onRemoteCall(const Socket& socket,
432                       const MethodID methodID,
433                       const MessageID messageID,
434                       std::shared_ptr<MethodHandlers> methodCallbacks);
435     bool onRemoteSignal(const Socket& socket,
436                         const MethodID methodID,
437                         const MessageID messageID,
438                         std::shared_ptr<SignalHandlers> signalCallbacks);
439     void resetPolling();
440     FileDescriptor getNextFileDescriptor();
441     CallQueue::Call getCall();
442     void removePeerInternal(const FileDescriptor peerFD, Status status);
443
444     std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
445                                             std::shared_ptr<RegisterSignalsMessage>& data);
446
447
448     void cleanCommunication();
449 };
450
451 template<typename SentDataType, typename ReceivedDataType>
452 void Processor::addMethodHandlerInternal(const MethodID methodID,
453                                          const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
454 {
455     MethodHandlers methodCall;
456
457     methodCall.parse = [](const int fd)->std::shared_ptr<void> {
458         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
459         config::loadFromFD<ReceivedDataType>(fd, *data);
460         return data;
461     };
462
463     methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
464         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
465     };
466
467     methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
468         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
469         return method(peerFD, tmpData);
470     };
471
472     {
473         Lock lock(mCallsMutex);
474         mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
475     }
476 }
477
478 template<typename SentDataType, typename ReceivedDataType>
479 void Processor::addMethodHandler(const MethodID methodID,
480                                  const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
481 {
482     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
483         LOGE("Forbidden methodID: " << methodID);
484         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
485     }
486
487     {
488         Lock lock(mCallsMutex);
489         if (mSignalsCallbacks.count(methodID)) {
490             LOGE("MethodID used by a signal: " << methodID);
491             throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
492         }
493     }
494
495     addMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
496 }
497
498 template<typename ReceivedDataType>
499 void Processor::addSignalHandler(const MethodID methodID,
500                                  const typename SignalHandler<ReceivedDataType>::type& handler)
501 {
502     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
503         LOGE("Forbidden methodID: " << methodID);
504         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
505     }
506
507     {
508         Lock lock(mCallsMutex);
509         if (mMethodsCallbacks.count(methodID)) {
510             LOGE("MethodID used by a method: " << methodID);
511             throw IPCException("MethodID used by a method: " + std::to_string(methodID));
512         }
513     }
514
515     SignalHandlers signalCall;
516
517     signalCall.parse = [](const int fd)->std::shared_ptr<void> {
518         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
519         config::loadFromFD<ReceivedDataType>(fd, *data);
520         return data;
521     };
522
523     signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
524         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
525         handler(peerFD, tmpData);
526     };
527
528     {
529         Lock lock(mCallsMutex);
530         mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
531     }
532
533     std::vector<MethodID> ids {methodID};
534     auto data = std::make_shared<RegisterSignalsMessage>(ids);
535
536     std::list<FileDescriptor> peersFDs;
537     {
538         Lock lock(mSocketsMutex);
539         for (const auto kv : mSockets) {
540             peersFDs.push_back(kv.first);
541         }
542     }
543
544     for (const FileDescriptor peerFD : peersFDs) {
545         callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
546                                                     peerFD,
547                                                     data,
548                                                     DEFAULT_METHOD_TIMEOUT);
549     }
550
551 }
552
553 template<typename SentDataType, typename ReceivedDataType>
554 MessageID Processor::callInternal(const MethodID methodID,
555                                   const FileDescriptor peerFD,
556                                   const std::shared_ptr<SentDataType>& data,
557                                   const typename ResultHandler<ReceivedDataType>::type& process)
558 {
559     Lock lock(mCallsMutex);
560     MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
561     mEventQueue.send(Event::CALL);
562
563     return messageID;
564 }
565
566 template<typename SentDataType, typename ReceivedDataType>
567 MessageID Processor::callAsync(const MethodID methodID,
568                                const FileDescriptor peerFD,
569                                const std::shared_ptr<SentDataType>& data,
570                                const typename ResultHandler<ReceivedDataType>::type& process)
571 {
572     return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
573 }
574
575
576 template<typename SentDataType, typename ReceivedDataType>
577 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
578                                                       const FileDescriptor peerFD,
579                                                       const std::shared_ptr<SentDataType>& data,
580                                                       unsigned int timeoutMS)
581 {
582     std::shared_ptr<ReceivedDataType> result;
583
584     std::mutex mutex;
585     std::condition_variable cv;
586     Status returnStatus = ipc::Status::UNDEFINED;
587
588     auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
589         std::unique_lock<std::mutex> lock(mutex);
590         returnStatus = status;
591         result = returnedData;
592         cv.notify_all();
593     };
594
595     MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
596                                                                     peerFD,
597                                                                     data,
598                                                                     process);
599
600     auto isResultInitialized = [&returnStatus]() {
601         return returnStatus != ipc::Status::UNDEFINED;
602     };
603
604     std::unique_lock<std::mutex> lock(mutex);
605     if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
606         bool isTimeout = false;
607         {
608             Lock lock(mReturnCallbacksMutex);
609             if (1 == mReturnCallbacks.erase(messageID)) {
610                 isTimeout = true;
611             }
612         }
613         if (isTimeout) {
614             removePeer(peerFD);
615             LOGE("Function call timeout; methodID: " << methodID);
616             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
617         } else {
618             // Timeout started during the return value processing, so wait for it to finish
619             cv.wait(lock, isResultInitialized);
620         }
621     }
622
623     throwOnError(returnStatus);
624
625     return result;
626 }
627
628 template<typename SentDataType>
629 void Processor::signal(const MethodID methodID,
630                        const std::shared_ptr<SentDataType>& data)
631 {
632     std::list<FileDescriptor> peersFDs;
633     {
634         Lock lock(mSocketsMutex);
635         peersFDs = mSignalsPeers[methodID];
636     }
637
638     for (const FileDescriptor peerFD : peersFDs) {
639         Lock lock(mCallsMutex);
640         mCalls.push<SentDataType>(methodID, peerFD, data);
641         mEventQueue.send(Event::CALL);
642     }
643 }
644
645
646 } // namespace ipc
647 } // namespace vasum
648
649 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP