f0c254bcff7fbc3a98b7d49983d4b94c37237559
[platform/upstream/pulseaudio.git] / src / modules / module-tunnel.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/sink-input.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/socket-client.h>
50 #include <pulsecore/socket-util.h>
51 #include <pulsecore/time-smoother.h>
52 #include <pulsecore/thread.h>
53 #include <pulsecore/thread-mq.h>
54 #include <pulsecore/core-rtclock.h>
55 #include <pulsecore/core-error.h>
56 #include <pulsecore/proplist-util.h>
57 #include <pulsecore/auth-cookie.h>
58 #include <pulsecore/mcalign.h>
59
60 #ifdef TUNNEL_SINK
61 #include "module-tunnel-sink-symdef.h"
62 #else
63 #include "module-tunnel-source-symdef.h"
64 #endif
65
66 #ifdef TUNNEL_SINK
67 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 PA_MODULE_USAGE(
69         "sink_name=<name for the local sink> "
70         "sink_properties=<properties for the local sink> "
71         "server=<address> "
72         "sink=<remote sink name> "
73         "cookie=<filename> "
74         "format=<sample format> "
75         "channels=<number of channels> "
76         "rate=<sample rate> "
77         "channel_map=<channel map>");
78 #else
79 PA_MODULE_DESCRIPTION("Tunnel module for sources");
80 PA_MODULE_USAGE(
81         "source_name=<name for the local source> "
82         "source_properties=<properties for the local source> "
83         "server=<address> "
84         "source=<remote source name> "
85         "cookie=<filename> "
86         "format=<sample format> "
87         "channels=<number of channels> "
88         "rate=<sample rate> "
89         "channel_map=<channel map>");
90 #endif
91
92 PA_MODULE_AUTHOR("Lennart Poettering");
93 PA_MODULE_VERSION(PACKAGE_VERSION);
94 PA_MODULE_LOAD_ONCE(FALSE);
95
/* NULL-terminated list of the module argument keys this module accepts.
 * The device-specific keys depend on whether the sink or the source
 * flavour is being built. */
static const char* const valid_modargs[] = {
    /* Connection and sample format */
    "server",
    "cookie",
    "format",
    "channels",
    "rate",

    /* Local device name/properties and remote device name */
#ifdef TUNNEL_SINK
    "sink_name",
    "sink_properties",
    "sink",
#else
    "source_name",
    "source_properties",
    "source",
#endif

    "channel_map",
    NULL,
};
114
115 #define DEFAULT_TIMEOUT 5
116
117 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
118
119 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
120
121 #ifdef TUNNEL_SINK
122
123 enum {
124     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
125     SINK_MESSAGE_REMOTE_SUSPEND,
126     SINK_MESSAGE_UPDATE_LATENCY,
127     SINK_MESSAGE_POST
128 };
129
130 #define DEFAULT_TLENGTH_MSEC 150
131 #define DEFAULT_MINREQ_MSEC 25
132
133 #else
134
135 enum {
136     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
137     SOURCE_MESSAGE_REMOTE_SUSPEND,
138     SOURCE_MESSAGE_UPDATE_LATENCY
139 };
140
141 #define DEFAULT_FRAGSIZE_MSEC 25
142
143 #endif
144
145 #ifdef TUNNEL_SINK
146 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 #endif
149 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
155 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
156
157 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
158 #ifdef TUNNEL_SINK
159     [PA_COMMAND_REQUEST] = command_request,
160     [PA_COMMAND_STARTED] = command_started,
161 #endif
162     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
163     [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
164     [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
165     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
166     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
167     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
168     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
169     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
170     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
171     [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
172     [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
173     [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
174     [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
175     [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
176 };
177
178 struct userdata {
179     pa_core *core;
180     pa_module *module;
181
182     pa_thread_mq thread_mq;
183     pa_rtpoll *rtpoll;
184     pa_thread *thread;
185
186     pa_socket_client *client;
187     pa_pstream *pstream;
188     pa_pdispatch *pdispatch;
189
190     char *server_name;
191 #ifdef TUNNEL_SINK
192     char *sink_name;
193     pa_sink *sink;
194     size_t requested_bytes;
195 #else
196     char *source_name;
197     pa_source *source;
198     pa_mcalign *mcalign;
199 #endif
200
201     pa_auth_cookie *auth_cookie;
202
203     uint32_t version;
204     uint32_t ctag;
205     uint32_t device_index;
206     uint32_t channel;
207
208     int64_t counter, counter_delta;
209
210     pa_bool_t remote_corked:1;
211     pa_bool_t remote_suspended:1;
212
213     pa_usec_t transport_usec; /* maintained in the main thread */
214     pa_usec_t thread_transport_usec; /* maintained in the IO thread */
215
216     uint32_t ignore_latency_before;
217
218     pa_time_event *time_event;
219
220     pa_smoother *smoother;
221
222     char *device_description;
223     char *server_fqdn;
224     char *user_name;
225
226     uint32_t maxlength;
227 #ifdef TUNNEL_SINK
228     uint32_t tlength;
229     uint32_t minreq;
230     uint32_t prebuf;
231 #else
232     uint32_t fragsize;
233 #endif
234 };
235
236 static void request_latency(struct userdata *u);
237
238 /* Called from main context */
239 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
240     pa_log_debug("Got stream or client event.");
241 }
242
243 /* Called from main context */
244 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
245     struct userdata *u = userdata;
246
247     pa_assert(pd);
248     pa_assert(t);
249     pa_assert(u);
250     pa_assert(u->pdispatch == pd);
251
252     pa_log_warn("Stream killed");
253     pa_module_unload_request(u->module, TRUE);
254 }
255
256 /* Called from main context */
257 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
258     struct userdata *u = userdata;
259
260     pa_assert(pd);
261     pa_assert(t);
262     pa_assert(u);
263     pa_assert(u->pdispatch == pd);
264
265     pa_log_info("Server signalled buffer overrun/underrun.");
266     request_latency(u);
267 }
268
269 /* Called from main context */
270 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
271     struct userdata *u = userdata;
272     uint32_t channel;
273     pa_bool_t suspended;
274
275     pa_assert(pd);
276     pa_assert(t);
277     pa_assert(u);
278     pa_assert(u->pdispatch == pd);
279
280     if (pa_tagstruct_getu32(t, &channel) < 0 ||
281         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
282         !pa_tagstruct_eof(t)) {
283
284         pa_log("Invalid packet.");
285         pa_module_unload_request(u->module, TRUE);
286         return;
287     }
288
289     pa_log_debug("Server reports device suspend.");
290
291 #ifdef TUNNEL_SINK
292     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #else
294     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
295 #endif
296
297     request_latency(u);
298 }
299
300 /* Called from main context */
301 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
302     struct userdata *u = userdata;
303     uint32_t channel, di;
304     const char *dn;
305     pa_bool_t suspended;
306
307     pa_assert(pd);
308     pa_assert(t);
309     pa_assert(u);
310     pa_assert(u->pdispatch == pd);
311
312     if (pa_tagstruct_getu32(t, &channel) < 0 ||
313         pa_tagstruct_getu32(t, &di) < 0 ||
314         pa_tagstruct_gets(t, &dn) < 0 ||
315         pa_tagstruct_get_boolean(t, &suspended) < 0) {
316
317         pa_log_error("Invalid packet.");
318         pa_module_unload_request(u->module, TRUE);
319         return;
320     }
321
322     pa_log_debug("Server reports a stream move.");
323
324 #ifdef TUNNEL_SINK
325     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #else
327     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
328 #endif
329
330     request_latency(u);
331 }
332
333 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
334     struct userdata *u = userdata;
335     uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
336     pa_usec_t usec;
337
338     pa_assert(pd);
339     pa_assert(t);
340     pa_assert(u);
341     pa_assert(u->pdispatch == pd);
342
343     if (pa_tagstruct_getu32(t, &channel) < 0 ||
344         pa_tagstruct_getu32(t, &maxlength) < 0) {
345
346         pa_log_error("Invalid packet.");
347         pa_module_unload_request(u->module, TRUE);
348         return;
349     }
350
351     if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
352         if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
353             pa_tagstruct_get_usec(t, &usec) < 0) {
354
355             pa_log_error("Invalid packet.");
356             pa_module_unload_request(u->module, TRUE);
357             return;
358         }
359     } else {
360         if (pa_tagstruct_getu32(t, &tlength) < 0 ||
361             pa_tagstruct_getu32(t, &prebuf) < 0 ||
362             pa_tagstruct_getu32(t, &minreq) < 0 ||
363             pa_tagstruct_get_usec(t, &usec) < 0) {
364
365             pa_log_error("Invalid packet.");
366             pa_module_unload_request(u->module, TRUE);
367             return;
368         }
369     }
370
371 #ifdef TUNNEL_SINK
372     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
373 #endif
374
375     request_latency(u);
376 }
377
378 #ifdef TUNNEL_SINK
379
380 /* Called from main context */
381 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
382     struct userdata *u = userdata;
383
384     pa_assert(pd);
385     pa_assert(t);
386     pa_assert(u);
387     pa_assert(u->pdispatch == pd);
388
389     pa_log_debug("Server reports playback started.");
390     request_latency(u);
391 }
392
393 #endif
394
395 /* Called from IO thread context */
396 static void check_smoother_status(struct userdata *u, pa_bool_t past)  {
397     pa_usec_t x;
398
399     pa_assert(u);
400
401     x = pa_rtclock_now();
402
403     /* Correct by the time the requested issued needs to travel to the
404      * other side.  This is a valid thread-safe access, because the
405      * main thread is waiting for us */
406
407     if (past)
408         x -= u->thread_transport_usec;
409     else
410         x += u->thread_transport_usec;
411
412     if (u->remote_suspended || u->remote_corked)
413         pa_smoother_pause(u->smoother, x);
414     else
415         pa_smoother_resume(u->smoother, x, TRUE);
416 }
417
418 /* Called from IO thread context */
419 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
420     pa_assert(u);
421
422     if (u->remote_corked == cork)
423         return;
424
425     u->remote_corked = cork;
426     check_smoother_status(u, FALSE);
427 }
428
429 /* Called from main context */
430 static void stream_cork(struct userdata *u, pa_bool_t cork) {
431     pa_tagstruct *t;
432     pa_assert(u);
433
434     if (!u->pstream)
435         return;
436
437     t = pa_tagstruct_new(NULL, 0);
438 #ifdef TUNNEL_SINK
439     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
440 #else
441     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
442 #endif
443     pa_tagstruct_putu32(t, u->ctag++);
444     pa_tagstruct_putu32(t, u->channel);
445     pa_tagstruct_put_boolean(t, !!cork);
446     pa_pstream_send_tagstruct(u->pstream, t);
447
448     request_latency(u);
449 }
450
451 /* Called from IO thread context */
452 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
453     pa_assert(u);
454
455     if (u->remote_suspended == suspend)
456         return;
457
458     u->remote_suspended = suspend;
459     check_smoother_status(u, TRUE);
460 }
461
462 #ifdef TUNNEL_SINK
463
464 /* Called from IO thread context */
465 static void send_data(struct userdata *u) {
466     pa_assert(u);
467
468     while (u->requested_bytes > 0) {
469         pa_memchunk memchunk;
470
471         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
472         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
473         pa_memblock_unref(memchunk.memblock);
474
475         u->requested_bytes -= memchunk.length;
476
477         u->counter += (int64_t) memchunk.length;
478     }
479 }
480
481 /* This function is called from IO context -- except when it is not. */
482 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
483     struct userdata *u = PA_SINK(o)->userdata;
484
485     switch (code) {
486
487         case PA_SINK_MESSAGE_SET_STATE: {
488             int r;
489
490             /* First, change the state, because otherwide pa_sink_render() would fail */
491             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
492
493                 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
494
495                 if (PA_SINK_IS_OPENED(u->sink->state))
496                     send_data(u);
497             }
498
499             return r;
500         }
501
502         case PA_SINK_MESSAGE_GET_LATENCY: {
503             pa_usec_t yl, yr, *usec = data;
504
505             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
506             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
507
508             *usec = yl > yr ? yl - yr : 0;
509             return 0;
510         }
511
512         case SINK_MESSAGE_REQUEST:
513
514             pa_assert(offset > 0);
515             u->requested_bytes += (size_t) offset;
516
517             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
518                 send_data(u);
519
520             return 0;
521
522
523         case SINK_MESSAGE_REMOTE_SUSPEND:
524
525             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
526             return 0;
527
528
529         case SINK_MESSAGE_UPDATE_LATENCY: {
530             pa_usec_t y;
531
532             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
533
534             if (y > (pa_usec_t) offset)
535                 y -= (pa_usec_t) offset;
536             else
537                 y = 0;
538
539             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
540
541             /* We can access this freely here, since the main thread is waiting for us */
542             u->thread_transport_usec = u->transport_usec;
543
544             return 0;
545         }
546
547         case SINK_MESSAGE_POST:
548
549             /* OK, This might be a bit confusing. This message is
550              * delivered to us from the main context -- NOT from the
551              * IO thread context where the rest of the messages are
552              * dispatched. Yeah, ugly, but I am a lazy bastard. */
553
554             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
555
556             u->counter_delta += (int64_t) chunk->length;
557
558             return 0;
559     }
560
561     return pa_sink_process_msg(o, code, data, offset, chunk);
562 }
563
564 /* Called from main context */
565 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
566     struct userdata *u;
567     pa_sink_assert_ref(s);
568     u = s->userdata;
569
570     switch ((pa_sink_state_t) state) {
571
572         case PA_SINK_SUSPENDED:
573             pa_assert(PA_SINK_IS_OPENED(s->state));
574             stream_cork(u, TRUE);
575             break;
576
577         case PA_SINK_IDLE:
578         case PA_SINK_RUNNING:
579             if (s->state == PA_SINK_SUSPENDED)
580                 stream_cork(u, FALSE);
581             break;
582
583         case PA_SINK_UNLINKED:
584         case PA_SINK_INIT:
585         case PA_SINK_INVALID_STATE:
586             ;
587     }
588
589     return 0;
590 }
591
592 #else
593
594 /* This function is called from IO context -- except when it is not. */
595 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
596     struct userdata *u = PA_SOURCE(o)->userdata;
597
598     switch (code) {
599
600         case PA_SOURCE_MESSAGE_SET_STATE: {
601             int r;
602
603             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
604                 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
605
606             return r;
607         }
608
609         case PA_SOURCE_MESSAGE_GET_LATENCY: {
610             pa_usec_t yr, yl, *usec = data;
611
612             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
613             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
614
615             *usec = yr > yl ? yr - yl : 0;
616             return 0;
617         }
618
619         case SOURCE_MESSAGE_POST: {
620             pa_memchunk c;
621
622             pa_mcalign_push(u->mcalign, chunk);
623
624             while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
625
626                 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
627                     pa_source_post(u->source, &c);
628
629                 pa_memblock_unref(c.memblock);
630
631                 u->counter += (int64_t) c.length;
632             }
633
634             return 0;
635         }
636
637         case SOURCE_MESSAGE_REMOTE_SUSPEND:
638
639             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
640             return 0;
641
642         case SOURCE_MESSAGE_UPDATE_LATENCY: {
643             pa_usec_t y;
644
645             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
646             y += (pa_usec_t) offset;
647
648             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
649
650             /* We can access this freely here, since the main thread is waiting for us */
651             u->thread_transport_usec = u->transport_usec;
652
653             return 0;
654         }
655     }
656
657     return pa_source_process_msg(o, code, data, offset, chunk);
658 }
659
660 /* Called from main context */
661 static int source_set_state(pa_source *s, pa_source_state_t state) {
662     struct userdata *u;
663     pa_source_assert_ref(s);
664     u = s->userdata;
665
666     switch ((pa_source_state_t) state) {
667
668         case PA_SOURCE_SUSPENDED:
669             pa_assert(PA_SOURCE_IS_OPENED(s->state));
670             stream_cork(u, TRUE);
671             break;
672
673         case PA_SOURCE_IDLE:
674         case PA_SOURCE_RUNNING:
675             if (s->state == PA_SOURCE_SUSPENDED)
676                 stream_cork(u, FALSE);
677             break;
678
679         case PA_SOURCE_UNLINKED:
680         case PA_SOURCE_INIT:
681         case PA_SINK_INVALID_STATE:
682             ;
683     }
684
685     return 0;
686 }
687
688 #endif
689
690 static void thread_func(void *userdata) {
691     struct userdata *u = userdata;
692
693     pa_assert(u);
694
695     pa_log_debug("Thread starting up");
696
697     pa_thread_mq_install(&u->thread_mq);
698
699     for (;;) {
700         int ret;
701
702 #ifdef TUNNEL_SINK
703         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
704             if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
705                 pa_sink_process_rewind(u->sink, 0);
706 #endif
707
708         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
709             goto fail;
710
711         if (ret == 0)
712             goto finish;
713     }
714
715 fail:
716     /* If this was no regular exit from the loop we have to continue
717      * processing messages until we received PA_MESSAGE_SHUTDOWN */
718     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
719     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
720
721 finish:
722     pa_log_debug("Thread shutting down");
723 }
724
725 #ifdef TUNNEL_SINK
726 /* Called from main context */
727 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
728     struct userdata *u = userdata;
729     uint32_t bytes, channel;
730
731     pa_assert(pd);
732     pa_assert(command == PA_COMMAND_REQUEST);
733     pa_assert(t);
734     pa_assert(u);
735     pa_assert(u->pdispatch == pd);
736
737     if (pa_tagstruct_getu32(t, &channel) < 0 ||
738         pa_tagstruct_getu32(t, &bytes) < 0) {
739         pa_log("Invalid protocol reply");
740         goto fail;
741     }
742
743     if (channel != u->channel) {
744         pa_log("Received data for invalid channel");
745         goto fail;
746     }
747
748     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
749     return;
750
751 fail:
752     pa_module_unload_request(u->module, TRUE);
753 }
754
755 #endif
756
757 /* Called from main context */
758 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
759     struct userdata *u = userdata;
760     pa_usec_t sink_usec, source_usec;
761     pa_bool_t playing;
762     int64_t write_index, read_index;
763     struct timeval local, remote, now;
764     pa_sample_spec *ss;
765     int64_t delay;
766
767     pa_assert(pd);
768     pa_assert(u);
769
770     if (command != PA_COMMAND_REPLY) {
771         if (command == PA_COMMAND_ERROR)
772             pa_log("Failed to get latency.");
773         else
774             pa_log("Protocol error.");
775         goto fail;
776     }
777
778     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
779         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
780         pa_tagstruct_get_boolean(t, &playing) < 0 ||
781         pa_tagstruct_get_timeval(t, &local) < 0 ||
782         pa_tagstruct_get_timeval(t, &remote) < 0 ||
783         pa_tagstruct_gets64(t, &write_index) < 0 ||
784         pa_tagstruct_gets64(t, &read_index) < 0) {
785         pa_log("Invalid reply.");
786         goto fail;
787     }
788
789 #ifdef TUNNEL_SINK
790     if (u->version >= 13) {
791         uint64_t underrun_for = 0, playing_for = 0;
792
793         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
794             pa_tagstruct_getu64(t, &playing_for) < 0) {
795             pa_log("Invalid reply.");
796             goto fail;
797         }
798     }
799 #endif
800
801     if (!pa_tagstruct_eof(t)) {
802         pa_log("Invalid reply.");
803         goto fail;
804     }
805
806     if (tag < u->ignore_latency_before) {
807         return;
808     }
809
810     pa_gettimeofday(&now);
811
812     /* Calculate transport usec */
813     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
814         /* local and remote seem to have synchronized clocks */
815 #ifdef TUNNEL_SINK
816         u->transport_usec = pa_timeval_diff(&remote, &local);
817 #else
818         u->transport_usec = pa_timeval_diff(&now, &remote);
819 #endif
820     } else
821         u->transport_usec = pa_timeval_diff(&now, &local)/2;
822
823     /* First, take the device's delay */
824 #ifdef TUNNEL_SINK
825     delay = (int64_t) sink_usec;
826     ss = &u->sink->sample_spec;
827 #else
828     delay = (int64_t) source_usec;
829     ss = &u->source->sample_spec;
830 #endif
831
832     /* Add the length of our server-side buffer */
833     if (write_index >= read_index)
834         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
835     else
836         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
837
838     /* Our measurements are already out of date, hence correct by the     *
839      * transport latency */
840 #ifdef TUNNEL_SINK
841     delay -= (int64_t) u->transport_usec;
842 #else
843     delay += (int64_t) u->transport_usec;
844 #endif
845
846     /* Now correct by what we have have read/written since we requested the update */
847 #ifdef TUNNEL_SINK
848     delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #else
850     delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
851 #endif
852
853 #ifdef TUNNEL_SINK
854     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #else
856     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
857 #endif
858
859     return;
860
861 fail:
862
863     pa_module_unload_request(u->module, TRUE);
864 }
865
866 /* Called from main context */
867 static void request_latency(struct userdata *u) {
868     pa_tagstruct *t;
869     struct timeval now;
870     uint32_t tag;
871     pa_assert(u);
872
873     t = pa_tagstruct_new(NULL, 0);
874 #ifdef TUNNEL_SINK
875     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
876 #else
877     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
878 #endif
879     pa_tagstruct_putu32(t, tag = u->ctag++);
880     pa_tagstruct_putu32(t, u->channel);
881
882     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
883
884     pa_pstream_send_tagstruct(u->pstream, t);
885     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
886
887     u->ignore_latency_before = tag;
888     u->counter_delta = 0;
889 }
890
891 /* Called from main context */
892 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
893     struct userdata *u = userdata;
894
895     pa_assert(m);
896     pa_assert(e);
897     pa_assert(u);
898
899     request_latency(u);
900
901     pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
902 }
903
904 /* Called from main context */
905 static void update_description(struct userdata *u) {
906     char *d;
907     char un[128], hn[128];
908     pa_tagstruct *t;
909
910     pa_assert(u);
911
912     if (!u->server_fqdn || !u->user_name || !u->device_description)
913         return;
914
915     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
916
917 #ifdef TUNNEL_SINK
918     pa_sink_set_description(u->sink, d);
919     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
920     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
921     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
922 #else
923     pa_source_set_description(u->source, d);
924     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
925     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
926     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
927 #endif
928
929     pa_xfree(d);
930
931     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
932                           pa_get_user_name(un, sizeof(un)),
933                           pa_get_host_name(hn, sizeof(hn)));
934
935     t = pa_tagstruct_new(NULL, 0);
936 #ifdef TUNNEL_SINK
937     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
938 #else
939     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
940 #endif
941     pa_tagstruct_putu32(t, u->ctag++);
942     pa_tagstruct_putu32(t, u->channel);
943     pa_tagstruct_puts(t, d);
944     pa_pstream_send_tagstruct(u->pstream, t);
945
946     pa_xfree(d);
947 }
948
949 /* Called from main context */
950 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
951     struct userdata *u = userdata;
952     pa_sample_spec ss;
953     pa_channel_map cm;
954     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
955     uint32_t cookie;
956
957     pa_assert(pd);
958     pa_assert(u);
959
960     if (command != PA_COMMAND_REPLY) {
961         if (command == PA_COMMAND_ERROR)
962             pa_log("Failed to get info.");
963         else
964             pa_log("Protocol error.");
965         goto fail;
966     }
967
968     if (pa_tagstruct_gets(t, &server_name) < 0 ||
969         pa_tagstruct_gets(t, &server_version) < 0 ||
970         pa_tagstruct_gets(t, &user_name) < 0 ||
971         pa_tagstruct_gets(t, &host_name) < 0 ||
972         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
973         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
974         pa_tagstruct_gets(t, &default_source_name) < 0 ||
975         pa_tagstruct_getu32(t, &cookie) < 0 ||
976         (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
977
978         pa_log("Parse failure");
979         goto fail;
980     }
981
982     if (!pa_tagstruct_eof(t)) {
983         pa_log("Packet too long");
984         goto fail;
985     }
986
987     pa_xfree(u->server_fqdn);
988     u->server_fqdn = pa_xstrdup(host_name);
989
990     pa_xfree(u->user_name);
991     u->user_name = pa_xstrdup(user_name);
992
993     update_description(u);
994
995     return;
996
997 fail:
998     pa_module_unload_request(u->module, TRUE);
999 }
1000
1001 #ifdef TUNNEL_SINK
1002
1003 /* Called from main context */
1004 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1005     struct userdata *u = userdata;
1006     uint32_t idx, owner_module, monitor_source, flags;
1007     const char *name, *description, *monitor_source_name, *driver;
1008     pa_sample_spec ss;
1009     pa_channel_map cm;
1010     pa_cvolume volume;
1011     pa_bool_t mute;
1012     pa_usec_t latency;
1013     pa_proplist *pl;
1014
1015     pa_assert(pd);
1016     pa_assert(u);
1017
1018     pl = pa_proplist_new();
1019
1020     if (command != PA_COMMAND_REPLY) {
1021         if (command == PA_COMMAND_ERROR)
1022             pa_log("Failed to get info.");
1023         else
1024             pa_log("Protocol error.");
1025         goto fail;
1026     }
1027
1028     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1029         pa_tagstruct_gets(t, &name) < 0 ||
1030         pa_tagstruct_gets(t, &description) < 0 ||
1031         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1032         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1033         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1034         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1035         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1036         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1037         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1038         pa_tagstruct_get_usec(t, &latency) < 0 ||
1039         pa_tagstruct_gets(t, &driver) < 0 ||
1040         pa_tagstruct_getu32(t, &flags) < 0) {
1041
1042         pa_log("Parse failure");
1043         goto fail;
1044     }
1045
1046     if (u->version >= 13) {
1047         pa_usec_t configured_latency;
1048
1049         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1050             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1051
1052             pa_log("Parse failure");
1053             goto fail;
1054         }
1055     }
1056
1057     if (u->version >= 15) {
1058         pa_volume_t base_volume;
1059         uint32_t state, n_volume_steps, card;
1060
1061         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1062             pa_tagstruct_getu32(t, &state) < 0 ||
1063             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1064             pa_tagstruct_getu32(t, &card) < 0) {
1065
1066             pa_log("Parse failure");
1067             goto fail;
1068         }
1069     }
1070
1071     if (u->version >= 16) {
1072         uint32_t n_ports;
1073         const char *s;
1074
1075         if (pa_tagstruct_getu32(t, &n_ports)) {
1076             pa_log("Parse failure");
1077             goto fail;
1078         }
1079
1080         for (uint32_t j = 0; j < n_ports; j++) {
1081             uint32_t priority;
1082
1083             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1084                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1085                 pa_tagstruct_getu32(t, &priority) < 0) {
1086
1087                 pa_log("Parse failure");
1088                 goto fail;
1089             }
1090         }
1091
1092         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1093             pa_log("Parse failure");
1094             goto fail;
1095         }
1096     }
1097
1098     if (!pa_tagstruct_eof(t)) {
1099         pa_log("Packet too long");
1100         goto fail;
1101     }
1102
1103     pa_proplist_free(pl);
1104
1105     if (!u->sink_name || strcmp(name, u->sink_name))
1106         return;
1107
1108     pa_xfree(u->device_description);
1109     u->device_description = pa_xstrdup(description);
1110
1111     update_description(u);
1112
1113     return;
1114
1115 fail:
1116     pa_module_unload_request(u->module, TRUE);
1117     pa_proplist_free(pl);
1118 }
1119
1120 /* Called from main context */
1121 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1122     struct userdata *u = userdata;
1123     uint32_t idx, owner_module, client, sink;
1124     pa_usec_t buffer_usec, sink_usec;
1125     const char *name, *driver, *resample_method;
1126     pa_bool_t mute = FALSE;
1127     pa_sample_spec sample_spec;
1128     pa_channel_map channel_map;
1129     pa_cvolume volume;
1130     pa_proplist *pl;
1131
1132     pa_assert(pd);
1133     pa_assert(u);
1134
1135     pl = pa_proplist_new();
1136
1137     if (command != PA_COMMAND_REPLY) {
1138         if (command == PA_COMMAND_ERROR)
1139             pa_log("Failed to get info.");
1140         else
1141             pa_log("Protocol error.");
1142         goto fail;
1143     }
1144
1145     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1146         pa_tagstruct_gets(t, &name) < 0 ||
1147         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1148         pa_tagstruct_getu32(t, &client) < 0 ||
1149         pa_tagstruct_getu32(t, &sink) < 0 ||
1150         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1151         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1152         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1153         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1154         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1155         pa_tagstruct_gets(t, &resample_method) < 0 ||
1156         pa_tagstruct_gets(t, &driver) < 0) {
1157
1158         pa_log("Parse failure");
1159         goto fail;
1160     }
1161
1162     if (u->version >= 11) {
1163         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1164
1165             pa_log("Parse failure");
1166             goto fail;
1167         }
1168     }
1169
1170     if (u->version >= 13) {
1171         if (pa_tagstruct_get_proplist(t, pl) < 0) {
1172
1173             pa_log("Parse failure");
1174             goto fail;
1175         }
1176     }
1177
1178     if (!pa_tagstruct_eof(t)) {
1179         pa_log("Packet too long");
1180         goto fail;
1181     }
1182
1183     pa_proplist_free(pl);
1184
1185     if (idx != u->device_index)
1186         return;
1187
1188     pa_assert(u->sink);
1189
1190     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1191         pa_cvolume_equal(&volume, &u->sink->real_volume))
1192         return;
1193
1194     pa_sink_volume_changed(u->sink, &volume);
1195
1196     if (u->version >= 11)
1197         pa_sink_mute_changed(u->sink, mute);
1198
1199     return;
1200
1201 fail:
1202     pa_module_unload_request(u->module, TRUE);
1203     pa_proplist_free(pl);
1204 }
1205
1206 #else
1207
1208 /* Called from main context */
1209 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1210     struct userdata *u = userdata;
1211     uint32_t idx, owner_module, monitor_of_sink, flags;
1212     const char *name, *description, *monitor_of_sink_name, *driver;
1213     pa_sample_spec ss;
1214     pa_channel_map cm;
1215     pa_cvolume volume;
1216     pa_bool_t mute;
1217     pa_usec_t latency, configured_latency;
1218     pa_proplist *pl;
1219
1220     pa_assert(pd);
1221     pa_assert(u);
1222
1223     pl = pa_proplist_new();
1224
1225     if (command != PA_COMMAND_REPLY) {
1226         if (command == PA_COMMAND_ERROR)
1227             pa_log("Failed to get info.");
1228         else
1229             pa_log("Protocol error.");
1230         goto fail;
1231     }
1232
1233     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1234         pa_tagstruct_gets(t, &name) < 0 ||
1235         pa_tagstruct_gets(t, &description) < 0 ||
1236         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1237         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1238         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1239         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1240         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1241         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1242         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1243         pa_tagstruct_get_usec(t, &latency) < 0 ||
1244         pa_tagstruct_gets(t, &driver) < 0 ||
1245         pa_tagstruct_getu32(t, &flags) < 0) {
1246
1247         pa_log("Parse failure");
1248         goto fail;
1249     }
1250
1251     if (u->version >= 13) {
1252         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1253             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1254
1255             pa_log("Parse failure");
1256             goto fail;
1257         }
1258     }
1259
1260     if (u->version >= 15) {
1261         pa_volume_t base_volume;
1262         uint32_t state, n_volume_steps, card;
1263
1264         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1265             pa_tagstruct_getu32(t, &state) < 0 ||
1266             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1267             pa_tagstruct_getu32(t, &card) < 0) {
1268
1269             pa_log("Parse failure");
1270             goto fail;
1271         }
1272     }
1273
1274     if (u->version >= 16) {
1275         uint32_t n_ports;
1276         const char *s;
1277
1278         if (pa_tagstruct_getu32(t, &n_ports)) {
1279             pa_log("Parse failure");
1280             goto fail;
1281         }
1282
1283         for (uint32_t j = 0; j < n_ports; j++) {
1284             uint32_t priority;
1285
1286             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1287                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1288                 pa_tagstruct_getu32(t, &priority) < 0) {
1289
1290                 pa_log("Parse failure");
1291                 goto fail;
1292             }
1293         }
1294
1295         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1296             pa_log("Parse failure");
1297             goto fail;
1298         }
1299     }
1300
1301     if (!pa_tagstruct_eof(t)) {
1302         pa_log("Packet too long");
1303         goto fail;
1304     }
1305
1306     pa_proplist_free(pl);
1307
1308     if (!u->source_name || strcmp(name, u->source_name))
1309         return;
1310
1311     pa_xfree(u->device_description);
1312     u->device_description = pa_xstrdup(description);
1313
1314     update_description(u);
1315
1316     return;
1317
1318 fail:
1319     pa_module_unload_request(u->module, TRUE);
1320     pa_proplist_free(pl);
1321 }
1322
1323 #endif
1324
1325 /* Called from main context */
1326 static void request_info(struct userdata *u) {
1327     pa_tagstruct *t;
1328     uint32_t tag;
1329     pa_assert(u);
1330
1331     t = pa_tagstruct_new(NULL, 0);
1332     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1333     pa_tagstruct_putu32(t, tag = u->ctag++);
1334     pa_pstream_send_tagstruct(u->pstream, t);
1335     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1336
1337 #ifdef TUNNEL_SINK
1338     t = pa_tagstruct_new(NULL, 0);
1339     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1340     pa_tagstruct_putu32(t, tag = u->ctag++);
1341     pa_tagstruct_putu32(t, u->device_index);
1342     pa_pstream_send_tagstruct(u->pstream, t);
1343     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1344
1345     if (u->sink_name) {
1346         t = pa_tagstruct_new(NULL, 0);
1347         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1348         pa_tagstruct_putu32(t, tag = u->ctag++);
1349         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1350         pa_tagstruct_puts(t, u->sink_name);
1351         pa_pstream_send_tagstruct(u->pstream, t);
1352         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1353     }
1354 #else
1355     if (u->source_name) {
1356         t = pa_tagstruct_new(NULL, 0);
1357         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1358         pa_tagstruct_putu32(t, tag = u->ctag++);
1359         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1360         pa_tagstruct_puts(t, u->source_name);
1361         pa_pstream_send_tagstruct(u->pstream, t);
1362         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1363     }
1364 #endif
1365 }
1366
1367 /* Called from main context */
1368 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1369     struct userdata *u = userdata;
1370     pa_subscription_event_type_t e;
1371     uint32_t idx;
1372
1373     pa_assert(pd);
1374     pa_assert(t);
1375     pa_assert(u);
1376     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1377
1378     if (pa_tagstruct_getu32(t, &e) < 0 ||
1379         pa_tagstruct_getu32(t, &idx) < 0) {
1380         pa_log("Invalid protocol reply");
1381         pa_module_unload_request(u->module, TRUE);
1382         return;
1383     }
1384
1385     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1386 #ifdef TUNNEL_SINK
1387         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1388         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1389 #else
1390         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1391 #endif
1392         )
1393         return;
1394
1395     request_info(u);
1396 }
1397
1398 /* Called from main context */
1399 static void start_subscribe(struct userdata *u) {
1400     pa_tagstruct *t;
1401     pa_assert(u);
1402
1403     t = pa_tagstruct_new(NULL, 0);
1404     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1405     pa_tagstruct_putu32(t, u->ctag++);
1406     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1407 #ifdef TUNNEL_SINK
1408                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1409 #else
1410                         PA_SUBSCRIPTION_MASK_SOURCE
1411 #endif
1412                         );
1413
1414     pa_pstream_send_tagstruct(u->pstream, t);
1415 }
1416
1417 /* Called from main context */
1418 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1419     struct userdata *u = userdata;
1420 #ifdef TUNNEL_SINK
1421     uint32_t bytes;
1422 #endif
1423
1424     pa_assert(pd);
1425     pa_assert(u);
1426     pa_assert(u->pdispatch == pd);
1427
1428     if (command != PA_COMMAND_REPLY) {
1429         if (command == PA_COMMAND_ERROR)
1430             pa_log("Failed to create stream.");
1431         else
1432             pa_log("Protocol error.");
1433         goto fail;
1434     }
1435
1436     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1437         pa_tagstruct_getu32(t, &u->device_index) < 0
1438 #ifdef TUNNEL_SINK
1439         || pa_tagstruct_getu32(t, &bytes) < 0
1440 #endif
1441         )
1442         goto parse_error;
1443
1444     if (u->version >= 9) {
1445 #ifdef TUNNEL_SINK
1446         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1447             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1448             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1449             pa_tagstruct_getu32(t, &u->minreq) < 0)
1450             goto parse_error;
1451 #else
1452         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1453             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1454             goto parse_error;
1455 #endif
1456     }
1457
1458     if (u->version >= 12) {
1459         pa_sample_spec ss;
1460         pa_channel_map cm;
1461         uint32_t device_index;
1462         const char *dn;
1463         pa_bool_t suspended;
1464
1465         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1466             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1467             pa_tagstruct_getu32(t, &device_index) < 0 ||
1468             pa_tagstruct_gets(t, &dn) < 0 ||
1469             pa_tagstruct_get_boolean(t, &suspended) < 0)
1470             goto parse_error;
1471
1472 #ifdef TUNNEL_SINK
1473         pa_xfree(u->sink_name);
1474         u->sink_name = pa_xstrdup(dn);
1475 #else
1476         pa_xfree(u->source_name);
1477         u->source_name = pa_xstrdup(dn);
1478 #endif
1479     }
1480
1481     if (u->version >= 13) {
1482         pa_usec_t usec;
1483
1484         if (pa_tagstruct_get_usec(t, &usec) < 0)
1485             goto parse_error;
1486
1487 /* #ifdef TUNNEL_SINK */
1488 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1489 /* #else */
1490 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1491 /* #endif */
1492     }
1493
1494     if (!pa_tagstruct_eof(t))
1495         goto parse_error;
1496
1497     start_subscribe(u);
1498     request_info(u);
1499
1500     pa_assert(!u->time_event);
1501     u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1502
1503     request_latency(u);
1504
1505     pa_log_debug("Stream created.");
1506
1507 #ifdef TUNNEL_SINK
1508     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1509 #endif
1510
1511     return;
1512
1513 parse_error:
1514     pa_log("Invalid reply. (Create stream)");
1515
1516 fail:
1517     pa_module_unload_request(u->module, TRUE);
1518
1519 }
1520
1521 /* Called from main context */
1522 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1523     struct userdata *u = userdata;
1524     pa_tagstruct *reply;
1525     char name[256], un[128], hn[128];
1526 #ifdef TUNNEL_SINK
1527     pa_cvolume volume;
1528 #endif
1529
1530     pa_assert(pd);
1531     pa_assert(u);
1532     pa_assert(u->pdispatch == pd);
1533
1534     if (command != PA_COMMAND_REPLY ||
1535         pa_tagstruct_getu32(t, &u->version) < 0 ||
1536         !pa_tagstruct_eof(t)) {
1537
1538         if (command == PA_COMMAND_ERROR)
1539             pa_log("Failed to authenticate");
1540         else
1541             pa_log("Protocol error.");
1542
1543         goto fail;
1544     }
1545
1546     /* Minimum supported protocol version */
1547     if (u->version < 8) {
1548         pa_log("Incompatible protocol version");
1549         goto fail;
1550     }
1551
1552     /* Starting with protocol version 13 the MSB of the version tag
1553     reflects if shm is enabled for this connection or not. We don't
1554     support SHM here at all, so we just ignore this. */
1555
1556     if (u->version >= 13)
1557         u->version &= 0x7FFFFFFFU;
1558
1559     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1560
1561 #ifdef TUNNEL_SINK
1562     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1563     pa_sink_update_proplist(u->sink, 0, NULL);
1564
1565     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1566                 u->sink_name,
1567                 pa_get_user_name(un, sizeof(un)),
1568                 pa_get_host_name(hn, sizeof(hn)));
1569 #else
1570     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1571     pa_source_update_proplist(u->source, 0, NULL);
1572
1573     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1574                 u->source_name,
1575                 pa_get_user_name(un, sizeof(un)),
1576                 pa_get_host_name(hn, sizeof(hn)));
1577 #endif
1578
1579     reply = pa_tagstruct_new(NULL, 0);
1580     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1581     pa_tagstruct_putu32(reply, u->ctag++);
1582
1583     if (u->version >= 13) {
1584         pa_proplist *pl;
1585         pl = pa_proplist_new();
1586         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1587         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1588         pa_init_proplist(pl);
1589         pa_tagstruct_put_proplist(reply, pl);
1590         pa_proplist_free(pl);
1591     } else
1592         pa_tagstruct_puts(reply, "PulseAudio");
1593
1594     pa_pstream_send_tagstruct(u->pstream, reply);
1595     /* We ignore the server's reply here */
1596
1597     reply = pa_tagstruct_new(NULL, 0);
1598
1599     if (u->version < 13)
1600         /* Only for older PA versions we need to fill in the maxlength */
1601         u->maxlength = 4*1024*1024;
1602
1603 #ifdef TUNNEL_SINK
1604     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1605     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1606     u->prebuf = u->tlength;
1607 #else
1608     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1609 #endif
1610
1611 #ifdef TUNNEL_SINK
1612     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1613     pa_tagstruct_putu32(reply, tag = u->ctag++);
1614
1615     if (u->version < 13)
1616         pa_tagstruct_puts(reply, name);
1617
1618     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1619     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1620     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1621     pa_tagstruct_puts(reply, u->sink_name);
1622     pa_tagstruct_putu32(reply, u->maxlength);
1623     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1624     pa_tagstruct_putu32(reply, u->tlength);
1625     pa_tagstruct_putu32(reply, u->prebuf);
1626     pa_tagstruct_putu32(reply, u->minreq);
1627     pa_tagstruct_putu32(reply, 0);
1628     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1629     pa_tagstruct_put_cvolume(reply, &volume);
1630 #else
1631     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1632     pa_tagstruct_putu32(reply, tag = u->ctag++);
1633
1634     if (u->version < 13)
1635         pa_tagstruct_puts(reply, name);
1636
1637     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1638     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1639     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1640     pa_tagstruct_puts(reply, u->source_name);
1641     pa_tagstruct_putu32(reply, u->maxlength);
1642     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1643     pa_tagstruct_putu32(reply, u->fragsize);
1644 #endif
1645
1646     if (u->version >= 12) {
1647         pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1648         pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1649         pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1650         pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1651         pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1652         pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1653         pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1654     }
1655
1656     if (u->version >= 13) {
1657         pa_proplist *pl;
1658
1659         pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1660         pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1661
1662         pl = pa_proplist_new();
1663         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1664         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1665         pa_tagstruct_put_proplist(reply, pl);
1666         pa_proplist_free(pl);
1667
1668 #ifndef TUNNEL_SINK
1669         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1670 #endif
1671     }
1672
1673     if (u->version >= 14) {
1674 #ifdef TUNNEL_SINK
1675         pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1676 #endif
1677         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1678     }
1679
1680     if (u->version >= 15) {
1681 #ifdef TUNNEL_SINK
1682         pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1683 #endif
1684         pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1685         pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1686     }
1687
1688     pa_pstream_send_tagstruct(u->pstream, reply);
1689     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1690
1691     pa_log_debug("Connection authenticated, creating stream ...");
1692
1693     return;
1694
1695 fail:
1696     pa_module_unload_request(u->module, TRUE);
1697 }
1698
1699 /* Called from main context */
1700 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1701     struct userdata *u = userdata;
1702
1703     pa_assert(p);
1704     pa_assert(u);
1705
1706     pa_log_warn("Stream died.");
1707     pa_module_unload_request(u->module, TRUE);
1708 }
1709
1710 /* Called from main context */
1711 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1712     struct userdata *u = userdata;
1713
1714     pa_assert(p);
1715     pa_assert(packet);
1716     pa_assert(u);
1717
1718     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1719         pa_log("Invalid packet");
1720         pa_module_unload_request(u->module, TRUE);
1721         return;
1722     }
1723 }
1724
1725 #ifndef TUNNEL_SINK
1726 /* Called from main context */
1727 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1728     struct userdata *u = userdata;
1729
1730     pa_assert(p);
1731     pa_assert(chunk);
1732     pa_assert(u);
1733
1734     if (channel != u->channel) {
1735         pa_log("Received memory block on bad channel.");
1736         pa_module_unload_request(u->module, TRUE);
1737         return;
1738     }
1739
1740     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1741
1742     u->counter_delta += (int64_t) chunk->length;
1743 }
1744 #endif
1745
1746 /* Called from main context */
1747 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1748     struct userdata *u = userdata;
1749     pa_tagstruct *t;
1750     uint32_t tag;
1751
1752     pa_assert(sc);
1753     pa_assert(u);
1754     pa_assert(u->client == sc);
1755
1756     pa_socket_client_unref(u->client);
1757     u->client = NULL;
1758
1759     if (!io) {
1760         pa_log("Connection failed: %s", pa_cstrerror(errno));
1761         pa_module_unload_request(u->module, TRUE);
1762         return;
1763     }
1764
1765     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1766     u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1767
1768     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1769     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1770 #ifndef TUNNEL_SINK
1771     pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1772 #endif
1773
1774     t = pa_tagstruct_new(NULL, 0);
1775     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1776     pa_tagstruct_putu32(t, tag = u->ctag++);
1777     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1778
1779     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1780
1781 #ifdef HAVE_CREDS
1782 {
1783     pa_creds ucred;
1784
1785     if (pa_iochannel_creds_supported(io))
1786         pa_iochannel_creds_enable(io);
1787
1788     ucred.uid = getuid();
1789     ucred.gid = getgid();
1790
1791     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1792 }
1793 #else
1794     pa_pstream_send_tagstruct(u->pstream, t);
1795 #endif
1796
1797     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1798
1799     pa_log_debug("Connection established, authenticating ...");
1800 }
1801
1802 #ifdef TUNNEL_SINK
1803
1804 /* Called from main context */
1805 static void sink_set_volume(pa_sink *sink) {
1806     struct userdata *u;
1807     pa_tagstruct *t;
1808
1809     pa_assert(sink);
1810     u = sink->userdata;
1811     pa_assert(u);
1812
1813     t = pa_tagstruct_new(NULL, 0);
1814     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1815     pa_tagstruct_putu32(t, u->ctag++);
1816     pa_tagstruct_putu32(t, u->device_index);
1817     pa_tagstruct_put_cvolume(t, &sink->real_volume);
1818     pa_pstream_send_tagstruct(u->pstream, t);
1819 }
1820
1821 /* Called from main context */
1822 static void sink_set_mute(pa_sink *sink) {
1823     struct userdata *u;
1824     pa_tagstruct *t;
1825
1826     pa_assert(sink);
1827     u = sink->userdata;
1828     pa_assert(u);
1829
1830     if (u->version < 11)
1831         return;
1832
1833     t = pa_tagstruct_new(NULL, 0);
1834     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1835     pa_tagstruct_putu32(t, u->ctag++);
1836     pa_tagstruct_putu32(t, u->device_index);
1837     pa_tagstruct_put_boolean(t, !!sink->muted);
1838     pa_pstream_send_tagstruct(u->pstream, t);
1839 }
1840
1841 #endif
1842
1843 int pa__init(pa_module*m) {
1844     pa_modargs *ma = NULL;
1845     struct userdata *u = NULL;
1846     pa_sample_spec ss;
1847     pa_channel_map map;
1848     char *dn = NULL;
1849 #ifdef TUNNEL_SINK
1850     pa_sink_new_data data;
1851 #else
1852     pa_source_new_data data;
1853 #endif
1854
1855     pa_assert(m);
1856
1857     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1858         pa_log("Failed to parse module arguments");
1859         goto fail;
1860     }
1861
1862     m->userdata = u = pa_xnew0(struct userdata, 1);
1863     u->core = m->core;
1864     u->module = m;
1865     u->client = NULL;
1866     u->pdispatch = NULL;
1867     u->pstream = NULL;
1868     u->server_name = NULL;
1869 #ifdef TUNNEL_SINK
1870     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1871     u->sink = NULL;
1872     u->requested_bytes = 0;
1873 #else
1874     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1875     u->source = NULL;
1876 #endif
1877     u->smoother = pa_smoother_new(
1878             PA_USEC_PER_SEC,
1879             PA_USEC_PER_SEC*2,
1880             TRUE,
1881             TRUE,
1882             10,
1883             pa_rtclock_now(),
1884             FALSE);
1885     u->ctag = 1;
1886     u->device_index = u->channel = PA_INVALID_INDEX;
1887     u->time_event = NULL;
1888     u->ignore_latency_before = 0;
1889     u->transport_usec = u->thread_transport_usec = 0;
1890     u->remote_suspended = u->remote_corked = FALSE;
1891     u->counter = u->counter_delta = 0;
1892
1893     u->rtpoll = pa_rtpoll_new();
1894     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1895
1896     if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1897         goto fail;
1898
1899     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1900         pa_log("No server specified.");
1901         goto fail;
1902     }
1903
1904     ss = m->core->default_sample_spec;
1905     map = m->core->default_channel_map;
1906     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1907         pa_log("Invalid sample format specification");
1908         goto fail;
1909     }
1910
1911     if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1912         pa_log("Failed to connect to server '%s'", u->server_name);
1913         goto fail;
1914     }
1915
1916     pa_socket_client_set_callback(u->client, on_connection, u);
1917
1918 #ifdef TUNNEL_SINK
1919
1920     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1921         dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1922
1923     pa_sink_new_data_init(&data);
1924     data.driver = __FILE__;
1925     data.module = m;
1926     data.namereg_fail = TRUE;
1927     pa_sink_new_data_set_name(&data, dn);
1928     pa_sink_new_data_set_sample_spec(&data, &ss);
1929     pa_sink_new_data_set_channel_map(&data, &map);
1930     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1931     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1932     if (u->sink_name)
1933         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1934
1935     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1936         pa_log("Invalid properties");
1937         pa_sink_new_data_done(&data);
1938         goto fail;
1939     }
1940
1941     u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1942     pa_sink_new_data_done(&data);
1943
1944     if (!u->sink) {
1945         pa_log("Failed to create sink.");
1946         goto fail;
1947     }
1948
1949     u->sink->parent.process_msg = sink_process_msg;
1950     u->sink->userdata = u;
1951     u->sink->set_state = sink_set_state;
1952     u->sink->set_volume = sink_set_volume;
1953     u->sink->set_mute = sink_set_mute;
1954
1955     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1956
1957 /*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1958
1959     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1960     pa_sink_set_rtpoll(u->sink, u->rtpoll);
1961
1962 #else
1963
1964     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1965         dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
1966
1967     pa_source_new_data_init(&data);
1968     data.driver = __FILE__;
1969     data.module = m;
1970     data.namereg_fail = TRUE;
1971     pa_source_new_data_set_name(&data, dn);
1972     pa_source_new_data_set_sample_spec(&data, &ss);
1973     pa_source_new_data_set_channel_map(&data, &map);
1974     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1975     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1976     if (u->source_name)
1977         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1978
1979     if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1980         pa_log("Invalid properties");
1981         pa_source_new_data_done(&data);
1982         goto fail;
1983     }
1984
1985     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1986     pa_source_new_data_done(&data);
1987
1988     if (!u->source) {
1989         pa_log("Failed to create source.");
1990         goto fail;
1991     }
1992
1993     u->source->parent.process_msg = source_process_msg;
1994     u->source->set_state = source_set_state;
1995     u->source->userdata = u;
1996
1997 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1998
1999     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2000     pa_source_set_rtpoll(u->source, u->rtpoll);
2001
2002     u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2003 #endif
2004
2005     pa_xfree(dn);
2006
2007     u->time_event = NULL;
2008
2009     u->maxlength = (uint32_t) -1;
2010 #ifdef TUNNEL_SINK
2011     u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2012 #else
2013     u->fragsize = (uint32_t) -1;
2014 #endif
2015
2016     if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2017         pa_log("Failed to create thread.");
2018         goto fail;
2019     }
2020
2021 #ifdef TUNNEL_SINK
2022     pa_sink_put(u->sink);
2023 #else
2024     pa_source_put(u->source);
2025 #endif
2026
2027     pa_modargs_free(ma);
2028
2029     return 0;
2030
2031 fail:
2032     pa__done(m);
2033
2034     if (ma)
2035         pa_modargs_free(ma);
2036
2037     pa_xfree(dn);
2038
2039     return  -1;
2040 }
2041
2042 void pa__done(pa_module*m) {
2043     struct userdata* u;
2044
2045     pa_assert(m);
2046
2047     if (!(u = m->userdata))
2048         return;
2049
2050 #ifdef TUNNEL_SINK
2051     if (u->sink)
2052         pa_sink_unlink(u->sink);
2053 #else
2054     if (u->source)
2055         pa_source_unlink(u->source);
2056 #endif
2057
2058     if (u->thread) {
2059         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2060         pa_thread_free(u->thread);
2061     }
2062
2063     pa_thread_mq_done(&u->thread_mq);
2064
2065 #ifdef TUNNEL_SINK
2066     if (u->sink)
2067         pa_sink_unref(u->sink);
2068 #else
2069     if (u->source)
2070         pa_source_unref(u->source);
2071 #endif
2072
2073     if (u->rtpoll)
2074         pa_rtpoll_free(u->rtpoll);
2075
2076     if (u->pstream) {
2077         pa_pstream_unlink(u->pstream);
2078         pa_pstream_unref(u->pstream);
2079     }
2080
2081     if (u->pdispatch)
2082         pa_pdispatch_unref(u->pdispatch);
2083
2084     if (u->client)
2085         pa_socket_client_unref(u->client);
2086
2087     if (u->auth_cookie)
2088         pa_auth_cookie_unref(u->auth_cookie);
2089
2090     if (u->smoother)
2091         pa_smoother_free(u->smoother);
2092
2093     if (u->time_event)
2094         u->core->mainloop->time_free(u->time_event);
2095
2096 #ifndef TUNNEL_SINK
2097     if (u->mcalign)
2098         pa_mcalign_free(u->mcalign);
2099 #endif
2100
2101 #ifdef TUNNEL_SINK
2102     pa_xfree(u->sink_name);
2103 #else
2104     pa_xfree(u->source_name);
2105 #endif
2106     pa_xfree(u->server_name);
2107
2108     pa_xfree(u->device_description);
2109     pa_xfree(u->server_fqdn);
2110     pa_xfree(u->user_name);
2111
2112     pa_xfree(u);
2113 }