Merge commit 'origin/master-tx'
[platform/upstream/pulseaudio.git] / src / modules / module-tunnel.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
/* Module metadata. This file is compiled either as the tunnel sink or as
 * the tunnel source module, selected by TUNNEL_SINK, so the description and
 * the usage string exist in two variants. The argument names listed in the
 * usage strings match the entries of valid_modargs[]. */
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67         "sink_name=<name for the local sink> "
68         "sink_properties=<properties for the local sink> "
69         "server=<address> "
70         "sink=<remote sink name> "
71         "cookie=<filename> "
72         "format=<sample format> "
73         "channels=<number of channels> "
74         "rate=<sample rate> "
75         "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79         "source_name=<name for the local source> "
80         "source_properties=<properties for the local source> "
81         "server=<address> "
82         "source=<remote source name> "
83         "cookie=<filename> "
84         "format=<sample format> "
85         "channels=<number of channels> "
86         "rate=<sample rate> "
87         "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE); /* NOTE(review): presumably permits several tunnel instances at once -- confirm */
93
/* NULL-terminated list of the module argument names this module accepts.
 * The device-specific names depend on whether the file is built as sink or
 * as source tunnel. NOTE(review): presumably handed to the modargs parser
 * during module initialisation, which is not part of this excerpt. */
94 static const char* const valid_modargs[] = {
95     "server",
96     "cookie",
97     "format",
98     "channels",
99     "rate",
100 #ifdef TUNNEL_SINK
101     "sink_name",
102     "sink_properties",
103     "sink",
104 #else
105     "source_name",
106     "source_properties",
107     "source",
108 #endif
109     "channel_map",
110     NULL, /* terminator */
111 };
112
/* Reply timeout handed to pa_pdispatch_register_reply() for latency
 * queries. NOTE(review): presumably in seconds -- confirm */
113 #define DEFAULT_TIMEOUT 5
114
/* Seconds between two periodic latency queries; timeout_callback() adds
 * this to tv_sec when re-arming its timer. */
115 #define LATENCY_INTERVAL 10
116
/* 8 ms expressed in microseconds. Its use is not visible in this excerpt. */
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
/* Private message codes understood by sink_process_msg(), numbered after
 * the generic sink messages. */
121 enum {
122     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX, /* server asked for more data; byte count travels in 'offset' */
123     SINK_MESSAGE_REMOTE_SUSPEND,                /* remote suspend state changed; new state travels in 'data' */
124     SINK_MESSAGE_UPDATE_LATENCY,                /* new latency measurement; delay travels in 'offset' */
125     SINK_MESSAGE_POST                           /* rendered chunk to send; handled in the main context */
126 };
127
/* NOTE(review): presumably default playback buffer metrics in ms; they are
 * not referenced in this excerpt -- confirm */
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
/* Private message codes understood by source_process_msg(), numbered after
 * the generic source messages. */
