2 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
4 * @APPLE_APACHE_LICENSE_HEADER_START@
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * @APPLE_APACHE_LICENSE_HEADER_END@
22 #include "kevent_internal.h"
25 #include "protocolServer.h"
27 #include <sys/mount.h>
// Dispatch-private "filter" values for sources that are NOT backed by a real
// kernel kevent filter. They live just past the kernel's negative filter
// space (-EVFILT_SYSCOUNT - n) so they can never collide with a real filter.
29 #define DISPATCH_EVFILT_TIMER (-EVFILT_SYSCOUNT - 1)
30 #define DISPATCH_EVFILT_CUSTOM_ADD (-EVFILT_SYSCOUNT - 2)
31 #define DISPATCH_EVFILT_CUSTOM_OR (-EVFILT_SYSCOUNT - 3)
// Total number of filters once the three private ones above are included.
32 #define DISPATCH_EVFILT_SYSCOUNT (EVFILT_SYSCOUNT + 3)
// Statically allocated dispatch_kevent structures anchoring the two timer
// lists: one for wall-clock timers, one for machine (absolute) time timers.
// Timer sources are queued on dk_sources in fire order (see
// _dispatch_timer_list_update). udata points back at the element itself so
// kernel-style bookkeeping stays uniform with heap-allocated kevents.
// NOTE(review): interior lines (braces/commas) are missing from this view.
34 static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
37 .ident = DISPATCH_TIMER_INDEX_WALL,
38 .filter = DISPATCH_EVFILT_TIMER,
39 .udata = &_dispatch_kevent_timer[0],
41 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
45 .ident = DISPATCH_TIMER_INDEX_MACH,
46 .filter = DISPATCH_EVFILT_TIMER,
47 .udata = &_dispatch_kevent_timer[1],
49 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
// Element count of the array above (2: WALL and MACH).
52 #define DISPATCH_TIMER_COUNT (sizeof _dispatch_kevent_timer / sizeof _dispatch_kevent_timer[0])
// Statically allocated anchors for the two custom data source flavors
// (DISPATCH_SOURCE_TYPE_DATA_OR / _DATA_ADD). These are never registered
// with the kernel; dispatch_source_merge_data() feeds them directly.
54 static struct dispatch_kevent_s _dispatch_kevent_data_or = {
56 .filter = DISPATCH_EVFILT_CUSTOM_OR,
58 .udata = &_dispatch_kevent_data_or,
60 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
62 static struct dispatch_kevent_s _dispatch_kevent_data_add = {
64 .filter = DISPATCH_EVFILT_CUSTOM_ADD,
65 .udata = &_dispatch_kevent_data_add,
67 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
// Forward declarations for the kevent-merge path and the Mach-port
// (EVFILT_MACHPORT) helpers defined later in this file.
70 static void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke);
71 static void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
73 static void _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
74 static void _dispatch_kevent_machport_enable(dispatch_kevent_t dk);
75 static void _dispatch_kevent_machport_disable(dispatch_kevent_t dk);
77 static void _dispatch_drain_mach_messages(struct kevent *ke);
81 _dispatch_mach_notify_source_init(void *context __attribute__((unused)));
// Map a kevent filter value to its symbolic name for debug output.
// Returns "EVFILT_missing" for unknown filters.
// NOTE(review): the return-type line, opening brace and switch header are
// not visible in this view (original lines were elided).
85 _evfiltstr(short filt)
// Helper: expands to a case label that returns the stringified filter name.
88 #define _evfilt2(f) case (f): return #f
89 _evfilt2(EVFILT_READ);
90 _evfilt2(EVFILT_WRITE);
92 _evfilt2(EVFILT_VNODE);
93 _evfilt2(EVFILT_PROC);
94 _evfilt2(EVFILT_SIGNAL);
95 _evfilt2(EVFILT_TIMER);
97 _evfilt2(EVFILT_MACHPORT);
100 _evfilt2(EVFILT_USER);
101 #if HAVE_DECL_EVFILT_SESSION
102 _evfilt2(EVFILT_SESSION);
104 #if HAVE_DECL_EVFILT_LIO
105 _evfilt2(EVFILT_LIO);
// The three dispatch-private pseudo-filters also get readable names.
108 _evfilt2(DISPATCH_EVFILT_TIMER);
109 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
110 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
112 return "EVFILT_missing";
// Open hash table of all live dispatch_kevent structures, keyed by
// (ident, filter) via _dispatch_kevent_hash(). Power-of-two size lets
// DSL_HASH use a mask instead of a modulo.
116 #define DSL_HASH_SIZE 256u // must be a power of two
117 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
119 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
// Hash an (ident, filter) pair into a _dispatch_sources bucket index.
// Mach ports use MACH_PORT_INDEX so port generation bits don't perturb
// the hash; everything else hashes on the raw ident.
121 static inline uintptr_t
122 _dispatch_kevent_hash(uintptr_t ident, short filter)
126 value = (filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
130 return DSL_HASH(value);
// Look up an existing dispatch_kevent by exact (ident, filter) match in the
// global hash table. NOTE(review): the return of the found element / NULL is
// on lines not visible here.
133 static dispatch_kevent_t
134 _dispatch_kevent_find(uintptr_t ident, short filter)
136 uintptr_t hash = _dispatch_kevent_hash(ident, filter);
137 dispatch_kevent_t dki;
139 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
140 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
// Insert a dispatch_kevent into its hash bucket. Caller must have checked
// via _dispatch_kevent_find() that no equivalent entry already exists.
148 _dispatch_kevent_insert(dispatch_kevent_t dk)
150 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
151 dk->dk_kevent.filter);
153 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
// Log each kevent in an array, one line per entry, tagged with 'str'
// (typically the calling function's name).
158 dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
161 for (i = 0; i < count; ++i) {
162 _dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, fflags = 0x%x, data = %p, udata = %p }: %s",
163 i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags, kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
// do_debug vtable hook: append the kevent filter name to the generic
// source debug description produced by _dispatch_source_debug().
169 _dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
171 size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
172 offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
173 ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
// One-time initializer (via dispatch_once_f) for the kevent hash table:
// init every bucket, then pre-seed the statically allocated timer and
// custom-data kevents so _dispatch_kevent_find() can locate them.
178 _dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
181 for (i = 0; i < DSL_HASH_SIZE; i++) {
182 TAILQ_INIT(&_dispatch_sources[i]);
// The timer anchors hash by their index ident; the data anchors have
// ident 0 and therefore land in bucket 0.
185 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
186 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
187 TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
188 TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
191 // Find existing kevents, and merge any new flags if necessary
// Installs a source's kevent: if another source already registered the same
// (ident, filter), this source shares that dispatch_kevent and only the
// fflags delta is (re)registered with the kernel; otherwise the source's own
// dk is inserted into the hash table. Idempotent via ds_is_installed.
193 _dispatch_kevent_merge(dispatch_source_t ds)
195 static dispatch_once_t pred;
196 dispatch_kevent_t dk;
197 typeof(dk->dk_kevent.fflags) new_flags;
198 bool do_resume = false;
// Already installed: nothing to do (early return on elided lines).
200 if (ds->ds_is_installed) {
203 ds->ds_is_installed = true;
205 dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array);
207 dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);
210 // If an existing dispatch kevent is found, check to see if new flags
211 // need to be added to the existing kevent
212 new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
213 dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
// Only re-register if this source contributed flags not yet present.
216 do_resume = new_flags;
// No existing entry: insert this source's own dk; everything needs
// registering. NOTE(review): the branch structure and the assignment of
// ds->ds_dkev in the shared-dk case are on elided lines.
219 _dispatch_kevent_insert(dk);
220 new_flags = dk->dk_kevent.fflags;
224 TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);
226 // Re-register the kevent with the kernel if new flags were added
227 // by the dispatch kevent
229 dk->dk_kevent.flags |= EV_ADD;
230 _dispatch_source_kevent_resume(ds, new_flags, 0);
231 ds->ds_is_armed = true;
// (Re)register a dispatch_kevent with the kernel after its fflags changed.
// new_flags/del_flags are the fflags bits being added/removed (only the
// Mach-port path distinguishes them). The dispatch-private pseudo-filters
// are never sent to the kernel.
237 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
239 switch (dk->dk_kevent.filter) {
240 case DISPATCH_EVFILT_TIMER:
241 case DISPATCH_EVFILT_CUSTOM_ADD:
242 case DISPATCH_EVFILT_CUSTOM_OR:
243 // these types not registered with kevent
246 case EVFILT_MACHPORT:
// Mach ports are managed through port sets + notifications, not kevent.
247 _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
// EV_ONESHOT events auto-delete on delivery; re-adding would re-arm a
// kevent the caller considers consumed.
251 if (dk->dk_kevent.flags & EV_ONESHOT) {
256 _dispatch_update_kq(&dk->dk_kevent);
// EV_DISPATCH kevents stay registered; clear EV_ADD so later updates
// re-enable rather than re-add.
257 if (dk->dk_kevent.flags & EV_DISPATCH) {
258 dk->dk_kevent.flags &= ~EV_ADD;
// Thin wrapper: resume the kevent underlying a given source.
265 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags, uint32_t del_flags)
267 _dispatch_kevent_resume(ds->ds_dkev, new_flags, del_flags);
270 #ifndef DISPATCH_NO_LEGACY
// Legacy debug facility: accept one HTTP connection on the debug socket and
// dump the entire kevent hash table (DKs, their sources, and suspended
// target queues) as an HTML page. Runs as the read handler installed by
// _dispatch_kevent_debugger() below.
272 _dispatch_kevent_debugger2(void *context, dispatch_source_t unused __attribute__((unused)))
275 socklen_t sa_len = sizeof(sa);
276 int c, fd = (int)(long)context;
278 dispatch_kevent_t dk;
279 dispatch_source_t ds;
// The listening fd is non-blocking; EAGAIN just means no pending client.
282 c = accept(fd, &sa, &sa_len);
284 if (errno != EAGAIN) {
285 (void)dispatch_assume_zero(errno);
290 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
292 (void)dispatch_assume_zero(errno);
295 debug_stream = fdopen(c, "a");
296 if (!dispatch_assume(debug_stream)) {
// Minimal HTTP/1.0 response header followed by the HTML body.
301 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
302 fprintf(debug_stream, "Content-type: text/html\r\n");
303 fprintf(debug_stream, "Pragma: nocache\r\n");
304 fprintf(debug_stream, "\r\n");
305 fprintf(debug_stream, "<html>\n<head><title>PID %u</title></head>\n<body>\n<ul>\n", getpid());
307 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td></tr>\n");
// Walk every hash bucket; for each DK print its kevent state, then each
// attached source, then (if suspended) the source's target queue.
309 for (i = 0; i < DSL_HASH_SIZE; i++) {
310 if (TAILQ_EMPTY(&_dispatch_sources[i])) {
313 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
314 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags 0x%hx fflags 0x%x data 0x%lx udata %p\n",
315 dk, (unsigned long)dk->dk_kevent.ident, _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
316 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data, dk->dk_kevent.udata);
317 fprintf(debug_stream, "\t\t<ul>\n");
318 TAILQ_FOREACH(ds, &dk->dk_sources, ds_list) {
319 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend 0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
320 ds, ds->do_ref_cnt, ds->do_suspend_cnt, ds->ds_pending_data, ds->ds_pending_data_mask,
321 ds->ds_atomic_flags);
322 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
323 dispatch_queue_t dq = ds->do_targetq;
324 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend 0x%x label: %s\n", dq, dq->do_ref_cnt, dq->do_suspend_cnt, dq->dq_label);
327 fprintf(debug_stream, "\t\t</ul>\n");
328 fprintf(debug_stream, "\t</li>\n");
331 fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
332 fflush(debug_stream);
// fclose() also closes the accepted connection fd.
333 fclose(debug_stream);
// One-time setup (via dispatch_once_f) for the legacy kevent debugger:
// when LIBDISPATCH_DEBUGGER is set in the environment, open a loopback TCP
// listener and attach _dispatch_kevent_debugger2 as its read handler on the
// manager queue, then log the chosen port.
337 _dispatch_kevent_debugger(void *context __attribute__((unused)))
340 struct sockaddr_in sa_in;
344 .sin_family = AF_INET,
345 .sin_addr = { htonl(INADDR_LOOPBACK), },
348 dispatch_source_t ds;
350 int val, r, fd, sock_opt = 1;
351 socklen_t slen = sizeof(sa_u);
// Feature gate: bail out unless the env var is present (check on elided
// lines after this getenv).
356 valstr = getenv("LIBDISPATCH_DEBUGGER");
// Port 0: let the kernel pick an ephemeral port; reported via getsockname.
362 sa_u.sa_in.sin_addr.s_addr = 0;
364 fd = socket(PF_INET, SOCK_STREAM, 0);
366 (void)dispatch_assume_zero(errno);
369 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, (socklen_t) sizeof sock_opt);
371 (void)dispatch_assume_zero(errno);
// Non-blocking so the accept() in the handler never stalls the mgr queue.
375 r = fcntl(fd, F_SETFL, O_NONBLOCK);
377 (void)dispatch_assume_zero(errno);
381 r = bind(fd, &sa_u.sa, sizeof(sa_u));
383 (void)dispatch_assume_zero(errno);
386 r = listen(fd, SOMAXCONN);
388 (void)dispatch_assume_zero(errno);
391 r = getsockname(fd, &sa_u.sa, &slen);
393 (void)dispatch_assume_zero(errno);
396 ds = dispatch_source_read_create_f(fd, NULL, &_dispatch_mgr_q, (void *)(long)fd, _dispatch_kevent_debugger2);
397 if (dispatch_assume(ds)) {
398 _dispatch_log("LIBDISPATCH: debug port: %hu", ntohs(sa_u.sa_in.sin_port));
404 #endif /* DISPATCH_NO_LEGACY */
// Entry point from the manager queue's kevent loop: deliver one kernel
// event to every source sharing its dispatch_kevent. Mach-port events are
// diverted to the message-drain path instead.
407 _dispatch_source_drain_kevent(struct kevent *ke)
409 #ifndef DISPATCH_NO_LEGACY
410 static dispatch_once_t pred;
412 dispatch_kevent_t dk = ke->udata;
413 dispatch_source_t dsi;
415 #ifndef DISPATCH_NO_LEGACY
// Lazily start the HTML debugger (no-op unless LIBDISPATCH_DEBUGGER is set).
416 dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
419 dispatch_debug_kevents(ke, 1, __func__);
422 if (ke->filter == EVFILT_MACHPORT) {
423 return _dispatch_drain_mach_messages(ke);
// Record that the kernel auto-deleted this kevent so dispose/resume paths
// don't try to delete or re-add it.
428 if (ke->flags & EV_ONESHOT) {
429 dk->dk_kevent.flags |= EV_ONESHOT;
432 TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
433 _dispatch_source_merge_kevent(dsi, ke);
// Tear down a dispatch_kevent once its last source has detached:
// deregister from the kernel (unless the pseudo-filter lives on a static
// list, the port path handles it, or EV_ONESHOT already deleted it), then
// unlink from the hash table. Freeing of heap dks is on elided lines.
438 _dispatch_kevent_dispose(dispatch_kevent_t dk)
442 switch (dk->dk_kevent.filter) {
443 case DISPATCH_EVFILT_TIMER:
444 case DISPATCH_EVFILT_CUSTOM_ADD:
445 case DISPATCH_EVFILT_CUSTOM_OR:
446 // these sources live on statically allocated lists
449 case EVFILT_MACHPORT:
// Dropping all fflags via del_flags cancels port-set membership and
// dead-name notifications.
450 _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
454 if (dk->dk_kevent.flags & EV_ONESHOT) {
455 break; // implicitly deleted
// Ask the kernel to delete the kevent exactly once.
459 if (~dk->dk_kevent.flags & EV_DELETE) {
460 dk->dk_kevent.flags |= EV_DELETE;
461 _dispatch_update_kq(&dk->dk_kevent);
466 hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
467 dk->dk_kevent.filter);
468 TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
// Detach a source from its shared dispatch_kevent. If it was the last
// attached source the dk is disposed; otherwise the dk's fflags are
// recomputed as the union of the remaining sources' masks and any bits this
// source uniquely contributed are deregistered from the kernel.
473 _dispatch_kevent_release(dispatch_source_t ds)
475 dispatch_kevent_t dk = ds->ds_dkev;
476 dispatch_source_t dsi;
477 uint32_t del_flags, fflags = 0;
481 TAILQ_REMOVE(&dk->dk_sources, ds, ds_list);
483 if (TAILQ_EMPTY(&dk->dk_sources)) {
484 _dispatch_kevent_dispose(dk);
// Union of fflags still wanted by the surviving sources.
486 TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
487 fflags |= (uint32_t)dsi->ds_pending_data_mask;
// Bits only this (departing) source wanted.
489 del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
491 dk->dk_kevent.flags |= EV_ADD;
492 dk->dk_kevent.fflags = fflags;
493 _dispatch_kevent_resume(dk, 0, del_flags);
497 ds->ds_is_armed = false;
498 ds->ds_needs_rearm = false; // re-arm is pointless and bad now
499 _dispatch_release(ds); // the retain is done at creation time
// Fold one delivered kevent into a source's pending data according to the
// source's flavor (level / adder / OR-mask), then wake the source.
// Canceled or fully-released sources are skipped (early return on elided
// lines).
503 _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
507 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
511 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie.
512 // We simulate an exit event in this case. <rdar://problem/5067725>
513 if (ke->flags & EV_ERROR) {
514 if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
// 'fake' is a local copy of *ke rewritten into a NOTE_EXIT event
// (the copy itself is on elided lines).
516 fake.flags &= ~EV_ERROR;
517 fake.fflags = NOTE_EXIT;
521 // log the unexpected error
522 (void)dispatch_assume_zero(ke->data);
527 if (ds->ds_is_level) {
528 // ke->data is signed and "negative available data" makes no sense
529 // zero bytes happens when EV_EOF is set
530 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
531 dispatch_assert(ke->data >= 0l);
// Level sources store the complement so "no data pending" is ~0 != 0,
// letting the wakeup path distinguish it from an unset slot.
532 ds->ds_pending_data = ~ke->data;
533 } else if (ds->ds_is_adder) {
534 dispatch_atomic_add(&ds->ds_pending_data, ke->data);
// Default: OR in the event flag bits the source asked for.
536 dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
539 // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
540 if (ds->ds_needs_rearm) {
541 ds->ds_is_armed = false;
544 _dispatch_wakeup(ds);
// Vtable for kevent-backed sources; wires the generic source entry points
// plus the kevent-specific debug formatter defined above.
547 const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
548 .do_type = DISPATCH_SOURCE_KEVENT_TYPE,
549 .do_kind = "kevent-source",
550 .do_invoke = _dispatch_source_invoke,
551 .do_dispose = _dispatch_source_dispose,
552 .do_probe = _dispatch_source_probe,
553 .do_debug = _dispatch_source_kevent_debug,
// Public API: merge user-supplied data into a DATA_ADD or DATA_OR source by
// synthesizing a local kevent and running it through the normal merge path.
// Only valid for the two custom-data filters (asserted below).
557 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
559 struct kevent kev = {
560 .fflags = (typeof(kev.fflags))val,
564 dispatch_assert(ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
565 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
567 _dispatch_source_merge_kevent(ds, &kev);
// Common init for all kevent-backed source types: validate the handle for
// the prototype's filter, allocate and populate a dispatch_kevent from the
// type's prototype kevent, and derive the source's delivery semantics
// (level / needs-rearm / adder) from the prototype's flags.
571 dispatch_source_type_kevent_init(dispatch_source_t ds, dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t q)
573 const struct kevent *proto_kev = type->opaque;
574 dispatch_kevent_t dk = NULL;
576 switch (proto_kev->filter) {
// Signal sources: the handle is a signal number and must be in range.
578 if (handle >= NSIG) {
583 case DISPATCH_EVFILT_CUSTOM_ADD:
584 case DISPATCH_EVFILT_CUSTOM_OR:
585 case DISPATCH_EVFILT_TIMER:
594 dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
// Clone the prototype, then specialize it for this source.
599 dk->dk_kevent = *proto_kev;
600 dk->dk_kevent.ident = handle;
601 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
602 dk->dk_kevent.fflags |= (uint32_t)mask;
603 dk->dk_kevent.udata = dk;
604 TAILQ_INIT(&dk->dk_sources);
607 ds->ds_ident_hack = dk->dk_kevent.ident;
609 ds->ds_pending_data_mask = dk->dk_kevent.fflags;
610 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
611 ds->ds_is_level = true;
612 ds->ds_needs_rearm = true;
613 } else if (!(EV_CLEAR & proto_kev->flags)) {
614 // we cheat and use EV_CLEAR to mean a "flag thingy"
615 ds->ds_is_adder = true;
// Init for timer sources: common kevent init, then force re-arm semantics
// and stash the timer flags (interval/oneshot/absolute/wall-clock bits).
621 dispatch_source_type_timer_init(dispatch_source_t ds, dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t q)
623 if (!dispatch_source_type_kevent_init(ds, type, handle, mask, q)) {
626 ds->ds_needs_rearm = true;
627 ds->ds_timer.flags = mask;
// Source-type descriptor tables. Each type pairs a prototype kevent
// (filter + default flags/fflags, consumed by dispatch_source_type_kevent_init)
// with the permitted user mask bits and the init function to use.
631 static const struct kevent _dispatch_source_type_timer_ke = {
632 .filter = DISPATCH_EVFILT_TIMER,
635 const struct dispatch_source_type_s _dispatch_source_type_timer = {
636 .opaque = (void *)&_dispatch_source_type_timer_ke,
637 .mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
638 .init = dispatch_source_type_timer_init,
// Read/write fd sources use EV_DISPATCH: auto-disable after each delivery,
// re-enabled when the handler completes (level-triggered semantics).
641 static const struct kevent _dispatch_source_type_read_ke = {
642 .filter = EVFILT_READ,
643 .flags = EV_DISPATCH,
646 const struct dispatch_source_type_s _dispatch_source_type_read = {
647 .opaque = (void *)&_dispatch_source_type_read_ke,
648 .init = dispatch_source_type_kevent_init,
651 static const struct kevent _dispatch_source_type_write_ke = {
652 .filter = EVFILT_WRITE,
653 .flags = EV_DISPATCH,
656 const struct dispatch_source_type_s _dispatch_source_type_write = {
657 .opaque = (void *)&_dispatch_source_type_write_ke,
658 .init = dispatch_source_type_kevent_init,
661 static const struct kevent _dispatch_source_type_proc_ke = {
662 .filter = EVFILT_PROC,
666 const struct dispatch_source_type_s _dispatch_source_type_proc = {
667 .opaque = (void *)&_dispatch_source_type_proc_ke,
668 .mask = NOTE_EXIT|NOTE_FORK|NOTE_EXEC
669 #if HAVE_DECL_NOTE_SIGNAL
672 #if HAVE_DECL_NOTE_REAP
676 .init = dispatch_source_type_kevent_init,
679 static const struct kevent _dispatch_source_type_signal_ke = {
680 .filter = EVFILT_SIGNAL,
683 const struct dispatch_source_type_s _dispatch_source_type_signal = {
684 .opaque = (void *)&_dispatch_source_type_signal_ke,
685 .init = dispatch_source_type_kevent_init,
688 static const struct kevent _dispatch_source_type_vnode_ke = {
689 .filter = EVFILT_VNODE,
693 const struct dispatch_source_type_s _dispatch_source_type_vnode = {
694 .opaque = (void *)&_dispatch_source_type_vnode_ke,
695 .mask = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND|NOTE_ATTRIB|NOTE_LINK|
697 #if HAVE_DECL_NOTE_REVOKE
700 #if HAVE_DECL_NOTE_NONE
704 .init = dispatch_source_type_kevent_init,
// VFS (filesystem/mount) sources; the VQ_* mask bits come from
// <sys/mount.h>, some gated on availability.
707 static const struct kevent _dispatch_source_type_vfs_ke = {
712 const struct dispatch_source_type_s _dispatch_source_type_vfs = {
713 .opaque = (void *)&_dispatch_source_type_vfs_ke,
714 .mask = VQ_NOTRESP|VQ_NEEDAUTH|VQ_LOWDISK|VQ_MOUNT|VQ_UNMOUNT|VQ_DEAD|
715 VQ_ASSIST|VQ_NOTRESPLOCK
716 #if HAVE_DECL_VQ_UPDATE
719 #if HAVE_DECL_VQ_VERYLOWDISK
723 .init = dispatch_source_type_kevent_init,
// Init for MACH_SEND (dead-name) sources: common kevent init, force
// edge-triggered delivery, and lazily start the Mach notification source
// that listens for dead-name messages from the kernel.
729 dispatch_source_type_mach_send_init(dispatch_source_t ds, dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t q)
731 static dispatch_once_t pred;
733 if (!dispatch_source_type_kevent_init(ds, type, handle, mask, q)) {
736 ds->ds_is_level = false;
737 dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
741 static const struct kevent _dispatch_source_type_mach_send_ke = {
742 .filter = EVFILT_MACHPORT,
743 .flags = EV_DISPATCH,
744 .fflags = DISPATCH_MACHPORT_DEAD,
747 const struct dispatch_source_type_s _dispatch_source_type_mach_send = {
748 .opaque = (void *)&_dispatch_source_type_mach_send_ke,
749 .mask = DISPATCH_MACH_SEND_DEAD,
750 .init = dispatch_source_type_mach_send_init,
// Init for MACH_RECV sources: common kevent init, then force edge-triggered
// delivery (message arrival is an event, not a level).
754 dispatch_source_type_mach_recv_init(dispatch_source_t ds, dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t q)
756 if (!dispatch_source_type_kevent_init(ds, type, handle, mask, q)) {
759 ds->ds_is_level = false;
763 static const struct kevent _dispatch_source_type_mach_recv_ke = {
764 .filter = EVFILT_MACHPORT,
765 .flags = EV_DISPATCH,
766 .fflags = DISPATCH_MACHPORT_RECV,
769 const struct dispatch_source_type_s _dispatch_source_type_mach_recv = {
770 .opaque = (void *)&_dispatch_source_type_mach_recv_ke,
771 .init = dispatch_source_type_mach_recv_init,
// Custom user-data source types (DATA_ADD / DATA_OR); never touch the
// kernel — dispatch_source_merge_data() drives them directly.
775 static const struct kevent _dispatch_source_type_data_add_ke = {
776 .filter = DISPATCH_EVFILT_CUSTOM_ADD,
779 const struct dispatch_source_type_s _dispatch_source_type_data_add = {
780 .opaque = (void *)&_dispatch_source_type_data_add_ke,
781 .init = dispatch_source_type_kevent_init,
784 static const struct kevent _dispatch_source_type_data_or_ke = {
785 .filter = DISPATCH_EVFILT_CUSTOM_OR,
790 const struct dispatch_source_type_s _dispatch_source_type_data_or = {
791 .opaque = (void *)&_dispatch_source_type_data_or_ke,
792 .init = dispatch_source_type_kevent_init,
795 // Updates the ordered list of timers based on next fire date for changes to ds.
796 // Should only be called from the context of _dispatch_mgr_q.
// Re-sorts a timer source: remove from its current list, switch lists if
// its clock type changed, then reinsert in ascending next-fire order.
// Unset timers (target == 0) sort to the tail.
798 _dispatch_timer_list_update(dispatch_source_t ds)
800 dispatch_source_t dsi = NULL;
803 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);
805 // do not reschedule timers unregistered with _dispatch_kevent_release()
810 // Ensure the source is on the global kevent lists before it is removed and
812 _dispatch_kevent_merge(ds);
814 TAILQ_REMOVE(&ds->ds_dkev->dk_sources, ds, ds_list);
816 // change the list if the clock type has changed
817 if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
818 idx = DISPATCH_TIMER_INDEX_WALL;
820 idx = DISPATCH_TIMER_INDEX_MACH;
822 ds->ds_dkev = &_dispatch_kevent_timer[idx];
// Armed timer: insert before the first entry that fires later (or is
// unset). Falls through to tail insertion otherwise.
824 if (ds->ds_timer.target) {
825 TAILQ_FOREACH(dsi, &ds->ds_dkev->dk_sources, ds_list) {
826 if (dsi->ds_timer.target == 0 || ds->ds_timer.target < dsi->ds_timer.target) {
833 TAILQ_INSERT_BEFORE(dsi, ds, ds_list);
835 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds, ds_list);
// Fire every due timer on one timer list (WALL or MACH). The list is sorted
// by next fire date, so iteration stops at the first unset or future timer.
840 _dispatch_run_timers2(unsigned int timer)
842 dispatch_source_t ds;
843 uint64_t now, missed;
// Pick the clock matching this list.
845 if (timer == DISPATCH_TIMER_INDEX_MACH) {
846 now = _dispatch_absolute_time();
848 now = _dispatch_get_nanoseconds();
851 while ((ds = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
852 // We may find timers on the wrong list due to a pending update from
853 // dispatch_source_set_timer. Force an update of the list in that case.
854 if (timer != ds->ds_ident_hack) {
855 _dispatch_timer_list_update(ds);
858 if (!ds->ds_timer.target) {
859 // no configured timers on the list
862 if (ds->ds_timer.target > now) {
863 // Done running timers for now.
// Due: one-shot/absolute timers fire once and disarm; interval timers
// accumulate missed periods and advance target past 'now'.
867 if (ds->ds_timer.flags & (DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE)) {
868 dispatch_atomic_inc(&ds->ds_pending_data);
869 ds->ds_timer.target = 0;
871 // Calculate number of missed intervals.
872 missed = (now - ds->ds_timer.target) / ds->ds_timer.interval;
873 dispatch_atomic_add(&ds->ds_pending_data, missed + 1);
874 ds->ds_timer.target += (missed + 1) * ds->ds_timer.interval;
// Re-sort with the new target, then wake the source to run its handler.
877 _dispatch_timer_list_update(ds);
878 _dispatch_wakeup(ds);
// Fire due timers on every timer list (wall-clock and machine time).
883 _dispatch_run_timers(void)
886 for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
887 _dispatch_run_timers2(i);
891 // approx 1 year (60s * 60m * 24h * 365d)
// NOTE(review): FOREVER_SEC (3,153,600) is 1/10th of the value the comment
// describes (31,536,000) and of FOREVER_NSEC/NSEC_PER_SEC — possibly an
// intentional shorter cap, but it does not match the stated formula; verify
// against upstream before relying on it.
892 #define FOREVER_SEC 3153600l
893 #define FOREVER_NSEC 31536000000000000ull
// Compute how long the manager thread may sleep before the earliest armed
// timer fires, written into *howsoon. Caps the wait at ~1 year because
// kevent(2) rejects very large timeouts. Suspended timers are disarmed and
// skipped. Returns (on elided lines) per the original contract.
896 _dispatch_get_next_timer_fire(struct timespec *howsoon)
898 // <rdar://problem/6459649>
899 // kevent(2) does not allow large timeouts, so we use a long timeout
900 // instead (approximately 1 year).
901 dispatch_source_t ds = NULL;
903 uint64_t now, delta_tmp, delta = UINT64_MAX;
905 // We are looking for the first unsuspended timer which has its target
906 // time set. Given timers are kept in order, if we hit an timer that's
907 // unset there's no point in continuing down the list.
908 for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
909 TAILQ_FOREACH(ds, &_dispatch_kevent_timer[timer].dk_sources, ds_list) {
910 if (!ds->ds_timer.target) {
913 if (DISPATCH_OBJECT_SUSPENDED(ds)) {
914 ds->ds_is_armed = false;
// No armed timer on either list: fall through to the FOREVER cap.
920 if (!ds || !ds->ds_timer.target) {
// Read the clock matching the winning timer's type.
924 if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
925 now = _dispatch_get_nanoseconds();
927 now = _dispatch_absolute_time();
// Already due: zero timeout so kevent() returns immediately.
929 if (ds->ds_timer.target <= now) {
931 howsoon->tv_nsec = 0;
935 // the subtraction cannot go negative because the previous "if"
936 // verified that the target is greater than now.
937 delta_tmp = ds->ds_timer.target - now;
// Machine-time deltas are in mach ticks; convert to nanoseconds.
938 if (!(ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK)) {
939 delta_tmp = _dispatch_time_mach2nano(delta_tmp);
941 if (delta_tmp < delta) {
945 if (slowpath(delta > FOREVER_NSEC)) {
948 howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
949 howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
// Mach-port event plumbing state: a port set that receives messages for all
// watched ports, a dedicated event port for kernel notifications, and the
// source that services those notifications.
955 static dispatch_source_t _dispatch_mach_notify_source;
956 static mach_port_t _dispatch_port_set;
957 static mach_port_t _dispatch_event_port;
// Hash of a Mach port name into a fixed-size bucket index; uses a mask when
// the size is a power of two, modulo otherwise.
959 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
960 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
962 #define _DISPATCH_MACHPORT_HASH_SIZE 32
963 #define _DISPATCH_MACHPORT_HASH(x) _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
965 static void _dispatch_port_set_init(void *);
966 static mach_port_t _dispatch_get_port_set(void);
// Handle an EVFILT_MACHPORT kernel event: ke->data carries the Mach port
// that became receivable. Look up its dispatch_kevent, pull the port out of
// the port set (emulating EV_DISPATCH auto-disable), and deliver a
// synthesized RECV kevent to every attached source.
969 _dispatch_drain_mach_messages(struct kevent *ke)
971 dispatch_source_t dsi;
972 dispatch_kevent_t dk;
975 if (!dispatch_assume(ke->data)) {
978 dk = _dispatch_kevent_find(ke->data, EVFILT_MACHPORT);
979 if (!dispatch_assume(dk)) {
982 _dispatch_kevent_machport_disable(dk); // emulate EV_DISPATCH
984 EV_SET(&ke2, ke->data, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, DISPATCH_MACHPORT_RECV, 0, dk);
986 TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
987 _dispatch_source_merge_kevent(dsi, &ke2);
// One-time setup: allocate the global port set and the event port, add the
// event port to the set, then register the port set with the kqueue so
// message arrival surfaces as an EVFILT_MACHPORT kevent.
992 _dispatch_port_set_init(void *context __attribute__((unused)))
994 struct kevent kev = {
995 .filter = EVFILT_MACHPORT,
1000 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &_dispatch_port_set);
1001 DISPATCH_VERIFY_MIG(kr);
1002 (void)dispatch_assume_zero(kr);
1003 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &_dispatch_event_port);
1004 DISPATCH_VERIFY_MIG(kr);
1005 (void)dispatch_assume_zero(kr);
1006 kr = mach_port_move_member(mach_task_self(), _dispatch_event_port, _dispatch_port_set);
1007 DISPATCH_VERIFY_MIG(kr);
1008 (void)dispatch_assume_zero(kr);
1010 kev.ident = _dispatch_port_set;
1012 _dispatch_update_kq(&kev);
// Lazily-initialized accessor for the global Mach port set.
1016 _dispatch_get_port_set(void)
1018 static dispatch_once_t pred;
1020 dispatch_once_f(&pred, NULL, _dispatch_port_set_init);
1022 return _dispatch_port_set;
// Mach-port analogue of _dispatch_kevent_resume: apply fflags additions
// (new_flags) and removals (del_flags). RECV bits toggle port-set
// membership; DEAD bits request/cancel a dead-name notification delivered
// to _dispatch_event_port.
1026 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
1028 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
// Re-enable reception either when RECV is newly requested or on a pure
// refresh (no adds/removes) of a dk that already wants RECV.
1031 if ((new_flags & DISPATCH_MACHPORT_RECV) || (!new_flags && !del_flags && dk->dk_kevent.fflags & DISPATCH_MACHPORT_RECV)) {
1032 _dispatch_kevent_machport_enable(dk);
1034 if (new_flags & DISPATCH_MACHPORT_DEAD) {
1035 kr = mach_port_request_notification(mach_task_self(), port, MACH_NOTIFY_DEAD_NAME, 1,
1036 _dispatch_event_port, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
1037 DISPATCH_VERIFY_MIG(kr);
// Port already gone: tolerated error codes (handling on elided lines).
1041 case KERN_INVALID_NAME:
1042 case KERN_INVALID_RIGHT:
1046 // Else, we dont expect any errors from mach. Log any errors if we do
1047 if (dispatch_assume_zero(kr)) {
1049 } else if (dispatch_assume_zero(previous)) {
1050 // Another subsystem has beat libdispatch to requesting the Mach
1051 // dead-name notification on this port. We should technically cache the
1052 // previous port and message it when the kernel messages our port. Or
1053 // we can just say screw those subsystems and drop the previous port.
1054 // They should adopt libdispatch :-P
1055 kr = mach_port_deallocate(mach_task_self(), previous);
1056 DISPATCH_VERIFY_MIG(kr);
1057 (void)dispatch_assume_zero(kr);
1062 if (del_flags & DISPATCH_MACHPORT_RECV) {
1063 _dispatch_kevent_machport_disable(dk);
// Cancel the dead-name notification by re-requesting with MACH_PORT_NULL.
1065 if (del_flags & DISPATCH_MACHPORT_DEAD) {
1066 kr = mach_port_request_notification(mach_task_self(), (mach_port_t)dk->dk_kevent.ident,
1067 MACH_NOTIFY_DEAD_NAME, 1, MACH_PORT_NULL, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
1068 DISPATCH_VERIFY_MIG(kr);
1071 case KERN_INVALID_NAME:
1072 case KERN_INVALID_RIGHT:
1073 case KERN_INVALID_ARGUMENT:
1076 if (dispatch_assume_zero(kr)) {
1078 } else if (previous) {
1079 // the kernel has not consumed the right yet
1080 (void)dispatch_assume_zero(_dispatch_send_consume_send_once_right(previous));
// Move a watched receive right into the global port set so incoming
// messages raise an EVFILT_MACHPORT event. A vanished right is logged as
// corruption rather than asserted.
1087 _dispatch_kevent_machport_enable(dispatch_kevent_t dk)
1089 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1092 kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
1093 DISPATCH_VERIFY_MIG(kr);
1095 case KERN_INVALID_NAME:
1097 _dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
1101 (void)dispatch_assume_zero(kr);
// Remove a receive right from the global port set (port-set member of 0 =
// no set), suppressing further events until re-enabled. Mirrors the
// enable path's tolerant handling of already-destroyed rights.
1106 _dispatch_kevent_machport_disable(dispatch_kevent_t dk)
1108 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1111 kr = mach_port_move_member(mach_task_self(), mp, 0);
1112 DISPATCH_VERIFY_MIG(kr);
1114 case KERN_INVALID_RIGHT:
1115 case KERN_INVALID_NAME:
1117 _dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
1123 (void)dispatch_assume_zero(kr);
// Minimum receive buffer: 8 KiB less the trailer the kernel appends.
1128 #define _DISPATCH_MIN_MSG_SZ (8ul * 1024ul - MAX_TRAILER_SIZE)
1129 #ifndef DISPATCH_NO_LEGACY
// Legacy shim: create a machport source whose handler runs a MIG server
// loop over incoming messages. On a concurrent queue the server runs via
// dispatch_async (with an extra retain across the block); on a serial
// queue it runs inline.
1131 dispatch_source_mig_create(mach_port_t mport, size_t max_msg_size, dispatch_source_attr_t attr,
1132 dispatch_queue_t dq, dispatch_mig_callback_t mig_callback)
1134 if (max_msg_size < _DISPATCH_MIN_MSG_SZ) {
1135 max_msg_size = _DISPATCH_MIN_MSG_SZ;
1137 return dispatch_source_machport_create(mport, DISPATCH_MACHPORT_RECV, attr, dq,
1138 ^(dispatch_source_t ds) {
1139 if (!dispatch_source_get_error(ds, NULL)) {
1140 if (dq->dq_width != 1) {
1141 dispatch_retain(ds); // this is a shim -- use the external retain
1142 dispatch_async(dq, ^{
1143 dispatch_mig_server(ds, max_msg_size, mig_callback);
1144 dispatch_release(ds); // this is a shim -- use the external release
1147 dispatch_mig_server(ds, max_msg_size, mig_callback);
1152 #endif /* DISPATCH_NO_LEGACY */
// Event handler for the internal notification source: run the MIG demux
// for libdispatch's internal protocol, sized for the larger of the request
// and reply unions.
1155 _dispatch_mach_notify_source2(void *context)
1157 dispatch_source_t ds = context;
1158 const size_t maxsz = MAX(
1159 sizeof(union __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem),
1160 sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
1162 dispatch_mig_server(ds, maxsz, libdispatch_internal_protocol_server);
// One-time setup: create (on the manager queue) the MACH_RECV source that
// services kernel notifications arriving on _dispatch_event_port, with the
// source itself as handler context, and resume it.
1166 _dispatch_mach_notify_source_init(void *context __attribute__((unused)))
// Ensures _dispatch_event_port exists before creating the source on it.
1168 _dispatch_get_port_set();
1170 _dispatch_mach_notify_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_MACH_RECV, _dispatch_event_port, 0, &_dispatch_mgr_q);
1171 dispatch_assert(_dispatch_mach_notify_source);
1172 dispatch_set_context(_dispatch_mach_notify_source, _dispatch_mach_notify_source);
1173 dispatch_source_set_event_handler_f(_dispatch_mach_notify_source, _dispatch_mach_notify_source2);
1174 dispatch_resume(_dispatch_mach_notify_source);
// MIG callback: a watched port name was deleted. Deliver a one-shot
// DELETED event (EV_EOF) to every source on that port's dispatch_kevent,
// then strip the DELETED bit everywhere since it can never fire again.
1178 _dispatch_mach_notify_port_deleted(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
1180 dispatch_source_t dsi;
1181 dispatch_kevent_t dk;
1185 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x deleted prematurely", name);
1188 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1193 EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DELETED, 0, dk);
1195 TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
1196 _dispatch_source_merge_kevent(dsi, &kev);
1197 // this can never happen again
1198 // this must happen after the merge
1199 // this may be racy in the future, but we don't provide a 'setter' API for the mask yet
1200 dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DELETED;
1203 // no more sources have this flag
1204 dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DELETED;
1207 return KERN_SUCCESS;
// MIG callback for port-destroyed notifications. libdispatch never
// requests these, so receipt is a bug; still release the receive right we
// were handed so it doesn't leak.
1211 _dispatch_mach_notify_port_destroyed(mach_port_t notify __attribute__((unused)), mach_port_t name)
1214 // this function should never be called
1215 (void)dispatch_assume_zero(name);
1216 kr = mach_port_mod_refs(mach_task_self(), name, MACH_PORT_RIGHT_RECEIVE, -1);
1217 DISPATCH_VERIFY_MIG(kr);
1218 (void)dispatch_assume_zero(kr);
1219 return KERN_SUCCESS;
// MIG callback for no-senders notifications — never requested by
// libdispatch, so receipt is flagged as a bug.
1223 _dispatch_mach_notify_no_senders(mach_port_t notify, mach_port_mscount_t mscnt __attribute__((unused)))
1225 // this function should never be called
1226 (void)dispatch_assume_zero(notify);
1227 return KERN_SUCCESS;
// MIG callback for send-once notifications: indicates some other code
// destroyed a send-once right libdispatch created for a dead-name
// notification — log it as corruption.
1231 _dispatch_mach_notify_send_once(mach_port_t notify __attribute__((unused)))
1233 // we only register for dead-name notifications
1234 // some code deallocated our send-once right without consuming it
1236 _dispatch_log("Corruption: An app/library deleted a libdispatch dead-name notification");
1238 return KERN_SUCCESS;
// MIG callback: a watched port became a dead name. Deliver a one-shot
// DEAD event (EV_EOF) to every source on that port's dispatch_kevent,
// clear the DEAD bit everywhere (it cannot fire again), and release the
// dead-name right the notification itself allocated.
1242 _dispatch_mach_notify_dead_name(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
1244 dispatch_source_t dsi;
1245 dispatch_kevent_t dk;
1249 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1254 EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DEAD, 0, dk);
1256 TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
1257 _dispatch_source_merge_kevent(dsi, &kev);
1258 // this can never happen again
1259 // this must happen after the merge
1260 // this may be racy in the future, but we don't provide a 'setter' API for the mask yet
1261 dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DEAD;
1264 // no more sources have this flag
1265 dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DEAD;
1268 // the act of receiving a dead name notification allocates a dead-name right that must be deallocated
1269 kr = mach_port_deallocate(mach_task_self(), name);
1270 DISPATCH_VERIFY_MIG(kr);
1271 //(void)dispatch_assume_zero(kr);
1273 return KERN_SUCCESS;
// MIG stubs for the internal protocol with intentionally empty bodies:
// the first exists only so a message pops the main thread out of
// mach_msg(); the second only consumes a send-once right.
1277 _dispatch_wakeup_main_thread(mach_port_t mp __attribute__((unused)))
1279 // dummy function just to pop out the main thread out of mach_msg()
1284 _dispatch_consume_send_once_right(mach_port_t mp __attribute__((unused)))
1286 // dummy function to consume a send-once right
// Run a MIG server loop over a source's Mach port: receive with timeout,
// demux via 'callback', and send replies, alternating two stack buffers.
// Bounded at 1000 iterations so a busy port cannot starve a serial queue.
1291 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, dispatch_mig_callback_t callback)
1293 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
1294 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
1295 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
1296 mach_msg_options_t tmp_options = options;
1297 mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
1298 mach_msg_return_t kr = 0;
1299 unsigned int cnt = 1000; // do not stall out serial queues
// Room for the kernel-appended trailer on receive.
1302 maxmsgsz += MAX_TRAILER_SIZE;
1304 // XXX FIXME -- allocate these elsewhere
1305 bufRequest = alloca(maxmsgsz);
1306 bufReply = alloca(maxmsgsz);
1307 bufReply->Head.msgh_size = 0; // make CLANG happy
1309 // XXX FIXME -- change this to not starve out the target queue
// Suspended source or iteration budget exhausted: stop receiving; any
// pending reply still gets sent before the loop exits.
1311 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
1312 options &= ~MACH_RCV_MSG;
1313 tmp_options &= ~MACH_RCV_MSG;
1315 if (!(tmp_options & MACH_SEND_MSG)) {
// Combined send (reply from previous iteration) + receive on the
// source's port (ds_ident_hack holds the Mach port name).
1320 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
1321 (mach_msg_size_t)maxmsgsz, (mach_port_t)ds->ds_ident_hack, 0, 0);
1323 tmp_options = options;
// Send failed: destroy the undeliverable (complex) reply, then keep going.
1327 case MACH_SEND_INVALID_DEST:
1328 case MACH_SEND_TIMED_OUT:
1329 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
1330 mach_msg_destroy(&bufReply->Head);
// Expected loop-exit conditions: nothing to receive / port gone.
1333 case MACH_RCV_TIMED_OUT:
1334 case MACH_RCV_INVALID_NAME:
1337 (void)dispatch_assume_zero(kr);
1343 if (!(tmp_options & MACH_RCV_MSG)) {
// Swap buffers: the message just received becomes the request; the other
// buffer is reused for the reply.
1347 bufTemp = bufRequest;
1348 bufRequest = bufReply;
1351 demux_success = callback(&bufRequest->Head, &bufReply->Head);
1353 if (!demux_success) {
1354 // destroy the request - but not the reply port
1355 bufRequest->Head.msgh_remote_port = 0;
1356 mach_msg_destroy(&bufRequest->Head);
1357 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
1358 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode is present
1359 if (slowpath(bufReply->RetCode)) {
1360 if (bufReply->RetCode == MIG_NO_REPLY) {
1364 // destroy the request - but not the reply port
1365 bufRequest->Head.msgh_remote_port = 0;
1366 mach_msg_destroy(&bufRequest->Head);
// A reply port is present: arrange to send the reply on the next
// mach_msg call; only timeout sends that are not send-once moves (a
// send-once right can never block).
1370 if (bufReply->Head.msgh_remote_port) {
1371 tmp_options |= MACH_SEND_MSG;
1372 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != MACH_MSG_TYPE_MOVE_SEND_ONCE) {
1373 tmp_options |= MACH_SEND_TIMEOUT;
1380 #endif /* HAVE_MACH */