4 This file is part of PulseAudio.
6 Copyright 2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as
10 published by the Free Software Foundation; either version 2.1 of the
11 License, or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public
19 License along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
40 #define ASYNCQ_SIZE 128
42 /* For debugging purposes we can define _Y to put and extra thread
43 * yield between each operation. */
46 #define _Y pa_thread_yield()
48 #define _Y do { } while(0)
55 pa_atomic_int_t read_waiting;
56 pa_atomic_int_t write_waiting;
57 int read_fds[2], write_fds[2];
60 #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
62 static int is_power_of_two(unsigned size) {
63 return !(size & (size - 1));
66 static int reduce(pa_asyncq *l, int value) {
67 return value & (unsigned) (l->size - 1);
70 pa_asyncq *pa_asyncq_new(unsigned size) {
76 pa_assert(is_power_of_two(size));
78 l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
81 pa_atomic_store(&l->read_waiting, 0);
82 pa_atomic_store(&l->write_waiting, 0);
84 if (pipe(l->read_fds) < 0) {
89 if (pipe(l->write_fds) < 0) {
90 pa_close(l->read_fds[0]);
91 pa_close(l->read_fds[1]);
96 pa_make_nonblock_fd(l->read_fds[1]);
97 pa_make_nonblock_fd(l->write_fds[1]);
102 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
108 while ((p = pa_asyncq_pop(l, 0)))
112 pa_close(l->read_fds[0]);
113 pa_close(l->read_fds[1]);
114 pa_close(l->write_fds[0]);
115 pa_close(l->write_fds[1]);
120 int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
122 pa_atomic_ptr_t *cells;
127 cells = PA_ASYNCQ_CELLS(l);
130 idx = reduce(l, l->write_idx);
132 if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
134 /* First try failed. Let's wait for changes. */
141 pa_atomic_inc(&l->write_waiting);
148 if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p))
153 if (read(l->write_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
154 pa_atomic_dec(&l->write_waiting);
161 pa_atomic_dec(&l->write_waiting);
167 if (pa_atomic_load(&l->read_waiting)) {
170 write(l->read_fds[1], &x, sizeof(x));
176 void* pa_asyncq_pop(pa_asyncq*l, int wait) {
179 pa_atomic_ptr_t *cells;
183 cells = PA_ASYNCQ_CELLS(l);
186 idx = reduce(l, l->read_idx);
188 if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
190 /* First try failed. Let's wait for changes. */
197 pa_atomic_inc(&l->read_waiting);
204 if ((ret = pa_atomic_ptr_load(&cells[idx])))
209 if (read(l->read_fds[0], x, sizeof(x)) < 0 && errno != EINTR) {
210 pa_atomic_dec(&l->read_waiting);
217 pa_atomic_dec(&l->read_waiting);
220 /* Guaranteed if we only have a single reader */
221 pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
226 if (pa_atomic_load(&l->write_waiting)) {
229 write(l->write_fds[1], &x, sizeof(x));
235 int pa_asyncq_get_fd(pa_asyncq *q) {
238 return q->read_fds[0];
241 int pa_asyncq_before_poll(pa_asyncq *l) {
243 pa_atomic_ptr_t *cells;
247 cells = PA_ASYNCQ_CELLS(l);
250 idx = reduce(l, l->read_idx);
252 if (pa_atomic_ptr_load(&cells[idx]))
255 pa_atomic_inc(&l->read_waiting);
257 if (pa_atomic_ptr_load(&cells[idx])) {
258 pa_atomic_dec(&l->read_waiting);
265 int pa_asyncq_after_poll(pa_asyncq *l) {
268 pa_assert(pa_atomic_load(&l->read_waiting) > 0);
270 pa_atomic_dec(&l->read_waiting);