IPC: External polling loop with a Client
[platform/core/security/vasum.git] / common / ipc / internals / processor.cpp
1 /*
2 *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #include "config.hpp"
26
27 #include "ipc/exception.hpp"
28 #include "ipc/internals/processor.hpp"
29 #include "ipc/internals/utils.hpp"
30 #include "utils/signal.hpp"
31
32 #include <cerrno>
33 #include <cstring>
34 #include <csignal>
35 #include <stdexcept>
36
37 #include <sys/socket.h>
38 #include <limits>
39
40 namespace vasum {
41 namespace ipc {
42
43 #define IGNORE_EXCEPTIONS(expr)                        \
44     try                                                \
45     {                                                  \
46         expr;                                          \
47     }                                                  \
48     catch (const std::exception& e){                   \
49         LOGE("Callback threw an error: " << e.what()); \
50     }
51
52 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
53 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
54
55 Processor::Processor(const PeerCallback& newPeerCallback,
56                      const PeerCallback& removedPeerCallback,
57                      const unsigned int maxNumberOfPeers)
58     : mNewPeerCallback(newPeerCallback),
59       mRemovedPeerCallback(removedPeerCallback),
60       mMaxNumberOfPeers(maxNumberOfPeers)
61 {
62     LOGT("Creating Processor");
63
64     utils::signalBlock(SIGPIPE);
65     using namespace std::placeholders;
66     addMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
67                                                                 std::bind(&Processor::onNewSignals, this, _1, _2));
68
69 }
70
71 Processor::~Processor()
72 {
73     LOGT("Destroying Processor");
74     try {
75         stop();
76     } catch (IPCException& e) {
77         LOGE("Error in Processor's destructor: " << e.what());
78     }
79
80     LOGT("Destroyed Processor");
81 }
82
83 bool Processor::isStarted()
84 {
85     return mThread.joinable();
86 }
87
88 void Processor::start()
89 {
90     LOGT("Starting Processor");
91     if (!isStarted()) {
92         mThread = std::thread(&Processor::run, this);
93     }
94     LOGT("Started Processor");
95 }
96
97 void Processor::stop()
98 {
99     LOGT("Stopping Processor");
100
101     if (isStarted()) {
102         mEventQueue.send(Event::FINISH);
103         mThread.join();
104     }
105
106     LOGT("Stopped Processor");
107 }
108
109 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
110 {
111     Lock lock(mCallbacksMutex);
112     mNewPeerCallback = newPeerCallback;
113 }
114
115 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
116 {
117     Lock lock(mCallbacksMutex);
118     mRemovedPeerCallback = removedPeerCallback;
119 }
120
121 FileDescriptor Processor::getEventFD()
122 {
123     return mEventQueue.getFD();
124 }
125
126 void Processor::removeMethod(const MethodID methodID)
127 {
128     LOGT("Removing method " << methodID);
129     Lock lock(mCallsMutex);
130     mMethodsCallbacks.erase(methodID);
131 }
132
133 FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
134 {
135     LOGT("Adding socket");
136     FileDescriptor peerFD;
137     {
138         Lock lock(mSocketsMutex);
139         peerFD = socketPtr->getFD();
140         SocketInfo socketInfo(peerFD, std::move(socketPtr));
141         mNewSockets.push(std::move(socketInfo));
142         mEventQueue.send(Event::ADD_PEER);
143     }
144     LOGI("New peer added. Id: " << peerFD);
145
146     return peerFD;
147 }
148
149 void Processor::removePeer(const FileDescriptor peerFD)
150 {
151     std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
152
153     {
154         Lock lock(mSocketsMutex);
155         RemovePeerRequest request(peerFD, conditionPtr);
156         mPeersToDelete.push(std::move(request));
157         mEventQueue.send(Event::REMOVE_PEER);
158     }
159
160
161     auto isPeerDeleted = [&peerFD, this]()->bool {
162         Lock lock(mSocketsMutex);
163         return mSockets.count(peerFD) == 0;
164     };
165
166     std::mutex mutex;
167     std::unique_lock<std::mutex> lock(mutex);
168     conditionPtr->wait(lock, isPeerDeleted);
169 }
170
171 void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
172 {
173     LOGW("Removing peer. ID: " << peerFD);
174     {
175         Lock lock(mSocketsMutex);
176         if (!mSockets.erase(peerFD)) {
177             LOGW("No such peer. Another thread called removePeerInternal");
178             return;
179         }
180
181         // Remove from signal addressees
182         for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
183             it->second.remove(peerFD);
184             if (it->second.empty()) {
185                 it = mSignalsPeers.erase(it);
186             } else {
187                 ++it;
188             }
189         }
190     }
191
192     {
193         // Erase associated return value callbacks
194         Lock lock(mReturnCallbacksMutex);
195
196         std::shared_ptr<void> data;
197         for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
198             if (it->second.peerFD == peerFD) {
199                 IGNORE_EXCEPTIONS(it->second.process(status, data));
200                 it = mReturnCallbacks.erase(it);
201             } else {
202                 ++it;
203             }
204         }
205     }
206
207
208     {
209         Lock lock(mCallbacksMutex);
210         if (mRemovedPeerCallback) {
211             // Notify about the deletion
212             mRemovedPeerCallback(peerFD);
213         }
214     }
215
216     resetPolling();
217 }
218
219 void Processor::resetPolling()
220 {
221     if (!isStarted()) {
222         return;
223     }
224
225     LOGI("Resetting polling");
226     // Setup polling on eventfd and sockets
227     Lock lock(mSocketsMutex);
228     mFDs.resize(mSockets.size() + 1);
229
230     mFDs[0].fd = mEventQueue.getFD();
231     mFDs[0].events = POLLIN;
232
233     auto socketIt = mSockets.begin();
234     for (unsigned int i = 1; i < mFDs.size(); ++i) {
235         mFDs[i].fd = socketIt->second->getFD();
236         mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
237         ++socketIt;
238         // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
239     }
240 }
241
242 void Processor::run()
243 {
244     resetPolling();
245
246     mIsRunning = true;
247     while (mIsRunning) {
248         LOGT("Waiting for communication...");
249         int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
250         LOGT("... incoming communication!");
251         if (ret == -1 || ret == 0) {
252             if (errno == EINTR) {
253                 continue;
254             }
255             LOGE("Error in poll: " << std::string(strerror(errno)));
256             throw IPCException("Error in poll: " + std::string(strerror(errno)));
257         }
258
259         // Check for lost connections:
260         if (handleLostConnections()) {
261             // mFDs changed
262             continue;
263         }
264
265         // Check for incoming data.
266         if (handleInputs()) {
267             // mFDs changed
268             continue;
269         }
270
271         // Check for incoming events
272         if (mFDs[0].revents & POLLIN) {
273             mFDs[0].revents &= ~(POLLIN);
274             if (handleEvent()) {
275                 // mFDs changed
276                 continue;
277             }
278         }
279
280     }
281 }
282
283 bool Processor::handleLostConnections()
284 {
285     std::vector<FileDescriptor> peersToRemove;
286     {
287         Lock lock(mSocketsMutex);
288         for (unsigned int i = 1; i < mFDs.size(); ++i) {
289             if (mFDs[i].revents & POLLHUP) {
290                 LOGI("Lost connection to peer: " << mFDs[i].fd);
291                 mFDs[i].revents &= ~(POLLHUP);
292                 peersToRemove.push_back(mFDs[i].fd);
293             }
294         }
295     }
296
297     for (const FileDescriptor peerFD : peersToRemove) {
298         removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
299     }
300
301     return !peersToRemove.empty();
302 }
303
304 bool Processor::handleLostConnection(const FileDescriptor peerFD)
305 {
306     removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
307     return true;
308 }
309
310 bool Processor::handleInputs()
311 {
312     std::vector<FileDescriptor> peersWithInput;
313     {
314         Lock lock(mSocketsMutex);
315         for (unsigned int i = 1; i < mFDs.size(); ++i) {
316             if (mFDs[i].revents & POLLIN) {
317                 mFDs[i].revents &= ~(POLLIN);
318                 peersWithInput.push_back(mFDs[i].fd);
319             }
320         }
321     }
322
323     bool pollChanged = false;
324     // Handle input outside the critical section
325     for (const FileDescriptor peerFD : peersWithInput) {
326         pollChanged = pollChanged || handleInput(peerFD);
327     }
328     return pollChanged;
329 }
330
331 bool Processor::handleInput(const FileDescriptor peerFD)
332 {
333     LOGT("Handle incoming data");
334
335     std::shared_ptr<Socket> socketPtr;
336     try {
337         // Get the peer's socket
338         Lock lock(mSocketsMutex);
339         socketPtr = mSockets.at(peerFD);
340     } catch (const std::out_of_range&) {
341         LOGE("No such peer: " << peerFD);
342         return false;
343     }
344
345     MethodID methodID;
346     MessageID messageID;
347     {
348         Socket::Guard guard = socketPtr->getGuard();
349         try {
350             socketPtr->read(&methodID, sizeof(methodID));
351             socketPtr->read(&messageID, sizeof(messageID));
352
353         } catch (const IPCException& e) {
354             LOGE("Error during reading the socket");
355             removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
356             return true;
357         }
358
359         if (methodID == RETURN_METHOD_ID) {
360             return onReturnValue(*socketPtr, messageID);
361
362         } else {
363             Lock lock(mCallsMutex);
364             if (mMethodsCallbacks.count(methodID)) {
365                 // Method
366                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
367                 lock.unlock();
368                 return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
369
370             } else if (mSignalsCallbacks.count(methodID)) {
371                 // Signal
372                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
373                 lock.unlock();
374                 return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
375
376             } else {
377                 // Nothing
378                 lock.unlock();
379                 LOGW("No method or signal callback for methodID: " << methodID);
380                 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
381                 return true;
382             }
383         }
384     }
385 }
386
387 std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
388                                                               std::shared_ptr<RegisterSignalsMessage>& data)
389 {
390     LOGD("New signals for peer: " << peerFD);
391     Lock lock(mSocketsMutex);
392     for (MethodID methodID : data->ids) {
393         mSignalsPeers[methodID].push_back(peerFD);
394     }
395
396     return std::make_shared<EmptyData>();
397 }
398
399 bool Processor::onReturnValue(const Socket& socket,
400                               const MessageID messageID)
401 {
402     LOGI("Return value for messageID: " << messageID);
403     ReturnCallbacks returnCallbacks;
404     try {
405         Lock lock(mReturnCallbacksMutex);
406         LOGT("Getting the return callback");
407         returnCallbacks = std::move(mReturnCallbacks.at(messageID));
408         mReturnCallbacks.erase(messageID);
409     } catch (const std::out_of_range&) {
410         LOGW("No return callback for messageID: " << messageID);
411         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
412         return true;
413     }
414
415     std::shared_ptr<void> data;
416     try {
417         LOGT("Parsing incoming return data");
418         data = returnCallbacks.parse(socket.getFD());
419     } catch (const std::exception& e) {
420         LOGE("Exception during parsing: " << e.what());
421         IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
422         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
423         return true;
424     }
425
426     LOGT("Process return value callback for messageID: " << messageID);
427     IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
428
429     return false;
430 }
431
432 bool Processor::onRemoteSignal(const Socket& socket,
433                                const MethodID methodID,
434                                const MessageID messageID,
435                                std::shared_ptr<SignalHandlers> signalCallbacks)
436 {
437     LOGI("Remote signal; methodID: " << methodID << " messageID: " << messageID);
438
439     std::shared_ptr<void> data;
440     try {
441         LOGT("Parsing incoming data");
442         data = signalCallbacks->parse(socket.getFD());
443     } catch (const std::exception& e) {
444         LOGE("Exception during parsing: " << e.what());
445         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
446         return true;
447     }
448
449     LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
450     try {
451         signalCallbacks->signal(socket.getFD(), data);
452     } catch (const std::exception& e) {
453         LOGE("Exception in method handler: " << e.what());
454         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
455         return true;
456     }
457
458     return false;
459 }
460
461 bool Processor::onRemoteCall(const Socket& socket,
462                              const MethodID methodID,
463                              const MessageID messageID,
464                              std::shared_ptr<MethodHandlers> methodCallbacks)
465 {
466     LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
467
468     std::shared_ptr<void> data;
469     try {
470         LOGT("Parsing incoming data");
471         data = methodCallbacks->parse(socket.getFD());
472     } catch (const std::exception& e) {
473         LOGE("Exception during parsing: " << e.what());
474         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
475         return true;
476     }
477
478     LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
479     std::shared_ptr<void> returnData;
480     try {
481         returnData = methodCallbacks->method(socket.getFD(), data);
482     } catch (const std::exception& e) {
483         LOGE("Exception in method handler: " << e.what());
484         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
485         return true;
486     }
487
488     LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID);
489     try {
490         // Send the call with the socket
491         Socket::Guard guard = socket.getGuard();
492         socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
493         socket.write(&messageID, sizeof(messageID));
494         methodCallbacks->serialize(socket.getFD(), returnData);
495     } catch (const std::exception& e) {
496         LOGE("Exception during serialization: " << e.what());
497         removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
498         return true;
499     }
500
501     return false;
502 }
503
504 bool Processor::handleEvent()
505 {
506     switch (mEventQueue.receive()) {
507     case Event::FINISH: {
508         LOGD("Event FINISH");
509
510         mIsRunning = false;
511         cleanCommunication();
512
513         return false;
514     }
515
516     case Event::CALL: {
517         LOGD("Event CALL");
518         return onCall();
519     }
520
521     case Event::ADD_PEER: {
522         LOGD("Event ADD_PEER");
523         return onNewPeer();
524     }
525
526     case Event::REMOVE_PEER: {
527         LOGD("Event REMOVE_PEER");
528         return onRemovePeer();
529     }
530     }
531
532     return false;
533 }
534
535 bool Processor::onNewPeer()
536 {
537     SocketInfo socketInfo;
538     {
539         Lock lock(mSocketsMutex);
540
541         socketInfo = std::move(mNewSockets.front());
542         mNewSockets.pop();
543
544         if (mSockets.size() > mMaxNumberOfPeers) {
545             LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
546             return false;
547         }
548         if (mSockets.count(socketInfo.peerFD) != 0) {
549             LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
550             return false;
551         }
552
553         mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
554     }
555
556
557     // Broadcast the new signal to peers
558     LOGW("Sending handled signals");
559     std::list<FileDescriptor> peersFDs;
560     {
561         Lock lock(mSocketsMutex);
562         for (const auto kv : mSockets) {
563             peersFDs.push_back(kv.first);
564         }
565     }
566
567     std::vector<MethodID> ids;
568     {
569         Lock lock(mSocketsMutex);
570         for (const auto kv : mSignalsCallbacks) {
571             ids.push_back(kv.first);
572         }
573     }
574     auto data = std::make_shared<RegisterSignalsMessage>(ids);
575
576     for (const FileDescriptor peerFD : peersFDs) {
577         callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
578                                                         peerFD,
579                                                         data,
580                                                         discardResultHandler<EmptyData>);
581     }
582     LOGW("Sent handled signals");
583
584
585     resetPolling();
586
587     {
588         Lock lock(mCallbacksMutex);
589         if (mNewPeerCallback) {
590             // Notify about the new user.
591             mNewPeerCallback(socketInfo.peerFD);
592         }
593     }
594     return true;
595 }
596
597 bool Processor::onRemovePeer()
598 {
599     RemovePeerRequest request;
600     {
601         Lock lock(mSocketsMutex);
602         request = std::move(mPeersToDelete.front());
603         mPeersToDelete.pop();
604     }
605
606     removePeerInternal(request.peerFD, Status::REMOVED_PEER);
607     request.conditionPtr->notify_all();
608     return true;
609 }
610
611 CallQueue::Call Processor::getCall()
612 {
613     Lock lock(mCallsMutex);
614     return mCalls.pop();
615 }
616
617 bool Processor::onCall()
618 {
619     LOGT("Handle call (from another thread) to send a message.");
620     CallQueue::Call call = getCall();
621
622     if (call.parse && call.process) {
623         return onMethodCall(call);
624     } else {
625         return onSignalCall(call);
626     }
627 }
628
629 bool Processor::onSignalCall(CallQueue::Call& call)
630 {
631     std::shared_ptr<Socket> socketPtr;
632     try {
633         // Get the peer's socket
634         Lock lock(mSocketsMutex);
635         socketPtr = mSockets.at(call.peerFD);
636     } catch (const std::out_of_range&) {
637         LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
638         return false;
639     }
640
641     try {
642         // Send the call with the socket
643         Socket::Guard guard = socketPtr->getGuard();
644         socketPtr->write(&call.methodID, sizeof(call.methodID));
645         socketPtr->write(&call.messageID, sizeof(call.messageID));
646         call.serialize(socketPtr->getFD(), call.data);
647     } catch (const std::exception& e) {
648         LOGE("Error during sending a signal: " << e.what());
649
650         removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
651         return true;
652     }
653
654     return false;
655
656 }
657
658 bool Processor::onMethodCall(CallQueue::Call& call)
659 {
660     std::shared_ptr<Socket> socketPtr;
661     try {
662         // Get the peer's socket
663         Lock lock(mSocketsMutex);
664         socketPtr = mSockets.at(call.peerFD);
665     } catch (const std::out_of_range&) {
666         LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
667
668         // Pass the error to the processing callback
669         IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
670
671         return false;
672     }
673
674     {
675         // Set what to do with the return message, but only if needed
676         Lock lock(mReturnCallbacksMutex);
677         if (mReturnCallbacks.count(call.messageID) != 0) {
678             LOGE("There already was a return callback for messageID: " << call.messageID);
679         }
680         mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
681                                                                      std::move(call.parse),
682                                                                      std::move(call.process)));
683     }
684
685     try {
686         // Send the call with the socket
687         Socket::Guard guard = socketPtr->getGuard();
688         socketPtr->write(&call.methodID, sizeof(call.methodID));
689         socketPtr->write(&call.messageID, sizeof(call.messageID));
690         call.serialize(socketPtr->getFD(), call.data);
691     } catch (const std::exception& e) {
692         LOGE("Error during sending a method: " << e.what());
693
694         // Inform about the error,
695         IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
696
697         {
698             Lock lock(mReturnCallbacksMutex);
699             mReturnCallbacks.erase(call.messageID);
700         }
701
702         removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
703         return true;
704     }
705
706     return false;
707 }
708
709 void Processor::cleanCommunication()
710 {
711     while (!mEventQueue.isEmpty()) {
712         switch (mEventQueue.receive()) {
713         case Event::FINISH: {
714             LOGD("Event FINISH after FINISH");
715             break;
716         }
717         case Event::CALL: {
718             LOGD("Event CALL after FINISH");
719             CallQueue::Call call = getCall();
720             if (call.process) {
721                 IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
722             }
723             break;
724         }
725
726         case Event::ADD_PEER: {
727             LOGD("Event ADD_PEER after FINISH");
728             break;
729         }
730
731         case Event::REMOVE_PEER: {
732             LOGD("Event REMOVE_PEER after FINISH");
733             RemovePeerRequest request;
734             {
735                 Lock lock(mSocketsMutex);
736                 request = std::move(mPeersToDelete.front());
737                 mPeersToDelete.pop();
738             }
739             request.conditionPtr->notify_all();
740             break;
741         }
742         }
743     }
744 }
745
746 } // namespace ipc
747 } // namespace vasum