Imported Upstream version 1.64.0
[platform/upstream/boost.git] / boost / asio / detail / impl / kqueue_reactor.ipp
1 //
2 // detail/impl/kqueue_reactor.ipp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 // Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
7 //
8 // Distributed under the Boost Software License, Version 1.0. (See accompanying
9 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
10 //
11
12 #ifndef BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
13 #define BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
14
15 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
16 # pragma once
17 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
18
19 #include <boost/asio/detail/config.hpp>
20
21 #if defined(BOOST_ASIO_HAS_KQUEUE)
22
23 #include <boost/asio/detail/kqueue_reactor.hpp>
24 #include <boost/asio/detail/throw_error.hpp>
25 #include <boost/asio/error.hpp>
26
27 #include <boost/asio/detail/push_options.hpp>
28
29 #if defined(__NetBSD__)
30 # define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
31     EV_SET(ev, ident, filt, flags, fflags, data, \
32       reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
33 #else
34 # define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
35     EV_SET(ev, ident, filt, flags, fflags, data, udata)
36 #endif
37
38 namespace boost {
39 namespace asio {
40 namespace detail {
41
42 kqueue_reactor::kqueue_reactor(boost::asio::io_service& io_service)
43   : boost::asio::detail::service_base<kqueue_reactor>(io_service),
44     io_service_(use_service<io_service_impl>(io_service)),
45     mutex_(),
46     kqueue_fd_(do_kqueue_create()),
47     interrupter_(),
48     shutdown_(false)
49 {
50   struct kevent events[1];
51   BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
52       EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
53   if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
54   {
55     boost::system::error_code error(errno,
56         boost::asio::error::get_system_category());
57     boost::asio::detail::throw_error(error);
58   }
59 }
60
61 kqueue_reactor::~kqueue_reactor()
62 {
63   close(kqueue_fd_);
64 }
65
66 void kqueue_reactor::shutdown_service()
67 {
68   mutex::scoped_lock lock(mutex_);
69   shutdown_ = true;
70   lock.unlock();
71
72   op_queue<operation> ops;
73
74   while (descriptor_state* state = registered_descriptors_.first())
75   {
76     for (int i = 0; i < max_ops; ++i)
77       ops.push(state->op_queue_[i]);
78     state->shutdown_ = true;
79     registered_descriptors_.free(state);
80   }
81
82   timer_queues_.get_all_timers(ops);
83
84   io_service_.abandon_operations(ops);
85 }
86
87 void kqueue_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
88 {
89   if (fork_ev == boost::asio::io_service::fork_child)
90   {
91     // The kqueue descriptor is automatically closed in the child.
92     kqueue_fd_ = -1;
93     kqueue_fd_ = do_kqueue_create();
94
95     interrupter_.recreate();
96
97     struct kevent events[2];
98     BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
99         EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
100     if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
101     {
102       boost::system::error_code ec(errno,
103           boost::asio::error::get_system_category());
104       boost::asio::detail::throw_error(ec, "kqueue interrupter registration");
105     }
106
107     // Re-register all descriptors with kqueue.
108     mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
109     for (descriptor_state* state = registered_descriptors_.first();
110         state != 0; state = state->next_)
111     {
112       if (state->num_kevents_ > 0)
113       {
114         BOOST_ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
115             EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
116         BOOST_ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
117             EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
118         if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
119         {
120           boost::system::error_code ec(errno,
121               boost::asio::error::get_system_category());
122           boost::asio::detail::throw_error(ec, "kqueue re-registration");
123         }
124       }
125     }
126   }
127 }
128
129 void kqueue_reactor::init_task()
130 {
131   io_service_.init_task();
132 }
133
134 int kqueue_reactor::register_descriptor(socket_type descriptor,
135     kqueue_reactor::per_descriptor_data& descriptor_data)
136 {
137   descriptor_data = allocate_descriptor_state();
138
139   mutex::scoped_lock lock(descriptor_data->mutex_);
140
141   descriptor_data->descriptor_ = descriptor;
142   descriptor_data->num_kevents_ = 0;
143   descriptor_data->shutdown_ = false;
144
145   return 0;
146 }
147
148 int kqueue_reactor::register_internal_descriptor(
149     int op_type, socket_type descriptor,
150     kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
151 {
152   descriptor_data = allocate_descriptor_state();
153
154   mutex::scoped_lock lock(descriptor_data->mutex_);
155
156   descriptor_data->descriptor_ = descriptor;
157   descriptor_data->num_kevents_ = 1;
158   descriptor_data->shutdown_ = false;
159   descriptor_data->op_queue_[op_type].push(op);
160
161   struct kevent events[1];
162   BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
163       EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
164   if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
165     return errno;
166
167   return 0;
168 }
169
170 void kqueue_reactor::move_descriptor(socket_type,
171     kqueue_reactor::per_descriptor_data& target_descriptor_data,
172     kqueue_reactor::per_descriptor_data& source_descriptor_data)
173 {
174   target_descriptor_data = source_descriptor_data;
175   source_descriptor_data = 0;
176 }
177
178 void kqueue_reactor::start_op(int op_type, socket_type descriptor,
179     kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
180     bool is_continuation, bool allow_speculative)
181 {
182   if (!descriptor_data)
183   {
184     op->ec_ = boost::asio::error::bad_descriptor;
185     post_immediate_completion(op, is_continuation);
186     return;
187   }
188
189   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
190
191   if (descriptor_data->shutdown_)
192   {
193     post_immediate_completion(op, is_continuation);
194     return;
195   }
196
197   if (descriptor_data->op_queue_[op_type].empty())
198   {
199     static const int num_kevents[max_ops] = { 1, 2, 1 };
200
201     if (allow_speculative
202         && (op_type != read_op
203           || descriptor_data->op_queue_[except_op].empty()))
204     {
205       if (op->perform())
206       {
207         descriptor_lock.unlock();
208         io_service_.post_immediate_completion(op, is_continuation);
209         return;
210       }
211
212       if (descriptor_data->num_kevents_ < num_kevents[op_type])
213       {
214         struct kevent events[2];
215         BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
216             EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
217         BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
218             EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
219         if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
220         {
221           descriptor_data->num_kevents_ = num_kevents[op_type];
222         }
223         else
224         {
225           op->ec_ = boost::system::error_code(errno,
226               boost::asio::error::get_system_category());
227           io_service_.post_immediate_completion(op, is_continuation);
228           return;
229         }
230       }
231     }
232     else
233     {
234       if (descriptor_data->num_kevents_ < num_kevents[op_type])
235         descriptor_data->num_kevents_ = num_kevents[op_type];
236
237       struct kevent events[2];
238       BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
239           EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
240       BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
241           EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
242       ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
243     }
244   }
245
246   descriptor_data->op_queue_[op_type].push(op);
247   io_service_.work_started();
248 }
249
250 void kqueue_reactor::cancel_ops(socket_type,
251     kqueue_reactor::per_descriptor_data& descriptor_data)
252 {
253   if (!descriptor_data)
254     return;
255
256   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
257
258   op_queue<operation> ops;
259   for (int i = 0; i < max_ops; ++i)
260   {
261     while (reactor_op* op = descriptor_data->op_queue_[i].front())
262     {
263       op->ec_ = boost::asio::error::operation_aborted;
264       descriptor_data->op_queue_[i].pop();
265       ops.push(op);
266     }
267   }
268
269   descriptor_lock.unlock();
270
271   io_service_.post_deferred_completions(ops);
272 }
273
274 void kqueue_reactor::deregister_descriptor(socket_type descriptor,
275     kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
276 {
277   if (!descriptor_data)
278     return;
279
280   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
281
282   if (!descriptor_data->shutdown_)
283   {
284     if (closing)
285     {
286       // The descriptor will be automatically removed from the kqueue when it
287       // is closed.
288     }
289     else
290     {
291       struct kevent events[2];
292       BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
293           EVFILT_READ, EV_DELETE, 0, 0, 0);
294       BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
295           EVFILT_WRITE, EV_DELETE, 0, 0, 0);
296       ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
297     }
298
299     op_queue<operation> ops;
300     for (int i = 0; i < max_ops; ++i)
301     {
302       while (reactor_op* op = descriptor_data->op_queue_[i].front())
303       {
304         op->ec_ = boost::asio::error::operation_aborted;
305         descriptor_data->op_queue_[i].pop();
306         ops.push(op);
307       }
308     }
309
310     descriptor_data->descriptor_ = -1;
311     descriptor_data->shutdown_ = true;
312
313     descriptor_lock.unlock();
314
315     free_descriptor_state(descriptor_data);
316     descriptor_data = 0;
317
318     io_service_.post_deferred_completions(ops);
319   }
320 }
321
322 void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
323     kqueue_reactor::per_descriptor_data& descriptor_data)
324 {
325   if (!descriptor_data)
326     return;
327
328   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
329
330   if (!descriptor_data->shutdown_)
331   {
332     struct kevent events[2];
333     BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
334         EVFILT_READ, EV_DELETE, 0, 0, 0);
335     BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
336         EVFILT_WRITE, EV_DELETE, 0, 0, 0);
337     ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
338
339     op_queue<operation> ops;
340     for (int i = 0; i < max_ops; ++i)
341       ops.push(descriptor_data->op_queue_[i]);
342
343     descriptor_data->descriptor_ = -1;
344     descriptor_data->shutdown_ = true;
345
346     descriptor_lock.unlock();
347
348     free_descriptor_state(descriptor_data);
349     descriptor_data = 0;
350   }
351 }
352
353 void kqueue_reactor::run(bool block, op_queue<operation>& ops)
354 {
355   mutex::scoped_lock lock(mutex_);
356
357   // Determine how long to block while waiting for events.
358   timespec timeout_buf = { 0, 0 };
359   timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;
360
361   lock.unlock();
362
363   // Block on the kqueue descriptor.
364   struct kevent events[128];
365   int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
366
367   // Dispatch the waiting events.
368   for (int i = 0; i < num_events; ++i)
369   {
370     void* ptr = reinterpret_cast<void*>(events[i].udata);
371     if (ptr == &interrupter_)
372     {
373       interrupter_.reset();
374     }
375     else
376     {
377       descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
378       mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
379
380       if (events[i].filter == EVFILT_WRITE
381           && descriptor_data->num_kevents_ == 2
382           && descriptor_data->op_queue_[write_op].empty())
383       {
384         // Some descriptor types, like serial ports, don't seem to support
385         // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
386         // operations we'll remove the EVFILT_WRITE registration here so that
387         // we don't end up in a tight spin.
388         struct kevent delete_events[1];
389         BOOST_ASIO_KQUEUE_EV_SET(&delete_events[0],
390             descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
391         ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
392         descriptor_data->num_kevents_ = 1;
393       }
394
395       // Exception operations must be processed first to ensure that any
396       // out-of-band data is read before normal data.
397 #if defined(__NetBSD__)
398       static const unsigned int filter[max_ops] =
399 #else
400       static const int filter[max_ops] =
401 #endif
402         { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
403       for (int j = max_ops - 1; j >= 0; --j)
404       {
405         if (events[i].filter == filter[j])
406         {
407           if (j != except_op || events[i].flags & EV_OOBAND)
408           {
409             while (reactor_op* op = descriptor_data->op_queue_[j].front())
410             {
411               if (events[i].flags & EV_ERROR)
412               {
413                 op->ec_ = boost::system::error_code(
414                     static_cast<int>(events[i].data),
415                     boost::asio::error::get_system_category());
416                 descriptor_data->op_queue_[j].pop();
417                 ops.push(op);
418               }
419               if (op->perform())
420               {
421                 descriptor_data->op_queue_[j].pop();
422                 ops.push(op);
423               }
424               else
425                 break;
426             }
427           }
428         }
429       }
430     }
431   }
432
433   lock.lock();
434   timer_queues_.get_ready_timers(ops);
435 }
436
437 void kqueue_reactor::interrupt()
438 {
439   interrupter_.interrupt();
440 }
441
442 int kqueue_reactor::do_kqueue_create()
443 {
444   int fd = ::kqueue();
445   if (fd == -1)
446   {
447     boost::system::error_code ec(errno,
448         boost::asio::error::get_system_category());
449     boost::asio::detail::throw_error(ec, "kqueue");
450   }
451   return fd;
452 }
453
454 kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
455 {
456   mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
457   return registered_descriptors_.alloc();
458 }
459
460 void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
461 {
462   mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
463   registered_descriptors_.free(s);
464 }
465
466 void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
467 {
468   mutex::scoped_lock lock(mutex_);
469   timer_queues_.insert(&queue);
470 }
471
472 void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
473 {
474   mutex::scoped_lock lock(mutex_);
475   timer_queues_.erase(&queue);
476 }
477
478 timespec* kqueue_reactor::get_timeout(timespec& ts)
479 {
480   // By default we will wait no longer than 5 minutes. This will ensure that
481   // any changes to the system clock are detected after no longer than this.
482   long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
483   ts.tv_sec = usec / 1000000;
484   ts.tv_nsec = (usec % 1000000) * 1000;
485   return &ts;
486 }
487
488 } // namespace detail
489 } // namespace asio
490 } // namespace boost
491
492 #undef BOOST_ASIO_KQUEUE_EV_SET
493
494 #include <boost/asio/detail/pop_options.hpp>
495
496 #endif // defined(BOOST_ASIO_HAS_KQUEUE)
497
498 #endif // BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP