Sending translation for Finnish
[profile/ivi/pulseaudio.git] / src / modules / module-tunnel.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
/* Module metadata. The description and the usage string depend on whether
 * this file is compiled as the sink tunnel (TUNNEL_SINK defined) or as the
 * source tunnel; the two usage strings differ only in the sink/source
 * specific argument names. */
#ifdef TUNNEL_SINK
PA_MODULE_DESCRIPTION("Tunnel module for sinks");
PA_MODULE_USAGE(
        "server=<address> "
        "sink=<remote sink name> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "sink_name=<name for the local sink> "
        "channel_map=<channel map>");
#else
PA_MODULE_DESCRIPTION("Tunnel module for sources");
PA_MODULE_USAGE(
        "server=<address> "
        "source=<remote source name> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "source_name=<name for the local source> "
        "channel_map=<channel map>");
#endif

PA_MODULE_AUTHOR("Lennart Poettering");
PA_MODULE_VERSION(PACKAGE_VERSION);
/* NOTE(review): FALSE presumably allows several instances of this module to
 * be loaded at the same time -- confirm against PA_MODULE_LOAD_ONCE. */
PA_MODULE_LOAD_ONCE(FALSE);
91
/* NULL-terminated list of the module argument names this module knows about.
 * The entries mirror the keys advertised in the usage string; the sink and
 * source builds differ only in the sink/source specific names. */
static const char* const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink",
#else
    "source_name",
    "source",
#endif
    "channel_map",
    NULL,
};
108
/* Timeout passed to pa_pdispatch_register_reply() for latency requests.
 * NOTE(review): presumably in seconds -- confirm against the pdispatch API. */
#define DEFAULT_TIMEOUT 5

/* Seconds between two periodic latency requests (added to tv_sec in
 * timeout_callback()). */
#define LATENCY_INTERVAL 10

/* 8 ms expressed in microseconds.
 * NOTE(review): the user of this constant is outside this part of the file. */
#define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)

#ifdef TUNNEL_SINK

/* Private message codes understood by sink_process_msg(); they start right
 * after the generic sink message codes. */
enum {
    SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
    SINK_MESSAGE_REMOTE_SUSPEND,
    SINK_MESSAGE_UPDATE_LATENCY,
    SINK_MESSAGE_POST
};

/* NOTE(review): by their names these are default playback buffer metrics in
 * milliseconds; they are consumed outside this part of the file. */
#define DEFAULT_TLENGTH_MSEC 150
#define DEFAULT_MINREQ_MSEC 25

#else

/* Private message codes understood by source_process_msg(); they start right
 * after the generic source message codes. */
enum {
    SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
    SOURCE_MESSAGE_REMOTE_SUSPEND,
    SOURCE_MESSAGE_UPDATE_LATENCY
};

/* NOTE(review): by its name the default record fragment size in
 * milliseconds; it is consumed outside this part of the file. */
#define DEFAULT_FRAGSIZE_MSEC 25

#endif
138
/* Forward declarations of the packet handlers that are referenced by
 * command_table below. command_request and command_started only exist in the
 * sink build. */
#ifdef TUNNEL_SINK
static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
#endif
static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150
/* Dispatch table indexed by protocol command code. Playback and record
 * variants of the same notification share one handler; entries that are not
 * listed stay NULL. The request/started handlers are only installed in the
 * sink build. */
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
#ifdef TUNNEL_SINK
    [PA_COMMAND_REQUEST] = command_request,
    [PA_COMMAND_STARTED] = command_started,
