2 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
4 * @APPLE_APACHE_LICENSE_HEADER_START@
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * @APPLE_APACHE_LICENSE_HEADER_END@
24 #include "protocolServer.h"
26 #include <sys/mount.h>
// Forward declarations of static helpers used by the source/kevent code in
// this file.
// NOTE(review): the _dispatch_kevent_resume prototype below is cut off in this
// listing (its last parameter line is not shown).
28 static void _dispatch_source_merge_kevent(dispatch_source_t ds,
29 const struct kevent *ke);
30 static void _dispatch_kevent_register(dispatch_source_t ds);
31 static void _dispatch_kevent_unregister(dispatch_source_t ds);
32 static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
34 static inline void _dispatch_source_timer_init(void);
35 static void _dispatch_timer_list_update(dispatch_source_t ds);
36 static inline unsigned long _dispatch_source_timer_data(
37 dispatch_source_refs_t dr, unsigned long prev);
39 static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
40 uint32_t new_flags, uint32_t del_flags);
41 static void _dispatch_drain_mach_messages(struct kevent *ke);
44 static void _dispatch_kevent_debugger(void *context);
48 #pragma mark dispatch_source_t
// Creates a dispatch source for the given source `type`.
// Visible steps: reject a NULL type or mask bits outside type->mask, build a
// dispatch_kevent_s from the type's prototype kevent, allocate the source,
// initialize it as a queue that starts suspended and targets the manager
// queue, run the optional type->init hook, allocate ds_refs if it is still
// unset, and finally set the target queue to `q`.
// NOTE(review): the return type, the remaining parameters, braces and return
// statements are not shown in this listing; `handle`, `mask` and `q` are
// presumably parameters -- confirm against the full file.
51 dispatch_source_create(dispatch_source_type_t type,
// NOTE(review): &type->ke is formed here, before the NULL check on `type`.
56 const struct kevent *proto_kev = &type->ke;
57 dispatch_source_t ds = NULL;
58 dispatch_kevent_t dk = NULL;
// Validate arguments: a type is required and `mask` may only contain bits
// that the type declares in type->mask.
61 if (type == NULL || (mask & ~type->mask)) {
// Filter-specific handling; the bodies of these cases are not shown here.
65 switch (type->ke.filter) {
72 #if DISPATCH_USE_VM_PRESSURE
75 case DISPATCH_EVFILT_CUSTOM_ADD:
76 case DISPATCH_EVFILT_CUSTOM_OR:
77 case DISPATCH_EVFILT_TIMER:
// Build the dispatch kevent from the prototype: ident is the handle, the
// requested mask is merged into fflags, and udata points back at dk.
// NOTE(review): no NULL check on the calloc result is visible before dk is
// dereferenced on the next line.
86 dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
87 dk->dk_kevent = *proto_kev;
88 dk->dk_kevent.ident = handle;
89 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
90 dk->dk_kevent.fflags |= (uint32_t)mask;
91 dk->dk_kevent.udata = dk;
92 TAILQ_INIT(&dk->dk_sources);
94 ds = _dispatch_alloc(DISPATCH_VTABLE(source),
95 sizeof(struct dispatch_source_s));
96 // Initialize as a queue first, then override some settings below.
97 _dispatch_queue_init((dispatch_queue_t)ds);
98 strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));
101 ds->do_ref_cnt++; // the reference the manager queue holds
102 ds->do_ref_cnt++; // since source is created suspended
103 ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
104 // The initial target queue is the manager queue, in order to get
105 // the source installed. <rdar://problem/8928171>
106 ds->do_targetq = &_dispatch_mgr_q;
// Cache the kevent ident and fflags on the source itself.
109 ds->ds_ident_hack = dk->dk_kevent.ident;
111 ds->ds_pending_data_mask = dk->dk_kevent.fflags;
// Classify the source from the prototype flags: EV_DISPATCH/EV_ONESHOT
// prototypes are level sources that need re-arming; prototypes without
// EV_CLEAR are "adder" sources.
112 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
113 ds->ds_is_level = true;
114 ds->ds_needs_rearm = true;
115 } else if (!(EV_CLEAR & proto_kev->flags)) {
116 // we cheat and use EV_CLEAR to mean a "flag thingy"
117 ds->ds_is_adder = true;
120 // Some sources require special processing
121 if (type->init != NULL) {
122 type->init(ds, type, handle, mask, q);
// Allocate the refs structure unless it was already set (presumably by the
// init hook above -- confirm).
124 if (fastpath(!ds->ds_refs)) {
125 ds->ds_refs = calloc(1ul, sizeof(struct dispatch_source_refs_s));
126 if (slowpath(!ds->ds_refs)) {
130 ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
// A source must not be both a level source and an adder.
131 dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
133 // First item on the queue sets the user-specified target queue
134 dispatch_set_target_queue(ds, q);
136 dispatch_debug(ds, "%s", __func__);
// Disposes of a source. The visible step hands the object, cast to a queue,
// to _dispatch_queue_dispose().
// NOTE(review): other lines of this function are not shown in this listing.
147 _dispatch_source_dispose(dispatch_source_t ds)
150 _dispatch_queue_dispose((dispatch_queue_t)ds);
// Wakes the source via _dispatch_wakeup().
// NOTE(review): judging by the name this presumably runs when the external
// reference count is dropped -- confirm against the vtable/callers.
154 _dispatch_source_xref_dispose(dispatch_source_t ds)
156 _dispatch_wakeup(ds);
// Cancels the source: atomically ORs DSF_CANCELED into ds_atomic_flags and
// then wakes the source. An internal retain/release pair brackets the flag
// update and the wakeup (see the comment below for why).
160 dispatch_source_cancel(dispatch_source_t ds)
163 dispatch_debug(ds, "%s", __func__);
165 // Right after we set the cancel flag, someone else
166 // could potentially invoke the source, do the cancellation,
167 // unregister the source, and deallocate it. We would
168 // need to therefore retain/release before setting the bit
170 _dispatch_retain(ds);
171 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED);
172 _dispatch_wakeup(ds);
173 _dispatch_release(ds);
// Reports whether the DSF_CANCELED bit is set in ds->ds_atomic_flags.
177 dispatch_source_testcancel(dispatch_source_t ds)
179 return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
// Returns the source's ds_pending_data_mask field.
184 dispatch_source_get_mask(dispatch_source_t ds)
186 return ds->ds_pending_data_mask;
// Returns the source's ds_ident_hack field.
// NOTE(review): the value is cast to int before being returned; the declared
// return type is not shown in this listing.
190 dispatch_source_get_handle(dispatch_source_t ds)
192 return (int)ds->ds_ident_hack;
196 dispatch_source_get_data(dispatch_source_t ds)
// Merges `val` into a source by wrapping it in a local struct kevent and
// passing that to _dispatch_source_merge_kevent().
// NOTE(review): only the .fflags initializer is visible; other initializers
// may be elided from this listing.
202 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
204 struct kevent kev = {
205 .fflags = (typeof(kev.fflags))val,
// NOTE(review): the two comparisons below are presumably the argument of an
// assertion that the source uses a custom ADD/OR filter; the enclosing call
// is not shown.
210 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
211 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
213 _dispatch_source_merge_kevent(ds, &kev);
217 #pragma mark dispatch_source_handler
// Installs a Block as the source's event handler. `context` is the Block, or
// NULL to clear the handler. The source is taken to be the current queue, so
// this is expected to run on the source's own queue. A previously installed
// Block handler is released before the new one is stored.
220 // 6618342 Contact the team that owns the Instrument DTrace probe before
221 // renaming this symbol
223 _dispatch_source_set_event_handler2(void *context)
225 struct Block_layout *bl = context;
227 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
228 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
229 dispatch_source_refs_t dr = ds->ds_refs;
// Drop the old handler only if it was a Block; function handlers are not
// released.
231 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
232 Block_release(dr->ds_handler_ctxt);
// The handler function is the Block's invoke pointer and the handler context
// is the Block itself.
234 dr->ds_handler_func = bl ? (void *)bl->invoke : NULL;
235 dr->ds_handler_ctxt = bl;
236 ds->ds_handler_is_block = true;
// Sets a Block event handler: copies `handler` with _dispatch_Block_copy()
// and submits _dispatch_source_set_event_handler2 as a barrier on the source
// (treated as a queue), with the copied Block as its context.
240 dispatch_source_set_event_handler(dispatch_source_t ds,
241 dispatch_block_t handler)
243 handler = _dispatch_Block_copy(handler);
244 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
245 _dispatch_source_set_event_handler2);
247 #endif /* __BLOCKS__ */
// Installs a plain function as the source's event handler. `context` is the
// function pointer; the handler's context argument becomes ds->do_ctxt. The
// source is taken to be the current queue.
250 _dispatch_source_set_event_handler_f(void *context)
252 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
253 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
254 dispatch_source_refs_t dr = ds->ds_refs;
// Release a previously installed Block handler, if any.
257 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
258 Block_release(dr->ds_handler_ctxt);
261 dr->ds_handler_func = context;
262 dr->ds_handler_ctxt = ds->do_ctxt;
263 ds->ds_handler_is_block = false;
// Sets a function event handler by submitting
// _dispatch_source_set_event_handler_f as a barrier on the source (treated as
// a queue), with `handler` passed as the context pointer.
267 dispatch_source_set_event_handler_f(dispatch_source_t ds,
268 dispatch_function_t handler)
270 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
271 _dispatch_source_set_event_handler_f);
// Installs a Block as the source's cancel handler. `context` is the Block.
// The source is taken to be the current queue. A previously installed Block
// cancel handler is released before the new one is stored.
275 // 6618342 Contact the team that owns the Instrument DTrace probe before
276 // renaming this symbol
278 _dispatch_source_set_cancel_handler2(void *context)
280 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
281 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
282 dispatch_source_refs_t dr = ds->ds_refs;
284 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
285 Block_release(dr->ds_cancel_handler);
287 dr->ds_cancel_handler = context;
288 ds->ds_cancel_is_block = true;
// Sets a Block cancel handler: copies `handler` with _dispatch_Block_copy()
// and submits _dispatch_source_set_cancel_handler2 as a barrier on the source
// (treated as a queue), with the copied Block as its context.
292 dispatch_source_set_cancel_handler(dispatch_source_t ds,
293 dispatch_block_t handler)
295 handler = _dispatch_Block_copy(handler);
296 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
297 _dispatch_source_set_cancel_handler2);
299 #endif /* __BLOCKS__ */
// Installs a plain function as the source's cancel handler. `context` is the
// function pointer. The source is taken to be the current queue.
302 _dispatch_source_set_cancel_handler_f(void *context)
304 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
305 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
306 dispatch_source_refs_t dr = ds->ds_refs;
// Release a previously installed Block cancel handler, if any.
309 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
310 Block_release(dr->ds_cancel_handler);
313 dr->ds_cancel_handler = context;
314 ds->ds_cancel_is_block = false;
// Sets a function cancel handler by submitting
// _dispatch_source_set_cancel_handler_f as a barrier on the source (treated
// as a queue), with `handler` passed as the context pointer.
318 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
319 dispatch_function_t handler)
321 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
322 _dispatch_source_set_cancel_handler_f);
// Installs a Block as the source's registration handler. `context` is the
// Block. The source is taken to be the current queue. A previously installed
// Block registration handler is released before the new one is stored.
327 _dispatch_source_set_registration_handler2(void *context)
329 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
330 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
331 dispatch_source_refs_t dr = ds->ds_refs;
333 if (ds->ds_registration_is_block && dr->ds_registration_handler) {
334 Block_release(dr->ds_registration_handler);
336 dr->ds_registration_handler = context;
337 ds->ds_registration_is_block = true;
// Sets a Block registration handler: copies `handler` with
// _dispatch_Block_copy() and submits
// _dispatch_source_set_registration_handler2 as a barrier on the source
// (treated as a queue), with the copied Block as its context.
341 dispatch_source_set_registration_handler(dispatch_source_t ds,
342 dispatch_block_t handler)
344 handler = _dispatch_Block_copy(handler);
345 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
346 _dispatch_source_set_registration_handler2);
348 #endif /* __BLOCKS__ */
// Installs a plain function as the source's registration handler. `context`
// is the function pointer. The source is taken to be the current queue.
351 _dispatch_source_set_registration_handler_f(void *context)
353 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
354 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
355 dispatch_source_refs_t dr = ds->ds_refs;
// Release a previously installed Block registration handler, if any.
358 if (ds->ds_registration_is_block && dr->ds_registration_handler) {
359 Block_release(dr->ds_registration_handler);
362 dr->ds_registration_handler = context;
363 ds->ds_registration_is_block = false;
// Sets a function registration handler by submitting
// _dispatch_source_set_registration_handler_f as a barrier on the source
// (treated as a queue), with `handler` passed as the context pointer.
367 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
368 dispatch_function_t handler)
370 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
371 _dispatch_source_set_registration_handler_f);
375 #pragma mark dispatch_source_invoke
// Delivers the registration handler and then clears it.
// - Source canceled or do_xref_cnt == -1: the handler is not called; a Block
//   handler is only released.
// - Block handler: called through _dispatch_client_callout_block(), then
//   released.
// - Function handler: called with ds->do_ctxt as its argument.
// Afterwards the is-block flag and the handler pointer are reset.
378 _dispatch_source_registration_callout(dispatch_source_t ds)
380 dispatch_source_refs_t dr = ds->ds_refs;
382 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
383 // no registration callout if source is canceled rdar://problem/8955246
385 if (ds->ds_registration_is_block) {
386 Block_release(dr->ds_registration_handler);
388 } else if (ds->ds_registration_is_block) {
389 dispatch_block_t b = dr->ds_registration_handler;
390 _dispatch_client_callout_block(b);
391 Block_release(dr->ds_registration_handler);
394 dispatch_function_t f = dr->ds_registration_handler;
395 _dispatch_client_callout(ds->do_ctxt, f);
397 ds->ds_registration_is_block = false;
398 dr->ds_registration_handler = NULL;
// Tears down handler state for a source and delivers the cancel handler.
// Visible steps: zero the pending data and its mask, release and clear a
// Block event handler and a Block registration handler, then run the cancel
// handler (only when DSF_CANCELED is set), release it if it is a Block, and
// clear it.
// NOTE(review): the body of the `!dr->ds_cancel_handler` check is not shown;
// presumably it returns early -- confirm.
402 _dispatch_source_cancel_callout(dispatch_source_t ds)
404 dispatch_source_refs_t dr = ds->ds_refs;
406 ds->ds_pending_data_mask = 0;
407 ds->ds_pending_data = 0;
// Event handler: only Block handlers hold a reference that must be dropped.
411 if (ds->ds_handler_is_block) {
412 Block_release(dr->ds_handler_ctxt);
413 ds->ds_handler_is_block = false;
414 dr->ds_handler_func = NULL;
415 dr->ds_handler_ctxt = NULL;
// Registration handler: same treatment as the event handler above.
417 if (ds->ds_registration_is_block) {
418 Block_release(dr->ds_registration_handler);
419 ds->ds_registration_is_block = false;
420 dr->ds_registration_handler = NULL;
424 if (!dr->ds_cancel_handler) {
427 if (ds->ds_cancel_is_block) {
// Block cancel handler: invoked only if the source was explicitly canceled,
// but released either way.
429 dispatch_block_t b = dr->ds_cancel_handler;
430 if (ds->ds_atomic_flags & DSF_CANCELED) {
431 _dispatch_client_callout_block(b);
433 Block_release(dr->ds_cancel_handler);
434 ds->ds_cancel_is_block = false;
// Function cancel handler: invoked with ds->do_ctxt, again only if canceled.
437 dispatch_function_t f = dr->ds_cancel_handler;
438 if (ds->ds_atomic_flags & DSF_CANCELED) {
439 _dispatch_client_callout(ds->do_ctxt, f);
442 dr->ds_cancel_handler = NULL;
// Latches the pending data and calls the event handler.
// ds_pending_data is atomically exchanged with 0; the previous value drives
// the data handed to the handler. The handler is called only when that
// previous value is non-zero and a handler function is set.
// NOTE(review): the declaration of `prev`, the body of the canceled check and
// the non-timer assignments to ds_data are not shown in this listing.
446 _dispatch_source_latch_and_call(dispatch_source_t ds)
450 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
453 dispatch_source_refs_t dr = ds->ds_refs;
454 prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0);
455 if (ds->ds_is_level) {
// Timers with a configured target compute their data from elapsed intervals.
457 } else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
458 ds->ds_data = _dispatch_source_timer_data(dr, prev);
462 if (dispatch_assume(prev) && dr->ds_handler_func) {
463 _dispatch_client_callout(dr->ds_handler_ctxt, dr->ds_handler_func);
// Resumes (re-arms) the kevent backing a source.
// Timer sources are handled by updating the timer lists; for mach port
// sources with DISPATCH_MACH_RECV_MESSAGE in their mask that flag is added to
// new_flags. The kevent is then resumed via _dispatch_kevent_resume(); if
// that call returns non-zero the source is unregistered.
// NOTE(review): some lines between the cases are not shown in this listing.
468 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
470 switch (ds->ds_dkev->dk_kevent.filter) {
471 case DISPATCH_EVFILT_TIMER:
472 // called on manager queue only
473 return _dispatch_timer_list_update(ds);
474 case EVFILT_MACHPORT:
475 if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
476 new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
480 if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
481 _dispatch_kevent_unregister(ds);
// State machine that performs the next pending action for a source.
// Each branch first checks that it is running on the right queue (manager
// queue or the source's target queue); if not, it returns the queue on which
// the invoke must be re-driven.
// NOTE(review): the return type, closing braces and the final return are not
// shown in this listing.
486 _dispatch_source_invoke(dispatch_source_t ds)
488 // This function performs all source actions. Each action is responsible
489 // for verifying that it takes place on the appropriate queue. If the
490 // current queue is not the correct queue for this action, the correct queue
491 // will be returned and the invoke will be re-driven on that queue.
493 // The order of tests here in invoke and in probe should be consistent.
495 dispatch_queue_t dq = _dispatch_queue_get_current();
496 dispatch_source_refs_t dr = ds->ds_refs;
498 if (!ds->ds_is_installed) {
499 // The source needs to be installed on the manager queue.
500 if (dq != &_dispatch_mgr_q) {
501 return &_dispatch_mgr_q;
503 _dispatch_kevent_register(ds);
// After installing, hop to the target queue if a registration handler is
// waiting to be delivered.
504 if (dr->ds_registration_handler) {
505 return ds->do_targetq;
507 if (slowpath(ds->do_xref_cnt == -1)) {
508 return &_dispatch_mgr_q; // rdar://problem/9558246
510 } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
511 // Source suspended by an item drained from the source queue.
513 } else if (dr->ds_registration_handler) {
514 // The source has been registered and the registration handler needs
515 // to be delivered on the target queue.
516 if (dq != ds->do_targetq) {
517 return ds->do_targetq;
519 // clears ds_registration_handler
520 _dispatch_source_registration_callout(ds);
521 if (slowpath(ds->do_xref_cnt == -1)) {
522 return &_dispatch_mgr_q; // rdar://problem/9558246
524 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
525 // The source has been cancelled and needs to be uninstalled from the
526 // manager queue. After uninstallation, the cancellation handler needs
527 // to be delivered to the target queue.
// NOTE(review): the condition guarding this uninstall step is not shown.
529 if (dq != &_dispatch_mgr_q) {
530 return &_dispatch_mgr_q;
532 _dispatch_kevent_unregister(ds);
// Any remaining handler state (cancel handler or Block handlers) is cleaned
// up on the target queue.
534 if (dr->ds_cancel_handler || ds->ds_handler_is_block ||
535 ds->ds_registration_is_block) {
536 if (dq != ds->do_targetq) {
537 return ds->do_targetq;
540 _dispatch_source_cancel_callout(ds);
541 } else if (ds->ds_pending_data) {
542 // The source has pending data to deliver via the event handler callback
543 // on the target queue. Some sources need to be rearmed on the manager
544 // queue after event delivery.
545 if (dq != ds->do_targetq) {
546 return ds->do_targetq;
548 _dispatch_source_latch_and_call(ds);
549 if (ds->ds_needs_rearm) {
550 return &_dispatch_mgr_q;
552 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
553 // The source needs to be rearmed on the manager queue.
554 if (dq != &_dispatch_mgr_q) {
555 return &_dispatch_mgr_q;
557 _dispatch_source_kevent_resume(ds, 0);
558 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED);
// Decides whether the source has work that requires an invoke. The branches
// test the same conditions as _dispatch_source_invoke, in the same order:
// not installed, registration handler pending, canceled, pending data, and
// needs re-arming.
// NOTE(review): the return statements of each branch are not shown in this
// listing, so the exact result per branch cannot be confirmed here.
565 _dispatch_source_probe(dispatch_source_t ds)
567 // This function determines whether the source needs to be invoked.
568 // The order of tests here in probe and in invoke should be consistent.
570 dispatch_source_refs_t dr = ds->ds_refs;
571 if (!ds->ds_is_installed) {
572 // The source needs to be installed on the manager queue.
574 } else if (dr->ds_registration_handler) {
575 // The registration handler needs to be delivered to the target queue.
577 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
578 // The source needs to be uninstalled from the manager queue, or the
579 // cancellation handler needs to be delivered to the target queue.
580 // Note: cancellation assumes installation.
581 if (ds->ds_dkev || dr->ds_cancel_handler
583 || ds->ds_handler_is_block || ds->ds_registration_is_block
588 } else if (ds->ds_pending_data) {
589 // The source has pending data to deliver to the target queue.
591 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
592 // The source needs to be rearmed on the manager queue.
600 #pragma mark dispatch_source_kevent
// Merges one kevent into the source's pending data and wakes the source.
// How the data is merged depends on the source kind:
// - level sources store the bitwise complement of ke->data;
// - adder sources atomically add ke->data;
// - other sources atomically OR in the fflags selected by the source's mask.
// Sources that need re-arming have DSF_ARMED cleared.
// NOTE(review): the declaration of `fake`, the bodies of the canceled and
// error branches, and the returns are not shown in this listing.
603 _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
607 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
611 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
612 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
613 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
614 if (ke->flags & EV_ERROR) {
615 if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
// Turn the error into a synthetic NOTE_EXIT event.
617 fake.flags &= ~EV_ERROR;
618 fake.fflags = NOTE_EXIT;
621 #if DISPATCH_USE_VM_PRESSURE
622 } else if (ke->filter == EVFILT_VM && ke->data == ENOTSUP) {
623 // Memory pressure kevent is not supported on all platforms
624 // <rdar://problem/8636227>
628 // log the unexpected error
629 (void)dispatch_assume_zero(ke->data);
634 if (ds->ds_is_level) {
635 // ke->data is signed and "negative available data" makes no sense
636 // zero bytes happens when EV_EOF is set
637 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
638 dispatch_assert(ke->data >= 0l);
// Stored complemented, so a ke->data of 0 still yields a non-zero pending
// value.
639 ds->ds_pending_data = ~ke->data;
640 } else if (ds->ds_is_adder) {
641 (void)dispatch_atomic_add2o(ds, ds_pending_data, ke->data);
642 } else if (ke->fflags & ds->ds_pending_data_mask) {
643 (void)dispatch_atomic_or2o(ds, ds_pending_data,
644 ke->fflags & ds->ds_pending_data_mask);
647 // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
648 if (ds->ds_needs_rearm) {
649 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
652 _dispatch_wakeup(ds);
// Dispatches one kevent to every source attached to its dispatch kevent.
// ke->udata is the dispatch_kevent_t that registered the event. Mach port
// events are handed to _dispatch_drain_mach_messages() instead.
// NOTE(review): preprocessor guards and some checks around these lines are
// not shown in this listing.
656 _dispatch_source_drain_kevent(struct kevent *ke)
658 dispatch_kevent_t dk = ke->udata;
659 dispatch_source_refs_t dri;
// One-time call of _dispatch_kevent_debugger.
662 static dispatch_once_t pred;
663 dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
666 dispatch_debug_kevents(ke, 1, __func__);
669 if (ke->filter == EVFILT_MACHPORT) {
670 return _dispatch_drain_mach_messages(ke);
// Record on the dispatch kevent that the event was delivered as one-shot.
675 if (ke->flags & EV_ONESHOT) {
676 dk->dk_kevent.flags |= EV_ONESHOT;
// Fan the event out to every source on this dispatch kevent.
679 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
680 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
685 #pragma mark dispatch_kevent_t
// Statically allocated dispatch kevent for the DISPATCH_EVFILT_CUSTOM_OR
// filter. udata points back at the structure itself and dk_sources is
// initialized with TAILQ_HEAD_INITIALIZER.
// NOTE(review): some initializer lines are not shown in this listing.
687 static struct dispatch_kevent_s _dispatch_kevent_data_or = {
689 .filter = DISPATCH_EVFILT_CUSTOM_OR,
691 .udata = &_dispatch_kevent_data_or,
693 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
// Statically allocated dispatch kevent for the DISPATCH_EVFILT_CUSTOM_ADD
// filter. udata points back at the structure itself and dk_sources is
// initialized with TAILQ_HEAD_INITIALIZER.
// NOTE(review): some initializer lines are not shown in this listing.
695 static struct dispatch_kevent_s _dispatch_kevent_data_add = {
697 .filter = DISPATCH_EVFILT_CUSTOM_ADD,
698 .udata = &_dispatch_kevent_data_add,
700 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
// Bucket count of the _dispatch_sources hash table (64 when
// TARGET_OS_EMBEDDED is set, 256 otherwise) and the macro that masks a value
// down to a bucket index. The mask only works because the size is a power of
// two.
// NOTE(review): the #else/#endif lines are not shown in this listing.
703 #if TARGET_OS_EMBEDDED
704 #define DSL_HASH_SIZE 64u // must be a power of two
706 #define DSL_HASH_SIZE 256u // must be a power of two
708 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
// Hash table of dispatch_kevent_s lists with DSL_HASH_SIZE buckets, followed
// by the once-predicate that guards _dispatch_kevent_init().
710 DISPATCH_CACHELINE_ALIGN
711 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
713 static dispatch_once_t __dispatch_kevent_init_pred;
// One-time initializer for the kevent hash table: initializes every bucket,
// inserts the static custom OR/ADD kevents into bucket 0, then registers the
// static timer kevents via _dispatch_source_timer_init().
// NOTE(review): the declaration of `i` is not shown in this listing.
716 _dispatch_kevent_init(void *context DISPATCH_UNUSED)
719 for (i = 0; i < DSL_HASH_SIZE; i++) {
720 TAILQ_INIT(&_dispatch_sources[i]);
723 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
724 &_dispatch_kevent_data_or, dk_list);
725 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
726 &_dispatch_kevent_data_add, dk_list);
728 _dispatch_source_timer_init();
// Maps a kevent (ident, filter) pair to a bucket index in _dispatch_sources.
// Mach port idents are reduced with MACH_PORT_INDEX() first; all other
// idents are used as-is before DSL_HASH() masks them into range.
// NOTE(review): the declaration of `value` and any preprocessor guards are
// not shown in this listing.
731 static inline uintptr_t
732 _dispatch_kevent_hash(uintptr_t ident, short filter)
736 value = (filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
740 return DSL_HASH(value);
// Looks up an existing dispatch kevent by (ident, filter): walks the hash
// bucket for the pair and compares both fields.
// NOTE(review): the return statements are not shown; presumably the match is
// returned, or NULL when nothing matches -- confirm.
743 static dispatch_kevent_t
744 _dispatch_kevent_find(uintptr_t ident, short filter)
746 uintptr_t hash = _dispatch_kevent_hash(ident, filter);
747 dispatch_kevent_t dki;
749 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
750 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
// Appends `dk` to the hash bucket selected by its kevent ident and filter.
// NOTE(review): lines between the hash computation and the insert are not
// shown in this listing.
758 _dispatch_kevent_insert(dispatch_kevent_t dk)
760 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
761 dk->dk_kevent.filter);
763 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
// Installs a source: marks it installed, makes sure the kevent tables are
// initialized, looks for an existing dispatch kevent with the same
// ident/filter, and attaches the source's refs to the kevent's source list.
// The kevent is resumed when new fflags were added or the source needs
// re-arming; DSF_ARMED is set on the source.
// NOTE(review): the if/else structure around the find result, and the
// handling of the source's own ds_dkev when an existing kevent is reused,
// are not shown in this listing.
766 // Find existing kevents, and merge any new flags if necessary
768 _dispatch_kevent_register(dispatch_source_t ds)
770 dispatch_kevent_t dk;
771 typeof(dk->dk_kevent.fflags) new_flags;
772 bool do_resume = false;
// Already installed: nothing to do (the early-exit line is not shown).
774 if (ds->ds_is_installed) {
777 ds->ds_is_installed = true;
779 dispatch_once_f(&__dispatch_kevent_init_pred,
780 NULL, _dispatch_kevent_init);
782 dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident,
783 ds->ds_dkev->dk_kevent.filter);
786 // If an existing dispatch kevent is found, check to see if new flags
787 // need to be added to the existing kevent
// new_flags holds only the bits the existing kevent did not already have.
788 new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
789 dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
792 do_resume = new_flags;
// No existing kevent: insert this one; all of its fflags count as new.
795 _dispatch_kevent_insert(dk);
796 new_flags = dk->dk_kevent.fflags;
800 TAILQ_INSERT_TAIL(&dk->dk_sources, ds->ds_refs, dr_list);
802 // Re-register the kevent with the kernel if new flags were added
803 // by the dispatch kevent
805 dk->dk_kevent.flags |= EV_ADD;
807 if (do_resume || ds->ds_needs_rearm) {
808 _dispatch_source_kevent_resume(ds, new_flags);
810 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED);
// Pushes the current state of a dispatch kevent to its backing mechanism.
// Timer and custom filters are not registered with kevent; mach port kevents
// are delegated to _dispatch_kevent_machport_resume(); everything else goes
// through _dispatch_update_kq(). After the update, EV_ADD is cleared on
// EV_DISPATCH kevents.
// NOTE(review): the last parameter (del_flags), the declaration of `r`, the
// EV_ONESHOT branch body and the returns are not shown in this listing.
814 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
818 switch (dk->dk_kevent.filter) {
819 case DISPATCH_EVFILT_TIMER:
820 case DISPATCH_EVFILT_CUSTOM_ADD:
821 case DISPATCH_EVFILT_CUSTOM_OR:
822 // these types not registered with kevent
825 case EVFILT_MACHPORT:
826 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
829 if (dk->dk_kevent.flags & EV_ONESHOT) {
834 r = _dispatch_update_kq(&dk->dk_kevent);
835 if (dk->dk_kevent.flags & EV_DISPATCH) {
836 dk->dk_kevent.flags &= ~EV_ADD;
// Tears down a dispatch kevent that no longer has sources attached.
// Timer and custom kevents are statically allocated; mach port kevents have
// all of their fflags removed via _dispatch_kevent_machport_resume(); other
// kevents are deleted from the kqueue with EV_DELETE unless they were
// one-shot (implicitly deleted). The kevent is then removed from its hash
// bucket.
// NOTE(review): the declaration of `hash`, the handling of the static cases
// and any freeing of `dk` are not shown in this listing.
843 _dispatch_kevent_dispose(dispatch_kevent_t dk)
847 switch (dk->dk_kevent.filter) {
848 case DISPATCH_EVFILT_TIMER:
849 case DISPATCH_EVFILT_CUSTOM_ADD:
850 case DISPATCH_EVFILT_CUSTOM_OR:
851 // these sources live on statically allocated lists
854 case EVFILT_MACHPORT:
855 _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
859 if (dk->dk_kevent.flags & EV_ONESHOT) {
860 break; // implicitly deleted
// Only issue EV_DELETE once.
864 if (~dk->dk_kevent.flags & EV_DELETE) {
865 dk->dk_kevent.flags |= EV_DELETE;
866 _dispatch_update_kq(&dk->dk_kevent);
871 hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
872 dk->dk_kevent.filter);
873 TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
// Detaches a source from its dispatch kevent.
// The source's refs are removed from the kevent's source list. If the list is
// now empty the kevent is disposed; otherwise the kevent's fflags are
// recomputed as the union of the remaining sources' masks, and the flags only
// this source needed are passed to _dispatch_kevent_resume() as del_flags.
// Finally DSF_ARMED is cleared, re-arming is disabled and the reference taken
// at creation time is released.
// NOTE(review): an early-exit check and the else keyword between the two
// branches are not shown in this listing.
878 _dispatch_kevent_unregister(dispatch_source_t ds)
880 dispatch_kevent_t dk = ds->ds_dkev;
881 dispatch_source_refs_t dri;
882 uint32_t del_flags, fflags = 0;
886 TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
888 if (TAILQ_EMPTY(&dk->dk_sources)) {
889 _dispatch_kevent_dispose(dk);
// Other sources still use this kevent: keep the flags they need.
891 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
892 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
893 fflags |= (uint32_t)dsi->ds_pending_data_mask;
895 del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
897 dk->dk_kevent.flags |= EV_ADD;
898 dk->dk_kevent.fflags = fflags;
899 _dispatch_kevent_resume(dk, 0, del_flags);
903 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
904 ds->ds_needs_rearm = false; // re-arm is pointless and bad now
905 _dispatch_release(ds); // the retain is done at creation time
909 #pragma mark dispatch_timer
// Statically allocated dispatch kevents that hold the timer source lists:
// one for wall-clock timers, one for mach-time timers and one for disarmed
// timers. Each entry uses its own index as ident, the DISPATCH_EVFILT_TIMER
// filter, and a udata pointer back to itself.
// NOTE(review): some initializer lines and closing braces are not shown in
// this listing.
911 DISPATCH_CACHELINE_ALIGN
912 static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
913 [DISPATCH_TIMER_INDEX_WALL] = {
915 .ident = DISPATCH_TIMER_INDEX_WALL,
916 .filter = DISPATCH_EVFILT_TIMER,
917 .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL],
919 .dk_sources = TAILQ_HEAD_INITIALIZER(
920 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL].dk_sources),
922 [DISPATCH_TIMER_INDEX_MACH] = {
924 .ident = DISPATCH_TIMER_INDEX_MACH,
925 .filter = DISPATCH_EVFILT_TIMER,
926 .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH],
928 .dk_sources = TAILQ_HEAD_INITIALIZER(
929 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH].dk_sources),
931 [DISPATCH_TIMER_INDEX_DISARM] = {
933 .ident = DISPATCH_TIMER_INDEX_DISARM,
934 .filter = DISPATCH_EVFILT_TIMER,
935 .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM],
937 .dk_sources = TAILQ_HEAD_INITIALIZER(
938 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM].dk_sources),
// Number of timer lists that are actually run: the element count of
// _dispatch_kevent_timer minus one.
// NOTE(review): this relies on the disarmed list being the last array entry
// -- confirm against the DISPATCH_TIMER_INDEX_* definitions.
941 // Don't count disarmed timer list
942 #define DISPATCH_TIMER_COUNT ((sizeof(_dispatch_kevent_timer) \
943 / sizeof(_dispatch_kevent_timer[0])) - 1)
// Inserts the three static timer kevents (wall, mach, disarm) into the
// _dispatch_sources hash table, each in the bucket chosen by DSL_HASH() of
// its index.
946 _dispatch_source_timer_init(void)
948 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)],
949 &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
950 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)],
951 &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
952 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_DISARM)],
953 &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM], dk_list);
// Selects the timer list index for a timer source: the wall-clock list when
// DISPATCH_TIMER_WALL_CLOCK is set in the timer flags, the mach list
// otherwise.
956 DISPATCH_ALWAYS_INLINE
957 static inline unsigned int
958 _dispatch_source_timer_idx(dispatch_source_refs_t dr)
960 return ds_timer(dr).flags & DISPATCH_TIMER_WALL_CLOCK ?
961 DISPATCH_TIMER_INDEX_WALL : DISPATCH_TIMER_INDEX_MACH;
// Returns the current time for the given timer list index: the mach list
// uses _dispatch_absolute_time(), the wall list uses
// _dispatch_get_nanoseconds(). The visible DISPATCH_CRASH handles an invalid
// index.
// NOTE(review): the switch statement line itself is not shown in this
// listing.
964 DISPATCH_ALWAYS_INLINE
965 static inline uint64_t
966 _dispatch_source_timer_now2(unsigned int timer)
969 case DISPATCH_TIMER_INDEX_MACH:
970 return _dispatch_absolute_time();
971 case DISPATCH_TIMER_INDEX_WALL:
972 return _dispatch_get_nanoseconds();
974 DISPATCH_CRASH("Invalid timer");
// Returns the current time on the clock this timer source uses, by mapping
// the source to its list index and delegating to
// _dispatch_source_timer_now2().
978 DISPATCH_ALWAYS_INLINE
979 static inline uint64_t
980 _dispatch_source_timer_now(dispatch_source_refs_t dr)
982 return _dispatch_source_timer_now2(_dispatch_source_timer_idx(dr));
// Re-files a timer source on the correct timer list after a change.
// The source is removed from its current list and then either moved to the
// disarmed list (no target, suspended, or pending data) or inserted into the
// wall/mach list in target order.
// NOTE(review): the early-exit check, the returns and the condition that
// chooses between INSERT_BEFORE and INSERT_TAIL are not shown in this
// listing.
985 // Updates the ordered list of timers based on next fire date for changes to ds.
986 // Should only be called from the context of _dispatch_mgr_q.
988 _dispatch_timer_list_update(dispatch_source_t ds)
990 dispatch_source_refs_t dr = ds->ds_refs, dri = NULL;
992 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);
994 // do not reschedule timers unregistered with _dispatch_kevent_unregister()
999 // Ensure the source is on the global kevent lists before it is removed and
1001 _dispatch_kevent_register(ds);
1003 TAILQ_REMOVE(&ds->ds_dkev->dk_sources, dr, dr_list);
1005 // Move timers that are disabled, suspended or have missed intervals to the
1006 // disarmed list, rearm after resume resp. source invoke will reenable them
1007 if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1008 ds->ds_pending_data) {
1009 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
1010 ds->ds_dkev = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM];
1011 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, (dispatch_source_refs_t)dr,
1016 // change the list if the clock type has changed
1017 ds->ds_dkev = &_dispatch_kevent_timer[_dispatch_source_timer_idx(dr)];
// Find the first entry that is unconfigured (target == 0) or fires later
// than this timer; the timer is inserted before it.
1019 TAILQ_FOREACH(dri, &ds->ds_dkev->dk_sources, dr_list) {
1020 if (ds_timer(dri).target == 0 ||
1021 ds_timer(dr).target < ds_timer(dri).target) {
1027 TAILQ_INSERT_BEFORE(dri, dr, dr_list);
1029 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, dr, dr_list);
// Fires all due timers on one timer list.
// Repeatedly inspects the first source on the list. For each due timer it
// counts the missed intervals, advances the target by that many intervals,
// re-files the timer, records the fire time, adds the count to the source's
// pending data and wakes the source.
// NOTE(review): the continue/break/return lines of the early checks and the
// body of the INT_MAX check are not shown in this listing.
1034 _dispatch_run_timers2(unsigned int timer)
1036 dispatch_source_refs_t dr;
1037 dispatch_source_t ds;
1038 uint64_t now, missed;
1040 now = _dispatch_source_timer_now2(timer);
1041 while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
1042 ds = _dispatch_source_from_refs(dr);
1043 // We may find timers on the wrong list due to a pending update from
1044 // dispatch_source_set_timer. Force an update of the list in that case.
1045 if (timer != ds->ds_ident_hack) {
1046 _dispatch_timer_list_update(ds);
1049 if (!ds_timer(dr).target) {
1050 // no configured timers on the list
1053 if (ds_timer(dr).target > now) {
1054 // Done running timers for now.
1057 // Remove timers that are suspended or have missed intervals from the
1058 // list, rearm after resume resp. source invoke will reenable them
1059 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1060 _dispatch_timer_list_update(ds);
1063 // Calculate number of missed intervals.
1064 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
// The pre-increment counts the interval that is firing now. The check against
// INT_MAX relates to the (int) cast in the atomic add below.
1065 if (++missed > INT_MAX) {
1068 ds_timer(dr).target += missed * ds_timer(dr).interval;
1069 _dispatch_timer_list_update(ds);
1070 ds_timer(dr).last_fire = now;
1071 (void)dispatch_atomic_add2o(ds, ds_pending_data, (int)missed);
1072 _dispatch_wakeup(ds);
// Runs every non-empty timer list except the disarmed one. It first makes
// sure the kevent tables have been initialized.
// NOTE(review): the declaration of `i` is not shown in this listing.
1077 _dispatch_run_timers(void)
1079 dispatch_once_f(&__dispatch_kevent_init_pred,
1080 NULL, _dispatch_kevent_init);
1083 for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
1084 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[i].dk_sources)) {
1085 _dispatch_run_timers2(i);
// Computes the data value for a timer: takes the latched pending count
// `prev`, subtracts the missed intervals already accounted for on the
// previous delivery, and adds the intervals elapsed since the last fire.
// The new elapsed count is stored in the timer's `missed` field.
// NOTE(review): the return statement is not shown; presumably `data` is
// returned -- confirm.
1090 static inline unsigned long
1091 _dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1093 // calculate the number of intervals since last fire
1094 unsigned long data, missed;
1095 uint64_t now = _dispatch_source_timer_now(dr);
1096 missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1097 ds_timer(dr).interval);
1098 // correct for missed intervals already delivered last time
1099 data = prev - ds_timer(dr).missed + missed;
1100 ds_timer(dr).missed = missed;
// Upper bound, in nanoseconds, against which _dispatch_get_next_timer_fire()
// checks the computed timer delay.
1104 // approx 1 year (60s * 60m * 24h * 365d)
1105 #define FOREVER_NSEC 31536000000000000ull
// Computes how long until the next timer fires and stores it in `howsoon`.
// Only the head of each timer list is inspected because the lists are kept in
// firing order. A timer that is already due yields a zero timespec; otherwise
// the smallest delay across the lists, in nanoseconds, is converted to
// seconds and nanoseconds.
// NOTE(review): the return type, the declaration of `timer`, the
// continue/return lines and the handling of delays above FOREVER_NSEC are
// not shown in this listing.
1108 _dispatch_get_next_timer_fire(struct timespec *howsoon)
1110 // <rdar://problem/6459649>
1111 // kevent(2) does not allow large timeouts, so we use a long timeout
1112 // instead (approximately 1 year).
1113 dispatch_source_refs_t dr = NULL;
1115 uint64_t now, delta_tmp, delta = UINT64_MAX;
1117 for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
1118 // Timers are kept in order, first one will fire next
1119 dr = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources);
1120 if (!dr || !ds_timer(dr).target) {
1121 // Empty list or disabled timer
1124 now = _dispatch_source_timer_now(dr);
1125 if (ds_timer(dr).target <= now) {
1126 howsoon->tv_sec = 0;
1127 howsoon->tv_nsec = 0;
1130 // the subtraction cannot go negative because the previous "if"
1131 // verified that the target is greater than now.
1132 delta_tmp = ds_timer(dr).target - now;
// Mach-time deltas are converted to nanoseconds so both clocks compare in the
// same unit.
1133 if (!(ds_timer(dr).flags & DISPATCH_TIMER_WALL_CLOCK)) {
1134 delta_tmp = _dispatch_time_mach2nano(delta_tmp);
1136 if (delta_tmp < delta) {
1140 if (slowpath(delta > FOREVER_NSEC)) {
1143 howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
1144 howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
// Heap-allocated parameter bundle that dispatch_source_set_timer() hands to
// the _dispatch_source_set_timer2/3 callbacks: the source and the new timer
// values.
// NOTE(review): the `ident` member used elsewhere in this file is not shown
// in this listing.
1149 struct dispatch_set_timer_params {
1150 dispatch_source_t ds;
1152 struct dispatch_timer_source_s values;
// Final stage of setting a timer. It copies the new ident and timer values
// into the source, clears any pending data, re-files the timer on the right
// list, then resumes the source and releases it.
// NOTE(review): freeing of `params` is not shown in this listing.
1156 _dispatch_source_set_timer3(void *context)
1158 // Called on the _dispatch_mgr_q
1159 struct dispatch_set_timer_params *params = context;
1160 dispatch_source_t ds = params->ds;
1161 ds->ds_ident_hack = params->ident;
1162 ds_timer(ds->ds_refs) = params->values;
1163 // Clear any pending data that might have accumulated on
1164 // older timer params <rdar://problem/8574886>
1165 ds->ds_pending_data = 0;
1166 _dispatch_timer_list_update(ds);
1167 dispatch_resume(ds);
1168 dispatch_release(ds);
// Middle stage of setting a timer: suspends the source and forwards the
// parameters to _dispatch_source_set_timer3 as a barrier on the manager
// queue.
1173 _dispatch_source_set_timer2(void *context)
1175 // Called on the source queue
1176 struct dispatch_set_timer_params *params = context;
1177 dispatch_suspend(params->ds);
1178 dispatch_barrier_async_f(&_dispatch_mgr_q, params,
1179 _dispatch_source_set_timer3);
// Configures the start time, interval and leeway of a timer source.
// Crashes the client if `ds` is not a timer source. The arguments are
// normalized, a parameter bundle is allocated and filled for either the wall
// clock (when `start`, read as int64_t, is negative) or the mach clock, and
// the update is queued as a barrier on the source itself.
// NOTE(review): the `interval` and `leeway` parameter lines, the bodies of
// several normalization branches and of the allocation retry loop, and the
// assignment of params->ds are not shown in this listing.
1183 dispatch_source_set_timer(dispatch_source_t ds,
1184 dispatch_time_t start,
1188 if (slowpath(!ds->ds_is_timer)) {
1189 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1192 struct dispatch_set_timer_params *params;
1194 // we use zero internally to mean disabled
1195 if (interval == 0) {
1197 } else if ((int64_t)interval < 0) {
1198 // 6866347 - make sure nanoseconds won't overflow
1199 interval = INT64_MAX;
1201 if ((int64_t)leeway < 0) {
1205 if (start == DISPATCH_TIME_NOW) {
1206 start = _dispatch_absolute_time();
1207 } else if (start == DISPATCH_TIME_FOREVER) {
// Loops until calloc succeeds.
1211 while (!(params = calloc(1ul, sizeof(struct dispatch_set_timer_params)))) {
// Start from the timer's current flags; the clock bit is set or cleared
// below.
1216 params->values.flags = ds_timer(ds->ds_refs).flags;
1218 if ((int64_t)start < 0) {
// Wall clock: the target is the negated start value; interval and leeway are
// stored without conversion.
1220 params->ident = DISPATCH_TIMER_INDEX_WALL;
1221 params->values.target = -((int64_t)start);
1222 params->values.interval = interval;
1223 params->values.leeway = leeway;
1224 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
// Mach clock: interval and leeway are converted with
// _dispatch_time_nano2mach().
1227 params->ident = DISPATCH_TIMER_INDEX_MACH;
1228 params->values.target = start;
1229 params->values.interval = _dispatch_time_nano2mach(interval);
1231 // rdar://problem/7287561 interval must be at least one in
1232 // order to avoid later division by zero when calculating
1233 // the missed interval count. (NOTE: the wall clock's
1234 // interval is already "fixed" to be 1 or more)
1235 if (params->values.interval < 1) {
1236 params->values.interval = 1;
1239 params->values.leeway = _dispatch_time_nano2mach(leeway);
1240 params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
1242 // Suspend the source so that it doesn't fire with pending changes
1243 // The use of suspend/resume requires the external retain/release
1244 dispatch_retain(ds);
1245 dispatch_barrier_async_f((dispatch_queue_t)ds, params,
1246 _dispatch_source_set_timer2);
1250 #pragma mark dispatch_mach
// Per-call-site Mach port tracing: forwards to dispatch_debug_machport()
// with the calling function's name when both DISPATCH_DEBUG and
// DISPATCH_MACHPORT_DEBUG are set.
1254 #if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
1255 #define _dispatch_debug_machport(name) \
1256 dispatch_debug_machport((name), __func__)
// no-op variant
1258 #define _dispatch_debug_machport(name)
1261 // Flags for all notifications that are registered/unregistered when a
1262 // send-possible notification is requested/delivered
1263 #define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
1264 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
// True when v is a non-zero power of two.
// NOTE(review): v is not parenthesized in the expansion; fine for the
// literal size used below, but unsafe for compound expressions.
1266 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
// Reduces MACH_PORT_INDEX(x) to a bucket in [0, y): a mask when y is a
// power of two, a modulo otherwise.
1267 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
1268 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
1270 #define _DISPATCH_MACHPORT_HASH_SIZE 32
1271 #define _DISPATCH_MACHPORT_HASH(x) \
1272 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
// source created by _dispatch_mach_notify_source_init() on
// _dispatch_event_port
1274 static dispatch_source_t _dispatch_mach_notify_source;
// port set allocated by _dispatch_port_set_init() and registered with the
// kqueue
1275 static mach_port_t _dispatch_port_set;
// receive right allocated by _dispatch_port_set_init(); notifications
// requested by _dispatch_mach_notify_update() are directed to it
1276 static mach_port_t _dispatch_event_port;
1278 static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
1279 uint32_t new_flags, uint32_t del_flags, uint32_t mask,
1280 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
// One-time setup for Mach port event delivery, run through dispatch_once_f()
// from _dispatch_get_port_set(): allocates the port set, allocates the
// receive right _dispatch_event_port, adds that port to the set, and
// registers the set with the kqueue under EVFILT_MACHPORT. Each Mach
// failure is reported and followed by DISPATCH_CLIENT_CRASH.
1283 _dispatch_port_set_init(void *context DISPATCH_UNUSED)
1285 struct kevent kev = {
1286 .filter = EVFILT_MACHPORT,
// step 1: the port set that the kevent will watch
1291 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
1292 &_dispatch_port_set);
1293 DISPATCH_VERIFY_MIG(kr);
1295 _dispatch_bug_mach_client(
1296 "_dispatch_port_set_init: mach_port_allocate() failed", kr);
1297 DISPATCH_CLIENT_CRASH(
1298 "mach_port_allocate() failed: cannot create port set");
// step 2: the receive right used for libdispatch's own notifications
1300 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
1301 &_dispatch_event_port);
1302 DISPATCH_VERIFY_MIG(kr);
1304 _dispatch_bug_mach_client(
1305 "_dispatch_port_set_init: mach_port_allocate() failed", kr);
1306 DISPATCH_CLIENT_CRASH(
1307 "mach_port_allocate() failed: cannot create receive right");
// step 3: make the event port a member of the port set
1309 kr = mach_port_move_member(mach_task_self(), _dispatch_event_port,
1310 _dispatch_port_set);
1311 DISPATCH_VERIFY_MIG(kr);
1313 _dispatch_bug_mach_client(
1314 "_dispatch_port_set_init: mach_port_move_member() failed", kr);
1315 DISPATCH_CLIENT_CRASH("mach_port_move_member() failed");
// step 4: register the port set itself as the kevent ident
1318 kev.ident = _dispatch_port_set;
1320 _dispatch_update_kq(&kev);
// Returns _dispatch_port_set, running _dispatch_port_set_init() through
// dispatch_once_f() before the value is read.
1324 _dispatch_get_port_set(void)
1326 static dispatch_once_t pred;
1328 dispatch_once_f(&pred, NULL, _dispatch_port_set_init);
1330 return _dispatch_port_set;
// Starts message delivery for the port stored in dk->dk_kevent.ident by
// moving it into the port set returned by _dispatch_get_port_set().
1333 static kern_return_t
1334 _dispatch_kevent_machport_enable(dispatch_kevent_t dk)
1336 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1339 _dispatch_debug_machport(mp);
1340 kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
1342 DISPATCH_VERIFY_MIG(kr);
// invalid name: logged as a prematurely destroyed receive right
1344 case KERN_INVALID_NAME:
1346 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
// invalid right: reported as a client bug
1350 case KERN_INVALID_RIGHT:
1351 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
1352 "mach_port_move_member() failed ", kr);
// remaining results are checked with dispatch_assume_zero()
1355 (void)dispatch_assume_zero(kr);
// Stops message delivery for the port stored in dk->dk_kevent.ident by
// calling mach_port_move_member() with 0 as the destination set.
// NOTE(review): 0 is presumably MACH_PORT_NULL, i.e. "remove from the
// current port set" — confirm against the Mach documentation.
1363 _dispatch_kevent_machport_disable(dispatch_kevent_t dk)
1365 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1368 _dispatch_debug_machport(mp);
1369 kr = mach_port_move_member(mach_task_self(), mp, 0);
1371 DISPATCH_VERIFY_MIG(kr);
// both invalid-right and invalid-name lead to the corruption log below
1373 case KERN_INVALID_RIGHT:
1374 case KERN_INVALID_NAME:
1376 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
// remaining results are checked with dispatch_assume_zero()
1381 (void)dispatch_assume_zero(kr);
// Applies a change in requested Mach event flags to the kevent's port.
//
// new_flags - flags being added; must not overlap del_flags
// del_flags - flags being removed
// Returns the receive-side result if it is non-zero, otherwise the
// send-possible notification result.
1388 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
1391 kern_return_t kr_recv = 0, kr_sp = 0;
1393 dispatch_assert_zero(new_flags & del_flags);
// receive-message interest toggles port-set membership
1394 if (new_flags & DISPATCH_MACH_RECV_MESSAGE) {
1395 kr_recv = _dispatch_kevent_machport_enable(dk);
1396 } else if (del_flags & DISPATCH_MACH_RECV_MESSAGE) {
1397 _dispatch_kevent_machport_disable(dk);
1399 if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
1400 (del_flags & _DISPATCH_MACH_SP_FLAGS)) {
1401 // Requesting a (delayed) non-sync send-possible notification
1402 // registers for both immediate dead-name notification and delayed-arm
1403 // send-possible notification for the port.
1404 // The send-possible notification is armed when a mach_msg() with the
1405 // MACH_SEND_NOTIFY to the port times out.
1406 // If send-possible is unavailable, fall back to immediate dead-name
1407 // registration rdar://problem/2527840&9008724
1408 kr_sp = _dispatch_mach_notify_update(dk, new_flags, del_flags,
1409 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
1410 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
1413 return (kr_recv ? kr_recv : kr_sp);
// Handles a port-set kevent: ke->data carries the name of the member port.
// Looks up that port's dispatch kevent, takes the port out of the port
// set, and merges a synthesized DISPATCH_MACH_RECV_MESSAGE kevent into
// every source attached to it.
1417 _dispatch_drain_mach_messages(struct kevent *ke)
1419 mach_port_t name = (mach_port_name_t)ke->data;
1420 dispatch_source_refs_t dri;
1421 dispatch_kevent_t dk;
1424 if (!dispatch_assume(name)) {
1427 _dispatch_debug_machport(name);
1428 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1429 if (!dispatch_assume(dk)) {
1432 _dispatch_kevent_machport_disable(dk); // emulate EV_DISPATCH
// per-port kevent presented to the sources in place of the port-set event
1434 EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
1435 DISPATCH_MACH_RECV_MESSAGE, 0, dk);
1437 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1438 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), &kev);
// Delivers a Mach notification for port `name` to the sources attached to
// its dispatch kevent.
//
// name  - port the notification is about; used to look up the kevent
// flag  - DISPATCH_MACH_* bit reported as the synthesized kevent's fflags
// unreg - registration bits cleared from dk_kevent.data
// The final argument is passed as true by the port-deleted and dead-name
// handlers and false by the send-possible handler.
// NOTE(review): presumably it gates the re-registration and the mask
// clearing below — confirm against the full function.
1443 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, uint32_t unreg,
1446 dispatch_source_refs_t dri;
1447 dispatch_kevent_t dk;
1450 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1455 // Update notification registration state.
1456 dk->dk_kevent.data &= ~unreg;
1458 // Re-register for notification before delivery
1459 _dispatch_kevent_resume(dk, flag, 0);
// synthesized kevent carrying `flag` to each source
1462 EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE, flag, 0, dk);
1464 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1465 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), &kev);
1467 // this can never happen again
1468 // this must happen after the merge
1469 // this may be racy in the future, but we don't provide a 'setter'
1470 // API for the mask yet
1471 _dispatch_source_from_refs(dri)->ds_pending_data_mask &= ~unreg;
1476 // no more sources have these flags
1477 dk->dk_kevent.fflags &= ~unreg;
// Keeps the kernel notification request for a port in step with the flags
// its sources want. Registration state is tracked in dk->dk_kevent.data.
//
// dk           - kevent whose ident is the Mach port
// new_flags    - flags being added
// del_flags    - flags being removed
// mask         - the subset of flags this notification type covers
// notify_msgid - notification id given to mach_port_request_notification()
// notify_sync  - sync value given to mach_port_request_notification()
1481 static kern_return_t
1482 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
1483 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
1484 mach_port_mscount_t notify_sync)
1486 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
// remember the state before the update so transitions can be detected
1487 typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
1488 kern_return_t kr, krr = 0;
1490 // Update notification registration state.
1491 dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
1492 dk->dk_kevent.data &= ~(del_flags & mask);
1494 _dispatch_debug_machport(port);
// transition unregistered -> registered: request the notification on
// _dispatch_event_port
1495 if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
1496 previous = MACH_PORT_NULL;
1497 krr = mach_port_request_notification(mach_task_self(), port,
1498 notify_msgid, notify_sync, _dispatch_event_port,
1499 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
1500 DISPATCH_VERIFY_MIG(krr);
1503 case KERN_INVALID_NAME:
1504 case KERN_INVALID_RIGHT:
1505 // Suppress errors & clear registration state
1506 dk->dk_kevent.data &= ~mask;
1509 // Else, we don't expect any errors from mach. Log any errors
1510 if (dispatch_assume_zero(krr)) {
1511 // log the error & clear registration state
1512 dk->dk_kevent.data &= ~mask;
1513 } else if (dispatch_assume_zero(previous)) {
1514 // Another subsystem has beat libdispatch to requesting the
1515 // specified Mach notification on this port. We should
1516 // technically cache the previous port and message it when the
1517 // kernel messages our port. Or we can just say screw those
1518 // subsystems and deallocate the previous port.
1519 // They should adopt libdispatch :-P
1520 kr = mach_port_deallocate(mach_task_self(), previous);
1521 DISPATCH_VERIFY_MIG(kr);
1522 (void)dispatch_assume_zero(kr);
1523 previous = MACH_PORT_NULL;
// transition registered -> unregistered: cancel by requesting the
// notification with MACH_PORT_NULL as the notify port
1526 } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
1527 previous = MACH_PORT_NULL;
1528 kr = mach_port_request_notification(mach_task_self(), port,
1529 notify_msgid, notify_sync, MACH_PORT_NULL,
1530 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
1531 DISPATCH_VERIFY_MIG(kr);
1534 case KERN_INVALID_NAME:
1535 case KERN_INVALID_RIGHT:
1536 case KERN_INVALID_ARGUMENT:
1539 if (dispatch_assume_zero(kr)) {
1546 if (slowpath(previous)) {
1547 // the kernel has not consumed the send-once right yet
1548 (void)dispatch_assume_zero(
1549 _dispatch_send_consume_send_once_right(previous));
// Event handler for _dispatch_mach_notify_source. `context` is the source
// itself (set in _dispatch_mach_notify_source_init()). Sizes the message
// buffer as the larger of the request and reply unions, then services the
// port with dispatch_mig_server().
1555 _dispatch_mach_notify_source2(void *context)
1557 dispatch_source_t ds = context;
1558 size_t maxsz = MAX(sizeof(union
1559 __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem),
1561 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
1563 dispatch_mig_server(ds, maxsz, libdispatch_internal_protocol_server);
// Creates and resumes the MACH_RECV source on _dispatch_event_port. The
// source is stored in _dispatch_mach_notify_source, is its own context,
// and uses _dispatch_mach_notify_source2() as its event handler.
1567 _dispatch_mach_notify_source_init(void *context DISPATCH_UNUSED)
// ensures the port set, and with it _dispatch_event_port, has been created
1569 _dispatch_get_port_set();
1571 _dispatch_mach_notify_source = dispatch_source_create(
1572 DISPATCH_SOURCE_TYPE_MACH_RECV, _dispatch_event_port, 0,
1574 dispatch_assert(_dispatch_mach_notify_source);
1575 dispatch_set_context(_dispatch_mach_notify_source,
1576 _dispatch_mach_notify_source);
1577 dispatch_source_set_event_handler_f(_dispatch_mach_notify_source,
1578 _dispatch_mach_notify_source2);
1579 dispatch_resume(_dispatch_mach_notify_source);
// Handler for a port-deleted notification on `name`: merges
// DISPATCH_MACH_SEND_DELETED into the port's sources and clears all
// _DISPATCH_MACH_SP_FLAGS registration bits. Always returns KERN_SUCCESS.
1583 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
1584 mach_port_name_t name)
1587 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
1588 "deleted prematurely", name);
1591 _dispatch_debug_machport(name);
1592 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED,
1593 _DISPATCH_MACH_SP_FLAGS, true);
1595 return KERN_SUCCESS;
// Handler for a dead-name notification on `name`: merges
// DISPATCH_MACH_SEND_DEAD into the port's sources, clears all
// _DISPATCH_MACH_SP_FLAGS registration bits, then drops the dead-name
// right that came with the notification. Always returns KERN_SUCCESS.
1599 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
1600 mach_port_name_t name)
1605 _dispatch_log("machport[0x%08x]: dead-name notification: %s",
1608 _dispatch_debug_machport(name);
1609 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD,
1610 _DISPATCH_MACH_SP_FLAGS, true);
1612 // the act of receiving a dead name notification allocates a dead-name
1613 // right that must be deallocated
1614 kr = mach_port_deallocate(mach_task_self(), name);
1615 DISPATCH_VERIFY_MIG(kr);
// the result of the deallocation is deliberately not checked
1616 //(void)dispatch_assume_zero(kr);
1618 return KERN_SUCCESS;
// Handler for a send-possible notification on `name`: merges
// DISPATCH_MACH_SEND_POSSIBLE into the port's sources. Unlike the
// port-deleted and dead-name handlers it passes false as the final
// argument of _dispatch_mach_notify_merge(). Always returns KERN_SUCCESS.
1622 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
1623 mach_port_name_t name)
1626 _dispatch_log("machport[0x%08x]: send-possible notification: %s",
1629 _dispatch_debug_machport(name);
1630 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE,
1631 _DISPATCH_MACH_SP_FLAGS, false);
1633 return KERN_SUCCESS;
// Receives MIG requests on the Mach port behind `ds` and hands each one to
// `callback`. When a reply has a remote port, MACH_SEND_MSG is set so the
// reply is sent by the next mach_msg() call.
//
// ds       - source whose ds_ident_hack is used as the receive port
// maxmsgsz - largest expected message, excluding the trailer
// callback - MIG demux routine, called with the request and reply headers
1637 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
1638 dispatch_mig_callback_t callback)
1640 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
1641 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
1642 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
// per-call copy of `options`; MACH_SEND_MSG / MACH_SEND_TIMEOUT are added
// to it when a reply is pending
1643 mach_msg_options_t tmp_options;
1644 mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
1645 mach_msg_return_t kr = 0;
1646 unsigned int cnt = 1000; // do not stall out serial queues
1648 bool received = false;
1649 size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
1651 // XXX FIXME -- allocate these elsewhere
// NOTE(review): both buffers are stack allocations sized from the
// caller-supplied maxmsgsz — confirm callers keep it small.
1652 bufRequest = alloca(rcv_size);
1653 bufReply = alloca(rcv_size);
1654 bufReply->Head.msgh_size = 0; // make CLANG happy
1655 bufRequest->RetCode = 0;
1658 options |= MACH_RCV_LARGE; // rdar://problem/8422992
1660 tmp_options = options;
1661 // XXX FIXME -- change this to not starve out the target queue
// stop receiving once the source is suspended or the cnt budget runs out;
// a pending send may still need to go out
1663 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
1664 options &= ~MACH_RCV_MSG;
1665 tmp_options &= ~MACH_RCV_MSG;
1667 if (!(tmp_options & MACH_SEND_MSG)) {
// bufReply is both the send buffer (previous reply, if MACH_SEND_MSG is
// set) and the receive buffer for the next request
1671 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
1672 (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
// drop any send bits that were added for this call
1674 tmp_options = options;
// failed sends: destroy a complex reply so its contents are released
1678 case MACH_SEND_INVALID_DEST:
1679 case MACH_SEND_TIMED_OUT:
1680 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
1681 mach_msg_destroy(&bufReply->Head);
1684 case MACH_RCV_TIMED_OUT:
1685 // Don't return an error if a message was sent this time or
1686 // a message was successfully received previously
1687 // rdar://problems/7363620&7791738
1688 if(bufReply->Head.msgh_remote_port || received) {
1689 kr = MACH_MSG_SUCCESS;
1692 case MACH_RCV_INVALID_NAME:
1695 case MACH_RCV_TOO_LARGE:
1696 // receive messages that are too large and log their id and size
1697 // rdar://problem/8422992
1698 tmp_options &= ~MACH_RCV_LARGE;
1699 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
1700 void *large_buf = malloc(large_size);
1702 rcv_size = large_size;
1703 bufReply = large_buf;
1705 if (!mach_msg(&bufReply->Head, tmp_options, 0,
1706 (mach_msg_size_t)rcv_size,
1707 (mach_port_t)ds->ds_ident_hack, 0, 0)) {
// NOTE(review): maxmsgsz is a size_t but is printed with %zd; %zu would
// match the type.
1708 _dispatch_log("BUG in libdispatch client: "
1709 "dispatch_mig_server received message larger than "
1710 "requested size %zd: id = 0x%x, size = %d",
1711 maxmsgsz, bufReply->Head.msgh_id,
1712 bufReply->Head.msgh_size);
1720 _dispatch_bug_mach_client(
1721 "dispatch_mig_server: mach_msg() failed", kr);
// nothing was received in this call, so there is no request to demux
1727 if (!(tmp_options & MACH_RCV_MSG)) {
// the message just received becomes the request; the buffers trade roles
1732 bufTemp = bufRequest;
1733 bufRequest = bufReply;
1736 demux_success = callback(&bufRequest->Head, &bufReply->Head);
1738 if (!demux_success) {
1739 // destroy the request - but not the reply port
1740 bufRequest->Head.msgh_remote_port = 0;
1741 mach_msg_destroy(&bufRequest->Head);
1742 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
1743 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
1745 if (slowpath(bufReply->RetCode)) {
1746 if (bufReply->RetCode == MIG_NO_REPLY) {
1750 // destroy the request - but not the reply port
1751 bufRequest->Head.msgh_remote_port = 0;
1752 mach_msg_destroy(&bufRequest->Head);
// a reply with a destination is sent by the next mach_msg() call; a send
// timeout is added unless the reply right is a send-once right
1756 if (bufReply->Head.msgh_remote_port) {
1757 tmp_options |= MACH_SEND_MSG;
1758 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
1759 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
1760 tmp_options |= MACH_SEND_TIMEOUT;
1768 #endif /* HAVE_MACH */
1771 #pragma mark dispatch_source_debug
// Maps a kevent filter value to the spelling of its macro name, for debug
// output. Returns "EVFILT_missing" for a value with no case below.
1775 _evfiltstr(short filt)
// expands to a case label that returns the stringified macro name
1778 #define _evfilt2(f) case (f): return #f
1779 _evfilt2(EVFILT_READ);
1780 _evfilt2(EVFILT_WRITE);
1781 _evfilt2(EVFILT_AIO);
1782 _evfilt2(EVFILT_VNODE);
1783 _evfilt2(EVFILT_PROC);
1784 _evfilt2(EVFILT_SIGNAL);
1785 _evfilt2(EVFILT_TIMER);
1787 _evfilt2(EVFILT_VM);
1790 _evfilt2(EVFILT_MACHPORT);
1792 _evfilt2(EVFILT_FS);
1793 _evfilt2(EVFILT_USER);
// libdispatch's own pseudo-filters
1795 _evfilt2(DISPATCH_EVFILT_TIMER);
1796 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
1797 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
1799 return "EVFILT_missing";
// Formats the source's target queue (label and pointer), pending data and
// pending-data mask into buf. Returns the snprintf() result, which is the
// length that would have been written and may exceed bufsiz.
// An empty label is printed when the source has no target queue.
1804 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
1806 dispatch_queue_t target = ds->do_targetq;
1807 return snprintf(buf, bufsiz, "target = %s[%p], pending_data = 0x%lx, "
1808 "pending_data_mask = 0x%lx, ",
1809 target ? target->dq_label : "", target,
1810 ds->ds_pending_data, ds->ds_pending_data_mask);
// Formats the timer fields of a source (target, last_fire, interval, flags)
// into buf as hexadecimal. Returns the snprintf() result, which is the
// length that would have been written and may exceed bufsiz.
1814 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
1816 dispatch_source_refs_t dr = ds->ds_refs;
1817 return snprintf(buf, bufsiz, "timer = { target = 0x%llx, "
1818 "last_fire = 0x%llx, interval = 0x%llx, flags = 0x%llx }, ",
1819 ds_timer(dr).target, ds_timer(dr).last_fire, ds_timer(dr).interval,
1820 ds_timer(dr).flags);
// Builds the full debug description of a source in buf: object
// attributes, source attributes, timer attributes when ds_is_timer is
// set, and finally the kevent filter name ("????" when there is no
// ds_dkev).
// NOTE(review): each step adds the would-have-written length to offset.
// If any step truncates, offset can exceed bufsiz, so `bufsiz - offset`
// wraps and &buf[offset] points past the buffer — confirm callers always
// pass a large enough buffer, or clamp each increment.
1824 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
1827 offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
1829 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
1830 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
1831 if (ds->ds_is_timer) {
1832 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
1834 offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
1835 ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
// Logs every entry of a kevent array, one _dispatch_log() line each.
//
// kev   - array of kevents
// count - number of entries in kev
// str   - caller-supplied tag appended to each log line
1841 dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
1844 for (i = 0; i < count; ++i) {
1845 _dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, "
1846 "fflags = 0x%x, data = %p, udata = %p }: %s",
1847 i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags,
1848 kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
// Event handler for the debugger's listening socket. `context` carries the
// listening fd. Accepts one connection and writes an HTTP/1.0 response
// containing an HTML list of every dispatch kevent in _dispatch_sources
// and the sources attached to each, then closes the stream.
1853 _dispatch_kevent_debugger2(void *context)
1856 socklen_t sa_len = sizeof(sa);
1857 int c, fd = (int)(long)context;
1859 dispatch_kevent_t dk;
1860 dispatch_source_t ds;
1861 dispatch_source_refs_t dr;
1864 c = accept(fd, &sa, &sa_len);
// EAGAIN is expected on the non-blocking listener and is not reported
1866 if (errno != EAGAIN) {
1867 (void)dispatch_assume_zero(errno);
1872 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
1874 (void)dispatch_assume_zero(errno);
// wrap the connection in a stdio stream for the fprintf() calls below
1877 debug_stream = fdopen(c, "a");
1878 if (!dispatch_assume(debug_stream)) {
// response headers followed by the opening of the HTML document
1883 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
1884 fprintf(debug_stream, "Content-type: text/html\r\n");
1885 fprintf(debug_stream, "Pragma: nocache\r\n");
1886 fprintf(debug_stream, "\r\n");
1887 fprintf(debug_stream, "<html>\n");
1888 fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
1889 fprintf(debug_stream, "<body>\n<ul>\n");
1891 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
1892 // "<td>DK</td><td>DK</td><td>DK</td></tr>\n");
// walk every bucket of the kevent hash table
1894 for (i = 0; i < DSL_HASH_SIZE; i++) {
1895 if (TAILQ_EMPTY(&_dispatch_sources[i])) {
1898 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
1899 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
1900 "0x%hx fflags 0x%x data 0x%lx udata %p\n",
1901 dk, (unsigned long)dk->dk_kevent.ident,
1902 _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
1903 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
1904 dk->dk_kevent.udata);
1905 fprintf(debug_stream, "\t\t<ul>\n");
// one nested list item per source attached to this kevent
1906 TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
1907 ds = _dispatch_source_from_refs(dr);
1908 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
1909 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
1910 ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
1911 ds->ds_pending_data, ds->ds_pending_data_mask,
1912 ds->ds_atomic_flags);
// when the suspend count equals the lock value, also print the target queue
1913 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
1914 dispatch_queue_t dq = ds->do_targetq;
1915 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
1916 "0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
1917 dq->do_suspend_cnt, dq->dq_label);
1920 fprintf(debug_stream, "\t\t</ul>\n");
1921 fprintf(debug_stream, "\t</li>\n");
1924 fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
1925 fflush(debug_stream);
1926 fclose(debug_stream);
// Cancel handler for the debugger source. `context` carries the listening
// fd, whose ownership _dispatch_kevent_debugger() transferred to the
// source. A failure is reported through dispatch_assume_zero(errno).
1930 _dispatch_kevent_debugger2_cancel(void *context)
1932 int ret, fd = (int)(long)context;
1936 (void)dispatch_assume_zero(errno);
// Sets up the HTTP debug listener, configured through the
// LIBDISPATCH_DEBUGGER environment variable. Creates a non-blocking TCP
// socket, binds it (the address defaults to loopback), listens, and
// attaches a READ source whose handler is _dispatch_kevent_debugger2().
// The port actually bound is logged.
1941 _dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
1944 struct sockaddr_in sa_in;
1948 .sin_family = AF_INET,
1949 .sin_addr = { htonl(INADDR_LOOPBACK), },
1952 dispatch_source_t ds;
1954 int val, r, fd, sock_opt = 1;
1955 socklen_t slen = sizeof(sa_u);
1960 valstr = getenv("LIBDISPATCH_DEBUGGER");
// replaces the loopback default with address 0 (INADDR_ANY)
1966 sa_u.sa_in.sin_addr.s_addr = 0;
1968 fd = socket(PF_INET, SOCK_STREAM, 0);
1970 (void)dispatch_assume_zero(errno);
1973 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
1974 (socklen_t) sizeof sock_opt);
1976 (void)dispatch_assume_zero(errno);
// NOTE(review): F_SETFL replaces the whole flag set rather than OR-ing
// O_NONBLOCK into the current flags; harmless on a fresh socket.
1980 r = fcntl(fd, F_SETFL, O_NONBLOCK);
1982 (void)dispatch_assume_zero(errno);
1986 r = bind(fd, &sa_u.sa, sizeof(sa_u));
1988 (void)dispatch_assume_zero(errno);
1991 r = listen(fd, SOMAXCONN);
1993 (void)dispatch_assume_zero(errno);
// read back the bound address so the port can be logged
1996 r = getsockname(fd, &sa_u.sa, &slen);
1998 (void)dispatch_assume_zero(errno);
2002 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
2004 if (dispatch_assume(ds)) {
2005 _dispatch_log("LIBDISPATCH: debug port: %hu",
2006 (in_port_t)ntohs(sa_u.sa_in.sin_port));
2008 /* ownership of fd transfers to ds */
2009 dispatch_set_context(ds, (void *)(long)fd);
2010 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
2011 dispatch_source_set_cancel_handler_f(ds,
2012 _dispatch_kevent_debugger2_cancel);
2013 dispatch_resume(ds);
// Fallback definition for builds whose headers do not provide
// MACH_PORT_TYPE_SPREQUEST; tested by dispatch_debug_machport() to print
// the spreq field.
2023 #ifndef MACH_PORT_TYPE_SPREQUEST
2024 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
// Logs a one-line summary of a Mach port name's rights and state.
//
// name - port name to inspect in the current task
// str  - caller-supplied tag appended to the log line
// Three output shapes are used: a full status line for receive rights, a
// shorter line for send / send-once / dead-name rights, and a raw type
// dump for anything else. A mach_port_type() failure is logged instead.
2028 dispatch_debug_machport(mach_port_t name, const char* str)
2030 mach_port_type_t type;
// user-reference counts: send, receive, send-once, dead-name
2031 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
2032 unsigned int dnreqs = 0, dnrsiz;
2033 kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
2035 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
2036 kr, mach_error_string(kr), str);
// collect the reference count for each right type the name holds
2039 if (type & MACH_PORT_TYPE_SEND) {
2040 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2041 MACH_PORT_RIGHT_SEND, &ns));
2043 if (type & MACH_PORT_TYPE_SEND_ONCE) {
2044 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2045 MACH_PORT_RIGHT_SEND_ONCE, &nso));
2047 if (type & MACH_PORT_TYPE_DEAD_NAME) {
2048 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2049 MACH_PORT_RIGHT_DEAD_NAME, &nd));
// dead-name request info is queried for receive, send and send-once rights
2051 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND|
2052 MACH_PORT_TYPE_SEND_ONCE)) {
2053 (void)dispatch_assume_zero(mach_port_dnrequest_info(mach_task_self(),
2054 name, &dnrsiz, &dnreqs));
// receive rights additionally report the port's receive status
2056 if (type & MACH_PORT_TYPE_RECEIVE) {
2057 mach_port_status_t status = { .mps_pset = 0, };
2058 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
2059 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2060 MACH_PORT_RIGHT_RECEIVE, &nr));
2061 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
2062 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
2063 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
2064 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
2065 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
2066 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
2067 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
2068 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
2069 status.mps_srights ? "Y":"N", status.mps_sorights,
2070 status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
2071 status.mps_seqno, str);
2072 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
2073 MACH_PORT_TYPE_DEAD_NAME)) {
2074 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
2075 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
2076 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
// any other type: print the raw type bits
2078 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
2085 #endif // DISPATCH_DEBUG