port remaining sinks to pa_rtpoll
authorLennart Poettering <lennart@poettering.net>
Wed, 22 Aug 2007 22:27:53 +0000 (22:27 +0000)
committerLennart Poettering <lennart@poettering.net>
Wed, 22 Aug 2007 22:27:53 +0000 (22:27 +0000)
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1705 fefdeb5f-60dc-0310-8127-8f9354f1896f

src/modules/module-alsa-source.c
src/modules/module-null-sink.c
src/modules/module-oss.c
src/modules/module-pipe-sink.c

index 58e7cb4..6ea99ec 100644 (file)
 #include <assert.h>
 #include <stdio.h>
 
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#else
-#include "poll.h"
-#endif
-
 #include <asoundlib.h>
 
 #include <pulse/xmalloc.h>
+#include <pulse/util.h>
 
 #include <pulsecore/core-error.h>
 #include <pulsecore/core.h>
@@ -52,6 +47,7 @@
 #include <pulsecore/thread.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/thread-mq.h>
+#include <pulsecore/rtpoll.h>
 
 #include "alsa-util.h"
 #include "module-alsa-source-symdef.h"
@@ -76,8 +72,10 @@ struct userdata {
     pa_core *core;
     pa_module *module;
     pa_source *source;
+    
     pa_thread *thread;
     pa_thread_mq thread_mq;
+    pa_rtpoll *rtpoll;
 
     snd_pcm_t *pcm_handle;
 
@@ -93,13 +91,7 @@ struct userdata {
 
     int use_mmap;
 
-    struct pollfd *pollfd;
-    int n_alsa_fds;
-};
-
-enum {
-    POLLFD_ASYNCQ,
-    POLLFD_ALSA_BASE
+    pa_rtpoll_item *alsa_rtpoll_item;
 };
 
 static const char* const valid_modargs[] = {
@@ -116,16 +108,16 @@ static const char* const valid_modargs[] = {
 };
 
 static int mmap_read(struct userdata *u) {
-    snd_pcm_sframes_t n;
-    int err;
-    const snd_pcm_channel_area_t *areas;
-    snd_pcm_uframes_t offset, frames;
     int work_done = 0;
     
     pa_assert(u);
-    pa_assert(u->source);
+    pa_source_assert_ref(u->source);
 
     for (;;) {
+        snd_pcm_sframes_t n;
+        int err;
+        const snd_pcm_channel_area_t *areas;
+        snd_pcm_uframes_t offset, frames;
         pa_memchunk chunk;
         void *p;
         
@@ -207,6 +199,73 @@ static int mmap_read(struct userdata *u) {
     }
 }
 
+static int unix_read(struct userdata *u) {
+    snd_pcm_status_t *status;
+    int work_done = 0;
+
+    snd_pcm_status_alloca(&status);
+
+    pa_assert(u);
+    pa_source_assert_ref(u->source);
+
+    for (;;) {
+        void *p;
+        snd_pcm_sframes_t t;
+        ssize_t l;
+        int err;
+        pa_memchunk chunk;
+        
+        if ((err = snd_pcm_status(u->pcm_handle, status)) < 0) {
+            pa_log("Failed to query DSP status data: %s", snd_strerror(t));
+            return -1;
+        }
+
+        if (snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size)
+            pa_log_debug("Buffer overrun!");
+                    
+        l = snd_pcm_status_get_avail(status) * u->frame_size;
+
+        if (l <= 0)
+            return work_done;
+                    
+        chunk.memblock = pa_memblock_new(u->core->mempool, l);
+
+        p = pa_memblock_acquire(chunk.memblock);
+        t = snd_pcm_readi(u->pcm_handle, (uint8_t*) p, l / u->frame_size);
+        pa_memblock_release(chunk.memblock);
+        
+/*                     pa_log("wrote %i bytes of %u (%u)", t*u->frame_size, u->memchunk.length, l);   */
+        
+        pa_assert(t != 0);
+                    
+        if (t < 0) {
+            pa_memblock_unref(chunk.memblock);
+            
+            if ((t = snd_pcm_recover(u->pcm_handle, t, 1)) == 0)
+                continue;
+                        
+            if (t == -EAGAIN) {
+                pa_log_debug("EAGAIN");
+                return work_done;
+            } else {
+                pa_log("Failed to read data from DSP: %s", snd_strerror(t));
+                return -1;
+            }
+        } 
+                        
+        chunk.index = 0;
+        chunk.length = t * u->frame_size;
+
+        pa_source_post(u->source, &chunk);
+        pa_memblock_unref(chunk.memblock);
+                    
+        work_done = 1;
+
+        if (t * u->frame_size >= (unsigned) l)
+            return work_done;
+    }
+}
+
 static pa_usec_t source_get_latency(struct userdata *u) {
     pa_usec_t r = 0;
     snd_pcm_status_t *status;
@@ -216,6 +275,7 @@ static pa_usec_t source_get_latency(struct userdata *u) {
     snd_pcm_status_alloca(&status);
     
     pa_assert(u);
+    pa_assert(u->pcm_handle);
 
     if ((err = snd_pcm_status(u->pcm_handle, status)) < 0) 
         pa_log("Failed to get delay: %s", snd_strerror(err));
@@ -230,22 +290,24 @@ static pa_usec_t source_get_latency(struct userdata *u) {
 
 static int build_pollfd(struct userdata *u) {
     int err;
+    struct pollfd *pollfd;
+    int n;
     
     pa_assert(u);
     pa_assert(u->pcm_handle);
 
-    if ((u->n_alsa_fds = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) {
-        pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(u->n_alsa_fds));
+    if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) {
+        pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n));
         return -1;
     }
 
-    pa_xfree(u->pollfd);
-    u->pollfd = pa_xnew0(struct pollfd, POLLFD_ALSA_BASE + u->n_alsa_fds);
-
-    u->pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->thread_mq.inq);
-    u->pollfd[POLLFD_ASYNCQ].events = POLLIN;
+    if (u->alsa_rtpoll_item)
+        pa_rtpoll_item_free(u->alsa_rtpoll_item);
 
-    if ((err = snd_pcm_poll_descriptors(u->pcm_handle, u->pollfd+POLLFD_ALSA_BASE, u->n_alsa_fds)) < 0) {
+    u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, n);
+    pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL);
+    
+    if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) {
         pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err));
         return -1;
     }
@@ -261,6 +323,11 @@ static int suspend(struct userdata *u) {
     snd_pcm_close(u->pcm_handle);
     u->pcm_handle = NULL;
 
+    if (u->alsa_rtpoll_item) {
+        pa_rtpoll_item_free(u->alsa_rtpoll_item);
+        u->alsa_rtpoll_item = NULL;
+    }
+    
     pa_log_debug("Device suspended...");
     
     return 0;
@@ -505,14 +572,9 @@ static int source_set_mute_cb(pa_source *s) {
 }
 
 static void thread_func(void *userdata) {
-
     struct userdata *u = userdata;
-    int err;
-    unsigned short revents = 0;
-    snd_pcm_status_t *status;
 
     pa_assert(u);
-    snd_pcm_status_alloca(&status);
 
     pa_log_debug("Thread starting up");
 
@@ -520,20 +582,37 @@ static void thread_func(void *userdata) {
         pa_make_realtime();
 
     pa_thread_mq_install(&u->thread_mq);
+    pa_rtpoll_install(u->rtpoll);
     
     if (build_pollfd(u) < 0)
         goto fail;
 
+    snd_pcm_start(u->pcm_handle);
+    
     for (;;) {
         pa_msgobject *object;
         int code;
         void *data;
-        int r;
         int64_t offset;
         pa_memchunk chunk;
 
 /*         pa_log("loop");     */
         
+        /* Render some data and write it to the dsp */
+        if (PA_SOURCE_OPENED(u->source->thread_info.state)) {
+            
+            if (u->use_mmap) {
+                if (mmap_read(u) < 0)
+                    goto fail;
+
+            } else {
+                if (unix_read(u) < 0)
+                    goto fail;
+            }
+        }
+
+/*         pa_log("loop2"); */
+        
         /* Check whether there is a message for us to process */
         if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
@@ -550,109 +629,20 @@ static void thread_func(void *userdata) {
             continue;
         } 
 
-/*         pa_log("loop2"); */
-
-        /* Render some data and write it to the dsp */
-
-        if (PA_SOURCE_OPENED(u->source->thread_info.state) && (revents & POLLIN)) {
-            int work_done = 0;
-            pa_assert(u->pcm_handle);
-
-            if (u->use_mmap) {
-
-                if ((work_done = mmap_read(u)) < 0)
-                    goto fail;
-
-            } else {
-
-                for (;;) {
-                    void *p;
-                    snd_pcm_sframes_t t;
-                    ssize_t l;
-
-                    if ((err = snd_pcm_status(u->pcm_handle, status)) < 0) {
-                        pa_log("Failed to query DSP status data: %s", snd_strerror(t));
-                        goto fail;
-                    }
-
-                    if (snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size)
-                        pa_log_debug("Buffer overrun!");
-                    
-                    l = snd_pcm_status_get_avail(status) * u->frame_size;
-
-                    if (l <= 0)
-                        break;
-                    
-                    chunk.memblock = pa_memblock_new(u->core->mempool, l);
-
-                    p = pa_memblock_acquire(chunk.memblock);
-                    t = snd_pcm_readi(u->pcm_handle, (uint8_t*) p, l / u->frame_size);
-                    pa_memblock_release(chunk.memblock);
-                    
-/*                     pa_log("wrote %i bytes of %u (%u)", t*u->frame_size, u->memchunk.length, l);   */
-                    
-                    pa_assert(t != 0);
-                    
-                    if (t < 0) {
-                        pa_memblock_unref(chunk.memblock);
-
-                        if ((t = snd_pcm_recover(u->pcm_handle, t, 1)) == 0)
-                            continue;
-                        
-                        if (t == -EAGAIN) {
-                            pa_log_debug("EAGAIN");
-                            break;
-                        } else {
-                            pa_log("Failed to read data from DSP: %s", snd_strerror(t));
-                            goto fail;
-                        }
-                        
-                    } 
-                        
-                    chunk.index = 0;
-                    chunk.length = t * u->frame_size;
-
-                    pa_source_post(u->source, &chunk);
-                    pa_memblock_unref(chunk.memblock);
-                    
-                    work_done = 1;
-
-                    if (t * u->frame_size >= (unsigned) l)
-                        break;
-                } 
-            }
-
-            revents &= ~POLLIN;
-            
-            if (work_done)
-                continue;
-        }
-
-        /* Hmm, nothing to do. Let's sleep */
-        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0)
-            continue;
-
-/*         pa_log("polling for %i", POLLFD_ALSA_BASE + (PA_SOURCE_OPENED(u->source->thread_info.state) ? n_alsa_fds : 0));   */
-        r = poll(u->pollfd, POLLFD_ALSA_BASE + (PA_SOURCE_OPENED(u->source->thread_info.state) ? u->n_alsa_fds : 0), -1);
-/*         pa_log("poll end"); */
-
-        pa_asyncmsgq_after_poll(u->thread_mq.inq);
-
-        if (r < 0) {
-            if (errno == EINTR) {
-                u->pollfd[POLLFD_ASYNCQ].revents = 0;
-                revents = 0;
-                continue;
-            }
-
+        if (pa_rtpoll_run(u->rtpoll) < 0) {
             pa_log("poll() failed: %s", pa_cstrerror(errno));
             goto fail;
         }
 
-        pa_assert(r > 0);
-
         if (PA_SOURCE_OPENED(u->source->thread_info.state)) {
-            if ((err = snd_pcm_poll_descriptors_revents(u->pcm_handle, u->pollfd + POLLFD_ALSA_BASE, u->n_alsa_fds, &revents)) < 0) {
+            struct pollfd *pollfd;
+            unsigned short revents = 0;
+            int err;
+            unsigned n;
+
+            pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, &n);
+            
+            if ((err = snd_pcm_poll_descriptors_revents(u->pcm_handle, pollfd, n, &revents)) < 0) {
                 pa_log("snd_pcm_poll_descriptors_revents() failed: %s", snd_strerror(err));
                 goto fail;
             }
@@ -668,10 +658,7 @@ static void thread_func(void *userdata) {
                 goto fail;
             }
 /*             pa_log("got alsa event"); */
-        } else
-            revents = 0;
-        
-        pa_assert((u->pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);
+        }
     }
 
 fail:
@@ -687,7 +674,6 @@ finish:
 int pa__init(pa_module*m) {
     
     pa_modargs *ma = NULL;
-    int ret = -1;
     struct userdata *u = NULL;
     const char *dev;
     pa_sample_spec ss;
@@ -703,6 +689,8 @@ int pa__init(pa_module*m) {
     int namereg_fail;
     int use_mmap = 1, b;
 
+    snd_pcm_info_alloca(&pcm_info);
+
     pa_assert(m);
     
     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
@@ -739,9 +727,10 @@ int pa__init(pa_module*m) {
     u->module = m;
     m->userdata = u;
     u->use_mmap = use_mmap;
-    u->n_alsa_fds = 0;
-    u->pollfd = NULL;
     pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
+    u->rtpoll = pa_rtpoll_new();
+    u->alsa_rtpoll_item = NULL;
+    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);
 
     snd_config_update_free_global();
     if ((err = snd_pcm_open(&u->pcm_handle, dev = pa_modargs_get_value(ma, "device", DEFAULT_DEVICE), SND_PCM_STREAM_CAPTURE, SND_PCM_NONBLOCK)) < 0) {
@@ -751,8 +740,7 @@ int pa__init(pa_module*m) {
 
     u->device_name = pa_xstrdup(dev);
 
-    if ((err = snd_pcm_info_malloc(&pcm_info)) < 0 ||
-        (err = snd_pcm_info(u->pcm_handle, pcm_info)) < 0) {
+    if ((err = snd_pcm_info(u->pcm_handle, pcm_info)) < 0) {
         pa_log("Error fetching PCM info: %s", snd_strerror(err));
         goto fail;
     }
@@ -875,26 +863,18 @@ int pa__init(pa_module*m) {
     if (u->source->get_mute)
         u->source->get_mute(u->source);
 
-    snd_pcm_start(u->pcm_handle);
+    pa_modargs_free(ma);
     
-    ret = 0;
-
-finish:
-
-    if (ma)
-        pa_modargs_free(ma);
+    return 0;
     
-    if (pcm_info)
-        snd_pcm_info_free(pcm_info);
-
-    return ret;
-
 fail:
 
-    if (u)
-        pa__done(m);
+    if (ma)
+        pa_modargs_free(ma);
 
-    goto finish;
+    pa__done(m);
+    
+    return -1;
 }
 
 void pa__done(pa_module*m) {
@@ -918,6 +898,12 @@ void pa__done(pa_module*m) {
     if (u->source)
         pa_source_unref(u->source);
 
+    if (u->alsa_rtpoll_item)
+        pa_rtpoll_item_free(u->alsa_rtpoll_item);
+    
+    if (u->rtpoll)
+        pa_rtpoll_free(u->rtpoll);
+    
     if (u->mixer_fdl)
         pa_alsa_fdlist_free(u->mixer_fdl);
 
@@ -929,7 +915,6 @@ void pa__done(pa_module*m) {
         snd_pcm_close(u->pcm_handle);
     }
 
-    pa_xfree(u->pollfd);
     pa_xfree(u->device_name);
     pa_xfree(u);
 
index 8b17b59..3b51237 100644 (file)
@@ -33,7 +33,6 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <limits.h>
-#include <sys/poll.h>
 
 #include <pulse/timeval.h>
 #include <pulse/xmalloc.h>
@@ -47,6 +46,8 @@
 #include <pulsecore/log.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/thread-mq.h>
+#include <pulsecore/rtpoll.h>
+#include <pulsecore/rtclock.h>
 
 #include "module-null-sink-symdef.h"
 
@@ -67,11 +68,14 @@ struct userdata {
     pa_core *core;
     pa_module *module;
     pa_sink *sink;
+
     pa_thread *thread;
     pa_thread_mq thread_mq;
+    pa_rtpoll *rtpoll;
+
     size_t block_size;
     
-    struct timeval timestamp;
+    struct timespec timestamp;
 };
 
 static const char* const valid_modargs[] = {
@@ -91,19 +95,19 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
         case PA_SINK_MESSAGE_SET_STATE:
 
             if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING)
-                pa_gettimeofday(&u->timestamp);
+                pa_rtclock_get(&u->timestamp);
             
             break;
             
         case PA_SINK_MESSAGE_GET_LATENCY: {
-            struct timeval now;
+            struct timespec now;
     
-            pa_gettimeofday(&now);
+            pa_rtclock_get(&now);
             
-            if (pa_timeval_cmp(&u->timestamp, &now) > 0)
+            if (pa_timespec_cmp(&u->timestamp, &now) > 0)
                 *((pa_usec_t*) data) = 0;
             else
-                *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now);
+                *((pa_usec_t*) data) = pa_timespec_diff(&u->timestamp, &now);
             break;
         }
     }
@@ -113,29 +117,41 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 
 static void thread_func(void *userdata) {
     struct userdata *u = userdata;
-    struct pollfd pollfd;
 
     pa_assert(u);
 
     pa_log_debug("Thread starting up");
 
     pa_thread_mq_install(&u->thread_mq);
+    pa_rtpoll_install(u->rtpoll);
 
-    pa_gettimeofday(&u->timestamp);
-
-    memset(&pollfd, 0, sizeof(pollfd));
-    pollfd.fd = pa_asyncmsgq_get_fd(u->thread_mq.inq);
-    pollfd.events = POLLIN;
+    pa_rtclock_get(&u->timestamp);
 
     for (;;) {
         pa_msgobject *object;
         int code;
         void *data;
         pa_memchunk chunk;
-        int r, timeout;
-        struct timeval now;
         int64_t offset;
 
+        /* Render some data and drop it immediately */
+        if (u->sink->thread_info.state == PA_SINK_RUNNING) {
+            struct timespec now;
+            
+            pa_rtclock_get(&now);
+
+            if (pa_timespec_cmp(&u->timestamp, &now) <= 0) {
+
+                pa_sink_render(u->sink, u->block_size, &chunk);
+                pa_memblock_unref(chunk.memblock);
+
+                pa_timespec_add(&u->timestamp, pa_bytes_to_usec(chunk.length, &u->sink->sample_spec));
+            }
+
+            pa_rtpoll_set_timer_absolute(u->rtpoll, &u->timestamp);
+        } else
+            pa_rtpoll_set_timer_disabled(u->rtpoll);
+
         /* Check whether there is a message for us to process */
         if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
             int ret;
@@ -150,45 +166,11 @@ static void thread_func(void *userdata) {
             continue;
         }
 
-        /* Render some data and drop it immediately */
-        if (u->sink->thread_info.state == PA_SINK_RUNNING) {
-            pa_gettimeofday(&now);
-
-            if (pa_timeval_cmp(&u->timestamp, &now) <= 0) {
-
-                pa_sink_render(u->sink, u->block_size, &chunk);
-                pa_memblock_unref(chunk.memblock);
-
-                pa_timeval_add(&u->timestamp, pa_bytes_to_usec(chunk.length, &u->sink->sample_spec));
-                continue;
-            }
-
-            timeout = pa_timeval_diff(&u->timestamp, &now)/1000;
-
-            if (timeout < 1)
-                timeout = 1;
-        } else
-            timeout = -1;
-
         /* Hmm, nothing to do. Let's sleep */
-
-        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0)
-            continue;
-
-        r = poll(&pollfd, 1, timeout);
-        pa_asyncmsgq_after_poll(u->thread_mq.inq);
-
-        if (r < 0) {
-            if (errno == EINTR) {
-                pollfd.revents = 0;
-                continue;
-            }
-
+        if (pa_rtpoll_run(u->rtpoll) < 0) {
             pa_log("poll() failed: %s", pa_cstrerror(errno));
             goto fail;
         }
-
-        pa_assert(r == 0 || pollfd.revents == POLLIN);
     }
 
 fail:
@@ -224,8 +206,9 @@ int pa__init(pa_module*m) {
     u->core = m->core;
     u->module = m;
     m->userdata = u;
-
     pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
+    u->rtpoll = pa_rtpoll_new();
+    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);
     
     if (!(u->sink = pa_sink_new(m->core, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
         pa_log("Failed to create sink.");
@@ -282,5 +265,8 @@ void pa__done(pa_module*m) {
     if (u->sink)
         pa_sink_unref(u->sink);
 
+    if (u->rtpoll)
+        pa_rtpoll_free(u->rtpoll);
+    
     pa_xfree(u);
 }
index f445481..85fdd9c 100644 (file)
@@ -32,9 +32,7 @@
  *   the device. If none is avilable from the inputs, we write silence
  *   instead.
  *
- *   If power should be saved on IDLE this should be implemented in a
- *   special suspend-on-idle module that will put us into SUSPEND mode
- *   as soon and we're idle for too long.
+ *   If power should be saved on IDLE module-suspend-on-idle should be used.
  *
  */
 
 #include <sys/mman.h>
 #endif
 
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#else
-#include "poll.h"
-#endif
-
 #include <sys/soundcard.h>
 #include <sys/ioctl.h>
 #include <stdlib.h>
@@ -80,6 +72,7 @@
 #include <pulsecore/log.h>
 #include <pulsecore/macro.h>
 #include <pulsecore/thread-mq.h>
+#include <pulsecore/rtpoll.h>
 
 #include "oss-util.h"
 #include "module-oss-symdef.h"
@@ -108,8 +101,10 @@ struct userdata {
     pa_module *module;
     pa_sink *sink;
     pa_source *source;
+    
     pa_thread *thread;
     pa_thread_mq thread_mq;
+    pa_rtpoll *rtpoll;
 
     char *device_name;
     
@@ -135,6 +130,8 @@ struct userdata {
     pa_memblock **in_mmap_memblocks, **out_mmap_memblocks;
 
     int in_mmap_saved_nfrags, out_mmap_saved_nfrags;
+
+    pa_rtpoll_item *rtpoll_item;
 };
 
 static const char* const valid_modargs[] = {
@@ -156,10 +153,12 @@ static const char* const valid_modargs[] = {
 static void trigger(struct userdata *u, int quick) {
     int enable_bits = 0, zero = 0;
 
+    pa_assert(u);
+    
     if (u->fd < 0)
         return;
 
-    pa_log_debug("trigger"); 
+/*     pa_log_debug("trigger");  */
 
     if (u->source && PA_SOURCE_OPENED(u->source->thread_info.state))
         enable_bits |= PCM_ENABLE_INPUT;
@@ -479,6 +478,11 @@ static int suspend(struct userdata *u) {
     close(u->fd);
     u->fd = -1;
 
+    if (u->rtpoll_item) {
+        pa_rtpoll_item_free(u->rtpoll_item);
+        u->rtpoll_item = NULL;
+    }
+    
     pa_log_debug("Device suspended...");
     
     return 0;
@@ -490,6 +494,7 @@ static int unsuspend(struct userdata *u) {
     int frag_size, in_frag_size, out_frag_size;
     int in_nfrags, out_nfrags;
     struct audio_buf_info info;
+    struct pollfd *pollfd;
 
     pa_assert(u);
     pa_assert(u->fd < 0);
@@ -568,7 +573,15 @@ static int unsuspend(struct userdata *u) {
 
     u->out_mmap_current = u->in_mmap_current = 0;
     u->out_mmap_saved_nfrags = u->in_mmap_saved_nfrags = 0;
+
+    pa_assert(!u->rtpoll_item);
     
+    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, 1);
+    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+    pollfd->fd = u->fd;
+    pollfd->events = 0;
+    pollfd->revents = 0;
+
     pa_log_debug("Resumed successfully...");
 
     return 0;
@@ -777,15 +790,9 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
 }
 
 static void thread_func(void *userdata) {
-    enum {
-        POLLFD_ASYNCQ,
-        POLLFD_DSP,
-        POLLFD_MAX,
-    };
-
     struct userdata *u = userdata;
-    struct pollfd pollfd[POLLFD_MAX];
     int write_type = 0, read_type = 0;
+    unsigned short revents = 0;
 
     pa_assert(u);
 
@@ -795,46 +802,22 @@ static void thread_func(void *userdata) {
         pa_make_realtime();
 
     pa_thread_mq_install(&u->thread_mq);
+    pa_rtpoll_install(u->rtpoll);
 
     trigger(u, 0);
     
-    memset(&pollfd, 0, sizeof(pollfd));
-
-    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->thread_mq.inq);
-    pollfd[POLLFD_ASYNCQ].events = POLLIN;
-    pollfd[POLLFD_DSP].fd = u->fd;
-
     for (;;) {
         pa_msgobject *object;
         int code;
         void *data;
         pa_memchunk chunk;
-        int r;
         int64_t offset;
 
 /*        pa_log("loop");    */
         
-        /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
-            int ret;
-
-/*             pa_log("processing msg"); */
-
-            if (!object && code == PA_MESSAGE_SHUTDOWN) {
-                pa_asyncmsgq_done(u->thread_mq.inq, 0);
-                goto finish;
-            }
-
-            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
-            pa_asyncmsgq_done(u->thread_mq.inq, ret);
-            continue;
-        } 
-
-/*         pa_log("loop2"); */
-
         /* Render some data and write it to the dsp */
 
-        if (u->sink && u->sink->thread_info.state != PA_SINK_DISCONNECTED && u->fd >= 0 && (pollfd[POLLFD_DSP].revents & POLLOUT)) {
+        if (u->sink && u->sink->thread_info.state != PA_SINK_DISCONNECTED && u->fd >= 0 && (revents & POLLOUT)) {
 
             if (u->use_mmap) {
                 int ret;
@@ -842,7 +825,7 @@ static void thread_func(void *userdata) {
                 if ((ret = mmap_write(u)) < 0)
                     goto fail;
 
-                pollfd[POLLFD_DSP].revents &= ~POLLOUT;
+                revents &= ~POLLOUT;
                 
                 if (ret > 0)
                     continue;
@@ -894,7 +877,7 @@ static void thread_func(void *userdata) {
                         else if (errno == EAGAIN) {
                             pa_log_debug("EAGAIN"); 
                             
-                            pollfd[POLLFD_DSP].revents &= ~POLLOUT;
+                            revents &= ~POLLOUT;
                             break;
                             
                         } else {
@@ -914,7 +897,7 @@ static void thread_func(void *userdata) {
                         
                         l -= t;
                         
-                        pollfd[POLLFD_DSP].revents &= ~POLLOUT;
+                        revents &= ~POLLOUT;
                     }
                     
                 } while (loop && l > 0);
@@ -925,7 +908,7 @@ static void thread_func(void *userdata) {
 
         /* Try to read some data and pass it on to the source driver */
 
-        if (u->source && u->source->thread_info.state != PA_SOURCE_DISCONNECTED && u->fd >= 0 && ((pollfd[POLLFD_DSP].revents & POLLIN))) {
+        if (u->source && u->source->thread_info.state != PA_SOURCE_DISCONNECTED && u->fd >= 0 && ((revents & POLLIN))) {
 
             if (u->use_mmap) {
                 int ret;
@@ -933,7 +916,7 @@ static void thread_func(void *userdata) {
                 if ((ret = mmap_read(u)) < 0)
                     goto fail;
 
-                pollfd[POLLFD_DSP].revents &= ~POLLIN;
+                revents &= ~POLLIN;
                 
                 if (ret > 0)
                     continue;
@@ -985,7 +968,7 @@ static void thread_func(void *userdata) {
                         else if (errno == EAGAIN) {
                             pa_log_debug("EAGAIN"); 
 
-                            pollfd[POLLFD_DSP].revents &= ~POLLIN;
+                            revents &= ~POLLIN;
                             break;
 
                         } else {
@@ -1002,7 +985,7 @@ static void thread_func(void *userdata) {
 
                         l -= t;
 
-                        pollfd[POLLFD_DSP].revents &= ~POLLIN;
+                        revents &= ~POLLIN;
                     }
                 } while (loop && l > 0);
 
@@ -1010,46 +993,53 @@ static void thread_func(void *userdata) {
             }
         }
 
-        if (u->fd >= 0) {
-            pollfd[POLLFD_DSP].fd = u->fd;
-            pollfd[POLLFD_DSP].events =
-                ((u->source && PA_SOURCE_OPENED(u->source->thread_info.state)) ? POLLIN : 0) |
-                ((u->sink && PA_SINK_OPENED(u->sink->thread_info.state)) ? POLLOUT : 0);
-        }
-            
-        /* Hmm, nothing to do. Let's sleep */
+/*         pa_log("loop2"); */
 
-        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0)
+        /* Check whether there is a message for us to process */
+        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
+            int ret;
+
+/*             pa_log("processing msg"); */
+
+            if (!object && code == PA_MESSAGE_SHUTDOWN) {
+                pa_asyncmsgq_done(u->thread_mq.inq, 0);
+                goto finish;
+            }
+
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
+            pa_asyncmsgq_done(u->thread_mq.inq, ret);
             continue;
+        } 
 
-/*         pa_log("polling for %i (legend: %i=POLLIN, %i=POLLOUT)", u->fd >= 0 ? pollfd[POLLFD_DSP].events : -1, POLLIN, POLLOUT); */
-        r = poll(pollfd, u->fd >= 0 ? POLLFD_MAX : POLLFD_DSP, -1);
-/*         pa_log("polling got dsp=%i amq=%i (%i)", r > 0 ? pollfd[POLLFD_DSP].revents : 0, r > 0 ? pollfd[POLLFD_ASYNCQ].revents : 0, r); */
+        if (u->fd >= 0) {
+            struct pollfd *pollfd;
 
-        pa_asyncmsgq_after_poll(u->thread_mq.inq);
+            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+            pollfd->events =
+                ((u->source && PA_SOURCE_OPENED(u->source->thread_info.state)) ? POLLIN : 0) |
+                ((u->sink && PA_SINK_OPENED(u->sink->thread_info.state)) ? POLLOUT : 0);
+        }
 
-        if (u->fd < 0)
-            pollfd[POLLFD_DSP].revents = 0;
         
-        if (r < 0) {
-            if (errno == EINTR) {
-                pollfd[POLLFD_ASYNCQ].revents = 0;
-                pollfd[POLLFD_DSP].revents = 0;
-                continue;
-            }
-
+        /* Hmm, nothing to do. Let's sleep */
+        if (pa_rtpoll_run(u->rtpoll) < 0) {
             pa_log("poll() failed: %s", pa_cstrerror(errno));
             goto fail;
         }
 
-        pa_assert(r > 0);
-
-        if (pollfd[POLLFD_DSP].revents & ~(POLLOUT|POLLIN)) {
-            pa_log("DSP shutdown.");
-            goto fail;
-        }
+        if (u->fd >= 0) {
+            struct pollfd *pollfd;
+            
+            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+            
+            if (pollfd->revents & ~(POLLOUT|POLLIN)) {
+                pa_log("DSP shutdown.");
+                goto fail;
+            }
 
-        pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);
+            revents = pollfd->revents;
+        } else
+            revents = 0;
     }
 
 fail:
@@ -1077,6 +1067,7 @@ int pa__init(pa_module*m) {
     char hwdesc[64], *t;
     const char *name;
     int namereg_fail;
+    struct pollfd *pollfd;
 
     pa_assert(m);
 
@@ -1165,7 +1156,14 @@ int pa__init(pa_module*m) {
     u->out_fragment_size = u->in_fragment_size = u->frag_size = frag_size;
     u->use_mmap = use_mmap;
     pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
-
+    u->rtpoll = pa_rtpoll_new();
+    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);
+    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, 1);
+    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+    pollfd->fd = fd;
+    pollfd->events = 0;
+    pollfd->revents = 0;
+    
     if (ioctl(fd, SNDCTL_DSP_GETISPACE, &info) >= 0) {
         pa_log_info("Input -- %u fragments of size %u.", info.fragstotal, info.fragsize);
         u->in_fragment_size = info.fragsize;
@@ -1294,14 +1292,14 @@ go_on:
         goto fail;
     }
 
-    pa_modargs_free(ma);
-
     /* Read mixer settings */
     if (u->source)
         pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_VOLUME, &u->source->volume, 0, NULL, NULL);
     if (u->sink)
         pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_VOLUME, &u->sink->volume, 0, NULL, NULL);
 
+    pa_modargs_free(ma);
+
     return 0;
 
 fail:
@@ -1343,10 +1341,16 @@ void pa__done(pa_module*m) {
 
     if (u->source)
         pa_source_unref(u->source);
-
+    
     if (u->memchunk.memblock)
         pa_memblock_unref(u->memchunk.memblock);
 
+    if (u->rtpoll_item)
+        pa_rtpoll_item_free(u->rtpoll_item);
+    
+    if (u->rtpoll)
+        pa_rtpoll_free(u->rtpoll);
+    
     if (u->out_mmap_memblocks) {
         unsigned i;
         for (i = 0; i < u->out_nfrags; i++)
index 2f82cae..a1101ab 100644 (file)
@@ -34,7 +34,6 @@
 #include <unistd.h>
 #include <limits.h>
 #include <sys/ioctl.h>
-#include <sys/poll.h>
 
 #include <pulse/xmalloc.h>
 
@@ -46,6 +45,7 @@
 #include <pulsecore/log.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/thread-mq.h>
+#include <pulsecore/rtpoll.h>
 
 #include "module-pipe-sink-symdef.h"
 
@@ -67,12 +67,17 @@ struct userdata {
     pa_core *core;
     pa_module *module;
     pa_sink *sink;
+    
     pa_thread *thread;
     pa_thread_mq thread_mq;
+    pa_rtpoll *rtpoll;
+    
     char *filename;
     int fd;
 
     pa_memchunk memchunk;
+
+    pa_rtpoll_item *rtpoll_item;
 };
 
 static const char* const valid_modargs[] = {
@@ -108,14 +113,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 }
 
 static void thread_func(void *userdata) {
-    enum {
-        POLLFD_ASYNCQ,
-        POLLFD_FIFO,
-        POLLFD_MAX,
-    };
-    
     struct userdata *u = userdata;
-    struct pollfd pollfd[POLLFD_MAX];
     int write_type = 0;
 
     pa_assert(u);
@@ -123,38 +121,21 @@ static void thread_func(void *userdata) {
     pa_log_debug("Thread starting up");
 
     pa_thread_mq_install(&u->thread_mq);
-
-    memset(&pollfd, 0, sizeof(pollfd));
-    
-    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->thread_mq.inq);
-    pollfd[POLLFD_ASYNCQ].events = POLLIN;
-    pollfd[POLLFD_FIFO].fd = u->fd;
+    pa_rtpoll_install(u->rtpoll);
 
     for (;;) {
         pa_msgobject *object;
         int code;
         void *data;
         pa_memchunk chunk;
-        int r;
         int64_t offset;
-
-        /* Check whether there is a message for us to process */
-        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
-            int ret;
-
-            if (!object && code == PA_MESSAGE_SHUTDOWN) {
-                pa_asyncmsgq_done(u->thread_mq.inq, 0);
-                goto finish;
-            }
-
-            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
-            pa_asyncmsgq_done(u->thread_mq.inq, ret);
-            continue;
-        }
+        struct pollfd *pollfd;
 
         /* Render some data and write it to the fifo */
 
-        if (u->sink->thread_info.state == PA_SINK_RUNNING && pollfd[POLLFD_FIFO].revents) {
+        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+        
+        if (u->sink->thread_info.state == PA_SINK_RUNNING && pollfd->revents) {
             ssize_t l;
             void *p;
 
@@ -188,41 +169,37 @@ static void thread_func(void *userdata) {
                     pa_memchunk_reset(&u->memchunk);
                 }
 
-                pollfd[POLLFD_FIFO].revents = 0;
-                continue;
+                pollfd->revents = 0;
             }
         }
 
-        pollfd[POLLFD_FIFO].events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0;
+        /* Check whether there is a message for us to process */
+        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {
+            int ret;
 
-        /* Hmm, nothing to do. Let's sleep */
+            if (!object && code == PA_MESSAGE_SHUTDOWN) {
+                pa_asyncmsgq_done(u->thread_mq.inq, 0);
+                goto finish;
+            }
 
-        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0)
+            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
+            pa_asyncmsgq_done(u->thread_mq.inq, ret);
             continue;
+        }
+        
+        /* Hmm, nothing to do. Let's sleep */
+        pollfd->events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0;
 
-/*         pa_log("polling for %u", pollfd[POLLFD_FIFO].events);  */
-        r = poll(pollfd, POLLFD_MAX, -1);
-/*         pa_log("polling got %u", r > 0 ? pollfd[POLLFD_FIFO].revents : 0);  */
-
-        pa_asyncmsgq_after_poll(u->thread_mq.inq);
-
-        if (r < 0) {
-            if (errno == EINTR) {
-                pollfd[POLLFD_ASYNCQ].revents = 0;
-                pollfd[POLLFD_FIFO].revents = 0;
-                continue;
-            }
-
+        if (pa_rtpoll_run(u->rtpoll) < 0) {
             pa_log("poll() failed: %s", pa_cstrerror(errno));
             goto fail;
         }
 
-        if (pollfd[POLLFD_FIFO].revents & ~POLLOUT) {
+        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+        if (pollfd->revents & ~POLLOUT) {
             pa_log("FIFO shutdown.");
             goto fail;
         }
-
-        pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);
     }
 
 fail:
@@ -242,6 +219,7 @@ int pa__init(pa_module*m) {
     pa_channel_map map;
     pa_modargs *ma;
     char *t;
+    struct pollfd *pollfd;
 
     pa_assert(m);
 
@@ -262,6 +240,8 @@ int pa__init(pa_module*m) {
     m->userdata = u;
     pa_memchunk_reset(&u->memchunk);
     pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
+    u->rtpoll = pa_rtpoll_new();
+    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);
     
     u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
 
@@ -297,6 +277,11 @@ int pa__init(pa_module*m) {
     pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", u->filename));
     pa_xfree(t);
 
+    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, 1);
+    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+    pollfd->fd = u->fd;
+    pollfd->events = pollfd->revents = 0;
+   
     if (!(u->thread = pa_thread_new(thread_func, u))) {
         pa_log("Failed to create thread.");
         goto fail;
@@ -339,6 +324,12 @@ void pa__done(pa_module*m) {
     if (u->memchunk.memblock)
        pa_memblock_unref(u->memchunk.memblock);
 
+    if (u->rtpoll_item)
+        pa_rtpoll_item_free(u->rtpoll_item);
+    
+    if (u->rtpoll)
+        pa_rtpoll_free(u->rtpoll);
+
     if (u->filename) {
         unlink(u->filename);
         pa_xfree(u->filename);