/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "internal.h"

// We'd use __attribute__((aligned(x))), but it does not actually increase the
// alignment of stack variables. All we really need is the stack usage of the
// local thread to be sufficiently far away to avoid cache-line contention with
// the busy 'da_index' variable (see the layout sketch after the struct).
//
// NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic
struct dispatch_apply_s {
	long _da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
	void (*da_func)(void *, size_t);
	void *da_ctxt;
	size_t da_iterations;
	size_t da_index;
	uint32_t da_thr_cnt;
	dispatch_semaphore_t da_sema;
	long _da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
};
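// Layout sketch, assuming DISPATCH_CACHELINE_SIZE is 64 bytes (the actual
// value comes from the internal headers):
//
//     [ _da_pad0: 64B ][ da_func ... da_sema ][ _da_pad1: 64B ]
//
// The pads keep the hot shared words (above all 'da_index', which every
// worker thread hammers with atomic increments) away from whatever else the
// calling thread keeps on its stack next to the 'da' local, so the workers
// do not false-share a cache line with the caller.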
static void
_dispatch_apply2(void *_ctxt)
{
	struct dispatch_apply_s *da = _ctxt;
	size_t const iter = da->da_iterations;
	typeof(da->da_func) const func = da->da_func;
	void *const ctxt = da->da_ctxt;
	size_t idx;

	_dispatch_workitem_dec(); // this unit executes many items

	// Striding is the responsibility of the caller.
	while (fastpath((idx = dispatch_atomic_inc(&da->da_index) - 1) < iter)) {
		func(ctxt, idx);
		_dispatch_workitem_inc();
	}

	if (dispatch_atomic_dec(&da->da_thr_cnt) == 0) {
		dispatch_semaphore_signal(da->da_sema);
	}
}
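// Work distribution sketch: each of the da_thr_cnt invocations of
// _dispatch_apply2() keeps claiming the next unclaimed index with an atomic
// fetch-and-increment until da_index passes da_iterations. With five
// iterations and two workers, one possible outcome is worker A claiming
// indices 0, 2 and 3 while worker B claims 1 and 4; a slower worker simply
// claims fewer indices, so the load balances itself without any per-thread
// partitioning of the index range.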
static void
_dispatch_apply_serial(void *context)
{
	struct dispatch_apply_s *da = context;
	size_t idx = 0;

	_dispatch_workitem_dec(); // this unit executes many items
	do {
		da->da_func(da->da_ctxt, idx);
		_dispatch_workitem_inc();
	} while (++idx < da->da_iterations);
}
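// The serial helper above runs every iteration in index order on whatever
// thread dispatch_sync_f() lends it. dispatch_apply_f() below falls back to
// it when the target queue or the thread count cannot support the parallel
// path.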
#ifdef __BLOCKS__
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;

	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}
#endif
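// The casts above lean on the Blocks ABI: a block literal begins with a
// struct Block_basic whose Block_invoke function takes the block itself as
// its first argument, so the block pointer can stand in for 'ctxt' and its
// invoke function for the (void *, size_t) callback.
//
// Caller's view, for illustration only ('results' and 'compute' are made-up
// names, not part of this file):
//
//     dispatch_apply(count, dispatch_get_global_queue(0, 0), ^(size_t i) {
//         results[i] = compute(i);
//     });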
// 256 threads should be good enough for the short to mid term
#define DISPATCH_APPLY_MAX_CPUS 256
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	struct dispatch_apply_dc_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
	} da_dc[DISPATCH_APPLY_MAX_CPUS];
	struct dispatch_apply_s da;
	size_t i;
	da.da_func = func;
	da.da_ctxt = ctxt;
	da.da_iterations = iterations;
	da.da_index = 0;
	da.da_thr_cnt = _dispatch_hw_config.cc_max_active;

	if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (slowpath(iterations == 0)) {
		return;
	}
	if (iterations < da.da_thr_cnt) {
		da.da_thr_cnt = (uint32_t)iterations;
	}
	if (slowpath(dq->dq_width <= 2 || da.da_thr_cnt <= 1)) {
		return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
	}
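	// Serial fallback: a queue with a width of 1 or 2, or a single worker,
	// presumably offers too little concurrency to pay for the parallel setup
	// below, so one in-order pass over the iterations is run synchronously
	// on the target queue instead.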
	for (i = 0; i < da.da_thr_cnt; i++) {
		da_dc[i].do_vtable = NULL;
		da_dc[i].do_next = &da_dc[i + 1];
		da_dc[i].dc_func = _dispatch_apply2;
		da_dc[i].dc_ctxt = &da;
	}

	da.da_sema = _dispatch_get_thread_semaphore();
	// some queues are easy to borrow and some are not
	if (slowpath(dq->do_targetq)) {
		_dispatch_queue_push_list(dq, (void *)&da_dc[0], (void *)&da_dc[da.da_thr_cnt - 1]);
	} else {
		dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
		// root queues are always concurrent and safe to borrow
		_dispatch_queue_push_list(dq, (void *)&da_dc[1], (void *)&da_dc[da.da_thr_cnt - 1]);
		_dispatch_thread_setspecific(dispatch_queue_key, dq);
		// The first da_dc[] element was explicitly not pushed onto the queue.
		// We need to either call it like so:
		//     da_dc[0].dc_func(da_dc[0].dc_ctxt);
		// Or, given that we know the 'func' and 'ctxt', we can call it directly:
		_dispatch_apply2(&da);
		_dispatch_workitem_inc();
		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	}
	dispatch_semaphore_wait(da.da_sema, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(da.da_sema);
}
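// Completion rendezvous: each _dispatch_apply2() invocation decrements
// da_thr_cnt once it runs out of indices, and the invocation that drops the
// count to zero signals da_sema. The wait above is not optional: 'da' and
// 'da_dc' live on this stack frame, so dispatch_apply_f() must not return
// while any worker can still touch them. The semaphore itself comes from the
// calling thread's cached semaphore (hence the get/put pair) rather than
// being created and destroyed on every call.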
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;

	dispatch_stride_f(offset, stride, iterations, dq, bb, (void *)bb->Block_invoke);
}
#endif
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}

	// Each unit of the parallel phase walks one full stride of indices.
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	// Pick up the leftover iterations that do not fill a whole stride.
	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
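// Worked example: dispatch_stride_f(10, 4, 10, dq, ctxt, func) runs
// iterations / stride == 2 chunks in the parallel phase, covering indices
// 10..13 and 14..17, and the trailing dispatch_sync() block then covers the
// 10 % 4 == 2 leftover iterations, indices 18 and 19.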