#endif
    [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
    [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
    [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
    [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
    [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
    [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
    [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
    [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
};
171
/* Per-instance state of one tunnel. It is shared between the main thread
 * (protocol handling) and the IO thread (sink/source message handling). */
struct userdata {
    pa_core *core;
    pa_module *module;

    /* IO thread and the message queues / poll object it runs on */
    pa_thread_mq thread_mq;
    pa_rtpoll *rtpoll;
    pa_thread *thread;

    /* Connection to the remote server */
    pa_socket_client *client;
    pa_pstream *pstream;
    pa_pdispatch *pdispatch;

    char *server_name;
#ifdef TUNNEL_SINK
    char *sink_name;
    pa_sink *sink;
    size_t requested_bytes; /* bytes requested by the server that were not rendered yet */
#else
    char *source_name;
    pa_source *source;
#endif

    pa_auth_cookie *auth_cookie;

    uint32_t version;      /* compared against protocol version numbers when parsing replies */
    uint32_t ctag;         /* next tag to use for an outgoing command */
    uint32_t device_index;
    uint32_t channel;      /* channel of our stream on the connection */

    /* counter: bytes rendered (sink) or posted (source) so far, maintained in
     * the IO thread. counter_delta: reset whenever a latency request is sent;
     * in the sink build it accumulates the bytes sent since then. */
    int64_t counter, counter_delta;

    pa_bool_t remote_corked:1;
    pa_bool_t remote_suspended:1;

    pa_usec_t transport_usec; /* maintained in the main thread */
    pa_usec_t thread_transport_usec; /* maintained in the IO thread */

    /* Latency replies with a tag lower than this one are discarded */
    uint32_t ignore_latency_before;

    pa_time_event *time_event;

    pa_smoother *smoother;

    /* Pieces the local device description is composed of, see update_description() */
    char *device_description;
    char *server_fqdn;
    char *user_name;

    /* Buffer metrics of the remote stream */
    uint32_t maxlength;
#ifdef TUNNEL_SINK
    uint32_t tlength;
    uint32_t minreq;
    uint32_t prebuf;
#else
    uint32_t fragsize;
#endif
};

/* Defined further down; needed by the packet handlers below */
static void request_latency(struct userdata *u);
230
231 /* Called from main context */
232 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
233     pa_log_debug("Got stream or client event.");
234 }
235
236 /* Called from main context */
237 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
238     struct userdata *u = userdata;
239
240     pa_assert(pd);
241     pa_assert(t);
242     pa_assert(u);
243     pa_assert(u->pdispatch == pd);
244
245     pa_log_warn("Stream killed");
246     pa_module_unload_request(u->module, TRUE);
247 }
248
249 /* Called from main context */
250 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
251     struct userdata *u = userdata;
252
253     pa_assert(pd);
254     pa_assert(t);
255     pa_assert(u);
256     pa_assert(u->pdispatch == pd);
257
258     pa_log_info("Server signalled buffer overrun/underrun.");
259     request_latency(u);
260 }
261
262 /* Called from main context */
263 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
264     struct userdata *u = userdata;
265     uint32_t channel;
266     pa_bool_t suspended;
267
268     pa_assert(pd);
269     pa_assert(t);
270     pa_assert(u);
271     pa_assert(u->pdispatch == pd);
272
273     if (pa_tagstruct_getu32(t, &channel) < 0 ||
274         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
275         !pa_tagstruct_eof(t)) {
276
277         pa_log("Invalid packet.");
278         pa_module_unload_request(u->module, TRUE);
279         return;
280     }
281
282     pa_log_debug("Server reports device suspend.");
283
284 #ifdef TUNNEL_SINK
285     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
286 #else
287     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
288 #endif
289
290     request_latency(u);
291 }
292
293 /* Called from main context */
294 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
295     struct userdata *u = userdata;
296     uint32_t channel, di;
297     const char *dn;
298     pa_bool_t suspended;
299
300     pa_assert(pd);
301     pa_assert(t);
302     pa_assert(u);
303     pa_assert(u->pdispatch == pd);
304
305     if (pa_tagstruct_getu32(t, &channel) < 0 ||
306         pa_tagstruct_getu32(t, &di) < 0 ||
307         pa_tagstruct_gets(t, &dn) < 0 ||
308         pa_tagstruct_get_boolean(t, &suspended) < 0) {
309
310         pa_log_error("Invalid packet.");
311         pa_module_unload_request(u->module, TRUE);
312         return;
313     }
314
315     pa_log_debug("Server reports a stream move.");
316
317 #ifdef TUNNEL_SINK
318     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
319 #else
320     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
321 #endif
322
323     request_latency(u);
324 }
325
326 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
327     struct userdata *u = userdata;
328     uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
329     pa_usec_t usec;
330
331     pa_assert(pd);
332     pa_assert(t);
333     pa_assert(u);
334     pa_assert(u->pdispatch == pd);
335
336     if (pa_tagstruct_getu32(t, &channel) < 0 ||
337         pa_tagstruct_getu32(t, &maxlength) < 0) {
338
339         pa_log_error("Invalid packet.");
340         pa_module_unload_request(u->module, TRUE);
341         return;
342     }
343
344     if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
345         if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
346             pa_tagstruct_get_usec(t, &usec) < 0) {
347
348             pa_log_error("Invalid packet.");
349             pa_module_unload_request(u->module, TRUE);
350             return;
351         }
352     } else {
353         if (pa_tagstruct_getu32(t, &tlength) < 0 ||
354             pa_tagstruct_getu32(t, &prebuf) < 0 ||
355             pa_tagstruct_getu32(t, &minreq) < 0 ||
356             pa_tagstruct_get_usec(t, &usec) < 0) {
357
358             pa_log_error("Invalid packet.");
359             pa_module_unload_request(u->module, TRUE);
360             return;
361         }
362     }
363
364 #ifdef TUNNEL_SINK
365     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
366 #endif
367
368     request_latency(u);
369 }
370
371 #ifdef TUNNEL_SINK
372
373 /* Called from main context */
374 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
375    struct userdata *u = userdata;
376
377     pa_assert(pd);
378     pa_assert(t);
379     pa_assert(u);
380     pa_assert(u->pdispatch == pd);
381
382     pa_log_debug("Server reports playback started.");
383     request_latency(u);
384 }
385
386 #endif
387
388 /* Called from IO thread context */
389 static void check_smoother_status(struct userdata *u, pa_bool_t past)  {
390     pa_usec_t x;
391
392     pa_assert(u);
393
394     x = pa_rtclock_usec();
395
396     /* Correct by the time the requested issued needs to travel to the
397      * other side.  This is a valid thread-safe access, because the
398      * main thread is waiting for us */
399
400     if (past)
401         x -= u->thread_transport_usec;
402     else
403         x += u->thread_transport_usec;
404
405     if (u->remote_suspended || u->remote_corked)
406         pa_smoother_pause(u->smoother, x);
407     else
408         pa_smoother_resume(u->smoother, x);
409 }
410
411 /* Called from IO thread context */
412 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
413     pa_assert(u);
414
415     if (u->remote_corked == cork)
416         return;
417
418     u->remote_corked = cork;
419     check_smoother_status(u, FALSE);
420 }
421
422 /* Called from main context */
423 static void stream_cork(struct userdata *u, pa_bool_t cork) {
424     pa_tagstruct *t;
425     pa_assert(u);
426
427     if (!u->pstream)
428         return;
429
430     t = pa_tagstruct_new(NULL, 0);
431 #ifdef TUNNEL_SINK
432     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
433 #else
434     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
435 #endif
436     pa_tagstruct_putu32(t, u->ctag++);
437     pa_tagstruct_putu32(t, u->channel);
438     pa_tagstruct_put_boolean(t, !!cork);
439     pa_pstream_send_tagstruct(u->pstream, t);
440
441     request_latency(u);
442 }
443
444 /* Called from IO thread context */
445 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
446     pa_assert(u);
447
448     if (u->remote_suspended == suspend)
449         return;
450
451     u->remote_suspended = suspend;
452     check_smoother_status(u, TRUE);
453 }
454
455 #ifdef TUNNEL_SINK
456
457 /* Called from IO thread context */
458 static void send_data(struct userdata *u) {
459     pa_assert(u);
460
461     while (u->requested_bytes > 0) {
462         pa_memchunk memchunk;
463
464         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
465         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
466         pa_memblock_unref(memchunk.memblock);
467
468         u->requested_bytes -= memchunk.length;
469
470         u->counter += (int64_t) memchunk.length;
471     }
472 }
473
474 /* This function is called from IO context -- except when it is not. */
475 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
476     struct userdata *u = PA_SINK(o)->userdata;
477
478     switch (code) {
479
480         case PA_SINK_MESSAGE_SET_STATE: {
481             int r;
482
483             /* First, change the state, because otherwide pa_sink_render() would fail */
484             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
485
486                 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
487
488                 if (PA_SINK_IS_OPENED(u->sink->state))
489                     send_data(u);
490             }
491
492             return r;
493         }
494
495         case PA_SINK_MESSAGE_GET_LATENCY: {
496             pa_usec_t yl, yr, *usec = data;
497
498             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
499             yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
500
501             *usec = yl > yr ? yl - yr : 0;
502             return 0;
503         }
504
505         case SINK_MESSAGE_REQUEST:
506
507             pa_assert(offset > 0);
508             u->requested_bytes += (size_t) offset;
509
510             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
511                 send_data(u);
512
513             return 0;
514
515
516         case SINK_MESSAGE_REMOTE_SUSPEND:
517
518             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
519             return 0;
520
521
522         case SINK_MESSAGE_UPDATE_LATENCY: {
523             pa_usec_t y;
524
525             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
526
527             if (y > (pa_usec_t) offset)
528                 y -= (pa_usec_t) offset;
529             else
530                 y = 0;
531
532             pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
533
534             /* We can access this freely here, since the main thread is waiting for us */
535             u->thread_transport_usec = u->transport_usec;
536
537             return 0;
538         }
539
540         case SINK_MESSAGE_POST:
541
542             /* OK, This might be a bit confusing. This message is
543              * delivered to us from the main context -- NOT from the
544              * IO thread context where the rest of the messages are
545              * dispatched. Yeah, ugly, but I am a lazy bastard. */
546
547             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
548
549             u->counter_delta += (int64_t) chunk->length;
550
551             return 0;
552     }
553
554     return pa_sink_process_msg(o, code, data, offset, chunk);
555 }
556
557 /* Called from main context */
558 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
559     struct userdata *u;
560     pa_sink_assert_ref(s);
561     u = s->userdata;
562
563     switch ((pa_sink_state_t) state) {
564
565         case PA_SINK_SUSPENDED:
566             pa_assert(PA_SINK_IS_OPENED(s->state));
567             stream_cork(u, TRUE);
568             break;
569
570         case PA_SINK_IDLE:
571         case PA_SINK_RUNNING:
572             if (s->state == PA_SINK_SUSPENDED)
573                 stream_cork(u, FALSE);
574             break;
575
576         case PA_SINK_UNLINKED:
577         case PA_SINK_INIT:
578         case PA_SINK_INVALID_STATE:
579             ;
580     }
581
582     return 0;
583 }
584
585 #else
586
587 /* This function is called from IO context -- except when it is not. */
588 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
589     struct userdata *u = PA_SOURCE(o)->userdata;
590
591     switch (code) {
592
593         case PA_SOURCE_MESSAGE_SET_STATE: {
594             int r;
595
596             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
597                 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
598
599             return r;
600         }
601
602         case PA_SOURCE_MESSAGE_GET_LATENCY: {
603             pa_usec_t yr, yl, *usec = data;
604
605             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
606             yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
607
608             *usec = yr > yl ? yr - yl : 0;
609             return 0;
610         }
611
612         case SOURCE_MESSAGE_POST:
613
614             if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
615                 pa_source_post(u->source, chunk);
616
617             u->counter += (int64_t) chunk->length;
618
619             return 0;
620
621         case SOURCE_MESSAGE_REMOTE_SUSPEND:
622
623             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
624             return 0;
625
626         case SOURCE_MESSAGE_UPDATE_LATENCY: {
627             pa_usec_t y;
628
629             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
630             y += (pa_usec_t) offset;
631
632             pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
633
634             /* We can access this freely here, since the main thread is waiting for us */
635             u->thread_transport_usec = u->transport_usec;
636
637             return 0;
638         }
639     }
640
641     return pa_source_process_msg(o, code, data, offset, chunk);
642 }
643
644 /* Called from main context */
645 static int source_set_state(pa_source *s, pa_source_state_t state) {
646     struct userdata *u;
647     pa_source_assert_ref(s);
648     u = s->userdata;
649
650     switch ((pa_source_state_t) state) {
651
652         case PA_SOURCE_SUSPENDED:
653             pa_assert(PA_SOURCE_IS_OPENED(s->state));
654             stream_cork(u, TRUE);
655             break;
656
657         case PA_SOURCE_IDLE:
658         case PA_SOURCE_RUNNING:
659             if (s->state == PA_SOURCE_SUSPENDED)
660                 stream_cork(u, FALSE);
661             break;
662
663         case PA_SOURCE_UNLINKED:
664         case PA_SOURCE_INIT:
665         case PA_SINK_INVALID_STATE:
666             ;
667     }
668
669     return 0;
670 }
671
672 #endif
673
674 static void thread_func(void *userdata) {
675     struct userdata *u = userdata;
676
677     pa_assert(u);
678
679     pa_log_debug("Thread starting up");
680
681     pa_thread_mq_install(&u->thread_mq);
682     pa_rtpoll_install(u->rtpoll);
683
684     for (;;) {
685         int ret;
686
687 #ifdef TUNNEL_SINK
688         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
689             if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
690                 pa_sink_process_rewind(u->sink, 0);
691 #endif
692
693         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
694             goto fail;
695
696         if (ret == 0)
697             goto finish;
698     }
699
700 fail:
701     /* If this was no regular exit from the loop we have to continue
702      * processing messages until we received PA_MESSAGE_SHUTDOWN */
703     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
704     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
705
706 finish:
707     pa_log_debug("Thread shutting down");
708 }
709
710 #ifdef TUNNEL_SINK
711 /* Called from main context */
712 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
713     struct userdata *u = userdata;
714     uint32_t bytes, channel;
715
716     pa_assert(pd);
717     pa_assert(command == PA_COMMAND_REQUEST);
718     pa_assert(t);
719     pa_assert(u);
720     pa_assert(u->pdispatch == pd);
721
722     if (pa_tagstruct_getu32(t, &channel) < 0 ||
723         pa_tagstruct_getu32(t, &bytes) < 0) {
724         pa_log("Invalid protocol reply");
725         goto fail;
726     }
727
728     if (channel != u->channel) {
729         pa_log("Recieved data for invalid channel");
730         goto fail;
731     }
732
733     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
734     return;
735
736 fail:
737     pa_module_unload_request(u->module, TRUE);
738 }
739
740 #endif
741
742 /* Called from main context */
743 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
744     struct userdata *u = userdata;
745     pa_usec_t sink_usec, source_usec;
746     pa_bool_t playing;
747     int64_t write_index, read_index;
748     struct timeval local, remote, now;
749     pa_sample_spec *ss;
750     int64_t delay;
751
752     pa_assert(pd);
753     pa_assert(u);
754
755     if (command != PA_COMMAND_REPLY) {
756         if (command == PA_COMMAND_ERROR)
757             pa_log("Failed to get latency.");
758         else
759             pa_log("Protocol error.");
760         goto fail;
761     }
762
763     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
764         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
765         pa_tagstruct_get_boolean(t, &playing) < 0 ||
766         pa_tagstruct_get_timeval(t, &local) < 0 ||
767         pa_tagstruct_get_timeval(t, &remote) < 0 ||
768         pa_tagstruct_gets64(t, &write_index) < 0 ||
769         pa_tagstruct_gets64(t, &read_index) < 0) {
770         pa_log("Invalid reply.");
771         goto fail;
772     }
773
774 #ifdef TUNNEL_SINK
775     if (u->version >= 13) {
776         uint64_t underrun_for = 0, playing_for = 0;
777
778         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
779             pa_tagstruct_getu64(t, &playing_for) < 0) {
780             pa_log("Invalid reply.");
781             goto fail;
782         }
783     }
784 #endif
785
786     if (!pa_tagstruct_eof(t)) {
787         pa_log("Invalid reply.");
788         goto fail;
789     }
790
791     if (tag < u->ignore_latency_before) {
792         return;
793     }
794
795     pa_gettimeofday(&now);
796
797     /* Calculate transport usec */
798     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
799         /* local and remote seem to have synchronized clocks */
800 #ifdef TUNNEL_SINK
801         u->transport_usec = pa_timeval_diff(&remote, &local);
802 #else
803         u->transport_usec = pa_timeval_diff(&now, &remote);
804 #endif
805     } else
806         u->transport_usec = pa_timeval_diff(&now, &local)/2;
807
808     /* First, take the device's delay */
809 #ifdef TUNNEL_SINK
810     delay = (int64_t) sink_usec;
811     ss = &u->sink->sample_spec;
812 #else
813     delay = (int64_t) source_usec;
814     ss = &u->source->sample_spec;
815 #endif
816
817     /* Add the length of our server-side buffer */
818     if (write_index >= read_index)
819         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
820     else
821         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
822
823     /* Our measurements are already out of date, hence correct by the     *
824      * transport latency */
825 #ifdef TUNNEL_SINK
826     delay -= (int64_t) u->transport_usec;
827 #else
828     delay += (int64_t) u->transport_usec;
829 #endif
830
831     /* Now correct by what we have have read/written since we requested the update */
832 #ifdef TUNNEL_SINK
833     delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
834 #else
835     delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
836 #endif
837
838 #ifdef TUNNEL_SINK
839     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
840 #else
841     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
842 #endif
843
844     return;
845
846 fail:
847
848     pa_module_unload_request(u->module, TRUE);
849 }
850
851 /* Called from main context */
852 static void request_latency(struct userdata *u) {
853     pa_tagstruct *t;
854     struct timeval now;
855     uint32_t tag;
856     pa_assert(u);
857
858     t = pa_tagstruct_new(NULL, 0);
859 #ifdef TUNNEL_SINK
860     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
861 #else
862     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
863 #endif
864     pa_tagstruct_putu32(t, tag = u->ctag++);
865     pa_tagstruct_putu32(t, u->channel);
866
867     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
868
869     pa_pstream_send_tagstruct(u->pstream, t);
870     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
871
872     u->ignore_latency_before = tag;
873     u->counter_delta = 0;
874 }
875
876 /* Called from main context */
877 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e,  const struct timeval *tv, void *userdata) {
878     struct userdata *u = userdata;
879     struct timeval ntv;
880
881     pa_assert(m);
882     pa_assert(e);
883     pa_assert(u);
884
885     request_latency(u);
886
887     pa_gettimeofday(&ntv);
888     ntv.tv_sec += LATENCY_INTERVAL;
889     m->time_restart(e, &ntv);
890 }
891
892 /* Called from main context */
893 static void update_description(struct userdata *u) {
894     char *d;
895     char un[128], hn[128];
896     pa_tagstruct *t;
897
898     pa_assert(u);
899
900     if (!u->server_fqdn || !u->user_name || !u->device_description)
901         return;
902
903     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
904
905 #ifdef TUNNEL_SINK
906     pa_sink_set_description(u->sink, d);
907     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
908     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
909     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
910 #else
911     pa_source_set_description(u->source, d);
912     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
913     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
914     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
915 #endif
916
917     pa_xfree(d);
918
919     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
920                           pa_get_user_name(un, sizeof(un)),
921                           pa_get_host_name(hn, sizeof(hn)));
922
923     t = pa_tagstruct_new(NULL, 0);
924 #ifdef TUNNEL_SINK
925     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
926 #else
927     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
928 #endif
929     pa_tagstruct_putu32(t, u->ctag++);
930     pa_tagstruct_putu32(t, u->channel);
931     pa_tagstruct_puts(t, d);
932     pa_pstream_send_tagstruct(u->pstream, t);
933
934     pa_xfree(d);
935 }
936
937 /* Called from main context */
938 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
939     struct userdata *u = userdata;
940     pa_sample_spec ss;
941     pa_channel_map cm;
942     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
943     uint32_t cookie;
944
945     pa_assert(pd);
946     pa_assert(u);
947
948     if (command != PA_COMMAND_REPLY) {
949         if (command == PA_COMMAND_ERROR)
950             pa_log("Failed to get info.");
951         else
952             pa_log("Protocol error.");
953         goto fail;
954     }
955
956     if (pa_tagstruct_gets(t, &server_name) < 0 ||
957         pa_tagstruct_gets(t, &server_version) < 0 ||
958         pa_tagstruct_gets(t, &user_name) < 0 ||
959         pa_tagstruct_gets(t, &host_name) < 0 ||
960         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
962         pa_tagstruct_gets(t, &default_source_name) < 0 ||
963         pa_tagstruct_getu32(t, &cookie) < 0 ||
964         (u->version >= 15 &&
965          pa_tagstruct_get_channel_map(t, &cm) < 0)) {
966
967         pa_log("Parse failure");
968         goto fail;
969     }
970
971     if (!pa_tagstruct_eof(t)) {
972         pa_log("Packet too long");
973         goto fail;
974     }
975
976     pa_xfree(u->server_fqdn);
977     u->server_fqdn = pa_xstrdup(host_name);
978
979     pa_xfree(u->user_name);
980     u->user_name = pa_xstrdup(user_name);
981
982     update_description(u);
983
984     return;
985
986 fail:
987     pa_module_unload_request(u->module, TRUE);
988 }
989
990 #ifdef TUNNEL_SINK
991
992 /* Called from main context */
993 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
994     struct userdata *u = userdata;
995     uint32_t idx, owner_module, monitor_source, flags;
996     const char *name, *description, *monitor_source_name, *driver;
997     pa_sample_spec ss;
998     pa_channel_map cm;
999     pa_cvolume volume;
1000     pa_bool_t mute;
1001     pa_usec_t latency;
1002     pa_proplist *pl;
1003
1004     pa_assert(pd);
1005     pa_assert(u);
1006
1007     pl = pa_proplist_new();
1008
1009     if (command != PA_COMMAND_REPLY) {
1010         if (command == PA_COMMAND_ERROR)
1011             pa_log("Failed to get info.");
1012         else
1013             pa_log("Protocol error.");
1014         goto fail;
1015     }
1016
1017     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1018         pa_tagstruct_gets(t, &name) < 0 ||
1019         pa_tagstruct_gets(t, &description) < 0 ||
1020         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1021         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1022         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1023         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1024         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1025         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1026         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1027         pa_tagstruct_get_usec(t, &latency) < 0 ||
1028         pa_tagstruct_gets(t, &driver) < 0 ||
1029         pa_tagstruct_getu32(t, &flags) < 0) {
1030
1031         pa_log("Parse failure");
1032         goto fail;
1033     }
1034
1035     if (u->version >= 13) {
1036         pa_usec_t configured_latency;
1037
1038         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1039             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1040
1041             pa_log("Parse failure");
1042             goto fail;
1043         }
1044     }
1045
1046     if (u->version >= 15) {
1047         pa_volume_t base_volume;
1048         uint32_t state, n_volume_steps, card;
1049
1050         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1051             pa_tagstruct_getu32(t, &state) < 0 ||
1052             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1053             pa_tagstruct_getu32(t, &card) < 0) {
1054
1055             pa_log("Parse failure");
1056             goto fail;
1057         }
1058     }
1059
1060     if (!pa_tagstruct_eof(t)) {
1061         pa_log("Packet too long");
1062         goto fail;
1063     }
1064
1065     pa_proplist_free(pl);
1066
1067     if (!u->sink_name || strcmp(name, u->sink_name))
1068         return;
1069
1070     pa_xfree(u->device_description);
1071     u->device_description = pa_xstrdup(description);
1072
1073     update_description(u);
1074
1075     return;
1076
1077 fail:
1078     pa_module_unload_request(u->module, TRUE);
1079     pa_proplist_free(pl);
1080 }
1081
1082 /* Called from main context */
1083 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1084     struct userdata *u = userdata;
1085     uint32_t idx, owner_module, client, sink;
1086     pa_usec_t buffer_usec, sink_usec;
1087     const char *name, *driver, *resample_method;
1088     pa_bool_t mute;
1089     pa_sample_spec sample_spec;
1090     pa_channel_map channel_map;
1091     pa_cvolume volume;
1092     pa_proplist *pl;
1093
1094     pa_assert(pd);
1095     pa_assert(u);
1096
1097     pl = pa_proplist_new();
1098
1099     if (command != PA_COMMAND_REPLY) {
1100         if (command == PA_COMMAND_ERROR)
1101             pa_log("Failed to get info.");
1102         else
1103             pa_log("Protocol error.");
1104         goto fail;
1105     }
1106
1107     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1108         pa_tagstruct_gets(t, &name) < 0 ||
1109         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1110         pa_tagstruct_getu32(t, &client) < 0 ||
1111         pa_tagstruct_getu32(t, &sink) < 0 ||
1112         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1113         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1114         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1115         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1116         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1117         pa_tagstruct_gets(t, &resample_method) < 0 ||
1118         pa_tagstruct_gets(t, &driver) < 0) {
1119
1120         pa_log("Parse failure");
1121         goto fail;
1122     }
1123
1124     if (u->version >= 11) {
1125         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1126
1127             pa_log("Parse failure");
1128             goto fail;
1129         }
1130     }
1131
1132     if (u->version >= 13) {
1133         if (pa_tagstruct_get_proplist(t, pl) < 0) {
1134
1135             pa_log("Parse failure");
1136             goto fail;
1137         }
1138     }
1139
1140     if (!pa_tagstruct_eof(t)) {
1141         pa_log("Packet too long");
1142         goto fail;
1143     }
1144
1145     pa_proplist_free(pl);
1146
1147     if (idx != u->device_index)
1148         return;
1149
1150     pa_assert(u->sink);
1151
1152     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1153         pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1154         return;
1155
1156     pa_sink_volume_changed(u->sink, &volume);
1157
1158     if (u->version >= 11)
1159         pa_sink_mute_changed(u->sink, mute);
1160
1161     return;
1162
1163 fail:
1164     pa_module_unload_request(u->module, TRUE);
1165     pa_proplist_free(pl);
1166 }
1167
1168 #else
1169
1170 /* Called from main context */
1171 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1172     struct userdata *u = userdata;
1173     uint32_t idx, owner_module, monitor_of_sink, flags;
1174     const char *name, *description, *monitor_of_sink_name, *driver;
1175     pa_sample_spec ss;
1176     pa_channel_map cm;
1177     pa_cvolume volume;
1178     pa_bool_t mute;
1179     pa_usec_t latency, configured_latency;
1180     pa_proplist *pl;
1181
1182     pa_assert(pd);
1183     pa_assert(u);
1184
1185     pl = pa_proplist_new();
1186
1187     if (command != PA_COMMAND_REPLY) {
1188         if (command == PA_COMMAND_ERROR)
1189             pa_log("Failed to get info.");
1190         else
1191             pa_log("Protocol error.");
1192         goto fail;
1193     }
1194
1195     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1196         pa_tagstruct_gets(t, &name) < 0 ||
1197         pa_tagstruct_gets(t, &description) < 0 ||
1198         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1199         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1200         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1201         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1202         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1203         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1204         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1205         pa_tagstruct_get_usec(t, &latency) < 0 ||
1206         pa_tagstruct_gets(t, &driver) < 0 ||
1207         pa_tagstruct_getu32(t, &flags) < 0) {
1208
1209         pa_log("Parse failure");
1210         goto fail;
1211     }
1212
1213     if (u->version >= 13) {
1214         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1215             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1216
1217             pa_log("Parse failure");
1218             goto fail;
1219         }
1220     }
1221
1222     if (u->version >= 15) {
1223         pa_volume_t base_volume;
1224         uint32_t state, n_volume_steps, card;
1225
1226         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1227             pa_tagstruct_getu32(t, &state) < 0 ||
1228             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1229             pa_tagstruct_getu32(t, &card) < 0) {
1230
1231             pa_log("Parse failure");
1232             goto fail;
1233         }
1234     }
1235
1236     if (!pa_tagstruct_eof(t)) {
1237         pa_log("Packet too long");
1238         goto fail;
1239     }
1240
1241     pa_proplist_free(pl);
1242
1243     if (!u->source_name || strcmp(name, u->source_name))
1244         return;
1245
1246     pa_xfree(u->device_description);
1247     u->device_description = pa_xstrdup(description);
1248
1249     update_description(u);
1250
1251     return;
1252
1253 fail:
1254     pa_module_unload_request(u->module, TRUE);
1255     pa_proplist_free(pl);
1256 }
1257
1258 #endif
1259
1260 /* Called from main context */
1261 static void request_info(struct userdata *u) {
1262     pa_tagstruct *t;
1263     uint32_t tag;
1264     pa_assert(u);
1265
1266     t = pa_tagstruct_new(NULL, 0);
1267     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1268     pa_tagstruct_putu32(t, tag = u->ctag++);
1269     pa_pstream_send_tagstruct(u->pstream, t);
1270     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1271
1272 #ifdef TUNNEL_SINK
1273     t = pa_tagstruct_new(NULL, 0);
1274     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1275     pa_tagstruct_putu32(t, tag = u->ctag++);
1276     pa_tagstruct_putu32(t, u->device_index);
1277     pa_pstream_send_tagstruct(u->pstream, t);
1278     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1279
1280     if (u->sink_name) {
1281         t = pa_tagstruct_new(NULL, 0);
1282         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1283         pa_tagstruct_putu32(t, tag = u->ctag++);
1284         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1285         pa_tagstruct_puts(t, u->sink_name);
1286         pa_pstream_send_tagstruct(u->pstream, t);
1287         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1288     }
1289 #else
1290     if (u->source_name) {
1291         t = pa_tagstruct_new(NULL, 0);
1292         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1293         pa_tagstruct_putu32(t, tag = u->ctag++);
1294         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1295         pa_tagstruct_puts(t, u->source_name);
1296         pa_pstream_send_tagstruct(u->pstream, t);
1297         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1298     }
1299 #endif
1300 }
1301
1302 /* Called from main context */
1303 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1304     struct userdata *u = userdata;
1305     pa_subscription_event_type_t e;
1306     uint32_t idx;
1307
1308     pa_assert(pd);
1309     pa_assert(t);
1310     pa_assert(u);
1311     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1312
1313     if (pa_tagstruct_getu32(t, &e) < 0 ||
1314         pa_tagstruct_getu32(t, &idx) < 0) {
1315         pa_log("Invalid protocol reply");
1316         pa_module_unload_request(u->module, TRUE);
1317         return;
1318     }
1319
1320     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1321 #ifdef TUNNEL_SINK
1322         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1323         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1324 #else
1325         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1326 #endif
1327         )
1328         return;
1329
1330     request_info(u);
1331 }
1332
1333 /* Called from main context */
1334 static void start_subscribe(struct userdata *u) {
1335     pa_tagstruct *t;
1336     uint32_t tag;
1337     pa_assert(u);
1338
1339     t = pa_tagstruct_new(NULL, 0);
1340     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1341     pa_tagstruct_putu32(t, tag = u->ctag++);
1342     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1343 #ifdef TUNNEL_SINK
1344                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1345 #else
1346                         PA_SUBSCRIPTION_MASK_SOURCE
1347 #endif
1348                         );
1349
1350     pa_pstream_send_tagstruct(u->pstream, t);
1351 }
1352
1353 /* Called from main context */
1354 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1355     struct userdata *u = userdata;
1356     struct timeval ntv;
1357 #ifdef TUNNEL_SINK
1358     uint32_t bytes;
1359 #endif
1360
1361     pa_assert(pd);
1362     pa_assert(u);
1363     pa_assert(u->pdispatch == pd);
1364
1365     if (command != PA_COMMAND_REPLY) {
1366         if (command == PA_COMMAND_ERROR)
1367             pa_log("Failed to create stream.");
1368         else
1369             pa_log("Protocol error.");
1370         goto fail;
1371     }
1372
1373     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1374         pa_tagstruct_getu32(t, &u->device_index) < 0
1375 #ifdef TUNNEL_SINK
1376         || pa_tagstruct_getu32(t, &bytes) < 0
1377 #endif
1378         )
1379         goto parse_error;
1380
1381     if (u->version >= 9) {
1382 #ifdef TUNNEL_SINK
1383         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1384             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1385             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1386             pa_tagstruct_getu32(t, &u->minreq) < 0)
1387             goto parse_error;
1388 #else
1389         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1390             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1391             goto parse_error;
1392 #endif
1393     }
1394
1395     if (u->version >= 12) {
1396         pa_sample_spec ss;
1397         pa_channel_map cm;
1398         uint32_t device_index;
1399         const char *dn;
1400         pa_bool_t suspended;
1401
1402         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1403             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1404             pa_tagstruct_getu32(t, &device_index) < 0 ||
1405             pa_tagstruct_gets(t, &dn) < 0 ||
1406             pa_tagstruct_get_boolean(t, &suspended) < 0)
1407             goto parse_error;
1408
1409 #ifdef TUNNEL_SINK
1410         pa_xfree(u->sink_name);
1411         u->sink_name = pa_xstrdup(dn);
1412 #else
1413         pa_xfree(u->source_name);
1414         u->source_name = pa_xstrdup(dn);
1415 #endif
1416     }
1417
1418     if (u->version >= 13) {
1419         pa_usec_t usec;
1420
1421         if (pa_tagstruct_get_usec(t, &usec) < 0)
1422             goto parse_error;
1423
1424 /* #ifdef TUNNEL_SINK */
1425 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1426 /* #else */
1427 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1428 /* #endif */
1429     }
1430
1431     if (!pa_tagstruct_eof(t))
1432         goto parse_error;
1433
1434     start_subscribe(u);
1435     request_info(u);
1436
1437     pa_assert(!u->time_event);
1438     pa_gettimeofday(&ntv);
1439     ntv.tv_sec += LATENCY_INTERVAL;
1440     u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1441
1442     request_latency(u);
1443
1444     pa_log_debug("Stream created.");
1445
1446 #ifdef TUNNEL_SINK
1447     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1448 #endif
1449
1450     return;
1451
1452 parse_error:
1453     pa_log("Invalid reply. (Create stream)");
1454
1455 fail:
1456     pa_module_unload_request(u->module, TRUE);
1457
1458 }
1459
1460 /* Called from main context */
1461 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1462     struct userdata *u = userdata;
1463     pa_tagstruct *reply;
1464     char name[256], un[128], hn[128];
1465 #ifdef TUNNEL_SINK
1466     pa_cvolume volume;
1467 #endif
1468
1469     pa_assert(pd);
1470     pa_assert(u);
1471     pa_assert(u->pdispatch == pd);
1472
1473     if (command != PA_COMMAND_REPLY ||
1474         pa_tagstruct_getu32(t, &u->version) < 0 ||
1475         !pa_tagstruct_eof(t)) {
1476
1477         if (command == PA_COMMAND_ERROR)
1478             pa_log("Failed to authenticate");
1479         else
1480             pa_log("Protocol error.");
1481
1482         goto fail;
1483     }
1484
1485     /* Minimum supported protocol version */
1486     if (u->version < 8) {
1487         pa_log("Incompatible protocol version");
1488         goto fail;
1489     }
1490
1491     /* Starting with protocol version 13 the MSB of the version tag
1492     reflects if shm is enabled for this connection or not. We don't
1493     support SHM here at all, so we just ignore this. */
1494
1495     if (u->version >= 13)
1496         u->version &= 0x7FFFFFFFU;
1497
1498     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1499
1500 #ifdef TUNNEL_SINK
1501     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1502     pa_sink_update_proplist(u->sink, 0, NULL);
1503
1504     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1505                 u->sink_name,
1506                 pa_get_user_name(un, sizeof(un)),
1507                 pa_get_host_name(hn, sizeof(hn)));
1508 #else
1509     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1510     pa_source_update_proplist(u->source, 0, NULL);
1511
1512     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1513                 u->source_name,
1514                 pa_get_user_name(un, sizeof(un)),
1515                 pa_get_host_name(hn, sizeof(hn)));
1516 #endif
1517
1518     reply = pa_tagstruct_new(NULL, 0);
1519     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1520     pa_tagstruct_putu32(reply, tag = u->ctag++);
1521
1522     if (u->version >= 13) {
1523         pa_proplist *pl;
1524         pl = pa_proplist_new();
1525         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1526         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1527         pa_init_proplist(pl);
1528         pa_tagstruct_put_proplist(reply, pl);
1529         pa_proplist_free(pl);
1530     } else
1531         pa_tagstruct_puts(reply, "PulseAudio");
1532
1533     pa_pstream_send_tagstruct(u->pstream, reply);
1534     /* We ignore the server's reply here */
1535
1536     reply = pa_tagstruct_new(NULL, 0);
1537
1538     if (u->version < 13)
1539         /* Only for older PA versions we need to fill in the maxlength */
1540         u->maxlength = 4*1024*1024;
1541
1542 #ifdef TUNNEL_SINK
1543     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1544     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1545     u->prebuf = u->tlength;
1546 #else
1547     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1548 #endif
1549
1550 #ifdef TUNNEL_SINK
1551     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1552     pa_tagstruct_putu32(reply, tag = u->ctag++);
1553
1554     if (u->version < 13)
1555         pa_tagstruct_puts(reply, name);
1556
1557     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1558     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1559     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1560     pa_tagstruct_puts(reply, u->sink_name);
1561     pa_tagstruct_putu32(reply, u->maxlength);
1562     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1563     pa_tagstruct_putu32(reply, u->tlength);
1564     pa_tagstruct_putu32(reply, u->prebuf);
1565     pa_tagstruct_putu32(reply, u->minreq);
1566     pa_tagstruct_putu32(reply, 0);
1567     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1568     pa_tagstruct_put_cvolume(reply, &volume);
1569 #else
1570     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1571     pa_tagstruct_putu32(reply, tag = u->ctag++);
1572
1573     if (u->version < 13)
1574         pa_tagstruct_puts(reply, name);
1575
1576     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1577     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1578     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1579     pa_tagstruct_puts(reply, u->source_name);
1580     pa_tagstruct_putu32(reply, u->maxlength);
1581     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1582     pa_tagstruct_putu32(reply, u->fragsize);
1583 #endif
1584
1585     if (u->version >= 12) {
1586         pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1587         pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1588         pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1589         pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1590         pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1591         pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1592         pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1593     }
1594
1595     if (u->version >= 13) {
1596         pa_proplist *pl;
1597
1598         pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1599         pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1600
1601         pl = pa_proplist_new();
1602         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1603         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1604         pa_tagstruct_put_proplist(reply, pl);
1605         pa_proplist_free(pl);
1606
1607 #ifndef TUNNEL_SINK
1608         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1609 #endif
1610     }
1611
1612     if (u->version >= 14) {
1613 #ifdef TUNNEL_SINK
1614         pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1615 #endif
1616         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1617     }
1618
1619     if (u->version >= 15) {
1620 #ifdef TUNNEL_SINK
1621         pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1622 #endif
1623         pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1624         pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1625     }
1626
1627     pa_pstream_send_tagstruct(u->pstream, reply);
1628     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1629
1630     pa_log_debug("Connection authenticated, creating stream ...");
1631
1632     return;
1633
1634 fail:
1635     pa_module_unload_request(u->module, TRUE);
1636 }
1637
1638 /* Called from main context */
1639 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1640     struct userdata *u = userdata;
1641
1642     pa_assert(p);
1643     pa_assert(u);
1644
1645     pa_log_warn("Stream died.");
1646     pa_module_unload_request(u->module, TRUE);
1647 }
1648
1649 /* Called from main context */
1650 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1651     struct userdata *u = userdata;
1652
1653     pa_assert(p);
1654     pa_assert(packet);
1655     pa_assert(u);
1656
1657     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1658         pa_log("Invalid packet");
1659         pa_module_unload_request(u->module, TRUE);
1660         return;
1661     }
1662 }
1663
1664 #ifndef TUNNEL_SINK
1665 /* Called from main context */
1666 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1667     struct userdata *u = userdata;
1668
1669     pa_assert(p);
1670     pa_assert(chunk);
1671     pa_assert(u);
1672
1673     if (channel != u->channel) {
1674         pa_log("Recieved memory block on bad channel.");
1675         pa_module_unload_request(u->module, TRUE);
1676         return;
1677     }
1678
1679     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1680
1681     u->counter_delta += (int64_t) chunk->length;
1682 }
1683 #endif
1684
1685 /* Called from main context */
1686 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1687     struct userdata *u = userdata;
1688     pa_tagstruct *t;
1689     uint32_t tag;
1690
1691     pa_assert(sc);
1692     pa_assert(u);
1693     pa_assert(u->client == sc);
1694
1695     pa_socket_client_unref(u->client);
1696     u->client = NULL;
1697
1698     if (!io) {
1699         pa_log("Connection failed: %s", pa_cstrerror(errno));
1700         pa_module_unload_request(u->module, TRUE);
1701         return;
1702     }
1703
1704     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1705     u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1706
1707     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1708     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1709 #ifndef TUNNEL_SINK
1710     pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1711 #endif
1712
1713     t = pa_tagstruct_new(NULL, 0);
1714     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1715     pa_tagstruct_putu32(t, tag = u->ctag++);
1716     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1717
1718     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1719
1720 #ifdef HAVE_CREDS
1721 {
1722     pa_creds ucred;
1723
1724     if (pa_iochannel_creds_supported(io))
1725         pa_iochannel_creds_enable(io);
1726
1727     ucred.uid = getuid();
1728     ucred.gid = getgid();
1729
1730     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1731 }
1732 #else
1733     pa_pstream_send_tagstruct(u->pstream, t);
1734 #endif
1735
1736     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1737
1738     pa_log_debug("Connection established, authenticating ...");
1739 }
1740
1741 #ifdef TUNNEL_SINK
1742
1743 /* Called from main context */
1744 static void sink_set_volume(pa_sink *sink) {
1745     struct userdata *u;
1746     pa_tagstruct *t;
1747     uint32_t tag;
1748
1749     pa_assert(sink);
1750     u = sink->userdata;
1751     pa_assert(u);
1752
1753     t = pa_tagstruct_new(NULL, 0);
1754     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1755     pa_tagstruct_putu32(t, tag = u->ctag++);
1756     pa_tagstruct_putu32(t, u->device_index);
1757     pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1758     pa_pstream_send_tagstruct(u->pstream, t);
1759 }
1760
1761 /* Called from main context */
1762 static void sink_set_mute(pa_sink *sink) {
1763     struct userdata *u;
1764     pa_tagstruct *t;
1765     uint32_t tag;
1766
1767     pa_assert(sink);
1768     u = sink->userdata;
1769     pa_assert(u);
1770
1771     if (u->version < 11)
1772         return;
1773
1774     t = pa_tagstruct_new(NULL, 0);
1775     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1776     pa_tagstruct_putu32(t, tag = u->ctag++);
1777     pa_tagstruct_putu32(t, u->device_index);
1778     pa_tagstruct_put_boolean(t, !!sink->muted);
1779     pa_pstream_send_tagstruct(u->pstream, t);
1780 }
1781
1782 #endif
1783
1784 int pa__init(pa_module*m) {
1785     pa_modargs *ma = NULL;
1786     struct userdata *u = NULL;
1787     pa_sample_spec ss;
1788     pa_channel_map map;
1789     char *dn = NULL;
1790 #ifdef TUNNEL_SINK
1791     pa_sink_new_data data;
1792 #else
1793     pa_source_new_data data;
1794 #endif
1795
1796     pa_assert(m);
1797
1798     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1799         pa_log("Failed to parse module arguments");
1800         goto fail;
1801     }
1802
1803     m->userdata = u = pa_xnew0(struct userdata, 1);
1804     u->core = m->core;
1805     u->module = m;
1806     u->client = NULL;
1807     u->pdispatch = NULL;
1808     u->pstream = NULL;
1809     u->server_name = NULL;
1810 #ifdef TUNNEL_SINK
1811     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1812     u->sink = NULL;
1813     u->requested_bytes = 0;
1814 #else
1815     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1816     u->source = NULL;
1817 #endif
1818     u->smoother = pa_smoother_new(PA_USEC_PER_SEC, PA_USEC_PER_SEC*2, TRUE, 10);
1819     u->ctag = 1;
1820     u->device_index = u->channel = PA_INVALID_INDEX;
1821     u->time_event = NULL;
1822     u->ignore_latency_before = 0;
1823     u->transport_usec = u->thread_transport_usec = 0;
1824     u->remote_suspended = u->remote_corked = FALSE;
1825     u->counter = u->counter_delta = 0;
1826
1827     u->rtpoll = pa_rtpoll_new();
1828     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1829
1830     if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1831         goto fail;
1832
1833     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1834         pa_log("No server specified.");
1835         goto fail;
1836     }
1837
1838     ss = m->core->default_sample_spec;
1839     map = m->core->default_channel_map;
1840     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1841         pa_log("Invalid sample format specification");
1842         goto fail;
1843     }
1844
1845     if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1846         pa_log("Failed to connect to server '%s'", u->server_name);
1847         goto fail;
1848     }
1849
1850     pa_socket_client_set_callback(u->client, on_connection, u);
1851
1852 #ifdef TUNNEL_SINK
1853
1854     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1855         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1856
1857     pa_sink_new_data_init(&data);
1858     data.driver = __FILE__;
1859     data.module = m;
1860     data.namereg_fail = TRUE;
1861     pa_sink_new_data_set_name(&data, dn);
1862     pa_sink_new_data_set_sample_spec(&data, &ss);
1863     pa_sink_new_data_set_channel_map(&data, &map);
1864     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1865     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1866     if (u->sink_name)
1867         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1868
1869     u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1870     pa_sink_new_data_done(&data);
1871
1872     if (!u->sink) {
1873         pa_log("Failed to create sink.");
1874         goto fail;
1875     }
1876
1877     u->sink->parent.process_msg = sink_process_msg;
1878     u->sink->userdata = u;
1879     u->sink->set_state = sink_set_state;
1880     u->sink->set_volume = sink_set_volume;
1881     u->sink->set_mute = sink_set_mute;
1882
1883     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1884
1885 /*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1886
1887     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1888     pa_sink_set_rtpoll(u->sink, u->rtpoll);
1889
1890 #else
1891
1892     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1893         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1894
1895     pa_source_new_data_init(&data);
1896     data.driver = __FILE__;
1897     data.module = m;
1898     data.namereg_fail = TRUE;
1899     pa_source_new_data_set_name(&data, dn);
1900     pa_source_new_data_set_sample_spec(&data, &ss);
1901     pa_source_new_data_set_channel_map(&data, &map);
1902     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1903     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1904     if (u->source_name)
1905         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1906
1907     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1908     pa_source_new_data_done(&data);
1909
1910     if (!u->source) {
1911         pa_log("Failed to create source.");
1912         goto fail;
1913     }
1914
1915     u->source->parent.process_msg = source_process_msg;
1916     u->source->set_state = source_set_state;
1917     u->source->userdata = u;
1918
1919 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1920
1921     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1922     pa_source_set_rtpoll(u->source, u->rtpoll);
1923 #endif
1924
1925     pa_xfree(dn);
1926
1927     u->time_event = NULL;
1928
1929     u->maxlength = (uint32_t) -1;
1930 #ifdef TUNNEL_SINK
1931     u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1932 #else
1933     u->fragsize = (uint32_t) -1;
1934 #endif
1935
1936     pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec());
1937
1938     if (!(u->thread = pa_thread_new(thread_func, u))) {
1939         pa_log("Failed to create thread.");
1940         goto fail;
1941     }
1942
1943 #ifdef TUNNEL_SINK
1944     pa_sink_put(u->sink);
1945 #else
1946     pa_source_put(u->source);
1947 #endif
1948
1949     pa_modargs_free(ma);
1950
1951     return 0;
1952
1953 fail:
1954     pa__done(m);
1955
1956     if (ma)
1957         pa_modargs_free(ma);
1958
1959     pa_xfree(dn);
1960
1961     return  -1;
1962 }
1963
1964 void pa__done(pa_module*m) {
1965     struct userdata* u;
1966
1967     pa_assert(m);
1968
1969     if (!(u = m->userdata))
1970         return;
1971
1972 #ifdef TUNNEL_SINK
1973     if (u->sink)
1974         pa_sink_unlink(u->sink);
1975 #else
1976     if (u->source)
1977         pa_source_unlink(u->source);
1978 #endif
1979
1980     if (u->thread) {
1981         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1982         pa_thread_free(u->thread);
1983     }
1984
1985     pa_thread_mq_done(&u->thread_mq);
1986
1987 #ifdef TUNNEL_SINK
1988     if (u->sink)
1989         pa_sink_unref(u->sink);
1990 #else
1991     if (u->source)
1992         pa_source_unref(u->source);
1993 #endif
1994
1995     if (u->rtpoll)
1996         pa_rtpoll_free(u->rtpoll);
1997
1998     if (u->pstream) {
1999         pa_pstream_unlink(u->pstream);
2000         pa_pstream_unref(u->pstream);
2001     }
2002
2003     if (u->pdispatch)
2004         pa_pdispatch_unref(u->pdispatch);
2005
2006     if (u->client)
2007         pa_socket_client_unref(u->client);
2008
2009     if (u->auth_cookie)
2010         pa_auth_cookie_unref(u->auth_cookie);
2011
2012     if (u->smoother)
2013         pa_smoother_free(u->smoother);
2014
2015     if (u->time_event)
2016         u->core->mainloop->time_free(u->time_event);
2017
2018 #ifdef TUNNEL_SINK
2019     pa_xfree(u->sink_name);
2020 #else
2021     pa_xfree(u->source_name);
2022 #endif
2023     pa_xfree(u->server_name);
2024
2025     pa_xfree(u->device_description);
2026     pa_xfree(u->server_fqdn);
2027     pa_xfree(u->user_name);
2028
2029     pa_xfree(u);
2030 }