Refactor a2dp thread execution flow and improve time estimation
authorJoão Paulo Rechi Vita <joao.vita@gmail.com>
Tue, 19 Aug 2008 19:06:21 +0000 (16:06 -0300)
committerLennart Poettering <lennart@poettering.net>
Wed, 10 Sep 2008 22:12:07 +0000 (01:12 +0300)
src/modules/module-bt-device.c

index 4e1b051..b5eccb3 100644 (file)
 #include <pulse/xmalloc.h>
 #include <pulse/timeval.h>
 #include <pulse/sample.h>
-#include <pulsecore/core-util.h>
-#include <pulsecore/core-error.h>
 #include <pulsecore/module.h>
 #include <pulsecore/modargs.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/core-error.h>
+#include <pulsecore/socket-util.h>
 #include <pulsecore/thread.h>
 #include <pulsecore/thread-mq.h>
 #include <pulsecore/rtpoll.h>
@@ -505,20 +506,20 @@ static int bt_getstreamfd(struct userdata *u) {
         return -errno;
     }
 
-//    if (u->transport == BT_CAPABILITIES_TRANSPORT_A2DP) {
-//        if (setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDTIMEO, &t, sizeof(t)) < 0) {
-//            pa_log_error("failed to set socket options for A2DP: %s (%d)",pa_cstrerror(errno), errno);
-//            return -errno;
-//        }
-//    }
-//    else {
-//        if (setsockopt(u->stream_fd, SOL_SCO, SCO_TXBUFS, &period_count, sizeof(period_count)) == 0)
-//            return 0;
-//        if (setsockopt(u->stream_fd, SOL_SCO, SO_SNDBUF, &period_count, sizeof(period_count)) == 0)
-//            return 0;
-//        /* FIXME : handle error codes */
-//    }
+    if (u->transport == BT_CAPABILITIES_TRANSPORT_A2DP) {
+        if (pa_socket_set_sndbuf(u->stream_fd, 10*u->link_mtu) < 0) {
+            pa_log_error("failed to set socket options for A2DP: %s (%d)",pa_cstrerror(errno), errno);
+            return -errno;
+        }
+    }
+
+//   if (setsockopt(u->stream_fd, SOL_SCO, SCO_TXBUFS, &period_count, sizeof(period_count)) == 0)
+//       return 0;
+//   if (setsockopt(u->stream_fd, SOL_SCO, SO_SNDBUF, &period_count, sizeof(period_count)) == 0)
+//       return 0;
+//   /* FIXME : handle error codes */
     pa_make_fd_nonblock(u->stream_fd);
+//    pa_make_socket_low_delay(u->stream_fd);
 
     return 0;
 }
@@ -692,11 +693,10 @@ finish:
 
 static void a2dp_thread_func(void *userdata) {
     struct userdata *u = userdata;
-    int write_type = 0;
 
     pa_assert(u);
 
-    pa_log/*_debug*/("A2DP Thread starting up");
+    pa_log_debug("A2DP Thread starting up");
 
     pa_thread_mq_install(&u->thread_mq);
     pa_rtpoll_install(u->rtpoll);
@@ -709,127 +709,107 @@ static void a2dp_thread_func(void *userdata) {
 
         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
             if (u->sink->thread_info.rewind_requested) {
-                pa_log("rewind_requested");
                 pa_sink_process_rewind(u->sink, 0);
-                pa_log("rewind_finished");
             }
         }
 
         pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
 
-        /* Render some data and write it to the fifo */
         if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && pollfd->revents) {
             pa_usec_t usec;
             int64_t n;
-            pa_log("Render some data and write it to the fifo");
-
-            for (;;) {
-                ssize_t l;
+            ssize_t l;
+            int write_type = 0, written;
+            struct bt_a2dp *a2dp = &u->a2dp;
+            struct rtp_header *header = (void *) a2dp->buffer;
+            struct rtp_payload *payload = (void *) (a2dp->buffer + sizeof(*header));
+
+            do {
+                /* Render some data */
                 void *p;
-                struct bt_a2dp *a2dp = &u->a2dp;
-                int frame_size, encoded, written;
+                int frame_size, encoded;
 
                 u->memchunk.memblock = pa_memblock_new(u->mempool, u->block_size);
-                pa_log("memblock allocated %d", u->block_size);
+                pa_log_debug("memblock asked size%d", u->block_size);
                 u->memchunk.length = pa_memblock_get_length(u->memchunk.memblock);
-                pa_log("memchunk length %d", u->memchunk.length);
+                pa_log_debug("memchunk length %d", u->memchunk.length);
                 pa_sink_render_into_full(u->sink, &u->memchunk);
-                pa_log("rendered");
+                pa_log_debug("rendered");
 
                 pa_assert(u->memchunk.length > 0);
 
                 p = pa_memblock_acquire(u->memchunk.memblock);
-                pa_log("memblock acquired");
-
                 frame_size = sbc_get_frame_length(&a2dp->sbc);
-                pa_log("frame_size: %d", frame_size);
+                pa_log_debug("frame_size: %d", frame_size);
 
                 encoded = sbc_encode(&a2dp->sbc, (uint8_t*) p, a2dp->codesize, a2dp->buffer + a2dp->count,
                         sizeof(a2dp->buffer) - a2dp->count, &written);
-                pa_log("encoded: %d", encoded);
-                pa_log("written: %d", encoded);
+                pa_log_debug("encoded: %d", encoded);
+                pa_log_debug("written: %d", written);
                 if (encoded <= 0) {
                     pa_log_error("SBC encoding error (%d)", encoded);
                     goto fail;
                 }
                 pa_memblock_release(u->memchunk.memblock);
-                pa_log("memblock released");
                 pa_memblock_unref(u->memchunk.memblock);
-                pa_log("memblock unrefered");
                 pa_memchunk_reset(&u->memchunk);
-                pa_log("memchunk reseted");
+                pa_log_debug("memchunk reseted");
 
                 a2dp->count += written;
                 a2dp->frame_count++;
                 a2dp->samples += encoded / frame_size;
                 a2dp->nsamples += encoded / frame_size;
 
-                if (a2dp->count + written >= u->link_mtu) {
-                    struct rtp_header *header = (void *) a2dp->buffer;
-                    struct rtp_payload *payload = (void *) (a2dp->buffer + sizeof(*header));
-
-                    memset(a2dp->buffer, 0, sizeof(*header) + sizeof(*payload));
+            } while (a2dp->count + written <= u->link_mtu);
 
-                    payload->frame_count = a2dp->frame_count;
-                    header->v = 2;
-                    header->pt = 1;
-                    header->sequence_number = htons(a2dp->seq_num);
-                    header->timestamp = htonl(a2dp->nsamples);
-                    header->ssrc = htonl(1);
+            /* write it to the fifo */
+            memset(a2dp->buffer, 0, sizeof(*header) + sizeof(*payload));
+            payload->frame_count = a2dp->frame_count;
+            header->v = 2;
+            header->pt = 1;
+            header->sequence_number = htons(a2dp->seq_num);
+            header->timestamp = htonl(a2dp->nsamples);
+            header->ssrc = htonl(1);
 
-                    l = pa_write(u->stream_fd, a2dp->buffer, a2dp->count, 0);
-                    pa_log("avdtp_write %d", a2dp->count);
+avdtp_write:
+            l = pa_write(u->stream_fd, a2dp->buffer, a2dp->count, write_type);
+            pa_log_debug("avdtp_write: requested %d bytes; written %d bytes", a2dp->count, l);
 
-                    if (l > 0) {
-                        /* Reset buffer of data to send */
-                        a2dp->count = sizeof(struct rtp_header) + sizeof(struct rtp_payload);
-                        a2dp->frame_count = 0;
-                        a2dp->samples = 0;
-                        a2dp->seq_num++;
-                    }
+            pa_assert(l != 0);
 
-                    pa_log("avdtp written %d bytes", l);
-
-                    pa_assert(l != 0);
-
-                    if (l < 0) {
-                        pa_log("l = %d < 0", l);
-                        if (errno == EINTR) {
-                            pa_log("EINTR");
-                            continue;
-                        }
-                        else if (errno == EAGAIN) {
-                            pa_log("EAGAIN");
-                            goto filled_up;
-                        }
-                        else {
-                            pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
-                            goto fail;
-                        }
-                    } else {
-                        pa_log("l = %d >= 0", l);
-                        u->offset += l;
-                        pollfd->revents = 0;
-                        goto filled_up;
-                    }
+            if (l < 0) {
+                if (errno == EINTR) {
+                    pa_log_debug("EINTR");
+                    continue;
+                }
+                else if (errno == EAGAIN) {
+                    pa_log_debug("EAGAIN");
+                    goto avdtp_write;
+                }
+                else {
+                    pa_log_error("Failed to write data to FIFO: %s", pa_cstrerror(errno));
+                    goto fail;
                 }
             }
 
-filled_up:
+            u->offset += a2dp->codesize*a2dp->frame_count;
+            pollfd->revents = 0;
 
-            pa_log("filled_up");
-            /* At this spot we know that the socket buffers are fully filled up.
-             * This is the best time to estimate the playback position of the server */
-            n = u->offset;
+            /* Reset buffer of data to send */
+            a2dp->count = sizeof(struct rtp_header) + sizeof(struct rtp_payload);
+            a2dp->frame_count = 0;
+            a2dp->samples = 0;
+            a2dp->seq_num++;
 
+            /* feed the time smoother */
+            n = u->offset;
 #ifdef SIOCOUTQ
             {
-                int l;
-                if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
-                    n -= l;
+                int ll;
+                if (ioctl(u->fd, SIOCOUTQ, &ll) >= 0 && ll > 0)
+                    n -= ll;
             }
 #endif
-
             usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
             if (usec > u->latency)
                 usec -= u->latency;
@@ -839,35 +819,34 @@ filled_up:
         }
 
         /* Hmm, nothing to do. Let's sleep */
-        pa_log("let's sleep");
+        pa_log_debug("A2DP thread going to sleep");
         pollfd->events = PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;
-
         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) {
             pa_log("ret < 0");
             goto fail;
         }
-        pa_log("waking up");
+        pa_log_debug("A2DP thread waking up");
 
         if (ret == 0) {
-            pa_log("ret == 0");
+            pa_log_warn("ret == 0");
             goto finish;
         }
 
         pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
         if (pollfd->revents & ~POLLOUT) {
-            pa_log("FIFO shutdown.");
+            pa_log_error("FIFO shutdown.");
             goto fail;
         }
     }
 
 fail:
     /* If this was no regular exit from the loop we have to continue processing messages until we receive PA_MESSAGE_SHUTDOWN */
-    pa_log("fail");
+    pa_log_debug("A2DP thread failed");
     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
 
 finish:
-    pa_log_debug("A2DP Thread shutting down");
+    pa_log_debug("A2DP thread shutting down");
 }
 
 int pa__init(pa_module* m) {