IPC: Support older glib
[platform/core/security/vasum.git] / common / ipc / internals / processor.cpp
1 /*
2 *  Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #include "config.hpp"
26
27 #include "ipc/exception.hpp"
28 #include "ipc/internals/processor.hpp"
29 #include "ipc/internals/utils.hpp"
30 #include "utils/signal.hpp"
31
32 #include <cerrno>
33 #include <cstring>
34 #include <csignal>
35 #include <stdexcept>
36
37 #include <sys/socket.h>
38 #include <limits>
39
40 namespace vasum {
41 namespace ipc {
42
43 #define IGNORE_EXCEPTIONS(expr)                        \
44     try                                                \
45     {                                                  \
46         expr;                                          \
47     }                                                  \
48     catch (const std::exception& e){                   \
49         LOGE(mLogPrefix + "Callback threw an error: " << e.what()); \
50     }
51
52 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
53 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
54
55 Processor::Processor(const std::string& logName,
56                      const PeerCallback& newPeerCallback,
57                      const PeerCallback& removedPeerCallback,
58                      const unsigned int maxNumberOfPeers)
59     : mLogPrefix(logName),
60       mIsRunning(false),
61       mNewPeerCallback(newPeerCallback),
62       mRemovedPeerCallback(removedPeerCallback),
63       mMaxNumberOfPeers(maxNumberOfPeers)
64 {
65     LOGS(mLogPrefix + "Processor Constructor");
66
67     utils::signalBlock(SIGPIPE);
68     using namespace std::placeholders;
69     setMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
70                                                                 std::bind(&Processor::onNewSignals, this, _1, _2));
71 }
72
73 Processor::~Processor()
74 {
75     LOGS(mLogPrefix + "Processor Destructor");
76     try {
77         stop();
78     } catch (IPCException& e) {
79         LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
80     }
81 }
82
83 bool Processor::isStarted()
84 {
85     Lock lock(mStateMutex);
86     return mIsRunning;
87 }
88
89 void Processor::start(bool usesExternalPolling)
90 {
91     LOGS(mLogPrefix + "Processor start");
92
93     Lock lock(mStateMutex);
94     if (!isStarted()) {
95         LOGI(mLogPrefix + "Processor start");
96         mIsRunning = true;
97         mUsesExternalPolling = usesExternalPolling;
98         if (!usesExternalPolling) {
99             mThread = std::thread(&Processor::run, this);
100         }
101     }
102 }
103
104 void Processor::stop()
105 {
106     LOGS(mLogPrefix + "Processor stop");
107
108     if (isStarted()) {
109         auto conditionPtr = std::make_shared<std::condition_variable_any>();
110         {
111             Lock lock(mStateMutex);
112             auto request = std::make_shared<FinishRequest>(conditionPtr);
113             mRequestQueue.push(Event::FINISH, request);
114         }
115
116         LOGD(mLogPrefix + "Waiting for the Processor to stop");
117
118         if (mThread.joinable()) {
119             mThread.join();
120         } else {
121             // Wait till the FINISH request is served
122             Lock lock(mStateMutex);
123             conditionPtr->wait(lock, [this]() {
124                 return !isStarted();
125             });
126         }
127     }
128 }
129
130 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
131 {
132     Lock lock(mStateMutex);
133     mNewPeerCallback = newPeerCallback;
134 }
135
136 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
137 {
138     Lock lock(mStateMutex);
139     mRemovedPeerCallback = removedPeerCallback;
140 }
141
142 FileDescriptor Processor::getEventFD()
143 {
144     Lock lock(mStateMutex);
145     return mRequestQueue.getFD();
146 }
147
148 void Processor::removeMethod(const MethodID methodID)
149 {
150     Lock lock(mStateMutex);
151     mMethodsCallbacks.erase(methodID);
152 }
153
154 FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
155 {
156     LOGS(mLogPrefix + "Processor addPeer");
157     Lock lock(mStateMutex);
158
159     FileDescriptor peerFD = socketPtr->getFD();
160     auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
161     mRequestQueue.push(Event::ADD_PEER, request);
162
163     LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
164
165     return peerFD;
166 }
167
168 void Processor::removePeer(const FileDescriptor peerFD)
169 {
170     LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
171
172     {
173         Lock lock(mStateMutex);
174         mRequestQueue.removeIf([peerFD](Request & request) {
175             return request.requestID == Event::ADD_PEER &&
176                    request.get<AddPeerRequest>()->peerFD == peerFD;
177         });
178     }
179
180     // Remove peer and wait till he's gone
181     std::shared_ptr<std::condition_variable_any> conditionPtr(new std::condition_variable_any());
182     {
183         Lock lock(mStateMutex);
184         auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
185         mRequestQueue.push(Event::REMOVE_PEER, request);
186     }
187
188     auto isPeerDeleted = [&peerFD, this]()->bool {
189         return mSockets.count(peerFD) == 0;
190     };
191
192     Lock lock(mStateMutex);
193     conditionPtr->wait(lock, isPeerDeleted);
194 }
195
196 void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
197 {
198     LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
199     LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
200
201     if (!mSockets.erase(peerFD)) {
202         LOGW(mLogPrefix + "No such peer. Another thread called removePeerInternal");
203         return;
204     }
205
206     // Remove from signal addressees
207     for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
208         it->second.remove(peerFD);
209         if (it->second.empty()) {
210             it = mSignalsPeers.erase(it);
211         } else {
212             ++it;
213         }
214     }
215
216     // Erase associated return value callbacks
217     std::shared_ptr<void> data;
218     for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
219         if (it->second.peerFD == peerFD) {
220             IGNORE_EXCEPTIONS(it->second.process(status, data));
221             it = mReturnCallbacks.erase(it);
222         } else {
223             ++it;
224         }
225     }
226
227     if (mRemovedPeerCallback) {
228         // Notify about the deletion
229         mRemovedPeerCallback(peerFD);
230     }
231
232     resetPolling();
233 }
234
235 void Processor::resetPolling()
236 {
237     LOGS(mLogPrefix + "Processor resetPolling");
238
239     if (mUsesExternalPolling) {
240         return;
241     }
242
243     {
244         Lock lock(mStateMutex);
245         LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
246         // Setup polling on eventfd and sockets
247         mFDs.resize(mSockets.size() + 1);
248
249         mFDs[0].fd = mRequestQueue.getFD();
250         mFDs[0].events = POLLIN;
251
252         auto socketIt = mSockets.begin();
253         for (unsigned int i = 1; i < mFDs.size(); ++i) {
254             LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
255             mFDs[i].fd = socketIt->second->getFD();
256             mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
257             ++socketIt;
258             // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
259         }
260     }
261 }
262
263 void Processor::run()
264 {
265     LOGS(mLogPrefix + "Processor run");
266
267     resetPolling();
268
269     while (isStarted()) {
270         LOGT(mLogPrefix + "Waiting for communication...");
271         int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
272         LOGT(mLogPrefix + "... incoming communication!");
273         if (ret == -1 || ret == 0) {
274             if (errno == EINTR) {
275                 continue;
276             }
277             LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno)));
278             throw IPCException("Error in poll: " + std::string(strerror(errno)));
279         }
280
281         // Check for lost connections:
282         if (handleLostConnections()) {
283             // mFDs changed
284             continue;
285         }
286
287         // Check for incoming data.
288         if (handleInputs()) {
289             // mFDs changed
290             continue;
291         }
292
293         // Check for incoming events
294         if (mFDs[0].revents & POLLIN) {
295             mFDs[0].revents &= ~(POLLIN);
296             if (handleEvent()) {
297                 // mFDs changed
298                 continue;
299             }
300         }
301
302     }
303 }
304
305 bool Processor::handleLostConnections()
306 {
307     Lock lock(mStateMutex);
308
309     bool isPeerRemoved = false;
310     {
311         for (unsigned int i = 1; i < mFDs.size(); ++i) {
312             if (mFDs[i].revents & POLLHUP) {
313                 LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
314                 mFDs[i].revents &= ~(POLLHUP);
315                 removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
316                 isPeerRemoved = true;
317             }
318         }
319     }
320
321     return isPeerRemoved;
322 }
323
324 bool Processor::handleLostConnection(const FileDescriptor peerFD)
325 {
326     Lock lock(mStateMutex);
327     removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
328     return true;
329 }
330
331 bool Processor::handleInputs()
332 {
333     Lock lock(mStateMutex);
334
335     bool pollChanged = false;
336     for (unsigned int i = 1; i < mFDs.size(); ++i) {
337         if (mFDs[i].revents & POLLIN) {
338             mFDs[i].revents &= ~(POLLIN);
339             pollChanged = pollChanged || handleInput(mFDs[i].fd);
340         }
341     }
342
343     return pollChanged;
344 }
345
346 bool Processor::handleInput(const FileDescriptor peerFD)
347 {
348     LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
349     Lock lock(mStateMutex);
350
351     std::shared_ptr<Socket> socketPtr;
352     try {
353         // Get the peer's socket
354         socketPtr = mSockets.at(peerFD);
355     } catch (const std::out_of_range&) {
356         LOGE(mLogPrefix + "No such peer: " << peerFD);
357         return false;
358     }
359
360     MethodID methodID;
361     MessageID messageID;
362     {
363         Socket::Guard guard = socketPtr->getGuard();
364         try {
365             socketPtr->read(&methodID, sizeof(methodID));
366             socketPtr->read(&messageID, sizeof(messageID));
367
368         } catch (const IPCException& e) {
369             LOGE(mLogPrefix + "Error during reading the socket");
370             removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
371             return true;
372         }
373
374         if (methodID == RETURN_METHOD_ID) {
375             return onReturnValue(*socketPtr, messageID);
376
377         } else {
378             if (mMethodsCallbacks.count(methodID)) {
379                 // Method
380                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
381                 return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
382
383             } else if (mSignalsCallbacks.count(methodID)) {
384                 // Signal
385                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
386                 return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
387
388             } else {
389                 // Nothing
390                 LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
391                 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
392                 return true;
393             }
394         }
395     }
396 }
397
398 std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
399                                                               std::shared_ptr<RegisterSignalsMessage>& data)
400 {
401     LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
402
403     for (const MethodID methodID : data->ids) {
404         mSignalsPeers[methodID].push_back(peerFD);
405     }
406
407     return std::make_shared<EmptyData>();
408 }
409
410 bool Processor::onReturnValue(const Socket& socket,
411                               const MessageID messageID)
412 {
413     LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
414
415     // LOGI(mLogPrefix + "Return value for messageID: " << messageID);
416     ReturnCallbacks returnCallbacks;
417     try {
418         LOGT(mLogPrefix + "Getting the return callback");
419         returnCallbacks = std::move(mReturnCallbacks.at(messageID));
420         mReturnCallbacks.erase(messageID);
421     } catch (const std::out_of_range&) {
422         LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
423         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
424         return true;
425     }
426
427     std::shared_ptr<void> data;
428     try {
429         LOGT(mLogPrefix + "Parsing incoming return data");
430         data = returnCallbacks.parse(socket.getFD());
431     } catch (const std::exception& e) {
432         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
433         IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
434         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
435         return true;
436     }
437
438     // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
439     IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
440
441     // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
442     return false;
443 }
444
445 bool Processor::onRemoteSignal(const Socket& socket,
446                                const MethodID methodID,
447                                const MessageID messageID,
448                                std::shared_ptr<SignalHandlers> signalCallbacks)
449 {
450     LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
451
452     // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
453
454     std::shared_ptr<void> data;
455     try {
456         LOGT(mLogPrefix + "Parsing incoming data");
457         data = signalCallbacks->parse(socket.getFD());
458     } catch (const std::exception& e) {
459         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
460         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
461         return true;
462     }
463
464     // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
465     try {
466         signalCallbacks->signal(socket.getFD(), data);
467     } catch (const std::exception& e) {
468         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
469         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
470         return true;
471     }
472
473     return false;
474 }
475
476 bool Processor::onRemoteCall(const Socket& socket,
477                              const MethodID methodID,
478                              const MessageID messageID,
479                              std::shared_ptr<MethodHandlers> methodCallbacks)
480 {
481     LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
482     // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID);
483
484     std::shared_ptr<void> data;
485     try {
486         LOGT(mLogPrefix + "Parsing incoming data");
487         data = methodCallbacks->parse(socket.getFD());
488     } catch (const std::exception& e) {
489         LOGE(mLogPrefix + "Exception during parsing: " << e.what());
490         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
491         return true;
492     }
493
494     LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
495     std::shared_ptr<void> returnData;
496     try {
497         returnData = methodCallbacks->method(socket.getFD(), data);
498     } catch (const std::exception& e) {
499         LOGE(mLogPrefix + "Exception in method handler: " << e.what());
500         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
501         return true;
502     }
503
504     LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID);
505     try {
506         // Send the call with the socket
507         Socket::Guard guard = socket.getGuard();
508         socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
509         socket.write(&messageID, sizeof(messageID));
510         methodCallbacks->serialize(socket.getFD(), returnData);
511     } catch (const std::exception& e) {
512         LOGE(mLogPrefix + "Exception during serialization: " << e.what());
513         removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
514         return true;
515     }
516
517     return false;
518 }
519
520 bool Processor::handleEvent()
521 {
522     LOGS(mLogPrefix + "Processor handleEvent");
523
524     Lock lock(mStateMutex);
525
526     auto request = mRequestQueue.pop();
527     LOGD(mLogPrefix + "Got: " << request.requestID);
528
529     switch (request.requestID) {
530     case Event::METHOD:      return onMethodRequest(*request.get<MethodRequest>());
531     case Event::SIGNAL:      return onSignalRequest(*request.get<SignalRequest>());
532     case Event::ADD_PEER:    return onAddPeerRequest(*request.get<AddPeerRequest>());
533     case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get<RemovePeerRequest>());
534     case Event::FINISH:      return onFinishRequest(*request.get<FinishRequest>());
535     }
536
537     return false;
538 }
539
540 bool Processor::onMethodRequest(MethodRequest& request)
541 {
542     LOGS(mLogPrefix + "Processor onMethodRequest");
543     std::shared_ptr<Socket> socketPtr;
544
545     try {
546         // Get the peer's socket
547         socketPtr = mSockets.at(request.peerFD);
548     } catch (const std::out_of_range&) {
549         LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
550
551         // Pass the error to the processing callback
552         IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data));
553
554         return false;
555     }
556
557     if (mReturnCallbacks.count(request.messageID) != 0) {
558         LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
559     }
560     mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(request.peerFD,
561                                                                     std::move(request.parse),
562                                                                     std::move(request.process)));
563
564     try {
565         // Send the call with the socket
566         Socket::Guard guard = socketPtr->getGuard();
567         socketPtr->write(&request.methodID, sizeof(request.methodID));
568         socketPtr->write(&request.messageID, sizeof(request.messageID));
569         LOGT(mLogPrefix + "Serializing the message");
570         request.serialize(socketPtr->getFD(), request.data);
571     } catch (const std::exception& e) {
572         LOGE(mLogPrefix + "Error during sending a method: " << e.what());
573
574         // Inform about the error,
575         IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data));
576
577
578         mReturnCallbacks.erase(request.messageID);
579         removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
580
581         return true;
582
583     }
584
585     return false;
586 }
587
588 bool Processor::onSignalRequest(SignalRequest& request)
589 {
590     LOGS(mLogPrefix + "Processor onSignalRequest");
591
592     std::shared_ptr<Socket> socketPtr;
593     try {
594         // Get the peer's socket
595         socketPtr = mSockets.at(request.peerFD);
596     } catch (const std::out_of_range&) {
597         LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
598         return false;
599     }
600
601     try {
602         // Send the call with the socket
603         Socket::Guard guard = socketPtr->getGuard();
604         socketPtr->write(&request.methodID, sizeof(request.methodID));
605         socketPtr->write(&request.messageID, sizeof(request.messageID));
606         request.serialize(socketPtr->getFD(), request.data);
607     } catch (const std::exception& e) {
608         LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
609
610         removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
611         return true;
612     }
613
614     return false;
615 }
616
617 bool Processor::onAddPeerRequest(AddPeerRequest& request)
618 {
619     LOGS(mLogPrefix + "Processor onAddPeerRequest");
620
621     if (mSockets.size() > mMaxNumberOfPeers) {
622         LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerFD);
623         return false;
624     }
625     if (mSockets.count(request.peerFD) != 0) {
626         LOGE(mLogPrefix + "There already was a socket for peerFD: " << request.peerFD);
627         return false;
628     }
629
630     mSockets[request.peerFD] = std::move(request.socketPtr);
631
632
633     // Sending handled signals
634     std::vector<MethodID> ids;
635     for (const auto kv : mSignalsCallbacks) {
636         ids.push_back(kv.first);
637     }
638     auto data = std::make_shared<RegisterSignalsMessage>(ids);
639     callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
640                                                  request.peerFD,
641                                                  data,
642                                                  discardResultHandler<EmptyData>);
643
644
645     resetPolling();
646
647     if (mNewPeerCallback) {
648         // Notify about the new user.
649         LOGT(mLogPrefix + "Calling NewPeerCallback");
650         mNewPeerCallback(request.peerFD);
651     }
652
653     LOGI(mLogPrefix + "New peer: " << request.peerFD);
654     return true;
655 }
656
657 bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
658 {
659     LOGS(mLogPrefix + "Processor onRemovePeer");
660
661     removePeerInternal(request.peerFD, Status::REMOVED_PEER);
662     request.conditionPtr->notify_all();
663
664     return true;
665 }
666
667 bool Processor::onFinishRequest(FinishRequest& request)
668 {
669     LOGS(mLogPrefix + "Processor onFinishRequest");
670
671     // Clean the mRequestQueue
672     while (!mRequestQueue.isEmpty()) {
673         auto request = mRequestQueue.pop();
674         LOGE(mLogPrefix + "Got: " << request.requestID << " after FINISH");
675
676         switch (request.requestID) {
677         case Event::METHOD: {
678             auto requestPtr = request.get<MethodRequest>();
679             IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data));
680             break;
681         }
682         case Event::REMOVE_PEER: {
683             onRemovePeerRequest(*request.get<RemovePeerRequest>());
684             break;
685         }
686         case Event::SIGNAL:
687         case Event::ADD_PEER:
688         case Event::FINISH:
689             break;
690         }
691     }
692
693     mIsRunning = false;
694
695     request.conditionPtr->notify_all();
696     return true;
697 }
698
699 std::ostream& operator<<(std::ostream& os, const Processor::Event& event)
700 {
701     switch (event) {
702
703     case Processor::Event::FINISH: {
704         os << "Event::FINISH";
705         break;
706     }
707
708     case Processor::Event::METHOD: {
709         os << "Event::METHOD";
710         break;
711     }
712
713     case Processor::Event::SIGNAL: {
714         os << "Event::SIGNAL";
715         break;
716     }
717
718     case Processor::Event::ADD_PEER: {
719         os << "Event::ADD_PEER";
720         break;
721     }
722
723     case Processor::Event::REMOVE_PEER: {
724         os << "Event::REMOVE_PEER";
725         break;
726     }
727     }
728
729     return os;
730 }
731
732 } // namespace ipc
733 } // namespace vasum