build-system: move x11 and jack modules into subdirectories
[profile/ivi/pulseaudio-panda.git] / src / modules / module-esound-sink.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2.1 of the License,
9   or (at your option) any later version.
10
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdlib.h>
27 #include <sys/stat.h>
28 #include <stdio.h>
29 #include <errno.h>
30 #include <string.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <limits.h>
34 #include <poll.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/ioctl.h>
39
40 #ifdef HAVE_LINUX_SOCKIOS_H
41 #include <linux/sockios.h>
42 #endif
43
44 #include <pulse/xmalloc.h>
45 #include <pulse/timeval.h>
46
47 #include <pulsecore/core-error.h>
48 #include <pulsecore/iochannel.h>
49 #include <pulsecore/sink.h>
50 #include <pulsecore/module.h>
51 #include <pulsecore/core-util.h>
52 #include <pulsecore/modargs.h>
53 #include <pulsecore/log.h>
54 #include <pulsecore/socket-client.h>
55 #include <pulsecore/esound.h>
56 #include <pulsecore/authkey.h>
57 #include <pulsecore/thread-mq.h>
58 #include <pulsecore/thread.h>
59 #include <pulsecore/time-smoother.h>
60 #include <pulsecore/rtclock.h>
61 #include <pulsecore/socket-util.h>
62
63 #include "module-esound-sink-symdef.h"
64
65 PA_MODULE_AUTHOR("Lennart Poettering");
66 PA_MODULE_DESCRIPTION("ESOUND Sink");
67 PA_MODULE_VERSION(PACKAGE_VERSION);
68 PA_MODULE_LOAD_ONCE(FALSE);
69 PA_MODULE_USAGE(
70         "sink_name=<name for the sink> "
71         "server=<address> cookie=<filename>  "
72         "format=<sample format> "
73         "channels=<number of channels> "
74         "rate=<sample rate>");
75
76 #define DEFAULT_SINK_NAME "esound_out"
77
78 struct userdata {
79     pa_core *core;
80     pa_module *module;
81     pa_sink *sink;
82
83     pa_thread_mq thread_mq;
84     pa_rtpoll *rtpoll;
85     pa_rtpoll_item *rtpoll_item;
86     pa_thread *thread;
87
88     pa_memchunk memchunk;
89
90     void *write_data;
91     size_t write_length, write_index;
92
93     void *read_data;
94     size_t read_length, read_index;
95
96     enum {
97         STATE_AUTH,
98         STATE_LATENCY,
99         STATE_PREPARE,
100         STATE_RUNNING,
101         STATE_DEAD
102     } state;
103
104     pa_usec_t latency;
105
106     esd_format_t format;
107     int32_t rate;
108
109     pa_smoother *smoother;
110     int fd;
111
112     int64_t offset;
113
114     pa_iochannel *io;
115     pa_socket_client *client;
116
117     size_t block_size;
118 };
119
120 static const char* const valid_modargs[] = {
121     "server",
122     "cookie",
123     "rate",
124     "format",
125     "channels",
126     "sink_name",
127     NULL
128 };
129
130 enum {
131     SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX
132 };
133
134 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
135     struct userdata *u = PA_SINK(o)->userdata;
136
137     switch (code) {
138
139         case PA_SINK_MESSAGE_SET_STATE:
140
141             switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
142
143                 case PA_SINK_SUSPENDED:
144                     pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
145
146                     pa_smoother_pause(u->smoother, pa_rtclock_usec());
147                     break;
148
149                 case PA_SINK_IDLE:
150                 case PA_SINK_RUNNING:
151
152                     if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
153                         pa_smoother_resume(u->smoother, pa_rtclock_usec(), TRUE);
154
155                     break;
156
157                 case PA_SINK_UNLINKED:
158                 case PA_SINK_INIT:
159                 case PA_SINK_INVALID_STATE:
160                     ;
161             }
162
163             break;
164
165         case PA_SINK_MESSAGE_GET_LATENCY: {
166             pa_usec_t w, r;
167
168             r = pa_smoother_get(u->smoother, pa_rtclock_usec());
169             w = pa_bytes_to_usec((uint64_t) u->offset + u->memchunk.length, &u->sink->sample_spec);
170
171             *((pa_usec_t*) data) = w > r ? w - r : 0;
172             return 0;
173         }
174
175         case SINK_MESSAGE_PASS_SOCKET: {
176             struct pollfd *pollfd;
177
178             pa_assert(!u->rtpoll_item);
179
180             u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
181             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
182             pollfd->fd = u->fd;
183             pollfd->events = pollfd->revents = 0;
184
185             return 0;
186         }
187     }
188
189     return pa_sink_process_msg(o, code, data, offset, chunk);
190 }
191
192 static void thread_func(void *userdata) {
193     struct userdata *u = userdata;
194     int write_type = 0;
195
196     pa_assert(u);
197
198     pa_log_debug("Thread starting up");
199
200     pa_thread_mq_install(&u->thread_mq);
201     pa_rtpoll_install(u->rtpoll);
202
203     pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
204
205     for (;;) {
206         int ret;
207
208         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
209             if (u->sink->thread_info.rewind_requested)
210                 pa_sink_process_rewind(u->sink, 0);
211
212         if (u->rtpoll_item) {
213             struct pollfd *pollfd;
214             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
215
216             /* Render some data and write it to the fifo */
217             if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && pollfd->revents) {
218                 pa_usec_t usec;
219                 int64_t n;
220
221                 for (;;) {
222                     ssize_t l;
223                     void *p;
224
225                     if (u->memchunk.length <= 0)
226                         pa_sink_render(u->sink, u->block_size, &u->memchunk);
227
228                     pa_assert(u->memchunk.length > 0);
229
230                     p = pa_memblock_acquire(u->memchunk.memblock);
231                     l = pa_write(u->fd, (uint8_t*) p + u->memchunk.index, u->memchunk.length, &write_type);
232                     pa_memblock_release(u->memchunk.memblock);
233
234                     pa_assert(l != 0);
235
236                     if (l < 0) {
237
238                         if (errno == EINTR)
239                             continue;
240                         else if (errno == EAGAIN) {
241
242                             /* OK, we filled all socket buffers up
243                              * now. */
244                             goto filled_up;
245
246                         } else {
247                             pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
248                             goto fail;
249                         }
250
251                     } else {
252                         u->offset += l;
253
254                         u->memchunk.index += (size_t) l;
255                         u->memchunk.length -= (size_t) l;
256
257                         if (u->memchunk.length <= 0) {
258                             pa_memblock_unref(u->memchunk.memblock);
259                             pa_memchunk_reset(&u->memchunk);
260                         }
261
262                         pollfd->revents = 0;
263
264                         if (u->memchunk.length > 0)
265
266                             /* OK, we wrote less that we asked for,
267                              * hence we can assume that the socket
268                              * buffers are full now */
269                             goto filled_up;
270                     }
271                 }
272
273             filled_up:
274
275                 /* At this spot we know that the socket buffers are
276                  * fully filled up. This is the best time to estimate
277                  * the playback position of the server */
278
279                 n = u->offset;
280
281 #ifdef SIOCOUTQ
282                 {
283                     int l;
284                     if (ioctl(u->fd, SIOCOUTQ, &l) >= 0 && l > 0)
285                         n -= l;
286                 }
287 #endif
288
289                 usec = pa_bytes_to_usec((uint64_t) n, &u->sink->sample_spec);
290
291                 if (usec > u->latency)
292                     usec -= u->latency;
293                 else
294                     usec = 0;
295
296                 pa_smoother_put(u->smoother, pa_rtclock_usec(), usec);
297             }
298
299             /* Hmm, nothing to do. Let's sleep */
300             pollfd->events = (short) (PA_SINK_IS_OPENED(u->sink->thread_info.state) ? POLLOUT : 0);
301         }
302
303         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
304             goto fail;
305
306         if (ret == 0)
307             goto finish;
308
309         if (u->rtpoll_item) {
310             struct pollfd* pollfd;
311
312             pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
313
314             if (pollfd->revents & ~POLLOUT) {
315                 pa_log("FIFO shutdown.");
316                 goto fail;
317             }
318         }
319     }
320
321 fail:
322     /* If this was no regular exit from the loop we have to continue
323      * processing messages until we received PA_MESSAGE_SHUTDOWN */
324     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
325     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
326
327 finish:
328     pa_log_debug("Thread shutting down");
329 }
330
331 static int do_write(struct userdata *u) {
332     ssize_t r;
333     pa_assert(u);
334
335     if (!pa_iochannel_is_writable(u->io))
336         return 0;
337
338     if (u->write_data) {
339         pa_assert(u->write_index < u->write_length);
340
341         if ((r = pa_iochannel_write(u->io, (uint8_t*) u->write_data + u->write_index, u->write_length - u->write_index)) <= 0) {
342             pa_log("write() failed: %s", pa_cstrerror(errno));
343             return -1;
344         }
345
346         u->write_index += (size_t) r;
347         pa_assert(u->write_index <= u->write_length);
348
349         if (u->write_index == u->write_length) {
350             pa_xfree(u->write_data);
351             u->write_data = NULL;
352             u->write_index = u->write_length = 0;
353         }
354     }
355
356     if (!u->write_data && u->state == STATE_PREPARE) {
357         int so_sndbuf = 0;
358         socklen_t sl = sizeof(int);
359
360         /* OK, we're done with sending all control data we need to, so
361          * let's hand the socket over to the IO thread now */
362
363         pa_assert(u->fd < 0);
364         u->fd = pa_iochannel_get_send_fd(u->io);
365
366         pa_iochannel_set_noclose(u->io, TRUE);
367         pa_iochannel_free(u->io);
368         u->io = NULL;
369
370         pa_make_tcp_socket_low_delay(u->fd);
371
372         if (getsockopt(u->fd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf, &sl) < 0)
373             pa_log_warn("getsockopt(SO_SNDBUF) failed: %s", pa_cstrerror(errno));
374         else {
375             pa_log_debug("SO_SNDBUF is %zu.", (size_t) so_sndbuf);
376             pa_sink_set_max_request(u->sink, PA_MAX((size_t) so_sndbuf, u->block_size));
377         }
378
379         pa_log_debug("Connection authenticated, handing fd to IO thread...");
380
381         pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PASS_SOCKET, NULL, 0, NULL, NULL);
382         u->state = STATE_RUNNING;
383     }
384
385     return 0;
386 }
387
388 static int handle_response(struct userdata *u) {
389     pa_assert(u);
390
391     switch (u->state) {
392
393         case STATE_AUTH:
394             pa_assert(u->read_length == sizeof(int32_t));
395
396             /* Process auth data */
397             if (!*(int32_t*) u->read_data) {
398                 pa_log("Authentication failed: %s", pa_cstrerror(errno));
399                 return -1;
400             }
401
402             /* Request latency data */
403             pa_assert(!u->write_data);
404             *(int32_t*) (u->write_data = pa_xmalloc(u->write_length = sizeof(int32_t))) = ESD_PROTO_LATENCY;
405
406             u->write_index = 0;
407             u->state = STATE_LATENCY;
408
409             /* Space for next response */
410             pa_assert(u->read_length >= sizeof(int32_t));
411             u->read_index = 0;
412             u->read_length = sizeof(int32_t);
413
414             break;
415
416         case STATE_LATENCY: {
417             int32_t *p;
418             pa_assert(u->read_length == sizeof(int32_t));
419
420             /* Process latency info */
421             u->latency = (pa_usec_t) ((double) (*(int32_t*) u->read_data) * 1000000 / 44100);
422             if (u->latency > 10000000) {
423                 pa_log_warn("Invalid latency information received from server");
424                 u->latency = 0;
425             }
426
427             /* Create stream */
428             pa_assert(!u->write_data);
429             p = u->write_data = pa_xmalloc0(u->write_length = sizeof(int32_t)*3+ESD_NAME_MAX);
430             *(p++) = ESD_PROTO_STREAM_PLAY;
431             *(p++) = u->format;
432             *(p++) = u->rate;
433             pa_strlcpy((char*) p, "PulseAudio Tunnel", ESD_NAME_MAX);
434
435             u->write_index = 0;
436             u->state = STATE_PREPARE;
437
438             /* Don't read any further */
439             pa_xfree(u->read_data);
440             u->read_data = NULL;
441             u->read_index = u->read_length = 0;
442
443             break;
444         }
445
446         default:
447             pa_assert_not_reached();
448     }
449
450     return 0;
451 }
452
453 static int do_read(struct userdata *u) {
454     pa_assert(u);
455
456     if (!pa_iochannel_is_readable(u->io))
457         return 0;
458
459     if (u->state == STATE_AUTH || u->state == STATE_LATENCY) {
460         ssize_t r;
461
462         if (!u->read_data)
463             return 0;
464
465         pa_assert(u->read_index < u->read_length);
466
467         if ((r = pa_iochannel_read(u->io, (uint8_t*) u->read_data + u->read_index, u->read_length - u->read_index)) <= 0) {
468             pa_log("read() failed: %s", r < 0 ? pa_cstrerror(errno) : "EOF");
469             return -1;
470         }
471
472         u->read_index += (size_t) r;
473         pa_assert(u->read_index <= u->read_length);
474
475         if (u->read_index == u->read_length)
476             return handle_response(u);
477     }
478
479     return 0;
480 }
481
482 static void io_callback(pa_iochannel *io, void*userdata) {
483     struct userdata *u = userdata;
484     pa_assert(u);
485
486     if (do_read(u) < 0 || do_write(u) < 0) {
487
488         if (u->io) {
489             pa_iochannel_free(u->io);
490             u->io = NULL;
491         }
492
493         pa_module_unload_request(u->module, TRUE);
494     }
495 }
496
497 static void on_connection(pa_socket_client *c, pa_iochannel*io, void *userdata) {
498     struct userdata *u = userdata;
499
500     pa_socket_client_unref(u->client);
501     u->client = NULL;
502
503     if (!io) {
504         pa_log("Connection failed: %s", pa_cstrerror(errno));
505         pa_module_unload_request(u->module, TRUE);
506         return;
507     }
508
509     pa_assert(!u->io);
510     u->io = io;
511     pa_iochannel_set_callback(u->io, io_callback, u);
512
513     pa_log_debug("Connection established, authenticating ...");
514 }
515
516 int pa__init(pa_module*m) {
517     struct userdata *u = NULL;
518     pa_sample_spec ss;
519     pa_modargs *ma = NULL;
520     const char *espeaker;
521     uint32_t key;
522     pa_sink_new_data data;
523
524     pa_assert(m);
525
526     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
527         pa_log("failed to parse module arguments");
528         goto fail;
529     }
530
531     ss = m->core->default_sample_spec;
532     if (pa_modargs_get_sample_spec(ma, &ss) < 0) {
533         pa_log("invalid sample format specification");
534         goto fail;
535     }
536
537     if ((ss.format != PA_SAMPLE_U8 && ss.format != PA_SAMPLE_S16NE) ||
538         (ss.channels > 2)) {
539         pa_log("esound sample type support is limited to mono/stereo and U8 or S16NE sample data");
540         goto fail;
541     }
542
543     u = pa_xnew0(struct userdata, 1);
544     u->core = m->core;
545     u->module = m;
546     m->userdata = u;
547     u->fd = -1;
548     u->smoother = pa_smoother_new(
549             PA_USEC_PER_SEC,
550             PA_USEC_PER_SEC*2,
551             TRUE,
552             TRUE,
553             10,
554             0,
555             FALSE);
556     pa_memchunk_reset(&u->memchunk);
557     u->offset = 0;
558
559     u->rtpoll = pa_rtpoll_new();
560     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
561     u->rtpoll_item = NULL;
562
563     u->format =
564         (ss.format == PA_SAMPLE_U8 ? ESD_BITS8 : ESD_BITS16) |
565         (ss.channels == 2 ? ESD_STEREO : ESD_MONO);
566     u->rate = (int32_t) ss.rate;
567     u->block_size = pa_usec_to_bytes(PA_USEC_PER_SEC/20, &ss);
568
569     u->read_data = u->write_data = NULL;
570     u->read_index = u->write_index = u->read_length = u->write_length = 0;
571
572     u->state = STATE_AUTH;
573     u->latency = 0;
574
575     if (!(espeaker = getenv("ESPEAKER")))
576         espeaker = ESD_UNIX_SOCKET_NAME;
577
578     espeaker = pa_modargs_get_value(ma, "server", espeaker);
579
580     pa_sink_new_data_init(&data);
581     data.driver = __FILE__;
582     data.module = m;
583     pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
584     pa_sink_new_data_set_sample_spec(&data, &ss);
585     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, espeaker);
586     pa_proplist_sets(data.proplist, PA_PROP_DEVICE_API, "esd");
587     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "EsounD Output on %s", espeaker);
588
589     u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_NETWORK);
590     pa_sink_new_data_done(&data);
591
592     if (!u->sink) {
593         pa_log("Failed to create sink.");
594         goto fail;
595     }
596
597     u->sink->parent.process_msg = sink_process_msg;
598     u->sink->userdata = u;
599
600     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
601     pa_sink_set_rtpoll(u->sink, u->rtpoll);
602
603     if (!(u->client = pa_socket_client_new_string(u->core->mainloop, espeaker, ESD_DEFAULT_PORT))) {
604         pa_log("Failed to connect to server.");
605         goto fail;
606     }
607
608     pa_socket_client_set_callback(u->client, on_connection, u);
609
610     /* Prepare the initial request */
611     u->write_data = pa_xmalloc(u->write_length = ESD_KEY_LEN + sizeof(int32_t));
612     if (pa_authkey_load_auto(pa_modargs_get_value(ma, "cookie", ".esd_auth"), u->write_data, ESD_KEY_LEN) < 0) {
613         pa_log("Failed to load cookie");
614         goto fail;
615     }
616
617     key = ESD_ENDIAN_KEY;
618     memcpy((uint8_t*) u->write_data + ESD_KEY_LEN, &key, sizeof(key));
619
620     /* Reserve space for the response */
621     u->read_data = pa_xmalloc(u->read_length = sizeof(int32_t));
622
623     if (!(u->thread = pa_thread_new(thread_func, u))) {
624         pa_log("Failed to create thread.");
625         goto fail;
626     }
627
628     pa_sink_put(u->sink);
629
630     pa_modargs_free(ma);
631
632     return 0;
633
634 fail:
635     if (ma)
636         pa_modargs_free(ma);
637
638     pa__done(m);
639
640     return -1;
641 }
642
643 int pa__get_n_used(pa_module *m) {
644     struct userdata *u;
645
646     pa_assert(m);
647     pa_assert_se(u = m->userdata);
648
649     return pa_sink_linked_by(u->sink);
650 }
651
652 void pa__done(pa_module*m) {
653     struct userdata *u;
654     pa_assert(m);
655
656     if (!(u = m->userdata))
657         return;
658
659     if (u->sink)
660         pa_sink_unlink(u->sink);
661
662     if (u->thread) {
663         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
664         pa_thread_free(u->thread);
665     }
666
667     pa_thread_mq_done(&u->thread_mq);
668
669     if (u->sink)
670         pa_sink_unref(u->sink);
671
672     if (u->io)
673         pa_iochannel_free(u->io);
674
675     if (u->rtpoll_item)
676         pa_rtpoll_item_free(u->rtpoll_item);
677
678     if (u->rtpoll)
679         pa_rtpoll_free(u->rtpoll);
680
681     if (u->memchunk.memblock)
682         pa_memblock_unref(u->memchunk.memblock);
683
684     if (u->client)
685         pa_socket_client_unref(u->client);
686
687     pa_xfree(u->read_data);
688     pa_xfree(u->write_data);
689
690     if (u->smoother)
691         pa_smoother_free(u->smoother);
692
693     if (u->fd >= 0)
694         pa_close(u->fd);
695
696     pa_xfree(u);
697 }