/* #define DEBUG_SRBCHANNEL */
/* This ringbuffer might be useful in other contexts too, but
- right now it's only used inside the srbchannel, so let's keep it here
- for the time being. */
+ * right now it's only used inside the srbchannel, so let's keep it here
+ * for the time being. */
typedef struct pa_ringbuffer pa_ringbuffer;
+
struct pa_ringbuffer {
- pa_atomic_t *count;
+ pa_atomic_t *count; /* amount of data in the buffer, can be negative */
int capacity;
uint8_t *memory;
int readindex, writeindex;
static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
int c = pa_atomic_load(r->count);
+
if (r->readindex + c > r->capacity)
*count = r->capacity - r->readindex;
else
*count = c;
+
return r->memory + r->readindex;
}
/* Returns true only if the buffer was completely full before the drop. */
static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
bool b = pa_atomic_sub(r->count, count) >= r->capacity;
+
r->readindex += count;
r->readindex %= r->capacity;
+
return b;
}
static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
int c = pa_atomic_load(r->count);
+
*count = PA_MIN(r->capacity - r->writeindex, r->capacity - c);
+
return r->memory + r->writeindex;
}
pa_ringbuffer rb_read, rb_write;
pa_fdsem *sem_read, *sem_write;
pa_memblock *memblock;
+
void *cb_userdata;
pa_srbchannel_cb_t callback;
+
pa_io_event *read_event;
pa_mainloop_api *mainloop;
};
/* We always listen to sem_read, and always signal on sem_write.
-
- This means we signal the same semaphore for two scenarios:
- 1) We have written something to our send buffer, and want the other
- side to read it
- 2) We have read something from our receive buffer that was previously
- completely full, and want the other side to continue writing
+ *
+ * This means we signal the same semaphore for two scenarios:
+ * 1) We have written something to our send buffer, and want the other
+ * side to read it
+ * 2) We have read something from our receive buffer that was previously
+ * completely full, and want the other side to continue writing
*/
size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
size_t written = 0;
+
while (l > 0) {
int towrite;
void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);
+
if ((size_t) towrite > l)
towrite = l;
+
if (towrite == 0) {
#ifdef DEBUG_SRBCHANNEL
pa_log("srbchannel output buffer full");
#endif
break;
}
+
memcpy(ptr, data, towrite);
pa_ringbuffer_end_write(&sr->rb_write, towrite);
written += towrite;
#ifdef DEBUG_SRBCHANNEL
pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);
#endif
+
pa_fdsem_post(sr->sem_write);
return written;
}
size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
size_t isread = 0;
+
while (l > 0) {
int toread;
void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread);
+
if ((size_t) toread > l)
toread = l;
+
if (toread == 0)
break;
+
memcpy(data, ptr, toread);
+
if (pa_ringbuffer_drop(&sr->rb_read, toread)) {
#ifdef DEBUG_SRBCHANNEL
pa_log("Read from full output buffer, signalling fdsem");
data = (uint8_t*) data + toread;
l -= toread;
}
+
#ifdef DEBUG_SRBCHANNEL
pa_log("Read %d bytes from srbchannel", (int) isread);
#endif
+
return isread;
}
struct srbheader {
pa_atomic_t read_count;
pa_atomic_t write_count;
+
pa_fdsem_data read_semdata;
pa_fdsem_data write_semdata;
+
int capacity;
int readbuf_offset;
int writebuf_offset;
+
/* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
};
pa_log("In rw loop from srbchannel, before callback, count = %d", q);
#endif
- if (sr->callback)
+ if (sr->callback) {
if (!sr->callback(sr, sr->cb_userdata)) {
#ifdef DEBUG_SRBCHANNEL
pa_log("Aborting read loop from srbchannel");
#endif
return;
}
+ }
#ifdef DEBUG_SRBCHANNEL
pa_ringbuffer_peek(&sr->rb_read, &q);
sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
+
capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
+
sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
+
capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
+
pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
(int) pa_memblock_get_length(sr->memblock), capacity);
srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
+
sr->rb_read.count = &srh->read_count;
sr->rb_write.count = &srh->write_count;
goto fail;
readfd = pa_fdsem_get(sr->sem_read);
+
#ifdef DEBUG_SRBCHANNEL
pa_log("Enabling io event on fd %d", readfd);
#endif
+
sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
static void pa_srbchannel_swap(pa_srbchannel *sr) {
pa_srbchannel temp = *sr;
+
sr->sem_read = temp.sem_write;
sr->sem_write = temp.sem_read;
sr->rb_read = temp.rb_write;
sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
sr->rb_read.count = &srh->read_count;
sr->rb_write.count = &srh->write_count;
+
sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
#ifdef DEBUG_SRBCHANNEL
pa_log("Enabling io event on fd %d", t->readfd);
#endif
+
sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
#include <pulsecore/memblock.h>
/* An shm ringbuffer that is used for low overhead server-client communication.
- Signaling is done through eventfd semaphores (pa_fdsem). */
+ * Signaling is done through eventfd semaphores (pa_fdsem). */
typedef struct pa_srbchannel pa_srbchannel;
size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l);
/* Set the callback function that is called whenever data becomes available for reading.
- It can also be called if the output buffer was full and can now be written to.
-
- Return false to abort all processing (e g if the srbchannel has been freed during the callback).
- Otherwise return true.
-
- Note that the callback will be called immediately, to be able to process stuff that
- might already be in the buffer.
+ * It can also be called if the output buffer was full and can now be written to.
+ *
+ * Return false to abort all processing (e g if the srbchannel has been freed during the callback).
+ * Otherwise return true.
+ *
+ * Note that the callback will be called immediately, to be able to process stuff that
+ * might already be in the buffer.
*/
typedef bool (*pa_srbchannel_cb_t)(pa_srbchannel *sr, void *userdata);
void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata);