Fix IPCSuite/WriteTimeout
[platform/core/security/vasum.git] / libs / ipc / internals / processor.hpp
1 /*
2 *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
27
28 #include "ipc/internals/result-builder.hpp"
29 #include "ipc/internals/socket.hpp"
30 #include "ipc/internals/request-queue.hpp"
31 #include "ipc/internals/method-request.hpp"
32 #include "ipc/internals/signal-request.hpp"
33 #include "ipc/internals/add-peer-request.hpp"
34 #include "ipc/internals/remove-peer-request.hpp"
35 #include "ipc/internals/send-result-request.hpp"
36 #include "ipc/internals/finish-request.hpp"
37 #include "ipc/exception.hpp"
38 #include "ipc/method-result.hpp"
39 #include "ipc/types.hpp"
40 #include "config/manager.hpp"
41 #include "config/fields.hpp"
42 #include "logger/logger.hpp"
43 #include "logger/logger-scope.hpp"
44
45 #include <ostream>
46 #include <condition_variable>
47 #include <mutex>
48 #include <chrono>
49 #include <vector>
50 #include <thread>
51 #include <string>
52 #include <list>
53 #include <functional>
54 #include <unordered_map>
55 #include <utility>
56
57 namespace ipc {
58
59 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
60 /**
61 * This class wraps communication via UX sockets
62 *
63 * It's intended to be used both in Client and Service classes.
64 * It uses a serialization mechanism from Config.
65 * Library user will only have to pass the types that each call will send and receive
66 *
67 * Message format:
68 * - MethodID  - probably casted enum.
69 *               MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
70 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
71 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
72 *
73 * TODO:
74 *  - synchronous call to many peers
75 *  - implement HandlerStore class for storing both signals and methods
76 *  - API for removing signals
77 *  - implement CallbackStore - thread safe calling/setting callbacks
78 *  - helper function for removing from unordered map
79 *  - new way to generate UIDs
80 *  - callbacks for serialization/parsing
81 *  - store Sockets in a vector, maybe SocketStore?
82 *  - waiting till the EventQueue is empty before leaving stop()
83 *  - no new events added after stop() called
84 *
85 */
86 class Processor {
87 private:
88     enum class Event {
89         FINISH,      // Shutdown request
90         METHOD,      // New method call in the queue
91         SIGNAL,      // New signal call in the queue
92         ADD_PEER,    // New peer in the queue
93         REMOVE_PEER, // Remove peer
94         SEND_RESULT  // Send the result of a method's call
95     };
96
97 public:
98     friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
99
100     /**
101      * Used to indicate a message with the return value.
102      */
103     static const MethodID RETURN_METHOD_ID;
104
105     /**
106      * Indicates an Processor's internal request/broadcast to register a Signal
107      */
108     static const MethodID REGISTER_SIGNAL_METHOD_ID;
109
110     /**
111     * Error return message
112     */
113     static const MethodID ERROR_METHOD_ID;
114
115     /**
116      * Constructs the Processor, but doesn't start it.
117      * The object is ready to add methods.
118      *
119      * @param newPeerCallback called when a new peer arrives
120      * @param removedPeerCallback called when the Processor stops listening for this peer
121      */
122     Processor(const std::string& logName = "",
123               const PeerCallback& newPeerCallback = nullptr,
124               const PeerCallback& removedPeerCallback = nullptr,
125               const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
126     ~Processor();
127
128     Processor(const Processor&) = delete;
129     Processor(Processor&&) = delete;
130     Processor& operator=(const Processor&) = delete;
131
132
133     /**
134      * Start processing.
135      */
136     void start();
137
138     /**
139      * @return is processor running
140      */
141     bool isStarted();
142
143     /**
144      * Stops the processing thread.
145      * No incoming data will be handled after.
146      */
147     void stop();
148
149     /**
150      * Set the callback called for each new connection to a peer
151      *
152      * @param newPeerCallback the callback
153      */
154     void setNewPeerCallback(const PeerCallback& newPeerCallback);
155
156     /**
157      * Set the callback called when connection to a peer is lost
158      *
159      * @param removedPeerCallback the callback
160      */
161     void setRemovedPeerCallback(const PeerCallback& removedPeerCallback);
162
163     /**
164      * From now on socket is owned by the Processor object.
165      * Calls the newPeerCallback.
166      *
167      * @param socketPtr pointer to the new socket
168      * @return peerID of the new user
169      */
170     PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
171
172     /**
173      * Saves the callbacks connected to the method id.
174      * When a message with the given method id is received,
175      * the data will be passed to the serialization callback through file descriptor.
176      *
177      * Then the process callback will be called with the parsed data.
178      *
179      * @param methodID API dependent id of the method
180      * @param process data processing callback
181      * @tparam SentDataType data type to send
182      * @tparam ReceivedDataType data type to receive
183      */
184     template<typename SentDataType, typename ReceivedDataType>
185     void setMethodHandler(const MethodID methodID,
186                           const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
187
188     /**
189      * Saves the callbacks connected to the method id.
190      * When a message with the given method id is received,
191      * the data will be passed to the serialization callback through file descriptor.
192      *
193      * Then the process callback will be called with the parsed data.
194      * There is no return data to send back.
195      *
196      * Adding signal sends a registering message to all peers
197      *
198      * @param methodID API dependent id of the method
199      * @param process data processing callback
200      * @tparam ReceivedDataType data type to receive
201      */
202     template<typename ReceivedDataType>
203     void setSignalHandler(const MethodID methodID,
204                           const typename SignalHandler<ReceivedDataType>::type& process);
205
206     /**
207      * Send result of the method.
208      * Used for asynchronous communication, only internally.
209      *
210      * @param methodID API dependent id of the method
211      * @param peerID id of the peer
212      * @param messageID id of the message to which it replies
213      * @param data data to send
214      */
215     void sendResult(const MethodID methodID,
216                     const PeerID peerID,
217                     const MessageID messageID,
218                     const std::shared_ptr<void>& data);
219
220     /**
221      * Send error result of the method
222      *
223      * @param peerID id of the peer
224      * @param messageID id of the message to which it replies
225      * @param errorCode code of the error
226      * @param message description of the error
227      */
228     void sendError(const PeerID peerID,
229                    const MessageID messageID,
230                    const int errorCode,
231                    const std::string& message);
232
233     /**
234      * Indicate that the method handler finished
235      *
236      * @param methodID API dependent id of the method
237      * @param peerID id of the peer
238      * @param messageID id of the message to which it replies
239      */
240     void sendVoid(const MethodID methodID,
241                   const PeerID peerID,
242                   const MessageID messageID);
243
244     /**
245      * Removes the callback
246      *
247      * @param methodID API dependent id of the method
248      */
249     void removeMethod(const MethodID methodID);
250
251     /**
252      * Synchronous method call.
253      *
254      * @param methodID API dependent id of the method
255      * @param peerID id of the peer
256      * @param data data to send
257      * @param timeoutMS how long to wait for the return value before throw
258      * @tparam SentDataType data type to send
259      * @tparam ReceivedDataType data type to receive
260      */
261     template<typename SentDataType, typename ReceivedDataType>
262     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
263                                                const PeerID peerID,
264                                                const std::shared_ptr<SentDataType>& data,
265                                                unsigned int timeoutMS = 500);
266
267     /**
268      * Asynchronous method call
269      *
270      * @param methodID API dependent id of the method
271      * @param peerID id of the peer
272      * @param data data to sent
273      * @param process callback processing the return data
274      * @tparam SentDataType data type to send
275      * @tparam ReceivedDataType data type to receive
276      */
277     template<typename SentDataType, typename ReceivedDataType>
278     MessageID callAsync(const MethodID methodID,
279                         const PeerID peerID,
280                         const std::shared_ptr<SentDataType>& data,
281                         const typename ResultHandler<ReceivedDataType>::type& process);
282
283
284     /**
285      * Send a signal to the peer.
286      * There is no return value from the peer
287      * Sends any data only if a peer registered this a signal
288      *
289      * @param methodID API dependent id of the method
290      * @param data data to sent
291      * @tparam SentDataType data type to send
292      */
293     template<typename SentDataType>
294     void signal(const MethodID methodID,
295                 const std::shared_ptr<SentDataType>& data);
296
297     /**
298      * Removes one peer.
299      * Handler used in external polling.
300      *
301      * @param fd file description identifying the peer
302      * @return should the polling structure be rebuild
303      */
304     bool handleLostConnection(const FileDescriptor fd);
305
306     /**
307      * Handles input from one peer.
308      * Handler used in external polling.
309      *
310      * @param fd file description identifying the peer
311      * @return should the polling structure be rebuild
312      */
313     bool handleInput(const FileDescriptor fd);
314
315     /**
316      * Handle one event from the internal event's queue
317      *
318      * @return should the polling structure be rebuild
319      */
320     bool handleEvent();
321
322     /**
323      * @return file descriptor for the internal event's queue
324      */
325     FileDescriptor getEventFD();
326
327 private:
328     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
329     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
330     typedef std::unique_lock<std::mutex> Lock;
331     typedef RequestQueue<Event>::Request Request;
332
333     struct EmptyData {
334         CONFIG_REGISTER_EMPTY
335     };
336
337     struct RegisterSignalsProtocolMessage {
338         RegisterSignalsProtocolMessage() = default;
339         RegisterSignalsProtocolMessage(const std::vector<MethodID> ids)
340             : ids(ids) {}
341
342         std::vector<MethodID> ids;
343
344         CONFIG_REGISTER
345         (
346             ids
347         )
348     };
349
350     struct ErrorProtocolMessage {
351         ErrorProtocolMessage() = default;
352         ErrorProtocolMessage(const MessageID messageID, const int code, const std::string& message)
353             : messageID(messageID), code(code), message(message) {}
354
355         MessageID messageID;
356         int code;
357         std::string message;
358
359         CONFIG_REGISTER
360         (
361             messageID,
362             code,
363             message
364         )
365     };
366
367     struct MethodHandlers {
368         MethodHandlers(const MethodHandlers& other) = delete;
369         MethodHandlers& operator=(const MethodHandlers&) = delete;
370         MethodHandlers() = default;
371         MethodHandlers(MethodHandlers&&) = default;
372         MethodHandlers& operator=(MethodHandlers &&) = default;
373
374         SerializeCallback serialize;
375         ParseCallback parse;
376         MethodHandler<void, void>::type method;
377     };
378
379     struct SignalHandlers {
380         SignalHandlers(const SignalHandlers& other) = delete;
381         SignalHandlers& operator=(const SignalHandlers&) = delete;
382         SignalHandlers() = default;
383         SignalHandlers(SignalHandlers&&) = default;
384         SignalHandlers& operator=(SignalHandlers &&) = default;
385
386         ParseCallback parse;
387         SignalHandler<void>::type signal;
388     };
389
390     struct ReturnCallbacks {
391         ReturnCallbacks(const ReturnCallbacks& other) = delete;
392         ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
393         ReturnCallbacks() = default;
394         ReturnCallbacks(ReturnCallbacks&&) = default;
395         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
396
397         ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultBuilderHandler& process)
398             : peerID(peerID), parse(parse), process(process) {}
399
400         PeerID peerID;
401         ParseCallback parse;
402         ResultBuilderHandler process;
403     };
404
405     struct PeerInfo {
406         PeerInfo(const PeerInfo& other) = delete;
407         PeerInfo& operator=(const PeerInfo&) = delete;
408         PeerInfo() = delete;
409
410         PeerInfo(PeerInfo&&) = default;
411         PeerInfo& operator=(PeerInfo &&) = default;
412
413         PeerInfo(PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
414             : peerID(peerID), socketPtr(socketPtr) {}
415
416         PeerID peerID;
417         std::shared_ptr<Socket> socketPtr;
418     };
419
420     typedef std::vector<PeerInfo> Peers;
421
422     std::string mLogPrefix;
423
424     RequestQueue<Event> mRequestQueue;
425
426     bool mIsRunning;
427
428     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
429     std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
430     std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
431
432     Peers mPeerInfo;
433
434     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
435
436     // Mutex for modifying any internal data
437     std::mutex mStateMutex;
438
439     PeerCallback mNewPeerCallback;
440     PeerCallback mRemovedPeerCallback;
441
442     unsigned int mMaxNumberOfPeers;
443
444     template<typename SentDataType, typename ReceivedDataType>
445     MessageID callAsyncInternal(const MethodID methodID,
446                                 const PeerID peerID,
447                                 const std::shared_ptr<SentDataType>& data,
448                                 const typename ResultHandler<ReceivedDataType>::type& process);
449
450     template<typename SentDataType, typename ReceivedDataType>
451     void setMethodHandlerInternal(const MethodID methodID,
452                                   const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
453
454     template<typename ReceivedDataType>
455     void setSignalHandlerInternal(const MethodID methodID,
456                                   const typename SignalHandler<ReceivedDataType>::type& handler);
457
458     template<typename SentDataType>
459     void signalInternal(const MethodID methodID,
460                         const PeerID peerID,
461                         const std::shared_ptr<SentDataType>& data);
462
463     // Request handlers
464     bool onMethodRequest(MethodRequest& request);
465     bool onSignalRequest(SignalRequest& request);
466     bool onAddPeerRequest(AddPeerRequest& request);
467     bool onRemovePeerRequest(RemovePeerRequest& request);
468     bool onSendResultRequest(SendResultRequest& request);
469     bool onFinishRequest(FinishRequest& request);
470
471     bool onReturnValue(Peers::iterator& peerIt,
472                        const MessageID messageID);
473     bool onRemoteMethod(Peers::iterator& peerIt,
474                         const MethodID methodID,
475                         const MessageID messageID,
476                         std::shared_ptr<MethodHandlers> methodCallbacks);
477     bool onRemoteSignal(Peers::iterator& peerIt,
478                         const MethodID methodID,
479                         const MessageID messageID,
480                         std::shared_ptr<SignalHandlers> signalCallbacks);
481
482     void removePeerInternal(Peers::iterator peerIt,
483                             const std::exception_ptr& exceptionPtr);
484     void removePeerSyncInternal(const PeerID peerID, Lock& lock);
485
486     void onNewSignals(const PeerID peerID,
487                       std::shared_ptr<RegisterSignalsProtocolMessage>& data);
488
489     void onErrorSignal(const PeerID peerID,
490                        std::shared_ptr<ErrorProtocolMessage>& data);
491
492     Peers::iterator getPeerInfoIterator(const FileDescriptor fd);
493     Peers::iterator getPeerInfoIterator(const PeerID peerID);
494
495 };
496
497 template<typename SentDataType, typename ReceivedDataType>
498 void Processor::setMethodHandlerInternal(const MethodID methodID,
499                                          const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
500 {
501     MethodHandlers methodCall;
502
503     methodCall.parse = [](const int fd)->std::shared_ptr<void> {
504         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
505         config::loadFromFD<ReceivedDataType>(fd, *data);
506         return data;
507     };
508
509     methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
510         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
511     };
512
513     methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data, MethodResult::Pointer && methodResult) {
514         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
515         method(peerID, tmpData, std::forward<MethodResult::Pointer>(methodResult));
516     };
517
518     mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
519 }
520
521 template<typename SentDataType, typename ReceivedDataType>
522 void Processor::setMethodHandler(const MethodID methodID,
523                                  const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
524 {
525     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
526         LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
527         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
528     }
529
530     {
531         Lock lock(mStateMutex);
532
533         if (mSignalsCallbacks.count(methodID)) {
534             LOGE(mLogPrefix + "MethodID used by a signal: " << methodID);
535             throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
536         }
537
538         setMethodHandlerInternal<SentDataType, ReceivedDataType>(methodID, method);
539     }
540
541 }
542
543 template<typename ReceivedDataType>
544 void Processor::setSignalHandlerInternal(const MethodID methodID,
545                                          const typename SignalHandler<ReceivedDataType>::type& handler)
546 {
547     SignalHandlers signalCall;
548
549     signalCall.parse = [](const int fd)->std::shared_ptr<void> {
550         std::shared_ptr<ReceivedDataType> dataToFill(new ReceivedDataType());
551         config::loadFromFD<ReceivedDataType>(fd, *dataToFill);
552         return dataToFill;
553     };
554
555     signalCall.signal = [handler](const PeerID peerID, std::shared_ptr<void>& dataReceived) {
556         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
557         handler(peerID, tmpData);
558     };
559
560     mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
561 }
562
563
564 template<typename ReceivedDataType>
565 void Processor::setSignalHandler(const MethodID methodID,
566                                  const typename SignalHandler<ReceivedDataType>::type& handler)
567 {
568     if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
569         LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
570         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
571     }
572
573     std::shared_ptr<RegisterSignalsProtocolMessage> data;
574
575     {
576         Lock lock(mStateMutex);
577
578         // Andd the signal handler:
579         if (mMethodsCallbacks.count(methodID)) {
580             LOGE(mLogPrefix + "MethodID used by a method: " << methodID);
581             throw IPCException("MethodID used by a method: " + std::to_string(methodID));
582         }
583
584         setSignalHandlerInternal<ReceivedDataType>(methodID, handler);
585
586         // Broadcast the new signal:
587         std::vector<MethodID> ids {methodID};
588         data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
589
590         for (const PeerInfo& peerInfo : mPeerInfo) {
591             signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
592                                                            peerInfo.peerID,
593                                                            data);
594         }
595     }
596 }
597
598
599 template<typename SentDataType, typename ReceivedDataType>
600 MessageID Processor::callAsync(const MethodID methodID,
601                                const PeerID peerID,
602                                const std::shared_ptr<SentDataType>& data,
603                                const typename ResultHandler<ReceivedDataType>::type& process)
604 {
605     Lock lock(mStateMutex);
606     return callAsyncInternal<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
607 }
608
609 template<typename SentDataType, typename ReceivedDataType>
610 MessageID Processor::callAsyncInternal(const MethodID methodID,
611                                        const PeerID peerID,
612                                        const std::shared_ptr<SentDataType>& data,
613                                        const typename ResultHandler<ReceivedDataType>::type& process)
614 {
615     auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
616     mRequestQueue.pushBack(Event::METHOD, request);
617     return request->messageID;
618 }
619
620
621 template<typename SentDataType, typename ReceivedDataType>
622 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
623                                                       const PeerID peerID,
624                                                       const std::shared_ptr<SentDataType>& data,
625                                                       unsigned int timeoutMS)
626 {
627     Result<ReceivedDataType> result;
628     std::condition_variable cv;
629
630     auto process = [&result, &cv](const Result<ReceivedDataType> && r) {
631         // This is called under lock(mStateMutex)
632         result = std::move(r);
633         cv.notify_all();
634     };
635
636     Lock lock(mStateMutex);
637     MessageID messageID = callAsyncInternal<SentDataType, ReceivedDataType>(methodID,
638                                                                             peerID,
639                                                                             data,
640                                                                             process);
641
642     auto isResultInitialized = [&result]() {
643         return result.isValid();
644     };
645
646     LOGT(mLogPrefix + "Waiting for the response...");
647     //In the case of too large sending time response can be received far after timeoutMS but
648     //before this thread wakes up and before predicate check (there will by no timeout exception)
649     if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
650         LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
651
652         // Call isn't sent or call is sent but there is no reply
653         bool isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
654             return request.requestID == Event::METHOD &&
655                    request.get<MethodRequest>()->messageID == messageID;
656         })
657         || 1 == mReturnCallbacks.erase(messageID);
658
659         if (isTimeout) {
660             LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
661             removePeerSyncInternal(peerID, lock);
662             throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
663         } else {
664             LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
665             if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
666                 LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
667                 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
668             }
669         }
670     }
671
672     return result.get();
673 }
674
675 template<typename SentDataType>
676 void Processor::signalInternal(const MethodID methodID,
677                                const PeerID peerID,
678                                const std::shared_ptr<SentDataType>& data)
679 {
680     auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID, data);
681     mRequestQueue.pushFront(Event::SIGNAL, requestPtr);
682 }
683
684 template<typename SentDataType>
685 void Processor::signal(const MethodID methodID,
686                        const std::shared_ptr<SentDataType>& data)
687 {
688     Lock lock(mStateMutex);
689     const auto it = mSignalsPeers.find(methodID);
690     if (it == mSignalsPeers.end()) {
691         LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
692         return;
693     }
694     for (const PeerID peerID : it->second) {
695         auto requestPtr =  SignalRequest::create<SentDataType>(methodID, peerID, data);
696         mRequestQueue.pushBack(Event::SIGNAL, requestPtr);
697     }
698 }
699
700
701
702 } // namespace ipc
703
704 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP