merge glitch-free branch back into trunk
[profile/ivi/pulseaudio.git] / src / modules / module-tunnel.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5
6   Copyright 2004-2006 Lennart Poettering
7   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
8
9   PulseAudio is free software; you can redistribute it and/or modify
10   it under the terms of the GNU Lesser General Public License as published
11   by the Free Software Foundation; either version 2 of the License,
12   or (at your option) any later version.
13
14   PulseAudio is distributed in the hope that it will be useful, but
15   WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   General Public License for more details.
18
19   You should have received a copy of the GNU Lesser General Public License
20   along with PulseAudio; if not, write to the Free Software
21   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
22   USA.
23 ***/
24
25 #ifdef HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <unistd.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <sys/types.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/util.h>
38 #include <pulse/version.h>
39 #include <pulse/xmalloc.h>
40
41 #include <pulsecore/module.h>
42 #include <pulsecore/core-util.h>
43 #include <pulsecore/modargs.h>
44 #include <pulsecore/log.h>
45 #include <pulsecore/core-subscribe.h>
46 #include <pulsecore/sink-input.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream.h>
49 #include <pulsecore/pstream-util.h>
50 #include <pulsecore/authkey.h>
51 #include <pulsecore/socket-client.h>
52 #include <pulsecore/socket-util.h>
53 #include <pulsecore/authkey-prop.h>
54 #include <pulsecore/time-smoother.h>
55 #include <pulsecore/thread.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/rtclock.h>
58 #include <pulsecore/core-error.h>
59
60 #ifdef TUNNEL_SINK
61 #include "module-tunnel-sink-symdef.h"
62 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
63 PA_MODULE_LOAD_ONCE(FALSE);
64 PA_MODULE_USAGE(
65         "server=<address> "
66         "sink=<remote sink name> "
67         "cookie=<filename> "
68         "format=<sample format> "
69         "channels=<number of channels> "
70         "rate=<sample rate> "
71         "sink_name=<name for the local sink> "
72         "channel_map=<channel map>");
73 #else
74 #include "module-tunnel-source-symdef.h"
75 PA_MODULE_DESCRIPTION("Tunnel module for sources");
76 PA_MODULE_USAGE(
77         "server=<address> "
78         "source=<remote source name> "
79         "cookie=<filename> "
80         "format=<sample format> "
81         "channels=<number of channels> "
82         "rate=<sample rate> "
83         "source_name=<name for the local source> "
84         "channel_map=<channel map>");
85 #endif
86
87 PA_MODULE_AUTHOR("Lennart Poettering");
88 PA_MODULE_VERSION(PACKAGE_VERSION);
89
90 #define DEFAULT_TLENGTH_MSEC 100
91 #define DEFAULT_MINREQ_MSEC 10
92 #define DEFAULT_MAXLENGTH_MSEC ((DEFAULT_TLENGTH_MSEC*3)/2)
93 #define DEFAULT_FRAGSIZE_MSEC 10
94
95 #define DEFAULT_TIMEOUT 5
96
97 #define LATENCY_INTERVAL 10
98
99 static const char* const valid_modargs[] = {
100     "server",
101     "cookie",
102     "format",
103     "channels",
104     "rate",
105 #ifdef TUNNEL_SINK
106     "sink_name",
107     "sink",
108 #else
109     "source_name",
110     "source",
111 #endif
112     "channel_map",
113     NULL,
114 };
115
116 enum {
117     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX
118 };
119
120 enum {
121     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
122     SINK_MESSAGE_POST
123 };
124
125 #ifdef TUNNEL_SINK
126 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
127 #endif
128 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
129 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
130 static void command_overflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
131 static void command_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
132 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
133 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
134
135 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
136 #ifdef TUNNEL_SINK
137     [PA_COMMAND_REQUEST] = command_request,
138 #endif
139     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
140     [PA_COMMAND_OVERFLOW] = command_overflow,
141     [PA_COMMAND_UNDERFLOW] = command_underflow,
142     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
143     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
144     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspend,
145     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspend,
146     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
147     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
148 };
149
150 struct userdata {
151     pa_core *core;
152     pa_module *module;
153
154     pa_thread_mq thread_mq;
155     pa_rtpoll *rtpoll;
156     pa_thread *thread;
157
158     pa_socket_client *client;
159     pa_pstream *pstream;
160     pa_pdispatch *pdispatch;
161
162     char *server_name;
163 #ifdef TUNNEL_SINK
164     char *sink_name;
165     pa_sink *sink;
166     uint32_t requested_bytes;
167 #else
168     char *source_name;
169     pa_source *source;
170 #endif
171
172     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
173
174     uint32_t version;
175     uint32_t ctag;
176     uint32_t device_index;
177     uint32_t channel;
178
179     int64_t counter, counter_delta;
180
181     pa_time_event *time_event;
182
183     pa_bool_t auth_cookie_in_property;
184
185     pa_smoother *smoother;
186
187     char *device_description;
188     char *server_fqdn;
189     char *user_name;
190
191     uint32_t maxlength;
192 #ifdef TUNNEL_SINK
193     uint32_t tlength;
194     uint32_t minreq;
195     uint32_t prebuf;
196 #else
197     uint32_t fragsize;
198 #endif
199 };
200
201 static void command_stream_killed(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
202     struct userdata *u = userdata;
203
204     pa_assert(pd);
205     pa_assert(t);
206     pa_assert(u);
207     pa_assert(u->pdispatch == pd);
208
209     pa_log_warn("Stream killed");
210     pa_module_unload_request(u->module);
211 }
212
213 static void command_overflow(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
214     struct userdata *u = userdata;
215
216     pa_assert(pd);
217     pa_assert(t);
218     pa_assert(u);
219     pa_assert(u->pdispatch == pd);
220
221     pa_log_warn("Server signalled buffer overrun.");
222 }
223
224 static void command_underflow(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
225     struct userdata *u = userdata;
226
227     pa_assert(pd);
228     pa_assert(t);
229     pa_assert(u);
230     pa_assert(u->pdispatch == pd);
231
232     pa_log_warn("Server signalled buffer underrun.");
233 }
234
235 static void command_suspend(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
236     struct userdata *u = userdata;
237
238     pa_assert(pd);
239     pa_assert(t);
240     pa_assert(u);
241     pa_assert(u->pdispatch == pd);
242
243     pa_log_debug("Server reports a stream suspension.");
244 }
245
246 static void command_moved(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
247     struct userdata *u = userdata;
248
249     pa_assert(pd);
250     pa_assert(t);
251     pa_assert(u);
252     pa_assert(u->pdispatch == pd);
253
254     pa_log_debug("Server reports a stream move.");
255 }
256
257 static void stream_cork(struct userdata *u, pa_bool_t cork) {
258     pa_tagstruct *t;
259     pa_assert(u);
260
261     if (cork)
262         pa_smoother_pause(u->smoother, pa_rtclock_usec());
263     else
264         pa_smoother_resume(u->smoother, pa_rtclock_usec());
265
266     if (!u->pstream)
267         return;
268
269     t = pa_tagstruct_new(NULL, 0);
270 #ifdef TUNNEL_SINK
271     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
272 #else
273     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
274 #endif
275     pa_tagstruct_putu32(t, u->ctag++);
276     pa_tagstruct_putu32(t, u->channel);
277     pa_tagstruct_put_boolean(t, !!cork);
278     pa_pstream_send_tagstruct(u->pstream, t);
279 }
280
281 #ifdef TUNNEL_SINK
282
283 static void send_data(struct userdata *u) {
284     pa_assert(u);
285
286     while (u->requested_bytes > 0) {
287         pa_memchunk memchunk;
288         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
289         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
290         pa_memblock_unref(memchunk.memblock);
291         u->requested_bytes -= memchunk.length;
292     }
293 }
294
295 /* This function is called from IO context -- except when it is not. */
296 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
297     struct userdata *u = PA_SINK(o)->userdata;
298
299     switch (code) {
300
301         case PA_SINK_MESSAGE_SET_STATE: {
302             int r;
303
304             /* First, change the state, because otherwide pa_sink_render() would fail */
305             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0)
306                 if (PA_SINK_IS_OPENED((pa_sink_state_t) PA_PTR_TO_UINT(data)))
307                     send_data(u);
308
309             return r;
310         }
311
312         case SINK_MESSAGE_REQUEST:
313
314             pa_assert(offset > 0);
315             u->requested_bytes += (size_t) offset;
316
317             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
318                 send_data(u);
319
320             return 0;
321
322         case SINK_MESSAGE_POST:
323
324             /* OK, This might be a bit confusing. This message is
325              * delivered to us from the main context -- NOT from the
326              * IO thread context where the rest of the messages are
327              * dispatched. Yeah, ugly, but I am a lazy bastard. */
328
329             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
330             u->counter += chunk->length;
331             u->counter_delta += chunk->length;
332             return 0;
333     }
334
335     return pa_sink_process_msg(o, code, data, offset, chunk);
336 }
337
338 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
339     struct userdata *u;
340     pa_sink_assert_ref(s);
341     u = s->userdata;
342
343     switch ((pa_sink_state_t) state) {
344
345         case PA_SINK_SUSPENDED:
346             pa_assert(PA_SINK_IS_OPENED(s->state));
347             stream_cork(u, TRUE);
348             break;
349
350         case PA_SINK_IDLE:
351         case PA_SINK_RUNNING:
352             if (s->state == PA_SINK_SUSPENDED)
353                 stream_cork(u, FALSE);
354             break;
355
356         case PA_SINK_UNLINKED:
357         case PA_SINK_INIT:
358             ;
359     }
360
361     return 0;
362 }
363
364 #else
365
366 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
367     struct userdata *u = PA_SOURCE(o)->userdata;
368
369     switch (code) {
370         case SOURCE_MESSAGE_POST:
371
372             if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
373                 pa_source_post(u->source, chunk);
374             return 0;
375     }
376
377     return pa_source_process_msg(o, code, data, offset, chunk);
378 }
379
380 static int source_set_state(pa_source *s, pa_source_state_t state) {
381     struct userdata *u;
382     pa_source_assert_ref(s);
383     u = s->userdata;
384
385     switch ((pa_source_state_t) state) {
386
387         case PA_SOURCE_SUSPENDED:
388             pa_assert(PA_SOURCE_IS_OPENED(s->state));
389             stream_cork(u, TRUE);
390             break;
391
392         case PA_SOURCE_IDLE:
393         case PA_SOURCE_RUNNING:
394             if (s->state == PA_SOURCE_SUSPENDED)
395                 stream_cork(u, FALSE);
396             break;
397
398         case PA_SOURCE_UNLINKED:
399         case PA_SOURCE_INIT:
400             ;
401     }
402
403     return 0;
404 }
405
406 #endif
407
408 static void thread_func(void *userdata) {
409     struct userdata *u = userdata;
410
411     pa_assert(u);
412
413     pa_log_debug("Thread starting up");
414
415     pa_thread_mq_install(&u->thread_mq);
416     pa_rtpoll_install(u->rtpoll);
417
418     for (;;) {
419         int ret;
420
421         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
422             goto fail;
423
424         if (ret == 0)
425             goto finish;
426     }
427
428 fail:
429     /* If this was no regular exit from the loop we have to continue
430      * processing messages until we received PA_MESSAGE_SHUTDOWN */
431     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
432     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
433
434 finish:
435     pa_log_debug("Thread shutting down");
436 }
437
438 #ifdef TUNNEL_SINK
439 static void command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
440     struct userdata *u = userdata;
441     uint32_t bytes, channel;
442
443     pa_assert(pd);
444     pa_assert(command == PA_COMMAND_REQUEST);
445     pa_assert(t);
446     pa_assert(u);
447     pa_assert(u->pdispatch == pd);
448
449     if (pa_tagstruct_getu32(t, &channel) < 0 ||
450         pa_tagstruct_getu32(t, &bytes) < 0) {
451         pa_log("Invalid protocol reply");
452         goto fail;
453     }
454
455     if (channel != u->channel) {
456         pa_log("Recieved data for invalid channel");
457         goto fail;
458     }
459
460     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL);
461     return;
462
463 fail:
464     pa_module_unload_request(u->module);
465 }
466
467 #endif
468
469 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
470     struct userdata *u = userdata;
471     pa_usec_t sink_usec, source_usec, transport_usec, host_usec, k;
472     int playing;
473     int64_t write_index, read_index;
474     struct timeval local, remote, now;
475
476     pa_assert(pd);
477     pa_assert(u);
478
479     if (command != PA_COMMAND_REPLY) {
480         if (command == PA_COMMAND_ERROR)
481             pa_log("Failed to get latency.");
482         else
483             pa_log("Protocol error 1.");
484         goto fail;
485     }
486
487     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
488         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
489         pa_tagstruct_get_boolean(t, &playing) < 0 ||
490         pa_tagstruct_get_timeval(t, &local) < 0 ||
491         pa_tagstruct_get_timeval(t, &remote) < 0 ||
492         pa_tagstruct_gets64(t, &write_index) < 0 ||
493         pa_tagstruct_gets64(t, &read_index) < 0) {
494         pa_log("Invalid reply. (latency)");
495         goto fail;
496     }
497
498     pa_gettimeofday(&now);
499
500     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
501         /* local and remote seem to have synchronized clocks */
502 #ifdef TUNNEL_SINK
503         transport_usec = pa_timeval_diff(&remote, &local);
504 #else
505         transport_usec = pa_timeval_diff(&now, &remote);
506 #endif
507     } else
508         transport_usec = pa_timeval_diff(&now, &local)/2;
509
510 #ifdef TUNNEL_SINK
511     host_usec = sink_usec + transport_usec;
512 #else
513     host_usec = source_usec + transport_usec;
514     if (host_usec > sink_usec)
515         host_usec -= sink_usec;
516     else
517         host_usec = 0;
518 #endif
519
520 #ifdef TUNNEL_SINK
521     k = pa_bytes_to_usec(u->counter - u->counter_delta, &u->sink->sample_spec);
522
523     if (k > host_usec)
524         k -= host_usec;
525     else
526         k = 0;
527 #else
528     k = pa_bytes_to_usec(u->counter - u->counter_delta, &u->source->sample_spec);
529     k += host_usec;
530 #endif
531
532     pa_smoother_put(u->smoother, pa_rtclock_usec(), k);
533
534     return;
535
536 fail:
537     pa_module_unload_request(u->module);
538 }
539
540 static void request_latency(struct userdata *u) {
541     pa_tagstruct *t;
542     struct timeval now;
543     uint32_t tag;
544     pa_assert(u);
545
546     t = pa_tagstruct_new(NULL, 0);
547 #ifdef TUNNEL_SINK
548     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
549 #else
550     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
551 #endif
552     pa_tagstruct_putu32(t, tag = u->ctag++);
553     pa_tagstruct_putu32(t, u->channel);
554
555     pa_gettimeofday(&now);
556     pa_tagstruct_put_timeval(t, &now);
557
558     pa_pstream_send_tagstruct(u->pstream, t);
559     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
560
561     u->counter_delta = 0;
562 }
563
564 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
565     struct userdata *u = userdata;
566     struct timeval ntv;
567
568     pa_assert(m);
569     pa_assert(e);
570     pa_assert(u);
571
572     request_latency(u);
573
574     pa_gettimeofday(&ntv);
575     ntv.tv_sec += LATENCY_INTERVAL;
576     m->time_restart(e, &ntv);
577 }
578
579 #ifdef TUNNEL_SINK
580 /* static pa_usec_t sink_get_latency(pa_sink *s) { */
581 /*     pa_usec_t t, c; */
582 /*     struct userdata *u = s->userdata; */
583
584 /*     pa_sink_assert_ref(s); */
585
586 /*     c = pa_bytes_to_usec(u->counter, &s->sample_spec); */
587 /*     t = pa_smoother_get(u->smoother, pa_rtclock_usec()); */
588
589 /*     return c > t ? c - t : 0; */
590 /* } */
591 #else
592 /* static pa_usec_t source_get_latency(pa_source *s) { */
593 /*     pa_usec_t t, c; */
594 /*     struct userdata *u = s->userdata; */
595
596 /*     pa_source_assert_ref(s); */
597
598 /*     c = pa_bytes_to_usec(u->counter, &s->sample_spec); */
599 /*     t = pa_smoother_get(u->smoother, pa_rtclock_usec()); */
600
601 /*     return t > c ? t - c : 0; */
602 /* } */
603 #endif
604
605 static void update_description(struct userdata *u) {
606     char *d;
607     char un[128], hn[128];
608     pa_tagstruct *t;
609
610     pa_assert(u);
611
612     if (!u->server_fqdn || !u->user_name || !u->device_description)
613         return;
614
615     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
616
617 #ifdef TUNNEL_SINK
618     pa_sink_set_description(u->sink, d);
619 #else
620     pa_source_set_description(u->source, d);
621 #endif
622
623     pa_xfree(d);
624
625     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
626                           pa_get_user_name(un, sizeof(un)),
627                           pa_get_host_name(hn, sizeof(hn)));
628
629     t = pa_tagstruct_new(NULL, 0);
630 #ifdef TUNNEL_SINK
631     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
632 #else
633     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
634 #endif
635     pa_tagstruct_putu32(t, u->ctag++);
636     pa_tagstruct_putu32(t, u->channel);
637     pa_tagstruct_puts(t, d);
638     pa_pstream_send_tagstruct(u->pstream, t);
639
640     pa_xfree(d);
641 }
642
643 static void server_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
644     struct userdata *u = userdata;
645     pa_sample_spec ss;
646     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
647     uint32_t cookie;
648
649     pa_assert(pd);
650     pa_assert(u);
651
652     if (command != PA_COMMAND_REPLY) {
653         if (command == PA_COMMAND_ERROR)
654             pa_log("Failed to get info.");
655         else
656             pa_log("Protocol error 6.");
657         goto fail;
658     }
659
660     if (pa_tagstruct_gets(t, &server_name) < 0 ||
661         pa_tagstruct_gets(t, &server_version) < 0 ||
662         pa_tagstruct_gets(t, &user_name) < 0 ||
663         pa_tagstruct_gets(t, &host_name) < 0 ||
664         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
665         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
666         pa_tagstruct_gets(t, &default_source_name) < 0 ||
667         pa_tagstruct_getu32(t, &cookie) < 0) {
668         pa_log("Invalid reply. (get_server_info)");
669         goto fail;
670     }
671
672     pa_xfree(u->server_fqdn);
673     u->server_fqdn = pa_xstrdup(host_name);
674
675     pa_xfree(u->user_name);
676     u->user_name = pa_xstrdup(user_name);
677
678     update_description(u);
679
680     return;
681
682 fail:
683     pa_module_unload_request(u->module);
684 }
685
686 #ifdef TUNNEL_SINK
687
688 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
689     struct userdata *u = userdata;
690     uint32_t idx, owner_module, monitor_source, flags;
691     const char *name, *description, *monitor_source_name, *driver;
692     pa_sample_spec ss;
693     pa_channel_map cm;
694     pa_cvolume volume;
695     int mute;
696     pa_usec_t latency;
697
698     pa_assert(pd);
699     pa_assert(u);
700
701     if (command != PA_COMMAND_REPLY) {
702         if (command == PA_COMMAND_ERROR)
703             pa_log("Failed to get info.");
704         else
705             pa_log("Protocol error 5.");
706         goto fail;
707     }
708
709     if (pa_tagstruct_getu32(t, &idx) < 0 ||
710         pa_tagstruct_gets(t, &name) < 0 ||
711         pa_tagstruct_gets(t, &description) < 0 ||
712         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
713         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
714         pa_tagstruct_getu32(t, &owner_module) < 0 ||
715         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
716         pa_tagstruct_get_boolean(t, &mute) < 0 ||
717         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
718         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
719         pa_tagstruct_get_usec(t, &latency) < 0 ||
720         pa_tagstruct_gets(t, &driver) < 0 ||
721         pa_tagstruct_getu32(t, &flags) < 0) {
722         pa_log("Invalid reply. (get_sink_info)");
723         goto fail;
724     }
725
726     if (!u->sink_name || strcmp(name, u->sink_name))
727         return;
728
729     pa_xfree(u->device_description);
730     u->device_description = pa_xstrdup(description);
731
732     update_description(u);
733
734     return;
735
736 fail:
737     pa_module_unload_request(u->module);
738 }
739
740 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
741     struct userdata *u = userdata;
742     uint32_t idx, owner_module, client, sink;
743     pa_usec_t buffer_usec, sink_usec;
744     const char *name, *driver, *resample_method;
745     int mute;
746     pa_sample_spec sample_spec;
747     pa_channel_map channel_map;
748     pa_cvolume volume;
749
750     pa_assert(pd);
751     pa_assert(u);
752
753     if (command != PA_COMMAND_REPLY) {
754         if (command == PA_COMMAND_ERROR)
755             pa_log("Failed to get info.");
756         else
757             pa_log("Protocol error 2.");
758         goto fail;
759     }
760
761     if (pa_tagstruct_getu32(t, &idx) < 0 ||
762         pa_tagstruct_gets(t, &name) < 0 ||
763         pa_tagstruct_getu32(t, &owner_module) < 0 ||
764         pa_tagstruct_getu32(t, &client) < 0 ||
765         pa_tagstruct_getu32(t, &sink) < 0 ||
766         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
767         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
768         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
769         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
770         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
771         pa_tagstruct_gets(t, &resample_method) < 0 ||
772         pa_tagstruct_gets(t, &driver) < 0 ||
773         (u->version >= 11 && pa_tagstruct_get_boolean(t, &mute) < 0)) {
774         pa_log("Invalid reply. (get_info)");
775         goto fail;
776     }
777
778     if (idx != u->device_index)
779         return;
780
781     pa_assert(u->sink);
782
783     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
784         pa_cvolume_equal(&volume, &u->sink->volume))
785         return;
786
787     memcpy(&u->sink->volume, &volume, sizeof(pa_cvolume));
788
789     if (u->version >= 11)
790         u->sink->muted = !!mute;
791
792     pa_subscription_post(u->sink->core, PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE, u->sink->index);
793     return;
794
795 fail:
796     pa_module_unload_request(u->module);
797 }
798
799 #else
800
801 static void source_info_cb(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
802     struct userdata *u = userdata;
803     uint32_t idx, owner_module, monitor_of_sink, flags;
804     const char *name, *description, *monitor_of_sink_name, *driver;
805     pa_sample_spec ss;
806     pa_channel_map cm;
807     pa_cvolume volume;
808     int mute;
809     pa_usec_t latency;
810
811     pa_assert(pd);
812     pa_assert(u);
813
814     if (command != PA_COMMAND_REPLY) {
815         if (command == PA_COMMAND_ERROR)
816             pa_log("Failed to get info.");
817         else
818             pa_log("Protocol error 5.");
819         goto fail;
820     }
821
822     if (pa_tagstruct_getu32(t, &idx) < 0 ||
823         pa_tagstruct_gets(t, &name) < 0 ||
824         pa_tagstruct_gets(t, &description) < 0 ||
825         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
826         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
827         pa_tagstruct_getu32(t, &owner_module) < 0 ||
828         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
829         pa_tagstruct_get_boolean(t, &mute) < 0 ||
830         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
831         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
832         pa_tagstruct_get_usec(t, &latency) < 0 ||
833         pa_tagstruct_gets(t, &driver) < 0 ||
834         pa_tagstruct_getu32(t, &flags) < 0) {
835         pa_log("Invalid reply. (get_source_info)");
836         goto fail;
837     }
838
839     if (!u->source_name || strcmp(name, u->source_name))
840         return;
841
842     pa_xfree(u->device_description);
843     u->device_description = pa_xstrdup(description);
844
845     update_description(u);
846
847     return;
848
849 fail:
850     pa_module_unload_request(u->module);
851 }
852
853 #endif
854
855 static void request_info(struct userdata *u) {
856     pa_tagstruct *t;
857     uint32_t tag;
858     pa_assert(u);
859
860     t = pa_tagstruct_new(NULL, 0);
861     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
862     pa_tagstruct_putu32(t, tag = u->ctag++);
863     pa_pstream_send_tagstruct(u->pstream, t);
864     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
865
866 #ifdef TUNNEL_SINK
867     t = pa_tagstruct_new(NULL, 0);
868     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
869     pa_tagstruct_putu32(t, tag = u->ctag++);
870     pa_tagstruct_putu32(t, u->device_index);
871     pa_pstream_send_tagstruct(u->pstream, t);
872     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
873
874     t = pa_tagstruct_new(NULL, 0);
875     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
876     pa_tagstruct_putu32(t, tag = u->ctag++);
877     pa_tagstruct_putu32(t, PA_INVALID_INDEX);
878     pa_tagstruct_puts(t, u->sink_name);
879     pa_pstream_send_tagstruct(u->pstream, t);
880     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
881 #else
882     t = pa_tagstruct_new(NULL, 0);
883     pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
884     pa_tagstruct_putu32(t, tag = u->ctag++);
885     pa_tagstruct_putu32(t, PA_INVALID_INDEX);
886     pa_tagstruct_puts(t, u->source_name);
887     pa_pstream_send_tagstruct(u->pstream, t);
888     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
889 #endif
890 }
891
892 static void command_subscribe_event(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
893     struct userdata *u = userdata;
894     pa_subscription_event_type_t e;
895     uint32_t idx;
896
897     pa_assert(pd);
898     pa_assert(t);
899     pa_assert(u);
900     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
901
902     if (pa_tagstruct_getu32(t, &e) < 0 ||
903         pa_tagstruct_getu32(t, &idx) < 0) {
904         pa_log("Invalid protocol reply");
905         pa_module_unload_request(u->module);
906         return;
907     }
908
909     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
910 #ifdef TUNNEL_SINK
911         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
912         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
913 #else
914         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
915 #endif
916         )
917         return;
918
919     request_info(u);
920 }
921
922 static void start_subscribe(struct userdata *u) {
923     pa_tagstruct *t;
924     uint32_t tag;
925     pa_assert(u);
926
927     t = pa_tagstruct_new(NULL, 0);
928     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
929     pa_tagstruct_putu32(t, tag = u->ctag++);
930     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
931 #ifdef TUNNEL_SINK
932                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
933 #else
934                         PA_SUBSCRIPTION_MASK_SOURCE
935 #endif
936                         );
937
938     pa_pstream_send_tagstruct(u->pstream, t);
939 }
940
941 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
942     struct userdata *u = userdata;
943     struct timeval ntv;
944 #ifdef TUNNEL_SINK
945     uint32_t bytes;
946 #endif
947
948     pa_assert(pd);
949     pa_assert(u);
950     pa_assert(u->pdispatch == pd);
951
952     if (command != PA_COMMAND_REPLY) {
953         if (command == PA_COMMAND_ERROR)
954             pa_log("Failed to create stream.");
955         else
956             pa_log("Protocol error 3.");
957         goto fail;
958     }
959
960     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
961         pa_tagstruct_getu32(t, &u->device_index) < 0
962 #ifdef TUNNEL_SINK
963         || pa_tagstruct_getu32(t, &bytes) < 0
964 #endif
965         )
966         goto parse_error;
967
968     if (u->version >= 9) {
969 #ifdef TUNNEL_SINK
970         uint32_t maxlength, tlength, prebuf, minreq;
971
972         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
973             pa_tagstruct_getu32(t, &tlength) < 0 ||
974             pa_tagstruct_getu32(t, &prebuf) < 0 ||
975             pa_tagstruct_getu32(t, &minreq) < 0)
976             goto parse_error;
977 #else
978         uint32_t maxlength, fragsize;
979
980         if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
981             pa_tagstruct_getu32(t, &fragsize) < 0)
982             goto parse_error;
983 #endif
984     }
985
986     start_subscribe(u);
987     request_info(u);
988
989     pa_assert(!u->time_event);
990     pa_gettimeofday(&ntv);
991     ntv.tv_sec += LATENCY_INTERVAL;
992     u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
993
994     request_latency(u);
995
996     pa_log_debug("Stream created.");
997
998 #ifdef TUNNEL_SINK
999     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1000 #endif
1001
1002     return;
1003
1004 parse_error:
1005     pa_log("Invalid reply. (Create stream)");
1006
1007 fail:
1008     pa_module_unload_request(u->module);
1009 }
1010
1011 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1012     struct userdata *u = userdata;
1013     pa_tagstruct *reply;
1014     char name[256], un[128], hn[128];
1015 #ifdef TUNNEL_SINK
1016     pa_cvolume volume;
1017 #endif
1018
1019     pa_assert(pd);
1020     pa_assert(u);
1021     pa_assert(u->pdispatch == pd);
1022
1023     if (command != PA_COMMAND_REPLY ||
1024         pa_tagstruct_getu32(t, &u->version) < 0) {
1025         if (command == PA_COMMAND_ERROR)
1026             pa_log("Failed to authenticate");
1027         else
1028             pa_log("Protocol error 4.");
1029
1030         goto fail;
1031     }
1032
1033     /* Minimum supported protocol version */
1034     if (u->version < 8) {
1035         pa_log("Incompatible protocol version");
1036         goto fail;
1037     }
1038
1039 #ifdef TUNNEL_SINK
1040     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1041                 u->sink_name,
1042                 pa_get_user_name(un, sizeof(un)),
1043                 pa_get_host_name(hn, sizeof(hn)));
1044 #else
1045     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1046                 u->source_name,
1047                 pa_get_user_name(un, sizeof(un)),
1048                 pa_get_host_name(hn, sizeof(hn)));
1049 #endif
1050
1051     reply = pa_tagstruct_new(NULL, 0);
1052     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1053     pa_tagstruct_putu32(reply, tag = u->ctag++);
1054     pa_tagstruct_puts(reply, "PulseAudio");
1055     pa_pstream_send_tagstruct(u->pstream, reply);
1056     /* We ignore the server's reply here */
1057
1058     reply = pa_tagstruct_new(NULL, 0);
1059
1060 #ifdef TUNNEL_SINK
1061     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1062     pa_tagstruct_putu32(reply, tag = u->ctag++);
1063     pa_tagstruct_puts(reply, name);
1064     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1065     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1066     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1067     pa_tagstruct_puts(reply, u->sink_name);
1068     pa_tagstruct_putu32(reply, u->maxlength);
1069     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1070     pa_tagstruct_putu32(reply, u->tlength);
1071     pa_tagstruct_putu32(reply, u->prebuf);
1072     pa_tagstruct_putu32(reply, u->minreq);
1073     pa_tagstruct_putu32(reply, 0);
1074     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1075     pa_tagstruct_put_cvolume(reply, &volume);
1076 #else
1077     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1078     pa_tagstruct_putu32(reply, tag = u->ctag++);
1079     pa_tagstruct_puts(reply, name);
1080     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1081     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1082     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1083     pa_tagstruct_puts(reply, u->source_name);
1084     pa_tagstruct_putu32(reply, u->maxlength);
1085     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1086     pa_tagstruct_putu32(reply, u->fragsize);
1087 #endif
1088
1089     /* New flags added in 0.9.8 */
1090     if (u->version >= 12) {
1091         /* TODO: set these to useful values */
1092         pa_tagstruct_put_boolean(reply, FALSE); /*no_remap*/
1093         pa_tagstruct_put_boolean(reply, FALSE); /*no_remix*/
1094         pa_tagstruct_put_boolean(reply, FALSE); /*fix_format*/
1095         pa_tagstruct_put_boolean(reply, FALSE); /*fix_rate*/
1096         pa_tagstruct_put_boolean(reply, FALSE); /*fix_channels*/
1097         pa_tagstruct_put_boolean(reply, FALSE); /*no_move*/
1098         pa_tagstruct_put_boolean(reply, FALSE); /*variable_rate*/
1099     }
1100
1101     pa_pstream_send_tagstruct(u->pstream, reply);
1102     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1103
1104     pa_log_debug("Connection authenticated, creating stream ...");
1105
1106     return;
1107
1108 fail:
1109     pa_module_unload_request(u->module);
1110 }
1111
1112 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1113     struct userdata *u = userdata;
1114
1115     pa_assert(p);
1116     pa_assert(u);
1117
1118     pa_log_warn("Stream died.");
1119     pa_module_unload_request(u->module);
1120 }
1121
1122 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1123     struct userdata *u = userdata;
1124
1125     pa_assert(p);
1126     pa_assert(packet);
1127     pa_assert(u);
1128
1129     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1130         pa_log("Invalid packet");
1131         pa_module_unload_request(u->module);
1132         return;
1133     }
1134 }
1135
1136 #ifndef TUNNEL_SINK
1137 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1138     struct userdata *u = userdata;
1139
1140     pa_assert(p);
1141     pa_assert(chunk);
1142     pa_assert(u);
1143
1144     if (channel != u->channel) {
1145         pa_log("Recieved memory block on bad channel.");
1146         pa_module_unload_request(u->module);
1147         return;
1148     }
1149
1150     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1151
1152     u->counter += chunk->length;
1153     u->counter_delta += chunk->length;
1154 }
1155
1156 #endif
1157
1158 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1159     struct userdata *u = userdata;
1160     pa_tagstruct *t;
1161     uint32_t tag;
1162
1163     pa_assert(sc);
1164     pa_assert(u);
1165     pa_assert(u->client == sc);
1166
1167     pa_socket_client_unref(u->client);
1168     u->client = NULL;
1169
1170     if (!io) {
1171         pa_log("Connection failed: %s", pa_cstrerror(errno));
1172         pa_module_unload_request(u->module);
1173         return;
1174     }
1175
1176     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1177     u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1178
1179     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1180     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1181 #ifndef TUNNEL_SINK
1182     pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1183 #endif
1184
1185     t = pa_tagstruct_new(NULL, 0);
1186     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1187     pa_tagstruct_putu32(t, tag = u->ctag++);
1188     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1189     pa_tagstruct_put_arbitrary(t, u->auth_cookie, sizeof(u->auth_cookie));
1190
1191 #ifdef HAVE_CREDS
1192 {
1193     pa_creds ucred;
1194
1195     if (pa_iochannel_creds_supported(io))
1196         pa_iochannel_creds_enable(io);
1197
1198     ucred.uid = getuid();
1199     ucred.gid = getgid();
1200
1201     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1202 }
1203 #else
1204     pa_pstream_send_tagstruct(u->pstream, t);
1205 #endif
1206
1207     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1208
1209     pa_log_debug("Connection established, authenticating ...");
1210 }
1211
1212 #ifdef TUNNEL_SINK
1213
1214 static int sink_get_volume(pa_sink *sink) {
1215     return 0;
1216 }
1217
1218 static int sink_set_volume(pa_sink *sink) {
1219     struct userdata *u;
1220     pa_tagstruct *t;
1221     uint32_t tag;
1222
1223     pa_assert(sink);
1224     u = sink->userdata;
1225     pa_assert(u);
1226
1227     t = pa_tagstruct_new(NULL, 0);
1228     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1229     pa_tagstruct_putu32(t, tag = u->ctag++);
1230     pa_tagstruct_putu32(t, u->device_index);
1231     pa_tagstruct_put_cvolume(t, &sink->volume);
1232     pa_pstream_send_tagstruct(u->pstream, t);
1233
1234     return 0;
1235 }
1236
1237 static int sink_get_mute(pa_sink *sink) {
1238     return 0;
1239 }
1240
1241 static int sink_set_mute(pa_sink *sink) {
1242     struct userdata *u;
1243     pa_tagstruct *t;
1244     uint32_t tag;
1245
1246     pa_assert(sink);
1247     u = sink->userdata;
1248     pa_assert(u);
1249
1250     if (u->version < 11)
1251         return -1;
1252
1253     t = pa_tagstruct_new(NULL, 0);
1254     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1255     pa_tagstruct_putu32(t, tag = u->ctag++);
1256     pa_tagstruct_putu32(t, u->device_index);
1257     pa_tagstruct_put_boolean(t, !!sink->muted);
1258     pa_pstream_send_tagstruct(u->pstream, t);
1259
1260     return 0;
1261 }
1262
1263 #endif
1264
1265 static int load_key(struct userdata *u, const char*fn) {
1266     pa_assert(u);
1267
1268     u->auth_cookie_in_property = FALSE;
1269
1270     if (!fn && pa_authkey_prop_get(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) {
1271         pa_log_debug("Using already loaded auth cookie.");
1272         pa_authkey_prop_ref(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1273         u->auth_cookie_in_property = 1;
1274         return 0;
1275     }
1276
1277     if (!fn)
1278         fn = PA_NATIVE_COOKIE_FILE;
1279
1280     if (pa_authkey_load_auto(fn, u->auth_cookie, sizeof(u->auth_cookie)) < 0)
1281         return -1;
1282
1283     pa_log_debug("Loading cookie from disk.");
1284
1285     if (pa_authkey_prop_put(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0)
1286         u->auth_cookie_in_property = TRUE;
1287
1288     return 0;
1289 }
1290
1291 int pa__init(pa_module*m) {
1292     pa_modargs *ma = NULL;
1293     struct userdata *u = NULL;
1294     pa_sample_spec ss;
1295     pa_channel_map map;
1296     char *t, *dn = NULL;
1297 #ifdef TUNNEL_SINK
1298     pa_sink_new_data data;
1299 #else
1300     pa_source_new_data data;
1301 #endif
1302
1303     pa_assert(m);
1304
1305     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1306         pa_log("failed to parse module arguments");
1307         goto fail;
1308     }
1309
1310     u = pa_xnew0(struct userdata, 1);
1311     m->userdata = u;
1312     u->module = m;
1313     u->core = m->core;
1314     u->client = NULL;
1315     u->pdispatch = NULL;
1316     u->pstream = NULL;
1317     u->server_name = NULL;
1318 #ifdef TUNNEL_SINK
1319     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1320     u->sink = NULL;
1321     u->requested_bytes = 0;
1322 #else
1323     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1324     u->source = NULL;
1325 #endif
1326     u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1327     u->ctag = 1;
1328     u->device_index = u->channel = PA_INVALID_INDEX;
1329     u->auth_cookie_in_property = FALSE;
1330     u->time_event = NULL;
1331
1332     u->rtpoll = pa_rtpoll_new();
1333     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1334
1335     if (load_key(u, pa_modargs_get_value(ma, "cookie", NULL)) < 0)
1336         goto fail;
1337
1338     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1339         pa_log("no server specified.");
1340         goto fail;
1341     }
1342
1343     ss = m->core->default_sample_spec;
1344     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1345         pa_log("invalid sample format specification");
1346         goto fail;
1347     }
1348
1349     if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1350         pa_log("failed to connect to server '%s'", u->server_name);
1351         goto fail;
1352     }
1353
1354     pa_socket_client_set_callback(u->client, on_connection, u);
1355
1356 #ifdef TUNNEL_SINK
1357
1358     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1359         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1360
1361     pa_sink_new_data_init(&data);
1362     data.driver = __FILE__;
1363     data.module = m;
1364     data.namereg_fail = TRUE;
1365     pa_sink_new_data_set_name(&data, dn);
1366     pa_sink_new_data_set_sample_spec(&data, &ss);
1367     pa_sink_new_data_set_channel_map(&data, &map);
1368
1369     u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL);
1370     pa_sink_new_data_done(&data);
1371
1372     if (!u->sink) {
1373         pa_log("Failed to create sink.");
1374         goto fail;
1375     }
1376
1377     u->sink->parent.process_msg = sink_process_msg;
1378     u->sink->userdata = u;
1379     u->sink->set_state = sink_set_state;
1380 /*     u->sink->get_latency = sink_get_latency; */
1381     u->sink->get_volume = sink_get_volume;
1382     u->sink->get_mute = sink_get_mute;
1383     u->sink->set_volume = sink_set_volume;
1384     u->sink->set_mute = sink_set_mute;
1385
1386     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1387     pa_sink_set_rtpoll(u->sink, u->rtpoll);
1388     pa_sink_set_description(u->sink, t = pa_sprintf_malloc("%s%s%s", u->sink_name ? u->sink_name : "", u->sink_name ? " on " : "", u->server_name));
1389     pa_xfree(t);
1390
1391 #else
1392
1393     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1394         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1395
1396     pa_source_new_data_init(&data);
1397     data.driver = __FILE__;
1398     data.module = m;
1399     data.namereg_fail = TRUE;
1400     pa_source_new_data_set_name(&data, dn);
1401     pa_source_new_data_set_sample_spec(&data, &ss);
1402     pa_source_new_data_set_channel_map(&data, &map);
1403
1404     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1405     pa_source_new_data_done(&data);
1406
1407     if (!u->source) {
1408         pa_log("Failed to create source.");
1409         goto fail;
1410     }
1411
1412     u->source->parent.process_msg = source_process_msg;
1413     u->source->userdata = u;
1414     u->source->set_state = source_set_state;
1415 /*     u->source->get_latency = source_get_latency; */
1416
1417     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1418     pa_source_set_rtpoll(u->source, u->rtpoll);
1419     pa_source_set_description(u->source, t = pa_sprintf_malloc("%s%s%s", u->source_name ? u->source_name : "", u->source_name ? " on " : "", u->server_name));
1420     pa_xfree(t);
1421 #endif
1422
1423     pa_xfree(dn);
1424
1425     u->time_event = NULL;
1426
1427     u->maxlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MAXLENGTH_MSEC, &ss);
1428 #ifdef TUNNEL_SINK
1429     u->tlength = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &ss);
1430     u->minreq = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &ss);
1431     u->prebuf = u->tlength;
1432 #else
1433     u->fragsize = pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &ss);
1434 #endif
1435
1436     u->counter = u->counter_delta = 0;
1437     pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1438
1439     if (!(u->thread = pa_thread_new(thread_func, u))) {
1440         pa_log("Failed to create thread.");
1441         goto fail;
1442     }
1443
1444 #ifdef TUNNEL_SINK
1445     pa_sink_put(u->sink);
1446 #else
1447     pa_source_put(u->source);
1448 #endif
1449
1450     pa_modargs_free(ma);
1451
1452     return 0;
1453
1454 fail:
1455     pa__done(m);
1456
1457     if (ma)
1458         pa_modargs_free(ma);
1459
1460     pa_xfree(dn);
1461
1462     return  -1;
1463 }
1464
1465 void pa__done(pa_module*m) {
1466     struct userdata* u;
1467
1468     pa_assert(m);
1469
1470     if (!(u = m->userdata))
1471         return;
1472
1473 #ifdef TUNNEL_SINK
1474     if (u->sink)
1475         pa_sink_unlink(u->sink);
1476 #else
1477     if (u->source)
1478         pa_source_unlink(u->source);
1479 #endif
1480
1481     if (u->thread) {
1482         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1483         pa_thread_free(u->thread);
1484     }
1485
1486     pa_thread_mq_done(&u->thread_mq);
1487
1488 #ifdef TUNNEL_SINK
1489     if (u->sink)
1490         pa_sink_unref(u->sink);
1491 #else
1492     if (u->source)
1493         pa_source_unref(u->source);
1494 #endif
1495
1496     if (u->rtpoll)
1497         pa_rtpoll_free(u->rtpoll);
1498
1499     if (u->pstream) {
1500         pa_pstream_unlink(u->pstream);
1501         pa_pstream_unref(u->pstream);
1502     }
1503
1504     if (u->pdispatch)
1505         pa_pdispatch_unref(u->pdispatch);
1506
1507     if (u->client)
1508         pa_socket_client_unref(u->client);
1509
1510     if (u->auth_cookie_in_property)
1511         pa_authkey_prop_unref(m->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
1512
1513     if (u->smoother)
1514         pa_smoother_free(u->smoother);
1515
1516     if (u->time_event)
1517         u->core->mainloop->time_free(u->time_event);
1518
1519 #ifdef TUNNEL_SINK
1520     pa_xfree(u->sink_name);
1521 #else
1522     pa_xfree(u->source_name);
1523 #endif
1524     pa_xfree(u->server_name);
1525
1526     pa_xfree(u->device_description);
1527     pa_xfree(u->server_fqdn);
1528     pa_xfree(u->user_name);
1529
1530     pa_xfree(u);
1531 }