2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2008 Colin Guthrie
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <sys/ioctl.h>
41 #ifdef HAVE_LINUX_SOCKIOS_H
42 #include <linux/sockios.h>
45 #include <pulse/xmalloc.h>
46 #include <pulse/timeval.h>
48 #include <pulsecore/core-error.h>
49 #include <pulsecore/iochannel.h>
50 #include <pulsecore/sink.h>
51 #include <pulsecore/module.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/log.h>
55 #include <pulsecore/socket-client.h>
56 #include <pulsecore/authkey.h>
57 #include <pulsecore/thread-mq.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/time-smoother.h>
60 #include <pulsecore/rtclock.h>
61 #include <pulsecore/socket-util.h>
63 #include "module-raop-sink-symdef.h"
67 #include "raop_client.h"
69 PA_MODULE_AUTHOR("Colin Guthrie");
70 PA_MODULE_DESCRIPTION("RAOP Sink");
71 PA_MODULE_VERSION(PACKAGE_VERSION);
72 PA_MODULE_LOAD_ONCE(FALSE);
74 "sink_name=<name for the sink> "
75 "description=<description for the sink> "
77 "format=<sample format> "
78 "channels=<number of channels> "
79 "rate=<sample rate>");
81 #define DEFAULT_SINK_NAME "raop"
88 pa_thread_mq thread_mq;
90 pa_rtpoll_item *rtpoll_item;
93 pa_memchunk raw_memchunk;
94 pa_memchunk encoded_memchunk;
97 size_t write_length, write_index;
100 size_t read_length, read_index;
104 /*esd_format_t format;*/
107 pa_smoother *smoother;
111 int64_t encoding_overhead;
112 int32_t next_encoding_overhead;
113 double encoding_ratio;
115 pa_raop_client *raop;
120 static const char* const valid_modargs[] = {
131 SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX,
132 SINK_MESSAGE_RIP_SOCKET
135 /* Forward declaration */
136 static void sink_set_volume_cb(pa_sink *);
138 static void on_connection(int fd, void*userdata) {
140 socklen_t sl = sizeof(int);
141 struct userdata *u = userdata;
144 pa_assert(u->fd < 0);
147 if (getsockopt(u->fd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf, &sl) < 0)
148 pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno));
150 pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf);
151 pa_sink_set_max_request(u->sink, PA_MAX((size_t) so_sndbuf, u->block_size));
154 /* Set the initial volume */
155 sink_set_volume_cb(u->sink);
157 pa_log_debug("Connection authenticated, handing fd to IO thread...");
159 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
162 static void on_close(void*userdata) {
163 struct userdata *u = userdata;
166 pa_log_debug("Connection closed, informing IO thread...");
168 pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
171 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
172 struct userdata *u = PA_SINK(o)->userdata;
176 case PA_SINK_MESSAGE_SET_STATE:
178 switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
180 case PA_SINK_SUSPENDED:
181 pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
183 pa_smoother_pause(u->smoother, pa_rtclock_usec());
185 /* Issue a FLUSH if we are connected */
187 pa_raop_flush(u->raop);
192 case PA_SINK_RUNNING:
194 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
195 pa_smoother_resume(u->smoother, pa_rtclock_usec(), TRUE);
197 /* The connection can be closed when idle, so check to
198 see if we need to reestablish it */
200 pa_raop_connect(u->raop);
202 pa_raop_flush(u->raop);
207 case PA_SINK_UNLINKED:
209 case PA_SINK_INVALID_STATE:
215 case PA_SINK_MESSAGE_GET_LATENCY: {
218 r = pa_smoother_get(u->smoother, pa_rtclock_usec());
219 w = pa_bytes_to_usec((u->offset - u->encoding_overhead + (u->encoded_memchunk.length / u->encoding_ratio)), &u->sink->sample_spec);
221 *((pa_usec_t*) data) = w > r ? w - r : 0;
225 case SINK_MESSAGE_PASS_SOCKET: {
226 struct pollfd *pollfd;
228 pa_assert(!u->rtpoll_item);
230 u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
231 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
233 pollfd->events = POLLOUT;
234 /*pollfd->events = */pollfd->revents = 0;
236 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
237 /* Our stream has been suspended so we just flush it.... */
238 pa_raop_flush(u->raop);
243 case SINK_MESSAGE_RIP_SOCKET: {
244 pa_assert(u->fd >= 0);
249 if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
251 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
254 pa_rtpoll_item_free(u->rtpoll_item);
255 u->rtpoll_item = NULL;
257 /* Quesiton: is this valid here: or should we do some sort of:
258 return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
260 pa_module_unload_request(u->module, TRUE);
266 return pa_sink_process_msg(o, code, data, offset, chunk);
269 static void sink_set_volume_cb(pa_sink *s) {
270 struct userdata *u = s->userdata;
273 char t[PA_CVOLUME_SNPRINT_MAX];
277 /* If we're muted we don't need to do anything */
281 /* Calculate the max volume of all channels.
282 We'll use this as our (single) volume on the APEX device and emulate
283 any variation in channel volumes in software */
284 v = pa_cvolume_max(&s->virtual_volume);
286 /* Create a pa_cvolume version of our single value */
287 pa_cvolume_set(&hw, s->sample_spec.channels, v);
289 /* Perform any software manipulation of the volume needed */
290 pa_sw_cvolume_divide(&s->soft_volume, &s->virtual_volume, &hw);
292 pa_log_debug("Requested volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->virtual_volume));
293 pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint(t, sizeof(t), &hw));
294 pa_log_debug("Calculated software volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->soft_volume));
296 /* Any necessary software volume manipulateion is done so set
297 our hw volume (or v as a single value) on the device */
298 pa_raop_client_set_volume(u->raop, v);
301 static void sink_set_mute_cb(pa_sink *s) {
302 struct userdata *u = s->userdata;
307 pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
309 sink_set_volume_cb(s);
313 static void thread_func(void *userdata) {
314 struct userdata *u = userdata;
317 uint32_t silence_overhead = 0;
318 double silence_ratio = 0;
322 pa_log_debug("Thread starting up");
324 pa_thread_mq_install(&u->thread_mq);
325 pa_rtpoll_install(u->rtpoll);
327 pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
329 /* Create a chunk of memory that is our encoded silence sample. */
330 pa_memchunk_reset(&silence);
335 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
336 if (u->sink->thread_info.rewind_requested)
337 pa_sink_process_rewind(u->sink, 0);
339 if (u->rtpoll_item) {
340 struct pollfd *pollfd;
341 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
343 /* Render some data and write it to the fifo */
344 if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
349 if (!silence.memblock) {
350 pa_memchunk silence_tmp;
352 pa_memchunk_reset(&silence_tmp);
353 silence_tmp.memblock = pa_memblock_new(u->core->mempool, 4096);
354 silence_tmp.length = 4096;
355 p = pa_memblock_acquire(silence_tmp.memblock);
357 pa_memblock_release(silence_tmp.memblock);
358 pa_raop_client_encode_sample(u->raop, &silence_tmp, &silence);
359 pa_assert(0 == silence_tmp.length);
360 silence_overhead = silence_tmp.length - 4096;
361 silence_ratio = silence_tmp.length / 4096;
362 pa_memblock_unref(silence_tmp.memblock);
368 if (u->encoded_memchunk.length <= 0) {
369 if (u->encoded_memchunk.memblock)
370 pa_memblock_unref(u->encoded_memchunk.memblock);
371 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
374 /* We render real data */
375 if (u->raw_memchunk.length <= 0) {
376 if (u->raw_memchunk.memblock)
377 pa_memblock_unref(u->raw_memchunk.memblock);
378 pa_memchunk_reset(&u->raw_memchunk);
380 /* Grab unencoded data */
381 pa_sink_render(u->sink, u->block_size, &u->raw_memchunk);
383 pa_assert(u->raw_memchunk.length > 0);
386 rl = u->raw_memchunk.length;
387 u->encoding_overhead += u->next_encoding_overhead;
388 pa_raop_client_encode_sample(u->raop, &u->raw_memchunk, &u->encoded_memchunk);
389 u->next_encoding_overhead = (u->encoded_memchunk.length - (rl - u->raw_memchunk.length));
390 u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
392 /* We render some silence into our memchunk */
393 memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
394 pa_memblock_ref(silence.memblock);
396 /* Calculate/store some values to be used with the smoother */
397 u->next_encoding_overhead = silence_overhead;
398 u->encoding_ratio = silence_ratio;
401 pa_assert(u->encoded_memchunk.length > 0);
403 p = pa_memblock_acquire(u->encoded_memchunk.memblock);
404 l = pa_write(u->fd, (uint8_t*) p + u->encoded_memchunk.index, u->encoded_memchunk.length, &write_type);
405 pa_memblock_release(u->encoded_memchunk.memblock);
413 else if (errno == EAGAIN) {
415 /* OK, we filled all socket buffers up
420 pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
427 u->encoded_memchunk.index += l;
428 u->encoded_memchunk.length -= l;
432 if (u->encoded_memchunk.length > 0) {
433 /* we've completely written the encoded data, so update our overhead */
434 u->encoding_overhead += u->next_encoding_overhead;
436 /* OK, we wrote less that we asked for,
437 * hence we can assume that the socket
438 * buffers are full now */
446 /* At this spot we know that the socket buffers are
447 * fully filled up. This is the best time to estimate
448 * the playback position of the server */
450 n = u->offset - u->encoding_overhead;
455 if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
456 n -= (l / u->encoding_ratio);
460 usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
462 if (usec > u->latency)
467 pa_smoother_put(u->smoother, pa_rtclock_usec(), usec);
470 /* Hmm, nothing to do. Let's sleep */
471 pollfd->events = POLLOUT; /*PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0;*/
474 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
480 if (u->rtpoll_item) {
481 struct pollfd* pollfd;
483 pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
485 if (pollfd->revents & ~POLLOUT) {
486 if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
487 pa_log("FIFO shutdown.");
491 /* We expect this to happen on occasion if we are not sending data.
492 It's perfectly natural and normal and natural */
494 pa_rtpoll_item_free(u->rtpoll_item);
495 u->rtpoll_item = NULL;
501 /* If this was no regular exit from the loop we have to continue
502 * processing messages until we received PA_MESSAGE_SHUTDOWN */
503 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
504 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
507 if (silence.memblock)
508 pa_memblock_unref(silence.memblock);
509 pa_log_debug("Thread shutting down");
512 int pa__init(pa_module*m) {
513 struct userdata *u = NULL;
515 pa_modargs *ma = NULL;
516 const char *server, *desc;
517 pa_sink_new_data data;
521 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
522 pa_log("failed to parse module arguments");
526 ss = m->core->default_sample_spec;
527 if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
528 pa_log("invalid sample format specification");
532 if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss.format != PA_SAMPLE_S16NE) ||
534 pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
538 u = pa_xnew0(struct userdata, 1);
543 u->smoother = pa_smoother_new(
551 pa_memchunk_reset(&u->raw_memchunk);
552 pa_memchunk_reset(&u->encoded_memchunk);
554 u->encoding_overhead = 0;
555 u->next_encoding_overhead = 0;
556 u->encoding_ratio = 1.0;
558 u->rtpoll = pa_rtpoll_new();
559 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
560 u->rtpoll_item = NULL;
563 (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
564 (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
566 u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
568 u->read_data = u->write_data = NULL;
569 u->read_index = u->write_index = u->read_length = u->write_length = 0;
571 /*u->state = STATE_AUTH;*/
574 if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
575 pa_log("No server argument given.");
579 pa_sink_new_data_init(&data);
580 data.driver = __FILE__;
582 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
583 pa_sink_new_data_set_sample_spec(&data, &ss);
584 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
585 if ((desc = pa_modargs_get_value(ma, "description", NULL)))
586 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, desc);
588 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
590 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_NETWORK);
591 pa_sink_new_data_done(&data);
594 pa_log("Failed to create sink.");
598 u->sink->parent.process_msg = sink_process_msg;
599 u->sink->userdata = u;
600 u->sink->set_volume = sink_set_volume_cb;
601 u->sink->set_mute = sink_set_mute_cb;
602 u->sink->flags = PA_SINK_LATENCY|PA_SINK_NETWORK|PA_SINK_HW_VOLUME_CTRL;
604 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
605 pa_sink_set_rtpoll(u->sink, u->rtpoll);
607 if (!(u->raop = pa_raop_client_new(u->core, server))) {
608 pa_log("Failed to connect to server.");
612 pa_raop_client_set_callback(u->raop, on_connection, u);
613 pa_raop_client_set_closed_callback(u->raop, on_close, u);
615 if (!(u->thread = pa_thread_new(thread_func, u))) {
616 pa_log("Failed to create thread.");
620 pa_sink_put(u->sink);
635 int pa__get_n_used(pa_module *m) {
639 pa_assert_se(u = m->userdata);
641 return pa_sink_linked_by(u->sink);
644 void pa__done(pa_module*m) {
648 if (!(u = m->userdata))
652 pa_sink_unlink(u->sink);
655 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
656 pa_thread_free(u->thread);
659 pa_thread_mq_done(&u->thread_mq);
662 pa_sink_unref(u->sink);
665 pa_rtpoll_item_free(u->rtpoll_item);
668 pa_rtpoll_free(u->rtpoll);
670 if (u->raw_memchunk.memblock)
671 pa_memblock_unref(u->raw_memchunk.memblock);
673 if (u->encoded_memchunk.memblock)
674 pa_memblock_unref(u->encoded_memchunk.memblock);
677 pa_raop_client_free(u->raop);
679 pa_xfree(u->read_data);
680 pa_xfree(u->write_data);
683 pa_smoother_free(u->smoother);