removed poll emulation for windows
[profile/ivi/common-api-dbus-runtime.git] / src / test / DemoMainLoop.h
1 /* Copyright (C) 2013 BMW Group
2  * Author: Manfred Bathelt (manfred.bathelt@bmw.de)
3  * Author: Juergen Gehring (juergen.gehring@bmw.de)
4  * This Source Code Form is subject to the terms of the Mozilla Public
5  * License, v. 2.0. If a copy of the MPL was not distributed with this
6  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7
8 #ifndef DEMO_MAIN_LOOP_H_
9 #define DEMO_MAIN_LOOP_H_
10
11
12 #if !defined (COMMONAPI_INTERNAL_COMPILATION)
13 #define COMMONAPI_INTERNAL_COMPILATION
14 #endif
15 #include <CommonAPI/MainLoopContext.h>
16 #undef COMMONAPI_INTERNAL_COMPILATION
17
#include <cassert>
#include <climits>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <vector>

#ifdef WIN32
#include <WinSock2.h>
#include "DemoPoll.h"
#else
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#endif
32
33
34 namespace CommonAPI {
35
// Descriptor record handed to ::poll() by the demo main loop. Windows builds
// use the DemoPollFd type from DemoPoll.h; every other platform uses the
// system pollfd.
#ifdef WIN32
using DemoMainLoopPollFd = ::DemoPollFd;
#else
using DemoMainLoopPollFd = pollfd;
#endif
41
42 class MainLoop {
43  public:
44     MainLoop() = delete;
45     MainLoop(const MainLoop&) = delete;
46     MainLoop& operator=(const MainLoop&) = delete;
47     MainLoop(MainLoop&&) = delete;
48     MainLoop& operator=(MainLoop&&) = delete;
49
50     explicit MainLoop(std::shared_ptr<MainLoopContext> context) :
51             context_(context), currentMinimalTimeoutInterval_(TIMEOUT_INFINITE), running_(false), breakLoop_(false) {
52         
53 #ifdef WIN32
54                 WSAEVENT wsaEvent = WSACreateEvent();
55
56                 if (wsaEvent != WSA_INVALID_EVENT) {
57                         wakeFd_.fd = PtrToInt(wsaEvent);
58                 }
59
60                 wakeFd_.isWsaEvent = true;
61 #else
62                 wakeFd_.fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
63 #endif
64         wakeFd_.events = POLLIN;
65
66         assert(wakeFd_.fd != -1);
67         registerFileDescriptor(wakeFd_);
68
69         dispatchSourceListenerSubscription_ = context_->subscribeForDispatchSources(
70                 std::bind(&CommonAPI::MainLoop::registerDispatchSource, this, std::placeholders::_1, std::placeholders::_2),
71                 std::bind(&CommonAPI::MainLoop::deregisterDispatchSource, this, std::placeholders::_1));
72         watchListenerSubscription_ = context_->subscribeForWatches(
73                 std::bind(&CommonAPI::MainLoop::registerWatch, this, std::placeholders::_1, std::placeholders::_2),
74                 std::bind(&CommonAPI::MainLoop::deregisterWatch, this, std::placeholders::_1));
75         timeoutSourceListenerSubscription_ = context_->subscribeForTimeouts(
76                 std::bind(&CommonAPI::MainLoop::registerTimeout, this, std::placeholders::_1, std::placeholders::_2),
77                 std::bind(&CommonAPI::MainLoop::deregisterTimeout, this, std::placeholders::_1));
78         wakeupListenerSubscription_ = context_->subscribeForWakeupEvents(
79                 std::bind(&CommonAPI::MainLoop::wakeup, this));
80     }
81
82     ~MainLoop() {
83         deregisterFileDescriptor(wakeFd_);
84
85         context_->unsubscribeForDispatchSources(dispatchSourceListenerSubscription_);
86         context_->unsubscribeForWatches(watchListenerSubscription_);
87         context_->unsubscribeForTimeouts(timeoutSourceListenerSubscription_);
88         context_->unsubscribeForWakeupEvents(wakeupListenerSubscription_);
89
90 #ifdef WIN32
91                 WSACloseEvent(IntToPtr(wakeFd_.fd));
92 #else
93                 close(wakeFd_.fd);
94 #endif
95     }
96
97     /**
98      * \brief Runs the mainloop indefinitely until stop() is called.
99      *
100      * Runs the mainloop indefinitely until stop() is called. The given timeout (milliseconds)
101      * will be overridden if a timeout-event is present that defines an earlier ready time.
102      */
103     void run(const int64_t& timeoutInterval = TIMEOUT_INFINITE) {
104         running_ = true;
105         while(running_) {
106             doSingleIteration(timeoutInterval);
107         }
108     }
109
110     void stop() {
111         running_ = false;
112         wakeup();
113     }
114
115     /**
116      * \brief Executes a single cycle of the mainloop.
117      *
118      * Subsequently calls prepare(), poll(), check() and, if necessary, dispatch().
119      * The given timeout (milliseconds) represents the maximum time
120      * this iteration will remain in the poll state. All other steps
121      * are handled in a non-blocking way. Note however that a source
122      * might claim to have infinite amounts of data to dispatch.
123      * This demo-implementation of a Mainloop will dispatch a source
124      * until it no longer claims to have data to dispatch.
125      * Dispatch will not be called if no sources, watches and timeouts
126      * claim to be ready during the check()-phase.
127      *
128      * @param timeout The maximum poll-timeout for this iteration.
129      */
130     void doSingleIteration(const int64_t& timeout = TIMEOUT_INFINITE) {
131         prepare(timeout);
132         poll();
133         if(check()) {
134             dispatch();
135         }
136     }
137
138     /*
139      * The given timeout is a maximum timeout in ms, measured from the current time in the future
140      * (a value of 0 means "no timeout"). It will be overridden if a timeout-event is present
141      * that defines an earlier ready time.
142      */
143     void prepare(const int64_t& timeout = TIMEOUT_INFINITE) {
144         currentMinimalTimeoutInterval_ = timeout;
145
146         for (auto dispatchSourceIterator = registeredDispatchSources_.begin();
147                         dispatchSourceIterator != registeredDispatchSources_.end();
148                         dispatchSourceIterator++) {
149
150             int64_t dispatchTimeout = TIMEOUT_INFINITE;
151             if(dispatchSourceIterator->second->prepare(dispatchTimeout)) {
152                 sourcesToDispatch_.insert(*dispatchSourceIterator);
153             } else if (dispatchTimeout < currentMinimalTimeoutInterval_) {
154                 currentMinimalTimeoutInterval_ = dispatchTimeout;
155             }
156         }
157
158         int64_t currentContextTime = getCurrentTimeInMs();
159
160         for (auto timeoutPriorityRange = registeredTimeouts_.begin();
161                         timeoutPriorityRange != registeredTimeouts_.end();
162                         timeoutPriorityRange++) {
163
164             int64_t intervalToReady = timeoutPriorityRange->second->getReadyTime() - currentContextTime;
165
166             if (intervalToReady <= 0) {
167                 timeoutsToDispatch_.insert(*timeoutPriorityRange);
168                 currentMinimalTimeoutInterval_ = TIMEOUT_NONE;
169             } else if (intervalToReady < currentMinimalTimeoutInterval_) {
170                 currentMinimalTimeoutInterval_ = intervalToReady;
171             }
172         }
173     }
174
175     void poll() {
176         for (auto fileDescriptor = managedFileDescriptors_.begin() + 1; fileDescriptor != managedFileDescriptors_.end(); ++fileDescriptor) {
177             (*fileDescriptor).revents = 0;
178         }
179
180         size_t numReadyFileDescriptors = ::poll(&(managedFileDescriptors_[0]), managedFileDescriptors_.size(), currentMinimalTimeoutInterval_);
181
182         // If no FileDescriptors are ready, poll returned because of a timeout that has expired.
183         // The only case in which this is not the reason is when the timeout handed in "prepare"
184         // expired before any other timeouts.
185         if(!numReadyFileDescriptors) {
186             int64_t currentContextTime = getCurrentTimeInMs();
187
188             for (auto timeoutPriorityRange = registeredTimeouts_.begin();
189                             timeoutPriorityRange != registeredTimeouts_.end();
190                             timeoutPriorityRange++) {
191
192                 int64_t intervalToReady = timeoutPriorityRange->second->getReadyTime() - currentContextTime;
193
194                 if (intervalToReady <= 0) {
195                     timeoutsToDispatch_.insert(*timeoutPriorityRange);
196                 }
197             }
198         }
199
200         if (wakeFd_.revents) {
201             acknowledgeWakeup();
202         }
203     }
204
205     bool check() {
206         //The first file descriptor always is the loop's wakeup-descriptor. All others need to be linked to a watch.
207         for (auto fileDescriptor = managedFileDescriptors_.begin() + 1; fileDescriptor != managedFileDescriptors_.end(); ++fileDescriptor) {
208             for (auto registeredWatchIterator = registeredWatches_.begin();
209                         registeredWatchIterator != registeredWatches_.end();
210                         registeredWatchIterator++) {
211                 const auto& correspondingWatchPriority = registeredWatchIterator->first;
212                 const auto& correspondingWatchPair = registeredWatchIterator->second;
213
214                 if (std::get<0>(correspondingWatchPair) == fileDescriptor->fd && fileDescriptor->revents) {
215                     watchesToDispatch_.insert( { correspondingWatchPriority, {std::get<1>(correspondingWatchPair)} } );
216                 }
217             }
218         }
219
220         for(auto dispatchSourceIterator = registeredDispatchSources_.begin(); dispatchSourceIterator != registeredDispatchSources_.end(); ++dispatchSourceIterator) {
221             if((std::get<1>(*dispatchSourceIterator))->check()) {
222                 sourcesToDispatch_.insert( {std::get<0>(*dispatchSourceIterator), std::get<1>(*dispatchSourceIterator)});
223             }
224         }
225
226         return !timeoutsToDispatch_.empty() || !watchesToDispatch_.empty() || !sourcesToDispatch_.empty();
227     }
228
229     void dispatch() {
230         for (auto timeoutIterator = timeoutsToDispatch_.begin();
231                 timeoutIterator != timeoutsToDispatch_.end();
232                 timeoutIterator++) {
233             std::get<1>(*timeoutIterator)->dispatch();
234         }
235
236         for (auto watchIterator = watchesToDispatch_.begin();
237                 watchIterator != watchesToDispatch_.end();
238                 watchIterator++) {
239             Watch* watch = watchIterator->second;
240             const unsigned int flags = 7;
241             watch->dispatch(flags);
242         }
243
244         breakLoop_ = false;
245         for (auto dispatchSourceIterator = sourcesToDispatch_.begin();
246                         dispatchSourceIterator != sourcesToDispatch_.end() && !breakLoop_;
247                         dispatchSourceIterator++) {
248
249             while(std::get<1>(*dispatchSourceIterator)->dispatch());
250         }
251
252         timeoutsToDispatch_.clear();
253         sourcesToDispatch_.clear();
254         watchesToDispatch_.clear();
255     }
256
257     void wakeup() {
258 #ifdef WIN32
259                 HANDLE h = IntToPtr(wakeFd_.fd);
260                 SetEvent(h);
261 #else
262                 int64_t wake = 1;
263                 ::write(wakeFd_.fd, &wake, sizeof(int64_t));
264 #endif
265     }
266
267  private:
268         void registerFileDescriptor(const DemoMainLoopPollFd& fileDescriptor) {
269         managedFileDescriptors_.push_back(fileDescriptor);
270     }
271
272         void deregisterFileDescriptor(const DemoMainLoopPollFd& fileDescriptor) {
273         for (auto it = managedFileDescriptors_.begin(); it != managedFileDescriptors_.end(); it++) {
274             if ((*it).fd == fileDescriptor.fd) {
275                 managedFileDescriptors_.erase(it);
276                 break;
277             }
278         }
279     }
280
281     void registerDispatchSource(DispatchSource* dispatchSource, const DispatchPriority dispatchPriority) {
282         registeredDispatchSources_.insert( {dispatchPriority, dispatchSource} );
283     }
284
285     void deregisterDispatchSource(DispatchSource* dispatchSource) {
286         for(auto dispatchSourceIterator = registeredDispatchSources_.begin();
287                 dispatchSourceIterator != registeredDispatchSources_.end();
288                 dispatchSourceIterator++) {
289
290             if(dispatchSourceIterator->second == dispatchSource) {
291                 registeredDispatchSources_.erase(dispatchSourceIterator);
292                 break;
293             }
294         }
295         breakLoop_ = true;
296     }
297
298     void registerWatch(Watch* watch, const DispatchPriority dispatchPriority) {
299 #ifdef WIN32
300                 DemoMainLoopPollFd fdToRegister = watch->getAssociatedFileDescriptor();
301                 fdToRegister.isWsaEvent = false;
302 #else
303                 DemoMainLoopPollFd fdToRegister = watch->getAssociatedFileDescriptor();
304 #endif
305
306                 registerFileDescriptor(fdToRegister);
307         registeredWatches_.insert( { dispatchPriority, {watch->getAssociatedFileDescriptor().fd, watch} } );
308     }
309
310     void deregisterWatch(Watch* watch) {
311         deregisterFileDescriptor(watch->getAssociatedFileDescriptor());
312
313         for(auto watchIterator = registeredWatches_.begin();
314                 watchIterator != registeredWatches_.end();
315                 watchIterator++) {
316
317             if(watchIterator->second.second == watch) {
318                 registeredWatches_.erase(watchIterator);
319                 break;
320             }
321         }
322     }
323
324     void registerTimeout(Timeout* timeout, const DispatchPriority dispatchPriority) {
325         registeredTimeouts_.insert( {dispatchPriority, timeout} );
326     }
327
328     void deregisterTimeout(Timeout* timeout) {
329         for(auto timeoutIterator = registeredTimeouts_.begin();
330                 timeoutIterator != registeredTimeouts_.end();
331                 timeoutIterator++) {
332
333             if(timeoutIterator->second == timeout) {
334                 registeredTimeouts_.erase(timeoutIterator);
335                 break;
336             }
337         }
338     }
339
340     void acknowledgeWakeup() {
341 #ifdef WIN32
342                 HANDLE h = IntToPtr(wakeFd_.fd);
343                 ResetEvent(h);
344 #else
345                 int64_t buffer;
346                 while (::read(wakeFd_.fd, &buffer, sizeof(int64_t)) == sizeof(buffer));
347 #endif
348     }
349
350     std::shared_ptr<MainLoopContext> context_;
351
352         std::vector<DemoMainLoopPollFd> managedFileDescriptors_;
353
354     std::multimap<DispatchPriority, DispatchSource*> registeredDispatchSources_;
355     std::multimap<DispatchPriority, std::pair<int, Watch*>> registeredWatches_;
356     std::multimap<DispatchPriority, Timeout*> registeredTimeouts_;
357
358     std::set<std::pair<DispatchPriority, DispatchSource*>> sourcesToDispatch_;
359     std::set<std::pair<DispatchPriority, Watch*>> watchesToDispatch_;
360     std::set<std::pair<DispatchPriority, Timeout*>> timeoutsToDispatch_;
361
362     DispatchSourceListenerSubscription dispatchSourceListenerSubscription_;
363     WatchListenerSubscription watchListenerSubscription_;
364     TimeoutSourceListenerSubscription timeoutSourceListenerSubscription_;
365     WakeupListenerSubscription wakeupListenerSubscription_;
366
367     int64_t currentMinimalTimeoutInterval_;
368     bool breakLoop_;
369     bool running_;
370
371         DemoMainLoopPollFd wakeFd_;
372 };
373
374
375 } // namespace CommonAPI
376
377 #endif /* DEMO_MAIN_LOOP_H_ */