IPC: Replace PeerID witch peer's file descriptor
[platform/core/security/vasum.git] / common / ipc / internals / processor.cpp
1 /*
2 *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #include "config.hpp"
26
27 #include "ipc/exception.hpp"
28 #include "ipc/internals/processor.hpp"
29 #include "ipc/internals/utils.hpp"
30
31 #include <cerrno>
32 #include <cstring>
33 #include <stdexcept>
34
35 #include <sys/socket.h>
36 #include <limits>
37
38 namespace security_containers {
39 namespace ipc {
40
41 #define IGNORE_EXCEPTIONS(expr)                        \
42     try                                                \
43     {                                                  \
44         expr;                                          \
45     }                                                  \
46     catch (const std::exception& e){                   \
47         LOGE("Callback threw an error: " << e.what()); \
48     }
49
50 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
51 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
52
53 Processor::Processor(const PeerCallback& newPeerCallback,
54                      const PeerCallback& removedPeerCallback,
55                      const unsigned int maxNumberOfPeers)
56     : mNewPeerCallback(newPeerCallback),
57       mRemovedPeerCallback(removedPeerCallback),
58       mMaxNumberOfPeers(maxNumberOfPeers)
59 {
60     LOGT("Creating Processor");
61     using namespace std::placeholders;
62
63     addMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
64                                                                 std::bind(&Processor::onNewSignals, this, _1, _2));
65
66 }
67
68 Processor::~Processor()
69 {
70     LOGT("Destroying Processor");
71     try {
72         stop();
73     } catch (IPCException& e) {
74         LOGE("Error in Processor's destructor: " << e.what());
75     }
76     LOGT("Destroyed Processor");
77 }
78
79 bool Processor::isStarted()
80 {
81     return mThread.joinable();
82 }
83
84 void Processor::start()
85 {
86     LOGT("Starting Processor");
87     if (!isStarted()) {
88         mThread = std::thread(&Processor::run, this);
89     }
90     LOGT("Started Processor");
91 }
92
93 void Processor::stop()
94 {
95     LOGT("Stopping Processor");
96     if (isStarted()) {
97         mEventQueue.send(Event::FINISH);
98         mThread.join();
99     }
100     LOGT("Stopped Processor");
101 }
102
103 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
104 {
105     Lock lock(mCallbacksMutex);
106     mNewPeerCallback = newPeerCallback;
107 }
108
109 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
110 {
111     Lock lock(mCallbacksMutex);
112     mRemovedPeerCallback = removedPeerCallback;
113 }
114
115 void Processor::removeMethod(const MethodID methodID)
116 {
117     LOGT("Removing method " << methodID);
118     Lock lock(mCallsMutex);
119     mMethodsCallbacks.erase(methodID);
120 }
121
122 FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
123 {
124     LOGT("Adding socket");
125     FileDescriptor peerFD;
126     {
127         Lock lock(mSocketsMutex);
128         peerFD = socketPtr->getFD();
129         SocketInfo socketInfo(peerFD, std::move(socketPtr));
130         mNewSockets.push(std::move(socketInfo));
131     }
132     LOGI("New peer added. Id: " << peerFD);
133     mEventQueue.send(Event::ADD_PEER);
134
135     return peerFD;
136 }
137
138 void Processor::removePeer(const FileDescriptor peerFD)
139 {
140     std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
141
142     {
143         Lock lock(mSocketsMutex);
144         RemovePeerRequest request(peerFD, conditionPtr);
145         mPeersToDelete.push(std::move(request));
146     }
147
148     mEventQueue.send(Event::REMOVE_PEER);
149
150     auto isPeerDeleted = [&peerFD, this]()->bool {
151         Lock lock(mSocketsMutex);
152         return mSockets.count(peerFD) == 0;
153     };
154
155     std::mutex mutex;
156     std::unique_lock<std::mutex> lock(mutex);
157     conditionPtr->wait(lock, isPeerDeleted);
158 }
159
160 void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
161 {
162     LOGW("Removing peer. ID: " << peerFD);
163     {
164         Lock lock(mSocketsMutex);
165         mSockets.erase(peerFD);
166
167         // Remove from signal addressees
168         for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
169             it->second.remove(peerFD);
170             if (it->second.empty()) {
171                 it = mSignalsPeers.erase(it);
172             } else {
173                 ++it;
174             }
175         }
176     }
177
178     {
179         // Erase associated return value callbacks
180         Lock lock(mReturnCallbacksMutex);
181
182         std::shared_ptr<void> data;
183         for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
184             if (it->second.peerFD == peerFD) {
185                 IGNORE_EXCEPTIONS(it->second.process(status, data));
186                 it = mReturnCallbacks.erase(it);
187             } else {
188                 ++it;
189             }
190         }
191     }
192
193
194     {
195         Lock lock(mCallbacksMutex);
196         if (mRemovedPeerCallback) {
197             // Notify about the deletion
198             mRemovedPeerCallback(peerFD);
199         }
200     }
201
202     resetPolling();
203 }
204
205 void Processor::resetPolling()
206 {
207     LOGI("Resetting polling");
208     // Setup polling on eventfd and sockets
209     Lock lock(mSocketsMutex);
210     mFDs.resize(mSockets.size() + 1);
211
212     mFDs[0].fd = mEventQueue.getFD();
213     mFDs[0].events = POLLIN;
214
215     auto socketIt = mSockets.begin();
216     for (unsigned int i = 1; i < mFDs.size(); ++i) {
217         mFDs[i].fd = socketIt->second->getFD();
218         mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
219         ++socketIt;
220         // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
221     }
222 }
223
224 void Processor::run()
225 {
226     resetPolling();
227
228     mIsRunning = true;
229     while (mIsRunning) {
230         LOGT("Waiting for communication...");
231         int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
232         LOGT("... incoming communication!");
233         if (ret == -1 || ret == 0) {
234             if (errno == EINTR) {
235                 continue;
236             }
237             LOGE("Error in poll: " << std::string(strerror(errno)));
238             throw IPCException("Error in poll: " + std::string(strerror(errno)));
239         }
240
241         // Check for lost connections:
242         if (handleLostConnections()) {
243             // mFDs changed
244             continue;
245         }
246
247         // Check for incoming data.
248         if (handleInputs()) {
249             // mFDs changed
250             continue;
251         }
252
253         // Check for incoming events
254         if (handleEvent()) {
255             // mFDs changed
256             continue;
257         }
258     }
259
260     cleanCommunication();
261 }
262
263
264 bool Processor::handleLostConnections()
265 {
266     std::vector<FileDescriptor> peersToRemove;
267
268     {
269         Lock lock(mSocketsMutex);
270         for (unsigned int i = 1; i < mFDs.size(); ++i) {
271             if (mFDs[i].revents & POLLHUP) {
272                 LOGI("Lost connection to peer: " << mFDs[i].fd);
273                 mFDs[i].revents &= ~(POLLHUP);
274                 peersToRemove.push_back(mFDs[i].fd);
275             }
276         }
277     }
278
279     for (const FileDescriptor peerFD : peersToRemove) {
280         removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
281     }
282
283     return !peersToRemove.empty();
284 }
285
286 bool Processor::handleInputs()
287 {
288     std::vector<std::shared_ptr<Socket>> socketsWithInput;
289     {
290         Lock lock(mSocketsMutex);
291         for (unsigned int i = 1; i < mFDs.size(); ++i) {
292             if (mFDs[i].revents & POLLIN) {
293                 mFDs[i].revents &= ~(POLLIN);
294                 socketsWithInput.push_back(mSockets[mFDs[i].fd]);
295             }
296         }
297     }
298
299     bool pollChanged = false;
300     // Handle input outside the critical section
301     for (const auto& socketPtr : socketsWithInput) {
302         pollChanged = pollChanged || handleInput(*socketPtr);
303     }
304     return pollChanged;
305 }
306
307 bool Processor::handleInput(const Socket& socket)
308 {
309     LOGT("Handle incoming data");
310     MethodID methodID;
311     MessageID messageID;
312     {
313         Socket::Guard guard = socket.getGuard();
314         socket.read(&methodID, sizeof(methodID));
315         socket.read(&messageID, sizeof(messageID));
316
317         if (methodID == RETURN_METHOD_ID) {
318             return onReturnValue(socket, messageID);
319
320         } else {
321             Lock lock(mCallsMutex);
322             if (mMethodsCallbacks.count(methodID)) {
323                 // Method
324                 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
325                 lock.unlock();
326                 return onRemoteCall(socket, methodID, messageID, methodCallbacks);
327
328             } else if (mSignalsCallbacks.count(methodID)) {
329                 // Signal
330                 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
331                 lock.unlock();
332                 return onRemoteSignal(socket, methodID, messageID, signalCallbacks);
333
334             } else {
335                 // Nothing
336                 lock.unlock();
337                 LOGW("No method or signal callback for methodID: " << methodID);
338                 removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
339                 return true;
340             }
341         }
342     }
343
344     return false;
345 }
346
347 std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
348                                                               std::shared_ptr<RegisterSignalsMessage>& data)
349 {
350     LOGD("New signals for peer: " << peerFD);
351     Lock lock(mSocketsMutex);
352     for (MethodID methodID : data->ids) {
353         mSignalsPeers[methodID].push_back(peerFD);
354     }
355
356     return std::make_shared<EmptyData>();
357 }
358
359 bool Processor::onReturnValue(const Socket& socket,
360                               const MessageID messageID)
361 {
362     LOGI("Return value for messageID: " << messageID);
363     ReturnCallbacks returnCallbacks;
364     try {
365         Lock lock(mReturnCallbacksMutex);
366         LOGT("Getting the return callback");
367         returnCallbacks = std::move(mReturnCallbacks.at(messageID));
368         mReturnCallbacks.erase(messageID);
369     } catch (const std::out_of_range&) {
370         LOGW("No return callback for messageID: " << messageID);
371         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
372         return true;
373     }
374
375     std::shared_ptr<void> data;
376     try {
377         LOGT("Parsing incoming return data");
378         data = returnCallbacks.parse(socket.getFD());
379     } catch (const std::exception& e) {
380         LOGE("Exception during parsing: " << e.what());
381         IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
382         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
383         return true;
384     }
385
386     LOGT("Process return value callback for messageID: " << messageID);
387     IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
388
389     return false;
390 }
391
392 bool Processor::onRemoteSignal(const Socket& socket,
393                                const MethodID methodID,
394                                const MessageID messageID,
395                                std::shared_ptr<SignalHandlers> signalCallbacks)
396 {
397     LOGI("Remote signal; methodID: " << methodID << " messageID: " << messageID);
398
399     std::shared_ptr<void> data;
400     try {
401         LOGT("Parsing incoming data");
402         data = signalCallbacks->parse(socket.getFD());
403     } catch (const std::exception& e) {
404         LOGE("Exception during parsing: " << e.what());
405         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
406         return true;
407     }
408
409     LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
410     try {
411         signalCallbacks->signal(socket.getFD(), data);
412     } catch (const std::exception& e) {
413         LOGE("Exception in method handler: " << e.what());
414         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
415         return true;
416     }
417
418     return false;
419 }
420
421 bool Processor::onRemoteCall(const Socket& socket,
422                              const MethodID methodID,
423                              const MessageID messageID,
424                              std::shared_ptr<MethodHandlers> methodCallbacks)
425 {
426     LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
427
428     std::shared_ptr<void> data;
429     try {
430         LOGT("Parsing incoming data");
431         data = methodCallbacks->parse(socket.getFD());
432     } catch (const std::exception& e) {
433         LOGE("Exception during parsing: " << e.what());
434         removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
435         return true;
436     }
437
438     LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
439     std::shared_ptr<void> returnData;
440     try {
441         returnData = methodCallbacks->method(socket.getFD(), data);
442     } catch (const std::exception& e) {
443         LOGE("Exception in method handler: " << e.what());
444         removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
445         return true;
446     }
447
448     LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID);
449     try {
450         // Send the call with the socket
451         Socket::Guard guard = socket.getGuard();
452         socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
453         socket.write(&messageID, sizeof(messageID));
454         methodCallbacks->serialize(socket.getFD(), returnData);
455     } catch (const std::exception& e) {
456         LOGE("Exception during serialization: " << e.what());
457         removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
458         return true;
459     }
460
461     return false;
462 }
463
464 bool Processor::handleEvent()
465 {
466     if (!(mFDs[0].revents & POLLIN)) {
467         // No event to serve
468         return false;
469     }
470
471     mFDs[0].revents &= ~(POLLIN);
472
473     switch (mEventQueue.receive()) {
474     case Event::FINISH: {
475         LOGD("Event FINISH");
476         mIsRunning = false;
477         return false;
478     }
479
480     case Event::CALL: {
481         LOGD("Event CALL");
482         return onCall();
483     }
484
485     case Event::ADD_PEER: {
486         LOGD("Event ADD_PEER");
487         return onNewPeer();
488     }
489
490     case Event::REMOVE_PEER: {
491         LOGD("Event REMOVE_PEER");
492         return onRemovePeer();
493     }
494     }
495
496     return false;
497 }
498
499 bool Processor::onNewPeer()
500 {
501     SocketInfo socketInfo;
502     {
503         Lock lock(mSocketsMutex);
504
505         socketInfo = std::move(mNewSockets.front());
506         mNewSockets.pop();
507
508         if (mSockets.size() > mMaxNumberOfPeers) {
509             LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
510             return false;
511         }
512         if (mSockets.count(socketInfo.peerFD) != 0) {
513             LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
514             return false;
515         }
516
517         mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
518     }
519
520
521     // Broadcast the new signal to peers
522     LOGW("Sending handled signals");
523     std::list<FileDescriptor> peersIDs;
524     {
525         Lock lock(mSocketsMutex);
526         for (const auto kv : mSockets) {
527             peersIDs.push_back(kv.first);
528         }
529     }
530
531     std::vector<MethodID> ids;
532     {
533         Lock lock(mSocketsMutex);
534         for (const auto kv : mSignalsCallbacks) {
535             ids.push_back(kv.first);
536         }
537     }
538     auto data = std::make_shared<RegisterSignalsMessage>(ids);
539
540     for (const FileDescriptor peerFD : peersIDs) {
541         callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
542                                                         peerFD,
543                                                         data,
544                                                         discardResultHandler<EmptyData>);
545     }
546     LOGW("Sent handled signals");
547
548
549     resetPolling();
550
551     {
552         Lock lock(mCallbacksMutex);
553         if (mNewPeerCallback) {
554             // Notify about the new user.
555             mNewPeerCallback(socketInfo.peerFD);
556         }
557     }
558     return true;
559 }
560
561 bool Processor::onRemovePeer()
562 {
563     RemovePeerRequest request;
564     {
565         Lock lock(mSocketsMutex);
566         request = std::move(mPeersToDelete.front());
567         mPeersToDelete.pop();
568     }
569
570     removePeerInternal(request.peerFD, Status::REMOVED_PEER);
571     request.conditionPtr->notify_all();
572     return true;
573 }
574
575 CallQueue::Call Processor::getCall()
576 {
577     Lock lock(mCallsMutex);
578     return mCalls.pop();
579 }
580
581 bool Processor::onCall()
582 {
583     LOGT("Handle call (from another thread) to send a message.");
584     CallQueue::Call call = getCall();
585
586     std::shared_ptr<Socket> socketPtr;
587     try {
588         // Get the peer's socket
589         Lock lock(mSocketsMutex);
590         socketPtr = mSockets.at(call.peerFD);
591     } catch (const std::out_of_range&) {
592         LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
593         IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
594         return false;
595     }
596
597     if (call.parse && call.process) {
598         // Set what to do with the return message, but only if needed
599         Lock lock(mReturnCallbacksMutex);
600         if (mReturnCallbacks.count(call.messageID) != 0) {
601             LOGE("There already was a return callback for messageID: " << call.messageID);
602         }
603         mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
604                                                                      std::move(call.parse),
605                                                                      std::move(call.process)));
606     }
607
608     try {
609         // Send the call with the socket
610         Socket::Guard guard = socketPtr->getGuard();
611         socketPtr->write(&call.methodID, sizeof(call.methodID));
612         socketPtr->write(&call.messageID, sizeof(call.messageID));
613         call.serialize(socketPtr->getFD(), call.data);
614     } catch (const std::exception& e) {
615         LOGE("Error during sending a message: " << e.what());
616
617         // Inform about the error
618         IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
619
620         {
621             Lock lock(mReturnCallbacksMutex);
622             mReturnCallbacks.erase(call.messageID);
623         }
624
625         removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
626         return true;
627     }
628
629     return false;
630 }
631
632 void Processor::cleanCommunication()
633 {
634     while (!mEventQueue.isEmpty()) {
635         switch (mEventQueue.receive()) {
636         case Event::FINISH: {
637             LOGD("Event FINISH after FINISH");
638             break;
639         }
640         case Event::CALL: {
641             LOGD("Event CALL after FINISH");
642             CallQueue::Call call = getCall();
643             if (call.process) {
644                 IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
645             }
646             break;
647         }
648
649         case Event::ADD_PEER: {
650             LOGD("Event ADD_PEER after FINISH");
651             break;
652         }
653
654         case Event::REMOVE_PEER: {
655             LOGD("Event REMOVE_PEER after FINISH");
656             RemovePeerRequest request;
657             {
658                 Lock lock(mSocketsMutex);
659                 request = std::move(mPeersToDelete.front());
660                 mPeersToDelete.pop();
661             }
662             request.conditionPtr->notify_all();
663             break;
664         }
665         }
666     }
667 }
668
669 } // namespace ipc
670 } // namespace security_containers