IPC: Support older glib
[platform/core/security/vasum.git] / common / ipc / internals / processor.hpp
1 /*
2 *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
27
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/request-queue.hpp"
30 #include "ipc/internals/method-request.hpp"
31 #include "ipc/internals/signal-request.hpp"
32 #include "ipc/internals/add-peer-request.hpp"
33 #include "ipc/internals/remove-peer-request.hpp"
34 #include "ipc/internals/finish-request.hpp"
35 #include "ipc/exception.hpp"
36 #include "ipc/types.hpp"
37 #include "config/manager.hpp"
38 #include "config/fields.hpp"
39 #include "logger/logger.hpp"
40 #include "logger/logger-scope.hpp"
41
42 #include <ostream>
43 #include <poll.h>
44 #include <condition_variable>
45 #include <mutex>
46 #include <chrono>
47 #include <vector>
48 #include <thread>
49 #include <string>
50 #include <list>
51 #include <functional>
52 #include <unordered_map>
53
54 namespace vasum {
55 namespace ipc {
56
57 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
58 const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
59
60 /**
61 * This class wraps communication via UX sockets
62 *
63 * It's intended to be used both in Client and Service classes.
64 * It uses a serialization mechanism from libConfig.
65 * Library user will only have to pass the types that each call will send and receive
66 *
67 * Message format:
68 * - MethodID  - probably casted enum.
69 *               MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
70 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
71 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
72 *
73 * TODO:
74 *  - synchronous call to many peers
75 *  - implement HandlerStore class for storing both signals and methods
76 *  - API for removing signals
77 *  - implement CallbackStore - thread safe calling/setting callbacks
78 *  - helper function for removing from unordered map
79 *  - new way to generate UIDs
80 *  - callbacks for serialization/parsing
81 *  - store Sockets in a vector, maybe SocketStore?
82 *  - poll loop outside.
83 *  - waiting till the EventQueue is empty before leaving stop()
84 *  - no new events added after stop() called
85 *  - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
86 *    there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
87 *
88 */
89 class Processor {
90 private:
91     enum class Event {
92         FINISH,     // Shutdown request
93         METHOD,     // New method call in the queue
94         SIGNAL,     // New signal call in the queue
95         ADD_PEER,   // New peer in the queue
96         REMOVE_PEER // Remove peer
97     };
98
99 public:
100
101     friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
102
103     /**
104      * Used to indicate a message with the return value.
105      */
106     static const MethodID RETURN_METHOD_ID;
107
108     /**
109      * Indicates an Processor's internal request/broadcast to register a Signal
110      */
111     static const MethodID REGISTER_SIGNAL_METHOD_ID;
112
113     /**
114      * Constructs the Processor, but doesn't start it.
115      * The object is ready to add methods.
116      *
117      * @param newPeerCallback called when a new peer arrives
118      * @param removedPeerCallback called when the Processor stops listening for this peer
119      */
120     Processor(const std::string& logName = "",
121               const PeerCallback& newPeerCallback = nullptr,
122               const PeerCallback& removedPeerCallback = nullptr,
123               const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
124     ~Processor();
125
126     Processor(const Processor&) = delete;
127     Processor(Processor&&) = delete;
128     Processor& operator=(const Processor&) = delete;
129
130
131     /**
132      * Start the processing thread.
133      * Quits immediately after starting the thread.
134      *
135      * @param usesExternalPolling internal or external polling is used
136      */
137     void start(const bool usesExternalPolling);
138
139     /**
140      * @return is processor running
141      */
142     bool isStarted();
143
144     /**
145      * Stops the processing thread.
146      * No incoming data will be handled after.
147      */
148     void stop();
149
150     /**
151      * Set the callback called for each new connection to a peer
152      *
153      * @param newPeerCallback the callback
154      */
155     void setNewPeerCallback(const PeerCallback& newPeerCallback);
156
157     /**
158      * Set the callback called when connection to a peer is lost
159      *
160      * @param removedPeerCallback the callback
161      */
162     void setRemovedPeerCallback(const PeerCallback& removedPeerCallback);
163
164     /**
165      * From now on socket is owned by the Processor object.
166      * Calls the newPeerCallback.
167      *
168      * @param socketPtr pointer to the new socket
169      * @return peerFD of the new socket
170      */
171     FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
172
173     /**
174      * Request removing peer and wait
175      *
176      * @param peerFD id of the peer
177      */
178     void removePeer(const FileDescriptor peerFD);
179
180     /**
181      * Saves the callbacks connected to the method id.
182      * When a message with the given method id is received,
183      * the data will be passed to the serialization callback through file descriptor.
184      *
185      * Then the process callback will be called with the parsed data.
186      *
187      * @param methodID API dependent id of the method
188      * @param process data processing callback
189      * @tparam SentDataType data type to send
190      * @tparam ReceivedDataType data type to receive
191      */
192     template<typename SentDataType, typename ReceivedDataType>
193     void setMethodHandler(const MethodID methodID,
194                           const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
195
196     /**
197      * Saves the callbacks connected to the method id.
198      * When a message with the given method id is received,
199      * the data will be passed to the serialization callback through file descriptor.
200      *
201      * Then the process callback will be called with the parsed data.
202      * There is no return data to send back.
203      *
204      * Adding signal sends a registering message to all peers
205      *
206      * @param methodID API dependent id of the method
207      * @param process data processing callback
208      * @tparam ReceivedDataType data type to receive
209      */
210     template<typename ReceivedDataType>
211     void setSignalHandler(const MethodID methodID,
212                           const typename SignalHandler<ReceivedDataType>::type& process);
213
214     /**
215      * Removes the callback
216      *
217      * @param methodID API dependent id of the method
218      */
219     void removeMethod(const MethodID methodID);
220
221     /**
222      * Synchronous method call.
223      *
224      * @param methodID API dependent id of the method
225      * @param peerFD id of the peer
226      * @param data data to sent
227      * @param timeoutMS how long to wait for the return value before throw
228      * @tparam SentDataType data type to send
229      * @tparam ReceivedDataType data type to receive
230      */
231     template<typename SentDataType, typename ReceivedDataType>
232     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
233                                                const FileDescriptor peerFD,
234                                                const std::shared_ptr<SentDataType>& data,
235                                                unsigned int timeoutMS = 500);
236
237     /**
238      * Asynchronous method call
239      *
240      * @param methodID API dependent id of the method
241      * @param peerFD id of the peer
242      * @param data data to sent
243      * @param process callback processing the return data
244      * @tparam SentDataType data type to send
245      * @tparam ReceivedDataType data type to receive
246      */
247     template<typename SentDataType, typename ReceivedDataType>
248     MessageID callAsync(const MethodID methodID,
249                         const FileDescriptor peerFD,
250                         const std::shared_ptr<SentDataType>& data,
251                         const typename ResultHandler<ReceivedDataType>::type& process);
252
253
254     /**
255      * Send a signal to the peer.
256      * There is no return value from the peer
257      * Sends any data only if a peer registered this a signal
258      *
259      * @param methodID API dependent id of the method
260      * @param data data to sent
261      * @tparam SentDataType data type to send
262      */
263     template<typename SentDataType>
264     void signal(const MethodID methodID,
265                 const std::shared_ptr<SentDataType>& data);
266
267     /**
268      * Removes one peer.
269      * Handler used in external polling.
270      *
271      * @param peerFD file description identifying the peer
272      * @return should the polling structure be rebuild
273      */
274     bool handleLostConnection(const FileDescriptor peerFD);
275
276     /**
277      * Handles input from one peer.
278      * Handler used in external polling.
279      *
280      * @param peerFD file description identifying the peer
281      * @return should the polling structure be rebuild
282      */
283     bool handleInput(const FileDescriptor peerFD);
284
285     /**
286      * Handle one event from the internal event's queue
287      *
288      * @return should the polling structure be rebuild
289      */
290     bool handleEvent();
291
292     /**
293      * @return file descriptor for the internal event's queue
294      */
295     FileDescriptor getEventFD();
296
297 private:
298     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
299     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
300     typedef std::unique_lock<std::recursive_mutex> Lock;
301     typedef RequestQueue<Event>::Request Request;
302
303     struct EmptyData {
304         CONFIG_REGISTER_EMPTY
305     };
306
307     struct RegisterSignalsMessage {
308         RegisterSignalsMessage() = default;
309         RegisterSignalsMessage(const std::vector<MethodID> ids)
310             : ids(ids) {}
311
312         std::vector<MethodID> ids;
313
314         CONFIG_REGISTER
315         (
316             ids
317         )
318     };
319
320     struct MethodHandlers {
321         MethodHandlers(const MethodHandlers& other) = delete;
322         MethodHandlers& operator=(const MethodHandlers&) = delete;
323         MethodHandlers() = default;
324         MethodHandlers(MethodHandlers&&) = default;
325         MethodHandlers& operator=(MethodHandlers &&) = default;
326
327         SerializeCallback serialize;
328         ParseCallback parse;
329         MethodHandler<void, void>::type method;
330     };
331
332     struct SignalHandlers {
333         SignalHandlers(const SignalHandlers& other) = delete;
334         SignalHandlers& operator=(const SignalHandlers&) = delete;
335         SignalHandlers() = default;
336         SignalHandlers(SignalHandlers&&) = default;
337         SignalHandlers& operator=(SignalHandlers &&) = default;
338
339         ParseCallback parse;
340         SignalHandler<void>::type signal;
341     };
342
343     struct ReturnCallbacks {
344         ReturnCallbacks(const ReturnCallbacks& other) = delete;
345         ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
346         ReturnCallbacks() = default;
347         ReturnCallbacks(ReturnCallbacks&&) = default;
348         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
349
350         ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
351             : peerFD(peerFD), parse(parse), process(process) {}
352
353         FileDescriptor peerFD;
354         ParseCallback parse;
355         ResultHandler<void>::type process;
356     };
357
358     std::string mLogPrefix;
359
360     RequestQueue<Event> mRequestQueue;
361
362     bool mIsRunning;
363     bool mUsesExternalPolling;
364
365     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
366     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
367     std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
368
369     std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
370     std::vector<struct pollfd> mFDs;
371
372     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
373
374     // Mutex for modifying any internal data
375     std::recursive_mutex mStateMutex;
376
377     PeerCallback mNewPeerCallback;
378     PeerCallback mRemovedPeerCallback;
379
380     unsigned int mMaxNumberOfPeers;
381
382     std::thread mThread;
383
384     template<typename SentDataType, typename ReceivedDataType>
385     void setMethodHandlerInternal(const MethodID methodID,
386                                   const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
387
388     template<typename ReceivedDataType>
389     static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
390
391     void run();
392
393     // Request handlers
394     bool onMethodRequest(MethodRequest& request);
395     bool onSignalRequest(SignalRequest& request);
396     bool onAddPeerRequest(AddPeerRequest& request);
397     bool onRemovePeerRequest(RemovePeerRequest& request);
398     bool onFinishRequest(FinishRequest& request);
399
400     bool handleLostConnections();
401     bool handleInputs();
402
403     bool onReturnValue(const Socket& socket,
404                        const MessageID messageID);
405     bool onRemoteCall(const Socket& socket,
406                       const MethodID methodID,
407                       const MessageID messageID,
408                       std::shared_ptr<MethodHandlers> methodCallbacks);
409     bool onRemoteSignal(const Socket& socket,
410                         const MethodID methodID,
411                         const MessageID messageID,
412                         std::shared_ptr<SignalHandlers> signalCallbacks);
413     void resetPolling();
414     FileDescriptor getNextFileDescriptor();
415     void removePeerInternal(const FileDescriptor peerFD, Status status);
416
417     std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
418                                             std::shared_ptr<RegisterSignalsMessage>& data);
419
420
421 };
422
423 template<typename SentDataType, typename ReceivedDataType>
424 void Processor::setMethodHandlerInternal(const MethodID methodID,
425                                          const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
426 {
427     MethodHandlers methodCall;
428
429     methodCall.parse = [](const int fd)->std::shared_ptr<void> {
430         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
431         config::loadFromFD<ReceivedDataType>(fd, *data);
432         return data;
433     };
434
435     methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
436         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
437     };
438
439     methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
440         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
441         return method(peerFD, tmpData);
442     };
443
444     {
445         Lock lock(mStateMutex);
446         mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
447     }
448 }
449
450 template<typename SentDataType, typename ReceivedDataType>
451 void Processor::setMethodHandler(const MethodID methodID,
452                                  const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
453 {
454     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
455         LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
456         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
457     }
458
459     {
460         Lock lock(mStateMutex);
461
462         if (mSignalsCallbacks.count(methodID)) {
463             LOGE(mLogPrefix + "MethodID used by a signal: " << methodID);
464             throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
465         }
466
467         setMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
468     }
469
470 }
471
472 template<typename ReceivedDataType>
473 void Processor::setSignalHandler(const MethodID methodID,
474                                  const typename SignalHandler<ReceivedDataType>::type& handler)
475 {
476     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
477         LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
478         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
479     }
480
481     std::shared_ptr<RegisterSignalsMessage> data;
482     std::vector<FileDescriptor> peerFDs;
483     {
484         Lock lock(mStateMutex);
485
486         // Andd the signal handler:
487         if (mMethodsCallbacks.count(methodID)) {
488             LOGE(mLogPrefix + "MethodID used by a method: " << methodID);
489             throw IPCException("MethodID used by a method: " + std::to_string(methodID));
490         }
491
492         SignalHandlers signalCall;
493
494         signalCall.parse = [](const int fd)->std::shared_ptr<void> {
495             std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
496             config::loadFromFD<ReceivedDataType>(fd, *data);
497             return data;
498         };
499
500         signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
501             std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
502             handler(peerFD, tmpData);
503         };
504
505         mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
506
507         // Broadcast the new signal:
508         std::vector<MethodID> ids {methodID};
509         data = std::make_shared<RegisterSignalsMessage>(ids);
510
511         for (const auto kv : mSockets) {
512             peerFDs.push_back(kv.first);
513         }
514     }
515
516     for (const auto peerFD : peerFDs) {
517         callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
518                                                     peerFD,
519                                                     data,
520                                                     DEFAULT_METHOD_TIMEOUT);
521     }
522 }
523
524
525 template<typename SentDataType, typename ReceivedDataType>
526 MessageID Processor::callAsync(const MethodID methodID,
527                                const FileDescriptor peerFD,
528                                const std::shared_ptr<SentDataType>& data,
529                                const typename ResultHandler<ReceivedDataType>::type& process)
530 {
531     Lock lock(mStateMutex);
532     auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
533     mRequestQueue.push(Event::METHOD, request);
534     return request->messageID;
535 }
536
537
538 template<typename SentDataType, typename ReceivedDataType>
539 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
540                                                       const FileDescriptor peerFD,
541                                                       const std::shared_ptr<SentDataType>& data,
542                                                       unsigned int timeoutMS)
543 {
544     std::shared_ptr<ReceivedDataType> result;
545
546     std::mutex mutex;
547     std::condition_variable cv;
548     Status returnStatus = ipc::Status::UNDEFINED;
549
550     auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
551         returnStatus = status;
552         result = returnedData;
553         cv.notify_all();
554     };
555
556     MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
557                                                                     peerFD,
558                                                                     data,
559                                                                     process);
560
561     auto isResultInitialized = [&returnStatus]() {
562         return returnStatus != ipc::Status::UNDEFINED;
563     };
564
565     std::unique_lock<std::mutex> lock(mutex);
566     LOGT(mLogPrefix + "Waiting for the response...");
567     if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
568         LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
569         bool isTimeout;
570         {
571             Lock lock(mStateMutex);
572             // Call isn't sent or call is sent but there is no reply
573             isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
574                 return request.requestID == Event::METHOD &&
575                        request.get<MethodRequest>()->messageID == messageID;
576             })
577             || mRequestQueue.removeIf([messageID](Request & request) {
578                 return request.requestID == Event::SIGNAL &&
579                        request.get<SignalRequest>()->messageID == messageID;
580             })
581             || 1 == mReturnCallbacks.erase(messageID);
582         }
583         if (isTimeout) {
584             LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
585             removePeer(peerFD);
586             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
587         } else {
588             LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
589             if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
590                 LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
591                 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
592             }
593         }
594     }
595
596     throwOnError(returnStatus);
597
598     return result;
599 }
600
601 template<typename SentDataType>
602 void Processor::signal(const MethodID methodID,
603                        const std::shared_ptr<SentDataType>& data)
604 {
605     Lock lock(mStateMutex);
606     const auto it = mSignalsPeers.find(methodID);
607     if (it == mSignalsPeers.end()) {
608         LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
609         return;
610     }
611     for (const FileDescriptor peerFD : it->second) {
612         auto request =  SignalRequest::create<SentDataType>(methodID, peerFD, data);
613         mRequestQueue.push(Event::SIGNAL, request);
614     }
615 }
616
617
618
619 } // namespace ipc
620 } // namespace vasum
621
622 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP