Initial code for rt thread
authorJoão Paulo Rechi Vita <joao.vita@gmail.com>
Mon, 11 Aug 2008 21:10:14 +0000 (18:10 -0300)
committerLennart Poettering <lennart@poettering.net>
Wed, 10 Sep 2008 22:12:04 +0000 (01:12 +0300)
src/modules/module-bt-device.c

index 9a40c55..c0ca637 100644 (file)
 #endif
 
 #include <string.h>
+#include <errno.h>
+#include <poll.h>
 
 #include <pulse/xmalloc.h>
 #include <pulse/timeval.h>
+#include <pulsecore/core-error.h>
 #include <pulsecore/module.h>
 #include <pulsecore/modargs.h>
 #include <pulsecore/thread.h>
@@ -78,7 +81,7 @@ struct userdata {
     pa_rtpoll_item *rtpoll_item;
     pa_thread *thread;
 
-    pa_sample_spec ss;
+    int64_t offset;
     pa_smoother *smoother;
 
     pa_memchunk memchunk;
@@ -88,11 +91,15 @@ struct userdata {
     const char *profile;
     int rate;
     int channels;
+    pa_sample_spec ss;
 
     int audioservice_fd;
     int stream_fd;
+
     int transport;
     int link_mtu;
+    size_t block_size;
+    pa_usec_t latency;
 
     struct bt_a2dp a2dp;
 };
@@ -235,7 +242,6 @@ static uint8_t default_bitpool(uint8_t freq, uint8_t mode) {
 static int bt_a2dp_init(struct userdata *u) {
     sbc_capabilities_t *cap = &u->a2dp.sbc_capabilities;
     unsigned int max_bitpool, min_bitpool, rate, channels;
-    int dir;
 
     switch (u->rate) {
         case 48000:
@@ -427,7 +433,7 @@ static int bt_setconf(struct userdata *u) {
     }
 
     u->transport = setconf_rsp->transport;
-    u->link_mtu = setconf_rsp->link_mtu;
+    u->block_size = u->link_mtu = setconf_rsp->link_mtu;
 
     /* setup SBC encoder now we agree on parameters */
     if (u->transport == BT_CAPABILITIES_TRANSPORT_A2DP) {
@@ -440,7 +446,7 @@ static int bt_setconf(struct userdata *u) {
 }
 
 static int bt_getstreamfd(struct userdata *u) {
-    int e, opt_name;
+    int e/*, opt_name*/;
     char buf[BT_AUDIO_IPC_PACKET_SIZE];
     struct bt_streamstart_req *start_req = (void*) buf;
     bt_audio_rsp_msg_header_t *rsp_hdr = (void*) buf;
@@ -532,10 +538,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             break;
 
         case PA_SINK_MESSAGE_GET_LATENCY: {
-//            pa_usec_t w, r;
-//            r = pa_smoother_get(u->smoother, pa_rtclock_usec());
-//            w = pa_bytes_to_usec(u->offset + u->memchunk.length, &u->sink->sample_spec);
-//            *((pa_usec_t*) data) = w > r ? w - r : 0;
+            pa_usec_t w, r;
+            r = pa_smoother_get(u->smoother, pa_rtclock_usec());
+            w = pa_bytes_to_usec(u->offset + u->memchunk.length, &u->sink->sample_spec);
+            *((pa_usec_t*) data) = w > r ? w - r : 0;
             break;
         }
 
@@ -554,94 +560,94 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
 }
 
 static void thread_func(void *userdata) {
-//    struct userdata *u = userdata;
-//    int write_type = 0;
-//
-//    pa_assert(u);
-//
-//    pa_log_debug("Thread starting up");
-//
-//    pa_thread_mq_install(&u->thread_mq);
-//    pa_rtpoll_install(u->rtpoll);
-//
-//    pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
-//
-//    for (;;) {
-//        int ret;
-//
-//        if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
-//            if (u->sink->thread_info.rewind_requested)
-//                pa_sink_process_rewind(u->sink, 0);
-//
-//        if (u->rtpoll_item) {
-//            struct pollfd *pollfd;
-//            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
-//
-//            /* Render some data and write it to the fifo */
-//            if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && pollfd->revents) {
-//                pa_usec_t usec;
-//                int64_t n;
-//
-//                for (;;) {
-//                    ssize_t l;
-//                    void *p;
-//
-//                    if (u->memchunk.length <= 0)
-//                        pa_sink_render(u->sink, u->block_size, &u->memchunk);
-//
-//                    pa_assert(u->memchunk.length > 0);
-//
-//                    p = pa_memblock_acquire(u->memchunk.memblock);
-//                    l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
-//                    pa_memblock_release(u->memchunk.memblock);
-//
-//                    pa_assert(l != 0);
-//
-//                    if (l < 0) {
-//
-//                        if (errno == EINTR)
-//                            continue;
-//                        else if (errno == EAGAIN) {
-//
-//                            /* OK, we filled all socket buffers up
-//                             * now. */
-//                            goto filled_up;
-//
-//                        } else {
-//                            pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
-//                            goto fail;
-//                        }
-//
-//                    } else {
-//                        u->offset += l;
-//
-//                        u->memchunk.index += l;
-//                        u->memchunk.length -= l;
-//
-//                        if (u->memchunk.length <= 0) {
-//                            pa_memblock_unref(u->memchunk.memblock);
-//                            pa_memchunk_reset(&u->memchunk);
-//                        }
-//
-//                        pollfd->revents = 0;
-//
-//                        if (u->memchunk.length > 0)
-//
-//                            /* OK, we wrote less that we asked for,
-//                             * hence we can assume that the socket
-//                             * buffers are full now */
-//                            goto filled_up;
-//                    }
-//                }
-//
-//            filled_up:
-//
-//                /* At this spot we know that the socket buffers are
-//                 * fully filled up. This is the best time to estimate
-//                 * the playback position of the server */
-//
-//                n = u->offset;
-//
+    struct userdata *u = userdata;
+    int write_type = 0;
+
+    pa_assert(u);
+
+    pa_log/*_debug*/("Thread starting up");
+
+    pa_thread_mq_install(&u->thread_mq);
+    pa_rtpoll_install(u->rtpoll);
+
+    pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
+
+    for (;;) {
+        int ret;
+
+        if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
+            if (u->sink->thread_info.rewind_requested)
+                pa_sink_process_rewind(u->sink, 0);
+
+        if (u->rtpoll_item) {
+            struct pollfd *pollfd;
+            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+
+            /* Render some data and write it to the fifo */
+            if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && pollfd->revents) {
+                pa_usec_t usec;
+                int64_t n;
+
+                for (;;) {
+                    ssize_t l;
+                    void *p;
+
+                    if (u->memchunk.length <= 0)
+                        pa_sink_render(u->sink, u->block_size, &u->memchunk);
+
+                    pa_assert(u->memchunk.length > 0);
+
+                    p = pa_memblock_acquire(u->memchunk.memblock);
+                    l = pa_write(u->stream_fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
+                    pa_memblock_release(u->memchunk.memblock);
+
+                    pa_assert(l != 0);
+
+                    if (l < 0) {
+
+                        if (errno == EINTR)
+                            continue;
+                        else if (errno == EAGAIN) {
+
+                            /* OK, we filled all socket buffers up
+                             * now. */
+                            goto filled_up;
+
+                        } else {
+                            pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
+                            goto fail;
+                        }
+
+                    } else {
+                        u->offset += l;
+
+                        u->memchunk.index += l;
+                        u->memchunk.length -= l;
+
+                        if (u->memchunk.length <= 0) {
+                            pa_memblock_unref(u->memchunk.memblock);
+                            pa_memchunk_reset(&u->memchunk);
+                        }
+
+                        pollfd->revents = 0;
+
+                        if (u->memchunk.length > 0)
+
+                            /* OK, we wrote less that we asked for,
+                             * hence we can assume that the socket
+                             * buffers are full now */
+                            goto filled_up;
+                    }
+                }
+
+            filled_up:
+
+                /* At this spot we know that the socket buffers are
+                 * fully filled up. This is the best time to estimate
+                 * the playback position of the server */
+
+                n = u->offset;
+
 //#ifdef SIOCOUTQ
 //                {
 //                    int l;
@@ -649,52 +655,52 @@ static void thread_func(void *userdata) {
 //                        n -= l;
 //                }
 //#endif
-//
-//                usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
-//
-//                if (usec > u->latency)
-//                    usec -= u->latency;
-//                else
-//                    usec = 0;
-//
-//                pa_smoother_put(u->smoother, pa_rtclock_usec(), usec);
-//            }
-//
-//            /* Hmm, nothing to do. Let's sleep */
-//            pollfd->events = PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;
-//        }
-//
-//        if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
-//            goto fail;
-//
-//        if (ret == 0)
-//            goto finish;
-//
-//        if (u->rtpoll_item) {
-//            struct pollfd* pollfd;
-//
-//            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
-//
-//            if (pollfd->revents & ~POLLOUT) {
-//                pa_log("FIFO shutdown.");
-//                goto fail;
-//            }
-//        }
-//    }
-//
-//fail:
-//    /* If this was no regular exit from the loop we have to continue
-//     * processing messages until we received PA_MESSAGE_SHUTDOWN */
-//    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
-//    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
-//
-//finish:
-//    pa_log_debug("Thread shutting down");
+
+                usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
+
+                if (usec > u->latency)
+                    usec -= u->latency;
+                else
+                    usec = 0;
+
+                pa_smoother_put(u->smoother, pa_rtclock_usec(), usec);
+            }
+
+            /* Hmm, nothing to do. Let's sleep */
+            pollfd->events = PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;
+        }
+
+        if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
+            goto fail;
+
+        if (ret == 0)
+            goto finish;
+
+        if (u->rtpoll_item) {
+            struct pollfd* pollfd;
+
+            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
+
+            if (pollfd->revents & ~POLLOUT) {
+                pa_log("FIFO shutdown.");
+                goto fail;
+            }
+        }
+    }
+
+fail:
+    /* If this was no regular exit from the loop we have to continue
+     * processing messages until we received PA_MESSAGE_SHUTDOWN */
+    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
+    pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
+
+finish:
+    pa_log_debug("Thread shutting down");
 }
 
 int pa__init(pa_module* m) {
     int e;
-    char *rate, *channels;
+    const char *rate, *channels;
     pa_modargs *ma;
     pa_sink_new_data data;
     struct userdata *u;
@@ -706,6 +712,8 @@ int pa__init(pa_module* m) {
     u->audioservice_fd = -1;
     u->stream_fd = -1;
     u->transport = -1;
+    u->offset = 0;
+    u->latency = 0;
     u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
     pa_memchunk_reset(&u->memchunk);
     u->rtpoll = pa_rtpoll_new();
@@ -783,7 +791,7 @@ int pa__init(pa_module* m) {
     pa_sink_new_data_set_sample_spec(&data, &u->ss);
     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->name);
     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Bluetooth sink '%s' (%s)", u->name, u->addr);
-    u->sink = pa_sink_new(m->core, &data, PA_SINK_HARDWARE|PA_SINK_NETWORK);
+    u->sink = pa_sink_new(m->core, &data, PA_SINK_HARDWARE|PA_SINK_LATENCY|PA_SINK_NETWORK);
     pa_sink_new_data_done(&data);
     if (!u->sink) {
         pa_log_error("failed to create sink");