From d87373181071afe38c35d997facee62f5a3cb604 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sun, 24 Jun 2007 16:15:56 +0000 Subject: [PATCH] rework the logic of pa_asyncq git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1496 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/asyncq.c | 201 +++++++++++++++++++++++++++---------------------- 1 file changed, 113 insertions(+), 88 deletions(-) diff --git a/src/pulsecore/asyncq.c b/src/pulsecore/asyncq.c index c966e7d..025c695 100644 --- a/src/pulsecore/asyncq.c +++ b/src/pulsecore/asyncq.c @@ -52,9 +52,10 @@ struct pa_asyncq { unsigned size; unsigned read_idx; unsigned write_idx; - pa_atomic_t read_waiting, n_read; - pa_atomic_t write_waiting, n_written; + pa_atomic_t read_waiting; /* a bool */ + pa_atomic_t write_waiting; /* a bool */ int read_fds[2], write_fds[2]; + pa_atomic_t in_read_fifo, in_write_fifo; }; #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq)))) @@ -80,8 +81,8 @@ pa_asyncq *pa_asyncq_new(unsigned size) { l->size = size; pa_atomic_store(&l->read_waiting, 0); pa_atomic_store(&l->write_waiting, 0); - pa_atomic_store(&l->n_written, 0); - pa_atomic_store(&l->n_read, 0); + pa_atomic_store(&l->in_read_fifo, 0); + pa_atomic_store(&l->in_write_fifo, 0); if (pipe(l->read_fds) < 0) { pa_xfree(l); @@ -133,68 +134,79 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { - if (!wait) { - /* Let's empty the FIFO from old notifications, before we return */ + /* Let's empty the FIFO from old notifications, before we return */ - while (pa_atomic_load(&l->n_read) > 0) { - ssize_t r; - int x[20]; - - errno = 0; - if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) - return -1; + while (pa_atomic_load(&l->in_write_fifo) > 0) { + ssize_t r; + int x[20]; + + if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) { - pa_assert(r != 0); + if (errno == EINTR) + continue; - if (r > 0) - if (pa_atomic_sub(&l->n_read, r) <= r) - break; + return -1; } - - return -1; - } - - /* First try failed. Let's wait for changes. */ - _Y; + pa_assert(r > 0); + + if (pa_atomic_sub(&l->in_write_fifo, r) <= r) + break; - pa_atomic_inc(&l->write_waiting); + } - for (;;) { - char x[20]; - ssize_t r; + /* Now let's make sure that we didn't lose any events */ + if (!pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) { - _Y; + if (!wait) + return -1; - if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) - break; + /* Let's wait for changes. */ _Y; - if ((r = read(l->write_fds[0], x, sizeof(x))) < 0 && errno != EINTR) { - pa_atomic_dec(&l->write_waiting); - return -1; - } + pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 0, 1)); - pa_assert(r != 0); + for (;;) { + char x[20]; + ssize_t r; + + _Y; + + if (pa_atomic_ptr_cmpxchg(&cells[idx], NULL, p)) + break; + + _Y; - if (r > 0) - pa_atomic_sub(&l->n_read, r); - } + if ((r = read(l->write_fds[0], x, sizeof(x))) < 0) { - _Y; + if (errno == EINTR) + continue; + + pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0)); + return -1; + } - pa_atomic_dec(&l->write_waiting); + pa_assert(r > 0); + pa_atomic_sub(&l->in_write_fifo, r); + } + + _Y; + + pa_assert_se(pa_atomic_cmpxchg(&l->write_waiting, 1, 0)); + } } _Y; l->write_idx++; - if (pa_atomic_load(&l->read_waiting)) { + if (pa_atomic_load(&l->read_waiting) > 0) { char x = 'x'; _Y; - if (write(l->read_fds[1], &x, sizeof(x)) > 0) - pa_atomic_inc(&l->n_written); + if (write(l->read_fds[1], &x, sizeof(x)) > 0) { + pa_atomic_inc(&l->in_read_fifo); +/* pa_log("increasing %p by 1", l); */ + } } return 0; @@ -206,7 +218,7 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { pa_atomic_ptr_t *cells; pa_assert(l); - + cells = PA_ASYNCQ_CELLS(l); _Y; @@ -214,71 +226,86 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { - /* First try failed. Let's wait for changes. */ +/* pa_log("pop failed wait=%i", wait); */ - if (!wait) { - /* Let's empty the FIFO from old notifications, before we return */ + /* Hmm, nothing, here, so let's drop all queued events. */ + while (pa_atomic_load(&l->in_read_fifo) > 0) { + ssize_t r; + int x[20]; - while (pa_atomic_load(&l->n_written) > 0) { - ssize_t r; - int x[20]; + if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) { - errno = 0; - if ((r = read(l->read_fds[0], x, sizeof(x))) < 0 && errno != EINTR) - return NULL; - - pa_assert(r != 0); + if (errno == EINTR) + continue; - if (r > 0) - if (pa_atomic_sub(&l->n_written, r) <= r) - break; + return NULL; } + + pa_assert(r > 0); + +/* pa_log("decreasing %p by %i", l, r); */ - return NULL; + if (pa_atomic_sub(&l->in_read_fifo, r) <= r) + break; } - _Y; - - pa_atomic_inc(&l->read_waiting); + /* Now let's make sure that we didn't lose any events */ + if (!(ret = pa_atomic_ptr_load(&cells[idx]))) { - for (;;) { - char x[20]; - ssize_t r; + if (!wait) + return NULL; + /* Let's wait for changes. */ + _Y; + + pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1)); + + for (;;) { + char x[20]; + ssize_t r; + + _Y; + + if ((ret = pa_atomic_ptr_load(&cells[idx]))) + break; + + _Y; + + if ((r = read(l->read_fds[0], x, sizeof(x))) < 0) { - if ((ret = pa_atomic_ptr_load(&cells[idx]))) - break; - - _Y; + if (errno == EINTR) + continue; + + pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); + return NULL; + } - if ((r = read(l->read_fds[0], x, sizeof(x))) < 0 && errno != EINTR) { - pa_atomic_dec(&l->read_waiting); - return NULL; +/* pa_log("decreasing %p by %i", l, r); */ + + pa_assert(r > 0); + pa_atomic_sub(&l->in_read_fifo, r); } - pa_assert(r != 0); + _Y; - if (r > 0) - pa_atomic_sub(&l->n_written, r); + pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); } - - _Y; - - pa_atomic_dec(&l->read_waiting); } + pa_assert(ret); + /* Guaranteed if we only have a single reader */ pa_assert_se(pa_atomic_ptr_cmpxchg(&cells[idx], ret, NULL)); _Y; l->read_idx++; - if (pa_atomic_load(&l->write_waiting)) { + if (pa_atomic_load(&l->write_waiting) > 0) { char x = 'x'; _Y; if (write(l->write_fds[1], &x, sizeof(x)) >= 0) - pa_atomic_inc(&l->n_read); + pa_atomic_inc(&l->in_write_fifo); } return ret; @@ -301,13 +328,13 @@ int pa_asyncq_before_poll(pa_asyncq *l) { _Y; idx = reduce(l, l->read_idx); - if (pa_atomic_ptr_load(&cells[idx])) + if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0) return -1; - pa_atomic_inc(&l->read_waiting); + pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 0, 1)); - if (pa_atomic_ptr_load(&cells[idx])) { - pa_atomic_dec(&l->read_waiting); + if (pa_atomic_ptr_load(&cells[idx]) || pa_atomic_load(&l->in_read_fifo) > 0) { + pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); return -1; } @@ -317,7 +344,5 @@ int pa_asyncq_before_poll(pa_asyncq *l) { void pa_asyncq_after_poll(pa_asyncq *l) { pa_assert(l); - pa_assert(pa_atomic_load(&l->read_waiting) > 0); - - pa_atomic_dec(&l->read_waiting); + pa_assert_se(pa_atomic_cmpxchg(&l->read_waiting, 1, 0)); } -- 2.7.4