2 * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
27 #include "ipc/exception.hpp"
28 #include "ipc/internals/processor.hpp"
29 #include "ipc/internals/utils.hpp"
30 #include "utils/signal.hpp"
37 #include <sys/socket.h>
43 #define IGNORE_EXCEPTIONS(expr) \
48 catch (const std::exception& e){ \
49 LOGE("Callback threw an error: " << e.what()); \
52 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
53 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
55 Processor::Processor(const PeerCallback& newPeerCallback,
56 const PeerCallback& removedPeerCallback,
57 const unsigned int maxNumberOfPeers)
58 : mNewPeerCallback(newPeerCallback),
59 mRemovedPeerCallback(removedPeerCallback),
60 mMaxNumberOfPeers(maxNumberOfPeers)
62 LOGT("Creating Processor");
64 utils::signalBlock(SIGPIPE);
65 using namespace std::placeholders;
66 addMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
67 std::bind(&Processor::onNewSignals, this, _1, _2));
71 Processor::~Processor()
73 LOGT("Destroying Processor");
76 } catch (IPCException& e) {
77 LOGE("Error in Processor's destructor: " << e.what());
80 LOGT("Destroyed Processor");
83 bool Processor::isStarted()
85 return mThread.joinable();
88 void Processor::start()
90 LOGT("Starting Processor");
92 mThread = std::thread(&Processor::run, this);
94 LOGT("Started Processor");
97 void Processor::stop()
99 LOGT("Stopping Processor");
102 mEventQueue.send(Event::FINISH);
106 LOGT("Stopped Processor");
109 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
111 Lock lock(mCallbacksMutex);
112 mNewPeerCallback = newPeerCallback;
115 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
117 Lock lock(mCallbacksMutex);
118 mRemovedPeerCallback = removedPeerCallback;
121 FileDescriptor Processor::getEventFD()
123 return mEventQueue.getFD();
126 void Processor::removeMethod(const MethodID methodID)
128 LOGT("Removing method " << methodID);
129 Lock lock(mCallsMutex);
130 mMethodsCallbacks.erase(methodID);
133 FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
135 LOGT("Adding socket");
136 FileDescriptor peerFD;
138 Lock lock(mSocketsMutex);
139 peerFD = socketPtr->getFD();
140 SocketInfo socketInfo(peerFD, std::move(socketPtr));
141 mNewSockets.push(std::move(socketInfo));
142 mEventQueue.send(Event::ADD_PEER);
144 LOGI("New peer added. Id: " << peerFD);
149 void Processor::removePeer(const FileDescriptor peerFD)
151 std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
154 Lock lock(mSocketsMutex);
155 RemovePeerRequest request(peerFD, conditionPtr);
156 mPeersToDelete.push(std::move(request));
157 mEventQueue.send(Event::REMOVE_PEER);
161 auto isPeerDeleted = [&peerFD, this]()->bool {
162 Lock lock(mSocketsMutex);
163 return mSockets.count(peerFD) == 0;
167 std::unique_lock<std::mutex> lock(mutex);
168 conditionPtr->wait(lock, isPeerDeleted);
171 void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
173 LOGW("Removing peer. ID: " << peerFD);
175 Lock lock(mSocketsMutex);
176 if (!mSockets.erase(peerFD)) {
177 LOGW("No such peer. Another thread called removePeerInternal");
181 // Remove from signal addressees
182 for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
183 it->second.remove(peerFD);
184 if (it->second.empty()) {
185 it = mSignalsPeers.erase(it);
193 // Erase associated return value callbacks
194 Lock lock(mReturnCallbacksMutex);
196 std::shared_ptr<void> data;
197 for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
198 if (it->second.peerFD == peerFD) {
199 IGNORE_EXCEPTIONS(it->second.process(status, data));
200 it = mReturnCallbacks.erase(it);
209 Lock lock(mCallbacksMutex);
210 if (mRemovedPeerCallback) {
211 // Notify about the deletion
212 mRemovedPeerCallback(peerFD);
219 void Processor::resetPolling()
225 LOGI("Resetting polling");
226 // Setup polling on eventfd and sockets
227 Lock lock(mSocketsMutex);
228 mFDs.resize(mSockets.size() + 1);
230 mFDs[0].fd = mEventQueue.getFD();
231 mFDs[0].events = POLLIN;
233 auto socketIt = mSockets.begin();
234 for (unsigned int i = 1; i < mFDs.size(); ++i) {
235 mFDs[i].fd = socketIt->second->getFD();
236 mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
238 // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
242 void Processor::run()
248 LOGT("Waiting for communication...");
249 int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
250 LOGT("... incoming communication!");
251 if (ret == -1 || ret == 0) {
252 if (errno == EINTR) {
255 LOGE("Error in poll: " << std::string(strerror(errno)));
256 throw IPCException("Error in poll: " + std::string(strerror(errno)));
259 // Check for lost connections:
260 if (handleLostConnections()) {
265 // Check for incoming data.
266 if (handleInputs()) {
271 // Check for incoming events
272 if (mFDs[0].revents & POLLIN) {
273 mFDs[0].revents &= ~(POLLIN);
283 bool Processor::handleLostConnections()
285 std::vector<FileDescriptor> peersToRemove;
287 Lock lock(mSocketsMutex);
288 for (unsigned int i = 1; i < mFDs.size(); ++i) {
289 if (mFDs[i].revents & POLLHUP) {
290 LOGI("Lost connection to peer: " << mFDs[i].fd);
291 mFDs[i].revents &= ~(POLLHUP);
292 peersToRemove.push_back(mFDs[i].fd);
297 for (const FileDescriptor peerFD : peersToRemove) {
298 removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
301 return !peersToRemove.empty();
304 bool Processor::handleLostConnection(const FileDescriptor peerFD)
306 removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
310 bool Processor::handleInputs()
312 std::vector<FileDescriptor> peersWithInput;
314 Lock lock(mSocketsMutex);
315 for (unsigned int i = 1; i < mFDs.size(); ++i) {
316 if (mFDs[i].revents & POLLIN) {
317 mFDs[i].revents &= ~(POLLIN);
318 peersWithInput.push_back(mFDs[i].fd);
323 bool pollChanged = false;
324 // Handle input outside the critical section
325 for (const FileDescriptor peerFD : peersWithInput) {
326 pollChanged = pollChanged || handleInput(peerFD);
331 bool Processor::handleInput(const FileDescriptor peerFD)
333 LOGT("Handle incoming data");
335 std::shared_ptr<Socket> socketPtr;
337 // Get the peer's socket
338 Lock lock(mSocketsMutex);
339 socketPtr = mSockets.at(peerFD);
340 } catch (const std::out_of_range&) {
341 LOGE("No such peer: " << peerFD);
348 Socket::Guard guard = socketPtr->getGuard();
350 socketPtr->read(&methodID, sizeof(methodID));
351 socketPtr->read(&messageID, sizeof(messageID));
353 } catch (const IPCException& e) {
354 LOGE("Error during reading the socket");
355 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
359 if (methodID == RETURN_METHOD_ID) {
360 return onReturnValue(*socketPtr, messageID);
363 Lock lock(mCallsMutex);
364 if (mMethodsCallbacks.count(methodID)) {
366 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
368 return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
370 } else if (mSignalsCallbacks.count(methodID)) {
372 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
374 return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
379 LOGW("No method or signal callback for methodID: " << methodID);
380 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
387 std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
388 std::shared_ptr<RegisterSignalsMessage>& data)
390 LOGD("New signals for peer: " << peerFD);
391 Lock lock(mSocketsMutex);
392 for (MethodID methodID : data->ids) {
393 mSignalsPeers[methodID].push_back(peerFD);
396 return std::make_shared<EmptyData>();
399 bool Processor::onReturnValue(const Socket& socket,
400 const MessageID messageID)
402 LOGI("Return value for messageID: " << messageID);
403 ReturnCallbacks returnCallbacks;
405 Lock lock(mReturnCallbacksMutex);
406 LOGT("Getting the return callback");
407 returnCallbacks = std::move(mReturnCallbacks.at(messageID));
408 mReturnCallbacks.erase(messageID);
409 } catch (const std::out_of_range&) {
410 LOGW("No return callback for messageID: " << messageID);
411 removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
415 std::shared_ptr<void> data;
417 LOGT("Parsing incoming return data");
418 data = returnCallbacks.parse(socket.getFD());
419 } catch (const std::exception& e) {
420 LOGE("Exception during parsing: " << e.what());
421 IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
422 removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
426 LOGT("Process return value callback for messageID: " << messageID);
427 IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
432 bool Processor::onRemoteSignal(const Socket& socket,
433 const MethodID methodID,
434 const MessageID messageID,
435 std::shared_ptr<SignalHandlers> signalCallbacks)
437 LOGI("Remote signal; methodID: " << methodID << " messageID: " << messageID);
439 std::shared_ptr<void> data;
441 LOGT("Parsing incoming data");
442 data = signalCallbacks->parse(socket.getFD());
443 } catch (const std::exception& e) {
444 LOGE("Exception during parsing: " << e.what());
445 removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
449 LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
451 signalCallbacks->signal(socket.getFD(), data);
452 } catch (const std::exception& e) {
453 LOGE("Exception in method handler: " << e.what());
454 removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
461 bool Processor::onRemoteCall(const Socket& socket,
462 const MethodID methodID,
463 const MessageID messageID,
464 std::shared_ptr<MethodHandlers> methodCallbacks)
466 LOGI("Remote call; methodID: " << methodID << " messageID: " << messageID);
468 std::shared_ptr<void> data;
470 LOGT("Parsing incoming data");
471 data = methodCallbacks->parse(socket.getFD());
472 } catch (const std::exception& e) {
473 LOGE("Exception during parsing: " << e.what());
474 removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
478 LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
479 std::shared_ptr<void> returnData;
481 returnData = methodCallbacks->method(socket.getFD(), data);
482 } catch (const std::exception& e) {
483 LOGE("Exception in method handler: " << e.what());
484 removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
488 LOGT("Sending return data; methodID: " << methodID << "; messageID: " << messageID);
490 // Send the call with the socket
491 Socket::Guard guard = socket.getGuard();
492 socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
493 socket.write(&messageID, sizeof(messageID));
494 methodCallbacks->serialize(socket.getFD(), returnData);
495 } catch (const std::exception& e) {
496 LOGE("Exception during serialization: " << e.what());
497 removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
504 bool Processor::handleEvent()
506 switch (mEventQueue.receive()) {
507 case Event::FINISH: {
508 LOGD("Event FINISH");
511 cleanCommunication();
521 case Event::ADD_PEER: {
522 LOGD("Event ADD_PEER");
526 case Event::REMOVE_PEER: {
527 LOGD("Event REMOVE_PEER");
528 return onRemovePeer();
535 bool Processor::onNewPeer()
537 SocketInfo socketInfo;
539 Lock lock(mSocketsMutex);
541 socketInfo = std::move(mNewSockets.front());
544 if (mSockets.size() > mMaxNumberOfPeers) {
545 LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
548 if (mSockets.count(socketInfo.peerFD) != 0) {
549 LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
553 mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
557 // Broadcast the new signal to peers
558 LOGW("Sending handled signals");
559 std::list<FileDescriptor> peersFDs;
561 Lock lock(mSocketsMutex);
562 for (const auto kv : mSockets) {
563 peersFDs.push_back(kv.first);
567 std::vector<MethodID> ids;
569 Lock lock(mSocketsMutex);
570 for (const auto kv : mSignalsCallbacks) {
571 ids.push_back(kv.first);
574 auto data = std::make_shared<RegisterSignalsMessage>(ids);
576 for (const FileDescriptor peerFD : peersFDs) {
577 callInternal<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
580 discardResultHandler<EmptyData>);
582 LOGW("Sent handled signals");
588 Lock lock(mCallbacksMutex);
589 if (mNewPeerCallback) {
590 // Notify about the new user.
591 mNewPeerCallback(socketInfo.peerFD);
597 bool Processor::onRemovePeer()
599 RemovePeerRequest request;
601 Lock lock(mSocketsMutex);
602 request = std::move(mPeersToDelete.front());
603 mPeersToDelete.pop();
606 removePeerInternal(request.peerFD, Status::REMOVED_PEER);
607 request.conditionPtr->notify_all();
611 CallQueue::Call Processor::getCall()
613 Lock lock(mCallsMutex);
617 bool Processor::onCall()
619 LOGT("Handle call (from another thread) to send a message.");
620 CallQueue::Call call = getCall();
622 if (call.parse && call.process) {
623 return onMethodCall(call);
625 return onSignalCall(call);
629 bool Processor::onSignalCall(CallQueue::Call& call)
631 std::shared_ptr<Socket> socketPtr;
633 // Get the peer's socket
634 Lock lock(mSocketsMutex);
635 socketPtr = mSockets.at(call.peerFD);
636 } catch (const std::out_of_range&) {
637 LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
642 // Send the call with the socket
643 Socket::Guard guard = socketPtr->getGuard();
644 socketPtr->write(&call.methodID, sizeof(call.methodID));
645 socketPtr->write(&call.messageID, sizeof(call.messageID));
646 call.serialize(socketPtr->getFD(), call.data);
647 } catch (const std::exception& e) {
648 LOGE("Error during sending a signal: " << e.what());
650 removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
658 bool Processor::onMethodCall(CallQueue::Call& call)
660 std::shared_ptr<Socket> socketPtr;
662 // Get the peer's socket
663 Lock lock(mSocketsMutex);
664 socketPtr = mSockets.at(call.peerFD);
665 } catch (const std::out_of_range&) {
666 LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
668 // Pass the error to the processing callback
669 IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
675 // Set what to do with the return message, but only if needed
676 Lock lock(mReturnCallbacksMutex);
677 if (mReturnCallbacks.count(call.messageID) != 0) {
678 LOGE("There already was a return callback for messageID: " << call.messageID);
680 mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
681 std::move(call.parse),
682 std::move(call.process)));
686 // Send the call with the socket
687 Socket::Guard guard = socketPtr->getGuard();
688 socketPtr->write(&call.methodID, sizeof(call.methodID));
689 socketPtr->write(&call.messageID, sizeof(call.messageID));
690 call.serialize(socketPtr->getFD(), call.data);
691 } catch (const std::exception& e) {
692 LOGE("Error during sending a method: " << e.what());
694 // Inform about the error,
695 IGNORE_EXCEPTIONS(mReturnCallbacks[call.messageID].process(Status::SERIALIZATION_ERROR, call.data));
698 Lock lock(mReturnCallbacksMutex);
699 mReturnCallbacks.erase(call.messageID);
702 removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
709 void Processor::cleanCommunication()
711 while (!mEventQueue.isEmpty()) {
712 switch (mEventQueue.receive()) {
713 case Event::FINISH: {
714 LOGD("Event FINISH after FINISH");
718 LOGD("Event CALL after FINISH");
719 CallQueue::Call call = getCall();
721 IGNORE_EXCEPTIONS(call.process(Status::CLOSING, call.data));
726 case Event::ADD_PEER: {
727 LOGD("Event ADD_PEER after FINISH");
731 case Event::REMOVE_PEER: {
732 LOGD("Event REMOVE_PEER after FINISH");
733 RemovePeerRequest request;
735 Lock lock(mSocketsMutex);
736 request = std::move(mPeersToDelete.front());
737 mPeersToDelete.pop();
739 request.conditionPtr->notify_all();