2 * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
27 #include "ipc/exception.hpp"
28 #include "ipc/internals/processor.hpp"
29 #include "config/manager.hpp"
30 #include "config/exception.hpp"
37 #include <sys/socket.h>
40 using namespace utils;
// Helper macro wrapped around callback invocations in this file. The visible part is a
// handler that catches std::exception and reports it through LOGE with the
// "Callback threw an error: " text.
// NOTE(review): the lines between the #define and the catch are not visible in this
// view; presumably they evaluate `expr` inside a try block - confirm.
// The expansion refers to mLogPrefix, so it can only be used where that name is in scope.
44 #define IGNORE_EXCEPTIONS(expr) \
49 catch (const std::exception& e){ \
50 LOGE(mLogPrefix + "Callback threw an error: " << e.what()); \
53 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
54 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
55 const MethodID Processor::ERROR_METHOD_ID = std::numeric_limits<MethodID>::max() - 2;
/**
 * Builds a Processor bound to the given event poll.
 *
 * The visible initializers store the event poll reference, both peer callbacks and the
 * peer limit. The body then registers two internal signal handlers: onNewSignals for
 * REGISTER_SIGNAL_METHOD_ID and onErrorSignal for ERROR_METHOD_ID.
 *
 * @param eventPoll            event poll kept in mEventPoll
 * @param logName              NOTE(review): presumably initialises mLogPrefix; that
 *                             initializer is not visible in this view - confirm
 * @param newPeerCallback      copied into mNewPeerCallback
 * @param removedPeerCallback  copied into mRemovedPeerCallback
 * @param maxNumberOfPeers     copied into mMaxNumberOfPeers
 */
57 Processor::Processor(epoll::EventPoll& eventPoll,
58 const std::string& logName,
59 const PeerCallback& newPeerCallback,
60 const PeerCallback& removedPeerCallback,
61 const unsigned int maxNumberOfPeers)
62 : mEventPoll(eventPoll),
65 mNewPeerCallback(newPeerCallback),
66 mRemovedPeerCallback(removedPeerCallback),
67 mMaxNumberOfPeers(maxNumberOfPeers)
69 LOGS(mLogPrefix + "Processor Constructor");
// Brings _1 and _2 into scope for the std::bind calls below.
71 using namespace std::placeholders;
// Peers announce the signals they handle with REGISTER_SIGNAL_METHOD_ID.
72 setSignalHandlerInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
73 std::bind(&Processor::onNewSignals, this, _1, _2));
// Errors reported by peers arrive with ERROR_METHOD_ID.
75 setSignalHandlerInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, std::bind(&Processor::onErrorSignal, this, _1, _2));
/**
 * Destructor. A std::exception raised by the guarded teardown code is caught and
 * reported through LOGE.
 * NOTE(review): the statement guarded by the catch is not visible in this view.
 */
