2 * Copyright (c) 2015 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/request-queue.hpp"
30 #include "ipc/internals/method-request.hpp"
31 #include "ipc/internals/signal-request.hpp"
32 #include "ipc/internals/add-peer-request.hpp"
33 #include "ipc/internals/remove-peer-request.hpp"
34 #include "ipc/internals/finish-request.hpp"
35 #include "ipc/exception.hpp"
36 #include "ipc/types.hpp"
37 #include "config/manager.hpp"
38 #include "config/fields.hpp"
39 #include "logger/logger.hpp"
40 #include "logger/logger-scope.hpp"
44 #include <condition_variable>
52 #include <unordered_map>
// Default cap on the number of peers; used as the default value of the
// Processor constructor's maxNumberOfPeers argument.
57 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
// Timeout handed to callSync() (as its timeoutMS argument) when setSignalHandler()
// announces a newly registered signal to the connected peers.
// NOTE(review): callSync()'s own default timeout is the literal 500, not this
// constant - confirm whether the two values are meant to differ.
58 const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
61 * This class wraps communication via UNIX domain (UX) sockets
63 * It's intended to be used both in Client and Service classes.
64 * It uses a serialization mechanism from libConfig.
65 * Library user will only have to pass the types that each call will send and receive
68 * - MethodID - typically an enum value cast to MethodID.
69 * MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
70 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
71 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
74 * - synchronous call to many peers
75 * - implement HandlerStore class for storing both signals and methods
76 * - API for removing signals
77 * - implement CallbackStore - thread safe calling/setting callbacks
78 * - helper function for removing from unordered map
79 * - new way to generate UIDs
80 * - callbacks for serialization/parsing
81 * - store Sockets in a vector, maybe SocketStore?
82 * - poll loop outside.
83 * - waiting till the EventQueue is empty before leaving stop()
84 * - no new events added after stop() called
85 * - when using IPCGSource: addFD and removeFD can be called from the addPeer/removePeer callbacks, but
86 * there is no mechanism to ensure the IPCGSource still exists, so this can end in a SIGSEGV
// Enumerators of Event, the tag type of the internal RequestQueue<Event>
// (the enum's own header line is not visible in this excerpt).
92 FINISH, // Shutdown request
93 METHOD, // New method call in the queue
94 SIGNAL, // New signal call in the queue
95 ADD_PEER, // New peer in the queue
96 REMOVE_PEER // Remove peer
101 friend std::ostream& operator<<(std::ostream& os, const Processor::Event& event);
104 * Used to indicate a message with the return value.
106 static const MethodID RETURN_METHOD_ID;
109 * Indicates a Processor's internal request/broadcast to register a Signal
111 static const MethodID REGISTER_SIGNAL_METHOD_ID;
114 * Constructs the Processor, but doesn't start it.
115 * The object is ready to add methods.
117 * @param newPeerCallback called when a new peer arrives
118 * @param removedPeerCallback called when the Processor stops listening for this peer
// Every argument is optional:
//  - logName: presumably the source of mLogPrefix used in log messages - TODO confirm
//  - newPeerCallback / removedPeerCallback: default to nullptr (no callback)
//  - maxNumberOfPeers: defaults to DEFAULT_MAX_NUMBER_OF_PEERS
120 Processor(const std::string& logName = "",
121 const PeerCallback& newPeerCallback = nullptr,
122 const PeerCallback& removedPeerCallback = nullptr,
123 const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
// Copy construction, move construction and copy assignment are all deleted,
// so a Processor can neither be copied nor move-constructed.
126 Processor(const Processor&) = delete;
127 Processor(Processor&&) = delete;
128 Processor& operator=(const Processor&) = delete;
132 * Start the processing thread.
133 * Returns immediately after starting the thread.
135 * @param usesExternalPolling internal or external polling is used
137 void start(const bool usesExternalPolling);
140 * @return is processor running
145 * Stops the processing thread.
146 * No incoming data will be handled after.
151 * Set the callback called for each new connection to a peer
153 * @param newPeerCallback the callback
155 void setNewPeerCallback(const PeerCallback& newPeerCallback);
158 * Set the callback called when connection to a peer is lost
160 * @param removedPeerCallback the callback
162 void setRemovedPeerCallback(const PeerCallback& removedPeerCallback);
165 * From now on socket is owned by the Processor object.
166 * Calls the newPeerCallback.
168 * @param socketPtr pointer to the new socket
169 * @return peerFD of the new socket
171 FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
174 * Request removing peer and wait
176 * @param peerFD id of the peer
178 void removePeer(const FileDescriptor peerFD);
181 * Saves the callbacks connected to the method id.
182 * When a message with the given method id is received,
183 * the data will be passed to the serialization callback through file descriptor.
185 * Then the process callback will be called with the parsed data.
187 * @param methodID API dependent id of the method
188 * @param process data processing callback
189 * @tparam SentDataType data type to send
190 * @tparam ReceivedDataType data type to receive
192 template<typename SentDataType, typename ReceivedDataType>
193 void setMethodHandler(const MethodID methodID,
194 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
197 * Saves the callbacks connected to the method id.
198 * When a message with the given method id is received,
199 * the data will be passed to the serialization callback through file descriptor.
201 * Then the process callback will be called with the parsed data.
202 * There is no return data to send back.
204 * Adding signal sends a registering message to all peers
206 * @param methodID API dependent id of the method
207 * @param process data processing callback
208 * @tparam ReceivedDataType data type to receive
210 template<typename ReceivedDataType>
211 void setSignalHandler(const MethodID methodID,
212 const typename SignalHandler<ReceivedDataType>::type& process);
215 * Removes the callback
217 * @param methodID API dependent id of the method
219 void removeMethod(const MethodID methodID);
222 * Synchronous method call.
224 * @param methodID API dependent id of the method
225 * @param peerFD id of the peer
226 * @param data data to send
227 * @param timeoutMS how long to wait for the return value (in milliseconds) before throwing
228 * @tparam SentDataType data type to send
229 * @tparam ReceivedDataType data type to receive
231 template<typename SentDataType, typename ReceivedDataType>
232 std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
233 const FileDescriptor peerFD,
234 const std::shared_ptr<SentDataType>& data,
235 unsigned int timeoutMS = 500);
238 * Asynchronous method call
240 * @param methodID API dependent id of the method
241 * @param peerFD id of the peer
242 * @param data data to send
243 * @param process callback processing the return data
244 * @tparam SentDataType data type to send
245 * @tparam ReceivedDataType data type to receive
247 template<typename SentDataType, typename ReceivedDataType>
248 MessageID callAsync(const MethodID methodID,
249 const FileDescriptor peerFD,
250 const std::shared_ptr<SentDataType>& data,
251 const typename ResultHandler<ReceivedDataType>::type& process);
255 * Send a signal to the peer.
256 * There is no return value from the peer
257 * Data is sent only to peers that registered this signal
259 * @param methodID API dependent id of the method
260 * @param data data to send
261 * @tparam SentDataType data type to send
263 template<typename SentDataType>
264 void signal(const MethodID methodID,
265 const std::shared_ptr<SentDataType>& data);
269 * Handler used in external polling.
271 * @param peerFD file descriptor identifying the peer
272 * @return should the polling structure be rebuilt
274 bool handleLostConnection(const FileDescriptor peerFD);
277 * Handles input from one peer.
278 * Handler used in external polling.
280 * @param peerFD file descriptor identifying the peer
281 * @return should the polling structure be rebuilt
283 bool handleInput(const FileDescriptor peerFD);
286 * Handle one event from the internal event queue
288 * @return should the polling structure be rebuilt
293 * @return file descriptor for the internal event queue
295 FileDescriptor getEventFD();
// Type-erased writer: saves `data` to the file descriptor `fd`.
298 typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
// Type-erased reader: loads an object from `fd` and returns it as shared_ptr<void>.
299 typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
// Lock type used with the recursive state mutex (mStateMutex).
300 typedef std::unique_lock<std::recursive_mutex> Lock;
// Entry type of the request queue that is tagged with Event.
301 typedef RequestQueue<Event>::Request Request;
// NOTE(review): the header of the struct enclosing this macro is not visible in
// this excerpt (presumably EmptyData, which is referenced elsewhere) - confirm.
304 CONFIG_REGISTER_EMPTY
// Payload listing the MethodIDs of signals being registered; setSignalHandler()
// sends it to peers under REGISTER_SIGNAL_METHOD_ID.
307 struct RegisterSignalsMessage {
308 RegisterSignalsMessage() = default;
// NOTE(review): `ids` is taken by const value, so the argument is copied on every
// call; the member initializer is not visible in this excerpt. A const reference
// parameter would avoid the extra copy - confirm before changing.
309 RegisterSignalsMessage(const std::vector<MethodID> ids)
// IDs of the signals being registered.
312 std::vector<MethodID> ids;
// Type-erased callbacks stored per method ID in mMethodsCallbacks.
// Move-only: copy construction/assignment are deleted, move operations defaulted.
// setMethodHandlerInternal() also assigns a `parse` member whose declaration is
// not visible in this excerpt.
320 struct MethodHandlers {
321 MethodHandlers(const MethodHandlers& other) = delete;
322 MethodHandlers& operator=(const MethodHandlers&) = delete;
323 MethodHandlers() = default;
324 MethodHandlers(MethodHandlers&&) = default;
325 MethodHandlers& operator=(MethodHandlers &&) = default;
// Writes the data produced by the handler to a file descriptor.
327 SerializeCallback serialize;
// Wrapper around the user's handler operating on type-erased (void) data.
329 MethodHandler<void, void>::type method;
// Type-erased callbacks stored per signal ID in mSignalsCallbacks.
// Move-only: copy construction/assignment are deleted, move operations defaulted.
// setSignalHandler() also assigns a `parse` member whose declaration is not
// visible in this excerpt.
332 struct SignalHandlers {
333 SignalHandlers(const SignalHandlers& other) = delete;
334 SignalHandlers& operator=(const SignalHandlers&) = delete;
335 SignalHandlers() = default;
336 SignalHandlers(SignalHandlers&&) = default;
337 SignalHandlers& operator=(SignalHandlers &&) = default;
// Wrapper around the user's signal handler operating on type-erased (void) data.
340 SignalHandler<void>::type signal;
// Per-message record stored in mReturnCallbacks under the MessageID; callSync()
// erases the matching entry when it gives up on a call.
// Move-only: copy construction/assignment are deleted, move operations defaulted.
343 struct ReturnCallbacks {
344 ReturnCallbacks(const ReturnCallbacks& other) = delete;
345 ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
346 ReturnCallbacks() = default;
347 ReturnCallbacks(ReturnCallbacks&&) = default;
348 ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
// Convenience constructor that copies all three arguments into the members.
350 ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
351 : peerFD(peerFD), parse(parse), process(process) {}
// Peer the reply is expected from - presumably; confirm against the .cpp.
353 FileDescriptor peerFD;
// Type-erased result handler to invoke with the parsed reply.
355 ResultHandler<void>::type process;
// Prefix prepended to every log message emitted by this object.
358 std::string mLogPrefix;
// Queue of pending requests, each tagged with an Event kind.
360 RequestQueue<Event> mRequestQueue;
// Presumably mirrors the argument of start(usesExternalPolling) - TODO confirm.
363 bool mUsesExternalPolling;
// Registered method handlers, keyed by method ID.
365 std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
// Registered signal handlers, keyed by method ID (IDs are not shared with methods:
// setMethodHandler()/setSignalHandler() reject an ID already used by the other kind).
366 std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
// For each signal ID, the peers that signal() sends the signal to.
367 std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
// Sockets of the known peers, keyed by peer file descriptor.
369 std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
// pollfd entries - presumably the set polled by the internal loop; confirm.
370 std::vector<struct pollfd> mFDs;
// Return-value records keyed by MessageID.
372 std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
374 // Mutex for modifying any internal data (recursive, so it may be re-locked by the owning thread)
375 std::recursive_mutex mStateMutex;
// User callbacks for peer arrival / removal (see setNewPeerCallback / setRemovedPeerCallback).
377 PeerCallback mNewPeerCallback;
378 PeerCallback mRemovedPeerCallback;
// Peer limit passed to the constructor (defaults to DEFAULT_MAX_NUMBER_OF_PEERS).
380 unsigned int mMaxNumberOfPeers;
// Registers a method handler without the reserved-ID / signal-collision checks
// that setMethodHandler() performs before delegating here.
384 template<typename SentDataType, typename ReceivedDataType>
385 void setMethodHandlerInternal(const MethodID methodID,
386 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
// No-op result handler: ignores both the status and the data.
388 template<typename ReceivedDataType>
389 static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
// One handler per request type. NOTE(review): the meaning of the bool results is
// not documented here - presumably "should the polling structure be rebuilt",
// as for the public handle*() functions; confirm against the .cpp.
394 bool onMethodRequest(MethodRequest& request);
395 bool onSignalRequest(SignalRequest& request);
396 bool onAddPeerRequest(AddPeerRequest& request);
397 bool onRemovePeerRequest(RemovePeerRequest& request);
398 bool onFinishRequest(FinishRequest& request);
400 bool handleLostConnections();
// Handlers for the three kinds of incoming messages: a reply to an earlier call,
// a remote method call and a remote signal (names suggest this; bodies live elsewhere).
403 bool onReturnValue(const Socket& socket,
404 const MessageID messageID);
405 bool onRemoteCall(const Socket& socket,
406 const MethodID methodID,
407 const MessageID messageID,
408 std::shared_ptr<MethodHandlers> methodCallbacks);
409 bool onRemoteSignal(const Socket& socket,
410 const MethodID methodID,
411 const MessageID messageID,
412 std::shared_ptr<SignalHandlers> signalCallbacks);
414 FileDescriptor getNextFileDescriptor();
415 void removePeerInternal(const FileDescriptor peerFD, Status status);
// Takes a peer's RegisterSignalsMessage and returns EmptyData (signature only; body elsewhere).
417 std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
418 std::shared_ptr<RegisterSignalsMessage>& data);
// Builds the type-erased callback set for `methodID` and stores it in
// mMethodsCallbacks, replacing any previous entry for that ID.
// No validation of `methodID` happens here; setMethodHandler() does that first.
423 template<typename SentDataType, typename ReceivedDataType>
424 void Processor::setMethodHandlerInternal(const MethodID methodID,
425 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
427 MethodHandlers methodCall;
// parse: default-constructs a ReceivedDataType and fills it from the file descriptor.
429 methodCall.parse = [](const int fd)->std::shared_ptr<void> {
430 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
431 config::loadFromFD<ReceivedDataType>(fd, *data);
// serialize: casts the type-erased pointer back to SentDataType and writes it to fd.
435 methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
436 config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
// method: restores the concrete argument type and forwards to the user's handler
// (captured by copy, so the stored callback does not refer to the caller's object).
439 methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
440 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
441 return method(peerFD, tmpData);
// Publish the handlers under the state lock.
445 Lock lock(mStateMutex);
446 mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
// Public entry point for registering a method handler.
// Throws IPCException when `methodID` is one of the reserved IDs
// (RETURN_METHOD_ID, REGISTER_SIGNAL_METHOD_ID) or is already used by a signal
// handler; otherwise delegates to setMethodHandlerInternal().
450 template<typename SentDataType, typename ReceivedDataType>
451 void Processor::setMethodHandler(const MethodID methodID,
452 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
// Reserved IDs are used internally and must not be overridden by users.
454 if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
455 LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
456 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
// mStateMutex is recursive, so setMethodHandlerInternal() locking it again is fine.
460 Lock lock(mStateMutex);
// A given ID may be either a method or a signal, never both.
462 if (mSignalsCallbacks.count(methodID)) {
463 LOGE(mLogPrefix + "MethodID used by a signal: " << methodID);
464 throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
467 setMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
// Registers a signal handler for `methodID` and announces the new signal to every
// currently known peer with a synchronous REGISTER_SIGNAL_METHOD_ID call.
// Throws IPCException when `methodID` is reserved or already used by a method.
// Because callSync() is used for the announcement, its IPCTimeoutException can
// propagate out of this function as well.
472 template<typename ReceivedDataType>
473 void Processor::setSignalHandler(const MethodID methodID,
474 const typename SignalHandler<ReceivedDataType>::type& handler)
476 if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
477 LOGE(mLogPrefix + "Forbidden methodID: " << methodID);
478 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
// Filled in while the state lock is held, used for the broadcast afterwards.
481 std::shared_ptr<RegisterSignalsMessage> data;
482 std::vector<FileDescriptor> peerFDs;
484 Lock lock(mStateMutex);
486 // Add the signal handler:
487 if (mMethodsCallbacks.count(methodID)) {
488 LOGE(mLogPrefix + "MethodID used by a method: " << methodID);
489 throw IPCException("MethodID used by a method: " + std::to_string(methodID));
492 SignalHandlers signalCall;
// parse: default-constructs a ReceivedDataType and fills it from the file descriptor.
494 signalCall.parse = [](const int fd)->std::shared_ptr<void> {
495 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
496 config::loadFromFD<ReceivedDataType>(fd, *data);
// signal: restores the concrete data type and forwards to the user's handler
// (captured by copy).
500 signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
501 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
502 handler(peerFD, tmpData);
505 mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
507 // Broadcast the new signal:
508 std::vector<MethodID> ids {methodID};
509 data = std::make_shared<RegisterSignalsMessage>(ids);
// Snapshot the peer descriptors so the calls below do not iterate mSockets itself.
// NOTE(review): `const auto kv` copies each map entry, including its
// shared_ptr<Socket> (an atomic ref-count bump per peer); `const auto&` would avoid that.
511 for (const auto kv : mSockets) {
512 peerFDs.push_back(kv.first);
// One blocking call per peer, each bounded by DEFAULT_METHOD_TIMEOUT.
// NOTE(review): the closing brace of the locked scope is not visible in this
// excerpt - confirm mStateMutex is released before these blocking calls.
516 for (const auto peerFD : peerFDs) {
517 callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
520 DEFAULT_METHOD_TIMEOUT);
// Queues a method call for `peerFD` and returns immediately.
// The request is created and pushed onto mRequestQueue (tagged Event::METHOD)
// while holding mStateMutex; `process` is handed to the request for the reply.
// @return the messageID of the created request, usable to match the reply
525 template<typename SentDataType, typename ReceivedDataType>
526 MessageID Processor::callAsync(const MethodID methodID,
527 const FileDescriptor peerFD,
528 const std::shared_ptr<SentDataType>& data,
529 const typename ResultHandler<ReceivedDataType>::type& process)
531 Lock lock(mStateMutex);
532 auto request = MethodRequest::create<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
533 mRequestQueue.push(Event::METHOD, request);
534 return request->messageID;
// Synchronous call built on callAsync(): queues the request, then blocks on a
// condition variable until the reply callback has stored a status, or until
// timeoutMS milliseconds have passed.
// Throws IPCTimeoutException on timeout; throwOnError() is applied to the final
// status (presumably throwing for error statuses - confirm).
// NOTE(review): the timed wait can run twice (see below), so the caller may block
// for up to roughly 2 * timeoutMS.
538 template<typename SentDataType, typename ReceivedDataType>
539 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
540 const FileDescriptor peerFD,
541 const std::shared_ptr<SentDataType>& data,
542 unsigned int timeoutMS)
544 std::shared_ptr<ReceivedDataType> result;
547 std::condition_variable cv;
// UNDEFINED doubles as the "no reply yet" marker tested by isResultInitialized.
548 Status returnStatus = ipc::Status::UNDEFINED;
// Reply callback: captures this function's locals by reference and stores the
// outcome in them.
// NOTE(review): `mutex` is captured, but no lock acquisition is visible before
// these writes, while the waiter reads returnStatus under `mutex`. If the writes
// really are unlocked this is a data race / possible missed wake-up - confirm.
550 auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
551 returnStatus = status;
552 result = returnedData;
556 MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
// Wait predicate: true once the callback has stored any status.
561 auto isResultInitialized = [&returnStatus]() {
562 return returnStatus != ipc::Status::UNDEFINED;
565 std::unique_lock<std::mutex> lock(mutex);
566 LOGT(mLogPrefix + "Waiting for the response...");
567 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
568 LOGW(mLogPrefix + "Probably a timeout in callSync. Checking...");
// NOTE(review): this `lock` on mStateMutex shadows the outer `lock` on `mutex`
// (presumably inside a nested scope whose braces are not visible here).
571 Lock lock(mStateMutex);
572 // Call isn't sent or call is sent but there is no reply
// Try to withdraw the call: a still-queued METHOD request, a still-queued SIGNAL
// request, or the registered return callback. `||` short-circuits, so at most one
// of the three removals takes effect.
573 isTimeout = mRequestQueue.removeIf([messageID](Request & request) {
574 return request.requestID == Event::METHOD &&
575 request.get<MethodRequest>()->messageID == messageID;
577 || mRequestQueue.removeIf([messageID](Request & request) {
578 return request.requestID == Event::SIGNAL &&
579 request.get<SignalRequest>()->messageID == messageID;
581 || 1 == mReturnCallbacks.erase(messageID);
// Genuine timeout path (the branch condition itself is not visible in this excerpt).
584 LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
586 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
// Otherwise nothing could be withdrawn: the reply is already being processed, so
// give it one more timeoutMS to complete.
588 LOGW(mLogPrefix + "Timeout started during the return value processing, so wait for it to finish");
589 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
590 LOGE(mLogPrefix + "Function call timeout; methodID: " << methodID);
591 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
596 throwOnError(returnStatus);
// Queues one SignalRequest (tagged Event::SIGNAL) for every peer listed in
// mSignalsPeers for `methodID`; the whole operation runs under mStateMutex.
// When no peer is listed for the ID only a warning is logged (the early return
// that must follow is not visible in this excerpt).
601 template<typename SentDataType>
602 void Processor::signal(const MethodID methodID,
603 const std::shared_ptr<SentDataType>& data)
605 Lock lock(mStateMutex);
606 const auto it = mSignalsPeers.find(methodID);
607 if (it == mSignalsPeers.end()) {
608 LOGW(mLogPrefix + "No peer is handling signal with methodID: " << methodID);
// The same `data` pointer is shared by all the created requests.
611 for (const FileDescriptor peerFD : it->second) {
612 auto request = SignalRequest::create<SentDataType>(methodID, peerFD, data);
613 mRequestQueue.push(Event::SIGNAL, request);
622 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP