libdispatch update
[platform/upstream/gcd.git] / dispatch-1.0 / src / source.c
1 /*
2  * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
3  *
4  * @APPLE_APACHE_LICENSE_HEADER_START@
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * @APPLE_APACHE_LICENSE_HEADER_END@
19  */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #include "protocolServer.h"
25 #endif
26 #include <sys/mount.h>
27
28 static void _dispatch_source_merge_kevent(dispatch_source_t ds,
29                 const struct kevent *ke);
30 static void _dispatch_kevent_register(dispatch_source_t ds);
31 static void _dispatch_kevent_unregister(dispatch_source_t ds);
32 static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
33                 uint32_t del_flags);
34 static inline void _dispatch_source_timer_init(void);
35 static void _dispatch_timer_list_update(dispatch_source_t ds);
36 static inline unsigned long _dispatch_source_timer_data(
37                 dispatch_source_refs_t dr, unsigned long prev);
38 #if HAVE_MACH
39 static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
40                 uint32_t new_flags, uint32_t del_flags);
41 static void _dispatch_drain_mach_messages(struct kevent *ke);
42 #endif
43 #if DISPATCH_DEBUG
44 static void _dispatch_kevent_debugger(void *context);
45 #endif
46
47 #pragma mark -
48 #pragma mark dispatch_source_t
49
50 dispatch_source_t
51 dispatch_source_create(dispatch_source_type_t type,
52         uintptr_t handle,
53         unsigned long mask,
54         dispatch_queue_t q)
55 {
56         const struct kevent *proto_kev = &type->ke;
57         dispatch_source_t ds = NULL;
58         dispatch_kevent_t dk = NULL;
59
60         // input validation
61         if (type == NULL || (mask & ~type->mask)) {
62                 return NULL;
63         }
64
65         switch (type->ke.filter) {
66         case EVFILT_SIGNAL:
67                 if (handle >= NSIG) {
68                         return NULL;
69                 }
70                 break;
71         case EVFILT_FS:
72 #if DISPATCH_USE_VM_PRESSURE
73         case EVFILT_VM:
74 #endif
75         case DISPATCH_EVFILT_CUSTOM_ADD:
76         case DISPATCH_EVFILT_CUSTOM_OR:
77         case DISPATCH_EVFILT_TIMER:
78                 if (handle) {
79                         return NULL;
80                 }
81                 break;
82         default:
83                 break;
84         }
85
86         dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
87         dk->dk_kevent = *proto_kev;
88         dk->dk_kevent.ident = handle;
89         dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
90         dk->dk_kevent.fflags |= (uint32_t)mask;
91         dk->dk_kevent.udata = dk;
92         TAILQ_INIT(&dk->dk_sources);
93
94         ds = _dispatch_alloc(DISPATCH_VTABLE(source),
95                         sizeof(struct dispatch_source_s));
96         // Initialize as a queue first, then override some settings below.
97         _dispatch_queue_init((dispatch_queue_t)ds);
98         strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));
99
100         // Dispatch Object
101         ds->do_ref_cnt++; // the reference the manger queue holds
102         ds->do_ref_cnt++; // since source is created suspended
103         ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
104         // The initial target queue is the manager queue, in order to get
105         // the source installed. <rdar://problem/8928171>
106         ds->do_targetq = &_dispatch_mgr_q;
107
108         // Dispatch Source
109         ds->ds_ident_hack = dk->dk_kevent.ident;
110         ds->ds_dkev = dk;
111         ds->ds_pending_data_mask = dk->dk_kevent.fflags;
112         if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
113                 ds->ds_is_level = true;
114                 ds->ds_needs_rearm = true;
115         } else if (!(EV_CLEAR & proto_kev->flags)) {
116                 // we cheat and use EV_CLEAR to mean a "flag thingy"
117                 ds->ds_is_adder = true;
118         }
119
120         // Some sources require special processing
121         if (type->init != NULL) {
122                 type->init(ds, type, handle, mask, q);
123         }
124         if (fastpath(!ds->ds_refs)) {
125                 ds->ds_refs = calloc(1ul, sizeof(struct dispatch_source_refs_s));
126                 if (slowpath(!ds->ds_refs)) {
127                         goto out_bad;
128                 }
129         }
130         ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
131         dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
132
133         // First item on the queue sets the user-specified target queue
134         dispatch_set_target_queue(ds, q);
135 #if DISPATCH_DEBUG
136         dispatch_debug(ds, "%s", __func__);
137 #endif
138         return ds;
139
140 out_bad:
141         free(ds);
142         free(dk);
143         return NULL;
144 }
145
146 void
147 _dispatch_source_dispose(dispatch_source_t ds)
148 {
149         free(ds->ds_refs);
150         _dispatch_queue_dispose((dispatch_queue_t)ds);
151 }
152
153 void
154 _dispatch_source_xref_dispose(dispatch_source_t ds)
155 {
156         _dispatch_wakeup(ds);
157 }
158
159 void
160 dispatch_source_cancel(dispatch_source_t ds)
161 {
162 #if DISPATCH_DEBUG
163         dispatch_debug(ds, "%s", __func__);
164 #endif
165         // Right after we set the cancel flag, someone else
166         // could potentially invoke the source, do the cancelation,
167         // unregister the source, and deallocate it. We would
168         // need to therefore retain/release before setting the bit
169
170         _dispatch_retain(ds);
171         (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED);
172         _dispatch_wakeup(ds);
173         _dispatch_release(ds);
174 }
175
176 long
177 dispatch_source_testcancel(dispatch_source_t ds)
178 {
179         return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
180 }
181
182
183 unsigned long
184 dispatch_source_get_mask(dispatch_source_t ds)
185 {
186         return ds->ds_pending_data_mask;
187 }
188
189 uintptr_t
190 dispatch_source_get_handle(dispatch_source_t ds)
191 {
192         return (int)ds->ds_ident_hack;
193 }
194
195 unsigned long
196 dispatch_source_get_data(dispatch_source_t ds)
197 {
198         return ds->ds_data;
199 }
200
201 void
202 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
203 {
204         struct kevent kev = {
205                 .fflags = (typeof(kev.fflags))val,
206                 .data = val,
207         };
208
209         dispatch_assert(
210                         ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
211                         ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
212
213         _dispatch_source_merge_kevent(ds, &kev);
214 }
215
216 #pragma mark -
217 #pragma mark dispatch_source_handler
218
219 #ifdef __BLOCKS__
220 // 6618342 Contact the team that owns the Instrument DTrace probe before
221 //         renaming this symbol
222 static void
223 _dispatch_source_set_event_handler2(void *context)
224 {
225         struct Block_layout *bl = context;
226
227         dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
228         dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
229         dispatch_source_refs_t dr = ds->ds_refs;
230
231         if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
232                 Block_release(dr->ds_handler_ctxt);
233         }
234         dr->ds_handler_func = bl ? (void *)bl->invoke : NULL;
235         dr->ds_handler_ctxt = bl;
236         ds->ds_handler_is_block = true;
237 }
238
239 void
240 dispatch_source_set_event_handler(dispatch_source_t ds,
241                 dispatch_block_t handler)
242 {
243         handler = _dispatch_Block_copy(handler);
244         dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
245                         _dispatch_source_set_event_handler2);
246 }
247 #endif /* __BLOCKS__ */
248
249 static void
250 _dispatch_source_set_event_handler_f(void *context)
251 {
252         dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
253         dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
254         dispatch_source_refs_t dr = ds->ds_refs;
255
256 #ifdef __BLOCKS__
257         if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
258                 Block_release(dr->ds_handler_ctxt);
259         }
260 #endif
261         dr->ds_handler_func = context;
262         dr->ds_handler_ctxt = ds->do_ctxt;
263         ds->ds_handler_is_block = false;
264 }
265
266 void
267 dispatch_source_set_event_handler_f(dispatch_source_t ds,
268         dispatch_function_t handler)
269 {
270         dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
271                         _dispatch_source_set_event_handler_f);
272 }
273
274 #ifdef __BLOCKS__
275 // 6618342 Contact the team that owns the Instrument DTrace probe before
276 //         renaming this symbol
277 static void
278 _dispatch_source_set_cancel_handler2(void *context)
279 {
280         dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
281         dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
282         dispatch_source_refs_t dr = ds->ds_refs;
283
284         if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
285                 Block_release(dr->ds_cancel_handler);
286         }
287         dr->ds_cancel_handler = context;
288         ds->ds_cancel_is_block = true;
289 }
290
291 void
292 dispatch_source_set_cancel_handler(dispatch_source_t ds,
293         dispatch_block_t handler)
294 {
295         handler = _dispatch_Block_copy(handler);
296         dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
297                         _dispatch_source_set_cancel_handler2);
298 }
299 #endif /* __BLOCKS__ */
300
301 static void
302 _dispatch_source_set_cancel_handler_f(void *context)
303 {
304         dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
305         dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
306         dispatch_source_refs_t dr = ds->ds_refs;
307
308 #ifdef __BLOCKS__
309         if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
310                 Block_release(dr->ds_cancel_handler);
311         }
312 #endif
313         dr->ds_cancel_handler = context;
314         ds->ds_cancel_is_block = false;
315 }
316
317 void
318 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
319         dispatch_function_t handler)
320 {
321         dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
322                         _dispatch_source_set_cancel_handler_f);
323 }
324
325 #ifdef __BLOCKS__
326 static void
327 _dispatch_source_set_registration_handler2(void *context)
328 {
329         dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
330         dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
331         dispatch_source_refs_t dr = ds->ds_refs;
332
333         if (ds->ds_registration_is_block && dr->ds_registration_handler) {
334                 Block_release(dr->ds_registration_handler);
335         }
336         dr->ds_registration_handler = context;
337         ds->ds_registration_is_block = true;
338 }
339
340 void
341 dispatch_source_set_registration_handler(dispatch_source_t ds,
342         dispatch_block_t handler)
343 {
344         handler = _dispatch_Block_copy(handler);
345         dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
346                         _dispatch_source_set_registration_handler2);
347 }
348 #endif /* __BLOCKS__ */
349
350 static void
351 _dispatch_source_set_registration_handler_f(void *context)
352 {
353         dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
354         dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
355         dispatch_source_refs_t dr = ds->ds_refs;
356
357 #ifdef __BLOCKS__
358         if (ds->ds_registration_is_block && dr->ds_registration_handler) {
359                 Block_release(dr->ds_registration_handler);
360         }
361 #endif
362         dr->ds_registration_handler = context;
363         ds->ds_registration_is_block = false;
364 }
365
366 void
367 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
368         dispatch_function_t handler)
369 {
370         dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
371                         _dispatch_source_set_registration_handler_f);
372 }
373
374 #pragma mark -
375 #pragma mark dispatch_source_invoke
376
377 static void
378 _dispatch_source_registration_callout(dispatch_source_t ds)
379 {
380         dispatch_source_refs_t dr = ds->ds_refs;
381
382         if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
383                 // no registration callout if source is canceled rdar://problem/8955246
384 #ifdef __BLOCKS__
385                 if (ds->ds_registration_is_block) {
386                         Block_release(dr->ds_registration_handler);
387                 }
388         } else if (ds->ds_registration_is_block) {
389                 dispatch_block_t b = dr->ds_registration_handler;
390                 _dispatch_client_callout_block(b);
391                 Block_release(dr->ds_registration_handler);
392 #endif
393         } else {
394                 dispatch_function_t f = dr->ds_registration_handler;
395                 _dispatch_client_callout(ds->do_ctxt, f);
396         }
397         ds->ds_registration_is_block = false;
398         dr->ds_registration_handler = NULL;
399 }
400
401 static void
402 _dispatch_source_cancel_callout(dispatch_source_t ds)
403 {
404         dispatch_source_refs_t dr = ds->ds_refs;
405
406         ds->ds_pending_data_mask = 0;
407         ds->ds_pending_data = 0;
408         ds->ds_data = 0;
409
410 #ifdef __BLOCKS__
411         if (ds->ds_handler_is_block) {
412                 Block_release(dr->ds_handler_ctxt);
413                 ds->ds_handler_is_block = false;
414                 dr->ds_handler_func = NULL;
415                 dr->ds_handler_ctxt = NULL;
416         }
417         if (ds->ds_registration_is_block) {
418                 Block_release(dr->ds_registration_handler);
419                 ds->ds_registration_is_block = false;
420                 dr->ds_registration_handler = NULL;
421         }
422 #endif
423
424         if (!dr->ds_cancel_handler) {
425                 return;
426         }
427         if (ds->ds_cancel_is_block) {
428 #ifdef __BLOCKS__
429                 dispatch_block_t b = dr->ds_cancel_handler;
430                 if (ds->ds_atomic_flags & DSF_CANCELED) {
431                         _dispatch_client_callout_block(b);
432                 }
433                 Block_release(dr->ds_cancel_handler);
434                 ds->ds_cancel_is_block = false;
435 #endif
436         } else {
437                 dispatch_function_t f = dr->ds_cancel_handler;
438                 if (ds->ds_atomic_flags & DSF_CANCELED) {
439                         _dispatch_client_callout(ds->do_ctxt, f);
440                 }
441         }
442         dr->ds_cancel_handler = NULL;
443 }
444
445 static void
446 _dispatch_source_latch_and_call(dispatch_source_t ds)
447 {
448         unsigned long prev;
449
450         if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
451                 return;
452         }
453         dispatch_source_refs_t dr = ds->ds_refs;
454         prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0);
455         if (ds->ds_is_level) {
456                 ds->ds_data = ~prev;
457         } else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
458                 ds->ds_data = _dispatch_source_timer_data(dr, prev);
459         } else {
460                 ds->ds_data = prev;
461         }
462         if (dispatch_assume(prev) && dr->ds_handler_func) {
463                 _dispatch_client_callout(dr->ds_handler_ctxt, dr->ds_handler_func);
464         }
465 }
466
467 static void
468 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
469 {
470         switch (ds->ds_dkev->dk_kevent.filter) {
471         case DISPATCH_EVFILT_TIMER:
472                 // called on manager queue only
473                 return _dispatch_timer_list_update(ds);
474         case EVFILT_MACHPORT:
475                 if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
476                         new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
477                 }
478                 break;
479         }
480         if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
481                 _dispatch_kevent_unregister(ds);
482         }
483 }
484
485 dispatch_queue_t
486 _dispatch_source_invoke(dispatch_source_t ds)
487 {
488         // This function performs all source actions. Each action is responsible
489         // for verifying that it takes place on the appropriate queue. If the
490         // current queue is not the correct queue for this action, the correct queue
491         // will be returned and the invoke will be re-driven on that queue.
492
493         // The order of tests here in invoke and in probe should be consistent.
494
495         dispatch_queue_t dq = _dispatch_queue_get_current();
496         dispatch_source_refs_t dr = ds->ds_refs;
497
498         if (!ds->ds_is_installed) {
499                 // The source needs to be installed on the manager queue.
500                 if (dq != &_dispatch_mgr_q) {
501                         return &_dispatch_mgr_q;
502                 }
503                 _dispatch_kevent_register(ds);
504                 if (dr->ds_registration_handler) {
505                         return ds->do_targetq;
506                 }
507                 if (slowpath(ds->do_xref_cnt == -1)) {
508                         return &_dispatch_mgr_q; // rdar://problem/9558246
509                 }
510         } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
511                 // Source suspended by an item drained from the source queue.
512                 return NULL;
513         } else if (dr->ds_registration_handler) {
514                 // The source has been registered and the registration handler needs
515                 // to be delivered on the target queue.
516                 if (dq != ds->do_targetq) {
517                         return ds->do_targetq;
518                 }
519                 // clears ds_registration_handler
520                 _dispatch_source_registration_callout(ds);
521                 if (slowpath(ds->do_xref_cnt == -1)) {
522                         return &_dispatch_mgr_q; // rdar://problem/9558246
523                 }
524         } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
525                 // The source has been cancelled and needs to be uninstalled from the
526                 // manager queue. After uninstallation, the cancellation handler needs
527                 // to be delivered to the target queue.
528                 if (ds->ds_dkev) {
529                         if (dq != &_dispatch_mgr_q) {
530                                 return &_dispatch_mgr_q;
531                         }
532                         _dispatch_kevent_unregister(ds);
533                 }
534                 if (dr->ds_cancel_handler || ds->ds_handler_is_block ||
535                                 ds->ds_registration_is_block) {
536                         if (dq != ds->do_targetq) {
537                                 return ds->do_targetq;
538                         }
539                 }
540                 _dispatch_source_cancel_callout(ds);
541         } else if (ds->ds_pending_data) {
542                 // The source has pending data to deliver via the event handler callback
543                 // on the target queue. Some sources need to be rearmed on the manager
544                 // queue after event delivery.
545                 if (dq != ds->do_targetq) {
546                         return ds->do_targetq;
547                 }
548                 _dispatch_source_latch_and_call(ds);
549                 if (ds->ds_needs_rearm) {
550                         return &_dispatch_mgr_q;
551                 }
552         } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
553                 // The source needs to be rearmed on the manager queue.
554                 if (dq != &_dispatch_mgr_q) {
555                         return &_dispatch_mgr_q;
556                 }
557                 _dispatch_source_kevent_resume(ds, 0);
558                 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED);
559         }
560
561         return NULL;
562 }
563
564 bool
565 _dispatch_source_probe(dispatch_source_t ds)
566 {
567         // This function determines whether the source needs to be invoked.
568         // The order of tests here in probe and in invoke should be consistent.
569
570         dispatch_source_refs_t dr = ds->ds_refs;
571         if (!ds->ds_is_installed) {
572                 // The source needs to be installed on the manager queue.
573                 return true;
574         } else if (dr->ds_registration_handler) {
575                 // The registration handler needs to be delivered to the target queue.
576                 return true;
577         } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
578                 // The source needs to be uninstalled from the manager queue, or the
579                 // cancellation handler needs to be delivered to the target queue.
580                 // Note: cancellation assumes installation.
581                 if (ds->ds_dkev || dr->ds_cancel_handler
582 #ifdef __BLOCKS__
583                                 || ds->ds_handler_is_block || ds->ds_registration_is_block
584 #endif
585                 ) {
586                         return true;
587                 }
588         } else if (ds->ds_pending_data) {
589                 // The source has pending data to deliver to the target queue.
590                 return true;
591         } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
592                 // The source needs to be rearmed on the manager queue.
593                 return true;
594         }
595         // Nothing to do.
596         return false;
597 }
598
599 #pragma mark -
600 #pragma mark dispatch_source_kevent
601
602 static void
603 _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
604 {
605         struct kevent fake;
606
607         if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
608                 return;
609         }
610
611         // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
612         // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
613         // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
614         if (ke->flags & EV_ERROR) {
615                 if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
616                         fake = *ke;
617                         fake.flags &= ~EV_ERROR;
618                         fake.fflags = NOTE_EXIT;
619                         fake.data = 0;
620                         ke = &fake;
621 #if DISPATCH_USE_VM_PRESSURE
622                 } else if (ke->filter == EVFILT_VM && ke->data == ENOTSUP) {
623                         // Memory pressure kevent is not supported on all platforms
624                         // <rdar://problem/8636227>
625                         return;
626 #endif
627                 } else {
628                         // log the unexpected error
629                         (void)dispatch_assume_zero(ke->data);
630                         return;
631                 }
632         }
633
634         if (ds->ds_is_level) {
635                 // ke->data is signed and "negative available data" makes no sense
636                 // zero bytes happens when EV_EOF is set
637                 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
638                 dispatch_assert(ke->data >= 0l);
639                 ds->ds_pending_data = ~ke->data;
640         } else if (ds->ds_is_adder) {
641                 (void)dispatch_atomic_add2o(ds, ds_pending_data, ke->data);
642         } else if (ke->fflags & ds->ds_pending_data_mask) {
643                 (void)dispatch_atomic_or2o(ds, ds_pending_data,
644                                 ke->fflags & ds->ds_pending_data_mask);
645         }
646
647         // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
648         if (ds->ds_needs_rearm) {
649                 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
650         }
651
652         _dispatch_wakeup(ds);
653 }
654
655 void
656 _dispatch_source_drain_kevent(struct kevent *ke)
657 {
658         dispatch_kevent_t dk = ke->udata;
659         dispatch_source_refs_t dri;
660
661 #if DISPATCH_DEBUG
662         static dispatch_once_t pred;
663         dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
664 #endif
665
666         dispatch_debug_kevents(ke, 1, __func__);
667
668 #if HAVE_MACH
669         if (ke->filter == EVFILT_MACHPORT) {
670                 return _dispatch_drain_mach_messages(ke);
671         }
672 #endif
673         dispatch_assert(dk);
674
675         if (ke->flags & EV_ONESHOT) {
676                 dk->dk_kevent.flags |= EV_ONESHOT;
677         }
678
679         TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
680                 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
681         }
682 }
683
684 #pragma mark -
685 #pragma mark dispatch_kevent_t
686
687 static struct dispatch_kevent_s _dispatch_kevent_data_or = {
688         .dk_kevent = {
689                 .filter = DISPATCH_EVFILT_CUSTOM_OR,
690                 .flags = EV_CLEAR,
691                 .udata = &_dispatch_kevent_data_or,
692         },
693         .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
694 };
695 static struct dispatch_kevent_s _dispatch_kevent_data_add = {
696         .dk_kevent = {
697                 .filter = DISPATCH_EVFILT_CUSTOM_ADD,
698                 .udata = &_dispatch_kevent_data_add,
699         },
700         .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
701 };
702
703 #if TARGET_OS_EMBEDDED
704 #define DSL_HASH_SIZE  64u // must be a power of two
705 #else
706 #define DSL_HASH_SIZE 256u // must be a power of two
707 #endif
708 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
709
710 DISPATCH_CACHELINE_ALIGN
711 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
712
713 static dispatch_once_t __dispatch_kevent_init_pred;
714
715 static void
716 _dispatch_kevent_init(void *context DISPATCH_UNUSED)
717 {
718         unsigned int i;
719         for (i = 0; i < DSL_HASH_SIZE; i++) {
720                 TAILQ_INIT(&_dispatch_sources[i]);
721         }
722
723         TAILQ_INSERT_TAIL(&_dispatch_sources[0],
724                         &_dispatch_kevent_data_or, dk_list);
725         TAILQ_INSERT_TAIL(&_dispatch_sources[0],
726                         &_dispatch_kevent_data_add, dk_list);
727
728         _dispatch_source_timer_init();
729 }
730
731 static inline uintptr_t
732 _dispatch_kevent_hash(uintptr_t ident, short filter)
733 {
734         uintptr_t value;
735 #if HAVE_MACH
736         value = (filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
737 #else
738         value = ident;
739 #endif
740         return DSL_HASH(value);
741 }
742
743 static dispatch_kevent_t
744 _dispatch_kevent_find(uintptr_t ident, short filter)
745 {
746         uintptr_t hash = _dispatch_kevent_hash(ident, filter);
747         dispatch_kevent_t dki;
748
749         TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
750                 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
751                         break;
752                 }
753         }
754         return dki;
755 }
756
757 static void
758 _dispatch_kevent_insert(dispatch_kevent_t dk)
759 {
760         uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
761                         dk->dk_kevent.filter);
762
763         TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
764 }
765
766 // Find existing kevents, and merge any new flags if necessary
767 static void
768 _dispatch_kevent_register(dispatch_source_t ds)
769 {
770         dispatch_kevent_t dk;
771         typeof(dk->dk_kevent.fflags) new_flags;
772         bool do_resume = false;
773
774         if (ds->ds_is_installed) {
775                 return;
776         }
777         ds->ds_is_installed = true;
778
779         dispatch_once_f(&__dispatch_kevent_init_pred,
780                         NULL, _dispatch_kevent_init);
781
782         dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident,
783                         ds->ds_dkev->dk_kevent.filter);
784
785         if (dk) {
786                 // If an existing dispatch kevent is found, check to see if new flags
787                 // need to be added to the existing kevent
788                 new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
789                 dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
790                 free(ds->ds_dkev);
791                 ds->ds_dkev = dk;
792                 do_resume = new_flags;
793         } else {
794                 dk = ds->ds_dkev;
795                 _dispatch_kevent_insert(dk);
796                 new_flags = dk->dk_kevent.fflags;
797                 do_resume = true;
798         }
799
800         TAILQ_INSERT_TAIL(&dk->dk_sources, ds->ds_refs, dr_list);
801
802         // Re-register the kevent with the kernel if new flags were added
803         // by the dispatch kevent
804         if (do_resume) {
805                 dk->dk_kevent.flags |= EV_ADD;
806         }
807         if (do_resume || ds->ds_needs_rearm) {
808                 _dispatch_source_kevent_resume(ds, new_flags);
809         }
810         (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED);
811 }
812
813 static bool
814 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
815                 uint32_t del_flags)
816 {
817         long r;
818         switch (dk->dk_kevent.filter) {
819         case DISPATCH_EVFILT_TIMER:
820         case DISPATCH_EVFILT_CUSTOM_ADD:
821         case DISPATCH_EVFILT_CUSTOM_OR:
822                 // these types not registered with kevent
823                 return 0;
824 #if HAVE_MACH
825         case EVFILT_MACHPORT:
826                 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
827 #endif
828         case EVFILT_PROC:
829                 if (dk->dk_kevent.flags & EV_ONESHOT) {
830                         return 0;
831                 }
832                 // fall through
833         default:
834                 r = _dispatch_update_kq(&dk->dk_kevent);
835                 if (dk->dk_kevent.flags & EV_DISPATCH) {
836                         dk->dk_kevent.flags &= ~EV_ADD;
837                 }
838                 return r;
839         }
840 }
841
842 static void
843 _dispatch_kevent_dispose(dispatch_kevent_t dk)
844 {
845         uintptr_t hash;
846
847         switch (dk->dk_kevent.filter) {
848         case DISPATCH_EVFILT_TIMER:
849         case DISPATCH_EVFILT_CUSTOM_ADD:
850         case DISPATCH_EVFILT_CUSTOM_OR:
851                 // these sources live on statically allocated lists
852                 return;
853 #if HAVE_MACH
854         case EVFILT_MACHPORT:
855                 _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
856                 break;
857 #endif
858         case EVFILT_PROC:
859                 if (dk->dk_kevent.flags & EV_ONESHOT) {
860                         break; // implicitly deleted
861                 }
862                 // fall through
863         default:
864                 if (~dk->dk_kevent.flags & EV_DELETE) {
865                         dk->dk_kevent.flags |= EV_DELETE;
866                         _dispatch_update_kq(&dk->dk_kevent);
867                 }
868                 break;
869         }
870
871         hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
872                         dk->dk_kevent.filter);
873         TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
874         free(dk);
875 }
876
877 static void
878 _dispatch_kevent_unregister(dispatch_source_t ds)
879 {
880         dispatch_kevent_t dk = ds->ds_dkev;
881         dispatch_source_refs_t dri;
882         uint32_t del_flags, fflags = 0;
883
884         ds->ds_dkev = NULL;
885
886         TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
887
888         if (TAILQ_EMPTY(&dk->dk_sources)) {
889                 _dispatch_kevent_dispose(dk);
890         } else {
891                 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
892                         dispatch_source_t dsi = _dispatch_source_from_refs(dri);
893                         fflags |= (uint32_t)dsi->ds_pending_data_mask;
894                 }
895                 del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
896                 if (del_flags) {
897                         dk->dk_kevent.flags |= EV_ADD;
898                         dk->dk_kevent.fflags = fflags;
899                         _dispatch_kevent_resume(dk, 0, del_flags);
900                 }
901         }
902
903         (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
904         ds->ds_needs_rearm = false; // re-arm is pointless and bad now
905         _dispatch_release(ds); // the retain is done at creation time
906 }
907
908 #pragma mark -
909 #pragma mark dispatch_timer
910
911 DISPATCH_CACHELINE_ALIGN
912 static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
913         [DISPATCH_TIMER_INDEX_WALL] = {
914                 .dk_kevent = {
915                         .ident = DISPATCH_TIMER_INDEX_WALL,
916                         .filter = DISPATCH_EVFILT_TIMER,
917                         .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL],
918                 },
919                 .dk_sources = TAILQ_HEAD_INITIALIZER(
920                                 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL].dk_sources),
921         },
922         [DISPATCH_TIMER_INDEX_MACH] = {
923                 .dk_kevent = {
924                         .ident = DISPATCH_TIMER_INDEX_MACH,
925                         .filter = DISPATCH_EVFILT_TIMER,
926                         .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH],
927                 },
928                 .dk_sources = TAILQ_HEAD_INITIALIZER(
929                                 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH].dk_sources),
930         },
931         [DISPATCH_TIMER_INDEX_DISARM] = {
932                 .dk_kevent = {
933                         .ident = DISPATCH_TIMER_INDEX_DISARM,
934                         .filter = DISPATCH_EVFILT_TIMER,
935                         .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM],
936                 },
937                 .dk_sources = TAILQ_HEAD_INITIALIZER(
938                                 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM].dk_sources),
939         },
940 };
941 // Don't count disarmed timer list
942 #define DISPATCH_TIMER_COUNT ((sizeof(_dispatch_kevent_timer) \
943                 / sizeof(_dispatch_kevent_timer[0])) - 1)
944
945 static inline void
946 _dispatch_source_timer_init(void)
947 {
948         TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)],
949                         &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
950         TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)],
951                         &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
952         TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_DISARM)],
953                         &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM], dk_list);
954 }
955
956 DISPATCH_ALWAYS_INLINE
957 static inline unsigned int
958 _dispatch_source_timer_idx(dispatch_source_refs_t dr)
959 {
960         return ds_timer(dr).flags & DISPATCH_TIMER_WALL_CLOCK ?
961                 DISPATCH_TIMER_INDEX_WALL : DISPATCH_TIMER_INDEX_MACH;
962 }
963
964 DISPATCH_ALWAYS_INLINE
965 static inline uint64_t
966 _dispatch_source_timer_now2(unsigned int timer)
967 {
968         switch (timer) {
969         case DISPATCH_TIMER_INDEX_MACH:
970                 return _dispatch_absolute_time();
971         case DISPATCH_TIMER_INDEX_WALL:
972                 return _dispatch_get_nanoseconds();
973         default:
974                 DISPATCH_CRASH("Invalid timer");
975         }
976 }
977
978 DISPATCH_ALWAYS_INLINE
979 static inline uint64_t
980 _dispatch_source_timer_now(dispatch_source_refs_t dr)
981 {
982         return _dispatch_source_timer_now2(_dispatch_source_timer_idx(dr));
983 }
984
985 // Updates the ordered list of timers based on next fire date for changes to ds.
986 // Should only be called from the context of _dispatch_mgr_q.
987 static void
988 _dispatch_timer_list_update(dispatch_source_t ds)
989 {
990         dispatch_source_refs_t dr = ds->ds_refs, dri = NULL;
991
992         dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);
993
994         // do not reschedule timers unregistered with _dispatch_kevent_unregister()
995         if (!ds->ds_dkev) {
996                 return;
997         }
998
999         // Ensure the source is on the global kevent lists before it is removed and
1000         // readded below.
1001         _dispatch_kevent_register(ds);
1002
1003         TAILQ_REMOVE(&ds->ds_dkev->dk_sources, dr, dr_list);
1004
1005         // Move timers that are disabled, suspended or have missed intervals to the
1006         // disarmed list, rearm after resume resp. source invoke will reenable them
1007         if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1008                         ds->ds_pending_data) {
1009                 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
1010                 ds->ds_dkev = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM];
1011                 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, (dispatch_source_refs_t)dr,
1012                                 dr_list);
1013                 return;
1014         }
1015
1016         // change the list if the clock type has changed
1017         ds->ds_dkev = &_dispatch_kevent_timer[_dispatch_source_timer_idx(dr)];
1018
1019         TAILQ_FOREACH(dri, &ds->ds_dkev->dk_sources, dr_list) {
1020                 if (ds_timer(dri).target == 0 ||
1021                                 ds_timer(dr).target < ds_timer(dri).target) {
1022                         break;
1023                 }
1024         }
1025
1026         if (dri) {
1027                 TAILQ_INSERT_BEFORE(dri, dr, dr_list);
1028         } else {
1029                 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, dr, dr_list);
1030         }
1031 }
1032
1033 static inline void
1034 _dispatch_run_timers2(unsigned int timer)
1035 {
1036         dispatch_source_refs_t dr;
1037         dispatch_source_t ds;
1038         uint64_t now, missed;
1039
1040         now = _dispatch_source_timer_now2(timer);
1041         while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
1042                 ds = _dispatch_source_from_refs(dr);
1043                 // We may find timers on the wrong list due to a pending update from
1044                 // dispatch_source_set_timer. Force an update of the list in that case.
1045                 if (timer != ds->ds_ident_hack) {
1046                         _dispatch_timer_list_update(ds);
1047                         continue;
1048                 }
1049                 if (!ds_timer(dr).target) {
1050                         // no configured timers on the list
1051                         break;
1052                 }
1053                 if (ds_timer(dr).target > now) {
1054                         // Done running timers for now.
1055                         break;
1056                 }
1057                 // Remove timers that are suspended or have missed intervals from the
1058                 // list, rearm after resume resp. source invoke will reenable them
1059                 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1060                         _dispatch_timer_list_update(ds);
1061                         continue;
1062                 }
1063                 // Calculate number of missed intervals.
1064                 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1065                 if (++missed > INT_MAX) {
1066                         missed = INT_MAX;
1067                 }
1068                 ds_timer(dr).target += missed * ds_timer(dr).interval;
1069                 _dispatch_timer_list_update(ds);
1070                 ds_timer(dr).last_fire = now;
1071                 (void)dispatch_atomic_add2o(ds, ds_pending_data, (int)missed);
1072                 _dispatch_wakeup(ds);
1073         }
1074 }
1075
1076 void
1077 _dispatch_run_timers(void)
1078 {
1079         dispatch_once_f(&__dispatch_kevent_init_pred,
1080                         NULL, _dispatch_kevent_init);
1081
1082         unsigned int i;
1083         for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
1084                 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[i].dk_sources)) {
1085                         _dispatch_run_timers2(i);
1086                 }
1087         }
1088 }
1089
1090 static inline unsigned long
1091 _dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1092 {
1093         // calculate the number of intervals since last fire
1094         unsigned long data, missed;
1095         uint64_t now = _dispatch_source_timer_now(dr);
1096         missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1097                         ds_timer(dr).interval);
1098         // correct for missed intervals already delivered last time
1099         data = prev - ds_timer(dr).missed + missed;
1100         ds_timer(dr).missed = missed;
1101         return data;
1102 }
1103
1104 // approx 1 year (60s * 60m * 24h * 365d)
1105 #define FOREVER_NSEC 31536000000000000ull
1106
1107 struct timespec *
1108 _dispatch_get_next_timer_fire(struct timespec *howsoon)
1109 {
1110         // <rdar://problem/6459649>
1111         // kevent(2) does not allow large timeouts, so we use a long timeout
1112         // instead (approximately 1 year).
1113         dispatch_source_refs_t dr = NULL;
1114         unsigned int timer;
1115         uint64_t now, delta_tmp, delta = UINT64_MAX;
1116
1117         for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
1118                 // Timers are kept in order, first one will fire next
1119                 dr = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources);
1120                 if (!dr || !ds_timer(dr).target) {
1121                         // Empty list or disabled timer
1122                         continue;
1123                 }
1124                 now = _dispatch_source_timer_now(dr);
1125                 if (ds_timer(dr).target <= now) {
1126                         howsoon->tv_sec = 0;
1127                         howsoon->tv_nsec = 0;
1128                         return howsoon;
1129                 }
1130                 // the subtraction cannot go negative because the previous "if"
1131                 // verified that the target is greater than now.
1132                 delta_tmp = ds_timer(dr).target - now;
1133                 if (!(ds_timer(dr).flags & DISPATCH_TIMER_WALL_CLOCK)) {
1134                         delta_tmp = _dispatch_time_mach2nano(delta_tmp);
1135                 }
1136                 if (delta_tmp < delta) {
1137                         delta = delta_tmp;
1138                 }
1139         }
1140         if (slowpath(delta > FOREVER_NSEC)) {
1141                 return NULL;
1142         } else {
1143                 howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
1144                 howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
1145         }
1146         return howsoon;
1147 }
1148
1149 struct dispatch_set_timer_params {
1150         dispatch_source_t ds;
1151         uintptr_t ident;
1152         struct dispatch_timer_source_s values;
1153 };
1154
1155 static void
1156 _dispatch_source_set_timer3(void *context)
1157 {
1158         // Called on the _dispatch_mgr_q
1159         struct dispatch_set_timer_params *params = context;
1160         dispatch_source_t ds = params->ds;
1161         ds->ds_ident_hack = params->ident;
1162         ds_timer(ds->ds_refs) = params->values;
1163         // Clear any pending data that might have accumulated on
1164         // older timer params <rdar://problem/8574886>
1165         ds->ds_pending_data = 0;
1166         _dispatch_timer_list_update(ds);
1167         dispatch_resume(ds);
1168         dispatch_release(ds);
1169         free(params);
1170 }
1171
1172 static void
1173 _dispatch_source_set_timer2(void *context)
1174 {
1175         // Called on the source queue
1176         struct dispatch_set_timer_params *params = context;
1177         dispatch_suspend(params->ds);
1178         dispatch_barrier_async_f(&_dispatch_mgr_q, params,
1179                         _dispatch_source_set_timer3);
1180 }
1181
1182 void
1183 dispatch_source_set_timer(dispatch_source_t ds,
1184         dispatch_time_t start,
1185         uint64_t interval,
1186         uint64_t leeway)
1187 {
1188         if (slowpath(!ds->ds_is_timer)) {
1189                 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1190         }
1191
1192         struct dispatch_set_timer_params *params;
1193
1194         // we use zero internally to mean disabled
1195         if (interval == 0) {
1196                 interval = 1;
1197         } else if ((int64_t)interval < 0) {
1198                 // 6866347 - make sure nanoseconds won't overflow
1199                 interval = INT64_MAX;
1200         }
1201         if ((int64_t)leeway < 0) {
1202                 leeway = INT64_MAX;
1203         }
1204
1205         if (start == DISPATCH_TIME_NOW) {
1206                 start = _dispatch_absolute_time();
1207         } else if (start == DISPATCH_TIME_FOREVER) {
1208                 start = INT64_MAX;
1209         }
1210
1211         while (!(params = calloc(1ul, sizeof(struct dispatch_set_timer_params)))) {
1212                 sleep(1);
1213         }
1214
1215         params->ds = ds;
1216         params->values.flags = ds_timer(ds->ds_refs).flags;
1217
1218         if ((int64_t)start < 0) {
1219                 // wall clock
1220                 params->ident = DISPATCH_TIMER_INDEX_WALL;
1221                 params->values.target = -((int64_t)start);
1222                 params->values.interval = interval;
1223                 params->values.leeway = leeway;
1224                 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1225         } else {
1226                 // absolute clock
1227                 params->ident = DISPATCH_TIMER_INDEX_MACH;
1228                 params->values.target = start;
1229                 params->values.interval = _dispatch_time_nano2mach(interval);
1230
1231                 // rdar://problem/7287561 interval must be at least one in
1232                 // in order to avoid later division by zero when calculating
1233                 // the missed interval count. (NOTE: the wall clock's
1234                 // interval is already "fixed" to be 1 or more)
1235                 if (params->values.interval < 1) {
1236                         params->values.interval = 1;
1237                 }
1238
1239                 params->values.leeway = _dispatch_time_nano2mach(leeway);
1240                 params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
1241         }
1242         // Suspend the source so that it doesn't fire with pending changes
1243         // The use of suspend/resume requires the external retain/release
1244         dispatch_retain(ds);
1245         dispatch_barrier_async_f((dispatch_queue_t)ds, params,
1246                         _dispatch_source_set_timer2);
1247 }
1248
1249 #pragma mark -
1250 #pragma mark dispatch_mach
1251
1252 #if HAVE_MACH
1253
1254 #if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
1255 #define _dispatch_debug_machport(name) \
1256                 dispatch_debug_machport((name), __func__)
1257 #else
1258 #define _dispatch_debug_machport(name)
1259 #endif
1260
1261 // Flags for all notifications that are registered/unregistered when a
1262 // send-possible notification is requested/delivered
1263 #define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
1264                 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
1265
1266 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
1267 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
1268                 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
1269
1270 #define _DISPATCH_MACHPORT_HASH_SIZE 32
1271 #define _DISPATCH_MACHPORT_HASH(x) \
1272                 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
1273
1274 static dispatch_source_t _dispatch_mach_notify_source;
1275 static mach_port_t _dispatch_port_set;
1276 static mach_port_t _dispatch_event_port;
1277
1278 static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
1279         uint32_t new_flags, uint32_t del_flags, uint32_t mask,
1280         mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
1281
1282 static void
1283 _dispatch_port_set_init(void *context DISPATCH_UNUSED)
1284 {
1285         struct kevent kev = {
1286                 .filter = EVFILT_MACHPORT,
1287                 .flags = EV_ADD,
1288         };
1289         kern_return_t kr;
1290
1291         kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
1292                         &_dispatch_port_set);
1293         DISPATCH_VERIFY_MIG(kr);
1294         if (kr) {
1295                 _dispatch_bug_mach_client(
1296                                 "_dispatch_port_set_init: mach_port_allocate() failed", kr);
1297                 DISPATCH_CLIENT_CRASH(
1298                                 "mach_port_allocate() failed: cannot create port set");
1299         }
1300         kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
1301                         &_dispatch_event_port);
1302         DISPATCH_VERIFY_MIG(kr);
1303         if (kr) {
1304                 _dispatch_bug_mach_client(
1305                                 "_dispatch_port_set_init: mach_port_allocate() failed", kr);
1306                 DISPATCH_CLIENT_CRASH(
1307                                 "mach_port_allocate() failed: cannot create receive right");
1308         }
1309         kr = mach_port_move_member(mach_task_self(), _dispatch_event_port,
1310                         _dispatch_port_set);
1311         DISPATCH_VERIFY_MIG(kr);
1312         if (kr) {
1313                 _dispatch_bug_mach_client(
1314                                 "_dispatch_port_set_init: mach_port_move_member() failed", kr);
1315                 DISPATCH_CLIENT_CRASH("mach_port_move_member() failed");
1316         }
1317
1318         kev.ident = _dispatch_port_set;
1319
1320         _dispatch_update_kq(&kev);
1321 }
1322
1323 static mach_port_t
1324 _dispatch_get_port_set(void)
1325 {
1326         static dispatch_once_t pred;
1327
1328         dispatch_once_f(&pred, NULL, _dispatch_port_set_init);
1329
1330         return _dispatch_port_set;
1331 }
1332
1333 static kern_return_t
1334 _dispatch_kevent_machport_enable(dispatch_kevent_t dk)
1335 {
1336         mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1337         kern_return_t kr;
1338
1339         _dispatch_debug_machport(mp);
1340         kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
1341         if (slowpath(kr)) {
1342                 DISPATCH_VERIFY_MIG(kr);
1343                 switch (kr) {
1344                 case KERN_INVALID_NAME:
1345 #if DISPATCH_DEBUG
1346                         _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
1347                                         "prematurely", mp);
1348 #endif
1349                         break;
1350                 case KERN_INVALID_RIGHT:
1351                         _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
1352                                         "mach_port_move_member() failed ", kr);
1353                         break;
1354                 default:
1355                         (void)dispatch_assume_zero(kr);
1356                         break;
1357                 }
1358         }
1359         return kr;
1360 }
1361
1362 static void
1363 _dispatch_kevent_machport_disable(dispatch_kevent_t dk)
1364 {
1365         mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1366         kern_return_t kr;
1367
1368         _dispatch_debug_machport(mp);
1369         kr = mach_port_move_member(mach_task_self(), mp, 0);
1370         if (slowpath(kr)) {
1371                 DISPATCH_VERIFY_MIG(kr);
1372                 switch (kr) {
1373                 case KERN_INVALID_RIGHT:
1374                 case KERN_INVALID_NAME:
1375 #if DISPATCH_DEBUG
1376                         _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
1377                                         "prematurely", mp);
1378 #endif
1379                         break;
1380                 default:
1381                         (void)dispatch_assume_zero(kr);
1382                         break;
1383                 }
1384         }
1385 }
1386
1387 kern_return_t
1388 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
1389                 uint32_t del_flags)
1390 {
1391         kern_return_t kr_recv = 0, kr_sp = 0;
1392
1393         dispatch_assert_zero(new_flags & del_flags);
1394         if (new_flags & DISPATCH_MACH_RECV_MESSAGE) {
1395                 kr_recv = _dispatch_kevent_machport_enable(dk);
1396         } else if (del_flags & DISPATCH_MACH_RECV_MESSAGE) {
1397                 _dispatch_kevent_machport_disable(dk);
1398         }
1399         if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
1400                         (del_flags & _DISPATCH_MACH_SP_FLAGS)) {
1401                 // Requesting a (delayed) non-sync send-possible notification
1402                 // registers for both immediate dead-name notification and delayed-arm
1403                 // send-possible notification for the port.
1404                 // The send-possible notification is armed when a mach_msg() with the
1405                 // the MACH_SEND_NOTIFY to the port times out.
1406                 // If send-possible is unavailable, fall back to immediate dead-name
1407                 // registration rdar://problem/2527840&9008724
1408                 kr_sp = _dispatch_mach_notify_update(dk, new_flags, del_flags,
1409                                 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
1410                                 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
1411         }
1412
1413         return (kr_recv ? kr_recv : kr_sp);
1414 }
1415
1416 void
1417 _dispatch_drain_mach_messages(struct kevent *ke)
1418 {
1419         mach_port_t name = (mach_port_name_t)ke->data;
1420         dispatch_source_refs_t dri;
1421         dispatch_kevent_t dk;
1422         struct kevent kev;
1423
1424         if (!dispatch_assume(name)) {
1425                 return;
1426         }
1427         _dispatch_debug_machport(name);
1428         dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1429         if (!dispatch_assume(dk)) {
1430                 return;
1431         }
1432         _dispatch_kevent_machport_disable(dk); // emulate EV_DISPATCH
1433
1434         EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
1435                         DISPATCH_MACH_RECV_MESSAGE, 0, dk);
1436
1437         TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1438                 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), &kev);
1439         }
1440 }
1441
1442 static inline void
1443 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, uint32_t unreg,
1444                 bool final)
1445 {
1446         dispatch_source_refs_t dri;
1447         dispatch_kevent_t dk;
1448         struct kevent kev;
1449
1450         dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1451         if (!dk) {
1452                 return;
1453         }
1454
1455         // Update notification registration state.
1456         dk->dk_kevent.data &= ~unreg;
1457         if (!final) {
1458                 // Re-register for notification before delivery
1459                 _dispatch_kevent_resume(dk, flag, 0);
1460         }
1461
1462         EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE, flag, 0, dk);
1463
1464         TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1465                 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), &kev);
1466                 if (final) {
1467                         // this can never happen again
1468                         // this must happen after the merge
1469                         // this may be racy in the future, but we don't provide a 'setter'
1470                         // API for the mask yet
1471                         _dispatch_source_from_refs(dri)->ds_pending_data_mask &= ~unreg;
1472                 }
1473         }
1474
1475         if (final) {
1476                 // no more sources have these flags
1477                 dk->dk_kevent.fflags &= ~unreg;
1478         }
1479 }
1480
1481 static kern_return_t
1482 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
1483                 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
1484                 mach_port_mscount_t notify_sync)
1485 {
1486         mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
1487         typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
1488         kern_return_t kr, krr = 0;
1489
1490         // Update notification registration state.
1491         dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
1492         dk->dk_kevent.data &= ~(del_flags & mask);
1493
1494         _dispatch_debug_machport(port);
1495         if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
1496                 previous = MACH_PORT_NULL;
1497                 krr = mach_port_request_notification(mach_task_self(), port,
1498                                 notify_msgid, notify_sync, _dispatch_event_port,
1499                                 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
1500                 DISPATCH_VERIFY_MIG(krr);
1501
1502                 switch(krr) {
1503                 case KERN_INVALID_NAME:
1504                 case KERN_INVALID_RIGHT:
1505                         // Supress errors & clear registration state
1506                         dk->dk_kevent.data &= ~mask;
1507                         break;
1508                 default:
1509                         // Else, we dont expect any errors from mach. Log any errors
1510                         if (dispatch_assume_zero(krr)) {
1511                                 // log the error & clear registration state
1512                                 dk->dk_kevent.data &= ~mask;
1513                         } else if (dispatch_assume_zero(previous)) {
1514                                 // Another subsystem has beat libdispatch to requesting the
1515                                 // specified Mach notification on this port. We should
1516                                 // technically cache the previous port and message it when the
1517                                 // kernel messages our port. Or we can just say screw those
1518                                 // subsystems and deallocate the previous port.
1519                                 // They should adopt libdispatch :-P
1520                                 kr = mach_port_deallocate(mach_task_self(), previous);
1521                                 DISPATCH_VERIFY_MIG(kr);
1522                                 (void)dispatch_assume_zero(kr);
1523                                 previous = MACH_PORT_NULL;
1524                         }
1525                 }
1526         } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
1527                 previous = MACH_PORT_NULL;
1528                 kr = mach_port_request_notification(mach_task_self(), port,
1529                                 notify_msgid, notify_sync, MACH_PORT_NULL,
1530                                 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
1531                 DISPATCH_VERIFY_MIG(kr);
1532
1533                 switch (kr) {
1534                 case KERN_INVALID_NAME:
1535                 case KERN_INVALID_RIGHT:
1536                 case KERN_INVALID_ARGUMENT:
1537                         break;
1538                 default:
1539                         if (dispatch_assume_zero(kr)) {
1540                                 // log the error
1541                         }
1542                 }
1543         } else {
1544                 return 0;
1545         }
1546         if (slowpath(previous)) {
1547                 // the kernel has not consumed the send-once right yet
1548                 (void)dispatch_assume_zero(
1549                                 _dispatch_send_consume_send_once_right(previous));
1550         }
1551         return krr;
1552 }
1553
1554 static void
1555 _dispatch_mach_notify_source2(void *context)
1556 {
1557         dispatch_source_t ds = context;
1558         size_t maxsz = MAX(sizeof(union
1559                 __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem),
1560                 sizeof(union
1561                 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
1562
1563         dispatch_mig_server(ds, maxsz, libdispatch_internal_protocol_server);
1564 }
1565
1566 void
1567 _dispatch_mach_notify_source_init(void *context DISPATCH_UNUSED)
1568 {
1569         _dispatch_get_port_set();
1570
1571         _dispatch_mach_notify_source = dispatch_source_create(
1572                         DISPATCH_SOURCE_TYPE_MACH_RECV, _dispatch_event_port, 0,
1573                         &_dispatch_mgr_q);
1574         dispatch_assert(_dispatch_mach_notify_source);
1575         dispatch_set_context(_dispatch_mach_notify_source,
1576                         _dispatch_mach_notify_source);
1577         dispatch_source_set_event_handler_f(_dispatch_mach_notify_source,
1578                         _dispatch_mach_notify_source2);
1579         dispatch_resume(_dispatch_mach_notify_source);
1580 }
1581
1582 kern_return_t
1583 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
1584                 mach_port_name_t name)
1585 {
1586 #if DISPATCH_DEBUG
1587         _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
1588                         "deleted prematurely", name);
1589 #endif
1590
1591         _dispatch_debug_machport(name);
1592         _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED,
1593                                 _DISPATCH_MACH_SP_FLAGS, true);
1594
1595         return KERN_SUCCESS;
1596 }
1597
1598 kern_return_t
1599 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
1600                 mach_port_name_t name)
1601 {
1602         kern_return_t kr;
1603
1604 #if DISPATCH_DEBUG
1605         _dispatch_log("machport[0x%08x]: dead-name notification: %s",
1606                         name, __func__);
1607 #endif
1608         _dispatch_debug_machport(name);
1609         _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD,
1610                                 _DISPATCH_MACH_SP_FLAGS, true);
1611
1612         // the act of receiving a dead name notification allocates a dead-name
1613         // right that must be deallocated
1614         kr = mach_port_deallocate(mach_task_self(), name);
1615         DISPATCH_VERIFY_MIG(kr);
1616         //(void)dispatch_assume_zero(kr);
1617
1618         return KERN_SUCCESS;
1619 }
1620
1621 kern_return_t
1622 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
1623                 mach_port_name_t name)
1624 {
1625 #if DISPATCH_DEBUG
1626         _dispatch_log("machport[0x%08x]: send-possible notification: %s",
1627                         name, __func__);
1628 #endif
1629         _dispatch_debug_machport(name);
1630         _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE,
1631                                 _DISPATCH_MACH_SP_FLAGS, false);
1632
1633         return KERN_SUCCESS;
1634 }
1635
1636 mach_msg_return_t
1637 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
1638                 dispatch_mig_callback_t callback)
1639 {
1640         mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
1641                 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
1642                 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
1643         mach_msg_options_t tmp_options;
1644         mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
1645         mach_msg_return_t kr = 0;
1646         unsigned int cnt = 1000; // do not stall out serial queues
1647         int demux_success;
1648         bool received = false;
1649         size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
1650
1651         // XXX FIXME -- allocate these elsewhere
1652         bufRequest = alloca(rcv_size);
1653         bufReply = alloca(rcv_size);
1654         bufReply->Head.msgh_size = 0; // make CLANG happy
1655         bufRequest->RetCode = 0;
1656
1657 #if DISPATCH_DEBUG
1658         options |= MACH_RCV_LARGE; // rdar://problem/8422992
1659 #endif
1660         tmp_options = options;
1661         // XXX FIXME -- change this to not starve out the target queue
1662         for (;;) {
1663                 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
1664                         options &= ~MACH_RCV_MSG;
1665                         tmp_options &= ~MACH_RCV_MSG;
1666
1667                         if (!(tmp_options & MACH_SEND_MSG)) {
1668                                 break;
1669                         }
1670                 }
1671                 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
1672                                 (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
1673
1674                 tmp_options = options;
1675
1676                 if (slowpath(kr)) {
1677                         switch (kr) {
1678                         case MACH_SEND_INVALID_DEST:
1679                         case MACH_SEND_TIMED_OUT:
1680                                 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
1681                                         mach_msg_destroy(&bufReply->Head);
1682                                 }
1683                                 break;
1684                         case MACH_RCV_TIMED_OUT:
1685                                 // Don't return an error if a message was sent this time or
1686                                 // a message was successfully received previously
1687                                 // rdar://problems/7363620&7791738
1688                                 if(bufReply->Head.msgh_remote_port || received) {
1689                                         kr = MACH_MSG_SUCCESS;
1690                                 }
1691                                 break;
1692                         case MACH_RCV_INVALID_NAME:
1693                                 break;
1694 #if DISPATCH_DEBUG
1695                         case MACH_RCV_TOO_LARGE:
1696                                 // receive messages that are too large and log their id and size
1697                                 // rdar://problem/8422992
1698                                 tmp_options &= ~MACH_RCV_LARGE;
1699                                 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
1700                                 void *large_buf = malloc(large_size);
1701                                 if (large_buf) {
1702                                         rcv_size = large_size;
1703                                         bufReply = large_buf;
1704                                 }
1705                                 if (!mach_msg(&bufReply->Head, tmp_options, 0,
1706                                                 (mach_msg_size_t)rcv_size,
1707                                                 (mach_port_t)ds->ds_ident_hack, 0, 0)) {
1708                                         _dispatch_log("BUG in libdispatch client: "
1709                                                         "dispatch_mig_server received message larger than "
1710                                                         "requested size %zd: id = 0x%x, size = %d",
1711                                                         maxmsgsz, bufReply->Head.msgh_id,
1712                                                         bufReply->Head.msgh_size);
1713                                 }
1714                                 if (large_buf) {
1715                                         free(large_buf);
1716                                 }
1717                                 // fall through
1718 #endif
1719                         default:
1720                                 _dispatch_bug_mach_client(
1721                                                 "dispatch_mig_server: mach_msg() failed", kr);
1722                                 break;
1723                         }
1724                         break;
1725                 }
1726
1727                 if (!(tmp_options & MACH_RCV_MSG)) {
1728                         break;
1729                 }
1730                 received = true;
1731
1732                 bufTemp = bufRequest;
1733                 bufRequest = bufReply;
1734                 bufReply = bufTemp;
1735
1736                 demux_success = callback(&bufRequest->Head, &bufReply->Head);
1737
1738                 if (!demux_success) {
1739                         // destroy the request - but not the reply port
1740                         bufRequest->Head.msgh_remote_port = 0;
1741                         mach_msg_destroy(&bufRequest->Head);
1742                 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
1743                         // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
1744                         // is present
1745                         if (slowpath(bufReply->RetCode)) {
1746                                 if (bufReply->RetCode == MIG_NO_REPLY) {
1747                                         continue;
1748                                 }
1749
1750                                 // destroy the request - but not the reply port
1751                                 bufRequest->Head.msgh_remote_port = 0;
1752                                 mach_msg_destroy(&bufRequest->Head);
1753                         }
1754                 }
1755
1756                 if (bufReply->Head.msgh_remote_port) {
1757                         tmp_options |= MACH_SEND_MSG;
1758                         if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
1759                                         MACH_MSG_TYPE_MOVE_SEND_ONCE) {
1760                                 tmp_options |= MACH_SEND_TIMEOUT;
1761                         }
1762                 }
1763         }
1764
1765         return kr;
1766 }
1767
1768 #endif /* HAVE_MACH */
1769
1770 #pragma mark -
1771 #pragma mark dispatch_source_debug
1772
1773 DISPATCH_NOINLINE
1774 static const char *
1775 _evfiltstr(short filt)
1776 {
1777         switch (filt) {
1778 #define _evfilt2(f) case (f): return #f
1779         _evfilt2(EVFILT_READ);
1780         _evfilt2(EVFILT_WRITE);
1781         _evfilt2(EVFILT_AIO);
1782         _evfilt2(EVFILT_VNODE);
1783         _evfilt2(EVFILT_PROC);
1784         _evfilt2(EVFILT_SIGNAL);
1785         _evfilt2(EVFILT_TIMER);
1786 #ifdef EVFILT_VM
1787         _evfilt2(EVFILT_VM);
1788 #endif
1789 #if HAVE_MACH
1790         _evfilt2(EVFILT_MACHPORT);
1791 #endif
1792         _evfilt2(EVFILT_FS);
1793         _evfilt2(EVFILT_USER);
1794
1795         _evfilt2(DISPATCH_EVFILT_TIMER);
1796         _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
1797         _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
1798         default:
1799                 return "EVFILT_missing";
1800         }
1801 }
1802
1803 static size_t
1804 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
1805 {
1806         dispatch_queue_t target = ds->do_targetq;
1807         return snprintf(buf, bufsiz, "target = %s[%p], pending_data = 0x%lx, "
1808                         "pending_data_mask = 0x%lx, ",
1809                         target ? target->dq_label : "", target,
1810                         ds->ds_pending_data, ds->ds_pending_data_mask);
1811 }
1812
1813 static size_t
1814 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
1815 {
1816         dispatch_source_refs_t dr = ds->ds_refs;
1817         return snprintf(buf, bufsiz, "timer = { target = 0x%llx, "
1818                         "last_fire = 0x%llx, interval = 0x%llx, flags = 0x%llx }, ",
1819                         ds_timer(dr).target, ds_timer(dr).last_fire, ds_timer(dr).interval,
1820                         ds_timer(dr).flags);
1821 }
1822
1823 size_t
1824 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
1825 {
1826         size_t offset = 0;
1827         offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
1828                         dx_kind(ds), ds);
1829         offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
1830         offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
1831         if (ds->ds_is_timer) {
1832                 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
1833         }
1834         offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
1835                         ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
1836         return offset;
1837 }
1838
1839 #if DISPATCH_DEBUG
1840 void
1841 dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
1842 {
1843         size_t i;
1844         for (i = 0; i < count; ++i) {
1845                 _dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, "
1846                                 "fflags = 0x%x, data = %p, udata = %p }: %s",
1847                                 i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags,
1848                                 kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
1849         }
1850 }
1851
1852 static void
1853 _dispatch_kevent_debugger2(void *context)
1854 {
1855         struct sockaddr sa;
1856         socklen_t sa_len = sizeof(sa);
1857         int c, fd = (int)(long)context;
1858         unsigned int i;
1859         dispatch_kevent_t dk;
1860         dispatch_source_t ds;
1861         dispatch_source_refs_t dr;
1862         FILE *debug_stream;
1863
1864         c = accept(fd, &sa, &sa_len);
1865         if (c == -1) {
1866                 if (errno != EAGAIN) {
1867                         (void)dispatch_assume_zero(errno);
1868                 }
1869                 return;
1870         }
1871 #if 0
1872         int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
1873         if (r == -1) {
1874                 (void)dispatch_assume_zero(errno);
1875         }
1876 #endif
1877         debug_stream = fdopen(c, "a");
1878         if (!dispatch_assume(debug_stream)) {
1879                 close(c);
1880                 return;
1881         }
1882
1883         fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
1884         fprintf(debug_stream, "Content-type: text/html\r\n");
1885         fprintf(debug_stream, "Pragma: nocache\r\n");
1886         fprintf(debug_stream, "\r\n");
1887         fprintf(debug_stream, "<html>\n");
1888         fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
1889         fprintf(debug_stream, "<body>\n<ul>\n");
1890
1891         //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
1892         //              "<td>DK</td><td>DK</td><td>DK</td></tr>\n");
1893
1894         for (i = 0; i < DSL_HASH_SIZE; i++) {
1895                 if (TAILQ_EMPTY(&_dispatch_sources[i])) {
1896                         continue;
1897                 }
1898                 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
1899                         fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
1900                                         "0x%hx fflags 0x%x data 0x%lx udata %p\n",
1901                                         dk, (unsigned long)dk->dk_kevent.ident,
1902                                         _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
1903                                         dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
1904                                         dk->dk_kevent.udata);
1905                         fprintf(debug_stream, "\t\t<ul>\n");
1906                         TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
1907                                 ds = _dispatch_source_from_refs(dr);
1908                                 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
1909                                                 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
1910                                                 ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
1911                                                 ds->ds_pending_data, ds->ds_pending_data_mask,
1912                                                 ds->ds_atomic_flags);
1913                                 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
1914                                         dispatch_queue_t dq = ds->do_targetq;
1915                                         fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
1916                                                         "0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
1917                                                         dq->do_suspend_cnt, dq->dq_label);
1918                                 }
1919                         }
1920                         fprintf(debug_stream, "\t\t</ul>\n");
1921                         fprintf(debug_stream, "\t</li>\n");
1922                 }
1923         }
1924         fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
1925         fflush(debug_stream);
1926         fclose(debug_stream);
1927 }
1928
1929 static void
1930 _dispatch_kevent_debugger2_cancel(void *context)
1931 {
1932         int ret, fd = (int)(long)context;
1933
1934         ret = close(fd);
1935         if (ret != -1) {
1936                 (void)dispatch_assume_zero(errno);
1937         }
1938 }
1939
1940 static void
1941 _dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
1942 {
1943         union {
1944                 struct sockaddr_in sa_in;
1945                 struct sockaddr sa;
1946         } sa_u = {
1947                 .sa_in = {
1948                         .sin_family = AF_INET,
1949                         .sin_addr = { htonl(INADDR_LOOPBACK), },
1950                 },
1951         };
1952         dispatch_source_t ds;
1953         const char *valstr;
1954         int val, r, fd, sock_opt = 1;
1955         socklen_t slen = sizeof(sa_u);
1956
1957         if (issetugid()) {
1958                 return;
1959         }
1960         valstr = getenv("LIBDISPATCH_DEBUGGER");
1961         if (!valstr) {
1962                 return;
1963         }
1964         val = atoi(valstr);
1965         if (val == 2) {
1966                 sa_u.sa_in.sin_addr.s_addr = 0;
1967         }
1968         fd = socket(PF_INET, SOCK_STREAM, 0);
1969         if (fd == -1) {
1970                 (void)dispatch_assume_zero(errno);
1971                 return;
1972         }
1973         r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
1974                         (socklen_t) sizeof sock_opt);
1975         if (r == -1) {
1976                 (void)dispatch_assume_zero(errno);
1977                 goto out_bad;
1978         }
1979 #if 0
1980         r = fcntl(fd, F_SETFL, O_NONBLOCK);
1981         if (r == -1) {
1982                 (void)dispatch_assume_zero(errno);
1983                 goto out_bad;
1984         }
1985 #endif
1986         r = bind(fd, &sa_u.sa, sizeof(sa_u));
1987         if (r == -1) {
1988                 (void)dispatch_assume_zero(errno);
1989                 goto out_bad;
1990         }
1991         r = listen(fd, SOMAXCONN);
1992         if (r == -1) {
1993                 (void)dispatch_assume_zero(errno);
1994                 goto out_bad;
1995         }
1996         r = getsockname(fd, &sa_u.sa, &slen);
1997         if (r == -1) {
1998                 (void)dispatch_assume_zero(errno);
1999                 goto out_bad;
2000         }
2001
2002         ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
2003                         &_dispatch_mgr_q);
2004         if (dispatch_assume(ds)) {
2005                 _dispatch_log("LIBDISPATCH: debug port: %hu",
2006                                 (in_port_t)ntohs(sa_u.sa_in.sin_port));
2007
2008                 /* ownership of fd transfers to ds */
2009                 dispatch_set_context(ds, (void *)(long)fd);
2010                 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
2011                 dispatch_source_set_cancel_handler_f(ds,
2012                                 _dispatch_kevent_debugger2_cancel);
2013                 dispatch_resume(ds);
2014
2015                 return;
2016         }
2017 out_bad:
2018         close(fd);
2019 }
2020
2021 #if HAVE_MACH
2022
2023 #ifndef MACH_PORT_TYPE_SPREQUEST
2024 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
2025 #endif
2026
2027 void
2028 dispatch_debug_machport(mach_port_t name, const char* str)
2029 {
2030         mach_port_type_t type;
2031         mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
2032         unsigned int dnreqs = 0, dnrsiz;
2033         kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
2034         if (kr) {
2035                 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
2036                                 kr, mach_error_string(kr), str);
2037                 return;
2038         }
2039         if (type & MACH_PORT_TYPE_SEND) {
2040                 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2041                                 MACH_PORT_RIGHT_SEND, &ns));
2042         }
2043         if (type & MACH_PORT_TYPE_SEND_ONCE) {
2044                 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2045                                 MACH_PORT_RIGHT_SEND_ONCE, &nso));
2046         }
2047         if (type & MACH_PORT_TYPE_DEAD_NAME) {
2048                 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2049                                 MACH_PORT_RIGHT_DEAD_NAME, &nd));
2050         }
2051         if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND|
2052                         MACH_PORT_TYPE_SEND_ONCE)) {
2053                 (void)dispatch_assume_zero(mach_port_dnrequest_info(mach_task_self(),
2054                                 name, &dnrsiz, &dnreqs));
2055                 }
2056         if (type & MACH_PORT_TYPE_RECEIVE) {
2057                 mach_port_status_t status = { .mps_pset = 0, };
2058                 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
2059                 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2060                                 MACH_PORT_RIGHT_RECEIVE, &nr));
2061                 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
2062                                 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
2063                 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
2064                                 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
2065                                 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
2066                                 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
2067                                 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
2068                                 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
2069                                 status.mps_srights ? "Y":"N", status.mps_sorights,
2070                                 status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
2071                                 status.mps_seqno, str);
2072         } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
2073                         MACH_PORT_TYPE_DEAD_NAME)) {
2074                 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
2075                                 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
2076                                 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
2077         } else {
2078                 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
2079                                 str);
2080         }
2081 }
2082
2083 #endif // HAVE_MACH
2084
2085 #endif // DISPATCH_DEBUG