merge glitch-free branch back into trunk
[profile/ivi/pulseaudio.git] / src / pulsecore / shmasyncq.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2006 Lennart Poettering
7
8   PulseAudio is free software; you can redistribute it and/or modify
9   it under the terms of the GNU Lesser General Public License as
10   published by the Free Software Foundation; either version 2.1 of the
11   License, or (at your option) any later version.
12
13   PulseAudio is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public
19   License along with PulseAudio; if not, write to the Free Software
20   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21   USA.
22 ***/
23
24 #ifdef HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <unistd.h>
29 #include <errno.h>
30
31 #include <pulsecore/atomic.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/core-util.h>
36 #include <pulse/xmalloc.h>
37
38 #include "fdsem.h"
39
40 /* For debugging purposes we can define _Y to put and extra thread
41  * yield between each operation. */
42
43 /* #define PROFILE */
44
45 #ifdef PROFILE
46 #define _Y pa_thread_yield()
47 #else
48 #define _Y do { } while(0)
49 #endif
50
51
52 struct pa_shmasyncq {
53     pa_fdsem *read_fdsem, *write_fdsem;
54     pa_shmasyncq_data *data;
55 };
56
57 static int is_power_of_two(unsigned size) {
58     return !(size & (size - 1));
59 }
60
61 static int reduce(pa_shmasyncq *l, int value) {
62     return value & (unsigned) (l->n_elements - 1);
63 }
64
65 static pa_atomic_t* get_cell(pa_shmasyncq *l, unsigned i) {
66     pa_assert(i < l->data->n_elements);
67
68     return (pa_atomic_t*) ((uint8*t) l->data + PA_ALIGN(sizeof(pa_shmasyncq_data)) + i * (PA_ALIGN(sizeof(pa_atomic_t)) + PA_ALIGN(element_size)))
69 }
70
71 static void *get_cell_data(pa_atomic_t *a) {
72     return (uint8_t*) a + PA_ALIGN(sizeof(atomic_t));
73 }
74
75 pa_shmasyncq *pa_shmasyncq_new(unsigned n_elements, size_t element_size, void *data, int fd[2]) {
76     pa_shmasyncq *l;
77
78     pa_assert(n_elements > 0);
79     pa_assert(is_power_of_two(n_elements));
80     pa_assert(element_size > 0);
81     pa_assert(data);
82     pa_assert(fd);
83
84     l = pa_xnew(pa_shmasyncq, 1);
85
86     l->data = data;
87     memset(data, 0, PA_SHMASYNCQ_SIZE(n_elements, element_size));
88
89     l->data->n_elements = n_elements;
90     l->data->element_size = element_size;
91
92     if (!(l->read_fdsem = pa_fdsem_new_shm(&d->read_fdsem_data, &fd[0]))) {
93         pa_xfree(l);
94         return NULL;
95     }
96
97     if (!(l->write_fdsem = pa_fdsem_new(&d->write_fdsem_data, &fd[1]))) {
98         pa_fdsem_free(l->read_fdsem);
99         pa_xfree(l);
100         return NULL;
101     }
102
103     return l;
104 }
105
106 void pa_shmasyncq_free(pa_shmasyncq *l, pa_free_cb_t free_cb) {
107     pa_assert(l);
108
109     if (free_cb) {
110         void *p;
111
112         while ((p = pa_shmasyncq_pop(l, 0)))
113             free_cb(p);
114     }
115
116     pa_fdsem_free(l->read_fdsem);
117     pa_fdsem_free(l->write_fdsem);
118     pa_xfree(l);
119 }
120
121 int pa_shmasyncq_push(pa_shmasyncq*l, void *p, int wait) {
122     int idx;
123     pa_atomic_ptr_t *cells;
124
125     pa_assert(l);
126     pa_assert(p);
127
128     cells = PA_SHMASYNCQ_CELLS(l);
129
130     _Y;
131     idx = reduce(l, l->write_idx);
132
133     if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) {
134
135         if (!wait)
136             return -1;
137
138 /*         pa_log("sleeping on push"); */
139
140         do {
141             pa_fdsem_wait(l->read_fdsem);
142         } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p));
143     }
144
145     _Y;
146     l->write_idx++;
147
148     pa_fdsem_post(l->write_fdsem);
149
150     return 0;
151 }
152
153 void* pa_shmasyncq_pop(pa_shmasyncq*l, int wait) {
154     int idx;
155     void *ret;
156     pa_atomic_ptr_t *cells;
157
158     pa_assert(l);
159
160     cells = PA_SHMASYNCQ_CELLS(l);
161
162     _Y;
163     idx = reduce(l, l->read_idx);
164
165     if (!(ret = pa_atomic_ptr_load(&cells[idx]))) {
166
167         if (!wait)
168             return NULL;
169
170 /*         pa_log("sleeping on pop"); */
171
172         do {
173             pa_fdsem_wait(l->write_fdsem);
174         } while (!(ret = pa_atomic_ptr_load(&cells[idx])));
175     }
176
177     pa_assert(ret);
178
179     /* Guaranteed to succeed if we only have a single reader */
180     pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL));
181
182     _Y;
183     l->read_idx++;
184
185     pa_fdsem_post(l->read_fdsem);
186
187     return ret;
188 }
189
190 int pa_shmasyncq_get_fd(pa_shmasyncq *q) {
191     pa_assert(q);
192
193     return pa_fdsem_get(q->write_fdsem);
194 }
195
196 int pa_shmasyncq_before_poll(pa_shmasyncq *l) {
197     int idx;
198     pa_atomic_ptr_t *cells;
199
200     pa_assert(l);
201
202     cells = PA_SHMASYNCQ_CELLS(l);
203
204     _Y;
205     idx = reduce(l, l->read_idx);
206
207     for (;;) {
208         if (pa_atomic_ptr_load(&cells[idx]))
209             return -1;
210
211         if (pa_fdsem_before_poll(l->write_fdsem) >= 0)
212             return 0;
213     }
214
215     return 0;
216 }
217
218 void pa_shmasyncq_after_poll(pa_shmasyncq *l) {
219     pa_assert(l);
220
221     pa_fdsem_after_poll(l->write_fdsem);
222 }