merge glitch-free branch back into trunk
[profile/ivi/pulseaudio.git] / src / pulsecore / asyncq.c
index 75b15c0..8e0dfbc 100644 (file)
@@ -3,7 +3,7 @@
 /***
   This file is part of PulseAudio.
 
-  Copyright 2006 Lennart Poettering
+  Copyright 2006-2008 Lennart Poettering
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as
 #include <pulsecore/thread.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/core-util.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/flist.h>
 #include <pulse/xmalloc.h>
 
 #include "asyncq.h"
 #include "fdsem.h"
 
-#define ASYNCQ_SIZE 128
+#define ASYNCQ_SIZE 256
 
-/* For debugging purposes we can define _Y to put and extra thread
+/* For debugging purposes we can define _Y to put an extra thread
  * yield between each operation. */
 
 /* #define PROFILE */
 #define _Y do { } while(0)
 #endif
 
+struct localq {
+    void *data;
+    PA_LLIST_FIELDS(struct localq);
+};
+
 struct pa_asyncq {
     unsigned size;
     unsigned read_idx;
     unsigned write_idx;
     pa_fdsem *read_fdsem, *write_fdsem;
+
+    PA_LLIST_HEAD(struct localq, localq);
+    struct localq *last_localq;
+    pa_bool_t waiting_for_post;
 };
 
-#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
+PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
 
-static int is_power_of_two(unsigned size) {
-    return !(size & (size - 1));
-}
+#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
 
 static int reduce(pa_asyncq *l, int value) {
     return value & (unsigned) (l->size - 1);
@@ -74,12 +83,16 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
     if (!size)
         size = ASYNCQ_SIZE;
 
-    pa_assert(is_power_of_two(size));
+    pa_assert(pa_is_power_of_two(size));
 
     l = pa_xmalloc0(PA_ALIGN(sizeof(pa_asyncq)) + (sizeof(pa_atomic_ptr_t) * size));
 
     l->size = size;
 
+    PA_LLIST_HEAD_INIT(struct localq, l->localq);
+    l->last_localq = NULL;
+    l->waiting_for_post = FALSE;
+
     if (!(l->read_fdsem = pa_fdsem_new())) {
         pa_xfree(l);
         return NULL;
@@ -95,6 +108,7 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
 }
 
 void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
+    struct localq *q;
     pa_assert(l);
 
     if (free_cb) {
@@ -104,12 +118,22 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
             free_cb(p);
     }
 
+    while ((q = l->localq)) {
+        if (free_cb)
+            free_cb(q->data);
+
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
+    }
+
     pa_fdsem_free(l->read_fdsem);
     pa_fdsem_free(l->write_fdsem);
     pa_xfree(l);
 }
 
-int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
+static int push(pa_asyncq*l, void *p, pa_bool_t wait) {
     int idx;
     pa_atomic_ptr_t *cells;
 
@@ -141,7 +165,63 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
     return 0;
 }
 
-void* pa_asyncq_pop(pa_asyncq*l, int wait) {
+static pa_bool_t flush_postq(pa_asyncq *l) {
+    struct localq *q;
+
+    pa_assert(l);
+
+    while ((q = l->last_localq)) {
+
+        if (push(l, q->data, FALSE) < 0)
+            return FALSE;
+
+        l->last_localq = q->prev;
+
+        PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+        if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+            pa_xfree(q);
+    }
+
+    return TRUE;
+}
+
+int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) {
+    pa_assert(l);
+
+    if (!flush_postq(l))
+        return -1;
+
+    return push(l, p, wait);
+}
+
+void pa_asyncq_post(pa_asyncq*l, void *p) {
+    struct localq *q;
+
+    pa_assert(l);
+    pa_assert(p);
+
+    if (pa_asyncq_push(l, p, FALSE) >= 0)
+        return;
+
+    /* OK, we couldn't push anything in the queue. So let's queue it
+     * locally and push it later */
+
+    pa_log("q overrun, queuing locally");
+
+    if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
+        q = pa_xnew(struct localq, 1);
+
+    q->data = p;
+    PA_LLIST_PREPEND(struct localq, l->localq, q);
+
+    if (!l->last_localq)
+        l->last_localq = q;
+
+    return;
+}
+
+void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) {
     int idx;
     void *ret;
     pa_atomic_ptr_t *cells;
@@ -178,13 +258,13 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
     return ret;
 }
 
-int pa_asyncq_get_fd(pa_asyncq *q) {
+int pa_asyncq_read_fd(pa_asyncq *q) {
     pa_assert(q);
 
     return pa_fdsem_get(q->write_fdsem);
 }
 
-int pa_asyncq_before_poll(pa_asyncq *l) {
+int pa_asyncq_read_before_poll(pa_asyncq *l) {
     int idx;
     pa_atomic_ptr_t *cells;
 
@@ -206,8 +286,38 @@ int pa_asyncq_before_poll(pa_asyncq *l) {
     return 0;
 }
 
-void pa_asyncq_after_poll(pa_asyncq *l) {
+void pa_asyncq_read_after_poll(pa_asyncq *l) {
     pa_assert(l);
 
     pa_fdsem_after_poll(l->write_fdsem);
 }
+
+int pa_asyncq_write_fd(pa_asyncq *q) {
+    pa_assert(q);
+
+    return pa_fdsem_get(q->read_fdsem);
+}
+
+void pa_asyncq_write_before_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    for (;;) {
+
+        if (flush_postq(l))
+            break;
+
+        if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
+            l->waiting_for_post = TRUE;
+            break;
+        }
+    }
+}
+
+void pa_asyncq_write_after_poll(pa_asyncq *l) {
+    pa_assert(l);
+
+    if (l->waiting_for_post) {
+        pa_fdsem_after_poll(l->read_fdsem);
+        l->waiting_for_post = FALSE;
+    }
+}