IPC via UX sockets
[platform/core/security/vasum.git] / common / ipc / internals / processor.hpp
1 /*
2 *  Copyright (c) 2014 Samsung Electronics Co., Ltd All Rights Reserved
3 *
4 *  Contact: Jan Olszak <j.olszak@samsung.com>
5 *
6 *  Licensed under the Apache License, Version 2.0 (the "License");
7 *  you may not use this file except in compliance with the License.
8 *  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *  Unless required by applicable law or agreed to in writing, software
13 *  distributed under the License is distributed on an "AS IS" BASIS,
14 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License
17 */
18
19 /**
20  * @file
21  * @author  Jan Olszak (j.olszak@samsung.com)
22  * @brief   Data and event processing thread
23  */
24
25 #ifndef COMMON_IPC_INTERNALS_PROCESSOR_HPP
26 #define COMMON_IPC_INTERNALS_PROCESSOR_HPP
27
28 #include "ipc/internals/socket.hpp"
29 #include "ipc/internals/event-queue.hpp"
30 #include "ipc/exception.hpp"
31 #include "ipc/types.hpp"
32 #include "config/manager.hpp"
33 #include "config/is-visitable.hpp"
34 #include "logger/logger.hpp"
35
36 #include <poll.h>
37
38 #include <atomic>
39 #include <condition_variable>
40 #include <queue>
41 #include <mutex>
42 #include <chrono>
43 #include <vector>
44 #include <thread>
45 #include <string>
46 #include <functional>
47 #include <unordered_map>
48
49 namespace security_containers {
50 namespace ipc {
51 namespace {
52 const unsigned int DEFAULT_MAX_NUMBER_OF_PEERS = 500;
53 }
54 /**
55 * This class wraps communication via UX sockets
56 *
57 * It's intended to be used both in Client and Service classes.
58 * It uses a serialization mechanism from libConfig.
59 * Library user will only have to pass the types that each call will send and receive
60 *
61 * Message format:
62 * - MethodID  - probably casted enum.
63 *               MethodID == std::numeric_limits<MethodID>::max() is reserved for return messages
64 * - MessageID - unique id of a message exchange sent by this object instance. Used to identify reply messages.
65 * - Rest: The data written in a callback. One type per method.ReturnCallbacks
66 *
67 * TODO:
68 *  - error codes passed to async callbacks
69 *  - remove ReturnCallbacks on peer disconnect
70 *  - on sync timeout erase the return callback
71 *  - don't throw timeout if the message is already processed
72 *  - naming convention or methods that just commissions the PROCESS thread to do something
73 *  - removePeer API function
74 */
75 class Processor {
76 public:
77     typedef std::function<void(int)> PeerCallback;
78     typedef unsigned int PeerID;
79     typedef unsigned int MethodID;
80
81     /**
82      * Method ID. Used to indicate a message with the return value.
83      */
84     static const MethodID RETURN_METHOD_ID;
85     /**
86      * Constructs the Processor, but doesn't start it.
87      * The object is ready to add methods.
88      *
89      * @param newPeerCallback called when a new peer arrives
90      * @param removedPeerCallback called when the Processor stops listening for this peer
91      */
92     Processor(const PeerCallback& newPeerCallback = nullptr,
93               const PeerCallback& removedPeerCallback = nullptr,
94               const unsigned int maxNumberOfPeers = DEFAULT_MAX_NUMBER_OF_PEERS);
95     ~Processor();
96
97     Processor(const Processor&) = delete;
98     Processor(Processor&&) = delete;
99     Processor& operator=(const Processor&) = delete;
100
101     /**
102      * Start the processing thread.
103      * Quits immediately after starting the thread.
104      */
105     void start();
106
107     /**
108      * Stops the processing thread.
109      * No incoming data will be handled after.
110      */
111     void stop();
112
113     /**
114      * From now on socket is owned by the Processor object.
115      * Calls the newPeerCallback.
116      *
117      * @param socketPtr pointer to the new socket
118      * @return peerID of the new socket
119      */
120     PeerID addPeer(const std::shared_ptr<Socket>& socketPtr);
121
122     /**
123      * Saves the callbacks connected to the method id.
124      * When a message with the given method id is received,
125      * the data will be passed to the serialization callback through file descriptor.
126      *
127      * Then the process callback will be called with the parsed data.
128      *
129      * @param methodID API dependent id of the method
130      * @param process data processing callback
131      * @tparam SentDataType data type to send
132      * @tparam ReceivedDataType data type to receive
133      */
134     template<typename SentDataType, typename ReceivedDataType>
135     void addMethodHandler(const MethodID methodID,
136                           const typename MethodHandler<SentDataType, ReceivedDataType>::type& process);
137
138     /**
139      * Removes the callback
140      *
141      * @param methodID API dependent id of the method
142      */
143     void removeMethod(const MethodID methodID);
144
145     /**
146      * Synchronous method call.
147      *
148      * @param methodID API dependent id of the method
149      * @param peerID id of the peer
150      * @param data data to sent
151      * @param timeoutMS how long to wait for the return value before throw
152      * @tparam SentDataType data type to send
153      * @tparam ReceivedDataType data type to receive
154      */
155     template<typename SentDataType, typename ReceivedDataType>
156     std::shared_ptr<ReceivedDataType> callSync(const MethodID methodID,
157                                                const PeerID peerID,
158                                                const std::shared_ptr<SentDataType>& data,
159                                                unsigned int timeoutMS = 500);
160
161     /**
162      * Asynchronous method call
163      *
164      * @param methodID API dependent id of the method
165      * @param peerID id of the peer
166      * @param data data to sent
167      * @param process callback processing the return data
168      * @tparam SentDataType data type to send
169      * @tparam ReceivedDataType data type to receive
170      */
171     template<typename SentDataType, typename ReceivedDataType>
172     void callAsync(const MethodID methodID,
173                    const PeerID peerID,
174                    const std::shared_ptr<SentDataType>& data,
175                    const typename ResultHandler<ReceivedDataType>::type& process);
176
177
178 private:
179     typedef std::function<void(int fd, std::shared_ptr<void>& data)> SerializeCallback;
180     typedef std::function<std::shared_ptr<void>(int fd)> ParseCallback;
181     typedef std::lock_guard<std::mutex> Lock;
182     typedef unsigned int MessageID;
183
184     struct Call {
185         Call(const Call& other) = delete;
186         Call& operator=(const Call&) = delete;
187         Call() = default;
188         Call(Call&&) = default;
189
190         PeerID peerID;
191         MethodID methodID;
192         std::shared_ptr<void> data;
193         SerializeCallback serialize;
194         ParseCallback parse;
195         ResultHandler<void>::type process;
196     };
197
198     struct MethodHandlers {
199         MethodHandlers(const MethodHandlers& other) = delete;
200         MethodHandlers& operator=(const MethodHandlers&) = delete;
201         MethodHandlers() = default;
202         MethodHandlers(MethodHandlers&&) = default;
203         MethodHandlers& operator=(MethodHandlers &&) = default;
204
205         SerializeCallback serialize;
206         ParseCallback parse;
207         MethodHandler<void, void>::type method;
208     };
209
210     struct ReturnCallbacks {
211         ReturnCallbacks(const ReturnCallbacks& other) = delete;
212         ReturnCallbacks& operator=(const ReturnCallbacks&) = delete;
213         ReturnCallbacks() = default;
214         ReturnCallbacks(ReturnCallbacks&&) = default;
215         ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
216
217         ParseCallback parse;
218         ResultHandler<void>::type process;
219     };
220
221     struct SocketInfo {
222         SocketInfo(const SocketInfo& other) = delete;
223         SocketInfo& operator=(const SocketInfo&) = delete;
224         SocketInfo() = default;
225         SocketInfo(SocketInfo&&) = default;
226         SocketInfo& operator=(SocketInfo &&) = default;
227
228         std::shared_ptr<Socket> socketPtr;
229         PeerID peerID;
230     };
231
232     enum class Event : int {
233         FINISH,     // Shutdown request
234         CALL,       // New method call in the queue
235         NEW_PEER    // New peer in the queue
236     };
237     EventQueue<Event> mEventQueue;
238
239
240     bool mIsRunning;
241
242     // Mutex for the Calls queue and the map of methods.
243     std::mutex mCallsMutex;
244     std::queue<Call> mCalls;
245     std::unordered_map<MethodID, std::shared_ptr<MethodHandlers>> mMethodsCallbacks;
246
247     // Mutex for changing mSockets map.
248     // Shouldn't be locked on any read/write, that could block. Just copy the ptr.
249     std::mutex mSocketsMutex;
250     std::unordered_map<PeerID, std::shared_ptr<Socket> > mSockets;
251     std::queue<SocketInfo> mNewSockets;
252
253     // Mutex for modifying the map with return callbacks
254     std::mutex mReturnCallbacksMutex;
255     std::unordered_map<MessageID, ReturnCallbacks> mReturnCallbacks;
256
257
258     PeerCallback mNewPeerCallback;
259     PeerCallback mRemovedPeerCallback;
260
261     unsigned int mMaxNumberOfPeers;
262
263     std::thread mThread;
264     std::vector<struct pollfd> mFDs;
265
266     std::atomic<MessageID> mMessageIDCounter;
267     std::atomic<PeerID> mPeerIDCounter;
268
269     void run();
270     bool handleEvent();
271     void handleCall();
272     bool handleLostConnections();
273     bool handleInputs();
274     bool handleInput(const PeerID peerID, const Socket& socket);
275     void resetPolling();
276     MessageID getNextMessageID();
277     PeerID getNextPeerID();
278     Call getCall();
279     void removePeer(PeerID peerID);
280
281 };
282
283 template<typename SentDataType, typename ReceivedDataType>
284 void Processor::addMethodHandler(const MethodID methodID,
285                                  const typename MethodHandler<SentDataType, ReceivedDataType>::type& method)
286 {
287     static_assert(config::isVisitable<SentDataType>::value,
288                   "Use the libConfig library");
289     static_assert(config::isVisitable<ReceivedDataType>::value,
290                   "Use the libConfig library");
291
292     if (methodID == RETURN_METHOD_ID) {
293         LOGE("Forbidden methodID: " << methodID);
294         throw IPCException("Forbidden methodID: " + std::to_string(methodID));
295     }
296
297     using namespace std::placeholders;
298
299     MethodHandlers methodCall;
300
301     methodCall.parse = [](const int fd)->std::shared_ptr<void> {
302         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
303         config::loadFromFD<ReceivedDataType>(fd, *data);
304         return data;
305     };
306
307     methodCall.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
308         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
309     };
310
311     methodCall.method = [method](std::shared_ptr<void>& data)->std::shared_ptr<void> {
312         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
313         return method(tmpData);
314     };
315
316     {
317         Lock lock(mCallsMutex);
318         mMethodsCallbacks[methodID] = std::make_shared<MethodHandlers>(std::move(methodCall));
319     }
320 }
321
322 template<typename SentDataType, typename ReceivedDataType>
323 void Processor::callAsync(const MethodID methodID,
324                           const PeerID peerID,
325                           const std::shared_ptr<SentDataType>& data,
326                           const typename  ResultHandler<ReceivedDataType>::type& process)
327 {
328     static_assert(config::isVisitable<SentDataType>::value,
329                   "Use the libConfig library");
330     static_assert(config::isVisitable<ReceivedDataType>::value,
331                   "Use the libConfig library");
332
333     if (!mThread.joinable()) {
334         LOGE("The Processor thread is not started. Can't send any data.");
335         throw IPCException("The Processor thread is not started. Can't send any data.");
336     }
337
338     using namespace std::placeholders;
339
340     Call call;
341     call.peerID = peerID;
342     call.methodID = methodID;
343     call.data = data;
344
345     call.parse = [](const int fd)->std::shared_ptr<void> {
346         std::shared_ptr<ReceivedDataType> data(new ReceivedDataType());
347         config::loadFromFD<ReceivedDataType>(fd, *data);
348         return data;
349     };
350
351     call.serialize = [](const int fd, std::shared_ptr<void>& data)->void {
352         config::saveToFD<SentDataType>(fd, *std::static_pointer_cast<SentDataType>(data));
353     };
354
355     call.process = [process](std::shared_ptr<void>& data)->void {
356         std::shared_ptr<ReceivedDataType> tmpData = std::static_pointer_cast<ReceivedDataType>(data);
357         return process(tmpData);
358     };
359
360     {
361         Lock lock(mCallsMutex);
362         mCalls.push(std::move(call));
363     }
364
365     mEventQueue.send(Event::CALL);
366 }
367
368
369 template<typename SentDataType, typename ReceivedDataType>
370 std::shared_ptr<ReceivedDataType> Processor::callSync(const MethodID methodID,
371                                                       const PeerID peerID,
372                                                       const std::shared_ptr<SentDataType>& data,
373                                                       unsigned int timeoutMS)
374 {
375     static_assert(config::isVisitable<SentDataType>::value,
376                   "Use the libConfig library");
377     static_assert(config::isVisitable<ReceivedDataType>::value,
378                   "Use the libConfig library");
379
380     if (!mThread.joinable()) {
381         LOGE("The Processor thread is not started. Can't send any data.");
382         throw IPCException("The Processor thread is not started. Can't send any data.");
383     }
384
385     std::shared_ptr<ReceivedDataType> result;
386
387     std::mutex mtx;
388     std::unique_lock<std::mutex> lck(mtx);
389     std::condition_variable cv;
390
391     auto process = [&result, &cv](std::shared_ptr<ReceivedDataType> returnedData) {
392         result = returnedData;
393         cv.notify_one();
394     };
395
396     callAsync<SentDataType,
397               ReceivedDataType>(methodID,
398                                 peerID,
399                                 data,
400                                 process);
401
402     auto isResultInitialized = [&result]() {
403         return static_cast<bool>(result);
404     };
405
406     if (!cv.wait_for(lck, std::chrono::milliseconds(timeoutMS), isResultInitialized)) {
407         LOGE("Function call timeout; methodID: " << methodID);
408         throw IPCException("Function call timeout; methodID: " + std::to_string(methodID));
409     }
410
411     return result;
412 }
413
414
415 } // namespace ipc
416 } // namespace security_containers
417
418 #endif // COMMON_IPC_INTERNALS_PROCESSOR_HPP