2 This file is part of PulseAudio.
4 Copyright 2014 David Henningsson, Canonical Ltd.
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
24 #include "srbchannel.h"
26 #include <pulsecore/atomic.h>
27 #include <pulse/xmalloc.h>
29 /* #define DEBUG_SRBCHANNEL */
31 /* This ringbuffer might be useful in other contexts too, but
32 * right now it's only used inside the srbchannel, so let's keep it here
33 * for the time being. */
34 typedef struct pa_ringbuffer pa_ringbuffer;
36 struct pa_ringbuffer {
37 pa_atomic_t *count; /* amount of data in the buffer */
40 int readindex, writeindex;
43 static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
44 int c = pa_atomic_load(r->count);
46 if (r->readindex + c > r->capacity)
47 *count = r->capacity - r->readindex;
51 return r->memory + r->readindex;
54 /* Returns true only if the buffer was completely full before the drop. */
55 static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
56 bool b = pa_atomic_sub(r->count, count) >= r->capacity;
58 r->readindex += count;
59 r->readindex %= r->capacity;
64 static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
65 int c = pa_atomic_load(r->count);
67 *count = PA_MIN(r->capacity - r->writeindex, r->capacity - c);
69 return r->memory + r->writeindex;
72 static void pa_ringbuffer_end_write(pa_ringbuffer *r, int count) {
73 pa_atomic_add(r->count, count);
74 r->writeindex += count;
75 r->writeindex %= r->capacity;
78 struct pa_srbchannel {
79 pa_ringbuffer rb_read, rb_write;
80 pa_fdsem *sem_read, *sem_write;
81 pa_memblock *memblock;
84 pa_srbchannel_cb_t callback;
86 pa_io_event *read_event;
87 pa_defer_event *defer_event;
88 pa_mainloop_api *mainloop;
91 /* We always listen to sem_read, and always signal on sem_write.
93 * This means we signal the same semaphore for two scenarios:
94 * 1) We have written something to our send buffer, and want the other
96 * 2) We have read something from our receive buffer that was previously
97 * completely full, and want the other side to continue writing
100 size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
105 void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);
107 if ((size_t) towrite > l)
111 #ifdef DEBUG_SRBCHANNEL
112 pa_log("srbchannel output buffer full");
117 memcpy(ptr, data, towrite);
118 pa_ringbuffer_end_write(&sr->rb_write, towrite);
120 data = (uint8_t*) data + towrite;
123 #ifdef DEBUG_SRBCHANNEL
124 pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);
127 pa_fdsem_post(sr->sem_write);
131 size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
136 void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread);
138 if ((size_t) toread > l)
144 memcpy(data, ptr, toread);
146 if (pa_ringbuffer_drop(&sr->rb_read, toread)) {
147 #ifdef DEBUG_SRBCHANNEL
148 pa_log("Read from full output buffer, signalling fdsem");
150 pa_fdsem_post(sr->sem_write);
154 data = (uint8_t*) data + toread;
158 #ifdef DEBUG_SRBCHANNEL
159 pa_log("Read %d bytes from srbchannel", (int) isread);
165 /* This is the memory layout of the ringbuffer shm block. It is followed by
166 read and write ringbuffer memory. */
168 pa_atomic_t read_count;
169 pa_atomic_t write_count;
171 pa_fdsem_data read_semdata;
172 pa_fdsem_data write_semdata;
178 /* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
181 static void srbchannel_rwloop(pa_srbchannel* sr) {
183 #ifdef DEBUG_SRBCHANNEL
185 pa_ringbuffer_peek(&sr->rb_read, &q);
186 pa_log("In rw loop from srbchannel, before callback, count = %d", q);
190 if (!sr->callback(sr, sr->cb_userdata)) {
191 #ifdef DEBUG_SRBCHANNEL
192 pa_log("Aborting read loop from srbchannel");
198 #ifdef DEBUG_SRBCHANNEL
199 pa_ringbuffer_peek(&sr->rb_read, &q);
200 pa_log("In rw loop from srbchannel, after callback, count = %d", q);
203 } while (pa_fdsem_before_poll(sr->sem_read) < 0);
206 static void semread_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t events, void *userdata) {
207 pa_srbchannel* sr = userdata;
209 pa_fdsem_after_poll(sr->sem_read);
210 srbchannel_rwloop(sr);
213 static void defer_cb(pa_mainloop_api *m, pa_defer_event *e, void *userdata) {
214 pa_srbchannel* sr = userdata;
216 #ifdef DEBUG_SRBCHANNEL
217 pa_log("Calling rw loop from deferred event");
220 m->defer_enable(e, 0);
221 srbchannel_rwloop(sr);
224 pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
227 struct srbheader *srh;
229 pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
231 sr->memblock = pa_memblock_new_pool(p, -1);
235 srh = pa_memblock_acquire(sr->memblock);
238 sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
239 srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
241 capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
243 sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
244 srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
246 capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
248 pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
249 (int) pa_memblock_get_length(sr->memblock), capacity);
251 srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
253 sr->rb_read.count = &srh->read_count;
254 sr->rb_write.count = &srh->write_count;
256 sr->sem_read = pa_fdsem_new_shm(&srh->read_semdata);
260 sr->sem_write = pa_fdsem_new_shm(&srh->write_semdata);
264 readfd = pa_fdsem_get(sr->sem_read);
266 #ifdef DEBUG_SRBCHANNEL
267 pa_log("Enabling io event on fd %d", readfd);
270 sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
271 m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
276 pa_srbchannel_free(sr);
281 static void pa_srbchannel_swap(pa_srbchannel *sr) {
282 pa_srbchannel temp = *sr;
284 sr->sem_read = temp.sem_write;
285 sr->sem_write = temp.sem_read;
286 sr->rb_read = temp.rb_write;
287 sr->rb_write = temp.rb_read;
290 pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel_template *t)
293 struct srbheader *srh;
294 pa_srbchannel* sr = pa_xmalloc0(sizeof(pa_srbchannel));
297 sr->memblock = t->memblock;
298 pa_memblock_ref(sr->memblock);
299 srh = pa_memblock_acquire(sr->memblock);
301 sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
302 sr->rb_read.count = &srh->read_count;
303 sr->rb_write.count = &srh->write_count;
305 sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
306 sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
308 sr->sem_read = pa_fdsem_open_shm(&srh->read_semdata, t->readfd);
312 sr->sem_write = pa_fdsem_open_shm(&srh->write_semdata, t->writefd);
316 pa_srbchannel_swap(sr);
317 temp = t->readfd; t->readfd = t->writefd; t->writefd = temp;
319 #ifdef DEBUG_SRBCHANNEL
320 pa_log("Enabling io event on fd %d", t->readfd);
323 sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
324 m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
329 pa_srbchannel_free(sr);
334 void pa_srbchannel_export(pa_srbchannel *sr, pa_srbchannel_template *t) {
335 t->memblock = sr->memblock;
336 t->readfd = pa_fdsem_get(sr->sem_read);
337 t->writefd = pa_fdsem_get(sr->sem_write);
340 void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata) {
342 pa_fdsem_after_poll(sr->sem_read);
344 sr->callback = callback;
345 sr->cb_userdata = userdata;
348 /* If there are events to be read already in the ringbuffer, we will not get any IO event for that,
349 because that's how pa_fdsem works. Therefore check the ringbuffer in a defer event instead. */
350 if (!sr->defer_event)
351 sr->defer_event = sr->mainloop->defer_new(sr->mainloop, defer_cb, sr);
352 sr->mainloop->defer_enable(sr->defer_event, 1);
356 void pa_srbchannel_free(pa_srbchannel *sr)
358 #ifdef DEBUG_SRBCHANNEL
359 pa_log("Freeing srbchannel");
364 sr->mainloop->defer_free(sr->defer_event);
366 sr->mainloop->io_free(sr->read_event);
369 pa_fdsem_free(sr->sem_read);
371 pa_fdsem_free(sr->sem_write);
374 pa_memblock_release(sr->memblock);
375 pa_memblock_unref(sr->memblock);