From bc3693261fa2922ff55133432b212dc03589ba50 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Thu, 26 Jul 2007 13:15:05 +0000 Subject: [PATCH] port asyncq to make use of new fdsem object git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1538 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/asyncq.c | 207 +++++++++---------------------------------------- 1 file changed, 36 insertions(+), 171 deletions(-) diff --git a/src/pulsecore/asyncq.c b/src/pulsecore/asyncq.c index 025c695..2238124 100644 --- a/src/pulsecore/asyncq.c +++ b/src/pulsecore/asyncq.c @@ -36,12 +36,15 @@ #include #include "asyncq.h" +#include "fdsem.h" #define ASYNCQ_SIZE 128 /* For debugging purposes we can define _Y to put and extra thread * yield between each operation. */ +/* #define PROFILE */ + #ifdef PROFILE #define _Y pa_thread_yield() #else @@ -52,10 +55,7 @@ struct pa_asyncq { unsigned size; unsigned read_idx; unsigned write_idx; - pa_atomic_t read_waiting; /* a bool */ - pa_atomic_t write_waiting; /* a bool */ - int read_fds[2], write_fds[2]; - pa_atomic_t in_read_fifo, in_write_fifo; + pa_fdsem *read_fdsem, *write_fdsem; }; #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq)))) @@ -79,26 +79,18 @@ pa_asyncq *pa_asyncq_new(unsigned size) { l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size)); l->size = size; - pa_atomic_store(&l->read_waiting, 0); - pa_atomic_store(&l->write_waiting, 0); - pa_atomic_store(&l->in_read_fifo, 0); - pa_atomic_store(&l->in_write_fifo, 0); - if (pipe(l->read_fds) < 0) { + if (!(l->read_fdsem = pa_fdsem_new())) { pa_xfree(l); return NULL; } - if (pipe(l->write_fds) < 0) { - pa_close(l->read_fds[0]); - pa_close(l->read_fds[1]); + if (!(l->write_fdsem = pa_fdsem_new())) { + pa_fdsem_free(l->read_fdsem); pa_xfree(l); return NULL; } - pa_make_nonblock_fd(l->read_fds[1]); - pa_make_nonblock_fd(l->write_fds[1]); - return l; } @@ -112,11 +104,8 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) { free_cb(p); } - pa_close(l->read_fds[0]); - pa_close(l->read_fds[1]); - pa_close(l->write_fds[0]); - pa_close(l->write_fds[1]); - + pa_fdsem_free(l->read_fdsem); + pa_fdsem_free(l->write_fdsem); pa_xfree(l); } @@ -134,80 +123,20 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { - /* Let's empty the FIFO from old notifications, before we return */ - - while (pa_atomic_load(&l->in_write_fifo) > 0) { - ssize_t r; - int x[20]; - - if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) { - - if (errno == EINTR) - continue; - - return -1; - } - - pa_assert(r > 0); - - if (pa_atomic_sub(&l->in_write_fifo, r) <= r) - break; - - } - - /* Now let's make sure that we didn't lose any events */ - if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { - - if (!wait) - return -1; - - /* Let's wait for changes. */ - - _Y; - - pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 0, 1)); - - for (;;) { - char x[20]; - ssize_t r; - - _Y; - - if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) - break; - - _Y; - - if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) { - - if (errno == EINTR) - continue; - - pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0)); - return -1; - } - - pa_assert(r > 0); - pa_atomic_sub(&l->in_write_fifo, r); - } - - _Y; - - pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0)); - } + if (!wait) + return -1; + +/* pa_log("sleeping on push"); */ + + do { + pa_fdsem_wait(l->read_fdsem); + } while (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)); } _Y; l->write_idx++; - if (pa_atomic_load(&l->read_waiting) > 0) { - char x = 'x'; - _Y; - if (write(l->read_fds[1], &x, sizeof(x)) > 0) { - pa_atomic_inc(&l->in_read_fifo); -/* pa_log("increasing %p by 1", l); */ - } - } + pa_fdsem_post(l->write_fdsem); return 0; } @@ -226,95 +155,33 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { -/* pa_log("pop failed wait=%i", wait); */ - - /* Hmm, nothing, here, so let's drop all queued events. */ - while (pa_atomic_load(&l->in_read_fifo) > 0) { - ssize_t r; - int x[20]; - - if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) { - - if (errno == EINTR) - continue; - - return NULL; - } - - pa_assert(r > 0); - -/* pa_log("decreasing %p by %i", l, r); */ - - if (pa_atomic_sub(&l->in_read_fifo, r) <= r) - break; - } - - /* Now let's make sure that we didn't lose any events */ - if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { - - if (!wait) - return NULL; - - /* Let's wait for changes. */ - - _Y; - - pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1)); - - for (;;) { - char x[20]; - ssize_t r; - - _Y; - - if ((ret = pa_atomic_ptr_load(&cells[idx]))) - break; - - _Y; - - if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) { - - if (errno == EINTR) - continue; - - pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); - return NULL; - } - -/* pa_log("decreasing %p by %i", l, r); */ - - pa_assert(r > 0); - pa_atomic_sub(&l->in_read_fifo, r); - } - - _Y; - - pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); - } + if (!wait) + return NULL; + +/* pa_log("sleeping on pop"); */ + + do { + pa_fdsem_wait(l->write_fdsem); + } while (!(ret = pa_atomic_ptr_load(&cells[idx]))); } pa_assert(ret); - /* Guaranteed if we only have a single reader */ + /* Guaranteed to succeed if we only have a single reader */ pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL)); _Y; l->read_idx++; - if (pa_atomic_load(&l->write_waiting) > 0) { - char x = 'x'; - _Y; - if (write(l->write_fds[1], &x, sizeof(x)) >= 0) - pa_atomic_inc(&l->in_write_fifo); - } - + pa_fdsem_post(l->read_fdsem); + return ret; } int pa_asyncq_get_fd(pa_asyncq *q) { pa_assert(q); - return q->read_fds[0]; + return pa_fdsem_get(q->write_fdsem); } int pa_asyncq_before_poll(pa_asyncq *l) { @@ -328,14 +195,12 @@ int pa_asyncq_before_poll(pa_asyncq *l) { _Y; idx = reduce(l, l->read_idx); - if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0) - return -1; - - pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1)); + for (;;) { + if (pa_atomic_ptr_load(&cells[idx])) + return -1; - if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0) { - pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); - return -1; + if (pa_fdsem_before_poll(l->write_fdsem) >= 0) + return 0; } return 0; @@ -344,5 +209,5 @@ int pa_asyncq_before_poll(pa_asyncq *l) { void pa_asyncq_after_poll(pa_asyncq *l) { pa_assert(l); - pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); + pa_fdsem_after_poll(l->write_fdsem); } -- 2.7.4