1 /* Copyright (C) 2013 BMW Group
2 * Author: Manfred Bathelt (manfred.bathelt@bmw.de)
3 * Author: Juergen Gehring (juergen.gehring@bmw.de)
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #ifndef DEMO_MAIN_LOOP_H_
9 #define DEMO_MAIN_LOOP_H_
12 #if !defined (COMMONAPI_INTERNAL_COMPILATION)
13 #define COMMONAPI_INTERNAL_COMPILATION
15 #include <CommonAPI/MainLoopContext.h>
16 #undef COMMONAPI_INTERNAL_COMPILATION
26 #include <sys/eventfd.h>
// Poll-descriptor type used throughout the main loop. Two alternative
// definitions exist: the project's ::DemoPollFd and the system pollfd.
// NOTE(review): presumably selected per platform by preprocessor conditionals - confirm.
37 typedef ::DemoPollFd DemoMainLoopPollFd;
39 typedef pollfd DemoMainLoopPollFd;
// A MainLoop can be neither copied nor moved: all four copy/move special
// members are deleted.
45 MainLoop(const MainLoop&) = delete;
46 MainLoop& operator=(const MainLoop&) = delete;
47 MainLoop(MainLoop&&) = delete;
48 MainLoop& operator=(MainLoop&&) = delete;
// Builds the loop around the given context: creates the wakeup descriptor,
// registers it as the first managed file descriptor, and subscribes to the
// context's dispatch-source, watch, timeout and wakeup notifications.
// All callbacks are bound to the raw this pointer.
50 explicit MainLoop(std::shared_ptr<MainLoopContext> context) :
51 context_(context), currentMinimalTimeoutInterval_(TIMEOUT_INFINITE), running_(false), breakLoop_(false) {
// WSA-event flavour of the wakeup descriptor.
// NOTE(review): presumably the Windows build - confirm.
54 WSAEVENT wsaEvent = WSACreateEvent();
56 if (wsaEvent != WSA_INVALID_EVENT) {
57 wakeFd_.fd = PtrToInt(wsaEvent);
60 wakeFd_.isWsaEvent = true;
// eventfd flavour: counter starts at 0, semaphore mode, non-blocking reads.
62 wakeFd_.fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
64 wakeFd_.events = POLLIN;
// A failed descriptor creation is only caught by this assert, which is
// compiled out when NDEBUG is defined.
66 assert(wakeFd_.fd != -1);
// registerFileDescriptor() stores a copy of wakeFd_ in the managed set.
67 registerFileDescriptor(wakeFd_);
// Register/deregister callbacks for dispatch sources.
69 dispatchSourceListenerSubscription_ = context_->subscribeForDispatchSources(
70 std::bind(&CommonAPI::MainLoop::registerDispatchSource, this, std::placeholders::_1, std::placeholders::_2),
71 std::bind(&CommonAPI::MainLoop::deregisterDispatchSource, this, std::placeholders::_1));
// Register/deregister callbacks for watches.
72 watchListenerSubscription_ = context_->subscribeForWatches(
73 std::bind(&CommonAPI::MainLoop::registerWatch, this, std::placeholders::_1, std::placeholders::_2),
74 std::bind(&CommonAPI::MainLoop::deregisterWatch, this, std::placeholders::_1));
// Register/deregister callbacks for timeouts.
75 timeoutSourceListenerSubscription_ = context_->subscribeForTimeouts(
76 std::bind(&CommonAPI::MainLoop::registerTimeout, this, std::placeholders::_1, std::placeholders::_2),
77 std::bind(&CommonAPI::MainLoop::deregisterTimeout, this, std::placeholders::_1));
// Wakeup requests from the context are routed to wakeup().
78 wakeupListenerSubscription_ = context_->subscribeForWakeupEvents(
79 std::bind(&CommonAPI::MainLoop::wakeup, this));
// Teardown: removes the wakeup descriptor from the managed set and cancels
// the four subscriptions taken on the context.
83 deregisterFileDescriptor(wakeFd_);
85 context_->unsubscribeForDispatchSources(dispatchSourceListenerSubscription_);
86 context_->unsubscribeForWatches(watchListenerSubscription_);
87 context_->unsubscribeForTimeouts(timeoutSourceListenerSubscription_);
88 context_->unsubscribeForWakeupEvents(wakeupListenerSubscription_);
// WSA-event flavour: closes the event handle stored in wakeFd_.fd.
91 WSACloseEvent(IntToPtr(wakeFd_.fd));
98 * \brief Runs the mainloop indefinitely until stop() is called.
100 * Runs the mainloop indefinitely until stop() is called. The given timeout (milliseconds)
101 * will be overridden if a timeout-event is present that defines an earlier ready time.
// timeoutInterval defaults to TIMEOUT_INFINITE and is forwarded unchanged
// to doSingleIteration().
103 void run(const int64_t& timeoutInterval = TIMEOUT_INFINITE) {
106 doSingleIteration(timeoutInterval);
116 * \brief Executes a single cycle of the mainloop.
118 * Subsequently calls prepare(), poll(), check() and, if necessary, dispatch().
119 * The given timeout (milliseconds) represents the maximum time
120 * this iteration will remain in the poll state. All other steps
121 * are handled in a non-blocking way. Note however that a source
122 * might claim to have infinite amounts of data to dispatch.
123 * This demo-implementation of a Mainloop will dispatch a source
124 * until it no longer claims to have data to dispatch.
125 * Dispatch will not be called if no sources, watches and timeouts
126 * claim to be ready during the check()-phase.
128 * @param timeout The maximum poll-timeout for this iteration.
// timeout is taken by const reference and defaults to TIMEOUT_INFINITE.
130 void doSingleIteration(const int64_t& timeout = TIMEOUT_INFINITE) {
139 * The given timeout is the maximum timeout in ms, measured from the current time
140 * (a value of 0 means "no timeout"). It will be overridden if a dispatch source or a
141 * timeout-event is present that defines an earlier ready time.
// Prepare phase: computes the poll timeout for this iteration and queues
// everything that is already known to be ready.
143 void prepare(const int64_t& timeout = TIMEOUT_INFINITE) {
// Start from the caller's upper bound; it can only be lowered below.
144 currentMinimalTimeoutInterval_ = timeout;
// Ask every registered dispatch source whether it is ready right now.
146 for (auto dispatchSourceIterator = registeredDispatchSources_.begin();
147 dispatchSourceIterator != registeredDispatchSources_.end();
148 dispatchSourceIterator++) {
// NOTE(review): presumably prepare() may lower dispatchTimeout through an
// out-parameter - confirm against the DispatchSource interface.
150 int64_t dispatchTimeout = TIMEOUT_INFINITE;
151 if(dispatchSourceIterator->second->prepare(dispatchTimeout)) {
152 sourcesToDispatch_.insert(*dispatchSourceIterator);
// NOTE(review): the '<' comparisons in this function only work if
// TIMEOUT_INFINITE compares greater than any finite interval - confirm its definition.
153 } else if (dispatchTimeout < currentMinimalTimeoutInterval_) {
154 currentMinimalTimeoutInterval_ = dispatchTimeout;
// Timeouts are compared against a single time sample taken here.
158 int64_t currentContextTime = getCurrentTimeInMs();
160 for (auto timeoutPriorityRange = registeredTimeouts_.begin();
161 timeoutPriorityRange != registeredTimeouts_.end();
162 timeoutPriorityRange++) {
164 int64_t intervalToReady = timeoutPriorityRange->second->getReadyTime() - currentContextTime;
// Already due: queue it and set the poll timeout to TIMEOUT_NONE.
166 if (intervalToReady <= 0) {
167 timeoutsToDispatch_.insert(*timeoutPriorityRange);
168 currentMinimalTimeoutInterval_ = TIMEOUT_NONE;
// Not yet due: shorten the poll timeout if this timeout fires earlier.
169 } else if (intervalToReady < currentMinimalTimeoutInterval_) {
170 currentMinimalTimeoutInterval_ = intervalToReady;
// Poll phase. Clears the returned-events field of every managed descriptor
// except the first one (the wakeup descriptor) before polling.
176 for (auto fileDescriptor = managedFileDescriptors_.begin() + 1; fileDescriptor != managedFileDescriptors_.end(); ++fileDescriptor) {
177 (*fileDescriptor).revents = 0;
// Waits on all managed descriptors using the timeout computed in prepare().
// NOTE(review): ::poll returns an int and -1 on error; stored in a size_t,
// an error becomes a large non-zero count and passes the zero check below
// as if descriptors were ready - confirm and handle explicitly.
// NOTE(review): currentMinimalTimeoutInterval_ is int64_t but the timeout
// parameter of ::poll is int, so the value is narrowed here - confirm.
180 size_t numReadyFileDescriptors = ::poll(&(managedFileDescriptors_[0]), managedFileDescriptors_.size(), currentMinimalTimeoutInterval_);
182 // If no FileDescriptors are ready, poll returned because of a timeout that has expired.
183 // The only case in which this is not the reason is when the timeout handed in "prepare"
184 // expired before any other timeouts.
185 if(!numReadyFileDescriptors) {
186 int64_t currentContextTime = getCurrentTimeInMs();
// Queue every registered timeout whose ready time has been reached.
188 for (auto timeoutPriorityRange = registeredTimeouts_.begin();
189 timeoutPriorityRange != registeredTimeouts_.end();
190 timeoutPriorityRange++) {
192 int64_t intervalToReady = timeoutPriorityRange->second->getReadyTime() - currentContextTime;
194 if (intervalToReady <= 0) {
195 timeoutsToDispatch_.insert(*timeoutPriorityRange);
// NOTE(review): this reads the wakeFd_ member, while the ::poll call above
// writes revents into the copy held in managedFileDescriptors_ - confirm
// that wakeFd_.revents is actually updated on this path.
200 if (wakeFd_.revents) {
// Check phase: collects the watches and dispatch sources that became ready
// during polling.
206 //The first file descriptor is always the loop's wakeup-descriptor. All others need to be linked to a watch.
207 for (auto fileDescriptor = managedFileDescriptors_.begin() + 1; fileDescriptor != managedFileDescriptors_.end(); ++fileDescriptor) {
// Every descriptor is compared against every registered watch.
208 for (auto registeredWatchIterator = registeredWatches_.begin();
209 registeredWatchIterator != registeredWatches_.end();
210 registeredWatchIterator++) {
211 const auto& correspondingWatchPriority = registeredWatchIterator->first;
212 const auto& correspondingWatchPair = registeredWatchIterator->second;
// Queue the watch when its descriptor number matches and events were reported.
214 if (std::get<0>(correspondingWatchPair) == fileDescriptor->fd && fileDescriptor->revents) {
215 watchesToDispatch_.insert( { correspondingWatchPriority, {std::get<1>(correspondingWatchPair)} } );
// Queue every dispatch source whose check() reports pending work.
220 for(auto dispatchSourceIterator = registeredDispatchSources_.begin(); dispatchSourceIterator != registeredDispatchSources_.end(); ++dispatchSourceIterator) {
221 if((std::get<1>(*dispatchSourceIterator))->check()) {
222 sourcesToDispatch_.insert( {std::get<0>(*dispatchSourceIterator), std::get<1>(*dispatchSourceIterator)});
// True when at least one timeout, watch or source is queued for dispatch.
226 return !timeoutsToDispatch_.empty() || !watchesToDispatch_.empty() || !sourcesToDispatch_.empty();
// Dispatch phase: queued timeouts are handled first, then watches, then
// dispatch sources.
230 for (auto timeoutIterator = timeoutsToDispatch_.begin();
231 timeoutIterator != timeoutsToDispatch_.end();
233 std::get<1>(*timeoutIterator)->dispatch();
236 for (auto watchIterator = watchesToDispatch_.begin();
237 watchIterator != watchesToDispatch_.end();
239 Watch* watch = watchIterator->second;
// NOTE(review): magic number; presumably a bitmask of poll event flags rather
// than the events actually reported for this descriptor - confirm.
240 const unsigned int flags = 7;
241 watch->dispatch(flags);
// Sources are only processed while breakLoop_ is not set.
245 for (auto dispatchSourceIterator = sourcesToDispatch_.begin();
246 dispatchSourceIterator != sourcesToDispatch_.end() && !breakLoop_;
247 dispatchSourceIterator++) {
// Keep dispatching the same source until its dispatch() returns false.
249 while(std::get<1>(*dispatchSourceIterator)->dispatch());
// Reset all three queues.
252 timeoutsToDispatch_.clear();
253 sourcesToDispatch_.clear();
254 watchesToDispatch_.clear();
// WSA-event flavour: recovers the event handle stored in wakeFd_.fd.
259 HANDLE h = IntToPtr(wakeFd_.fd);
// eventfd flavour: writes sizeof(int64_t) bytes to the wakeup descriptor.
// NOTE(review): the return value of ::write is ignored, so a failed wakeup
// goes unnoticed.
263 ::write(wakeFd_.fd, &wake, sizeof(int64_t));
// Appends a copy of the given descriptor to the set handed to ::poll.
268 void registerFileDescriptor(const DemoMainLoopPollFd& fileDescriptor) {
269 managedFileDescriptors_.push_back(fileDescriptor);
// Removes a managed descriptor, matching on the fd member only.
272 void deregisterFileDescriptor(const DemoMainLoopPollFd& fileDescriptor) {
273 for (auto it = managedFileDescriptors_.begin(); it != managedFileDescriptors_.end(); it++) {
274 if ((*it).fd == fileDescriptor.fd) {
// vector::erase invalidates 'it'; iteration must not continue with this
// iterator afterwards.
275 managedFileDescriptors_.erase(it);
// Stores the dispatch source under its priority; the multimap allows several
// sources with the same priority.
281 void registerDispatchSource(DispatchSource* dispatchSource, const DispatchPriority dispatchPriority) {
282 registeredDispatchSources_.insert( {dispatchPriority, dispatchSource} );
// Removes a dispatch source, found by a linear search on the pointer value.
285 void deregisterDispatchSource(DispatchSource* dispatchSource) {
286 for(auto dispatchSourceIterator = registeredDispatchSources_.begin();
287 dispatchSourceIterator != registeredDispatchSources_.end();
288 dispatchSourceIterator++) {
290 if(dispatchSourceIterator->second == dispatchSource) {
// The erased iterator is invalid afterwards and must not be advanced.
291 registeredDispatchSources_.erase(dispatchSourceIterator);
// Adds the watch's file descriptor to the polled set and records the watch
// under its priority together with its descriptor number.
298 void registerWatch(Watch* watch, const DispatchPriority dispatchPriority) {
// Two alternative declarations of fdToRegister follow; the first one also
// clears isWsaEvent.
// NOTE(review): presumably selected per platform by preprocessor conditionals - confirm.
300 DemoMainLoopPollFd fdToRegister = watch->getAssociatedFileDescriptor();
301 fdToRegister.isWsaEvent = false;
303 DemoMainLoopPollFd fdToRegister = watch->getAssociatedFileDescriptor();
306 registerFileDescriptor(fdToRegister);
// The stored fd is what the check phase compares against polled descriptors.
307 registeredWatches_.insert( { dispatchPriority, {watch->getAssociatedFileDescriptor().fd, watch} } );
// Removes the watch's descriptor from the polled set, then removes the watch
// entry itself, found by pointer comparison.
310 void deregisterWatch(Watch* watch) {
311 deregisterFileDescriptor(watch->getAssociatedFileDescriptor());
313 for(auto watchIterator = registeredWatches_.begin();
314 watchIterator != registeredWatches_.end();
317 if(watchIterator->second.second == watch) {
// The erased iterator is invalid afterwards and must not be advanced.
318 registeredWatches_.erase(watchIterator);
// Stores the timeout under its priority; the multimap allows several timeouts
// with the same priority.
324 void registerTimeout(Timeout* timeout, const DispatchPriority dispatchPriority) {
325 registeredTimeouts_.insert( {dispatchPriority, timeout} );
// Removes a timeout, found by a linear search on the pointer value.
328 void deregisterTimeout(Timeout* timeout) {
329 for(auto timeoutIterator = registeredTimeouts_.begin();
330 timeoutIterator != registeredTimeouts_.end();
333 if(timeoutIterator->second == timeout) {
// The erased iterator is invalid afterwards and must not be advanced.
334 registeredTimeouts_.erase(timeoutIterator);
// Consumes pending wakeup notifications from the wakeup descriptor.
340 void acknowledgeWakeup() {
// WSA-event flavour: recovers the event handle stored in wakeFd_.fd.
342 HANDLE h = IntToPtr(wakeFd_.fd);
// eventfd flavour: keeps reading until a read no longer returns sizeof(buffer)
// bytes. The descriptor is created with EFD_NONBLOCK, so the final read fails
// instead of blocking.
346 while (::read(wakeFd_.fd, &buffer, sizeof(int64_t)) == sizeof(buffer));
// Context that the loop subscribes to for sources, watches, timeouts and wakeups.
350 std::shared_ptr<MainLoopContext> context_;
// Descriptor array handed to ::poll; the first entry is the wakeup descriptor.
352 std::vector<DemoMainLoopPollFd> managedFileDescriptors_;
// Everything currently registered, keyed by dispatch priority. Watches are
// stored together with their file descriptor number.
354 std::multimap<DispatchPriority, DispatchSource*> registeredDispatchSources_;
355 std::multimap<DispatchPriority, std::pair<int, Watch*>> registeredWatches_;
356 std::multimap<DispatchPriority, Timeout*> registeredTimeouts_;
// Work queues for the current iteration; filled during the prepare, poll and
// check phases and cleared at the end of dispatch.
358 std::set<std::pair<DispatchPriority, DispatchSource*>> sourcesToDispatch_;
359 std::set<std::pair<DispatchPriority, Watch*>> watchesToDispatch_;
360 std::set<std::pair<DispatchPriority, Timeout*>> timeoutsToDispatch_;
// Subscription handles obtained in the constructor, needed to unsubscribe again.
362 DispatchSourceListenerSubscription dispatchSourceListenerSubscription_;
363 WatchListenerSubscription watchListenerSubscription_;
364 TimeoutSourceListenerSubscription timeoutSourceListenerSubscription_;
365 WakeupListenerSubscription wakeupListenerSubscription_;
// Poll timeout for the current iteration, computed in prepare().
367 int64_t currentMinimalTimeoutInterval_;
// Descriptor used to interrupt a blocking poll; a copy of it is the first
// entry of managedFileDescriptors_.
371 DemoMainLoopPollFd wakeFd_;
375 } // namespace CommonAPI
377 #endif /* DEMO_MAIN_LOOP_H_ */