IPC unit tests and testing framework improvements
[platform/core/security/vasum.git] / libs / ipc / internals / processor.cpp
1 /*
2 *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #include "config.hpp"
26
27 #include "ipc/exception.hpp"
28 #include "ipc/internals/processor.hpp"
29 #include "config/manager.hpp"
30 #include "config/exception.hpp"
31
32 #include <cerrno>
33 #include <cstring>
34 #include <csignal>
35 #include <stdexcept>
36
37 #include <sys/socket.h>
38 #include <limits>
39
40 using namespace utils;
41
42 namespace ipc {
43
44 #define IGNORE_EXCEPTIONS(expr)                        \
45     try                                                \
46     {                                                  \
47         expr;                                          \
48     }                                                  \
49     catch (const std::exception& e){                   \
50         LOGE(mLogPrefix + "Callback threw an error: " << e.what()); \
51     }
52
53 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
54 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
55 const MethodID Processor::ERROR_METHOD_ID = std::numeric_limits<MethodID>::max() - 2;
56
57 Processor::Processor(epoll::EventPoll& eventPoll,
58                      const std::string& logName,
59                      const PeerCallback& newPeerCallback,
60                      const PeerCallback& removedPeerCallback,
61                      const unsigned int maxNumberOfPeers)
62     : mEventPoll(eventPoll),
63       mLogPrefix(logName),
64       mIsRunning(false),
65       mNewPeerCallback(newPeerCallback),
66       mRemovedPeerCallback(removedPeerCallback),
67       mMaxNumberOfPeers(maxNumberOfPeers)
68 {
69     LOGS(mLogPrefix + "Processor Constructor");
70
71     using namespace std::placeholders;
72     setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
73                                                              std::bind(&Processor::onNewSignals, this, _1, _2));
74
75     setSignalHandlerInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, std::bind(&Processor::onErrorSignal, this, _1, _2));
76 }
77
78 Processor::~Processor()
79 {
80     LOGS(mLogPrefix + "Processor Destructor");
81     try {
82         stop(false);
83     } catch (std::exception& e) {
84         LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
85     }
86 }
87
88 Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd)
89 {
90     return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [fd](const PeerInfo & peerInfo) {
91         return fd == peerInfo.socketPtr->getFD();
92     });
93 }
94
95 Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID & peerID)
96 {
97     return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [peerID](const PeerInfo & peerInfo) {
98         return peerID == peerInfo.peerID;
99     });
100 }
101
102 bool Processor::isStarted()
103 {
104     Lock lock(mStateMutex);
105     return mIsRunning;
106 }
107
108 void Processor::start()
109 {
110     LOGS(mLogPrefix + "Processor start");
111
112     Lock lock(mStateMutex);
113     if (!mIsRunning) {
114         LOGI(mLogPrefix + "Processor start");
115         mIsRunning = true;
116
117         mEventPoll.addFD(mRequestQueue.getFD(), EPOLLIN, std::bind(&Processor::handleEvent, this));
118     }
119 }
120
121 void Processor::stop(bool wait)
122 {
123     LOGS(mLogPrefix + "Processor stop");
124
125     if (isStarted()) {
126         auto conditionPtr = std::make_shared<std::condition_variable>();
127         {
128             Lock lock(mStateMutex);
129             auto request = std::make_shared<FinishRequest>(conditionPtr);
130             mRequestQueue.pushBack(Event::FINISH, request);
131         }
132
133         if (wait) {
134             LOGD(mLogPrefix + "Waiting for the Processor to stop");
135
136             // Wait till the FINISH request is served
137             Lock lock(mStateMutex);
138             conditionPtr->wait(lock, [this]() {
139                 return !mIsRunning;
140             });
141             assert(mPeerInfo.empty());
142         }
143     }
144 }
145
146 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
147 {
148     Lock lock(mStateMutex);
149     mNewPeerCallback = newPeerCallback;
150 }
151
152 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
153 {
154     Lock lock(mStateMutex);
155     mRemovedPeerCallback = removedPeerCallback;
156 }
157
158 FileDescriptor Processor::getEventFD()
159 {
160     Lock lock(mStateMutex);
161     return mRequestQueue.getFD();
162 }
163
164 void Processor::sendResult(const MethodID methodID,
165                            const PeerID& peerID,
166                            const MessageID& messageID,
167                            const std::shared_ptr<void>& data)
168 {
169     auto requestPtr = std::make_shared<SendResultRequest>(methodID, peerID, messageID, data);
170     mRequestQueue.pushFront(Event::SEND_RESULT, requestPtr);
171 }
172
173 void Processor::sendError(const PeerID& peerID,
174                           const MessageID& messageID,
175                           const int errorCode,
176                           const std::string& message)
177 {
178     auto data = std::make_shared<ErrorProtocolMessage>(messageID, errorCode, message);
179     signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, peerID , data);
180 }
181
182 void Processor::sendVoid(const MethodID methodID,
183                          const PeerID& peerID,
184                          const MessageID& messageID)
185 {
186     auto data = std::make_shared<EmptyData>();
187     auto requestPtr = std::make_shared<SendResultRequest>(methodID, peerID, messageID, data);
188     mRequestQueue.pushFront(Event::SEND_RESULT, requestPtr);
189 }
190
191 void Processor::removeMethod(const MethodID methodID)
192 {
193     Lock lock(mStateMutex);
194     mMethodsCallbacks.erase(methodID);
195 }
196
197 PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
198 {
199     LOGS(mLogPrefix + "Processor addPeer");
200     Lock lock(mStateMutex);
201
202     auto requestPtr = std::make_shared<AddPeerRequest>(socketPtr);
203     mRequestQueue.pushBack(Event::ADD_PEER, requestPtr);
204
205     LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID
206          << ", fd: " << socketPtr->getFD());
207
208     return requestPtr->peerID;
209 }
210
211 void Processor::removePeerSyncInternal(const PeerID& peerID, Lock& lock)
212 {
213     LOGS(mLogPrefix + "Processor removePeer peerID: " << peerID);
214
215     auto isPeerDeleted = [&peerID, this]()->bool {
216         return getPeerInfoIterator(peerID) == mPeerInfo.end();
217     };
218
219     mRequestQueue.removeIf([peerID](Request & request) {
220         return request.requestID == Event::ADD_PEER &&
221                request.get<AddPeerRequest>()->peerID == peerID;
222     });
223
224     // Remove peer and wait till he's gone
225     std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
226
227     auto request = std::make_shared<RemovePeerRequest>(peerID, conditionPtr);
228     mRequestQueue.pushBack(Event::REMOVE_PEER, request);
229
230     conditionPtr->wait(lock, isPeerDeleted);
231 }
232
233 void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr)
234 {
235     if (peerIt == mPeerInfo.end()) {
236         LOGW("Peer already removed");
237         return;
238     }
239
240     LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
241     LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
242
243     // Remove from signal addressees
244     for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
245         it->second.remove(peerIt->peerID);
246         if (it->second.empty()) {
247             it = mSignalsPeers.erase(it);
248         } else {
249             ++it;
250         }
251     }
252
253     // Erase associated return value callbacks
254     for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
255         if (it->second.peerID == peerIt->peerID) {
256             ResultBuilder resultBuilder(exceptionPtr);
257             IGNORE_EXCEPTIONS(it->second.process(resultBuilder));
258             it = mReturnCallbacks.erase(it);
259         } else {
260             ++it;
261         }
262     }
263
264     if (mRemovedPeerCallback) {
265         // Notify about the deletion
266         mRemovedPeerCallback(peerIt->peerID, peerIt->socketPtr->getFD());
267     }
268
269     mPeerInfo.erase(peerIt);
270 }
271
272 bool Processor::handleLostConnection(const FileDescriptor fd)
273 {
274     Lock lock(mStateMutex);
275     auto peerIt = getPeerInfoIterator(fd);
276     removePeerInternal(peerIt,
277                        std::make_exception_ptr(IPCPeerDisconnectedException()));
278     return true;
279 }
280
281 bool Processor::handleInput(const FileDescriptor fd)
282 {
283     LOGS(mLogPrefix + "Processor handleInput fd: " << fd);
284
285     Lock lock(mStateMutex);
286
287     auto peerIt = getPeerInfoIterator(fd);
288
289     if (peerIt == mPeerInfo.end()) {
290         LOGE(mLogPrefix + "No peer for fd: " << fd);
291         return false;
292     }
293
294     MessageHeader hdr;
295     {
296         try {
297             // Read information about the incoming data
298             Socket& socket = *peerIt->socketPtr;
299             Socket::Guard guard = socket.getGuard();
300             config::loadFromFD<MessageHeader>(socket.getFD(), hdr);
301         } catch (const config::ConfigException& e) {
302             LOGE(mLogPrefix + "Error during reading the socket");
303             removePeerInternal(peerIt,
304                                std::make_exception_ptr(IPCNaughtyPeerException()));
305             return true;
306         }
307
308         if (hdr.methodID == RETURN_METHOD_ID) {
309             return onReturnValue(peerIt, hdr.messageID);
310
311         } else {
312             if (mMethodsCallbacks.count(hdr.methodID)) {
313                 // Method
314                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(hdr.methodID);
315                 return onRemoteMethod(peerIt, hdr.methodID, hdr.messageID, methodCallbacks);
316
317             } else if (mSignalsCallbacks.count(hdr.methodID)) {
318                 // Signal
319                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(hdr.methodID);
320                 return onRemoteSignal(peerIt, hdr.methodID, hdr.messageID, signalCallbacks);
321
322             } else {
323                 LOGW(mLogPrefix + "No method or signal callback for methodID: " << hdr.methodID);
324                 removePeerInternal(peerIt,
325                                    std::make_exception_ptr(IPCNaughtyPeerException()));
326                 return true;
327             }
328         }
329     }
330 }
331
332 void Processor::onNewSignals(const PeerID& peerID, std::shared_ptr<RegisterSignalsProtocolMessage>& data)
333 {
334     LOGS(mLogPrefix + "Processor onNewSignals peerID: " << peerID);
335
336     for (const MethodID methodID : data->ids) {
337         mSignalsPeers[methodID].push_back(peerID);
338     }
339 }
340
341 void Processor::onErrorSignal(const PeerID&, std::shared_ptr<ErrorProtocolMessage>& data)
342 {
343     LOGS(mLogPrefix + "Processor onErrorSignal messageID: " << data->messageID);
344
345     // If there is no return callback an out_of_range error will be thrown and peer will be removed
346     ReturnCallbacks returnCallbacks = std::move(mReturnCallbacks.at(data->messageID));
347     mReturnCallbacks.erase(data->messageID);
348
349     ResultBuilder resultBuilder(std::make_exception_ptr(IPCUserException(data->code, data->message)));
350     IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
351 }
352
353 bool Processor::onReturnValue(Peers::iterator& peerIt,
354                               const MessageID& messageID)
355 {
356     LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
357
358     ReturnCallbacks returnCallbacks;
359     try {
360         LOGT(mLogPrefix + "Getting the return callback");
361         returnCallbacks = std::move(mReturnCallbacks.at(messageID));
362         mReturnCallbacks.erase(messageID);
363     } catch (const std::out_of_range&) {
364         LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
365         removePeerInternal(peerIt,
366                            std::make_exception_ptr(IPCNaughtyPeerException()));
367         return true;
368     }
369
370     std::shared_ptr<void> data;
371     try {
372         LOGT(mLogPrefix + "Parsing incoming return data");
373         data = returnCallbacks.parse(peerIt->socketPtr->getFD());
374     } catch (const std::exception& e) {
375         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
376         ResultBuilder resultBuilder(std::make_exception_ptr(IPCParsingException()));
377         IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
378         removePeerInternal(peerIt,
379                            std::make_exception_ptr(IPCParsingException()));
380         return true;
381     }
382
383     ResultBuilder resultBuilder(data);
384     IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
385
386     return false;
387 }
388
389 bool Processor::onRemoteSignal(Peers::iterator& peerIt,
390                                __attribute__((unused)) const MethodID methodID,
391                                __attribute__((unused)) const MessageID& messageID,
392                                std::shared_ptr<SignalHandlers> signalCallbacks)
393 {
394     LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
395
396     std::shared_ptr<void> data;
397     try {
398         LOGT(mLogPrefix + "Parsing incoming data");
399         data = signalCallbacks->parse(peerIt->socketPtr->getFD());
400     } catch (const std::exception& e) {
401         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
402         removePeerInternal(peerIt,
403                            std::make_exception_ptr(IPCParsingException()));
404         return true;
405     }
406
407     try {
408         signalCallbacks->signal(peerIt->peerID, data);
409     } catch (const IPCUserException& e) {
410         LOGW("Discarded user's exception");
411         return false;
412     } catch (const std::exception& e) {
413         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
414         removePeerInternal(peerIt,
415                            std::make_exception_ptr(IPCNaughtyPeerException()));
416
417         return true;
418     }
419
420     return false;
421 }
422
423 bool Processor::onRemoteMethod(Peers::iterator& peerIt,
424                                const MethodID methodID,
425                                const MessageID& messageID,
426                                std::shared_ptr<MethodHandlers> methodCallbacks)
427 {
428     LOGS(mLogPrefix + "Processor onRemoteMethod; methodID: " << methodID << " messageID: " << messageID);
429
430     std::shared_ptr<void> data;
431     try {
432         LOGT(mLogPrefix + "Parsing incoming data");
433         data = methodCallbacks->parse(peerIt->socketPtr->getFD());
434     } catch (const std::exception& e) {
435         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
436         removePeerInternal(peerIt,
437                            std::make_exception_ptr(IPCParsingException()));
438         return true;
439     }
440
441     LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
442     try {
443         methodCallbacks->method(peerIt->peerID,
444                                 data,
445                                 std::make_shared<MethodResult>(*this, methodID, messageID, peerIt->peerID));
446     } catch (const IPCUserException& e) {
447         LOGW("User's exception");
448         sendError(peerIt->peerID, messageID, e.getCode(), e.what());
449         return false;
450     } catch (const std::exception& e) {
451         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
452         removePeerInternal(peerIt,
453                            std::make_exception_ptr(IPCNaughtyPeerException()));
454         return true;
455     }
456
457     return false;
458 }
459
460 bool Processor::handleEvent()
461 {
462     LOGS(mLogPrefix + "Processor handleEvent");
463
464     Lock lock(mStateMutex);
465
466     auto request = mRequestQueue.pop();
467     LOGD(mLogPrefix + "Got: " << request.requestID);
468
469     switch (request.requestID) {
470     case Event::METHOD:      return onMethodRequest(*request.get<MethodRequest>());
471     case Event::SIGNAL:      return onSignalRequest(*request.get<SignalRequest>());
472     case Event::ADD_PEER:    return onAddPeerRequest(*request.get<AddPeerRequest>());
473     case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get<RemovePeerRequest>());
474     case Event::SEND_RESULT: return onSendResultRequest(*request.get<SendResultRequest>());
475     case Event::FINISH:      return onFinishRequest(*request.get<FinishRequest>());
476     }
477
478     return false;
479 }
480
481 bool Processor::onMethodRequest(MethodRequest& request)
482 {
483     LOGS(mLogPrefix + "Processor onMethodRequest");
484
485     auto peerIt = getPeerInfoIterator(request.peerID);
486
487     if (peerIt == mPeerInfo.end()) {
488         LOGE(mLogPrefix + "Peer disconnected. No user with a peerID: " << request.peerID);
489
490         // Pass the error to the processing callback
491         ResultBuilder resultBuilder(std::make_exception_ptr(IPCPeerDisconnectedException()));
492         IGNORE_EXCEPTIONS(request.process(resultBuilder));
493
494         return false;
495     }
496
497     if (mReturnCallbacks.count(request.messageID) != 0) {
498         LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
499     }
500     mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(peerIt->peerID,
501                                                                     std::move(request.parse),
502                                                                     std::move(request.process)));
503
504     MessageHeader hdr;
505     try {
506         // Send the call with the socket
507         Socket& socket = *peerIt->socketPtr;
508         Socket::Guard guard = socket.getGuard();
509         hdr.methodID = request.methodID;
510         hdr.messageID = request.messageID;
511         config::saveToFD<MessageHeader>(socket.getFD(), hdr);
512         LOGT(mLogPrefix + "Serializing the message");
513         request.serialize(socket.getFD(), request.data);
514     } catch (const std::exception& e) {
515         LOGE(mLogPrefix + "Error during sending a method: " << e.what());
516
517         // Inform about the error
518         ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException()));
519         IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder));
520
521
522         mReturnCallbacks.erase(request.messageID);
523         removePeerInternal(peerIt,
524                            std::make_exception_ptr(IPCSerializationException()));
525         return true;
526
527     }
528
529     return false;
530 }
531
532 bool Processor::onSignalRequest(SignalRequest& request)
533 {
534     LOGS(mLogPrefix + "Processor onSignalRequest");
535
536     auto peerIt = getPeerInfoIterator(request.peerID);
537
538     if (peerIt == mPeerInfo.end()) {
539         LOGE(mLogPrefix + "Peer disconnected. No user for peerID: " << request.peerID);
540         return false;
541     }
542
543     MessageHeader hdr;
544     try {
545         // Send the call with the socket
546         Socket& socket = *peerIt->socketPtr;
547         Socket::Guard guard = socket.getGuard();
548         hdr.methodID = request.methodID;
549         hdr.messageID = request.messageID;
550         config::saveToFD<MessageHeader>(socket.getFD(), hdr);
551         request.serialize(socket.getFD(), request.data);
552     } catch (const std::exception& e) {
553         LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
554
555         removePeerInternal(peerIt,
556                            std::make_exception_ptr(IPCSerializationException()));
557         return true;
558     }
559
560     return false;
561 }
562
563 bool Processor::onAddPeerRequest(AddPeerRequest& request)
564 {
565     LOGS(mLogPrefix + "Processor onAddPeerRequest");
566
567     if (mPeerInfo.size() > mMaxNumberOfPeers) {
568         LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerID);
569         return false;
570     }
571
572     if (getPeerInfoIterator(request.peerID) != mPeerInfo.end()) {
573         LOGE(mLogPrefix + "There already was a socket for peerID: " << request.peerID);
574         return false;
575     }
576
577     PeerInfo peerInfo(request.peerID, request.socketPtr);
578     mPeerInfo.push_back(std::move(peerInfo));
579
580
581     // Sending handled signals
582     std::vector<MethodID> ids;
583     for (const auto kv : mSignalsCallbacks) {
584         ids.push_back(kv.first);
585     }
586     auto data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
587     signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
588                                                    request.peerID,
589                                                    data);
590
591     if (mNewPeerCallback) {
592         // Notify about the new user.
593         LOGT(mLogPrefix + "Calling NewPeerCallback");
594         mNewPeerCallback(request.peerID, request.socketPtr->getFD());
595     }
596
597     LOGI(mLogPrefix + "New peerID: " << request.peerID);
598     return true;
599 }
600
601 bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
602 {
603     LOGS(mLogPrefix + "Processor onRemovePeer");
604
605     removePeerInternal(getPeerInfoIterator(request.peerID),
606                        std::make_exception_ptr(IPCRemovedPeerException()));
607
608     request.conditionPtr->notify_all();
609
610     return true;
611 }
612
613 bool Processor::onSendResultRequest(SendResultRequest& request)
614 {
615     LOGS(mLogPrefix + "Processor onMethodRequest");
616
617     auto peerIt = getPeerInfoIterator(request.peerID);
618
619     if (peerIt == mPeerInfo.end()) {
620         LOGE(mLogPrefix + "Peer disconnected, no result is sent. No user with a peerID: " << request.peerID);
621         return false;
622     }
623
624     std::shared_ptr<MethodHandlers> methodCallbacks;
625     try {
626         methodCallbacks = mMethodsCallbacks.at(request.methodID);
627     } catch (const std::out_of_range&) {
628         LOGW(mLogPrefix + "No method, might have been deleted. methodID: " << request.methodID);
629         return true;
630     }
631
632     MessageHeader hdr;
633     try {
634         // Send the call with the socket
635         Socket& socket = *peerIt->socketPtr;
636         Socket::Guard guard = socket.getGuard();
637         hdr.methodID = RETURN_METHOD_ID;
638         hdr.messageID = request.messageID;
639         config::saveToFD<MessageHeader>(socket.getFD(), hdr);
640         LOGT(mLogPrefix + "Serializing the message");
641         methodCallbacks->serialize(socket.getFD(), request.data);
642     } catch (const std::exception& e) {
643         LOGE(mLogPrefix + "Error during sending a method: " << e.what());
644
645         // Inform about the error
646         ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException()));
647         IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder));
648
649
650         mReturnCallbacks.erase(request.messageID);
651         removePeerInternal(peerIt,
652                            std::make_exception_ptr(IPCSerializationException()));
653         return true;
654
655     }
656
657     return false;
658 }
659
660 bool Processor::onFinishRequest(FinishRequest& requestFinisher)
661 {
662     LOGS(mLogPrefix + "Processor onFinishRequest");
663
664     // Clean the mRequestQueue
665     while (!mRequestQueue.isEmpty()) {
666         auto request = mRequestQueue.pop();
667         LOGE(mLogPrefix + "Got: " << request.requestID << " after FINISH");
668
669         switch (request.requestID) {
670         case Event::METHOD: {
671             auto requestPtr = request.get<MethodRequest>();
672             ResultBuilder resultBuilder(std::make_exception_ptr(IPCClosingException()));
673             IGNORE_EXCEPTIONS(requestPtr->process(resultBuilder));
674             break;
675         }
676         case Event::REMOVE_PEER: {
677             onRemovePeerRequest(*request.get<RemovePeerRequest>());
678             break;
679         }
680         case Event::SEND_RESULT: {
681             onSendResultRequest(*request.get<SendResultRequest>());
682             break;
683         }
684         case Event::SIGNAL:
685         case Event::ADD_PEER:
686         case Event::FINISH:
687             break;
688         }
689     }
690
691     // Close peers
692     while (!mPeerInfo.empty()) {
693         removePeerInternal(--mPeerInfo.end(),
694                            std::make_exception_ptr(IPCClosingException()));
695     }
696
697     mEventPoll.removeFD(mRequestQueue.getFD());
698     mIsRunning = false;
699
700     requestFinisher.conditionPtr->notify_all();
701     return true;
702 }
703
704 std::ostream& operator<<(std::ostream& os, const Processor::Event& event)
705 {
706     switch (event) {
707
708     case Processor::Event::FINISH: {
709         os << "Event::FINISH";
710         break;
711     }
712
713     case Processor::Event::METHOD: {
714         os << "Event::METHOD";
715         break;
716     }
717
718     case Processor::Event::SIGNAL: {
719         os << "Event::SIGNAL";
720         break;
721     }
722
723     case Processor::Event::ADD_PEER: {
724         os << "Event::ADD_PEER";
725         break;
726     }
727
728     case Processor::Event::REMOVE_PEER: {
729         os << "Event::REMOVE_PEER";
730         break;
731     }
732
733     case Processor::Event::SEND_RESULT: {
734         os << "Event::SEND_RESULT";
735         break;
736     }
737     }
738
739     return os;
740 }
741
742 } // namespace ipc