133 enum {
134     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX, /* chunk to post to the local source */
135     SOURCE_MESSAGE_REMOTE_SUSPEND,               /* remote suspend state changed; new state travels in 'data' */
136     SOURCE_MESSAGE_UPDATE_LATENCY                /* new latency measurement; delay travels in 'offset' */
137 };
138
/* NOTE(review): presumably the default record fragment size in ms; it is
 * not referenced in this excerpt -- confirm */
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
/* Dispatch table, indexed by command code, naming the handler for each
 * command this module reacts to. The playback-only commands (REQUEST,
 * STARTED) are only wired up in the sink build. Slots that are not listed
 * stay NULL because of the designated initialisers. */
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157     [PA_COMMAND_REQUEST] = command_request,
158     [PA_COMMAND_STARTED] = command_started,
159 #endif
160     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161     [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162     [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169     [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170     [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171     [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172     [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173     [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
/* State of one tunnel module instance. It is shared between the main
 * thread (protocol handling) and the IO thread (thread_func()); the
 * dispatcher callbacks receive a pointer to it as their userdata. */
176 struct userdata {
177     pa_core *core;
178     pa_module *module; /* handed to pa_module_unload_request() on fatal errors */
179
180     pa_thread_mq thread_mq; /* installed by the IO thread; its outq carries messages back to the main thread */
181     pa_rtpoll *rtpoll; /* poll loop run by thread_func() */
182     pa_thread *thread;
183
184     pa_socket_client *client;
185     pa_pstream *pstream; /* packet stream to the server; stream_cork() does nothing while this is NULL */
186     pa_pdispatch *pdispatch; /* dispatcher the command callbacks assert against */
187
188     char *server_name;
189 #ifdef TUNNEL_SINK
190     char *sink_name;
191     pa_sink *sink; /* the local sink */
192     size_t requested_bytes; /* bytes asked for by the server and not yet rendered, see send_data() */
193 #else
194     char *source_name;
195     pa_source *source; /* the local source */
196 #endif
197
198     pa_auth_cookie *auth_cookie;
199
200     uint32_t version; /* compared against 13 and 15 to decide which optional reply fields to parse */
201     uint32_t ctag; /* next request tag; post-incremented for every request sent */
202     uint32_t device_index;
203     uint32_t channel; /* stream channel put into every stream-related request */
204
205     int64_t counter, counter_delta; /* counter: bytes rendered (sink) or posted (source); counter_delta: reset by request_latency() */
206
207     pa_bool_t remote_corked:1; /* either flag being set pauses the smoother, see check_smoother_status() */
208     pa_bool_t remote_suspended:1;
209
210     pa_usec_t transport_usec; /* maintained in the main thread */
211     pa_usec_t thread_transport_usec; /* maintained in the IO thread */
212
213     uint32_t ignore_latency_before; /* tag of the newest latency request; replies with a smaller tag are dropped */
214
215     pa_time_event *time_event;
216
217     pa_smoother *smoother; /* fed by the UPDATE_LATENCY messages, queried by GET_LATENCY */
218
219     char *device_description; /* these three are combined by update_description() */
220     char *server_fqdn;
221     char *user_name;
222
223     uint32_t maxlength; /* NOTE(review): presumably the stream's buffer attributes -- confirm where they are set */
224 #ifdef TUNNEL_SINK
225     uint32_t tlength;
226     uint32_t minreq;
227     uint32_t prebuf;
228 #else
229     uint32_t fragsize;
230 #endif
231 };
232
233 static void request_latency(struct userdata *u);
234
235 /* Called from main context */
236 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
237     pa_log_debug("Got stream or client event.");
238 }
239
240 /* Called from main context */
241 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
242     struct userdata *u = userdata;
243
244     pa_assert(pd);
245     pa_assert(t);
246     pa_assert(u);
247     pa_assert(u->pdispatch == pd);
248
249     pa_log_warn("Stream killed");
250     pa_module_unload_request(u->module, TRUE);
251 }
252
253 /* Called from main context */
254 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
255     struct userdata *u = userdata;
256
257     pa_assert(pd);
258     pa_assert(t);
259     pa_assert(u);
260     pa_assert(u->pdispatch == pd);
261
262     pa_log_info("Server signalled buffer overrun/underrun.");
263     request_latency(u);
264 }
265
266 /* Called from main context */
267 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
268     struct userdata *u = userdata;
269     uint32_t channel;
270     pa_bool_t suspended;
271
272     pa_assert(pd);
273     pa_assert(t);
274     pa_assert(u);
275     pa_assert(u->pdispatch == pd);
276
277     if (pa_tagstruct_getu32(t, &channel) < 0 ||
278         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
279         !pa_tagstruct_eof(t)) {
280
281         pa_log("Invalid packet.");
282         pa_module_unload_request(u->module, TRUE);
283         return;
284     }
285
286     pa_log_debug("Server reports device suspend.");
287
288 #ifdef TUNNEL_SINK
289     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
290 #else
291     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
292 #endif
293
294     request_latency(u);
295 }
296
297 /* Called from main context */
298 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
299     struct userdata *u = userdata;
300     uint32_t channel, di;
301     const char *dn;
302     pa_bool_t suspended;
303
304     pa_assert(pd);
305     pa_assert(t);
306     pa_assert(u);
307     pa_assert(u->pdispatch == pd);
308
309     if (pa_tagstruct_getu32(t, &channel) < 0 ||
310         pa_tagstruct_getu32(t, &di) < 0 ||
311         pa_tagstruct_gets(t, &dn) < 0 ||
312         pa_tagstruct_get_boolean(t, &suspended) < 0) {
313
314         pa_log_error("Invalid packet.");
315         pa_module_unload_request(u->module, TRUE);
316         return;
317     }
318
319     pa_log_debug("Server reports a stream move.");
320
321 #ifdef TUNNEL_SINK
322     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
323 #else
324     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
325 #endif
326
327     request_latency(u);
328 }
329
330 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
331     struct userdata *u = userdata;
332     uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
333     pa_usec_t usec;
334
335     pa_assert(pd);
336     pa_assert(t);
337     pa_assert(u);
338     pa_assert(u->pdispatch == pd);
339
340     if (pa_tagstruct_getu32(t, &channel) < 0 ||
341         pa_tagstruct_getu32(t, &maxlength) < 0) {
342
343         pa_log_error("Invalid packet.");
344         pa_module_unload_request(u->module, TRUE);
345         return;
346     }
347
348     if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
349         if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
350             pa_tagstruct_get_usec(t, &usec) < 0) {
351
352             pa_log_error("Invalid packet.");
353             pa_module_unload_request(u->module, TRUE);
354             return;
355         }
356     } else {
357         if (pa_tagstruct_getu32(t, &tlength) < 0 ||
358             pa_tagstruct_getu32(t, &prebuf) < 0 ||
359             pa_tagstruct_getu32(t, &minreq) < 0 ||
360             pa_tagstruct_get_usec(t, &usec) < 0) {
361
362             pa_log_error("Invalid packet.");
363             pa_module_unload_request(u->module, TRUE);
364             return;
365         }
366     }
367
368 #ifdef TUNNEL_SINK
369     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
370 #endif
371
372     request_latency(u);
373 }
374
375 #ifdef TUNNEL_SINK
376
377 /* Called from main context */
378 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
379    struct userdata *u = userdata;
380
381     pa_assert(pd);
382     pa_assert(t);
383     pa_assert(u);
384     pa_assert(u->pdispatch == pd);
385
386     pa_log_debug("Server reports playback started.");
387     request_latency(u);
388 }
389
390 #endif
391
392 /* Called from IO thread context */
393 static void check_smoother_status(struct userdata *u, pa_bool_t past)  {
394     pa_usec_t x;
395
396     pa_assert(u);
397
398     x = pa_rtclock_usec();
399
400     /* Correct by the time the requested issued needs to travel to the
401      * other side.  This is a valid thread-safe access, because the
402      * main thread is waiting for us */
403
404     if (past)
405         x -= u->thread_transport_usec;
406     else
407         x += u->thread_transport_usec;
408
409     if (u->remote_suspended || u->remote_corked)
410         pa_smoother_pause(u->smoother, x);
411     else
412         pa_smoother_resume(u->smoother, x, TRUE);
413 }
414
415 /* Called from IO thread context */
416 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
417     pa_assert(u);
418
419     if (u->remote_corked == cork)
420         return;
421
422     u->remote_corked = cork;
423     check_smoother_status(u, FALSE);
424 }
425
426 /* Called from main context */
427 static void stream_cork(struct userdata *u, pa_bool_t cork) {
428     pa_tagstruct *t;
429     pa_assert(u);
430
431     if (!u->pstream)
432         return;
433
434     t = pa_tagstruct_new(NULL, 0);
435 #ifdef TUNNEL_SINK
436     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
437 #else
438     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
439 #endif
440     pa_tagstruct_putu32(t, u->ctag++);
441     pa_tagstruct_putu32(t, u->channel);
442     pa_tagstruct_put_boolean(t, !!cork);
443     pa_pstream_send_tagstruct(u->pstream, t);
444
445     request_latency(u);
446 }
447
448 /* Called from IO thread context */
449 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
450     pa_assert(u);
451
452     if (u->remote_suspended == suspend)
453         return;
454
455     u->remote_suspended = suspend;
456     check_smoother_status(u, TRUE);
457 }
458
459 #ifdef TUNNEL_SINK
460
461 /* Called from IO thread context */
462 static void send_data(struct userdata *u) {
463     pa_assert(u);
464
465     while (u->requested_bytes > 0) {
466         pa_memchunk memchunk;
467
468         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
469         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
470         pa_memblock_unref(memchunk.memblock);
471
472         u->requested_bytes -= memchunk.length;
473
474         u->counter += (int64_t) memchunk.length;
475     }
476 }
477
478 /* This function is called from IO context -- except when it is not. */
479 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
480     struct userdata *u = PA_SINK(o)->userdata;
481
482     switch (code) {
483
484         case PA_SINK_MESSAGE_SET_STATE: {
485             int r;
486
487             /* First, change the state, because otherwide pa_sink_render() would fail */
488             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
489
490                 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
491
492                 if (PA_SINK_IS_OPENED(u->sink->state))
493                     send_data(u);
494             }
495
496             return r;
497         }
498
499         case PA_SINK_MESSAGE_GET_LATENCY: {
500             pa_usec_t yl, yr, *usec = data;
501
502             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
503             yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
504
505             *usec = yl > yr ? yl - yr : 0;
506             return 0;
507         }
508
509         case SINK_MESSAGE_REQUEST:
510
511             pa_assert(offset > 0);
512             u->requested_bytes += (size_t) offset;
513
514             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
515                 send_data(u);
516
517             return 0;
518
519
520         case SINK_MESSAGE_REMOTE_SUSPEND:
521
522             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
523             return 0;
524
525
526         case SINK_MESSAGE_UPDATE_LATENCY: {
527             pa_usec_t y;
528
529             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
530
531             if (y > (pa_usec_t) offset)
532                 y -= (pa_usec_t) offset;
533             else
534                 y = 0;
535
536             pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
537
538             /* We can access this freely here, since the main thread is waiting for us */
539             u->thread_transport_usec = u->transport_usec;
540
541             return 0;
542         }
543
544         case SINK_MESSAGE_POST:
545
546             /* OK, This might be a bit confusing. This message is
547              * delivered to us from the main context -- NOT from the
548              * IO thread context where the rest of the messages are
549              * dispatched. Yeah, ugly, but I am a lazy bastard. */
550
551             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
552
553             u->counter_delta += (int64_t) chunk->length;
554
555             return 0;
556     }
557
558     return pa_sink_process_msg(o, code, data, offset, chunk);
559 }
560
561 /* Called from main context */
562 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
563     struct userdata *u;
564     pa_sink_assert_ref(s);
565     u = s->userdata;
566
567     switch ((pa_sink_state_t) state) {
568
569         case PA_SINK_SUSPENDED:
570             pa_assert(PA_SINK_IS_OPENED(s->state));
571             stream_cork(u, TRUE);
572             break;
573
574         case PA_SINK_IDLE:
575         case PA_SINK_RUNNING:
576             if (s->state == PA_SINK_SUSPENDED)
577                 stream_cork(u, FALSE);
578             break;
579
580         case PA_SINK_UNLINKED:
581         case PA_SINK_INIT:
582         case PA_SINK_INVALID_STATE:
583             ;
584     }
585
586     return 0;
587 }
588
589 #else
590
591 /* This function is called from IO context -- except when it is not. */
592 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
593     struct userdata *u = PA_SOURCE(o)->userdata;
594
595     switch (code) {
596
597         case PA_SOURCE_MESSAGE_SET_STATE: {
598             int r;
599
600             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
601                 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
602
603             return r;
604         }
605
606         case PA_SOURCE_MESSAGE_GET_LATENCY: {
607             pa_usec_t yr, yl, *usec = data;
608
609             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
610             yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
611
612             *usec = yr > yl ? yr - yl : 0;
613             return 0;
614         }
615
616         case SOURCE_MESSAGE_POST:
617
618             if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
619                 pa_source_post(u->source, chunk);
620
621             u->counter += (int64_t) chunk->length;
622
623             return 0;
624
625         case SOURCE_MESSAGE_REMOTE_SUSPEND:
626
627             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
628             return 0;
629
630         case SOURCE_MESSAGE_UPDATE_LATENCY: {
631             pa_usec_t y;
632
633             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
634             y += (pa_usec_t) offset;
635
636             pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
637
638             /* We can access this freely here, since the main thread is waiting for us */
639             u->thread_transport_usec = u->transport_usec;
640
641             return 0;
642         }
643     }
644
645     return pa_source_process_msg(o, code, data, offset, chunk);
646 }
647
648 /* Called from main context */
649 static int source_set_state(pa_source *s, pa_source_state_t state) {
650     struct userdata *u;
651     pa_source_assert_ref(s);
652     u = s->userdata;
653
654     switch ((pa_source_state_t) state) {
655
656         case PA_SOURCE_SUSPENDED:
657             pa_assert(PA_SOURCE_IS_OPENED(s->state));
658             stream_cork(u, TRUE);
659             break;
660
661         case PA_SOURCE_IDLE:
662         case PA_SOURCE_RUNNING:
663             if (s->state == PA_SOURCE_SUSPENDED)
664                 stream_cork(u, FALSE);
665             break;
666
667         case PA_SOURCE_UNLINKED:
668         case PA_SOURCE_INIT:
669         case PA_SINK_INVALID_STATE:
670             ;
671     }
672
673     return 0;
674 }
675
676 #endif
677
678 static void thread_func(void *userdata) {
679     struct userdata *u = userdata;
680
681     pa_assert(u);
682
683     pa_log_debug("Thread starting up");
684
685     pa_thread_mq_install(&u->thread_mq);
686     pa_rtpoll_install(u->rtpoll);
687
688     for (;;) {
689         int ret;
690
691 #ifdef TUNNEL_SINK
692         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
693             if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
694                 pa_sink_process_rewind(u->sink, 0);
695 #endif
696
697         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
698             goto fail;
699
700         if (ret == 0)
701             goto finish;
702     }
703
704 fail:
705     /* If this was no regular exit from the loop we have to continue
706      * processing messages until we received PA_MESSAGE_SHUTDOWN */
707     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
708     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
709
710 finish:
711     pa_log_debug("Thread shutting down");
712 }
713
714 #ifdef TUNNEL_SINK
715 /* Called from main context */
716 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
717     struct userdata *u = userdata;
718     uint32_t bytes, channel;
719
720     pa_assert(pd);
721     pa_assert(command == PA_COMMAND_REQUEST);
722     pa_assert(t);
723     pa_assert(u);
724     pa_assert(u->pdispatch == pd);
725
726     if (pa_tagstruct_getu32(t, &channel) < 0 ||
727         pa_tagstruct_getu32(t, &bytes) < 0) {
728         pa_log("Invalid protocol reply");
729         goto fail;
730     }
731
732     if (channel != u->channel) {
733         pa_log("Received data for invalid channel");
734         goto fail;
735     }
736
737     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
738     return;
739
740 fail:
741     pa_module_unload_request(u->module, TRUE);
742 }
743
744 #endif
745
746 /* Called from main context */
747 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
748     struct userdata *u = userdata;
749     pa_usec_t sink_usec, source_usec;
750     pa_bool_t playing;
751     int64_t write_index, read_index;
752     struct timeval local, remote, now;
753     pa_sample_spec *ss;
754     int64_t delay;
755
756     pa_assert(pd);
757     pa_assert(u);
758
759     if (command != PA_COMMAND_REPLY) {
760         if (command == PA_COMMAND_ERROR)
761             pa_log("Failed to get latency.");
762         else
763             pa_log("Protocol error.");
764         goto fail;
765     }
766
767     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
768         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
769         pa_tagstruct_get_boolean(t, &playing) < 0 ||
770         pa_tagstruct_get_timeval(t, &local) < 0 ||
771         pa_tagstruct_get_timeval(t, &remote) < 0 ||
772         pa_tagstruct_gets64(t, &write_index) < 0 ||
773         pa_tagstruct_gets64(t, &read_index) < 0) {
774         pa_log("Invalid reply.");
775         goto fail;
776     }
777
778 #ifdef TUNNEL_SINK
779     if (u->version >= 13) {
780         uint64_t underrun_for = 0, playing_for = 0;
781
782         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
783             pa_tagstruct_getu64(t, &playing_for) < 0) {
784             pa_log("Invalid reply.");
785             goto fail;
786         }
787     }
788 #endif
789
790     if (!pa_tagstruct_eof(t)) {
791         pa_log("Invalid reply.");
792         goto fail;
793     }
794
795     if (tag < u->ignore_latency_before) {
796         return;
797     }
798
799     pa_gettimeofday(&now);
800
801     /* Calculate transport usec */
802     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
803         /* local and remote seem to have synchronized clocks */
804 #ifdef TUNNEL_SINK
805         u->transport_usec = pa_timeval_diff(&remote, &local);
806 #else
807         u->transport_usec = pa_timeval_diff(&now, &remote);
808 #endif
809     } else
810         u->transport_usec = pa_timeval_diff(&now, &local)/2;
811
812     /* First, take the device's delay */
813 #ifdef TUNNEL_SINK
814     delay = (int64_t) sink_usec;
815     ss = &u->sink->sample_spec;
816 #else
817     delay = (int64_t) source_usec;
818     ss = &u->source->sample_spec;
819 #endif
820
821     /* Add the length of our server-side buffer */
822     if (write_index >= read_index)
823         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
824     else
825         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
826
827     /* Our measurements are already out of date, hence correct by the     *
828      * transport latency */
829 #ifdef TUNNEL_SINK
830     delay -= (int64_t) u->transport_usec;
831 #else
832     delay += (int64_t) u->transport_usec;
833 #endif
834
835     /* Now correct by what we have have read/written since we requested the update */
836 #ifdef TUNNEL_SINK
837     delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
838 #else
839     delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
840 #endif
841
842 #ifdef TUNNEL_SINK
843     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
844 #else
845     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
846 #endif
847
848     return;
849
850 fail:
851
852     pa_module_unload_request(u->module, TRUE);
853 }
854
855 /* Called from main context */
856 static void request_latency(struct userdata *u) {
857     pa_tagstruct *t;
858     struct timeval now;
859     uint32_t tag;
860     pa_assert(u);
861
862     t = pa_tagstruct_new(NULL, 0);
863 #ifdef TUNNEL_SINK
864     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
865 #else
866     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
867 #endif
868     pa_tagstruct_putu32(t, tag = u->ctag++);
869     pa_tagstruct_putu32(t, u->channel);
870
871     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
872
873     pa_pstream_send_tagstruct(u->pstream, t);
874     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
875
876     u->ignore_latency_before = tag;
877     u->counter_delta = 0;
878 }
879
880 /* Called from main context */
881 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e,  const struct timeval *tv, void *userdata) {
882     struct userdata *u = userdata;
883     struct timeval ntv;
884
885     pa_assert(m);
886     pa_assert(e);
887     pa_assert(u);
888
889     request_latency(u);
890
891     pa_gettimeofday(&ntv);
892     ntv.tv_sec += LATENCY_INTERVAL;
893     m->time_restart(e, &ntv);
894 }
895
896 /* Called from main context */
897 static void update_description(struct userdata *u) {
898     char *d;
899     char un[128], hn[128];
900     pa_tagstruct *t;
901
902     pa_assert(u);
903
904     if (!u->server_fqdn || !u->user_name || !u->device_description)
905         return;
906
907     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
908
909 #ifdef TUNNEL_SINK
910     pa_sink_set_description(u->sink, d);
911     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
912     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
913     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
914 #else
915     pa_source_set_description(u->source, d);
916     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
917     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
918     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
919 #endif
920
921     pa_xfree(d);
922
923     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
924                           pa_get_user_name(un, sizeof(un)),
925                           pa_get_host_name(hn, sizeof(hn)));
926
927     t = pa_tagstruct_new(NULL, 0);
928 #ifdef TUNNEL_SINK
929     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
930 #else
931     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
932 #endif
933     pa_tagstruct_putu32(t, u->ctag++);
934     pa_tagstruct_putu32(t, u->channel);
935     pa_tagstruct_puts(t, d);
936     pa_pstream_send_tagstruct(u->pstream, t);
937
938     pa_xfree(d);
939 }
940
941 /* Called from main context */
942 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
943     struct userdata *u = userdata;
944     pa_sample_spec ss;
945     pa_channel_map cm;
946     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
947     uint32_t cookie;
948
949     pa_assert(pd);
950     pa_assert(u);
951
952     if (command != PA_COMMAND_REPLY) {
953         if (command == PA_COMMAND_ERROR)
954             pa_log("Failed to get info.");
955         else
956             pa_log("Protocol error.");
957         goto fail;
958     }
959
960     if (pa_tagstruct_gets(t, &server_name) < 0 ||
961         pa_tagstruct_gets(t, &server_version) < 0 ||
962         pa_tagstruct_gets(t, &user_name) < 0 ||
963         pa_tagstruct_gets(t, &host_name) < 0 ||
964         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
965         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
966         pa_tagstruct_gets(t, &default_source_name) < 0 ||
967         pa_tagstruct_getu32(t, &cookie) < 0 ||
968         (u->version >= 15 &&
969          pa_tagstruct_get_channel_map(t, &cm) < 0)) {
970
971         pa_log("Parse failure");
972         goto fail;
973     }
974
975     if (!pa_tagstruct_eof(t)) {
976         pa_log("Packet too long");
977         goto fail;
978     }
979
980     pa_xfree(u->server_fqdn);
981     u->server_fqdn = pa_xstrdup(host_name);
982
983     pa_xfree(u->user_name);
984     u->user_name = pa_xstrdup(user_name);
985
986     update_description(u);
987
988     return;
989
990 fail:
991     pa_module_unload_request(u->module, TRUE);
992 }
993
994 #ifdef TUNNEL_SINK
995
996 /* Called from main context */
997 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
998     struct userdata *u = userdata;
999     uint32_t idx, owner_module, monitor_source, flags;
1000     const char *name, *description, *monitor_source_name, *driver;
1001     pa_sample_spec ss;
1002     pa_channel_map cm;
1003     pa_cvolume volume;
1004     pa_bool_t mute;
1005     pa_usec_t latency;
1006     pa_proplist *pl;
1007
1008     pa_assert(pd);
1009     pa_assert(u);
1010
1011     pl = pa_proplist_new();
1012
1013     if (command != PA_COMMAND_REPLY) {
1014         if (command == PA_COMMAND_ERROR)
1015             pa_log("Failed to get info.");
1016         else
1017             pa_log("Protocol error.");
1018         goto fail;
1019     }
1020
1021     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1022         pa_tagstruct_gets(t, &name) < 0 ||
1023         pa_tagstruct_gets(t, &description) < 0 ||
1024         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1025         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1026         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1027         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1028         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1029         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1030         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1031         pa_tagstruct_get_usec(t, &latency) < 0 ||
1032         pa_tagstruct_gets(t, &driver) < 0 ||
1033         pa_tagstruct_getu32(t, &flags) < 0) {
1034
1035         pa_log("Parse failure");
1036         goto fail;
1037     }
1038
1039     if (u->version >= 13) {
1040         pa_usec_t configured_latency;
1041
1042         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1043             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1044
1045             pa_log("Parse failure");
1046             goto fail;
1047         }
1048     }
1049
1050     if (u->version >= 15) {
1051         pa_volume_t base_volume;
1052         uint32_t state, n_volume_steps, card;
1053
1054         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1055             pa_tagstruct_getu32(t, &state) < 0 ||
1056             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1057             pa_tagstruct_getu32(t, &card) < 0) {
1058
1059             pa_log("Parse failure");
1060             goto fail;
1061         }
1062     }
1063
1064     if (!pa_tagstruct_eof(t)) {
1065         pa_log("Packet too long");
1066         goto fail;
1067     }
1068
1069     pa_proplist_free(pl);
1070
1071     if (!u->sink_name || strcmp(name, u->sink_name))
1072         return;
1073
1074     pa_xfree(u->device_description);
1075     u->device_description = pa_xstrdup(description);
1076
1077     update_description(u);
1078
1079     return;
1080
1081 fail:
1082     pa_module_unload_request(u->module, TRUE);
1083     pa_proplist_free(pl);
1084 }
1085
1086 /* Called from main context */
1087 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1088     struct userdata *u = userdata;
1089     uint32_t idx, owner_module, client, sink;
1090     pa_usec_t buffer_usec, sink_usec;
1091     const char *name, *driver, *resample_method;
1092     pa_bool_t mute;
1093     pa_sample_spec sample_spec;
1094     pa_channel_map channel_map;
1095     pa_cvolume volume;
1096     pa_proplist *pl;
1097
1098     pa_assert(pd);
1099     pa_assert(u);
1100
1101     pl = pa_proplist_new();
1102
1103     if (command != PA_COMMAND_REPLY) {
1104         if (command == PA_COMMAND_ERROR)
1105             pa_log("Failed to get info.");
1106         else
1107             pa_log("Protocol error.");
1108         goto fail;
1109     }
1110
1111     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1112         pa_tagstruct_gets(t, &name) < 0 ||
1113         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1114         pa_tagstruct_getu32(t, &client) < 0 ||
1115         pa_tagstruct_getu32(t, &sink) < 0 ||
1116         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1117         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1118         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1119         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1120         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1121         pa_tagstruct_gets(t, &resample_method) < 0 ||
1122         pa_tagstruct_gets(t, &driver) < 0) {
1123
1124         pa_log("Parse failure");
1125         goto fail;
1126     }
1127
1128     if (u->version >= 11) {
1129         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1130
1131             pa_log("Parse failure");
1132             goto fail;
1133         }
1134     }
1135
1136     if (u->version >= 13) {
1137         if (pa_tagstruct_get_proplist(t, pl) < 0) {
1138
1139             pa_log("Parse failure");
1140             goto fail;
1141         }
1142     }
1143
1144     if (!pa_tagstruct_eof(t)) {
1145         pa_log("Packet too long");
1146         goto fail;
1147     }
1148
1149     pa_proplist_free(pl);
1150
1151     if (idx != u->device_index)
1152         return;
1153
1154     pa_assert(u->sink);
1155
1156     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1157         pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1158         return;
1159
1160     pa_sink_volume_changed(u->sink, &volume, FALSE);
1161
1162     if (u->version >= 11)
1163         pa_sink_mute_changed(u->sink, mute, FALSE);
1164
1165     return;
1166
1167 fail:
1168     pa_module_unload_request(u->module, TRUE);
1169     pa_proplist_free(pl);
1170 }
1171
1172 #else
1173
1174 /* Called from main context */
1175 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1176     struct userdata *u = userdata;
1177     uint32_t idx, owner_module, monitor_of_sink, flags;
1178     const char *name, *description, *monitor_of_sink_name, *driver;
1179     pa_sample_spec ss;
1180     pa_channel_map cm;
1181     pa_cvolume volume;
1182     pa_bool_t mute;
1183     pa_usec_t latency, configured_latency;
1184     pa_proplist *pl;
1185
1186     pa_assert(pd);
1187     pa_assert(u);
1188
1189     pl = pa_proplist_new();
1190
1191     if (command != PA_COMMAND_REPLY) {
1192         if (command == PA_COMMAND_ERROR)
1193             pa_log("Failed to get info.");
1194         else
1195             pa_log("Protocol error.");
1196         goto fail;
1197     }
1198
1199     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1200         pa_tagstruct_gets(t, &name) < 0 ||
1201         pa_tagstruct_gets(t, &description) < 0 ||
1202         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1203         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1204         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1205         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1206         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1207         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1208         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1209         pa_tagstruct_get_usec(t, &latency) < 0 ||
1210         pa_tagstruct_gets(t, &driver) < 0 ||
1211         pa_tagstruct_getu32(t, &flags) < 0) {
1212
1213         pa_log("Parse failure");
1214         goto fail;
1215     }
1216
1217     if (u->version >= 13) {
1218         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1219             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1220
1221             pa_log("Parse failure");
1222             goto fail;
1223         }
1224     }
1225
1226     if (u->version >= 15) {
1227         pa_volume_t base_volume;
1228         uint32_t state, n_volume_steps, card;
1229
1230         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1231             pa_tagstruct_getu32(t, &state) < 0 ||
1232             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1233             pa_tagstruct_getu32(t, &card) < 0) {
1234
1235             pa_log("Parse failure");
1236             goto fail;
1237         }
1238     }
1239
1240     if (!pa_tagstruct_eof(t)) {
1241         pa_log("Packet too long");
1242         goto fail;
1243     }
1244
1245     pa_proplist_free(pl);
1246
1247     if (!u->source_name || strcmp(name, u->source_name))
1248         return;
1249
1250     pa_xfree(u->device_description);
1251     u->device_description = pa_xstrdup(description);
1252
1253     update_description(u);
1254
1255     return;
1256
1257 fail:
1258     pa_module_unload_request(u->module, TRUE);
1259     pa_proplist_free(pl);
1260 }
1261
1262 #endif
1263
1264 /* Called from main context */
1265 static void request_info(struct userdata *u) {
1266     pa_tagstruct *t;
1267     uint32_t tag;
1268     pa_assert(u);
1269
1270     t = pa_tagstruct_new(NULL, 0);
1271     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1272     pa_tagstruct_putu32(t, tag = u->ctag++);
1273     pa_pstream_send_tagstruct(u->pstream, t);
1274     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1275
1276 #ifdef TUNNEL_SINK
1277     t = pa_tagstruct_new(NULL, 0);
1278     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1279     pa_tagstruct_putu32(t, tag = u->ctag++);
1280     pa_tagstruct_putu32(t, u->device_index);
1281     pa_pstream_send_tagstruct(u->pstream, t);
1282     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1283
1284     if (u->sink_name) {
1285         t = pa_tagstruct_new(NULL, 0);
1286         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1287         pa_tagstruct_putu32(t, tag = u->ctag++);
1288         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1289         pa_tagstruct_puts(t, u->sink_name);
1290         pa_pstream_send_tagstruct(u->pstream, t);
1291         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1292     }
1293 #else
1294     if (u->source_name) {
1295         t = pa_tagstruct_new(NULL, 0);
1296         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1297         pa_tagstruct_putu32(t, tag = u->ctag++);
1298         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1299         pa_tagstruct_puts(t, u->source_name);
1300         pa_pstream_send_tagstruct(u->pstream, t);
1301         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1302     }
1303 #endif
1304 }
1305
1306 /* Called from main context */
1307 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1308     struct userdata *u = userdata;
1309     pa_subscription_event_type_t e;
1310     uint32_t idx;
1311
1312     pa_assert(pd);
1313     pa_assert(t);
1314     pa_assert(u);
1315     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1316
1317     if (pa_tagstruct_getu32(t, &e) < 0 ||
1318         pa_tagstruct_getu32(t, &idx) < 0) {
1319         pa_log("Invalid protocol reply");
1320         pa_module_unload_request(u->module, TRUE);
1321         return;
1322     }
1323
1324     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1325 #ifdef TUNNEL_SINK
1326         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1327         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1328 #else
1329         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1330 #endif
1331         )
1332         return;
1333
1334     request_info(u);
1335 }
1336
1337 /* Called from main context */
1338 static void start_subscribe(struct userdata *u) {
1339     pa_tagstruct *t;
1340     uint32_t tag;
1341     pa_assert(u);
1342
1343     t = pa_tagstruct_new(NULL, 0);
1344     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1345     pa_tagstruct_putu32(t, tag = u->ctag++);
1346     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1347 #ifdef TUNNEL_SINK
1348                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1349 #else
1350                         PA_SUBSCRIPTION_MASK_SOURCE
1351 #endif
1352                         );
1353
1354     pa_pstream_send_tagstruct(u->pstream, t);
1355 }
1356
1357 /* Called from main context */
1358 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1359     struct userdata *u = userdata;
1360     struct timeval ntv;
1361 #ifdef TUNNEL_SINK
1362     uint32_t bytes;
1363 #endif
1364
1365     pa_assert(pd);
1366     pa_assert(u);
1367     pa_assert(u->pdispatch == pd);
1368
1369     if (command != PA_COMMAND_REPLY) {
1370         if (command == PA_COMMAND_ERROR)
1371             pa_log("Failed to create stream.");
1372         else
1373             pa_log("Protocol error.");
1374         goto fail;
1375     }
1376
1377     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1378         pa_tagstruct_getu32(t, &u->device_index) < 0
1379 #ifdef TUNNEL_SINK
1380         || pa_tagstruct_getu32(t, &bytes) < 0
1381 #endif
1382         )
1383         goto parse_error;
1384
1385     if (u->version >= 9) {
1386 #ifdef TUNNEL_SINK
1387         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1388             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1389             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1390             pa_tagstruct_getu32(t, &u->minreq) < 0)
1391             goto parse_error;
1392 #else
1393         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1394             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1395             goto parse_error;
1396 #endif
1397     }
1398
1399     if (u->version >= 12) {
1400         pa_sample_spec ss;
1401         pa_channel_map cm;
1402         uint32_t device_index;
1403         const char *dn;
1404         pa_bool_t suspended;
1405
1406         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1407             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1408             pa_tagstruct_getu32(t, &device_index) < 0 ||
1409             pa_tagstruct_gets(t, &dn) < 0 ||
1410             pa_tagstruct_get_boolean(t, &suspended) < 0)
1411             goto parse_error;
1412
1413 #ifdef TUNNEL_SINK
1414         pa_xfree(u->sink_name);
1415         u->sink_name = pa_xstrdup(dn);
1416 #else
1417         pa_xfree(u->source_name);
1418         u->source_name = pa_xstrdup(dn);
1419 #endif
1420     }
1421
1422     if (u->version >= 13) {
1423         pa_usec_t usec;
1424
1425         if (pa_tagstruct_get_usec(t, &usec) < 0)
1426             goto parse_error;
1427
1428 /* #ifdef TUNNEL_SINK */
1429 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1430 /* #else */
1431 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1432 /* #endif */
1433     }
1434
1435     if (!pa_tagstruct_eof(t))
1436         goto parse_error;
1437
1438     start_subscribe(u);
1439     request_info(u);
1440
1441     pa_assert(!u->time_event);
1442     pa_gettimeofday(&ntv);
1443     ntv.tv_sec += LATENCY_INTERVAL;
1444     u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1445
1446     request_latency(u);
1447
1448     pa_log_debug("Stream created.");
1449
1450 #ifdef TUNNEL_SINK
1451     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1452 #endif
1453
1454     return;
1455
1456 parse_error:
1457     pa_log("Invalid reply. (Create stream)");
1458
1459 fail:
1460     pa_module_unload_request(u->module, TRUE);
1461
1462 }
1463
1464 /* Called from main context */
1465 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1466     struct userdata *u = userdata;
1467     pa_tagstruct *reply;
1468     char name[256], un[128], hn[128];
1469 #ifdef TUNNEL_SINK
1470     pa_cvolume volume;
1471 #endif
1472
1473     pa_assert(pd);
1474     pa_assert(u);
1475     pa_assert(u->pdispatch == pd);
1476
1477     if (command != PA_COMMAND_REPLY ||
1478         pa_tagstruct_getu32(t, &u->version) < 0 ||
1479         !pa_tagstruct_eof(t)) {
1480
1481         if (command == PA_COMMAND_ERROR)
1482             pa_log("Failed to authenticate");
1483         else
1484             pa_log("Protocol error.");
1485
1486         goto fail;
1487     }
1488
1489     /* Minimum supported protocol version */
1490     if (u->version < 8) {
1491         pa_log("Incompatible protocol version");
1492         goto fail;
1493     }
1494
1495     /* Starting with protocol version 13 the MSB of the version tag
1496     reflects if shm is enabled for this connection or not. We don't
1497     support SHM here at all, so we just ignore this. */
1498
1499     if (u->version >= 13)
1500         u->version &= 0x7FFFFFFFU;
1501
1502     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1503
1504 #ifdef TUNNEL_SINK
1505     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1506     pa_sink_update_proplist(u->sink, 0, NULL);
1507
1508     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1509                 u->sink_name,
1510                 pa_get_user_name(un, sizeof(un)),
1511                 pa_get_host_name(hn, sizeof(hn)));
1512 #else
1513     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1514     pa_source_update_proplist(u->source, 0, NULL);
1515
1516     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1517                 u->source_name,
1518                 pa_get_user_name(un, sizeof(un)),
1519                 pa_get_host_name(hn, sizeof(hn)));
1520 #endif
1521
1522     reply = pa_tagstruct_new(NULL, 0);
1523     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1524     pa_tagstruct_putu32(reply, tag = u->ctag++);
1525
1526     if (u->version >= 13) {
1527         pa_proplist *pl;
1528         pl = pa_proplist_new();
1529         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1530         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1531         pa_init_proplist(pl);
1532         pa_tagstruct_put_proplist(reply, pl);
1533         pa_proplist_free(pl);
1534     } else
1535         pa_tagstruct_puts(reply, "PulseAudio");
1536
1537     pa_pstream_send_tagstruct(u->pstream, reply);
1538     /* We ignore the server's reply here */
1539
1540     reply = pa_tagstruct_new(NULL, 0);
1541
1542     if (u->version < 13)
1543         /* Only for older PA versions we need to fill in the maxlength */
1544         u->maxlength = 4*1024*1024;
1545
1546 #ifdef TUNNEL_SINK
1547     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1548     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1549     u->prebuf = u->tlength;
1550 #else
1551     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1552 #endif
1553
1554 #ifdef TUNNEL_SINK
1555     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1556     pa_tagstruct_putu32(reply, tag = u->ctag++);
1557
1558     if (u->version < 13)
1559         pa_tagstruct_puts(reply, name);
1560
1561     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1562     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1563     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1564     pa_tagstruct_puts(reply, u->sink_name);
1565     pa_tagstruct_putu32(reply, u->maxlength);
1566     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1567     pa_tagstruct_putu32(reply, u->tlength);
1568     pa_tagstruct_putu32(reply, u->prebuf);
1569     pa_tagstruct_putu32(reply, u->minreq);
1570     pa_tagstruct_putu32(reply, 0);
1571     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1572     pa_tagstruct_put_cvolume(reply, &volume);
1573 #else
1574     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1575     pa_tagstruct_putu32(reply, tag = u->ctag++);
1576
1577     if (u->version < 13)
1578         pa_tagstruct_puts(reply, name);
1579
1580     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1581     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1582     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1583     pa_tagstruct_puts(reply, u->source_name);
1584     pa_tagstruct_putu32(reply, u->maxlength);
1585     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1586     pa_tagstruct_putu32(reply, u->fragsize);
1587 #endif
1588
1589     if (u->version >= 12) {
1590         pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1591         pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1592         pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1593         pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1594         pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1595         pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1596         pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1597     }
1598
1599     if (u->version >= 13) {
1600         pa_proplist *pl;
1601
1602         pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1603         pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1604
1605         pl = pa_proplist_new();
1606         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1607         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1608         pa_tagstruct_put_proplist(reply, pl);
1609         pa_proplist_free(pl);
1610
1611 #ifndef TUNNEL_SINK
1612         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1613 #endif
1614     }
1615
1616     if (u->version >= 14) {
1617 #ifdef TUNNEL_SINK
1618         pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1619 #endif
1620         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1621     }
1622
1623     if (u->version >= 15) {
1624 #ifdef TUNNEL_SINK
1625         pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1626 #endif
1627         pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1628         pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1629     }
1630
1631     pa_pstream_send_tagstruct(u->pstream, reply);
1632     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1633
1634     pa_log_debug("Connection authenticated, creating stream ...");
1635
1636     return;
1637
1638 fail:
1639     pa_module_unload_request(u->module, TRUE);
1640 }
1641
1642 /* Called from main context */
1643 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1644     struct userdata *u = userdata;
1645
1646     pa_assert(p);
1647     pa_assert(u);
1648
1649     pa_log_warn("Stream died.");
1650     pa_module_unload_request(u->module, TRUE);
1651 }
1652
1653 /* Called from main context */
1654 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1655     struct userdata *u = userdata;
1656
1657     pa_assert(p);
1658     pa_assert(packet);
1659     pa_assert(u);
1660
1661     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1662         pa_log("Invalid packet");
1663         pa_module_unload_request(u->module, TRUE);
1664         return;
1665     }
1666 }
1667
1668 #ifndef TUNNEL_SINK
1669 /* Called from main context */
1670 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1671     struct userdata *u = userdata;
1672
1673     pa_assert(p);
1674     pa_assert(chunk);
1675     pa_assert(u);
1676
1677     if (channel != u->channel) {
1678         pa_log("Received memory block on bad channel.");
1679         pa_module_unload_request(u->module, TRUE);
1680         return;
1681     }
1682
1683     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1684
1685     u->counter_delta += (int64_t) chunk->length;
1686 }
1687 #endif
1688
1689 /* Called from main context */
1690 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1691     struct userdata *u = userdata;
1692     pa_tagstruct *t;
1693     uint32_t tag;
1694
1695     pa_assert(sc);
1696     pa_assert(u);
1697     pa_assert(u->client == sc);
1698
1699     pa_socket_client_unref(u->client);
1700     u->client = NULL;
1701
1702     if (!io) {
1703         pa_log("Connection failed: %s", pa_cstrerror(errno));
1704         pa_module_unload_request(u->module, TRUE);
1705         return;
1706     }
1707
1708     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1709     u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1710
1711     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1712     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1713 #ifndef TUNNEL_SINK
1714     pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1715 #endif
1716
1717     t = pa_tagstruct_new(NULL, 0);
1718     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1719     pa_tagstruct_putu32(t, tag = u->ctag++);
1720     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1721
1722     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1723
1724 #ifdef HAVE_CREDS
1725 {
1726     pa_creds ucred;
1727
1728     if (pa_iochannel_creds_supported(io))
1729         pa_iochannel_creds_enable(io);
1730
1731     ucred.uid = getuid();
1732     ucred.gid = getgid();
1733
1734     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1735 }
1736 #else
1737     pa_pstream_send_tagstruct(u->pstream, t);
1738 #endif
1739
1740     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1741
1742     pa_log_debug("Connection established, authenticating ...");
1743 }
1744
1745 #ifdef TUNNEL_SINK
1746
1747 /* Called from main context */
1748 static void sink_set_volume(pa_sink *sink) {
1749     struct userdata *u;
1750     pa_tagstruct *t;
1751     uint32_t tag;
1752
1753     pa_assert(sink);
1754     u = sink->userdata;
1755     pa_assert(u);
1756
1757     t = pa_tagstruct_new(NULL, 0);
1758     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1759     pa_tagstruct_putu32(t, tag = u->ctag++);
1760     pa_tagstruct_putu32(t, u->device_index);
1761     pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1762     pa_pstream_send_tagstruct(u->pstream, t);
1763 }
1764
1765 /* Called from main context */
1766 static void sink_set_mute(pa_sink *sink) {
1767     struct userdata *u;
1768     pa_tagstruct *t;
1769     uint32_t tag;
1770
1771     pa_assert(sink);
1772     u = sink->userdata;
1773     pa_assert(u);
1774
1775     if (u->version < 11)
1776         return;
1777
1778     t = pa_tagstruct_new(NULL, 0);
1779     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1780     pa_tagstruct_putu32(t, tag = u->ctag++);
1781     pa_tagstruct_putu32(t, u->device_index);
1782     pa_tagstruct_put_boolean(t, !!sink->muted);
1783     pa_pstream_send_tagstruct(u->pstream, t);
1784 }
1785
1786 #endif
1787
1788 int pa__init(pa_module*m) {
1789     pa_modargs *ma = NULL;
1790     struct userdata *u = NULL;
1791     pa_sample_spec ss;
1792     pa_channel_map map;
1793     char *dn = NULL;
1794 #ifdef TUNNEL_SINK
1795     pa_sink_new_data data;
1796 #else
1797     pa_source_new_data data;
1798 #endif
1799
1800     pa_assert(m);
1801
1802     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1803         pa_log("Failed to parse module arguments");
1804         goto fail;
1805     }
1806
1807     m->userdata = u = pa_xnew0(struct userdata, 1);
1808     u->core = m->core;
1809     u->module = m;
1810     u->client = NULL;
1811     u->pdispatch = NULL;
1812     u->pstream = NULL;
1813     u->server_name = NULL;
1814 #ifdef TUNNEL_SINK
1815     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1816     u->sink = NULL;
1817     u->requested_bytes = 0;
1818 #else
1819     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1820     u->source = NULL;
1821 #endif
1822     u->smoother = pa_smoother_new(
1823             PA_USEC_PER_SEC,
1824             PA_USEC_PER_SEC*2,
1825             TRUE,
1826             TRUE,
1827             10,
1828             pa_rtclock_usec(),
1829             FALSE);
1830     u->ctag = 1;
1831     u->device_index = u->channel = PA_INVALID_INDEX;
1832     u->time_event = NULL;
1833     u->ignore_latency_before = 0;
1834     u->transport_usec = u->thread_transport_usec = 0;
1835     u->remote_suspended = u->remote_corked = FALSE;
1836     u->counter = u->counter_delta = 0;
1837
1838     u->rtpoll = pa_rtpoll_new();
1839     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1840
1841     if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1842         goto fail;
1843
1844     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1845         pa_log("No server specified.");
1846         goto fail;
1847     }
1848
1849     ss = m->core->default_sample_spec;
1850     map = m->core->default_channel_map;
1851     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1852         pa_log("Invalid sample format specification");
1853         goto fail;
1854     }
1855
1856     if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1857         pa_log("Failed to connect to server '%s'", u->server_name);
1858         goto fail;
1859     }
1860
1861     pa_socket_client_set_callback(u->client, on_connection, u);
1862
1863 #ifdef TUNNEL_SINK
1864
1865     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1866         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1867
1868     pa_sink_new_data_init(&data);
1869     data.driver = __FILE__;
1870     data.module = m;
1871     data.namereg_fail = TRUE;
1872     pa_sink_new_data_set_name(&data, dn);
1873     pa_sink_new_data_set_sample_spec(&data, &ss);
1874     pa_sink_new_data_set_channel_map(&data, &map);
1875     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1876     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1877     if (u->sink_name)
1878         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1879
1880     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1881         pa_log("Invalid properties");
1882         pa_sink_new_data_done(&data);
1883         goto fail;
1884     }
1885
1886     u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1887     pa_sink_new_data_done(&data);
1888
1889     if (!u->sink) {
1890         pa_log("Failed to create sink.");
1891         goto fail;
1892     }
1893
1894     u->sink->parent.process_msg = sink_process_msg;
1895     u->sink->userdata = u;
1896     u->sink->set_state = sink_set_state;
1897     u->sink->set_volume = sink_set_volume;
1898     u->sink->set_mute = sink_set_mute;
1899
1900     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1901
1902 /*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1903
1904     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1905     pa_sink_set_rtpoll(u->sink, u->rtpoll);
1906
1907 #else
1908
1909     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1910         dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1911
1912     pa_source_new_data_init(&data);
1913     data.driver = __FILE__;
1914     data.module = m;
1915     data.namereg_fail = TRUE;
1916     pa_source_new_data_set_name(&data, dn);
1917     pa_source_new_data_set_sample_spec(&data, &ss);
1918     pa_source_new_data_set_channel_map(&data, &map);
1919     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1920     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1921     if (u->source_name)
1922         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1923
1924     if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1925         pa_log("Invalid properties");
1926         pa_source_new_data_done(&data);
1927         goto fail;
1928     }
1929
1930     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1931     pa_source_new_data_done(&data);
1932
1933     if (!u->source) {
1934         pa_log("Failed to create source.");
1935         goto fail;
1936     }
1937
1938     u->source->parent.process_msg = source_process_msg;
1939     u->source->set_state = source_set_state;
1940     u->source->userdata = u;
1941
1942 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1943
1944     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1945     pa_source_set_rtpoll(u->source, u->rtpoll);
1946 #endif
1947
1948     pa_xfree(dn);
1949
1950     u->time_event = NULL;
1951
1952     u->maxlength = (uint32_t) -1;
1953 #ifdef TUNNEL_SINK
1954     u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1955 #else
1956     u->fragsize = (uint32_t) -1;
1957 #endif
1958
1959     if (!(u->thread = pa_thread_new(thread_func, u))) {
1960         pa_log("Failed to create thread.");
1961         goto fail;
1962     }
1963
1964 #ifdef TUNNEL_SINK
1965     pa_sink_put(u->sink);
1966 #else
1967     pa_source_put(u->source);
1968 #endif
1969
1970     pa_modargs_free(ma);
1971
1972     return 0;
1973
1974 fail:
1975     pa__done(m);
1976
1977     if (ma)
1978         pa_modargs_free(ma);
1979
1980     pa_xfree(dn);
1981
1982     return  -1;
1983 }
1984
1985 void pa__done(pa_module*m) {
1986     struct userdata* u;
1987
1988     pa_assert(m);
1989
1990     if (!(u = m->userdata))
1991         return;
1992
1993 #ifdef TUNNEL_SINK
1994     if (u->sink)
1995         pa_sink_unlink(u->sink);
1996 #else
1997     if (u->source)
1998         pa_source_unlink(u->source);
1999 #endif
2000
2001     if (u->thread) {
2002         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2003         pa_thread_free(u->thread);
2004     }
2005
2006     pa_thread_mq_done(&u->thread_mq);
2007
2008 #ifdef TUNNEL_SINK
2009     if (u->sink)
2010         pa_sink_unref(u->sink);
2011 #else
2012     if (u->source)
2013         pa_source_unref(u->source);
2014 #endif
2015
2016     if (u->rtpoll)
2017         pa_rtpoll_free(u->rtpoll);
2018
2019     if (u->pstream) {
2020         pa_pstream_unlink(u->pstream);
2021         pa_pstream_unref(u->pstream);
2022     }
2023
2024     if (u->pdispatch)
2025         pa_pdispatch_unref(u->pdispatch);
2026
2027     if (u->client)
2028         pa_socket_client_unref(u->client);
2029
2030     if (u->auth_cookie)
2031         pa_auth_cookie_unref(u->auth_cookie);
2032
2033     if (u->smoother)
2034         pa_smoother_free(u->smoother);
2035
2036     if (u->time_event)
2037         u->core->mainloop->time_free(u->time_event);
2038
2039 #ifdef TUNNEL_SINK
2040     pa_xfree(u->sink_name);
2041 #else
2042     pa_xfree(u->source_name);
2043 #endif
2044     pa_xfree(u->server_name);
2045
2046     pa_xfree(u->device_description);
2047     pa_xfree(u->server_fqdn);
2048     pa_xfree(u->user_name);
2049
2050     pa_xfree(u);
2051 }