Sending translation for Finnish
[profile/ivi/pulseaudio.git] / src / modules / module-raop-sink.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2008 Colin Guthrie
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdlib.h>
28 #include <sys/stat.h>
29 #include <stdio.h>
30 #include <errno.h>
31 #include <string.h>
32 #include <fcntl.h>
33 #include <unistd.h>
34 #include <limits.h>
35 #include <poll.h>
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <sys/ioctl.h>
40
41 #ifdef HAVE_LINUX_SOCKIOS_H
42 #include <linux/sockios.h>
43 #endif
44
45 #include <pulse/xmalloc.h>
46 #include <pulse/timeval.h>
47
48 #include <pulsecore/core-error.h>
49 #include <pulsecore/iochannel.h>
50 #include <pulsecore/sink.h>
51 #include <pulsecore/module.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/modargs.h>
54 #include <pulsecore/log.h>
55 #include <pulsecore/socket-client.h>
56 #include <pulsecore/authkey.h>
57 #include <pulsecore/thread-mq.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/time-smoother.h>
60 #include <pulsecore/rtclock.h>
61 #include <pulsecore/socket-util.h>
62
63 #include "module-raop-sink-symdef.h"
64 #include "rtp.h"
65 #include "sdp.h"
66 #include "sap.h"
67 #include "raop_client.h"
68
69 PA_MODULE_AUTHOR("Colin Guthrie");
70 PA_MODULE_DESCRIPTION("RAOP Sink");
71 PA_MODULE_VERSION(PACKAGE_VERSION);
72 PA_MODULE_LOAD_ONCE(FALSE);
73 PA_MODULE_USAGE(
74         "sink_name=<name for the sink> "
75         "description=<description for the sink> "
76         "server=<address>  "
77         "format=<sample format> "
78         "channels=<number of channels> "
79         "rate=<sample rate>");
80
81 #define DEFAULT_SINK_NAME "raop"
82
83 struct userdata {
84     pa_core *core;
85     pa_module *module;
86     pa_sink *sink;
87
88     pa_thread_mq thread_mq;
89     pa_rtpoll *rtpoll;
90     pa_rtpoll_item *rtpoll_item;
91     pa_thread *thread;
92
93     pa_memchunk raw_memchunk;
94     pa_memchunk encoded_memchunk;
95
96     void *write_data;
97     size_t write_length, write_index;
98
99     void *read_data;
100     size_t read_length, read_index;
101
102     pa_usec_t latency;
103
104     /*esd_format_t format;*/
105     int32_t rate;
106
107     pa_smoother *smoother;
108     int fd;
109
110     int64_t offset;
111     int64_t encoding_overhead;
112     int32_t next_encoding_overhead;
113     double encoding_ratio;
114
115     pa_raop_client *raop;
116
117     size_t block_size;
118 };
119
120 static const char* const valid_modargs[] = {
121     "server",
122     "rate",
123     "format",
124     "channels",
125     "sink_name",
126     "description",
127     NULL
128 };
129
130 enum {
131     SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX,
132     SINK_MESSAGE_RIP_SOCKET
133 };
134
135 /* Forward declaration */
136 static void sink_set_volume_cb(pa_sink *);
137
138 static void on_connection(int fd, void*userdata) {
139     int so_sndbuf = 0;
140     socklen_t sl = sizeof(int);
141     struct userdata *u = userdata;
142     pa_assert(u);
143
144     pa_assert(u->fd < 0);
145     u->fd = fd;
146
147     if (getsockopt(u->fd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf, &sl) < 0)
148         pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno));
149     else {
150         pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf);
151         pa_sink_set_max_request(u->sink, PA_MAX((size_t) so_sndbuf, u->block_size));
152     }
153
154     /* Set the initial volume */
155     sink_set_volume_cb(u->sink);
156
157     pa_log_debug("Connection authenticated, handing fd to IO thread...");
158
159     pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
160 }
161
162 static void on_close(void*userdata) {
163     struct userdata *u = userdata;
164     pa_assert(u);
165
166     pa_log_debug("Connection closed, informing IO thread...");
167
168     pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RIP_SOCKET, NULL, 0, NULL, NULL);
169 }
170
171 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
172     struct userdata *u = PA_SINK(o)->userdata;
173
174     switch (code) {
175
176         case PA_SINK_MESSAGE_SET_STATE:
177
178             switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
179
180                 case PA_SINK_SUSPENDED:
181                     pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
182
183                     pa_smoother_pause(u->smoother, pa_rtclock_usec());
184
185                     /* Issue a FLUSH if we are connected */
186                     if (u->fd >= 0) {
187                         pa_raop_flush(u->raop);
188                     }
189                     break;
190
191                 case PA_SINK_IDLE:
192                 case PA_SINK_RUNNING:
193
194                     if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
195                         pa_smoother_resume(u->smoother, pa_rtclock_usec());
196
197                         /* The connection can be closed when idle, so check to
198                            see if we need to reestablish it */
199                         if (u->fd < 0)
200                             pa_raop_connect(u->raop);
201                         else
202                             pa_raop_flush(u->raop);
203                     }
204
205                     break;
206
207                 case PA_SINK_UNLINKED:
208                 case PA_SINK_INIT:
209                 case PA_SINK_INVALID_STATE:
210                     ;
211             }
212
213             break;
214
215         case PA_SINK_MESSAGE_GET_LATENCY: {
216             pa_usec_t w, r;
217
218             r = pa_smoother_get(u->smoother, pa_rtclock_usec());
219             w = pa_bytes_to_usec((u->offset - u->encoding_overhead + (u->encoded_memchunk.length / u->encoding_ratio)), &u->sink->sample_spec);
220
221             *((pa_usec_t*) data) = w > r ? w - r : 0;
222             return 0;
223         }
224
225         case SINK_MESSAGE_PASS_SOCKET: {
226             struct pollfd *pollfd;
227
228             pa_assert(!u->rtpoll_item);
229
230             u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
231             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
232             pollfd->fd = u->fd;
233             pollfd->events = POLLOUT;
234             /*pollfd->events = */pollfd->revents = 0;
235
236             if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
237                 /* Our stream has been suspended so we just flush it.... */
238                 pa_raop_flush(u->raop);
239             }
240             return 0;
241         }
242
243         case SINK_MESSAGE_RIP_SOCKET: {
244             pa_assert(u->fd >= 0);
245
246             pa_close(u->fd);
247             u->fd = -1;
248
249             if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
250
251                 pa_log_debug("RTSP control connection closed, but we're suspended so let's not worry about it... we'll open it again later");
252
253                 if (u->rtpoll_item)
254                     pa_rtpoll_item_free(u->rtpoll_item);
255                 u->rtpoll_item = NULL;
256             } else {
257                 /* Quesiton: is this valid here: or should we do some sort of:
258                    return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL);
259                    ?? */
260                 pa_module_unload_request(u->module, TRUE);
261             }
262             return 0;
263         }
264     }
265
266     return pa_sink_process_msg(o, code, data, offset, chunk);
267 }
268
269 static void sink_set_volume_cb(pa_sink *s) {
270     struct userdata *u = s->userdata;
271     pa_cvolume hw;
272     pa_volume_t v;
273     char t[PA_CVOLUME_SNPRINT_MAX];
274
275     pa_assert(u);
276
277     /* If we're muted we don't need to do anything */
278     if (s->muted)
279         return;
280
281     /* Calculate the max volume of all channels.
282        We'll use this as our (single) volume on the APEX device and emulate
283        any variation in channel volumes in software */
284     v = pa_cvolume_max(&s->virtual_volume);
285
286     /* Create a pa_cvolume version of our single value */
287     pa_cvolume_set(&hw, s->sample_spec.channels, v);
288
289     /* Perform any software manipulation of the volume needed */
290     pa_sw_cvolume_divide(&s->soft_volume, &s->virtual_volume, &hw);
291
292     pa_log_debug("Requested volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->virtual_volume));
293     pa_log_debug("Got hardware volume: %s", pa_cvolume_snprint(t, sizeof(t), &hw));
294     pa_log_debug("Calculated software volume: %s", pa_cvolume_snprint(t, sizeof(t), &s->soft_volume));
295
296     /* Any necessary software volume manipulateion is done so set
297        our hw volume (or v as a single value) on the device */
298     pa_raop_client_set_volume(u->raop, v);
299 }
300
301 static void sink_set_mute_cb(pa_sink *s) {
302     struct userdata *u = s->userdata;
303
304     pa_assert(u);
305
306     if (s->muted) {
307         pa_raop_client_set_volume(u->raop, PA_VOLUME_MUTED);
308     } else {
309         sink_set_volume_cb(s);
310     }
311 }
312
313 static void thread_func(void *userdata) {
314     struct userdata *u = userdata;
315     int write_type = 0;
316     pa_memchunk silence;
317     uint32_t silence_overhead = 0;
318     double silence_ratio = 0;
319
320     pa_assert(u);
321
322     pa_log_debug("Thread starting up");
323
324     pa_thread_mq_install(&u->thread_mq);
325     pa_rtpoll_install(u->rtpoll);
326
327     pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
328
329     /* Create a chunk of memory that is our encoded silence sample. */
330     pa_memchunk_reset(&silence);
331
332     for (;;) {
333         int ret;
334
335         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
336             if (u->sink->thread_info.rewind_requested)
337                 pa_sink_process_rewind(u->sink, 0);
338
339         if (u->rtpoll_item) {
340             struct pollfd *pollfd;
341             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
342
343             /* Render some data and write it to the fifo */
344             if (/*PA_SINK_IS_OPENED(u->sink->thread_info.state) && */pollfd->revents) {
345                 pa_usec_t usec;
346                 int64_t n;
347                 void *p;
348
349                 if (!silence.memblock) {
350                     pa_memchunk silence_tmp;
351
352                     pa_memchunk_reset(&silence_tmp);
353                     silence_tmp.memblock = pa_memblock_new(u->core->mempool, 4096);
354                     silence_tmp.length = 4096;
355                     p = pa_memblock_acquire(silence_tmp.memblock);
356                       memset(p, 0, 4096);
357                     pa_memblock_release(silence_tmp.memblock);
358                     pa_raop_client_encode_sample(u->raop, &silence_tmp, &silence);
359                     pa_assert(0 == silence_tmp.length);
360                     silence_overhead = silence_tmp.length - 4096;
361                     silence_ratio = silence_tmp.length / 4096;
362                     pa_memblock_unref(silence_tmp.memblock);
363                 }
364
365                 for (;;) {
366                     ssize_t l;
367
368                     if (u->encoded_memchunk.length <= 0) {
369                         if (u->encoded_memchunk.memblock)
370                             pa_memblock_unref(u->encoded_memchunk.memblock);
371                         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
372                             size_t rl;
373
374                             /* We render real data */
375                             if (u->raw_memchunk.length <= 0) {
376                                 if (u->raw_memchunk.memblock)
377                                     pa_memblock_unref(u->raw_memchunk.memblock);
378                                 pa_memchunk_reset(&u->raw_memchunk);
379
380                                 /* Grab unencoded data */
381                                 pa_sink_render(u->sink, u->block_size, &u->raw_memchunk);
382                             }
383                             pa_assert(u->raw_memchunk.length > 0);
384
385                             /* Encode it */
386                             rl = u->raw_memchunk.length;
387                             u->encoding_overhead += u->next_encoding_overhead;
388                             pa_raop_client_encode_sample(u->raop, &u->raw_memchunk, &u->encoded_memchunk);
389                             u->next_encoding_overhead = (u->encoded_memchunk.length - (rl - u->raw_memchunk.length));
390                             u->encoding_ratio = u->encoded_memchunk.length / (rl - u->raw_memchunk.length);
391                         } else {
392                             /* We render some silence into our memchunk */
393                             memcpy(&u->encoded_memchunk, &silence, sizeof(pa_memchunk));
394                             pa_memblock_ref(silence.memblock);
395
396                             /* Calculate/store some values to be used with the smoother */
397                             u->next_encoding_overhead = silence_overhead;
398                             u->encoding_ratio = silence_ratio;
399                         }
400                     }
401                     pa_assert(u->encoded_memchunk.length > 0);
402
403                     p = pa_memblock_acquire(u->encoded_memchunk.memblock);
404                     l = pa_write(u->fd, (uint8_t*) p + u->encoded_memchunk.index, u->encoded_memchunk.length, &write_type);
405                     pa_memblock_release(u->encoded_memchunk.memblock);
406
407                     pa_assert(l != 0);
408
409                     if (l < 0) {
410
411                         if (errno == EINTR)
412                             continue;
413                         else if (errno == EAGAIN) {
414
415                             /* OK, we filled all socket buffers up
416                              * now. */
417                             goto filled_up;
418
419                         } else {
420                             pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
421                             goto fail;
422                         }
423
424                     } else {
425                         u->offset += l;
426
427                         u->encoded_memchunk.index += l;
428                         u->encoded_memchunk.length -= l;
429
430                         pollfd->revents = 0;
431
432                         if (u->encoded_memchunk.length > 0) {
433                             /* we've completely written the encoded data, so update our overhead */
434                             u->encoding_overhead += u->next_encoding_overhead;
435
436                             /* OK, we wrote less that we asked for,
437                              * hence we can assume that the socket
438                              * buffers are full now */
439                             goto filled_up;
440                         }
441                     }
442                 }
443
444             filled_up:
445
446                 /* At this spot we know that the socket buffers are
447                  * fully filled up. This is the best time to estimate
448                  * the playback position of the server */
449
450                 n = u->offset - u->encoding_overhead;
451
452 #ifdef SIOCOUTQ
453                 {
454                     int l;
455                     if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
456                         n -= (l / u->encoding_ratio);
457                 }
458 #endif
459
460                 usec = pa_bytes_to_usec(n, &u->sink->sample_spec);
461
462                 if (usec > u->latency)
463                     usec -= u->latency;
464                 else
465                     usec = 0;
466
467                 pa_smoother_put(u->smoother, pa_rtclock_usec(), usec);
468             }
469
470             /* Hmm, nothing to do. Let's sleep */
471             pollfd->events = POLLOUT; /*PA_SINK_IS_OPENED(u->sink->thread_info.state)  ? POLLOUT : 0;*/
472         }
473
474         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
475             goto fail;
476
477         if (ret == 0)
478             goto finish;
479
480         if (u->rtpoll_item) {
481             struct pollfd* pollfd;
482
483             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
484
485             if (pollfd->revents & ~POLLOUT) {
486                 if (u->sink->thread_info.state != PA_SINK_SUSPENDED) {
487                     pa_log("FIFO shutdown.");
488                     goto fail;
489                 }
490
491                 /* We expect this to happen on occasion if we are not sending data.
492                    It's perfectly natural and normal and natural */
493                 if (u->rtpoll_item)
494                     pa_rtpoll_item_free(u->rtpoll_item);
495                 u->rtpoll_item = NULL;
496             }
497         }
498     }
499
500 fail:
501     /* If this was no regular exit from the loop we have to continue
502      * processing messages until we received PA_MESSAGE_SHUTDOWN */
503     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
504     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
505
506 finish:
507     if (silence.memblock)
508         pa_memblock_unref(silence.memblock);
509     pa_log_debug("Thread shutting down");
510 }
511
512 int pa__init(pa_module*m) {
513     struct userdata *u = NULL;
514     pa_sample_spec ss;
515     pa_modargs *ma = NULL;
516     const char *server, *desc;
517     pa_sink_new_data data;
518
519     pa_assert(m);
520
521     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
522         pa_log("failed to parse module arguments");
523         goto fail;
524     }
525
526     ss = m->core->default_sample_spec;
527     if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
528         pa_log("invalid sample format specification");
529         goto fail;
530     }
531
532     if ((/*ss.format != PA_SAMPLE_U8 &&*/ ss.format != PA_SAMPLE_S16NE) ||
533         (ss.channels > 2)) {
534         pa_log("sample type support is limited to mono/stereo and U8 or S16NE sample data");
535         goto fail;
536     }
537
538     u = pa_xnew0(struct userdata, 1);
539     u->core = m->core;
540     u->module = m;
541     m->userdata = u;
542     u->fd = -1;
543     u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
544     pa_memchunk_reset(&u->raw_memchunk);
545     pa_memchunk_reset(&u->encoded_memchunk);
546     u->offset = 0;
547     u->encoding_overhead = 0;
548     u->next_encoding_overhead = 0;
549     u->encoding_ratio = 1.0;
550
551     u->rtpoll = pa_rtpoll_new();
552     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
553     u->rtpoll_item = NULL;
554
555     /*u->format =
556         (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
557         (ss.channels == 2 ? ESD_STEREO : ESD_MONO);*/
558     u->rate = ss.rate;
559     u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
560
561     u->read_data = u->write_data = NULL;
562     u->read_index = u->write_index = u->read_length = u->write_length = 0;
563
564     /*u->state = STATE_AUTH;*/
565     u->latency = 0;
566
567     if (!(server = pa_modargs_get_value(ma, "server", NULL))) {
568         pa_log("No server argument given.");
569         goto fail;
570     }
571
572     pa_sink_new_data_init(&data);
573     data.driver = __FILE__;
574     data.module = m;
575     pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
576     pa_sink_new_data_set_sample_spec(&data, &ss);
577     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, server);
578     if ((desc = pa_modargs_get_value(ma, "description", NULL)))
579         pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, desc);
580     else
581         pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server);
582
583     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_NETWORK);
584     pa_sink_new_data_done(&data);
585
586     if (!u->sink) {
587         pa_log("Failed to create sink.");
588         goto fail;
589     }
590
591     u->sink->parent.process_msg = sink_process_msg;
592     u->sink->userdata = u;
593     u->sink->set_volume = sink_set_volume_cb;
594     u->sink->set_mute = sink_set_mute_cb;
595     u->sink->flags = PA_SINK_LATENCY|PA_SINK_NETWORK|PA_SINK_HW_VOLUME_CTRL;
596
597     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
598     pa_sink_set_rtpoll(u->sink, u->rtpoll);
599
600     if (!(u->raop = pa_raop_client_new(u->core, server))) {
601         pa_log("Failed to connect to server.");
602         goto fail;
603     }
604
605     pa_raop_client_set_callback(u->raop, on_connection, u);
606     pa_raop_client_set_closed_callback(u->raop, on_close, u);
607
608     if (!(u->thread = pa_thread_new(thread_func, u))) {
609         pa_log("Failed to create thread.");
610         goto fail;
611     }
612
613     pa_sink_put(u->sink);
614
615     pa_modargs_free(ma);
616
617     return 0;
618
619 fail:
620     if (ma)
621         pa_modargs_free(ma);
622
623     pa__done(m);
624
625     return -1;
626 }
627
628 int pa__get_n_used(pa_module *m) {
629     struct userdata *u;
630
631     pa_assert(m);
632     pa_assert_se(u = m->userdata);
633
634     return pa_sink_linked_by(u->sink);
635 }
636
637 void pa__done(pa_module*m) {
638     struct userdata *u;
639     pa_assert(m);
640
641     if (!(u = m->userdata))
642         return;
643
644     if (u->sink)
645         pa_sink_unlink(u->sink);
646
647     if (u->thread) {
648         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
649         pa_thread_free(u->thread);
650     }
651
652     pa_thread_mq_done(&u->thread_mq);
653
654     if (u->sink)
655         pa_sink_unref(u->sink);
656
657     if (u->rtpoll_item)
658         pa_rtpoll_item_free(u->rtpoll_item);
659
660     if (u->rtpoll)
661         pa_rtpoll_free(u->rtpoll);
662
663     if (u->raw_memchunk.memblock)
664         pa_memblock_unref(u->raw_memchunk.memblock);
665
666     if (u->encoded_memchunk.memblock)
667         pa_memblock_unref(u->encoded_memchunk.memblock);
668
669     if (u->raop)
670         pa_raop_client_free(u->raop);
671
672     pa_xfree(u->read_data);
673     pa_xfree(u->write_data);
674
675     if (u->smoother)
676         pa_smoother_free(u->smoother);
677
678     if (u->fd >= 0)
679         pa_close(u->fd);
680
681     pa_xfree(u);
682 }