78 Processor::~Processor()
80 LOGS(mLogPrefix + "Processor Destructor");
83 } catch (std::exception& e) {
84 LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
88 Processor::Peers::iterator Processor::getPeerInfoIterator(const FileDescriptor fd)
90 return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [fd](const PeerInfo & peerInfo) {
91 return fd == peerInfo.socketPtr->getFD();
95 Processor::Peers::iterator Processor::getPeerInfoIterator(const PeerID & peerID)
97 return std::find_if(mPeerInfo.begin(), mPeerInfo.end(), [peerID](const PeerInfo & peerInfo) {
98 return peerID == peerInfo.peerID;
/**
 * Presumably reports whether the processor has been started; mStateMutex is taken
 * before the state is read.
 * NOTE(review): the return statement is not visible in this view - confirm which
 * member is returned.
 */
102 bool Processor::isStarted()
104 Lock lock(mStateMutex);
/**
 * Starts processing: with mStateMutex held, the request queue's file descriptor is
 * registered in the event poll for EPOLLIN with handleEvent() as its handler.
 * NOTE(review): any guard against starting twice is not visible in this view.
 */
108 void Processor::start()
110 LOGS(mLogPrefix + "Processor start");
112 Lock lock(mStateMutex);
114 LOGI(mLogPrefix + "Processor start");
// From now on every queued request wakes the event poll, which calls handleEvent().
117 mEventPoll.addFD(mRequestQueue.getFD(), EPOLLIN, std::bind(&Processor::handleEvent, this));
/**
 * Asks the processor to finish by queueing a FINISH request that carries a freshly
 * created condition variable; the visible code can then block on that condition
 * variable until the request has been served.
 *
 * @param wait NOTE(review): presumably selects whether the call blocks until the
 *             FINISH request is served; the branch testing it is not visible here.
 */
121 void Processor::stop(bool wait)
123 LOGS(mLogPrefix + "Processor stop");
// Shared with the FINISH request so that its handler can wake this thread.
126 auto conditionPtr = std::make_shared<std::condition_variable>();
// The request is queued while mStateMutex is held.
128 Lock lock(mStateMutex);
129 auto request = std::make_shared<FinishRequest>(conditionPtr);
130 mRequestQueue.pushBack(Event::FINISH, request);
134 LOGD(mLogPrefix + "Waiting for the Processor to stop");
136 // Wait till the FINISH request is served
137 Lock lock(mStateMutex);
// NOTE(review): the predicate body is not visible in this view.
138 conditionPtr->wait(lock, [this]() {
// Once the wait is over no peers are expected to remain.
141 assert(mPeerInfo.empty());
146 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
148 Lock lock(mStateMutex);
149 mNewPeerCallback = newPeerCallback;
152 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
154 Lock lock(mStateMutex);
155 mRemovedPeerCallback = removedPeerCallback;
158 FileDescriptor Processor::getEventFD()
160 Lock lock(mStateMutex);
161 return mRequestQueue.getFD();
164 void Processor::sendResult(const MethodID methodID,
165 const PeerID& peerID,
166 const MessageID& messageID,
167 const std::shared_ptr<void>& data)
169 auto requestPtr = std::make_shared<SendResultRequest>(methodID, peerID, messageID, data);
170 mRequestQueue.pushFront(Event::SEND_RESULT, requestPtr);
/**
 * Reports an error concerning a given message to a peer. The error is packed into an
 * ErrorProtocolMessage and delivered as a signal with ERROR_METHOD_ID.
 *
 * @param peerID    peer that receives the error
 * @param messageID ID of the message the error refers to
 * @param errorCode error code placed in the protocol message
 *                  NOTE(review): its declaration is not visible in this view
 * @param message   error description placed in the protocol message
 */
173 void Processor::sendError(const PeerID& peerID,
174 const MessageID& messageID,
176 const std::string& message)
178 auto data = std::make_shared<ErrorProtocolMessage>(messageID, errorCode, message);
179 signalInternal<ErrorProtocolMessage>(ERROR_METHOD_ID, peerID , data);
182 void Processor::sendVoid(const MethodID methodID,
183 const PeerID& peerID,
184 const MessageID& messageID)
186 auto data = std::make_shared<EmptyData>();
187 auto requestPtr = std::make_shared<SendResultRequest>(methodID, peerID, messageID, data);
188 mRequestQueue.pushFront(Event::SEND_RESULT, requestPtr);
191 void Processor::removeMethod(const MethodID methodID)
193 Lock lock(mStateMutex);
194 mMethodsCallbacks.erase(methodID);
197 PeerID Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
199 LOGS(mLogPrefix + "Processor addPeer");
200 Lock lock(mStateMutex);
202 auto requestPtr = std::make_shared<AddPeerRequest>(socketPtr);
203 mRequestQueue.pushBack(Event::ADD_PEER, requestPtr);
205 LOGI(mLogPrefix + "Add Peer Request. Id: " << requestPtr->peerID
206 << ", fd: " << socketPtr->getFD());
208 return requestPtr->peerID;
211 void Processor::removePeerSyncInternal(const PeerID& peerID, Lock& lock)
213 LOGS(mLogPrefix + "Processor removePeer peerID: " << peerID);
215 auto isPeerDeleted = [&peerID, this]()->bool {
216 return getPeerInfoIterator(peerID) == mPeerInfo.end();
219 mRequestQueue.removeIf([peerID](Request & request) {
220 return request.requestID == Event::ADD_PEER &&
221 request.get<AddPeerRequest>()->peerID == peerID;
224 // Remove peer and wait till he's gone
225 std::shared_ptr<std::condition_variable> conditionPtr(new std::condition_variable());
227 auto request = std::make_shared<RemovePeerRequest>(peerID, conditionPtr);
228 mRequestQueue.pushBack(Event::REMOVE_PEER, request);
230 conditionPtr->wait(lock, isPeerDeleted);
233 void Processor::removePeerInternal(Peers::iterator peerIt, const std::exception_ptr& exceptionPtr)
235 if (peerIt == mPeerInfo.end()) {
236 LOGW("Peer already removed");
240 LOGS(mLogPrefix + "Processor removePeerInternal peerID: " << peerIt->peerID);
241 LOGI(mLogPrefix + "Removing peer. peerID: " << peerIt->peerID);
243 // Remove from signal addressees
244 for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
245 it->second.remove(peerIt->peerID);
246 if (it->second.empty()) {
247 it = mSignalsPeers.erase(it);
253 // Erase associated return value callbacks
254 for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
255 if (it->second.peerID == peerIt->peerID) {
256 ResultBuilder resultBuilder(exceptionPtr);
257 IGNORE_EXCEPTIONS(it->second.process(resultBuilder));
258 it = mReturnCallbacks.erase(it);
264 if (mRemovedPeerCallback) {
265 // Notify about the deletion
266 mRemovedPeerCallback(peerIt->peerID, peerIt->socketPtr->getFD());
269 mPeerInfo.erase(peerIt);
/**
 * Handles a lost connection on the given descriptor: with mStateMutex held, the peer
 * using that descriptor is removed and IPCPeerDisconnectedException is given as the
 * reason.
 *
 * @param fd file descriptor of the lost connection
 * @return NOTE(review): the return statement is not visible in this view.
 */
272 bool Processor::handleLostConnection(const FileDescriptor fd)
274 Lock lock(mStateMutex);
// An unknown fd yields mPeerInfo.end(), which removePeerInternal only warns about.
275 auto peerIt = getPeerInfoIterator(fd);
276 removePeerInternal(peerIt,
277 std::make_exception_ptr(IPCPeerDisconnectedException()));
/**
 * Handles incoming data on a peer's socket. With mStateMutex held, the message header
 * is read and the message is dispatched by its methodID:
 *  - RETURN_METHOD_ID goes to onReturnValue(),
 *  - an ID present in mMethodsCallbacks goes to onRemoteMethod(),
 *  - an ID present in mSignalsCallbacks goes to onRemoteSignal().
 * A header that cannot be read, or an ID nobody handles, gets the peer removed with
 * IPCNaughtyPeerException.
 *
 * @param fd file descriptor with pending input
 * @return the dispatch paths return whatever the called handler returns.
 *         NOTE(review): the other return statements are not visible in this view.
 */
281 bool Processor::handleInput(const FileDescriptor fd)
283 LOGS(mLogPrefix + "Processor handleInput fd: " << fd);
285 Lock lock(mStateMutex);
287 auto peerIt = getPeerInfoIterator(fd);
// Input on a descriptor that belongs to no known peer is only logged.
289 if (peerIt == mPeerInfo.end()) {
290 LOGE(mLogPrefix + "No peer for fd: " << fd);
297 // Read information about the incoming data
298 Socket& socket = *peerIt->socketPtr;
299 Socket::Guard guard = socket.getGuard();
300 config::loadFromFD<MessageHeader>(socket.getFD(), hdr);
301 } catch (const config::ConfigException& e) {
302 LOGE(mLogPrefix + "Error during reading the socket");
303 removePeerInternal(peerIt,
304 std::make_exception_ptr(IPCNaughtyPeerException()));
// The message is the result of a call made earlier from this side.
308 if (hdr.methodID == RETURN_METHOD_ID) {
309 return onReturnValue(peerIt, hdr.messageID);
// The message is a call of a locally registered method.
312 if (mMethodsCallbacks.count(hdr.methodID)) {
314 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(hdr.methodID);
315 return onRemoteMethod(peerIt, hdr.methodID, hdr.messageID, methodCallbacks);
// The message is a signal with a local handler.
317 } else if (mSignalsCallbacks.count(hdr.methodID)) {
319 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(hdr.methodID);
320 return onRemoteSignal(peerIt, hdr.methodID, hdr.messageID, signalCallbacks);
323 LOGW(mLogPrefix + "No method or signal callback for methodID: " << hdr.methodID);
324 removePeerInternal(peerIt,
325 std::make_exception_ptr(IPCNaughtyPeerException()));
332 void Processor::onNewSignals(const PeerID& peerID, std::shared_ptr<RegisterSignalsProtocolMessage>& data)
334 LOGS(mLogPrefix + "Processor onNewSignals peerID: " << peerID);
336 for (const MethodID methodID : data->ids) {
337 mSignalsPeers[methodID].push_back(peerID);
341 void Processor::onErrorSignal(const PeerID&, std::shared_ptr<ErrorProtocolMessage>& data)
343 LOGS(mLogPrefix + "Processor onErrorSignal messageID: " << data->messageID);
345 // If there is no return callback an out_of_range error will be thrown and peer will be removed
346 ReturnCallbacks returnCallbacks = std::move(mReturnCallbacks.at(data->messageID));
347 mReturnCallbacks.erase(data->messageID);
349 ResultBuilder resultBuilder(std::make_exception_ptr(IPCUserException(data->code, data->message)));
350 IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
/**
 * Handles the result of a call made earlier from this side: the return callbacks
 * stored for messageID are taken out of mReturnCallbacks, the incoming data is parsed
 * from the peer's socket and the process callback is run with it.
 *
 * The peer is removed when no callback is stored for messageID
 * (IPCNaughtyPeerException) or when parsing throws (IPCParsingException; the process
 * callback is run with that exception first).
 *
 * @param peerIt    peer that sent the result
 * @param messageID ID of the message being answered
 * @return NOTE(review): the return statements are not visible in this view.
 */
353 bool Processor::onReturnValue(Peers::iterator& peerIt,
354 const MessageID& messageID)
356 LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
358 ReturnCallbacks returnCallbacks;
360 LOGT(mLogPrefix + "Getting the return callback");
// The entry is moved out and erased, so each result is delivered at most once.
361 returnCallbacks = std::move(mReturnCallbacks.at(messageID));
362 mReturnCallbacks.erase(messageID);
363 } catch (const std::out_of_range&) {
364 LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
365 removePeerInternal(peerIt,
366 std::make_exception_ptr(IPCNaughtyPeerException()));
370 std::shared_ptr<void> data;
372 LOGT(mLogPrefix + "Parsing incoming return data");
373 data = returnCallbacks.parse(peerIt->socketPtr->getFD());
374 } catch (const std::exception& e) {
375 LOGE(mLogPrefix + "Exception during parsing: " << e.what());
// The waiting caller is told about the failure before the peer is dropped.
376 ResultBuilder resultBuilder(std::make_exception_ptr(IPCParsingException()));
377 IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
378 removePeerInternal(peerIt,
379 std::make_exception_ptr(IPCParsingException()));
// Regular path: hand the parsed data to the callback.
383 ResultBuilder resultBuilder(data);
384 IGNORE_EXCEPTIONS(returnCallbacks.process(resultBuilder));
/**
 * Handles a signal sent by a peer: the data is parsed from the peer's socket and
 * passed to the signal handler together with the peer's ID.
 *
 * The peer is removed when parsing throws (IPCParsingException) or when the handler
 * throws a std::exception other than IPCUserException (IPCNaughtyPeerException).
 * An IPCUserException thrown by the handler is only logged.
 *
 * @param peerIt          peer that sent the signal
 * @param methodID        ID of the signal; used for logging only
 * @param messageID       ID of the message; used for logging only
 * @param signalCallbacks parse and signal handlers registered for the signal
 * @return NOTE(review): the return statements are not visible in this view.
 */
389 bool Processor::onRemoteSignal(Peers::iterator& peerIt,
390 __attribute__((unused)) const MethodID methodID,
391 __attribute__((unused)) const MessageID& messageID,
392 std::shared_ptr<SignalHandlers> signalCallbacks)
394 LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
396 std::shared_ptr<void> data;
398 LOGT(mLogPrefix + "Parsing incoming data");
399 data = signalCallbacks->parse(peerIt->socketPtr->getFD());
400 } catch (const std::exception& e) {
401 LOGE(mLogPrefix + "Exception during parsing: " << e.what());
402 removePeerInternal(peerIt,
403 std::make_exception_ptr(IPCParsingException()));
408 signalCallbacks->signal(peerIt->peerID, data);
// A signal has no result, so the user's exception has nowhere to go.
409 } catch (const IPCUserException& e) {
410 LOGW("Discarded user's exception");
412 } catch (const std::exception& e) {
413 LOGE(mLogPrefix + "Exception in method handler: " << e.what());
414 removePeerInternal(peerIt,
415 std::make_exception_ptr(IPCNaughtyPeerException()));
/**
 * Handles a method call made by a peer: the arguments are parsed from the peer's
 * socket and the method handler is invoked with a MethodResult object bound to this
 * processor, the method, the message and the peer.
 *
 * The peer is removed when parsing throws (IPCParsingException) or when the handler
 * throws a std::exception other than IPCUserException (IPCNaughtyPeerException).
 * An IPCUserException thrown by the handler is reported back to the peer with
 * sendError().
 *
 * @param peerIt          peer that made the call
 * @param methodID        ID of the called method
 * @param messageID       ID of the call's message
 * @param methodCallbacks parse and method handlers registered for the method
 * @return NOTE(review): the return statements are not visible in this view.
 */
423 bool Processor::onRemoteMethod(Peers::iterator& peerIt,
424 const MethodID methodID,
425 const MessageID& messageID,
426 std::shared_ptr<MethodHandlers> methodCallbacks)
428 LOGS(mLogPrefix + "Processor onRemoteMethod; methodID: " << methodID << " messageID: " << messageID);
430 std::shared_ptr<void> data;
432 LOGT(mLogPrefix + "Parsing incoming data");
433 data = methodCallbacks->parse(peerIt->socketPtr->getFD());
434 } catch (const std::exception& e) {
435 LOGE(mLogPrefix + "Exception during parsing: " << e.what());
436 removePeerInternal(peerIt,
437 std::make_exception_ptr(IPCParsingException()));
441 LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
// NOTE(review): one argument line of this call is not visible in this view.
443 methodCallbacks->method(peerIt->peerID,
445 std::make_shared<MethodResult>(*this, methodID, messageID, peerIt->peerID));
// The user's exception travels back to the caller as an error message.
446 } catch (const IPCUserException& e) {
447 LOGW("User's exception");
448 sendError(peerIt->peerID, messageID, e.getCode(), e.what());
450 } catch (const std::exception& e) {
451 LOGE(mLogPrefix + "Exception in method handler: " << e.what());
452 removePeerInternal(peerIt,
453 std::make_exception_ptr(IPCNaughtyPeerException()));
/**
 * Serves one request from the request queue. With mStateMutex held, a request is
 * popped and passed to the handler matching its requestID.
 *
 * @return value returned by the selected handler.
 *         NOTE(review): the return used when no case matches is not visible here.
 */
460 bool Processor::handleEvent()
462 LOGS(mLogPrefix + "Processor handleEvent");
464 Lock lock(mStateMutex);
466 auto request = mRequestQueue.pop();
467 LOGD(mLogPrefix + "Got: " << request.requestID);
469 switch (request.requestID) {
470 case Event::METHOD: return onMethodRequest(*request.get<MethodRequest>());
471 case Event::SIGNAL: return onSignalRequest(*request.get<SignalRequest>());
472 case Event::ADD_PEER: return onAddPeerRequest(*request.get<AddPeerRequest>());
473 case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get<RemovePeerRequest>());
474 case Event::SEND_RESULT: return onSendResultRequest(*request.get<SendResultRequest>());
475 case Event::FINISH: return onFinishRequest(*request.get<FinishRequest>());
/**
 * Serves a METHOD request: stores the return callbacks under the request's messageID
 * and sends the call (header, then serialized data) through the peer's socket.
 *
 * When the peer is unknown, the process callback is run with
 * IPCPeerDisconnectedException. When sending throws, the stored callback is run with
 * IPCSerializationException and erased, and the peer is removed.
 *
 * @param request call to send; its parse and process callbacks are moved out
 * @return NOTE(review): the return statements are not visible in this view.
 */
481 bool Processor::onMethodRequest(MethodRequest& request)
483 LOGS(mLogPrefix + "Processor onMethodRequest");
485 auto peerIt = getPeerInfoIterator(request.peerID);
487 if (peerIt == mPeerInfo.end()) {
488 LOGE(mLogPrefix + "Peer disconnected. No user with a peerID: " << request.peerID);
490 // Pass the error to the processing callback
491 ResultBuilder resultBuilder(std::make_exception_ptr(IPCPeerDisconnectedException()));
492 IGNORE_EXCEPTIONS(request.process(resultBuilder));
// A clash of message IDs is logged; the assignment below then replaces the entry.
497 if (mReturnCallbacks.count(request.messageID) != 0) {
498 LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
// After this statement request.parse and request.process are moved-from.
500 mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(peerIt->peerID,
501 std::move(request.parse),
502 std::move(request.process)));
506 // Send the call with the socket
507 Socket& socket = *peerIt->socketPtr;
508 Socket::Guard guard = socket.getGuard();
509 hdr.methodID = request.methodID;
510 hdr.messageID = request.messageID;
511 config::saveToFD<MessageHeader>(socket.getFD(), hdr);
512 LOGT(mLogPrefix + "Serializing the message");
513 request.serialize(socket.getFD(), request.data);
514 } catch (const std::exception& e) {
515 LOGE(mLogPrefix + "Error during sending a method: " << e.what());
517 // Inform about the error
518 ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException()));
519 IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder));
// Erased first so that removePeerInternal does not run this callback a second time.
522 mReturnCallbacks.erase(request.messageID);
523 removePeerInternal(peerIt,
524 std::make_exception_ptr(IPCSerializationException()));
/**
 * Serves a SIGNAL request: sends the signal (header, then serialized data) through
 * the peer's socket. An unknown peer is only logged; when sending throws, the peer is
 * removed with IPCSerializationException.
 *
 * @param request signal to send
 * @return NOTE(review): the return statements are not visible in this view.
 */
532 bool Processor::onSignalRequest(SignalRequest& request)
534 LOGS(mLogPrefix + "Processor onSignalRequest");
536 auto peerIt = getPeerInfoIterator(request.peerID);
538 if (peerIt == mPeerInfo.end()) {
539 LOGE(mLogPrefix + "Peer disconnected. No user for peerID: " << request.peerID);
545 // Send the call with the socket
546 Socket& socket = *peerIt->socketPtr;
547 Socket::Guard guard = socket.getGuard();
548 hdr.methodID = request.methodID;
549 hdr.messageID = request.messageID;
550 config::saveToFD<MessageHeader>(socket.getFD(), hdr);
551 request.serialize(socket.getFD(), request.data);
552 } catch (const std::exception& e) {
553 LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
555 removePeerInternal(peerIt,
556 std::make_exception_ptr(IPCSerializationException()));
563 bool Processor::onAddPeerRequest(AddPeerRequest& request)
565 LOGS(mLogPrefix + "Processor onAddPeerRequest");
567 if (mPeerInfo.size() > mMaxNumberOfPeers) {
568 LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerID);
572 if (getPeerInfoIterator(request.peerID) != mPeerInfo.end()) {
573 LOGE(mLogPrefix + "There already was a socket for peerID: " << request.peerID);
577 PeerInfo peerInfo(request.peerID, request.socketPtr);
578 mPeerInfo.push_back(std::move(peerInfo));
581 // Sending handled signals
582 std::vector<MethodID> ids;
583 for (const auto kv : mSignalsCallbacks) {
584 ids.push_back(kv.first);
586 auto data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
587 signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
591 if (mNewPeerCallback) {
592 // Notify about the new user.
593 LOGT(mLogPrefix + "Calling NewPeerCallback");
594 mNewPeerCallback(request.peerID, request.socketPtr->getFD());
597 LOGI(mLogPrefix + "New peerID: " << request.peerID);
/**
 * Serves a REMOVE_PEER request: removes the peer, giving IPCRemovedPeerException as
 * the reason, and wakes every thread waiting on the request's condition variable.
 *
 * @param request request carrying the peer's ID and the condition variable
 * @return NOTE(review): the return statement is not visible in this view.
 */
601 bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
603 LOGS(mLogPrefix + "Processor onRemovePeer");
// An unknown peerID yields mPeerInfo.end(), which removePeerInternal only warns about.
605 removePeerInternal(getPeerInfoIterator(request.peerID),
606 std::make_exception_ptr(IPCRemovedPeerException()));
608 request.conditionPtr->notify_all();
613 bool Processor::onSendResultRequest(SendResultRequest& request)
615 LOGS(mLogPrefix + "Processor onMethodRequest");
617 auto peerIt = getPeerInfoIterator(request.peerID);
619 if (peerIt == mPeerInfo.end()) {
620 LOGE(mLogPrefix + "Peer disconnected, no result is sent. No user with a peerID: " << request.peerID);
624 std::shared_ptr<MethodHandlers> methodCallbacks;
626 methodCallbacks = mMethodsCallbacks.at(request.methodID);
627 } catch (const std::out_of_range&) {
628 LOGW(mLogPrefix + "No method, might have been deleted. methodID: " << request.methodID);
634 // Send the call with the socket
635 Socket& socket = *peerIt->socketPtr;
636 Socket::Guard guard = socket.getGuard();
637 hdr.methodID = RETURN_METHOD_ID;
638 hdr.messageID = request.messageID;
639 config::saveToFD<MessageHeader>(socket.getFD(), hdr);
640 LOGT(mLogPrefix + "Serializing the message");
641 methodCallbacks->serialize(socket.getFD(), request.data);
642 } catch (const std::exception& e) {
643 LOGE(mLogPrefix + "Error during sending a method: " << e.what());
645 // Inform about the error
646 ResultBuilder resultBuilder(std::make_exception_ptr(IPCSerializationException()));
647 IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(resultBuilder));
650 mReturnCallbacks.erase(request.messageID);
651 removePeerInternal(peerIt,
652 std::make_exception_ptr(IPCSerializationException()));
/**
 * Serves the FINISH request: drains the request queue, removes every peer with
 * IPCClosingException, unregisters the request queue's descriptor from the event
 * poll and wakes the threads waiting on the request's condition variable.
 *
 * @param requestFinisher request carrying the condition variable to notify
 * @return NOTE(review): the return statement is not visible in this view.
 */
660 bool Processor::onFinishRequest(FinishRequest& requestFinisher)
662 LOGS(mLogPrefix + "Processor onFinishRequest");
664 // Clean the mRequestQueue
665 while (!mRequestQueue.isEmpty()) {
666 auto request = mRequestQueue.pop();
667 LOGE(mLogPrefix + "Got: " << request.requestID << " after FINISH");
669 switch (request.requestID) {
// A call that will never be sent: its callback gets IPCClosingException.
670 case Event::METHOD: {
671 auto requestPtr = request.get<MethodRequest>();
672 ResultBuilder resultBuilder(std::make_exception_ptr(IPCClosingException()));
673 IGNORE_EXCEPTIONS(requestPtr->process(resultBuilder));
// Served as usual, which also notifies the request's condition variable.
676 case Event::REMOVE_PEER: {
677 onRemovePeerRequest(*request.get<RemovePeerRequest>());
// Results that are already queued are still sent.
680 case Event::SEND_RESULT: {
681 onSendResultRequest(*request.get<SendResultRequest>());
// NOTE(review): neighbouring case labels and the shared body are not visible here.
685 case Event::ADD_PEER:
// Peers are removed one by one, always taking the last entry.
692 while (!mPeerInfo.empty()) {
693 removePeerInternal(--mPeerInfo.end(),
694 std::make_exception_ptr(IPCClosingException()));
697 mEventPoll.removeFD(mRequestQueue.getFD());
700 requestFinisher.conditionPtr->notify_all();
704 std::ostream& operator<<(std::ostream& os, const Processor::Event& event)
708 case Processor::Event::FINISH: {
709 os << "Event::FINISH";
713 case Processor::Event::METHOD: {
714 os << "Event::METHOD";
718 case Processor::Event::SIGNAL: {
719 os << "Event::SIGNAL";
723 case Processor::Event::ADD_PEER: {
724 os << "Event::ADD_PEER";
728 case Processor::Event::REMOVE_PEER: {
729 os << "Event::REMOVE_PEER";
733 case Processor::Event::SEND_RESULT: {
734 os << "Event::SEND_RESULT";