2 * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
27 #include "ipc/exception.hpp"
28 #include "ipc/internals/processor.hpp"
29 #include "ipc/internals/utils.hpp"
30 #include "utils/signal.hpp"
37 #include <sys/socket.h>
// IGNORE_EXCEPTIONS(expr): only part of this macro is visible in this view.
// The visible handler catches std::exception by const reference and logs
// e.what() through LOGE, prefixed with mLogPrefix (so the macro can only be
// used where mLogPrefix is in scope).
// Presumably expr is evaluated inside a try block so that a throwing user
// callback is logged instead of propagating -- TODO confirm against the full
// definition.
43 #define IGNORE_EXCEPTIONS(expr) \
48 catch (const std::exception& e){ \
49 LOGE(mLogPrefix + "Callback threw an error: " << e.what()); \
// Reserved method identifiers, taken from the top of the MethodID value range.
// RETURN_METHOD_ID marks a message that carries the return value of an earlier
// call (see handleInput / onRemoteCall).
// REGISTER_SIGNAL_METHOD_ID is the built-in method used to tell a peer which
// signal ids are handled (see the constructor and onAddPeerRequest).
52 const MethodID Processor::RETURN_METHOD_ID = std::numeric_limits<MethodID>::max();
53 const MethodID Processor::REGISTER_SIGNAL_METHOD_ID = std::numeric_limits<MethodID>::max() - 1;
// Constructs a Processor.
//
// Stores the log prefix, both peer callbacks and the peer limit, blocks
// SIGPIPE and registers onNewSignals as the internal handler of
// REGISTER_SIGNAL_METHOD_ID.
//
// @param logName             prefix prepended to this object's log messages
// @param newPeerCallback     stored in mNewPeerCallback
// @param removedPeerCallback stored in mRemovedPeerCallback
// @param maxNumberOfPeers    stored in mMaxNumberOfPeers
//
// NOTE(review): part of the initializer list is not visible in this view.
55 Processor::Processor(const std::string& logName,
56 const PeerCallback& newPeerCallback,
57 const PeerCallback& removedPeerCallback,
58 const unsigned int maxNumberOfPeers)
59 : mLogPrefix(logName),
61 mNewPeerCallback(newPeerCallback),
62 mRemovedPeerCallback(removedPeerCallback),
63 mMaxNumberOfPeers(maxNumberOfPeers)
65 LOGS(mLogPrefix + "Processor Constructor");
// Presumably blocked so that writing to a closed peer socket is reported as
// an error by the write call instead of raising a signal -- TODO confirm.
67 utils::signalBlock(SIGPIPE);
68 using namespace std::placeholders;
// Built-in handler: _1 and _2 forward the handler's two arguments
// (peerFD and the received RegisterSignalsMessage) to onNewSignals.
69 setMethodHandlerInternal<EmptyData, RegisterSignalsMessage>(REGISTER_SIGNAL_METHOD_ID,
70 std::bind(&Processor::onNewSignals, this, _1, _2));
// Destructor.
//
// The visible part catches IPCException and only logs it, so that error does
// not escape the destructor. The guarded statement itself is not visible in
// this view (presumably a call that stops the Processor -- TODO confirm).
73 Processor::~Processor()
75 LOGS(mLogPrefix + "Processor Destructor");
78 } catch (IPCException& e) {
79 LOGE(mLogPrefix + "Error in Processor's destructor: " << e.what());
// Tells whether the Processor is started.
//
// Holds mStateMutex for the duration of the call. The return statement is not
// visible in this view, so the exact flag that is returned cannot be named
// here.
83 bool Processor::isStarted()
85 Lock lock(mStateMutex);
// Starts the Processor.
//
// @param usesExternalPolling stored in mUsesExternalPolling. When false, an
//        internal thread executing Processor::run is created. When true no
//        thread is created here; presumably the caller then polls the
//        descriptors itself and calls the handle*() methods -- TODO confirm.
//
// NOTE(review): some lines of this function (between the lock and the thread
// creation) are not visible in this view.
89 void Processor::start(bool usesExternalPolling)
91 LOGS(mLogPrefix + "Processor start");
// State changes below are made under mStateMutex.
93 Lock lock(mStateMutex);
95 LOGI(mLogPrefix + "Processor start");
97 mUsesExternalPolling = usesExternalPolling;
98 if (!usesExternalPolling) {
99 mThread = std::thread(&Processor::run, this);
// Stops the Processor.
//
// Pushes a FINISH request, carrying a shared condition variable, onto
// mRequestQueue and then waits for the Processor to stop. Two waiting paths
// are partly visible: one guarded by mThread.joinable() and one that waits on
// the condition variable while holding mStateMutex. The conditions selecting
// each path, and the wait predicate's body, are not visible in this view.
104 void Processor::stop()
106 LOGS(mLogPrefix + "Processor stop");
// Shared with the FinishRequest so that onFinishRequest can notify this thread.
109 auto conditionPtr = std::make_shared<std::condition_variable_any>();
111 Lock lock(mStateMutex);
112 auto request = std::make_shared<FinishRequest>(conditionPtr);
113 mRequestQueue.push(Event::FINISH, request);
116 LOGD(mLogPrefix + "Waiting for the Processor to stop");
118 if (mThread.joinable()) {
121 // Wait till the FINISH request is served
122 Lock lock(mStateMutex);
// wait() releases the lock while blocked and re-checks the predicate on wake-up.
123 conditionPtr->wait(lock, [this]() {
// Replaces the callback invoked when a new peer is added
// (see onAddPeerRequest). The assignment is made under mStateMutex.
//
// @param newPeerCallback new value of mNewPeerCallback
130 void Processor::setNewPeerCallback(const PeerCallback& newPeerCallback)
132 Lock lock(mStateMutex);
133 mNewPeerCallback = newPeerCallback;
// Replaces the callback invoked when a peer is removed
// (see removePeerInternal). The assignment is made under mStateMutex.
//
// @param removedPeerCallback new value of mRemovedPeerCallback
136 void Processor::setRemovedPeerCallback(const PeerCallback& removedPeerCallback)
138 Lock lock(mStateMutex);
139 mRemovedPeerCallback = removedPeerCallback;
// Returns the file descriptor of the request queue (mRequestQueue.getFD()),
// read under mStateMutex. resetPolling() puts the same descriptor in mFDs[0]
// and polls it for POLLIN.
//
// @return descriptor of mRequestQueue
142 FileDescriptor Processor::getEventFD()
144 Lock lock(mStateMutex);
145 return mRequestQueue.getFD();
// Unregisters the method callbacks stored for methodID.
//
// Erases the entry from mMethodsCallbacks under mStateMutex; erasing an id
// that is not registered is a no-op. Only mMethodsCallbacks is touched by the
// visible code.
//
// @param methodID id of the method whose callbacks are dropped
148 void Processor::removeMethod(const MethodID methodID)
150 Lock lock(mStateMutex);
151 mMethodsCallbacks.erase(methodID);
// Queues a request to add a new peer.
//
// The peer is identified by the file descriptor of its socket. Nothing is
// inserted into mSockets here: an ADD_PEER request is pushed onto
// mRequestQueue and the insertion happens in onAddPeerRequest when that
// request is handled.
//
// @param socketPtr socket of the new peer
// @return return statement not visible in this view; presumably the peer's
//         file descriptor -- TODO confirm
154 FileDescriptor Processor::addPeer(const std::shared_ptr<Socket>& socketPtr)
156 LOGS(mLogPrefix + "Processor addPeer");
157 Lock lock(mStateMutex);
159 FileDescriptor peerFD = socketPtr->getFD();
160 auto request = std::make_shared<AddPeerRequest>(peerFD, socketPtr);
161 mRequestQueue.push(Event::ADD_PEER, request);
163 LOGI(mLogPrefix + "Add Peer Request. Id: " << peerFD);
// Removes a peer and blocks until it is gone.
//
// Steps visible here:
//  1. drop any still-queued ADD_PEER request for peerFD,
//  2. push a REMOVE_PEER request carrying a condition variable,
//  3. wait on that condition variable until mSockets no longer contains peerFD.
// onRemovePeerRequest notifies the condition variable after removing the peer.
//
// @param peerFD file descriptor identifying the peer
168 void Processor::removePeer(const FileDescriptor peerFD)
170 LOGS(mLogPrefix + "Processor removePeer peerFD: " << peerFD);
173 Lock lock(mStateMutex);
// Cancel a pending addition of this very peer so it is not added afterwards.
174 mRequestQueue.removeIf([peerFD](Request & request) {
175 return request.requestID == Event::ADD_PEER &&
176 request.get<AddPeerRequest>()->peerFD == peerFD;
180 // Remove peer and wait till he's gone
181 std::shared_ptr<std::condition_variable_any> conditionPtr(new std::condition_variable_any());
183 Lock lock(mStateMutex);
184 auto request = std::make_shared<RemovePeerRequest>(peerFD, conditionPtr);
185 mRequestQueue.push(Event::REMOVE_PEER, request);
// peerFD is captured by reference; the predicate is only used by the wait
// below, i.e. within the lifetime of this call.
188 auto isPeerDeleted = [&peerFD, this]()->bool {
189 return mSockets.count(peerFD) == 0;
192 Lock lock(mStateMutex);
// The predicate is evaluated with the lock held, so the mSockets lookup is
// protected by mStateMutex.
193 conditionPtr->wait(lock, isPeerDeleted);
// Removes a peer from the Processor's bookkeeping.
//
// No lock is taken here; the visible callers that are entry points
// (handleLostConnections, handleLostConnection, handleInput, handleEvent)
// acquire mStateMutex before reaching this function.
//
// Visible steps:
//  - erase the peer's socket from mSockets (a warning is logged when there
//    was none; what follows the warning is not visible in this view),
//  - remove the peer from every addressee list in mSignalsPeers and erase
//    lists that became empty,
//  - for every pending return callback that belongs to the peer, call
//    process(status, <empty data>) and erase the entry,
//  - invoke mRemovedPeerCallback when it is set.
//
// @param peerFD file descriptor identifying the peer
// @param status status passed to the pending return callbacks of that peer
196 void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
198 LOGS(mLogPrefix + "Processor removePeerInternal peerFD: " << peerFD);
199 LOGI(mLogPrefix + "Removing peer. peerFD: " << peerFD);
// erase() returns the number of removed elements; 0 means the peer is unknown.
201 if (!mSockets.erase(peerFD)) {
202 LOGW(mLogPrefix + "No such peer. Another thread called removePeerInternal");
206 // Remove from signal addressees
207 for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
208 it->second.remove(peerFD);
209 if (it->second.empty()) {
// erase() returns the iterator following the erased element.
210 it = mSignalsPeers.erase(it);
216 // Erase associated return value callbacks
// Default-constructed, i.e. empty: the callbacks get no payload, only status.
217 std::shared_ptr<void> data;
218 for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
219 if (it->second.peerFD == peerFD) {
220 IGNORE_EXCEPTIONS(it->second.process(status, data));
221 it = mReturnCallbacks.erase(it);
227 if (mRemovedPeerCallback) {
228 // Notify about the deletion
229 mRemovedPeerCallback(peerFD);
// Rebuilds mFDs, the descriptor array that run() passes to poll().
//
// Layout produced by the visible code: mFDs[0] is the request queue's
// descriptor (POLLIN); mFDs[1..] are peer sockets taken from mSockets,
// watched for POLLIN | POLLHUP. The rebuild happens under mStateMutex.
//
// NOTE(review): the body of the mUsesExternalPolling check and the statement
// advancing socketIt are not visible in this view. Presumably the function
// returns early under external polling and socketIt is incremented once per
// iteration -- TODO confirm.
235 void Processor::resetPolling()
237 LOGS(mLogPrefix + "Processor resetPolling");
239 if (mUsesExternalPolling) {
244 Lock lock(mStateMutex);
245 LOGI(mLogPrefix + "Reseting mFDS.size: " << mSockets.size());
246 // Setup polling on eventfd and sockets
// One extra slot for the request queue's descriptor at index 0.
247 mFDs.resize(mSockets.size() + 1);
249 mFDs[0].fd = mRequestQueue.getFD();
250 mFDs[0].events = POLLIN;
252 auto socketIt = mSockets.begin();
253 for (unsigned int i = 1; i < mFDs.size(); ++i) {
254 LOGI(mLogPrefix + "Reseting fd: " << socketIt->second->getFD());
255 mFDs[i].fd = socketIt->second->getFD();
256 mFDs[i].events = POLLIN | POLLHUP; // Listen for input events
258 // TODO: It's possible to block on writing to fd. Maybe listen for POLLOUT too?
// Body of the internal processing thread created by start().
//
// While isStarted() holds, blocks in poll() on mFDs and then, in this order,
// checks for lost connections, for incoming peer data and for a pending
// request on the request queue descriptor (mFDs[0]).
//
// Throws IPCException when poll fails with an error other than EINTR.
//
// NOTE(review): the statements executed when handleLostConnections() or
// handleInputs() return true, and the handling of the request event, are not
// visible in this view. Presumably the loop starts over because mFDs may have
// been rebuilt -- TODO confirm.
263 void Processor::run()
265 LOGS(mLogPrefix + "Processor run");
269 while (isStarted()) {
270 LOGT(mLogPrefix + "Waiting for communication...");
271 int ret = poll(mFDs.data(), mFDs.size(), -1 /*blocking call*/);
272 LOGT(mLogPrefix + "... incoming communication!");
// NOTE(review): poll only sets errno when it returns -1; for ret == 0 the
// errno inspected below is whatever an earlier call left behind.
273 if (ret == -1 || ret == 0) {
274 if (errno == EINTR) {
277 LOGE(mLogPrefix + "Error in poll: " << std::string(strerror(errno)));
278 throw IPCException("Error in poll: " + std::string(strerror(errno)));
281 // Check for lost connections:
282 if (handleLostConnections()) {
287 // Check for incoming data.
288 if (handleInputs()) {
293 // Check for incoming events
294 if (mFDs[0].revents & POLLIN) {
// Clear the flag so the event is not seen as pending again.
295 mFDs[0].revents &= ~(POLLIN);
// Removes every polled peer whose descriptor reported POLLHUP.
//
// Scans mFDs from index 1 (index 0 is the request queue descriptor, see
// resetPolling) under mStateMutex. For each hung-up descriptor the POLLHUP
// flag is cleared and the peer is removed with Status::PEER_DISCONNECTED.
//
// @return true when at least one peer was removed
305 bool Processor::handleLostConnections()
307 Lock lock(mStateMutex);
309 bool isPeerRemoved = false;
311 for (unsigned int i = 1; i < mFDs.size(); ++i) {
312 if (mFDs[i].revents & POLLHUP) {
313 LOGI(mLogPrefix + "Lost connection to peer: " << mFDs[i].fd);
314 mFDs[i].revents &= ~(POLLHUP);
315 removePeerInternal(mFDs[i].fd, Status::PEER_DISCONNECTED);
316 isPeerRemoved = true;
321 return isPeerRemoved;
// Single-peer variant of handleLostConnections(): removes peerFD with
// Status::PEER_DISCONNECTED under mStateMutex.
//
// @param peerFD file descriptor identifying the disconnected peer
// @return return statement not visible in this view
324 bool Processor::handleLostConnection(const FileDescriptor peerFD)
326 Lock lock(mStateMutex);
327 removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
// Handles incoming data on every polled peer descriptor that reported POLLIN.
//
// Scans mFDs from index 1 under mStateMutex, clears POLLIN on each ready
// descriptor and delegates to handleInput().
//
// @return return statement not visible in this view; presumably pollChanged,
//         i.e. whether any handleInput() call reported a change -- TODO confirm
331 bool Processor::handleInputs()
333 Lock lock(mStateMutex);
335 bool pollChanged = false;
336 for (unsigned int i = 1; i < mFDs.size(); ++i) {
337 if (mFDs[i].revents & POLLIN) {
338 mFDs[i].revents &= ~(POLLIN);
// NOTE(review): `||` short-circuits. Once pollChanged is true, handleInput()
// is no longer called for the remaining descriptors of this pass, although
// their POLLIN flag has already been cleared on the line above.
339 pollChanged = pollChanged || handleInput(mFDs[i].fd);
// Reads one message header from a peer and dispatches it.
//
// Looks up the peer's socket, reads methodID and messageID from it, then:
//  - RETURN_METHOD_ID          -> onReturnValue
//  - id in mMethodsCallbacks   -> onRemoteCall
//  - id in mSignalsCallbacks   -> onRemoteSignal
//  - anything else             -> warning, peer removed with NAUGHTY_PEER
// A read failure (IPCException) also removes the peer with NAUGHTY_PEER.
//
// NOTE(review): handleInputs() calls this while already holding mStateMutex,
// so the Lock below is acquired re-entrantly; presumably mStateMutex is a
// recursive mutex -- TODO confirm.
//
// @param peerFD file descriptor identifying the peer with pending input
// @return for the three dispatch cases, the value returned by the handler;
//         the other return statements are not visible in this view
346 bool Processor::handleInput(const FileDescriptor peerFD)
348 LOGS(mLogPrefix + "Processor handleInput peerFD: " << peerFD);
349 Lock lock(mStateMutex);
351 std::shared_ptr<Socket> socketPtr;
353 // Get the peer's socket
// at() throws std::out_of_range for an unknown peerFD.
354 socketPtr = mSockets.at(peerFD);
355 } catch (const std::out_of_range&) {
356 LOGE(mLogPrefix + "No such peer: " << peerFD);
// The guard is held while both header fields are read from the socket.
363 Socket::Guard guard = socketPtr->getGuard();
365 socketPtr->read(&methodID, sizeof(methodID));
366 socketPtr->read(&messageID, sizeof(messageID));
368 } catch (const IPCException& e) {
369 LOGE(mLogPrefix + "Error during reading the socket");
370 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
374 if (methodID == RETURN_METHOD_ID) {
375 return onReturnValue(*socketPtr, messageID);
378 if (mMethodsCallbacks.count(methodID)) {
// Copy the shared_ptr so the handlers stay alive during the call.
380 std::shared_ptr<MethodHandlers> methodCallbacks = mMethodsCallbacks.at(methodID);
381 return onRemoteCall(*socketPtr, methodID, messageID, methodCallbacks);
383 } else if (mSignalsCallbacks.count(methodID)) {
385 std::shared_ptr<SignalHandlers> signalCallbacks = mSignalsCallbacks.at(methodID);
386 return onRemoteSignal(*socketPtr, methodID, messageID, signalCallbacks);
390 LOGW(mLogPrefix + "No method or signal callback for methodID: " << methodID);
391 removePeerInternal(socketPtr->getFD(), Status::NAUGHTY_PEER);
// Built-in handler of REGISTER_SIGNAL_METHOD_ID (bound in the constructor).
//
// Records peerFD as an addressee of every signal id listed in data->ids by
// appending it to mSignalsPeers[id].
//
// @param peerFD peer that announced the signals
// @param data   message carrying the announced signal ids
// @return a freshly created EmptyData object
398 std::shared_ptr<Processor::EmptyData> Processor::onNewSignals(const FileDescriptor peerFD,
399 std::shared_ptr<RegisterSignalsMessage>& data)
401 LOGS(mLogPrefix + "Processor onNewSignals peerFD: " << peerFD);
403 for (const MethodID methodID : data->ids) {
// operator[] creates the addressee list on first use of a signal id.
404 mSignalsPeers[methodID].push_back(peerFD);
407 return std::make_shared<EmptyData>();
// Handles a message that carries the return value of an earlier call.
//
// Takes the ReturnCallbacks stored for messageID out of mReturnCallbacks,
// parses the payload from the socket and passes it to the process callback
// with Status::OK.
//
// Error paths visible here:
//  - no callbacks stored for messageID: warning, peer removed (NAUGHTY_PEER);
//  - parsing throws: process callback is invoked with Status::PARSING_ERROR
//    and still-empty data, then the peer is removed (PARSING_ERROR).
//
// @param socket    socket of the peer that sent the value
// @param messageID id matching the original call
// @return return statements are not visible in this view
410 bool Processor::onReturnValue(const Socket& socket,
411 const MessageID messageID)
413 LOGS(mLogPrefix + "Processor onReturnValue messageID: " << messageID);
415 // LOGI(mLogPrefix + "Return value for messageID: " << messageID);
416 ReturnCallbacks returnCallbacks;
418 LOGT(mLogPrefix + "Getting the return callback");
// at() throws std::out_of_range when no callback is stored for messageID.
// The entry is moved out and erased, so it is used for one message only.
419 returnCallbacks = std::move(mReturnCallbacks.at(messageID));
420 mReturnCallbacks.erase(messageID);
421 } catch (const std::out_of_range&) {
422 LOGW(mLogPrefix + "No return callback for messageID: " << messageID);
423 removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
427 std::shared_ptr<void> data;
429 LOGT(mLogPrefix + "Parsing incoming return data");
430 data = returnCallbacks.parse(socket.getFD());
431 } catch (const std::exception& e) {
432 LOGE(mLogPrefix + "Exception during parsing: " << e.what());
433 IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
434 removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
438 // LOGT(mLogPrefix + "Process return value callback for messageID: " << messageID);
439 IGNORE_EXCEPTIONS(returnCallbacks.process(Status::OK, data));
441 // LOGT(mLogPrefix + "Return value for messageID: " << messageID << " processed");
// Handles a signal received from a peer.
//
// Parses the payload with signalCallbacks->parse and passes it to
// signalCallbacks->signal. If parsing throws, the peer is removed with
// Status::PARSING_ERROR; if the signal handler throws, the peer is removed
// with Status::NAUGHTY_PEER.
//
// @param socket          socket of the sending peer
// @param methodID        id of the signal (used for logging in the visible code)
// @param messageID       id of the message (used for logging in the visible code)
// @param signalCallbacks parse/signal handlers registered for methodID
// @return return statements are not visible in this view
445 bool Processor::onRemoteSignal(const Socket& socket,
446 const MethodID methodID,
447 const MessageID messageID,
448 std::shared_ptr<SignalHandlers> signalCallbacks)
450 LOGS(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
452 // LOGI(mLogPrefix + "Processor onRemoteSignal; methodID: " << methodID << " messageID: " << messageID);
454 std::shared_ptr<void> data;
456 LOGT(mLogPrefix + "Parsing incoming data");
457 data = signalCallbacks->parse(socket.getFD());
458 } catch (const std::exception& e) {
459 LOGE(mLogPrefix + "Exception during parsing: " << e.what());
460 removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
464 // LOGT(mLogPrefix + "Signal callback for methodID: " << methodID << "; messageID: " << messageID);
466 signalCallbacks->signal(socket.getFD(), data);
467 } catch (const std::exception& e) {
468 LOGE(mLogPrefix + "Exception in method handler: " << e.what());
469 removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
// Handles a method call received from a peer and sends back the result.
//
// Three stages, each removing the peer when it throws:
//  1. parse the arguments           (failure -> Status::PARSING_ERROR),
//  2. run the method handler        (failure -> Status::NAUGHTY_PEER),
//  3. write RETURN_METHOD_ID, messageID and the serialized result to the
//     socket                        (failure -> Status::SERIALIZATION_ERROR).
//
// @param socket          socket of the calling peer
// @param methodID        id of the called method (used for logging here)
// @param messageID       id echoed back so the caller can match the reply
// @param methodCallbacks parse/method/serialize handlers for methodID
// @return return statements are not visible in this view
476 bool Processor::onRemoteCall(const Socket& socket,
477 const MethodID methodID,
478 const MessageID messageID,
479 std::shared_ptr<MethodHandlers> methodCallbacks)
481 LOGS(mLogPrefix + "Processor onRemoteCall; methodID: " << methodID << " messageID: " << messageID);
482 // LOGI(mLogPrefix + "Remote call; methodID: " << methodID << " messageID: " << messageID);
484 std::shared_ptr<void> data;
486 LOGT(mLogPrefix + "Parsing incoming data");
487 data = methodCallbacks->parse(socket.getFD());
488 } catch (const std::exception& e) {
489 LOGE(mLogPrefix + "Exception during parsing: " << e.what());
490 removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
494 LOGT(mLogPrefix + "Process callback for methodID: " << methodID << "; messageID: " << messageID);
495 std::shared_ptr<void> returnData;
497 returnData = methodCallbacks->method(socket.getFD(), data);
498 } catch (const std::exception& e) {
499 LOGE(mLogPrefix + "Exception in method handler: " << e.what());
500 removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
504 LOGT(mLogPrefix + "Sending return data; methodID: " << methodID << "; messageID: " << messageID);
506 // Send the call with the socket
// The guard is held while header and payload are written, so the reply is
// sent as one unit.
507 Socket::Guard guard = socket.getGuard();
508 socket.write(&RETURN_METHOD_ID, sizeof(RETURN_METHOD_ID));
509 socket.write(&messageID, sizeof(messageID));
510 methodCallbacks->serialize(socket.getFD(), returnData);
511 } catch (const std::exception& e) {
512 LOGE(mLogPrefix + "Exception during serialization: " << e.what());
513 removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
// Pops one request from mRequestQueue and dispatches it by its requestID.
//
// Runs under mStateMutex, so every on*Request handler is executed with the
// lock held.
//
// @return the value returned by the selected handler; the path taken when no
//         case matches is not visible in this view
520 bool Processor::handleEvent()
522 LOGS(mLogPrefix + "Processor handleEvent");
524 Lock lock(mStateMutex);
526 auto request = mRequestQueue.pop();
527 LOGD(mLogPrefix + "Got: " << request.requestID);
529 switch (request.requestID) {
530 case Event::METHOD: return onMethodRequest(*request.get<MethodRequest>());
531 case Event::SIGNAL: return onSignalRequest(*request.get<SignalRequest>());
532 case Event::ADD_PEER: return onAddPeerRequest(*request.get<AddPeerRequest>());
533 case Event::REMOVE_PEER: return onRemovePeerRequest(*request.get<RemovePeerRequest>());
534 case Event::FINISH: return onFinishRequest(*request.get<FinishRequest>());
// Sends a queued method call to its peer.
//
// Visible steps:
//  - look up the peer's socket; if there is none, the request's process
//    callback is invoked with Status::PEER_DISCONNECTED;
//  - store the request's parse/process callbacks in mReturnCallbacks under
//    request.messageID so onReturnValue can find them later (an error is
//    logged when an entry for that id already exists);
//  - write methodID, messageID and the serialized arguments to the socket;
//  - on a send failure: invoke the stored process callback with
//    Status::SERIALIZATION_ERROR, erase the stored callbacks and remove the
//    peer.
//
// @param request the method call to send; its parse and process members are
//        moved from
// @return return statements are not visible in this view
540 bool Processor::onMethodRequest(MethodRequest& request)
542 LOGS(mLogPrefix + "Processor onMethodRequest");
543 std::shared_ptr<Socket> socketPtr;
546 // Get the peer's socket
547 socketPtr = mSockets.at(request.peerFD);
548 } catch (const std::out_of_range&) {
549 LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
551 // Pass the error to the processing callback
552 IGNORE_EXCEPTIONS(request.process(Status::PEER_DISCONNECTED, request.data));
557 if (mReturnCallbacks.count(request.messageID) != 0) {
558 LOGE(mLogPrefix + "There already was a return callback for messageID: " << request.messageID);
// request.parse and request.process are moved from here; after this point
// the callbacks must be reached through mReturnCallbacks.
560 mReturnCallbacks[request.messageID] = std::move(ReturnCallbacks(request.peerFD,
561 std::move(request.parse),
562 std::move(request.process)));
565 // Send the call with the socket
566 Socket::Guard guard = socketPtr->getGuard();
567 socketPtr->write(&request.methodID, sizeof(request.methodID));
568 socketPtr->write(&request.messageID, sizeof(request.messageID));
569 LOGT(mLogPrefix + "Serializing the message");
570 request.serialize(socketPtr->getFD(), request.data);
571 } catch (const std::exception& e) {
572 LOGE(mLogPrefix + "Error during sending a method: " << e.what());
574 // Inform about the error,
575 IGNORE_EXCEPTIONS(mReturnCallbacks[request.messageID].process(Status::SERIALIZATION_ERROR, request.data));
// Erased before removePeerInternal so the callback is not run a second time
// by its "erase associated return value callbacks" loop.
578 mReturnCallbacks.erase(request.messageID);
579 removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
// Sends a queued signal to its peer.
//
// Looks up the peer's socket (an error is logged when there is none), then
// writes methodID, messageID and the serialized payload. A failure while
// sending removes the peer with Status::SERIALIZATION_ERROR. Unlike
// onMethodRequest, no return callback is stored in the visible code.
//
// @param request the signal to send
// @return return statements are not visible in this view
588 bool Processor::onSignalRequest(SignalRequest& request)
590 LOGS(mLogPrefix + "Processor onSignalRequest");
592 std::shared_ptr<Socket> socketPtr;
594 // Get the peer's socket
595 socketPtr = mSockets.at(request.peerFD);
596 } catch (const std::out_of_range&) {
597 LOGE(mLogPrefix + "Peer disconnected. No socket with a peerFD: " << request.peerFD);
602 // Send the call with the socket
603 Socket::Guard guard = socketPtr->getGuard();
604 socketPtr->write(&request.methodID, sizeof(request.methodID));
605 socketPtr->write(&request.messageID, sizeof(request.messageID));
606 request.serialize(socketPtr->getFD(), request.data);
607 } catch (const std::exception& e) {
608 LOGE(mLogPrefix + "Error during sending a signal: " << e.what());
610 removePeerInternal(request.peerFD, Status::SERIALIZATION_ERROR);
// Adds a peer to the Processor.
//
// Visible steps:
//  - check the peer limit and that peerFD is not registered yet (an error is
//    logged in either case; what follows the log is not visible in this view),
//  - take ownership of the socket in mSockets[peerFD],
//  - send the peer the ids of all signals in mSignalsCallbacks through an
//    asynchronous REGISTER_SIGNAL_METHOD_ID call whose result is discarded,
//  - invoke mNewPeerCallback when it is set.
//
// @param request the ADD_PEER request; its socketPtr is moved from
// @return return statements are not visible in this view
617 bool Processor::onAddPeerRequest(AddPeerRequest& request)
619 LOGS(mLogPrefix + "Processor onAddPeerRequest");
// NOTE(review): with `>` this branch is taken only when mSockets already
// holds more than mMaxNumberOfPeers entries, so mMaxNumberOfPeers + 1 peers
// can end up stored -- confirm whether `>=` was intended.
621 if (mSockets.size() > mMaxNumberOfPeers) {
622 LOGE(mLogPrefix + "There are too many peers. I don't accept the connection with " << request.peerFD);
625 if (mSockets.count(request.peerFD) != 0) {
626 LOGE(mLogPrefix + "There already was a socket for peerFD: " << request.peerFD);
630 mSockets[request.peerFD] = std::move(request.socketPtr);
633 // Sending handled signals
634 std::vector<MethodID> ids;
// NOTE(review): `const auto kv` copies each map entry; only kv.first is used.
635 for (const auto kv : mSignalsCallbacks) {
636 ids.push_back(kv.first);
638 auto data = std::make_shared<RegisterSignalsMessage>(ids);
639 callAsync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
642 discardResultHandler<EmptyData>);
647 if (mNewPeerCallback) {
648 // Notify about the new user.
649 LOGT(mLogPrefix + "Calling NewPeerCallback");
650 mNewPeerCallback(request.peerFD);
653 LOGI(mLogPrefix + "New peer: " << request.peerFD);
// Handles a REMOVE_PEER request queued by removePeer().
//
// Removes the peer with Status::REMOVED_PEER and then notifies every waiter
// on the request's condition variable; removePeer() waits on it until the
// peer has left mSockets.
//
// @param request the REMOVE_PEER request
// @return return statement not visible in this view
657 bool Processor::onRemovePeerRequest(RemovePeerRequest& request)
659 LOGS(mLogPrefix + "Processor onRemovePeer");
661 removePeerInternal(request.peerFD, Status::REMOVED_PEER);
662 request.conditionPtr->notify_all();
// Handles the FINISH request queued by stop().
//
// Drains mRequestQueue; every leftover request is logged as an error and:
//  - METHOD: its process callback is invoked with Status::CLOSING,
//  - REMOVE_PEER: handled normally through onRemovePeerRequest, which also
//    wakes the thread blocked in removePeer(),
//  - ADD_PEER and any other kind: handling is not visible in this view.
// Finally every waiter on the FINISH request's condition variable is
// notified; stop() waits on it.
//
// @param request the FINISH request
// @return return statement not visible in this view
667 bool Processor::onFinishRequest(FinishRequest& request)
669 LOGS(mLogPrefix + "Processor onFinishRequest");
671 // Clean the mRequestQueue
672 while (!mRequestQueue.isEmpty()) {
// NOTE: this local `request` shadows the FinishRequest parameter inside the
// loop body.
673 auto request = mRequestQueue.pop();
674 LOGE(mLogPrefix + "Got: " << request.requestID << " after FINISH");
676 switch (request.requestID) {
677 case Event::METHOD: {
678 auto requestPtr = request.get<MethodRequest>();
679 IGNORE_EXCEPTIONS(requestPtr->process(Status::CLOSING, requestPtr->data));
682 case Event::REMOVE_PEER: {
683 onRemovePeerRequest(*request.get<RemovePeerRequest>());
687 case Event::ADD_PEER:
// Presumably outside the loop, so `request` is the FinishRequest parameter
// again -- the closing braces are not visible in this view.
695 request.conditionPtr->notify_all();
// Streams the textual name of a Processor::Event ("Event::FINISH",
// "Event::METHOD", ...); used by the log statements that print requestID.
//
// @param os    output stream
// @param event event to print
// @return return statement not visible in this view; presumably os, as usual
//         for stream insertion operators -- TODO confirm
699 std::ostream& operator<<(std::ostream& os, const Processor::Event& event)
703 case Processor::Event::FINISH: {
704 os << "Event::FINISH";
708 case Processor::Event::METHOD: {
709 os << "Event::METHOD";
713 case Processor::Event::SIGNAL: {
714 os << "Event::SIGNAL";
718 case Processor::Event::ADD_PEER: {
719 os << "Event::ADD_PEER";
723 case Processor::Event::REMOVE_PEER: {
724 os << "Event::REMOVE_PEER";