2 * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
28 #include "ipc/internals/result-builder.hpp"
29 #include "ipc/internals/socket.hpp"
30 #include "ipc/internals/request-queue.hpp"
31 #include "ipc/internals/method-request.hpp"
32 #include "ipc/internals/signal-request.hpp"
33 #include "ipc/internals/add-peer-request.hpp"
34 #include "ipc/internals/remove-peer-request.hpp"
35 #include "ipc/internals/send-result-request.hpp"
36 #include "ipc/internals/finish-request.hpp"
37 #include "ipc/exception.hpp"
38 #include "ipc/method-result.hpp"
39 #include "ipc/types.hpp"
40 #include "config/manager.hpp"
41 #include "config/fields.hpp"
42 #include "logger/logger.hpp"
43 #include "logger/logger-scope.hpp"
46 #include <condition_variable>
54 #include <unordered_map>
// Default peer limit: this is the default value of the Processor
// constructor's maxNumberOfPeers argument.
59 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
61 * This class wraps communication via UX sockets
63 * It's intended to be used both in Client and Service classes.
64 * It uses a serialization mechanism from Config.
65 * Library user will only have to pass the types that each call will send and receive
68 * - MethodID - probably casted enum.
69 * MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
70 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
71 * - Rest: The data written in a callback. One type per method.
74 * - synchronous call to many peers
75 * - implement HandlerStore class for storing both signals and methods
76 * - API for removing signals
77 * - implement CallbackStore - thread safe calling/setting callbacks
78 * - helper function for removing from unordered map
79 * - new way to generate UIDs
80 * - callbacks for serialization/parsing
81 * - store Sockets in a vector, maybe SocketStore?
82 * - waiting till the EventQueue is empty before leaving stop()
83 * - no new events added after stop() called
89 FINISH, // Shutdown request
90 METHOD, // New method call in the queue
91 SIGNAL, // New signal call in the queue
92 ADD_PEER, // New peer in the queue
93 REMOVE_PEER, // Remove peer
94 SEND_RESULT // Send the result of a method's call
98 friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
101 * Used to indicate a message with the return value.
103 static const MethodID RETURN_METHOD_ID;
106 * Indicates a Processor's internal request/broadcast to register a Signal
108 static const MethodID REGISTER_SIGNAL_METHOD_ID;
111 * Error return message
113 static const MethodID ERROR_METHOD_ID;
116 * Constructs the Processor, but doesn't start it.
117 * The object is ready to add methods.
119 * @param newPeerCallback called when a new peer arrives
120 * @param removedPeerCallback called when the Processor stops listening for this peer
122 Processor(const std::string& logName = "",
123 const PeerCallback& newPeerCallback = nullptr,
124 const PeerCallback& removedPeerCallback = nullptr,
125 const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
128 Processor(const Processor&) = delete;
129 Processor(Processor&&) = delete;
130 Processor& operator=(const Processor&) = delete;
139 * @return is processor running
144 * Stops the processing thread.
145 * No incoming data will be handled after.
150 * Set the callback called for each new connection to a peer
152 * @param newPeerCallback the callback
154 void setNewPeerCallback(const PeerCallback& newPeerCallback);
157 * Set the callback called when connection to a peer is lost
159 * @param removedPeerCallback the callback
161 void setRemovedPeerCallback(const PeerCallback& removedPeerCallback);
164 * From now on socket is owned by the Processor object.
165 * Calls the newPeerCallback.
167 * @param socketPtr pointer to the new socket
168 * @return peerID of the new user
170 PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
173 * Saves the callbacks connected to the method id.
174 * When a message with the given method id is received,
175 * the data will be read from the peer's file descriptor by the parsing callback.
177 * Then the process callback will be called with the parsed data.
179 * @param methodID API dependent id of the method
180 * @param process data processing callback
181 * @tparam SentDataType data type to send
182 * @tparam ReceivedDataType data type to receive
184 template<typename SentDataType, typename ReceivedDataType>
185 void setMethodHandler(const MethodID methodID,
186 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
189 * Saves the callbacks connected to the method id.
190 * When a message with the given method id is received,
191 * the data will be read from the peer's file descriptor by the parsing callback.
193 * Then the process callback will be called with the parsed data.
194 * There is no return data to send back.
196 * Adding signal sends a registering message to all peers
198 * @param methodID API dependent id of the method
199 * @param process data processing callback
200 * @tparam ReceivedDataType data type to receive
202 template<typename ReceivedDataType>
203 void setSignalHandler(const MethodID methodID,
204 const typename SignalHandler<ReceivedDataType>::type& process);
207 * Send result of the method.
208 * Used for asynchronous communication, only internally.
210 * @param methodID API dependent id of the method
211 * @param peerID id of the peer
212 * @param messageID id of the message to which it replies
213 * @param data data to send
215 void sendResult(const MethodID methodID,
217 const MessageID messageID,
218 const std::shared_ptr<void>& data);
221 * Send error result of the method
223 * @param peerID id of the peer
224 * @param messageID id of the message to which it replies
225 * @param errorCode code of the error
226 * @param message description of the error
228 void sendError(const PeerID peerID,
229 const MessageID messageID,
231 const std::string& message);
234 * Indicate that the method handler finished
236 * @param methodID API dependent id of the method
237 * @param peerID id of the peer
238 * @param messageID id of the message to which it replies
240 void sendVoid(const MethodID methodID,
242 const MessageID messageID);
245 * Removes the callback
247 * @param methodID API dependent id of the method
249 void removeMethod(const MethodID methodID);
252 * Synchronous method call.
254 * @param methodID API dependent id of the method
255 * @param peerID id of the peer
256 * @param data data to send
257 * @param timeoutMS how long (in milliseconds) to wait for the return value before throwing
258 * @tparam SentDataType data type to send
259 * @tparam ReceivedDataType data type to receive
261 template<typename SentDataType, typename ReceivedDataType>
262 std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
264 const std::shared_ptr<SentDataType>& data,
265 unsigned int timeoutMS = 500);
268 * Asynchronous method call
270 * @param methodID API dependent id of the method
271 * @param peerID id of the peer
272 * @param data data to send
273 * @param process callback processing the return data
274 * @tparam SentDataType data type to send
275 * @tparam ReceivedDataType data type to receive
277 template<typename SentDataType, typename ReceivedDataType>
278 MessageID callAsync(const MethodID methodID,
280 const std::shared_ptr<SentDataType>& data,
281 const typename ResultHandler<ReceivedDataType>::type& process);
285 * Send a signal to the peer.
286 * There is no return value from the peer
287 * Sends data only if a peer has registered this signal
289 * @param methodID API dependent id of the method
290 * @param data data to send
291 * @tparam SentDataType data type to send
293 template<typename SentDataType>
294 void signal(const MethodID methodID,
295 const std::shared_ptr<SentDataType>& data);
299 * Handler used in external polling.
301 * @param fd file descriptor identifying the peer
302 * @return should the polling structure be rebuilt
304 bool handleLostConnection(const FileDescriptor fd);
307 * Handles input from one peer.
308 * Handler used in external polling.
310 * @param fd file descriptor identifying the peer
311 * @return should the polling structure be rebuilt
313 bool handleInput(const FileDescriptor fd);
316 * Handle one event from the internal event's queue
318 * @return should the polling structure be rebuilt
323 * @return file descriptor for the internal event's queue
325 FileDescriptor getEventFD();
328 typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
329 typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
330 typedef std::unique_lock<std::mutex> Lock;
331 typedef RequestQueue<Event>::Request Request;
334 CONFIG_REGISTER_EMPTY
337 struct RegisterSignalsProtocolMessage {
338 RegisterSignalsProtocolMessage() = default;
339 RegisterSignalsProtocolMessage(const std::vector<MethodID> ids)
342 std::vector<MethodID> ids;
350 struct ErrorProtocolMessage {
351 ErrorProtocolMessage() = default;
352 ErrorProtocolMessage(const MessageID messageID, const int code, const std::string& message)
353 : messageID(messageID), code(code), message(message) {}
367 struct MethodHandlers {
368 MethodHandlers(const MethodHandlers& other) = delete;
369 MethodHandlers& operator=(const MethodHandlers&) = delete;
370 MethodHandlers() = default;
371 MethodHandlers(MethodHandlers&&) = default;
372 MethodHandlers& operator=(MethodHandlers &&) = default;
374 SerializeCallback serialize;
376 MethodHandler<void, void>::type method;
379 struct SignalHandlers {
380 SignalHandlers(const SignalHandlers& other) = delete;
381 SignalHandlers& operator=(const SignalHandlers&) = delete;
382 SignalHandlers() = default;
383 SignalHandlers(SignalHandlers&&) = default;
384 SignalHandlers& operator=(SignalHandlers &&) = default;
387 SignalHandler<void>::type signal;
390 struct ReturnCallbacks {
391 ReturnCallbacks(const ReturnCallbacks& other) = delete;
392 ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
393 ReturnCallbacks() = default;
394 ReturnCallbacks(ReturnCallbacks&&) = default;
395 ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
397 ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultBuilderHandler& process)
398 : peerID(peerID), parse(parse), process(process) {}
402 ResultBuilderHandler process;
406 PeerInfo(const PeerInfo& other) = delete;
407 PeerInfo& operator=(const PeerInfo&) = delete;
410 PeerInfo(PeerInfo&&) = default;
411 PeerInfo& operator=(PeerInfo &&) = default;
413 PeerInfo(PeerID peerID, const std::shared_ptr<Socket>& socketPtr)
414 : peerID(peerID), socketPtr(socketPtr) {}
417 std::shared_ptr<Socket> socketPtr;
420 typedef std::vector<PeerInfo> Peers;
422 std::string mLogPrefix;
424 RequestQueue<Event> mRequestQueue;
428 std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
429 std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
430 std::unordered_map<MethodID, std::list<PeerID>> mSignalsPeers;
434 std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
436 // Mutex for modifying any internal data
437 std::mutex mStateMutex;
439 PeerCallback mNewPeerCallback;
440 PeerCallback mRemovedPeerCallback;
442 unsigned int mMaxNumberOfPeers;
444 template<typename SentDataType, typename ReceivedDataType>
445 MessageID callAsyncInternal(const MethodID methodID,
447 const std::shared_ptr<SentDataType>& data,
448 const typename ResultHandler<ReceivedDataType>::type& process);
450 template<typename SentDataType, typename ReceivedDataType>
451 void setMethodHandlerInternal(const MethodID methodID,
452 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
454 template<typename ReceivedDataType>
455 void setSignalHandlerInternal(const MethodID methodID,
456 const typename SignalHandler<ReceivedDataType>::type& handler);
458 template<typename SentDataType>
459 void signalInternal(const MethodID methodID,
461 const std::shared_ptr<SentDataType>& data);
464 bool onMethodRequest(MethodRequest& request);
465 bool onSignalRequest(SignalRequest& request);
466 bool onAddPeerRequest(AddPeerRequest& request);
467 bool onRemovePeerRequest(RemovePeerRequest& request);
468 bool onSendResultRequest(SendResultRequest& request);
469 bool onFinishRequest(FinishRequest& request);
471 bool onReturnValue(Peers::iterator& peerIt,
472 const MessageID messageID);
473 bool onRemoteMethod(Peers::iterator& peerIt,
474 const MethodID methodID,
475 const MessageID messageID,
476 std::shared_ptr<MethodHandlers> methodCallbacks);
477 bool onRemoteSignal(Peers::iterator& peerIt,
478 const MethodID methodID,
479 const MessageID messageID,
480 std::shared_ptr<SignalHandlers> signalCallbacks);
482 void removePeerInternal(Peers::iterator peerIt,
483 const std::exception_ptr& exceptionPtr);
484 void removePeerSyncInternal(const PeerID peerID, Lock& lock);
486 void onNewSignals(const PeerID peerID,
487 std::shared_ptr<RegisterSignalsProtocolMessage>& data);
489 void onErrorSignal(const PeerID peerID,
490 std::shared_ptr<ErrorProtocolMessage>& data);
492 Peers::iterator getPeerInfoIterator(const FileDescriptor fd);
493 Peers::iterator getPeerInfoIterator(const PeerID peerID);
497 template<typename SentDataType, typename ReceivedDataType>
498 void Processor::setMethodHandlerInternal(const MethodID methodID,
499 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
501 MethodHandlers methodCall;
503 methodCall.parse = [](const int fd)->std::shared_ptr<void> {
504 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
505 config::loadFromFD<ReceivedDataType>(fd, *data);
509 methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
510 config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
513 methodCall.method = [method](const PeerID peerID, std::shared_ptr<void>& data, MethodResult::Pointer && methodResult) {
514 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
515 method(peerID, tmpData, std::forward<MethodResult::Pointer>(methodResult));
518 mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
521 template<typename SentDataType, typename ReceivedDataType>
522 void Processor::setMethodHandler(const MethodID methodID,
523 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
525 if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
526 LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
527 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
531 Lock lock(mStateMutex);
533 if (mSignalsCallbacks.count(methodID)) {
534 LOGE(mLogPrefix + "MethodID used by a signal: " << methodID);
535 throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
538 setMethodHandlerInternal<SentDataType, ReceivedDataType>(methodID, method);
543 template<typename ReceivedDataType>
544 void Processor::setSignalHandlerInternal(const MethodID methodID,
545 const typename SignalHandler<ReceivedDataType>::type& handler)
547 SignalHandlers signalCall;
549 signalCall.parse = [](const int fd)->std::shared_ptr<void> {
550 std::shared_ptr<ReceivedDataType> dataToFill(new ReceivedDataType());
551 config::loadFromFD<ReceivedDataType>(fd, *dataToFill);
555 signalCall.signal = [handler](const PeerID peerID, std::shared_ptr<void>& dataReceived) {
556 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(dataReceived);
557 handler(peerID, tmpData);
560 mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
564 template<typename ReceivedDataType>
565 void Processor::setSignalHandler(const MethodID methodID,
566 const typename SignalHandler<ReceivedDataType>::type& handler)
568 if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
569 LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
570 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
573 std::shared_ptr<RegisterSignalsProtocolMessage> data;
576 Lock lock(mStateMutex);
578 // Andd the signal handler:
579 if (mMethodsCallbacks.count(methodID)) {
580 LOGE(mLogPrefix + "MethodID used by a method: " << methodID);
581 throw IPCException("MethodID used by a method: " + std::to_string(methodID));
584 setSignalHandlerInternal<ReceivedDataType>(methodID, handler);
586 // Broadcast the new signal:
587 std::vector<MethodID> ids {methodID};
588 data = std::make_shared<RegisterSignalsProtocolMessage>(ids);
590 for (const PeerInfo& peerInfo : mPeerInfo) {
591 signalInternal<RegisterSignalsProtocolMessage>(REGISTER_SIGNAL_METHOD_ID,
599 template<typename SentDataType, typename ReceivedDataType>
600 MessageID Processor::callAsync(const MethodID methodID,
602 const std::shared_ptr<SentDataType>& data,
603 const typename ResultHandler<ReceivedDataType>::type& process)
605 Lock lock(mStateMutex);
606 return callAsyncInternal<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
609 template<typename SentDataType, typename ReceivedDataType>
610 MessageID Processor::callAsyncInternal(const MethodID methodID,
612 const std::shared_ptr<SentDataType>& data,
613 const typename ResultHandler<ReceivedDataType>::type& process)
615 auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerID, data, process);
616 mRequestQueue.pushBack(Event::METHOD, request);
617 return request->messageID;
// Synchronous method call.
//
// Queues the call through callAsyncInternal() while holding mStateMutex, then waits on a
// local condition variable (releasing the lock while waiting) until the result handler has
// stored a valid Result or timeoutMS milliseconds have passed.
//
// When the first wait times out, the call is cancelled: the request is removed from the
// request queue if it is still there, otherwise its entry is erased from mReturnCallbacks.
// The visible code then has two outcomes:
//  - log an error, remove the peer (removePeerSyncInternal) and throw IPCTimeoutException;
//  - log that the reply is already being processed, wait once more for up to timeoutMS,
//    and throw IPCTimeoutException if that wait also times out.
//
// NOTE(review): several lines of this definition are absent from this listing (the opening
// brace, the remaining arguments of the callAsyncInternal call, the rest of the `process`
// lambda, the condition separating the two timeout outcomes, and the return statement).
// Presumably the first outcome is taken when the cancellation succeeded - confirm against
// the complete source.
621 template<typename SentDataType, typename ReceivedDataType>
622 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
624 const std::shared_ptr<SentDataType>& data,
625 unsigned int timeoutMS)
// Both objects below are captured by reference by the result handler.
627 Result<ReceivedDataType> result;
628 std::condition_variable cv;
630 auto process = [&result, &cv](const Result<ReceivedDataType> && r) {
631 // This is called under lock(mStateMutex)
// NOTE(review): r is const, so std::move(r) cannot select a move assignment taking a
// non-const rvalue; this is effectively a copy.
632 result = std::move(r);
// The lock is held from here on; cv.wait_for() releases it while blocked.
636 Lock lock(mStateMutex);
637 MessageID messageID = callAsyncInternal<SentDataType, ReceivedDataType>(methodID,
// Wait predicate: true once the handler has stored a valid result.
642 auto isResultInitialized = [&result]() {
643 return result.isValid();
646 LOGT(mLogPrefix + "Waiting for the response...");
647 // If sending takes too long, the response can arrive well after timeoutMS but
648 // before this thread wakes up and checks the predicate (there will be no timeout exception)
649 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
650 LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
652 // Call isn't sent or call is sent but there is no reply
// Cancel the call: drop it from the queue or, failing that, forget its return callback.
653 bool isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
654 return request.requestID == Event::METHOD &&
655 request.get<MethodRequest>()->messageID == messageID;
657 || 1 == mReturnCallbacks.erase(messageID);
660 LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
661 removePeerSyncInternal(peerID, lock);
662 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
664 LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
665 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
666 LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
667 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
675 template<typename SentDataType>
676 void Processor::signalInternal(const MethodID methodID,
678 const std::shared_ptr<SentDataType>& data)
680 auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID, data);
681 mRequestQueue.pushFront(Event::SIGNAL, requestPtr);
684 template<typename SentDataType>
685 void Processor::signal(const MethodID methodID,
686 const std::shared_ptr<SentDataType>& data)
688 Lock lock(mStateMutex);
689 const auto it = mSignalsPeers.find(methodID);
690 if (it == mSignalsPeers.end()) {
691 LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
694 for (const PeerID peerID : it->second) {
695 auto requestPtr = SignalRequest::create<SentDataType>(methodID, peerID, data);
696 mRequestQueue.pushBack(Event::SIGNAL, requestPtr);
704 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP