From 255f9b0fe65631815895c7a2c0c4dd6752ec23f3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jo=C3=A3o=20Paulo=20Rechi=20Vita?= Date: Mon, 11 Aug 2008 18:10:14 -0300 Subject: [PATCH] Initial code for rt thread --- src/modules/module-bt-device.c | 286 +++++++++++++++++++++-------------------- 1 file changed, 147 insertions(+), 139 deletions(-) diff --git a/src/modules/module-bt-device.c b/src/modules/module-bt-device.c index 9a40c55..c0ca637 100644 --- a/src/modules/module-bt-device.c +++ b/src/modules/module-bt-device.c @@ -24,9 +24,12 @@ #endif #include +#include +#include #include #include +#include #include #include #include @@ -78,7 +81,7 @@ struct userdata { pa_rtpoll_item *rtpoll_item; pa_thread *thread; - pa_sample_spec ss; + int64_t offset; pa_smoother *smoother; pa_memchunk memchunk; @@ -88,11 +91,15 @@ struct userdata { const char *profile; int rate; int channels; + pa_sample_spec ss; int audioservice_fd; int stream_fd; + int transport; int link_mtu; + size_t block_size; + pa_usec_t latency; struct bt_a2dp a2dp; }; @@ -235,7 +242,6 @@ static uint8_t default_bitpool(uint8_t freq, uint8_t mode) { static int bt_a2dp_init(struct userdata *u) { sbc_capabilities_t *cap = &u->a2dp.sbc_capabilities; unsigned int max_bitpool, min_bitpool, rate, channels; - int dir; switch (u->rate) { case 48000: @@ -427,7 +433,7 @@ static int bt_setconf(struct userdata *u) { } u->transport = setconf_rsp->transport; - u->link_mtu = setconf_rsp->link_mtu; + u->block_size = u->link_mtu = setconf_rsp->link_mtu; /* setup SBC encoder now we agree on parameters */ if (u->transport == BT_CAPABILITIES_TRANSPORT_A2DP) { @@ -440,7 +446,7 @@ static int bt_setconf(struct userdata *u) { } static int bt_getstreamfd(struct userdata *u) { - int e, opt_name; + int e/*, opt_name*/; char buf[BT_AUDIO_IPC_PACKET_SIZE]; struct bt_streamstart_req *start_req = (void*) buf; bt_audio_rsp_msg_header_t *rsp_hdr = (void*) buf; @@ -532,10 +538,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse break; case PA_SINK_MESSAGE_GET_LATENCY: { -// pa_usec_t w, r; -// r = pa_smoother_get(u->smoother, pa_rtclock_usec()); -// w = pa_bytes_to_usec(u->offset + u->memchunk.length, &u->sink->sample_spec); -// *((pa_usec_t*) data) = w > r ? w - r : 0; + pa_usec_t w, r; + r = pa_smoother_get(u->smoother, pa_rtclock_usec()); + w = pa_bytes_to_usec(u->offset + u->memchunk.length, &u->sink->sample_spec); + *((pa_usec_t*) data) = w > r ? w - r : 0; break; } @@ -554,94 +560,94 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse } static void thread_func(void *userdata) { -// struct userdata *u = userdata; -// int write_type = 0; -// -// pa_assert(u); -// -// pa_log_debug("Thread starting up"); -// -// pa_thread_mq_install(&u->thread_mq); -// pa_rtpoll_install(u->rtpoll); -// -// pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec()); -// -// for (;;) { -// int ret; -// -// if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) -// if (u->sink->thread_info.rewind_requested) -// pa_sink_process_rewind(u->sink, 0); -// -// if (u->rtpoll_item) { -// struct pollfd *pollfd; -// pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); -// -// /* Render some data and write it to the fifo */ -// if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && pollfd->revents) { -// pa_usec_t usec; -// int64_t n; -// -// for (;;) { -// ssize_t l; -// void *p; -// -// if (u->memchunk.length <= 0) -// pa_sink_render(u->sink, u->block_size, &u->memchunk); -// -// pa_assert(u->memchunk.length > 0); -// -// p = pa_memblock_acquire(u->memchunk.memblock); -// l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type); -// pa_memblock_release(u->memchunk.memblock); -// -// pa_assert(l != 0); -// -// if (l < 0) { -// -// if (errno == EINTR) -// continue; -// else if (errno == EAGAIN) { -// -// /* OK, we filled all socket buffers up -// * now. */ -// goto filled_up; -// -// } else { -// pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); -// goto fail; -// } -// -// } else { -// u->offset += l; -// -// u->memchunk.index += l; -// u->memchunk.length -= l; -// -// if (u->memchunk.length <= 0) { -// pa_memblock_unref(u->memchunk.memblock); -// pa_memchunk_reset(&u->memchunk); -// } -// -// pollfd->revents = 0; -// -// if (u->memchunk.length > 0) -// -// /* OK, we wrote less that we asked for, -// * hence we can assume that the socket -// * buffers are full now */ -// goto filled_up; -// } -// } -// -// filled_up: -// -// /* At this spot we know that the socket buffers are -// * fully filled up. This is the best time to estimate -// * the playback position of the server */ -// -// n = u->offset; -// + struct userdata *u = userdata; + int write_type = 0; + + pa_assert(u); + + pa_log/*_debug*/("Thread starting up"); + + pa_thread_mq_install(&u->thread_mq); + pa_rtpoll_install(u->rtpoll); + + pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec()); + + for (;;) { + int ret; + + if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) + if (u->sink->thread_info.rewind_requested) + pa_sink_process_rewind(u->sink, 0); + + if (u->rtpoll_item) { + struct pollfd *pollfd; + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + + /* Render some data and write it to the fifo */ + if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && pollfd->revents) { + pa_usec_t usec; + int64_t n; + + for (;;) { + ssize_t l; + void *p; + + if (u->memchunk.length <= 0) + pa_sink_render(u->sink, u->block_size, &u->memchunk); + + pa_assert(u->memchunk.length > 0); + + p = pa_memblock_acquire(u->memchunk.memblock); + l = pa_write(u->stream_fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type); + pa_memblock_release(u->memchunk.memblock); + + pa_assert(l != 0); + + if (l < 0) { + + if (errno == EINTR) + continue; + else if (errno == EAGAIN) { + + /* OK, we filled all socket buffers up + * now. */ + goto filled_up; + + } else { + pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); + goto fail; + } + + } else { + u->offset += l; + + u->memchunk.index += l; + u->memchunk.length -= l; + + if (u->memchunk.length <= 0) { + pa_memblock_unref(u->memchunk.memblock); + pa_memchunk_reset(&u->memchunk); + } + + pollfd->revents = 0; + + if (u->memchunk.length > 0) + + /* OK, we wrote less that we asked for, + * hence we can assume that the socket + * buffers are full now */ + goto filled_up; + } + } + + filled_up: + + /* At this spot we know that the socket buffers are + * fully filled up. This is the best time to estimate + * the playback position of the server */ + + n = u->offset; + //#ifdef SIOCOUTQ // { // int l; @@ -649,52 +655,52 @@ static void thread_func(void *userdata) { // n -= l; // } //#endif -// -// usec = pa_bytes_to_usec(n, &u->sink->sample_spec); -// -// if (usec > u->latency) -// usec -= u->latency; -// else -// usec = 0; -// -// pa_smoother_put(u->smoother, pa_rtclock_usec(), usec); -// } -// -// /* Hmm, nothing to do. Let's sleep */ -// pollfd->events = PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0; -// } -// -// if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) -// goto fail; -// -// if (ret == 0) -// goto finish; -// -// if (u->rtpoll_item) { -// struct pollfd* pollfd; -// -// pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); -// -// if (pollfd->revents & ~POLLOUT) { -// pa_log("FIFO shutdown."); -// goto fail; -// } -// } -// } -// -//fail: -// /* If this was no regular exit from the loop we have to continue -// * processing messages until we received PA_MESSAGE_SHUTDOWN */ -// pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); -// pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); -// -//finish: -// pa_log_debug("Thread shutting down"); + + usec = pa_bytes_to_usec(n, &u->sink->sample_spec); + + if (usec > u->latency) + usec -= u->latency; + else + usec = 0; + + pa_smoother_put(u->smoother, pa_rtclock_usec(), usec); + } + + /* Hmm, nothing to do. Let's sleep */ + pollfd->events = PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0; + } + + if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) + goto fail; + + if (ret == 0) + goto finish; + + if (u->rtpoll_item) { + struct pollfd* pollfd; + + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + + if (pollfd->revents & ~POLLOUT) { + pa_log("FIFO shutdown."); + goto fail; + } + } + } + +fail: + /* If this was no regular exit from the loop we have to continue + * processing messages until we received PA_MESSAGE_SHUTDOWN */ + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); + pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); + +finish: + pa_log_debug("Thread shutting down"); } int pa__init(pa_module* m) { int e; - char *rate, *channels; + const char *rate, *channels; pa_modargs *ma; pa_sink_new_data data; struct userdata *u; @@ -706,6 +712,8 @@ int pa__init(pa_module* m) { u->audioservice_fd = -1; u->stream_fd = -1; u->transport = -1; + u->offset = 0; + u->latency = 0; u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10); pa_memchunk_reset(&u->memchunk); u->rtpoll = pa_rtpoll_new(); @@ -783,7 +791,7 @@ int pa__init(pa_module* m) { pa_sink_new_data_set_sample_spec(&data, &u->ss); pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->name); pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Bluetooth sink '%s' (%s)", u->name, u->addr); - u->sink = pa_sink_new(m->core, &data, PA_SINK_HARDWARE|PA_SINK_NETWORK); + u->sink = pa_sink_new(m->core, &data, PA_SINK_HARDWARE|PA_SINK_LATENCY|PA_SINK_NETWORK); pa_sink_new_data_done(&data); if (!u->sink) { pa_log_error("failed to create sink"); -- 2.7.4