2 * Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
4 * Contact: Jan Olszak <j.olszak@samsung.com>
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License
21 * @author Jan Olszak (j.olszak@samsung.com)
22 * @brief Data and event processing thread
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/event-queue.hpp"
30 #include "ipc/internals/call-queue.hpp"
31 #include "ipc/exception.hpp"
32 #include "ipc/types.hpp"
33 #include "config/manager.hpp"
34 #include "config/fields.hpp"
35 #include "logger/logger.hpp"
38 #include <condition_variable>
47 #include <unordered_map>
52 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
53 const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
56 * This class wraps communication via UX sockets
58 * It's intended to be used both in Client and Service classes.
59 * It uses a serialization mechanism from libConfig.
60 * Library user will only have to pass the types that each call will send and receive
63 * - MethodID - probably casted enum.
64 * MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
65 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
66 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
69 * - some mutexes may not be needed
70 * - synchronous call to many peers
71 * - implement HandlerStore class for storing both signals and methods
72 * - API for removing signals
73 * - implement CallbackStore - thread safe calling/setting callbacks
74 * - helper function for removing from unordered map
75 * - new way to generate UIDs
76 * - callbacks for serialization/parsing
77 * - store Sockets in a vector, maybe SocketStore?
78 * - fix valgrind tests
79 * - poll loop outside.
80 * - waiting till the EventQueue is empty before leaving stop()
81 * - no new events added after stop() called
82 * - when using IPCGSource: addFD and removeFD can be called from addPeer removePeer callbacks, but
83 * there is no mechanism to ensure the IPCSource exists.. therefore SIGSEGV :)
90 * Used to indicate a message with the return value.
92 static const MethodID RETURN_METHOD_ID;
95 * Indicates an Processor's internal request/broadcast to register a Signal
97 static const MethodID REGISTER_SIGNAL_METHOD_ID;
100 * Constructs the Processor, but doesn't start it.
101 * The object is ready to add methods.
103 * @param newPeerCallback called when a new peer arrives
104 * @param removedPeerCallback called when the Processor stops listening for this peer
106 Processor(const PeerCallback& newPeerCallback = nullptr,
107 const PeerCallback& removedPeerCallback = nullptr,
108 const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
111 Processor(const Processor&) = delete;
112 Processor(Processor&&) = delete;
113 Processor& operator=(const Processor&) = delete;
116 * Start the processing thread.
117 * Quits immediately after starting the thread.
122 * @return is processor running
127 * Stops the processing thread.
128 * No incoming data will be handled after.
133 * Set the callback called for each new connection to a peer
135 * @param newPeerCallback the callback
137 void setNewPeerCallback(const PeerCallback& newPeerCallback);
140 * Set the callback called when connection to a peer is lost
142 * @param removedPeerCallback the callback
144 void setRemovedPeerCallback(const PeerCallback& removedPeerCallback);
147 * From now on socket is owned by the Processor object.
148 * Calls the newPeerCallback.
150 * @param socketPtr pointer to the new socket
151 * @return peerFD of the new socket
153 FileDescriptor addPeer(const std::shared_ptr<Socket>& socketPtr);
156 * Request removing peer and wait
158 * @param peerFD id of the peer
160 void removePeer(const FileDescriptor peerFD);
163 * Saves the callbacks connected to the method id.
164 * When a message with the given method id is received,
165 * the data will be passed to the serialization callback through file descriptor.
167 * Then the process callback will be called with the parsed data.
169 * @param methodID API dependent id of the method
170 * @param process data processing callback
171 * @tparam SentDataType data type to send
172 * @tparam ReceivedDataType data type to receive
174 template<typename SentDataType, typename ReceivedDataType>
175 void addMethodHandler(const MethodID methodID,
176 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
179 * Saves the callbacks connected to the method id.
180 * When a message with the given method id is received,
181 * the data will be passed to the serialization callback through file descriptor.
183 * Then the process callback will be called with the parsed data.
184 * There is no return data to send back.
186 * Adding signal sends a registering message to all peers
188 * @param methodID API dependent id of the method
189 * @param process data processing callback
190 * @tparam ReceivedDataType data type to receive
192 template<typename ReceivedDataType>
193 void addSignalHandler(const MethodID methodID,
194 const typename SignalHandler<ReceivedDataType>::type& process);
197 * Removes the callback
199 * @param methodID API dependent id of the method
201 void removeMethod(const MethodID methodID);
204 * Synchronous method call.
206 * @param methodID API dependent id of the method
207 * @param peerFD id of the peer
208 * @param data data to sent
209 * @param timeoutMS how long to wait for the return value before throw
210 * @tparam SentDataType data type to send
211 * @tparam ReceivedDataType data type to receive
213 template<typename SentDataType, typename ReceivedDataType>
214 std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
215 const FileDescriptor peerFD,
216 const std::shared_ptr<SentDataType>& data,
217 unsigned int timeoutMS = 500);
220 * Asynchronous method call
222 * @param methodID API dependent id of the method
223 * @param peerFD id of the peer
224 * @param data data to sent
225 * @param process callback processing the return data
226 * @tparam SentDataType data type to send
227 * @tparam ReceivedDataType data type to receive
229 template<typename SentDataType, typename ReceivedDataType>
230 MessageID callAsync(const MethodID methodID,
231 const FileDescriptor peerFD,
232 const std::shared_ptr<SentDataType>& data,
233 const typename ResultHandler<ReceivedDataType>::type& process);
237 * Send a signal to the peer.
238 * There is no return value from the peer
239 * Sends any data only if a peer registered this a signal
241 * @param methodID API dependent id of the method
242 * @param data data to sent
243 * @tparam SentDataType data type to send
245 template<typename SentDataType>
246 void signal(const MethodID methodID,
247 const std::shared_ptr<SentDataType>& data);
251 * Handler used in external polling.
253 * @param peerFD file description identifying the peer
254 * @return should the polling structure be rebuild
256 bool handleLostConnection(const FileDescriptor peerFD);
259 * Handles input from one peer.
260 * Handler used in external polling.
262 * @param peerFD file description identifying the peer
263 * @return should the polling structure be rebuild
265 bool handleInput(const FileDescriptor peerFD);
268 * Handle one event from the internal event's queue
270 * @return should the polling structure be rebuild
275 * @return file descriptor for the internal event's queue
277 FileDescriptor getEventFD();
280 typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
281 typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
282 typedef std::unique_lock<std::mutex> Lock;
285 CONFIG_REGISTER_EMPTY
288 struct RegisterSignalsMessage {
289 RegisterSignalsMessage() = default;
290 RegisterSignalsMessage(const std::vector<MethodID> ids)
293 std::vector<MethodID> ids;
301 struct MethodHandlers {
302 MethodHandlers(const MethodHandlers& other) = delete;
303 MethodHandlers& operator=(const MethodHandlers&) = delete;
304 MethodHandlers() = default;
305 MethodHandlers(MethodHandlers&&) = default;
306 MethodHandlers& operator=(MethodHandlers &&) = default;
308 SerializeCallback serialize;
310 MethodHandler<void, void>::type method;
313 struct SignalHandlers {
314 SignalHandlers(const SignalHandlers& other) = delete;
315 SignalHandlers& operator=(const SignalHandlers&) = delete;
316 SignalHandlers() = default;
317 SignalHandlers(SignalHandlers&&) = default;
318 SignalHandlers& operator=(SignalHandlers &&) = default;
321 SignalHandler<void>::type signal;
324 struct ReturnCallbacks {
325 ReturnCallbacks(const ReturnCallbacks& other) = delete;
326 ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
327 ReturnCallbacks() = default;
328 ReturnCallbacks(ReturnCallbacks&&) = default;
329 ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
331 ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler<void>::type& process)
332 : peerFD(peerFD), parse(parse), process(process) {}
334 FileDescriptor peerFD;
336 ResultHandler<void>::type process;
340 SocketInfo(const SocketInfo& other) = delete;
341 SocketInfo& operator=(const SocketInfo&) = delete;
342 SocketInfo() = default;
343 SocketInfo(SocketInfo&&) = default;
344 SocketInfo& operator=(SocketInfo &&) = default;
346 SocketInfo(const FileDescriptor peerFD, const std::shared_ptr<Socket>& socketPtr)
347 : peerFD(peerFD), socketPtr(socketPtr) {}
349 FileDescriptor peerFD;
350 std::shared_ptr<Socket> socketPtr;
353 struct RemovePeerRequest {
354 RemovePeerRequest(const RemovePeerRequest& other) = delete;
355 RemovePeerRequest& operator=(const RemovePeerRequest&) = delete;
356 RemovePeerRequest() = default;
357 RemovePeerRequest(RemovePeerRequest&&) = default;
358 RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
360 RemovePeerRequest(const FileDescriptor peerFD,
361 const std::shared_ptr<std::condition_variable>& conditionPtr)
362 : peerFD(peerFD), conditionPtr(conditionPtr) {}
364 FileDescriptor peerFD;
365 std::shared_ptr<std::condition_variable> conditionPtr;
368 enum class Event : int {
369 FINISH, // Shutdown request
370 CALL, // New method call in the queue
371 ADD_PEER, // New peer in the queue
372 REMOVE_PEER // Remove peer
374 EventQueue<Event> mEventQueue;
379 // Mutex for the Calls queue and the map of methods.
380 std::mutex mCallsMutex;
382 std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
383 std::unordered_map<MethodID, std::shared_ptr<SignalHandlers>> mSignalsCallbacks;
384 std::unordered_map<MethodID, std::list<FileDescriptor>> mSignalsPeers;
386 // Mutex for changing mSockets map.
387 // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
388 std::mutex mSocketsMutex;
389 std::unordered_map<FileDescriptor, std::shared_ptr<Socket> > mSockets;
390 std::vector<struct pollfd> mFDs;
391 std::queue<SocketInfo> mNewSockets;
392 std::queue<RemovePeerRequest> mPeersToDelete;
394 // Mutex for modifying the map with return callbacks
395 std::mutex mReturnCallbacksMutex;
396 std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
398 // Mutex for setting callbacks
399 std::mutex mCallbacksMutex;
400 PeerCallback mNewPeerCallback;
401 PeerCallback mRemovedPeerCallback;
403 unsigned int mMaxNumberOfPeers;
407 template<typename SentDataType, typename ReceivedDataType>
408 void addMethodHandlerInternal(const MethodID methodID,
409 const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
411 template<typename SentDataType, typename ReceivedDataType>
412 MessageID callInternal(const MethodID methodID,
413 const FileDescriptor peerFD,
414 const std::shared_ptr<SentDataType>& data,
415 const typename ResultHandler<ReceivedDataType>::type& process);
417 template<typename ReceivedDataType>
418 static void discardResultHandler(Status, std::shared_ptr<ReceivedDataType>&) {}
422 bool onSignalCall(CallQueue::Call& call);
423 bool onMethodCall(CallQueue::Call& call);
426 bool handleLostConnections();
429 bool onReturnValue(const Socket& socket,
430 const MessageID messageID);
431 bool onRemoteCall(const Socket& socket,
432 const MethodID methodID,
433 const MessageID messageID,
434 std::shared_ptr<MethodHandlers> methodCallbacks);
435 bool onRemoteSignal(const Socket& socket,
436 const MethodID methodID,
437 const MessageID messageID,
438 std::shared_ptr<SignalHandlers> signalCallbacks);
440 FileDescriptor getNextFileDescriptor();
441 CallQueue::Call getCall();
442 void removePeerInternal(const FileDescriptor peerFD, Status status);
444 std::shared_ptr<EmptyData> onNewSignals(const FileDescriptor peerFD,
445 std::shared_ptr<RegisterSignalsMessage>& data);
448 void cleanCommunication();
451 template<typename SentDataType, typename ReceivedDataType>
452 void Processor::addMethodHandlerInternal(const MethodID methodID,
453 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
455 MethodHandlers methodCall;
457 methodCall.parse = [](const int fd)->std::shared_ptr<void> {
458 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
459 config::loadFromFD<ReceivedDataType>(fd, *data);
463 methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
464 config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
467 methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr<void>& data)->std::shared_ptr<void> {
468 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
469 return method(peerFD, tmpData);
473 Lock lock(mCallsMutex);
474 mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
478 template<typename SentDataType, typename ReceivedDataType>
479 void Processor::addMethodHandler(const MethodID methodID,
480 const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
482 if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
483 LOGE("Forbidden methodID: " << methodID);
484 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
488 Lock lock(mCallsMutex);
489 if (mSignalsCallbacks.count(methodID)) {
490 LOGE("MethodID used by a signal: " << methodID);
491 throw IPCException("MethodID used by a signal: " + std::to_string(methodID));
495 addMethodHandlerInternal<SentDataType, ReceivedDataType >(methodID, method);
498 template<typename ReceivedDataType>
499 void Processor::addSignalHandler(const MethodID methodID,
500 const typename SignalHandler<ReceivedDataType>::type& handler)
502 if (methodID == RETURN_METHOD_ID || methodID == REGISTER_SIGNAL_METHOD_ID) {
503 LOGE("Forbidden methodID: " << methodID);
504 throw IPCException("Forbidden methodID: " + std::to_string(methodID));
508 Lock lock(mCallsMutex);
509 if (mMethodsCallbacks.count(methodID)) {
510 LOGE("MethodID used by a method: " << methodID);
511 throw IPCException("MethodID used by a method: " + std::to_string(methodID));
515 SignalHandlers signalCall;
517 signalCall.parse = [](const int fd)->std::shared_ptr<void> {
518 std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
519 config::loadFromFD<ReceivedDataType>(fd, *data);
523 signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr<void>& data) {
524 std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
525 handler(peerFD, tmpData);
529 Lock lock(mCallsMutex);
530 mSignalsCallbacks[methodID] = std::make_shared<SignalHandlers>(std::move(signalCall));
533 std::vector<MethodID> ids {methodID};
534 auto data = std::make_shared<RegisterSignalsMessage>(ids);
536 std::list<FileDescriptor> peersFDs;
538 Lock lock(mSocketsMutex);
539 for (const auto kv : mSockets) {
540 peersFDs.push_back(kv.first);
544 for (const FileDescriptor peerFD : peersFDs) {
545 callSync<RegisterSignalsMessage, EmptyData>(REGISTER_SIGNAL_METHOD_ID,
548 DEFAULT_METHOD_TIMEOUT);
553 template<typename SentDataType, typename ReceivedDataType>
554 MessageID Processor::callInternal(const MethodID methodID,
555 const FileDescriptor peerFD,
556 const std::shared_ptr<SentDataType>& data,
557 const typename ResultHandler<ReceivedDataType>::type& process)
559 Lock lock(mCallsMutex);
560 MessageID messageID = mCalls.push<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
561 mEventQueue.send(Event::CALL);
566 template<typename SentDataType, typename ReceivedDataType>
567 MessageID Processor::callAsync(const MethodID methodID,
568 const FileDescriptor peerFD,
569 const std::shared_ptr<SentDataType>& data,
570 const typename ResultHandler<ReceivedDataType>::type& process)
572 return callInternal<SentDataType, ReceivedDataType>(methodID, peerFD, data, process);
576 template<typename SentDataType, typename ReceivedDataType>
577 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
578 const FileDescriptor peerFD,
579 const std::shared_ptr<SentDataType>& data,
580 unsigned int timeoutMS)
582 std::shared_ptr<ReceivedDataType> result;
585 std::condition_variable cv;
586 Status returnStatus = ipc::Status::UNDEFINED;
588 auto process = [&result, &mutex, &cv, &returnStatus](Status status, std::shared_ptr<ReceivedDataType> returnedData) {
589 std::unique_lock<std::mutex> lock(mutex);
590 returnStatus = status;
591 result = returnedData;
595 MessageID messageID = callAsync<SentDataType, ReceivedDataType>(methodID,
600 auto isResultInitialized = [&returnStatus]() {
601 return returnStatus != ipc::Status::UNDEFINED;
604 std::unique_lock<std::mutex> lock(mutex);
605 if (!cv.wait_for(lock, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
606 bool isTimeout = false;
608 Lock lock(mReturnCallbacksMutex);
609 if (1 == mReturnCallbacks.erase(messageID)) {
615 LOGE("Function call timeout; methodID: " << methodID);
616 throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
618 // Timeout started during the return value processing, so wait for it to finish
619 cv.wait(lock, isResultInitialized);
623 throwOnError(returnStatus);
628 template<typename SentDataType>
629 void Processor::signal(const MethodID methodID,
630 const std::shared_ptr<SentDataType>& data)
632 std::list<FileDescriptor> peersFDs;
634 Lock lock(mSocketsMutex);
635 peersFDs = mSignalsPeers[methodID];
638 for (const FileDescriptor peerFD : peersFDs) {
639 Lock lock(mCallsMutex);
640 mCalls.push<SentDataType>(methodID, peerFD, data);
641 mEventQueue.send(Event::CALL);
649 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP