Still send silence when we are not doing anything else, but also flush the buffers...
authorColin Guthrie <pulse@colin.guthr.ie>
Wed, 11 Jun 2008 00:02:10 +0000 (00:02 +0000)
committerColin Guthrie <pulse@colin.guthr.ie>
Wed, 8 Oct 2008 19:32:09 +0000 (20:32 +0100)
Close the RTP socket correctly after passing messages about.
When not sending silence, the RTSP socket will be closed after some period of inactivity. I'm not sure why this is.
Sending silence keeps things working and with the flushes after suspension we now get a better latency. As this relies on the auto-suspend feature, it's not exactly ideal.
Typical latencies are currently about 3s which makes it more or less usuable for listening to music.
If the connection is disconnected, it will reconnect but I've found that the second connection is silent. Hopefully the silence will prevent the first connection dropping.
Refs #69

git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/coling@2504 fefdeb5f-60dc-0310-8127-8f9354f1896f

src/modules/module-raop-sink.c

index 96c98a6..51c2368 100644 (file)
@@ -127,9 +127,31 @@ static const char* const valid_modargs[] = {
 };
 
 enum {
-    SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX
+    SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX,
+    SINK_MESSAGE_RIP_SOCKET
 };
 
+static void on_connection(PA_GCC_UNUSED int fd, void*userdata) {
+    struct userdata *u = userdata;
+    pa_assert(u);
+
+    pa_assert(u->fd < 0);
+    u->fd = fd;
+
+    pa_log_debug("Connection authenticated, handing fd to IO thread...");
+
+    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
+}
+
+static void on_close(void*userdata) {
+    struct userdata *u = userdata;
+    pa_assert(u);
+
+    pa_log_debug("Connection closed, informing IO thread...");
+
+    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
+}
+
 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
 
@@ -143,14 +165,27 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
                     pa_assert(PA_SINK_OPENED(u->sink->thread_info.state));
 
                     pa_smoother_pause(u->smoother, pa_rtclock_usec());
+
+                    /* Issue a FLUSH if we are connected */
+                    if (u->fd >= 0) {
+                        pa_raop_flush(u->raop);
+                    }
                     break;
 
                 case PA_SINK_IDLE:
                 case PA_SINK_RUNNING:
 
-                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
+                    if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
                         pa_smoother_resume(u->smoother, pa_rtclock_usec());
 
+                        /* The connection can be closed when idle, so check to
+                           see if we need to reestablish it */
+                        if (u->fd < 0)
+                            pa_raop_connect(u->raop);
+                        else
+                            pa_raop_flush(u->raop);
+                    }
+
                     break;
 
                 case PA_SINK_UNLINKED:
@@ -179,8 +214,34 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
             pollfd->fd = u->fd;
             pollfd->events = POLLOUT;
-            pollfd->revents = 0;
+            /*pollfd->events = */pollfd->revents = 0;
+
+            if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+                /* Our stream has been suspended so we just flush it.... */
+                pa_raop_flush(u->raop);
+            }
+            return 0;
+        }
+
+        case SINK_MESSAGE_RIP_SOCKET: {
+            pa_assert(u->fd >= 0);
 
+            pa_close(u->fd);
+            u->fd = -1;
+
+            if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
+
+                pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
+
+                if (u->rtpoll_item)
+                    pa_rtpoll_item_free(u->rtpoll_item);
+                u->rtpoll_item = NULL;
+            } else {
+                /* Quesiton: is this valid here: or should we do some sort of:
+                   return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
+                   ?? */
+                pa_module_unload_request(u->module);
+            }
             return 0;
         }
     }
@@ -215,7 +276,7 @@ static void thread_func(void *userdata) {
             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
 
             /* Render some data and write it to the fifo */
-            if (pollfd->revents) {
+            if (/*PA_SINK_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
                 pa_usec_t usec;
                 int64_t n;
                 void *p;
@@ -264,9 +325,10 @@ static void thread_func(void *userdata) {
                             u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
                         } else {
                             /* We render some silence into our memchunk */
-                            u->encoding_overhead += u->next_encoding_overhead;
                             memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
                             pa_memblock_ref(silence.memblock);
+
+                            /* Calculate/store some values to be used with the smoother */
                             u->next_encoding_overhead = silence_overhead;
                             u->encoding_ratio = silence_ratio;
                         }
@@ -302,12 +364,15 @@ static void thread_func(void *userdata) {
 
                         pollfd->revents = 0;
 
-                        if (u->encoded_memchunk.length > 0)
+                        if (u->encoded_memchunk.length > 0) {
+                            /* we've completely written the encoded data, so update our overhead */
+                            u->encoding_overhead += u->next_encoding_overhead;
 
                             /* OK, we wrote less that we asked for,
                              * hence we can assume that the socket
                              * buffers are full now */
                             goto filled_up;
+                        }
                     }
                 }
 
@@ -338,7 +403,7 @@ static void thread_func(void *userdata) {
             }
 
             /* Hmm, nothing to do. Let's sleep */
-            /* pollfd->events = PA_SINK_OPENED(u->sink->thread_info.state)  ? POLLOUT : 0; */
+            pollfd->events = POLLOUT; /*PA_SINK_OPENED(u->sink->thread_info.state)  ? POLLOUT : 0;*/
         }
 
         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
@@ -353,8 +418,16 @@ static void thread_func(void *userdata) {
             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
 
             if (pollfd->revents & ~POLLOUT) {
-                pa_log("FIFO shutdown.");
-                goto fail;
+                if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
+                    pa_log("FIFO shutdown.");
+                    goto fail;
+                }
+
+                /* We expect this to happen on occasion if we are not sending data.
+                   It's perfectly natural and normal and natural */
+                if (u->rtpoll_item)
+                    pa_rtpoll_item_free(u->rtpoll_item);
+                u->rtpoll_item = NULL;
             }
         }
     }
@@ -371,26 +444,6 @@ finish:
     pa_log_debug("Thread shutting down");
 }
 
-static void on_connection(PA_GCC_UNUSED int fd, void*userdata) {
-    struct userdata *u = userdata;
-    pa_assert(u);
-
-    pa_assert(u->fd < 0);
-    u->fd = fd;
-
-    pa_log_debug("Connection authenticated, handing fd to IO thread...");
-
-    pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
-}
-
-static void on_close(void*userdata) {
-    struct userdata *u = userdata;
-    pa_assert(u);
-
-    pa_log_debug("Control connection closed.");
-    pa_module_unload_request(u->module);
-}
-
 int pa__init(pa_module*m) {
     struct userdata *u = NULL;
     const char *p;