Merge HUGE set of changes temporarily into a branch, to allow me to move them from...
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / asyncq.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2006 Lennart Poettering
7
8   PulseAudio is free software; you can redistribute it and/or modify
9   it under the terms of the GNU Lesser General Public License as
10   published by the Free Software Foundation; either version 2.1 of the
11   License, or (at your option) any later version.
12
13   PulseAudio is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public
19   License along with PulseAudio; if not, write to the Free Software
20   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21   USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
37
38 #include "asyncq.h"
39
40 #define ASYNCQ_SIZE 128
41
42 /* For debugging purposes we can define _Y to put and extra thread
43  * yield between each operation. */
44
45 #ifdef PROFILE
46 #define _Y pa_thread_yield()
47 #else
48 #define _Y do { } while(0)
49 #endif
50
51 struct pa_asyncq {
52     unsigned size;
53     unsigned read_idx;
54     unsigned write_idx;
55     pa_atomic_int_t read_waiting;
56     pa_atomic_int_t write_waiting;
57     int read_fds[2], write_fds[2];
58 };
59
60 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
61
62 static int is_power_of_two(unsigned size) {
63     return !(size & (size - 1));
64 }
65
66 static int reduce(pa_asyncq *l, int value) {
67     return value & (unsigned) (l->size - 1);
68 }
69
70 pa_asyncq *pa_asyncq_new(unsigned size) {
71     pa_asyncq *l;
72
73     if (!size)
74         size = ASYNCQ_SIZE;
75
76     pa_assert(is_power_of_two(size));
77
78     l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
79
80     l->size = size;
81     pa_atomic_store(&l->read_waiting, 0);
82     pa_atomic_store(&l->write_waiting, 0);
83
84     if (pipe(l->read_fds) < 0) {
85         pa_xfree(l);
86         return NULL;
87     }
88     
89     if (pipe(l->write_fds) < 0) {
90         pa_close(l->read_fds[0]);
91         pa_close(l->read_fds[1]);
92         pa_xfree(l);
93         return NULL;
94     }
95
96     pa_make_nonblock_fd(l->read_fds[1]);
97     pa_make_nonblock_fd(l->write_fds[1]);
98
99     return l;
100 }
101
102 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
103     pa_assert(l);
104
105     if (free_cb) {
106         void *p;
107         
108         while ((p = pa_asyncq_pop(l, 0)))
109             free_cb(p);
110     }
111
112     pa_close(l->read_fds[0]);
113     pa_close(l->read_fds[1]);
114     pa_close(l->write_fds[0]);
115     pa_close(l->write_fds[1]);
116     
117     pa_xfree(l);
118 }
119
120 int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
121     int idx;
122     pa_atomic_ptr_t *cells;
123
124     pa_assert(l);
125     pa_assert(p);
126
127     cells = PA_ASYNCQ_CELLS(l);
128     
129     _Y;
130     idx = reduce(l, l->write_idx);
131
132     if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
133         
134         /* First try failed. Let's wait for changes. */
135
136         if (!wait)
137             return -1;
138
139         _Y;
140
141         pa_atomic_inc(&l->write_waiting);
142
143         for (;;) {
144             char x[20];
145             
146             _Y;
147
148             if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p))
149                 break;
150
151             _Y;
152
153             if (read(l->write_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
154                 pa_atomic_dec(&l->write_waiting);
155                 return -1;
156             }
157         }
158         
159         _Y;
160
161         pa_atomic_dec(&l->write_waiting);
162     }
163     
164     _Y;
165     l->write_idx++;
166     
167     if (pa_atomic_load(&l->read_waiting)) {
168         char x = 'x';
169         _Y;
170         write(l->read_fds[1], &x, sizeof(x));
171     }
172     
173     return 0;
174 }
175
176 void* pa_asyncq_pop(pa_asyncq*l, int wait) {
177     int idx;
178     void *ret;
179     pa_atomic_ptr_t *cells;
180
181     pa_assert(l);
182
183     cells = PA_ASYNCQ_CELLS(l);
184
185     _Y;
186     idx = reduce(l, l->read_idx);
187
188     if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
189
190         /* First try failed. Let's wait for changes. */
191     
192         if (!wait)
193             return NULL;
194
195         _Y;
196
197         pa_atomic_inc(&l->read_waiting);
198
199         for (;;) {
200             char x[20];
201
202             _Y;
203
204             if ((ret = pa_atomic_ptr_load(&cells[idx])))
205                 break;
206
207             _Y;
208
209             if (read(l->read_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
210                 pa_atomic_dec(&l->read_waiting);
211                 return NULL;
212             }
213         }
214
215         _Y;
216
217         pa_atomic_dec(&l->read_waiting);
218     }
219
220     /* Guaranteed if we only have a single reader */
221     pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
222
223     _Y;
224     l->read_idx++;
225
226     if (pa_atomic_load(&l->write_waiting)) {
227         char x = 'x';
228         _Y;
229         write(l->write_fds[1], &x, sizeof(x));
230     }
231             
232     return ret;
233 }
234
235 int pa_asyncq_get_fd(pa_asyncq *q) {
236     pa_assert(q);
237
238     return q->read_fds[0];
239 }
240
241 int pa_asyncq_before_poll(pa_asyncq *l) {
242     int idx;
243     pa_atomic_ptr_t *cells;
244
245     pa_assert(l);
246
247     cells = PA_ASYNCQ_CELLS(l);
248
249     _Y;
250     idx = reduce(l, l->read_idx);
251
252     if (pa_atomic_ptr_load(&cells[idx]))
253         return -1;
254
255     pa_atomic_inc(&l->read_waiting);
256     
257     if (pa_atomic_ptr_load(&cells[idx])) {
258         pa_atomic_dec(&l->read_waiting);
259         return -1;
260     }
261
262     return 0;
263 }
264
265 int pa_asyncq_after_poll(pa_asyncq *l) {
266     pa_assert(l);
267
268     pa_assert(pa_atomic_load(&l->read_waiting) > 0);
269
270     pa_atomic_dec(&l->read_waiting);
271 }