Tizen 2.1 base
[platform/upstream/gcd.git] / dispatch-1.0 / src / apply.c
1 /*
2  * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
3  *
4  * @APPLE_APACHE_LICENSE_HEADER_START@
5  * 
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  * 
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  * 
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * 
18  * @APPLE_APACHE_LICENSE_HEADER_END@
19  */
20 #include "internal.h"
21
22 // We'd use __attribute__((aligned(x))), but it does not atually increase the
23 // alignment of stack variables. All we really need is the stack usage of the
24 // local thread to be sufficiently away to avoid cache-line contention with the
25 // busy 'da_index' variable.
26 //
27 // NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic 
28 struct dispatch_apply_s {
29         long    _da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
30         void    (*da_func)(void *, size_t);
31         void    *da_ctxt;
32         size_t  da_iterations;
33         size_t  da_index;
34         uint32_t        da_thr_cnt;
35         dispatch_semaphore_t da_sema;
36         long    _da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
37 };
38
39 static void
40 _dispatch_apply2(void *_ctxt)
41 {
42         struct dispatch_apply_s *da = _ctxt;
43         size_t const iter = da->da_iterations;
44         typeof(da->da_func) const func = da->da_func;
45         void *const ctxt = da->da_ctxt;
46         size_t idx;
47
48         _dispatch_workitem_dec(); // this unit executes many items
49
50         // Striding is the responsibility of the caller.
51         while (fastpath((idx = dispatch_atomic_inc(&da->da_index) - 1) < iter)) {
52                 func(ctxt, idx);
53                 _dispatch_workitem_inc();
54         }
55
56         if (dispatch_atomic_dec(&da->da_thr_cnt) == 0) {
57                 dispatch_semaphore_signal(da->da_sema);
58         }
59 }
60
61 static void
62 _dispatch_apply_serial(void *context)
63 {
64         struct dispatch_apply_s *da = context;
65         size_t idx = 0;
66
67         _dispatch_workitem_dec(); // this unit executes many items
68         do {
69                 da->da_func(da->da_ctxt, idx);
70                 _dispatch_workitem_inc();
71         } while (++idx < da->da_iterations);
72 }
73
74 #ifdef __BLOCKS__
75 void
76 dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
77 {
78         struct Block_basic *bb = (void *)work;
79
80         dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
81 }
82 #endif
83
84 // 256 threads should be good enough for the short to mid term
85 #define DISPATCH_APPLY_MAX_CPUS 256
86
87 DISPATCH_NOINLINE
88 void
89 dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
90 {
91         struct dispatch_apply_dc_s {
92                 DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
93         } da_dc[DISPATCH_APPLY_MAX_CPUS];
94         struct dispatch_apply_s da;
95         size_t i;
96
97         da.da_func = func;
98         da.da_ctxt = ctxt;
99         da.da_iterations = iterations;
100         da.da_index = 0;
101         da.da_thr_cnt = _dispatch_hw_config.cc_max_active;
102
103         if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
104                 da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
105         }
106         if (slowpath(iterations == 0)) {
107                 return;
108         }
109         if (iterations < da.da_thr_cnt) {
110                 da.da_thr_cnt = (uint32_t)iterations;
111         }
112         if (slowpath(dq->dq_width <= 2 || da.da_thr_cnt <= 1)) {
113                 return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
114         }
115
116         for (i = 0; i < da.da_thr_cnt; i++) {
117                 da_dc[i].do_vtable = NULL;
118                 da_dc[i].do_next = &da_dc[i + 1];
119                 da_dc[i].dc_func = _dispatch_apply2;
120                 da_dc[i].dc_ctxt = &da;
121         }
122
123         da.da_sema = _dispatch_get_thread_semaphore();
124
125         // some queues are easy to borrow and some are not
126         if (slowpath(dq->do_targetq)) {
127                 _dispatch_queue_push_list(dq, (void *)&da_dc[0], (void *)&da_dc[da.da_thr_cnt - 1]);
128         } else {
129                 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
130                 // root queues are always concurrent and safe to borrow
131                 _dispatch_queue_push_list(dq, (void *)&da_dc[1], (void *)&da_dc[da.da_thr_cnt - 1]);
132                 _dispatch_thread_setspecific(dispatch_queue_key, dq);
133                 // The first da_dc[] element was explicitly not pushed on to the queue.
134                 // We need to either call it like so:
135                 //     da_dc[0].dc_func(da_dc[0].dc_ctxt);
136                 // Or, given that we know the 'func' and 'ctxt', we can call it directly:
137                 _dispatch_apply2(&da);
138                 _dispatch_workitem_inc();
139                 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
140         }
141         dispatch_semaphore_wait(da.da_sema, DISPATCH_TIME_FOREVER);
142         _dispatch_put_thread_semaphore(da.da_sema);
143 }
144
145 #if 0
146 #ifdef __BLOCKS__
147 void
148 dispatch_stride(size_t offset, size_t stride, size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
149 {
150         struct Block_basic *bb = (void *)work;
151         dispatch_stride_f(offset, stride, iterations, dq, bb, (void *)bb->Block_invoke);
152 }
153 #endif
154
155 DISPATCH_NOINLINE
156 void
157 dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
158                 dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
159 {
160         if (stride == 0) {
161                 stride = 1;
162         }
163         dispatch_apply(iterations / stride, queue, ^(size_t idx) {
164                 size_t i = idx * stride + offset;
165                 size_t stop = i + stride;
166                 do {
167                         func(ctxt, i++);
168                 } while (i < stop);
169         });
170
171         dispatch_sync(queue, ^{
172                 size_t i;
173                 for (i = iterations - (iterations % stride); i < iterations; i++) {
174                         func(ctxt, i + offset);
175                 }
176         });
177 }
178 #endif