Add "Rear Mic" to alsa mixer paths.
[profile/ivi/pulseaudio.git] / src / modules / module-tunnel.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
59
60 #ifdef TUNNEL_SINK
61 #include "module-tunnel-sink-symdef.h"
62 #else
63 #include "module-tunnel-source-symdef.h"
64 #endif
65
66 #ifdef TUNNEL_SINK
67 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 PA_MODULE_USAGE(
69         "sink_name=<name for the local sink> "
70         "sink_properties=<properties for the local sink> "
71         "server=<address> "
72         "sink=<remote sink name> "
73         "cookie=<filename> "
74         "format=<sample format> "
75         "channels=<number of channels> "
76         "rate=<sample rate> "
77         "channel_map=<channel map>");
78 #else
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 PA_MODULE_USAGE(
81         "source_name=<name for the local source> "
82         "source_properties=<properties for the local source> "
83         "server=<address> "
84         "source=<remote source name> "
85         "cookie=<filename> "
86         "format=<sample format> "
87         "channels=<number of channels> "
88         "rate=<sample rate> "
89         "channel_map=<channel map>");
90 #endif
91
92 PA_MODULE_AUTHOR("Lennart Poettering");
93 PA_MODULE_VERSION(PACKAGE_VERSION);
94 PA_MODULE_LOAD_ONCE(FALSE);
95
96 static const char* const valid_modargs[] = {
97     "server",
98     "cookie",
99     "format",
100     "channels",
101     "rate",
102 #ifdef TUNNEL_SINK
103     "sink_name",
104     "sink_properties",
105     "sink",
106 #else
107     "source_name",
108     "source_properties",
109     "source",
110 #endif
111     "channel_map",
112     NULL,
113 };
114
115 #define DEFAULT_TIMEOUT 5
116
117 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
118
119 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
120
121 #ifdef TUNNEL_SINK
122
123 enum {
124     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
125     SINK_MESSAGE_REMOTE_SUSPEND,
126     SINK_MESSAGE_UPDATE_LATENCY,
127     SINK_MESSAGE_POST
128 };
129
130 #define DEFAULT_TLENGTH_MSEC 150
131 #define DEFAULT_MINREQ_MSEC 25
132
133 #else
134
135 enum {
136     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
137     SOURCE_MESSAGE_REMOTE_SUSPEND,
138     SOURCE_MESSAGE_UPDATE_LATENCY
139 };
140
141 #define DEFAULT_FRAGSIZE_MSEC 25
142
143 #endif
144
145 #ifdef TUNNEL_SINK
146 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 #endif
149 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
156
157 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
158 #ifdef TUNNEL_SINK
159     [PA_COMMAND_REQUEST] = command_request,
160     [PA_COMMAND_STARTED] = command_started,
161 #endif
162     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
163     [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
164     [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
165     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
166     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
167     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
168     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
169     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
170     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
171     [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
172     [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
173     [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
174     [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
175     [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
176 };
177
178 struct userdata {
179     pa_core *core;
180     pa_module *module;
181
182     pa_thread_mq thread_mq;
183     pa_rtpoll *rtpoll;
184     pa_thread *thread;
185
186     pa_socket_client *client;
187     pa_pstream *pstream;
188     pa_pdispatch *pdispatch;
189
190     char *server_name;
191 #ifdef TUNNEL_SINK
192     char *sink_name;
193     pa_sink *sink;
194     size_t requested_bytes;
195 #else
196     char *source_name;
197     pa_source *source;
198     pa_mcalign *mcalign;
199 #endif
200
201     pa_auth_cookie *auth_cookie;
202
203     uint32_t version;
204     uint32_t ctag;
205     uint32_t device_index;
206     uint32_t channel;
207
208     int64_t counter, counter_delta;
209
210     pa_bool_t remote_corked:1;
211     pa_bool_t remote_suspended:1;
212
213     pa_usec_t transport_usec; /* maintained in the main thread */
214     pa_usec_t thread_transport_usec; /* maintained in the IO thread */
215
216     uint32_t ignore_latency_before;
217
218     pa_time_event *time_event;
219
220     pa_smoother *smoother;
221
222     char *device_description;
223     char *server_fqdn;
224     char *user_name;
225
226     uint32_t maxlength;
227 #ifdef TUNNEL_SINK
228     uint32_t tlength;
229     uint32_t minreq;
230     uint32_t prebuf;
231 #else
232     uint32_t fragsize;
233 #endif
234 };
235
236 static void request_latency(struct userdata *u);
237
238 /* Called from main context */
239 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
240     pa_log_debug("Got stream or client event.");
241 }
242
243 /* Called from main context */
244 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
245     struct userdata *u = userdata;
246
247     pa_assert(pd);
248     pa_assert(t);
249     pa_assert(u);
250     pa_assert(u->pdispatch == pd);
251
252     pa_log_warn("Stream killed");
253     pa_module_unload_request(u->module, TRUE);
254 }
255
256 /* Called from main context */
257 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
258     struct userdata *u = userdata;
259
260     pa_assert(pd);
261     pa_assert(t);
262     pa_assert(u);
263     pa_assert(u->pdispatch == pd);
264
265     pa_log_info("Server signalled buffer overrun/underrun.");
266     request_latency(u);
267 }
268
269 /* Called from main context */
270 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
271     struct userdata *u = userdata;
272     uint32_t channel;
273     pa_bool_t suspended;
274
275     pa_assert(pd);
276     pa_assert(t);
277     pa_assert(u);
278     pa_assert(u->pdispatch == pd);
279
280     if (pa_tagstruct_getu32(t, &channel) < 0 ||
281         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
282         !pa_tagstruct_eof(t)) {
283
284         pa_log("Invalid packet.");
285         pa_module_unload_request(u->module, TRUE);
286         return;
287     }
288
289     pa_log_debug("Server reports device suspend.");
290
291 #ifdef TUNNEL_SINK
292     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #else
294     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
295 #endif
296
297     request_latency(u);
298 }
299
300 /* Called from main context */
301 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
302     struct userdata *u = userdata;
303     uint32_t channel, di;
304     const char *dn;
305     pa_bool_t suspended;
306
307     pa_assert(pd);
308     pa_assert(t);
309     pa_assert(u);
310     pa_assert(u->pdispatch == pd);
311
312     if (pa_tagstruct_getu32(t, &channel) < 0 ||
313         pa_tagstruct_getu32(t, &di) < 0 ||
314         pa_tagstruct_gets(t, &dn) < 0 ||
315         pa_tagstruct_get_boolean(t, &suspended) < 0) {
316
317         pa_log_error("Invalid packet.");
318         pa_module_unload_request(u->module, TRUE);
319         return;
320     }
321
322     pa_log_debug("Server reports a stream move.");
323
324 #ifdef TUNNEL_SINK
325     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #else
327     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
328 #endif
329
330     request_latency(u);
331 }
332
333 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
334     struct userdata *u = userdata;
335     uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
336     pa_usec_t usec;
337
338     pa_assert(pd);
339     pa_assert(t);
340     pa_assert(u);
341     pa_assert(u->pdispatch == pd);
342
343     if (pa_tagstruct_getu32(t, &channel) < 0 ||
344         pa_tagstruct_getu32(t, &maxlength) < 0) {
345
346         pa_log_error("Invalid packet.");
347         pa_module_unload_request(u->module, TRUE);
348         return;
349     }
350
351     if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
352         if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
353             pa_tagstruct_get_usec(t, &usec) < 0) {
354
355             pa_log_error("Invalid packet.");
356             pa_module_unload_request(u->module, TRUE);
357             return;
358         }
359     } else {
360         if (pa_tagstruct_getu32(t, &tlength) < 0 ||
361             pa_tagstruct_getu32(t, &prebuf) < 0 ||
362             pa_tagstruct_getu32(t, &minreq) < 0 ||
363             pa_tagstruct_get_usec(t, &usec) < 0) {
364
365             pa_log_error("Invalid packet.");
366             pa_module_unload_request(u->module, TRUE);
367             return;
368         }
369     }
370
371 #ifdef TUNNEL_SINK
372     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
373 #endif
374
375     request_latency(u);
376 }
377
378 #ifdef TUNNEL_SINK
379
380 /* Called from main context */
381 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
382    struct userdata *u = userdata;
383
384     pa_assert(pd);
385     pa_assert(t);
386     pa_assert(u);
387     pa_assert(u->pdispatch == pd);
388
389     pa_log_debug("Server reports playback started.");
390     request_latency(u);
391 }
392
393 #endif
394
395 /* Called from IO thread context */
396 static void check_smoother_status(struct userdata *u, pa_bool_t past)  {
397     pa_usec_t x;
398
399     pa_assert(u);
400
401     x = pa_rtclock_now();
402
403     /* Correct by the time the requested issued needs to travel to the
404      * other side.  This is a valid thread-safe access, because the
405      * main thread is waiting for us */
406
407     if (past)
408         x -= u->thread_transport_usec;
409     else
410         x += u->thread_transport_usec;
411
412     if (u->remote_suspended || u->remote_corked)
413         pa_smoother_pause(u->smoother, x);
414     else
415         pa_smoother_resume(u->smoother, x, TRUE);
416 }
417
418 /* Called from IO thread context */
419 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
420     pa_assert(u);
421
422     if (u->remote_corked == cork)
423         return;
424
425     u->remote_corked = cork;
426     check_smoother_status(u, FALSE);
427 }
428
429 /* Called from main context */
430 static void stream_cork(struct userdata *u, pa_bool_t cork) {
431     pa_tagstruct *t;
432     pa_assert(u);
433
434     if (!u->pstream)
435         return;
436
437     t = pa_tagstruct_new(NULL, 0);
438 #ifdef TUNNEL_SINK
439     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
440 #else
441     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
442 #endif
443     pa_tagstruct_putu32(t, u->ctag++);
444     pa_tagstruct_putu32(t, u->channel);
445     pa_tagstruct_put_boolean(t, !!cork);
446     pa_pstream_send_tagstruct(u->pstream, t);
447
448     request_latency(u);
449 }
450
451 /* Called from IO thread context */
452 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
453     pa_assert(u);
454
455     if (u->remote_suspended == suspend)
456         return;
457
458     u->remote_suspended = suspend;
459     check_smoother_status(u, TRUE);
460 }
461
462 #ifdef TUNNEL_SINK
463
464 /* Called from IO thread context */
465 static void send_data(struct userdata *u) {
466     pa_assert(u);
467
468     while (u->requested_bytes > 0) {
469         pa_memchunk memchunk;
470
471         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
472         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
473         pa_memblock_unref(memchunk.memblock);
474
475         u->requested_bytes -= memchunk.length;
476
477         u->counter += (int64_t) memchunk.length;
478     }
479 }
480
481 /* This function is called from IO context -- except when it is not. */
482 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
483     struct userdata *u = PA_SINK(o)->userdata;
484
485     switch (code) {
486
487         case PA_SINK_MESSAGE_SET_STATE: {
488             int r;
489
490             /* First, change the state, because otherwide pa_sink_render() would fail */
491             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
492
493                 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
494
495                 if (PA_SINK_IS_OPENED(u->sink->state))
496                     send_data(u);
497             }
498
499             return r;
500         }
501
502         case PA_SINK_MESSAGE_GET_LATENCY: {
503             pa_usec_t yl, yr, *usec = data;
504
505             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
506             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
507
508             *usec = yl > yr ? yl - yr : 0;
509             return 0;
510         }
511
512         case SINK_MESSAGE_REQUEST:
513
514             pa_assert(offset > 0);
515             u->requested_bytes += (size_t) offset;
516
517             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
518                 send_data(u);
519
520             return 0;
521
522
523         case SINK_MESSAGE_REMOTE_SUSPEND:
524
525             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
526             return 0;
527
528
529         case SINK_MESSAGE_UPDATE_LATENCY: {
530             pa_usec_t y;
531
532             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
533
534             if (y > (pa_usec_t) offset)
535                 y -= (pa_usec_t) offset;
536             else
537                 y = 0;
538
539             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
540
541             /* We can access this freely here, since the main thread is waiting for us */
542             u->thread_transport_usec = u->transport_usec;
543
544             return 0;
545         }
546
547         case SINK_MESSAGE_POST:
548
549             /* OK, This might be a bit confusing. This message is
550              * delivered to us from the main context -- NOT from the
551              * IO thread context where the rest of the messages are
552              * dispatched. Yeah, ugly, but I am a lazy bastard. */
553
554             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
555
556             u->counter_delta += (int64_t) chunk->length;
557
558             return 0;
559     }
560
561     return pa_sink_process_msg(o, code, data, offset, chunk);
562 }
563
564 /* Called from main context */
565 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
566     struct userdata *u;
567     pa_sink_assert_ref(s);
568     u = s->userdata;
569
570     switch ((pa_sink_state_t) state) {
571
572         case PA_SINK_SUSPENDED:
573             pa_assert(PA_SINK_IS_OPENED(s->state));
574             stream_cork(u, TRUE);
575             break;
576
577         case PA_SINK_IDLE:
578         case PA_SINK_RUNNING:
579             if (s->state == PA_SINK_SUSPENDED)
580                 stream_cork(u, FALSE);
581             break;
582
583         case PA_SINK_UNLINKED:
584         case PA_SINK_INIT:
585         case PA_SINK_INVALID_STATE:
586             ;
587     }
588
589     return 0;
590 }
591
592 #else
593
594 /* This function is called from IO context -- except when it is not. */
595 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596     struct userdata *u = PA_SOURCE(o)->userdata;
597
598     switch (code) {
599
600         case PA_SOURCE_MESSAGE_SET_STATE: {
601             int r;
602
603             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
604                 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
605
606             return r;
607         }
608
609         case PA_SOURCE_MESSAGE_GET_LATENCY: {
610             pa_usec_t yr, yl, *usec = data;
611
612             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
613             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
614
615             *usec = yr > yl ? yr - yl : 0;
616             return 0;
617         }
618
619         case SOURCE_MESSAGE_POST: {
620             pa_memchunk c;
621
622             pa_mcalign_push(u->mcalign, chunk);
623
624             while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
625
626                 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
627                     pa_source_post(u->source, &c);
628
629                 pa_memblock_unref(c.memblock);
630
631                 u->counter += (int64_t) c.length;
632             }
633
634             return 0;
635         }
636
637         case SOURCE_MESSAGE_REMOTE_SUSPEND:
638
639             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
640             return 0;
641
642         case SOURCE_MESSAGE_UPDATE_LATENCY: {
643             pa_usec_t y;
644
645             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
646             y += (pa_usec_t) offset;
647
648             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
649
650             /* We can access this freely here, since the main thread is waiting for us */
651             u->thread_transport_usec = u->transport_usec;
652
653             return 0;
654         }
655     }
656
657     return pa_source_process_msg(o, code, data, offset, chunk);
658 }
659
660 /* Called from main context */
661 static int source_set_state(pa_source *s, pa_source_state_t state) {
662     struct userdata *u;
663     pa_source_assert_ref(s);
664     u = s->userdata;
665
666     switch ((pa_source_state_t) state) {
667
668         case PA_SOURCE_SUSPENDED:
669             pa_assert(PA_SOURCE_IS_OPENED(s->state));
670             stream_cork(u, TRUE);
671             break;
672
673         case PA_SOURCE_IDLE:
674         case PA_SOURCE_RUNNING:
675             if (s->state == PA_SOURCE_SUSPENDED)
676                 stream_cork(u, FALSE);
677             break;
678
679         case PA_SOURCE_UNLINKED:
680         case PA_SOURCE_INIT:
681         case PA_SINK_INVALID_STATE:
682             ;
683     }
684
685     return 0;
686 }
687
688 #endif
689
690 static void thread_func(void *userdata) {
691     struct userdata *u = userdata;
692
693     pa_assert(u);
694
695     pa_log_debug("Thread starting up");
696
697     pa_thread_mq_install(&u->thread_mq);
698
699     for (;;) {
700         int ret;
701
702 #ifdef TUNNEL_SINK
703         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
704             if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
705                 pa_sink_process_rewind(u->sink, 0);
706 #endif
707
708         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
709             goto fail;
710
711         if (ret == 0)
712             goto finish;
713     }
714
715 fail:
716     /* If this was no regular exit from the loop we have to continue
717      * processing messages until we received PA_MESSAGE_SHUTDOWN */
718     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
719     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
720
721 finish:
722     pa_log_debug("Thread shutting down");
723 }
724
725 #ifdef TUNNEL_SINK
726 /* Called from main context */
727 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
728     struct userdata *u = userdata;
729     uint32_t bytes, channel;
730
731     pa_assert(pd);
732     pa_assert(command == PA_COMMAND_REQUEST);
733     pa_assert(t);
734     pa_assert(u);
735     pa_assert(u->pdispatch == pd);
736
737     if (pa_tagstruct_getu32(t, &channel) < 0 ||
738         pa_tagstruct_getu32(t, &bytes) < 0) {
739         pa_log("Invalid protocol reply");
740         goto fail;
741     }
742
743     if (channel != u->channel) {
744         pa_log("Received data for invalid channel");
745         goto fail;
746     }
747
748     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
749     return;
750
751 fail:
752     pa_module_unload_request(u->module, TRUE);
753 }
754
755 #endif
756
757 /* Called from main context */
758 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759     struct userdata *u = userdata;
760     pa_usec_t sink_usec, source_usec;
761     pa_bool_t playing;
762     int64_t write_index, read_index;
763     struct timeval local, remote, now;
764     pa_sample_spec *ss;
765     int64_t delay;
766
767     pa_assert(pd);
768     pa_assert(u);
769
770     if (command != PA_COMMAND_REPLY) {
771         if (command == PA_COMMAND_ERROR)
772             pa_log("Failed to get latency.");
773         else
774             pa_log("Protocol error.");
775         goto fail;
776     }
777
778     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
779         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
780         pa_tagstruct_get_boolean(t, &playing) < 0 ||
781         pa_tagstruct_get_timeval(t, &local) < 0 ||
782         pa_tagstruct_get_timeval(t, &remote) < 0 ||
783         pa_tagstruct_gets64(t, &write_index) < 0 ||
784         pa_tagstruct_gets64(t, &read_index) < 0) {
785         pa_log("Invalid reply.");
786         goto fail;
787     }
788
789 #ifdef TUNNEL_SINK
790     if (u->version >= 13) {
791         uint64_t underrun_for = 0, playing_for = 0;
792
793         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
794             pa_tagstruct_getu64(t, &playing_for) < 0) {
795             pa_log("Invalid reply.");
796             goto fail;
797         }
798     }
799 #endif
800
801     if (!pa_tagstruct_eof(t)) {
802         pa_log("Invalid reply.");
803         goto fail;
804     }
805
806     if (tag < u->ignore_latency_before) {
807         return;
808     }
809
810     pa_gettimeofday(&now);
811
812     /* Calculate transport usec */
813     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
814         /* local and remote seem to have synchronized clocks */
815 #ifdef TUNNEL_SINK
816         u->transport_usec = pa_timeval_diff(&remote, &local);
817 #else
818         u->transport_usec = pa_timeval_diff(&now, &remote);
819 #endif
820     } else
821         u->transport_usec = pa_timeval_diff(&now, &local)/2;
822
823     /* First, take the device's delay */
824 #ifdef TUNNEL_SINK
825     delay = (int64_t) sink_usec;
826     ss = &u->sink->sample_spec;
827 #else
828     delay = (int64_t) source_usec;
829     ss = &u->source->sample_spec;
830 #endif
831
832     /* Add the length of our server-side buffer */
833     if (write_index >= read_index)
834         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
835     else
836         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
837
838     /* Our measurements are already out of date, hence correct by the     *
839      * transport latency */
840 #ifdef TUNNEL_SINK
841     delay -= (int64_t) u->transport_usec;
842 #else
843     delay += (int64_t) u->transport_usec;
844 #endif
845
846     /* Now correct by what we have have read/written since we requested the update */
847 #ifdef TUNNEL_SINK
848     delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #else
850     delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
851 #endif
852
853 #ifdef TUNNEL_SINK
854     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #else
856     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
857 #endif
858
859     return;
860
861 fail:
862
863     pa_module_unload_request(u->module, TRUE);
864 }
865
866 /* Called from main context */
867 static void request_latency(struct userdata *u) {
868     pa_tagstruct *t;
869     struct timeval now;
870     uint32_t tag;
871     pa_assert(u);
872
873     t = pa_tagstruct_new(NULL, 0);
874 #ifdef TUNNEL_SINK
875     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
876 #else
877     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
878 #endif
879     pa_tagstruct_putu32(t, tag = u->ctag++);
880     pa_tagstruct_putu32(t, u->channel);
881
882     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
883
884     pa_pstream_send_tagstruct(u->pstream, t);
885     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
886
887     u->ignore_latency_before = tag;
888     u->counter_delta = 0;
889 }
890
891 /* Called from main context */
892 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
893     struct userdata *u = userdata;
894
895     pa_assert(m);
896     pa_assert(e);
897     pa_assert(u);
898
899     request_latency(u);
900
901     pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
902 }
903
904 /* Called from main context */
905 static void update_description(struct userdata *u) {
906     char *d;
907     char un[128], hn[128];
908     pa_tagstruct *t;
909
910     pa_assert(u);
911
912     if (!u->server_fqdn || !u->user_name || !u->device_description)
913         return;
914
915     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
916
917 #ifdef TUNNEL_SINK
918     pa_sink_set_description(u->sink, d);
919     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
920     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
921     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
922 #else
923     pa_source_set_description(u->source, d);
924     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
925     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
926     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
927 #endif
928
929     pa_xfree(d);
930
931     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
932                           pa_get_user_name(un, sizeof(un)),
933                           pa_get_host_name(hn, sizeof(hn)));
934
935     t = pa_tagstruct_new(NULL, 0);
936 #ifdef TUNNEL_SINK
937     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
938 #else
939     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
940 #endif
941     pa_tagstruct_putu32(t, u->ctag++);
942     pa_tagstruct_putu32(t, u->channel);
943     pa_tagstruct_puts(t, d);
944     pa_pstream_send_tagstruct(u->pstream, t);
945
946     pa_xfree(d);
947 }
948
949 /* Called from main context */
950 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
951     struct userdata *u = userdata;
952     pa_sample_spec ss;
953     pa_channel_map cm;
954     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
955     uint32_t cookie;
956
957     pa_assert(pd);
958     pa_assert(u);
959
960     if (command != PA_COMMAND_REPLY) {
961         if (command == PA_COMMAND_ERROR)
962             pa_log("Failed to get info.");
963         else
964             pa_log("Protocol error.");
965         goto fail;
966     }
967
968     if (pa_tagstruct_gets(t, &server_name) < 0 ||
969         pa_tagstruct_gets(t, &server_version) < 0 ||
970         pa_tagstruct_gets(t, &user_name) < 0 ||
971         pa_tagstruct_gets(t, &host_name) < 0 ||
972         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
973         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
974         pa_tagstruct_gets(t, &default_source_name) < 0 ||
975         pa_tagstruct_getu32(t, &cookie) < 0 ||
976         (u->version >= 15 &&
977          pa_tagstruct_get_channel_map(t, &cm) < 0)) {
978
979         pa_log("Parse failure");
980         goto fail;
981     }
982
983     if (!pa_tagstruct_eof(t)) {
984         pa_log("Packet too long");
985         goto fail;
986     }
987
988     pa_xfree(u->server_fqdn);
989     u->server_fqdn = pa_xstrdup(host_name);
990
991     pa_xfree(u->user_name);
992     u->user_name = pa_xstrdup(user_name);
993
994     update_description(u);
995
996     return;
997
998 fail:
999     pa_module_unload_request(u->module, TRUE);
1000 }
1001
1002 #ifdef TUNNEL_SINK
1003
1004 /* Called from main context */
1005 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1006     struct userdata *u = userdata;
1007     uint32_t idx, owner_module, monitor_source, flags;
1008     const char *name, *description, *monitor_source_name, *driver;
1009     pa_sample_spec ss;
1010     pa_channel_map cm;
1011     pa_cvolume volume;
1012     pa_bool_t mute;
1013     pa_usec_t latency;
1014     pa_proplist *pl;
1015
1016     pa_assert(pd);
1017     pa_assert(u);
1018
1019     pl = pa_proplist_new();
1020
1021     if (command != PA_COMMAND_REPLY) {
1022         if (command == PA_COMMAND_ERROR)
1023             pa_log("Failed to get info.");
1024         else
1025             pa_log("Protocol error.");
1026         goto fail;
1027     }
1028
1029     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1030         pa_tagstruct_gets(t, &name) < 0 ||
1031         pa_tagstruct_gets(t, &description) < 0 ||
1032         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1033         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1034         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1035         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1036         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1037         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1038         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1039         pa_tagstruct_get_usec(t, &latency) < 0 ||
1040         pa_tagstruct_gets(t, &driver) < 0 ||
1041         pa_tagstruct_getu32(t, &flags) < 0) {
1042
1043         pa_log("Parse failure");
1044         goto fail;
1045     }
1046
1047     if (u->version >= 13) {
1048         pa_usec_t configured_latency;
1049
1050         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1051             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1052
1053             pa_log("Parse failure");
1054             goto fail;
1055         }
1056     }
1057
1058     if (u->version >= 15) {
1059         pa_volume_t base_volume;
1060         uint32_t state, n_volume_steps, card;
1061
1062         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1063             pa_tagstruct_getu32(t, &state) < 0 ||
1064             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1065             pa_tagstruct_getu32(t, &card) < 0) {
1066
1067             pa_log("Parse failure");
1068             goto fail;
1069         }
1070     }
1071
1072     if (u->version >= 16) {
1073         uint32_t n_ports;
1074         const char *s;
1075
1076         if (pa_tagstruct_getu32(t, &n_ports)) {
1077             pa_log("Parse failure");
1078             goto fail;
1079         }
1080
1081         for (uint32_t j = 0; j < n_ports; j++) {
1082             uint32_t priority;
1083
1084             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1085                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1086                 pa_tagstruct_getu32(t, &priority) < 0) {
1087
1088                 pa_log("Parse failure");
1089                 goto fail;
1090             }
1091         }
1092
1093         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1094             pa_log("Parse failure");
1095             goto fail;
1096         }
1097     }
1098
1099     if (!pa_tagstruct_eof(t)) {
1100         pa_log("Packet too long");
1101         goto fail;
1102     }
1103
1104     pa_proplist_free(pl);
1105
1106     if (!u->sink_name || strcmp(name, u->sink_name))
1107         return;
1108
1109     pa_xfree(u->device_description);
1110     u->device_description = pa_xstrdup(description);
1111
1112     update_description(u);
1113
1114     return;
1115
1116 fail:
1117     pa_module_unload_request(u->module, TRUE);
1118     pa_proplist_free(pl);
1119 }
1120
1121 /* Called from main context */
1122 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1123     struct userdata *u = userdata;
1124     uint32_t idx, owner_module, client, sink;
1125     pa_usec_t buffer_usec, sink_usec;
1126     const char *name, *driver, *resample_method;
1127     pa_bool_t mute = FALSE;
1128     pa_sample_spec sample_spec;
1129     pa_channel_map channel_map;
1130     pa_cvolume volume;
1131     pa_proplist *pl;
1132
1133     pa_assert(pd);
1134     pa_assert(u);
1135
1136     pl = pa_proplist_new();
1137
1138     if (command != PA_COMMAND_REPLY) {
1139         if (command == PA_COMMAND_ERROR)
1140             pa_log("Failed to get info.");
1141         else
1142             pa_log("Protocol error.");
1143         goto fail;
1144     }
1145
1146     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1147         pa_tagstruct_gets(t, &name) < 0 ||
1148         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1149         pa_tagstruct_getu32(t, &client) < 0 ||
1150         pa_tagstruct_getu32(t, &sink) < 0 ||
1151         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1152         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1153         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1154         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1155         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1156         pa_tagstruct_gets(t, &resample_method) < 0 ||
1157         pa_tagstruct_gets(t, &driver) < 0) {
1158
1159         pa_log("Parse failure");
1160         goto fail;
1161     }
1162
1163     if (u->version >= 11) {
1164         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1165
1166             pa_log("Parse failure");
1167             goto fail;
1168         }
1169     }
1170
1171     if (u->version >= 13) {
1172         if (pa_tagstruct_get_proplist(t, pl) < 0) {
1173
1174             pa_log("Parse failure");
1175             goto fail;
1176         }
1177     }
1178
1179     if (!pa_tagstruct_eof(t)) {
1180         pa_log("Packet too long");
1181         goto fail;
1182     }
1183
1184     pa_proplist_free(pl);
1185
1186     if (idx != u->device_index)
1187         return;
1188
1189     pa_assert(u->sink);
1190
1191     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1192         pa_cvolume_equal(&volume, &u->sink->real_volume))
1193         return;
1194
1195     pa_sink_volume_changed(u->sink, &volume);
1196
1197     if (u->version >= 11)
1198         pa_sink_mute_changed(u->sink, mute);
1199
1200     return;
1201
1202 fail:
1203     pa_module_unload_request(u->module, TRUE);
1204     pa_proplist_free(pl);
1205 }
1206
1207 #else
1208
1209 /* Called from main context */
1210 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1211     struct userdata *u = userdata;
1212     uint32_t idx, owner_module, monitor_of_sink, flags;
1213     const char *name, *description, *monitor_of_sink_name, *driver;
1214     pa_sample_spec ss;
1215     pa_channel_map cm;
1216     pa_cvolume volume;
1217     pa_bool_t mute;
1218     pa_usec_t latency, configured_latency;
1219     pa_proplist *pl;
1220
1221     pa_assert(pd);
1222     pa_assert(u);
1223
1224     pl = pa_proplist_new();
1225
1226     if (command != PA_COMMAND_REPLY) {
1227         if (command == PA_COMMAND_ERROR)
1228             pa_log("Failed to get info.");
1229         else
1230             pa_log("Protocol error.");
1231         goto fail;
1232     }
1233
1234     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1235         pa_tagstruct_gets(t, &name) < 0 ||
1236         pa_tagstruct_gets(t, &description) < 0 ||
1237         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1238         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1239         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1240         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1241         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1242         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1243         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1244         pa_tagstruct_get_usec(t, &latency) < 0 ||
1245         pa_tagstruct_gets(t, &driver) < 0 ||
1246         pa_tagstruct_getu32(t, &flags) < 0) {
1247
1248         pa_log("Parse failure");
1249         goto fail;
1250     }
1251
1252     if (u->version >= 13) {
1253         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1254             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1255
1256             pa_log("Parse failure");
1257             goto fail;
1258         }
1259     }
1260
1261     if (u->version >= 15) {
1262         pa_volume_t base_volume;
1263         uint32_t state, n_volume_steps, card;
1264
1265         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1266             pa_tagstruct_getu32(t, &state) < 0 ||
1267             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1268             pa_tagstruct_getu32(t, &card) < 0) {
1269
1270             pa_log("Parse failure");
1271             goto fail;
1272         }
1273     }
1274
1275     if (u->version >= 16) {
1276         uint32_t n_ports;
1277         const char *s;
1278
1279         if (pa_tagstruct_getu32(t, &n_ports)) {
1280             pa_log("Parse failure");
1281             goto fail;
1282         }
1283
1284         for (uint32_t j = 0; j < n_ports; j++) {
1285             uint32_t priority;
1286
1287             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1288                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1289                 pa_tagstruct_getu32(t, &priority) < 0) {
1290
1291                 pa_log("Parse failure");
1292                 goto fail;
1293             }
1294         }
1295
1296         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1297             pa_log("Parse failure");
1298             goto fail;
1299         }
1300     }
1301
1302     if (!pa_tagstruct_eof(t)) {
1303         pa_log("Packet too long");
1304         goto fail;
1305     }
1306
1307     pa_proplist_free(pl);
1308
1309     if (!u->source_name || strcmp(name, u->source_name))
1310         return;
1311
1312     pa_xfree(u->device_description);
1313     u->device_description = pa_xstrdup(description);
1314
1315     update_description(u);
1316
1317     return;
1318
1319 fail:
1320     pa_module_unload_request(u->module, TRUE);
1321     pa_proplist_free(pl);
1322 }
1323
1324 #endif
1325
1326 /* Called from main context */
1327 static void request_info(struct userdata *u) {
1328     pa_tagstruct *t;
1329     uint32_t tag;
1330     pa_assert(u);
1331
1332     t = pa_tagstruct_new(NULL, 0);
1333     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1334     pa_tagstruct_putu32(t, tag = u->ctag++);
1335     pa_pstream_send_tagstruct(u->pstream, t);
1336     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1337
1338 #ifdef TUNNEL_SINK
1339     t = pa_tagstruct_new(NULL, 0);
1340     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1341     pa_tagstruct_putu32(t, tag = u->ctag++);
1342     pa_tagstruct_putu32(t, u->device_index);
1343     pa_pstream_send_tagstruct(u->pstream, t);
1344     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1345
1346     if (u->sink_name) {
1347         t = pa_tagstruct_new(NULL, 0);
1348         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1349         pa_tagstruct_putu32(t, tag = u->ctag++);
1350         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1351         pa_tagstruct_puts(t, u->sink_name);
1352         pa_pstream_send_tagstruct(u->pstream, t);
1353         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1354     }
1355 #else
1356     if (u->source_name) {
1357         t = pa_tagstruct_new(NULL, 0);
1358         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1359         pa_tagstruct_putu32(t, tag = u->ctag++);
1360         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1361         pa_tagstruct_puts(t, u->source_name);
1362         pa_pstream_send_tagstruct(u->pstream, t);
1363         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1364     }
1365 #endif
1366 }
1367
1368 /* Called from main context */
1369 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1370     struct userdata *u = userdata;
1371     pa_subscription_event_type_t e;
1372     uint32_t idx;
1373
1374     pa_assert(pd);
1375     pa_assert(t);
1376     pa_assert(u);
1377     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1378
1379     if (pa_tagstruct_getu32(t, &e) < 0 ||
1380         pa_tagstruct_getu32(t, &idx) < 0) {
1381         pa_log("Invalid protocol reply");
1382         pa_module_unload_request(u->module, TRUE);
1383         return;
1384     }
1385
1386     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1387 #ifdef TUNNEL_SINK
1388         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1389         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1390 #else
1391         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1392 #endif
1393         )
1394         return;
1395
1396     request_info(u);
1397 }
1398
1399 /* Called from main context */
1400 static void start_subscribe(struct userdata *u) {
1401     pa_tagstruct *t;
1402     pa_assert(u);
1403
1404     t = pa_tagstruct_new(NULL, 0);
1405     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1406     pa_tagstruct_putu32(t, u->ctag++);
1407     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1408 #ifdef TUNNEL_SINK
1409                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1410 #else
1411                         PA_SUBSCRIPTION_MASK_SOURCE
1412 #endif
1413                         );
1414
1415     pa_pstream_send_tagstruct(u->pstream, t);
1416 }
1417
1418 /* Called from main context */
1419 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1420     struct userdata *u = userdata;
1421 #ifdef TUNNEL_SINK
1422     uint32_t bytes;
1423 #endif
1424
1425     pa_assert(pd);
1426     pa_assert(u);
1427     pa_assert(u->pdispatch == pd);
1428
1429     if (command != PA_COMMAND_REPLY) {
1430         if (command == PA_COMMAND_ERROR)
1431             pa_log("Failed to create stream.");
1432         else
1433             pa_log("Protocol error.");
1434         goto fail;
1435     }
1436
1437     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1438         pa_tagstruct_getu32(t, &u->device_index) < 0
1439 #ifdef TUNNEL_SINK
1440         || pa_tagstruct_getu32(t, &bytes) < 0
1441 #endif
1442         )
1443         goto parse_error;
1444
1445     if (u->version >= 9) {
1446 #ifdef TUNNEL_SINK
1447         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1448             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1449             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1450             pa_tagstruct_getu32(t, &u->minreq) < 0)
1451             goto parse_error;
1452 #else
1453         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1454             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1455             goto parse_error;
1456 #endif
1457     }
1458
1459     if (u->version >= 12) {
1460         pa_sample_spec ss;
1461         pa_channel_map cm;
1462         uint32_t device_index;
1463         const char *dn;
1464         pa_bool_t suspended;
1465
1466         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1467             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1468             pa_tagstruct_getu32(t, &device_index) < 0 ||
1469             pa_tagstruct_gets(t, &dn) < 0 ||
1470             pa_tagstruct_get_boolean(t, &suspended) < 0)
1471             goto parse_error;
1472
1473 #ifdef TUNNEL_SINK
1474         pa_xfree(u->sink_name);
1475         u->sink_name = pa_xstrdup(dn);
1476 #else
1477         pa_xfree(u->source_name);
1478         u->source_name = pa_xstrdup(dn);
1479 #endif
1480     }
1481
1482     if (u->version >= 13) {
1483         pa_usec_t usec;
1484
1485         if (pa_tagstruct_get_usec(t, &usec) < 0)
1486             goto parse_error;
1487
1488 /* #ifdef TUNNEL_SINK */
1489 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1490 /* #else */
1491 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1492 /* #endif */
1493     }
1494
1495     if (!pa_tagstruct_eof(t))
1496         goto parse_error;
1497
1498     start_subscribe(u);
1499     request_info(u);
1500
1501     pa_assert(!u->time_event);
1502     u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1503
1504     request_latency(u);
1505
1506     pa_log_debug("Stream created.");
1507
1508 #ifdef TUNNEL_SINK
1509     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1510 #endif
1511
1512     return;
1513
1514 parse_error:
1515     pa_log("Invalid reply. (Create stream)");
1516
1517 fail:
1518     pa_module_unload_request(u->module, TRUE);
1519
1520 }
1521
1522 /* Called from main context */
1523 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1524     struct userdata *u = userdata;
1525     pa_tagstruct *reply;
1526     char name[256], un[128], hn[128];
1527 #ifdef TUNNEL_SINK
1528     pa_cvolume volume;
1529 #endif
1530
1531     pa_assert(pd);
1532     pa_assert(u);
1533     pa_assert(u->pdispatch == pd);
1534
1535     if (command != PA_COMMAND_REPLY ||
1536         pa_tagstruct_getu32(t, &u->version) < 0 ||
1537         !pa_tagstruct_eof(t)) {
1538
1539         if (command == PA_COMMAND_ERROR)
1540             pa_log("Failed to authenticate");
1541         else
1542             pa_log("Protocol error.");
1543
1544         goto fail;
1545     }
1546
1547     /* Minimum supported protocol version */
1548     if (u->version < 8) {
1549         pa_log("Incompatible protocol version");
1550         goto fail;
1551     }
1552
1553     /* Starting with protocol version 13 the MSB of the version tag
1554     reflects if shm is enabled for this connection or not. We don't
1555     support SHM here at all, so we just ignore this. */
1556
1557     if (u->version >= 13)
1558         u->version &= 0x7FFFFFFFU;
1559
1560     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1561
1562 #ifdef TUNNEL_SINK
1563     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1564     pa_sink_update_proplist(u->sink, 0, NULL);
1565
1566     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1567                 u->sink_name,
1568                 pa_get_user_name(un, sizeof(un)),
1569                 pa_get_host_name(hn, sizeof(hn)));
1570 #else
1571     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1572     pa_source_update_proplist(u->source, 0, NULL);
1573
1574     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1575                 u->source_name,
1576                 pa_get_user_name(un, sizeof(un)),
1577                 pa_get_host_name(hn, sizeof(hn)));
1578 #endif
1579
1580     reply = pa_tagstruct_new(NULL, 0);
1581     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1582     pa_tagstruct_putu32(reply, u->ctag++);
1583
1584     if (u->version >= 13) {
1585         pa_proplist *pl;
1586         pl = pa_proplist_new();
1587         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1588         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1589         pa_init_proplist(pl);
1590         pa_tagstruct_put_proplist(reply, pl);
1591         pa_proplist_free(pl);
1592     } else
1593         pa_tagstruct_puts(reply, "PulseAudio");
1594
1595     pa_pstream_send_tagstruct(u->pstream, reply);
1596     /* We ignore the server's reply here */
1597
1598     reply = pa_tagstruct_new(NULL, 0);
1599
1600     if (u->version < 13)
1601         /* Only for older PA versions we need to fill in the maxlength */
1602         u->maxlength = 4*1024*1024;
1603
1604 #ifdef TUNNEL_SINK
1605     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1606     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1607     u->prebuf = u->tlength;
1608 #else
1609     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1610 #endif
1611
1612 #ifdef TUNNEL_SINK
1613     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1614     pa_tagstruct_putu32(reply, tag = u->ctag++);
1615
1616     if (u->version < 13)
1617         pa_tagstruct_puts(reply, name);
1618
1619     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1620     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1621     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1622     pa_tagstruct_puts(reply, u->sink_name);
1623     pa_tagstruct_putu32(reply, u->maxlength);
1624     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1625     pa_tagstruct_putu32(reply, u->tlength);
1626     pa_tagstruct_putu32(reply, u->prebuf);
1627     pa_tagstruct_putu32(reply, u->minreq);
1628     pa_tagstruct_putu32(reply, 0);
1629     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1630     pa_tagstruct_put_cvolume(reply, &volume);
1631 #else
1632     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1633     pa_tagstruct_putu32(reply, tag = u->ctag++);
1634
1635     if (u->version < 13)
1636         pa_tagstruct_puts(reply, name);
1637
1638     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1639     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1640     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1641     pa_tagstruct_puts(reply, u->source_name);
1642     pa_tagstruct_putu32(reply, u->maxlength);
1643     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1644     pa_tagstruct_putu32(reply, u->fragsize);
1645 #endif
1646
1647     if (u->version >= 12) {
1648         pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1649         pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1650         pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1651         pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1652         pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1653         pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1654         pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1655     }
1656
1657     if (u->version >= 13) {
1658         pa_proplist *pl;
1659
1660         pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1661         pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1662
1663         pl = pa_proplist_new();
1664         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1665         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1666         pa_tagstruct_put_proplist(reply, pl);
1667         pa_proplist_free(pl);
1668
1669 #ifndef TUNNEL_SINK
1670         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1671 #endif
1672     }
1673
1674     if (u->version >= 14) {
1675 #ifdef TUNNEL_SINK
1676         pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1677 #endif
1678         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1679     }
1680
1681     if (u->version >= 15) {
1682 #ifdef TUNNEL_SINK
1683         pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1684 #endif
1685         pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1686         pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1687     }
1688
1689     pa_pstream_send_tagstruct(u->pstream, reply);
1690     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1691
1692     pa_log_debug("Connection authenticated, creating stream ...");
1693
1694     return;
1695
1696 fail:
1697     pa_module_unload_request(u->module, TRUE);
1698 }
1699
1700 /* Called from main context */
1701 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1702     struct userdata *u = userdata;
1703
1704     pa_assert(p);
1705     pa_assert(u);
1706
1707     pa_log_warn("Stream died.");
1708     pa_module_unload_request(u->module, TRUE);
1709 }
1710
1711 /* Called from main context */
1712 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1713     struct userdata *u = userdata;
1714
1715     pa_assert(p);
1716     pa_assert(packet);
1717     pa_assert(u);
1718
1719     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1720         pa_log("Invalid packet");
1721         pa_module_unload_request(u->module, TRUE);
1722         return;
1723     }
1724 }
1725
1726 #ifndef TUNNEL_SINK
1727 /* Called from main context */
1728 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1729     struct userdata *u = userdata;
1730
1731     pa_assert(p);
1732     pa_assert(chunk);
1733     pa_assert(u);
1734
1735     if (channel != u->channel) {
1736         pa_log("Received memory block on bad channel.");
1737         pa_module_unload_request(u->module, TRUE);
1738         return;
1739     }
1740
1741     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1742
1743     u->counter_delta += (int64_t) chunk->length;
1744 }
1745 #endif
1746
1747 /* Called from main context */
1748 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1749     struct userdata *u = userdata;
1750     pa_tagstruct *t;
1751     uint32_t tag;
1752
1753     pa_assert(sc);
1754     pa_assert(u);
1755     pa_assert(u->client == sc);
1756
1757     pa_socket_client_unref(u->client);
1758     u->client = NULL;
1759
1760     if (!io) {
1761         pa_log("Connection failed: %s", pa_cstrerror(errno));
1762         pa_module_unload_request(u->module, TRUE);
1763         return;
1764     }
1765
1766     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1767     u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1768
1769     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1770     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1771 #ifndef TUNNEL_SINK
1772     pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1773 #endif
1774
1775     t = pa_tagstruct_new(NULL, 0);
1776     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1777     pa_tagstruct_putu32(t, tag = u->ctag++);
1778     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1779
1780     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1781
1782 #ifdef HAVE_CREDS
1783 {
1784     pa_creds ucred;
1785
1786     if (pa_iochannel_creds_supported(io))
1787         pa_iochannel_creds_enable(io);
1788
1789     ucred.uid = getuid();
1790     ucred.gid = getgid();
1791
1792     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1793 }
1794 #else
1795     pa_pstream_send_tagstruct(u->pstream, t);
1796 #endif
1797
1798     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1799
1800     pa_log_debug("Connection established, authenticating ...");
1801 }
1802
1803 #ifdef TUNNEL_SINK
1804
1805 /* Called from main context */
1806 static void sink_set_volume(pa_sink *sink) {
1807     struct userdata *u;
1808     pa_tagstruct *t;
1809
1810     pa_assert(sink);
1811     u = sink->userdata;
1812     pa_assert(u);
1813
1814     t = pa_tagstruct_new(NULL, 0);
1815     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1816     pa_tagstruct_putu32(t, u->ctag++);
1817     pa_tagstruct_putu32(t, u->device_index);
1818     pa_tagstruct_put_cvolume(t, &sink->real_volume);
1819     pa_pstream_send_tagstruct(u->pstream, t);
1820 }
1821
1822 /* Called from main context */
1823 static void sink_set_mute(pa_sink *sink) {
1824     struct userdata *u;
1825     pa_tagstruct *t;
1826
1827     pa_assert(sink);
1828     u = sink->userdata;
1829     pa_assert(u);
1830
1831     if (u->version < 11)
1832         return;
1833
1834     t = pa_tagstruct_new(NULL, 0);
1835     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1836     pa_tagstruct_putu32(t, u->ctag++);
1837     pa_tagstruct_putu32(t, u->device_index);
1838     pa_tagstruct_put_boolean(t, !!sink->muted);
1839     pa_pstream_send_tagstruct(u->pstream, t);
1840 }
1841
1842 #endif
1843
1844 int pa__init(pa_module*m) {
1845     pa_modargs *ma = NULL;
1846     struct userdata *u = NULL;
1847     pa_sample_spec ss;
1848     pa_channel_map map;
1849     char *dn = NULL;
1850 #ifdef TUNNEL_SINK
1851     pa_sink_new_data data;
1852 #else
1853     pa_source_new_data data;
1854 #endif
1855
1856     pa_assert(m);
1857
1858     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1859         pa_log("Failed to parse module arguments");
1860         goto fail;
1861     }
1862
1863     m->userdata = u = pa_xnew0(struct userdata, 1);
1864     u->core = m->core;
1865     u->module = m;
1866     u->client = NULL;
1867     u->pdispatch = NULL;
1868     u->pstream = NULL;
1869     u->server_name = NULL;
1870 #ifdef TUNNEL_SINK
1871     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1872     u->sink = NULL;
1873     u->requested_bytes = 0;
1874 #else
1875     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1876     u->source = NULL;
1877 #endif
1878     u->smoother = pa_smoother_new(
1879             PA_USEC_PER_SEC,
1880             PA_USEC_PER_SEC*2,
1881             TRUE,
1882             TRUE,
1883             10,
1884             pa_rtclock_now(),
1885             FALSE);
1886     u->ctag = 1;
1887     u->device_index = u->channel = PA_INVALID_INDEX;
1888     u->time_event = NULL;
1889     u->ignore_latency_before = 0;
1890     u->transport_usec = u->thread_transport_usec = 0;
1891     u->remote_suspended = u->remote_corked = FALSE;
1892     u->counter = u->counter_delta = 0;
1893
1894     u->rtpoll = pa_rtpoll_new();
1895     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1896
1897     if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1898         goto fail;
1899
1900     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1901         pa_log("No server specified.");
1902         goto fail;
1903     }
1904
1905     ss = m->core->default_sample_spec;
1906     map = m->core->default_channel_map;
1907     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1908         pa_log("Invalid sample format specification");
1909         goto fail;
1910     }
1911
1912     if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1913         pa_log("Failed to connect to server '%s'", u->server_name);
1914         goto fail;
1915     }
1916
1917     pa_socket_client_set_callback(u->client, on_connection, u);
1918
1919 #ifdef TUNNEL_SINK
1920
1921     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1922         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1923
1924     pa_sink_new_data_init(&data);
1925     data.driver = __FILE__;
1926     data.module = m;
1927     data.namereg_fail = TRUE;
1928     pa_sink_new_data_set_name(&data, dn);
1929     pa_sink_new_data_set_sample_spec(&data, &ss);
1930     pa_sink_new_data_set_channel_map(&data, &map);
1931     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1932     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1933     if (u->sink_name)
1934         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1935
1936     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1937         pa_log("Invalid properties");
1938         pa_sink_new_data_done(&data);
1939         goto fail;
1940     }
1941
1942     u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1943     pa_sink_new_data_done(&data);
1944
1945     if (!u->sink) {
1946         pa_log("Failed to create sink.");
1947         goto fail;
1948     }
1949
1950     u->sink->parent.process_msg = sink_process_msg;
1951     u->sink->userdata = u;
1952     u->sink->set_state = sink_set_state;
1953     u->sink->set_volume = sink_set_volume;
1954     u->sink->set_mute = sink_set_mute;
1955
1956     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1957
1958 /*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1959
1960     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1961     pa_sink_set_rtpoll(u->sink, u->rtpoll);
1962
1963 #else
1964
1965     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1966         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1967
1968     pa_source_new_data_init(&data);
1969     data.driver = __FILE__;
1970     data.module = m;
1971     data.namereg_fail = TRUE;
1972     pa_source_new_data_set_name(&data, dn);
1973     pa_source_new_data_set_sample_spec(&data, &ss);
1974     pa_source_new_data_set_channel_map(&data, &map);
1975     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1976     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1977     if (u->source_name)
1978         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1979
1980     if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1981         pa_log("Invalid properties");
1982         pa_source_new_data_done(&data);
1983         goto fail;
1984     }
1985
1986     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1987     pa_source_new_data_done(&data);
1988
1989     if (!u->source) {
1990         pa_log("Failed to create source.");
1991         goto fail;
1992     }
1993
1994     u->source->parent.process_msg = source_process_msg;
1995     u->source->set_state = source_set_state;
1996     u->source->userdata = u;
1997
1998 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1999
2000     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2001     pa_source_set_rtpoll(u->source, u->rtpoll);
2002
2003     u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2004 #endif
2005
2006     pa_xfree(dn);
2007
2008     u->time_event = NULL;
2009
2010     u->maxlength = (uint32_t) -1;
2011 #ifdef TUNNEL_SINK
2012     u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2013 #else
2014     u->fragsize = (uint32_t) -1;
2015 #endif
2016
2017     if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2018         pa_log("Failed to create thread.");
2019         goto fail;
2020     }
2021
2022 #ifdef TUNNEL_SINK
2023     pa_sink_put(u->sink);
2024 #else
2025     pa_source_put(u->source);
2026 #endif
2027
2028     pa_modargs_free(ma);
2029
2030     return 0;
2031
2032 fail:
2033     pa__done(m);
2034
2035     if (ma)
2036         pa_modargs_free(ma);
2037
2038     pa_xfree(dn);
2039
2040     return  -1;
2041 }
2042
2043 void pa__done(pa_module*m) {
2044     struct userdata* u;
2045
2046     pa_assert(m);
2047
2048     if (!(u = m->userdata))
2049         return;
2050
2051 #ifdef TUNNEL_SINK
2052     if (u->sink)
2053         pa_sink_unlink(u->sink);
2054 #else
2055     if (u->source)
2056         pa_source_unlink(u->source);
2057 #endif
2058
2059     if (u->thread) {
2060         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2061         pa_thread_free(u->thread);
2062     }
2063
2064     pa_thread_mq_done(&u->thread_mq);
2065
2066 #ifdef TUNNEL_SINK
2067     if (u->sink)
2068         pa_sink_unref(u->sink);
2069 #else
2070     if (u->source)
2071         pa_source_unref(u->source);
2072 #endif
2073
2074     if (u->rtpoll)
2075         pa_rtpoll_free(u->rtpoll);
2076
2077     if (u->pstream) {
2078         pa_pstream_unlink(u->pstream);
2079         pa_pstream_unref(u->pstream);
2080     }
2081
2082     if (u->pdispatch)
2083         pa_pdispatch_unref(u->pdispatch);
2084
2085     if (u->client)
2086         pa_socket_client_unref(u->client);
2087
2088     if (u->auth_cookie)
2089         pa_auth_cookie_unref(u->auth_cookie);
2090
2091     if (u->smoother)
2092         pa_smoother_free(u->smoother);
2093
2094     if (u->time_event)
2095         u->core->mainloop->time_free(u->time_event);
2096
2097 #ifndef TUNNEL_SINK
2098     if (u->mcalign)
2099         pa_mcalign_free(u->mcalign);
2100 #endif
2101
2102 #ifdef TUNNEL_SINK
2103     pa_xfree(u->sink_name);
2104 #else
2105     pa_xfree(u->source_name);
2106 #endif
2107     pa_xfree(u->server_name);
2108
2109     pa_xfree(u->device_description);
2110     pa_xfree(u->server_fqdn);
2111     pa_xfree(u->user_name);
2112
2113     pa_xfree(u);
2114 }