Remove unnecessary #includes
[profile/ivi/pulseaudio.git] / src / modules / module-tunnel.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67         "sink_name=<name for the local sink> "
68         "sink_properties=<properties for the local sink> "
69         "server=<address> "
70         "sink=<remote sink name> "
71         "cookie=<filename> "
72         "format=<sample format> "
73         "channels=<number of channels> "
74         "rate=<sample rate> "
75         "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79         "source_name=<name for the local source> "
80         "source_properties=<properties for the local source> "
81         "server=<address> "
82         "source=<remote source name> "
83         "cookie=<filename> "
84         "format=<sample format> "
85         "channels=<number of channels> "
86         "rate=<sample rate> "
87         "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE);
93
94 static const char* const valid_modargs[] = {
95     "server",
96     "cookie",
97     "format",
98     "channels",
99     "rate",
100 #ifdef TUNNEL_SINK
101     "sink_name",
102     "sink_properties",
103     "sink",
104 #else
105     "source_name",
106     "source_properties",
107     "source",
108 #endif
109     "channel_map",
110     NULL,
111 };
112
113 #define DEFAULT_TIMEOUT 5
114
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
116
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
121 enum {
122     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123     SINK_MESSAGE_REMOTE_SUSPEND,
124     SINK_MESSAGE_UPDATE_LATENCY,
125     SINK_MESSAGE_POST
126 };
127
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
133 enum {
134     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135     SOURCE_MESSAGE_REMOTE_SUSPEND,
136     SOURCE_MESSAGE_UPDATE_LATENCY
137 };
138
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157     [PA_COMMAND_REQUEST] = command_request,
158     [PA_COMMAND_STARTED] = command_started,
159 #endif
160     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161     [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162     [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169     [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170     [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171     [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172     [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173     [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
176 struct userdata {
177     pa_core *core;
178     pa_module *module;
179
180     pa_thread_mq thread_mq;
181     pa_rtpoll *rtpoll;
182     pa_thread *thread;
183
184     pa_socket_client *client;
185     pa_pstream *pstream;
186     pa_pdispatch *pdispatch;
187
188     char *server_name;
189 #ifdef TUNNEL_SINK
190     char *sink_name;
191     pa_sink *sink;
192     size_t requested_bytes;
193 #else
194     char *source_name;
195     pa_source *source;
196     pa_mcalign *mcalign;
197 #endif
198
199     pa_auth_cookie *auth_cookie;
200
201     uint32_t version;
202     uint32_t ctag;
203     uint32_t device_index;
204     uint32_t channel;
205
206     int64_t counter, counter_delta;
207
208     pa_bool_t remote_corked:1;
209     pa_bool_t remote_suspended:1;
210
211     pa_usec_t transport_usec; /* maintained in the main thread */
212     pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214     uint32_t ignore_latency_before;
215
216     pa_time_event *time_event;
217
218     pa_smoother *smoother;
219
220     char *device_description;
221     char *server_fqdn;
222     char *user_name;
223
224     uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226     uint32_t tlength;
227     uint32_t minreq;
228     uint32_t prebuf;
229 #else
230     uint32_t fragsize;
231 #endif
232 };
233
234 static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238     pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
243     struct userdata *u = userdata;
244
245     pa_assert(pd);
246     pa_assert(t);
247     pa_assert(u);
248     pa_assert(u->pdispatch == pd);
249
250     pa_log_warn("Stream killed");
251     pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
256     struct userdata *u = userdata;
257
258     pa_assert(pd);
259     pa_assert(t);
260     pa_assert(u);
261     pa_assert(u->pdispatch == pd);
262
263     pa_log_info("Server signalled buffer overrun/underrun.");
264     request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
269     struct userdata *u = userdata;
270     uint32_t channel;
271     pa_bool_t suspended;
272
273     pa_assert(pd);
274     pa_assert(t);
275     pa_assert(u);
276     pa_assert(u->pdispatch == pd);
277
278     if (pa_tagstruct_getu32(t, &channel) < 0 ||
279         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280         !pa_tagstruct_eof(t)) {
281
282         pa_log("Invalid packet.");
283         pa_module_unload_request(u->module, TRUE);
284         return;
285     }
286
287     pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295     request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
300     struct userdata *u = userdata;
301     uint32_t channel, di;
302     const char *dn;
303     pa_bool_t suspended;
304
305     pa_assert(pd);
306     pa_assert(t);
307     pa_assert(u);
308     pa_assert(u->pdispatch == pd);
309
310     if (pa_tagstruct_getu32(t, &channel) < 0 ||
311         pa_tagstruct_getu32(t, &di) < 0 ||
312         pa_tagstruct_gets(t, &dn) < 0 ||
313         pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315         pa_log_error("Invalid packet.");
316         pa_module_unload_request(u->module, TRUE);
317         return;
318     }
319
320     pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328     request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332     struct userdata *u = userdata;
333     uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
334     pa_usec_t usec;
335
336     pa_assert(pd);
337     pa_assert(t);
338     pa_assert(u);
339     pa_assert(u->pdispatch == pd);
340
341     if (pa_tagstruct_getu32(t, &channel) < 0 ||
342         pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344         pa_log_error("Invalid packet.");
345         pa_module_unload_request(u->module, TRUE);
346         return;
347     }
348
349     if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350         if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351             pa_tagstruct_get_usec(t, &usec) < 0) {
352
353             pa_log_error("Invalid packet.");
354             pa_module_unload_request(u->module, TRUE);
355             return;
356         }
357     } else {
358         if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359             pa_tagstruct_getu32(t, &prebuf) < 0 ||
360             pa_tagstruct_getu32(t, &minreq) < 0 ||
361             pa_tagstruct_get_usec(t, &usec) < 0) {
362
363             pa_log_error("Invalid packet.");
364             pa_module_unload_request(u->module, TRUE);
365             return;
366         }
367     }
368
369 #ifdef TUNNEL_SINK
370     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373     request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380     struct userdata *u = userdata;
381
382     pa_assert(pd);
383     pa_assert(t);
384     pa_assert(u);
385     pa_assert(u->pdispatch == pd);
386
387     pa_log_debug("Server reports playback started.");
388     request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395     pa_usec_t x;
396
397     pa_assert(u);
398
399     x = pa_rtclock_now();
400
401     /* Correct by the time the requested issued needs to travel to the
402      * other side.  This is a valid thread-safe access, because the
403      * main thread is waiting for us */
404
405     if (past)
406         x -= u->thread_transport_usec;
407     else
408         x += u->thread_transport_usec;
409
410     if (u->remote_suspended || u->remote_corked)
411         pa_smoother_pause(u->smoother, x);
412     else
413         pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418     pa_assert(u);
419
420     if (u->remote_corked == cork)
421         return;
422
423     u->remote_corked = cork;
424     check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429     pa_tagstruct *t;
430     pa_assert(u);
431
432     if (!u->pstream)
433         return;
434
435     t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441     pa_tagstruct_putu32(t, u->ctag++);
442     pa_tagstruct_putu32(t, u->channel);
443     pa_tagstruct_put_boolean(t, !!cork);
444     pa_pstream_send_tagstruct(u->pstream, t);
445
446     request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451     pa_assert(u);
452
453     if (u->remote_suspended == suspend)
454         return;
455
456     u->remote_suspended = suspend;
457     check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464     pa_assert(u);
465
466     while (u->requested_bytes > 0) {
467         pa_memchunk memchunk;
468
469         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471         pa_memblock_unref(memchunk.memblock);
472
473         u->requested_bytes -= memchunk.length;
474
475         u->counter += (int64_t) memchunk.length;
476     }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481     struct userdata *u = PA_SINK(o)->userdata;
482
483     switch (code) {
484
485         case PA_SINK_MESSAGE_SET_STATE: {
486             int r;
487
488             /* First, change the state, because otherwide pa_sink_render() would fail */
489             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491                 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493                 if (PA_SINK_IS_OPENED(u->sink->state))
494                     send_data(u);
495             }
496
497             return r;
498         }
499
500         case PA_SINK_MESSAGE_GET_LATENCY: {
501             pa_usec_t yl, yr, *usec = data;
502
503             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506             *usec = yl > yr ? yl - yr : 0;
507             return 0;
508         }
509
510         case SINK_MESSAGE_REQUEST:
511
512             pa_assert(offset > 0);
513             u->requested_bytes += (size_t) offset;
514
515             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516                 send_data(u);
517
518             return 0;
519
520
521         case SINK_MESSAGE_REMOTE_SUSPEND:
522
523             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524             return 0;
525
526
527         case SINK_MESSAGE_UPDATE_LATENCY: {
528             pa_usec_t y;
529
530             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532             if (y > (pa_usec_t) offset)
533                 y -= (pa_usec_t) offset;
534             else
535                 y = 0;
536
537             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539             /* We can access this freely here, since the main thread is waiting for us */
540             u->thread_transport_usec = u->transport_usec;
541
542             return 0;
543         }
544
545         case SINK_MESSAGE_POST:
546
547             /* OK, This might be a bit confusing. This message is
548              * delivered to us from the main context -- NOT from the
549              * IO thread context where the rest of the messages are
550              * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554             u->counter_delta += (int64_t) chunk->length;
555
556             return 0;
557     }
558
559     return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564     struct userdata *u;
565     pa_sink_assert_ref(s);
566     u = s->userdata;
567
568     switch ((pa_sink_state_t) state) {
569
570         case PA_SINK_SUSPENDED:
571             pa_assert(PA_SINK_IS_OPENED(s->state));
572             stream_cork(u, TRUE);
573             break;
574
575         case PA_SINK_IDLE:
576         case PA_SINK_RUNNING:
577             if (s->state == PA_SINK_SUSPENDED)
578                 stream_cork(u, FALSE);
579             break;
580
581         case PA_SINK_UNLINKED:
582         case PA_SINK_INIT:
583         case PA_SINK_INVALID_STATE:
584             ;
585     }
586
587     return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594     struct userdata *u = PA_SOURCE(o)->userdata;
595
596     switch (code) {
597
598         case PA_SOURCE_MESSAGE_SET_STATE: {
599             int r;
600
601             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602                 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604             return r;
605         }
606
607         case PA_SOURCE_MESSAGE_GET_LATENCY: {
608             pa_usec_t yr, yl, *usec = data;
609
610             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613             *usec = yr > yl ? yr - yl : 0;
614             return 0;
615         }
616
617         case SOURCE_MESSAGE_POST: {
618             pa_memchunk c;
619
620             pa_mcalign_push(u->mcalign, chunk);
621
622             while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
623
624                 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
625                     pa_source_post(u->source, &c);
626
627                 pa_memblock_unref(c.memblock);
628
629                 u->counter += (int64_t) c.length;
630             }
631
632             return 0;
633         }
634
635         case SOURCE_MESSAGE_REMOTE_SUSPEND:
636
637             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
638             return 0;
639
640         case SOURCE_MESSAGE_UPDATE_LATENCY: {
641             pa_usec_t y;
642
643             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
644             y += (pa_usec_t) offset;
645
646             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
647
648             /* We can access this freely here, since the main thread is waiting for us */
649             u->thread_transport_usec = u->transport_usec;
650
651             return 0;
652         }
653     }
654
655     return pa_source_process_msg(o, code, data, offset, chunk);
656 }
657
658 /* Called from main context */
659 static int source_set_state(pa_source *s, pa_source_state_t state) {
660     struct userdata *u;
661     pa_source_assert_ref(s);
662     u = s->userdata;
663
664     switch ((pa_source_state_t) state) {
665
666         case PA_SOURCE_SUSPENDED:
667             pa_assert(PA_SOURCE_IS_OPENED(s->state));
668             stream_cork(u, TRUE);
669             break;
670
671         case PA_SOURCE_IDLE:
672         case PA_SOURCE_RUNNING:
673             if (s->state == PA_SOURCE_SUSPENDED)
674                 stream_cork(u, FALSE);
675             break;
676
677         case PA_SOURCE_UNLINKED:
678         case PA_SOURCE_INIT:
679         case PA_SINK_INVALID_STATE:
680             ;
681     }
682
683     return 0;
684 }
685
686 #endif
687
688 static void thread_func(void *userdata) {
689     struct userdata *u = userdata;
690
691     pa_assert(u);
692
693     pa_log_debug("Thread starting up");
694
695     pa_thread_mq_install(&u->thread_mq);
696
697     for (;;) {
698         int ret;
699
700 #ifdef TUNNEL_SINK
701         if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
702             if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
703                 pa_sink_process_rewind(u->sink, 0);
704 #endif
705
706         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
707             goto fail;
708
709         if (ret == 0)
710             goto finish;
711     }
712
713 fail:
714     /* If this was no regular exit from the loop we have to continue
715      * processing messages until we received PA_MESSAGE_SHUTDOWN */
716     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
717     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
718
719 finish:
720     pa_log_debug("Thread shutting down");
721 }
722
723 #ifdef TUNNEL_SINK
724 /* Called from main context */
725 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
726     struct userdata *u = userdata;
727     uint32_t bytes, channel;
728
729     pa_assert(pd);
730     pa_assert(command == PA_COMMAND_REQUEST);
731     pa_assert(t);
732     pa_assert(u);
733     pa_assert(u->pdispatch == pd);
734
735     if (pa_tagstruct_getu32(t, &channel) < 0 ||
736         pa_tagstruct_getu32(t, &bytes) < 0) {
737         pa_log("Invalid protocol reply");
738         goto fail;
739     }
740
741     if (channel != u->channel) {
742         pa_log("Received data for invalid channel");
743         goto fail;
744     }
745
746     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
747     return;
748
749 fail:
750     pa_module_unload_request(u->module, TRUE);
751 }
752
753 #endif
754
755 /* Called from main context */
756 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
757     struct userdata *u = userdata;
758     pa_usec_t sink_usec, source_usec;
759     pa_bool_t playing;
760     int64_t write_index, read_index;
761     struct timeval local, remote, now;
762     pa_sample_spec *ss;
763     int64_t delay;
764
765     pa_assert(pd);
766     pa_assert(u);
767
768     if (command != PA_COMMAND_REPLY) {
769         if (command == PA_COMMAND_ERROR)
770             pa_log("Failed to get latency.");
771         else
772             pa_log("Protocol error.");
773         goto fail;
774     }
775
776     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
777         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
778         pa_tagstruct_get_boolean(t, &playing) < 0 ||
779         pa_tagstruct_get_timeval(t, &local) < 0 ||
780         pa_tagstruct_get_timeval(t, &remote) < 0 ||
781         pa_tagstruct_gets64(t, &write_index) < 0 ||
782         pa_tagstruct_gets64(t, &read_index) < 0) {
783         pa_log("Invalid reply.");
784         goto fail;
785     }
786
787 #ifdef TUNNEL_SINK
788     if (u->version >= 13) {
789         uint64_t underrun_for = 0, playing_for = 0;
790
791         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
792             pa_tagstruct_getu64(t, &playing_for) < 0) {
793             pa_log("Invalid reply.");
794             goto fail;
795         }
796     }
797 #endif
798
799     if (!pa_tagstruct_eof(t)) {
800         pa_log("Invalid reply.");
801         goto fail;
802     }
803
804     if (tag < u->ignore_latency_before) {
805         return;
806     }
807
808     pa_gettimeofday(&now);
809
810     /* Calculate transport usec */
811     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
812         /* local and remote seem to have synchronized clocks */
813 #ifdef TUNNEL_SINK
814         u->transport_usec = pa_timeval_diff(&remote, &local);
815 #else
816         u->transport_usec = pa_timeval_diff(&now, &remote);
817 #endif
818     } else
819         u->transport_usec = pa_timeval_diff(&now, &local)/2;
820
821     /* First, take the device's delay */
822 #ifdef TUNNEL_SINK
823     delay = (int64_t) sink_usec;
824     ss = &u->sink->sample_spec;
825 #else
826     delay = (int64_t) source_usec;
827     ss = &u->source->sample_spec;
828 #endif
829
830     /* Add the length of our server-side buffer */
831     if (write_index >= read_index)
832         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
833     else
834         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
835
836     /* Our measurements are already out of date, hence correct by the     *
837      * transport latency */
838 #ifdef TUNNEL_SINK
839     delay -= (int64_t) u->transport_usec;
840 #else
841     delay += (int64_t) u->transport_usec;
842 #endif
843
844     /* Now correct by what we have have read/written since we requested the update */
845 #ifdef TUNNEL_SINK
846     delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
847 #else
848     delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
849 #endif
850
851 #ifdef TUNNEL_SINK
852     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
853 #else
854     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
855 #endif
856
857     return;
858
859 fail:
860
861     pa_module_unload_request(u->module, TRUE);
862 }
863
864 /* Called from main context */
865 static void request_latency(struct userdata *u) {
866     pa_tagstruct *t;
867     struct timeval now;
868     uint32_t tag;
869     pa_assert(u);
870
871     t = pa_tagstruct_new(NULL, 0);
872 #ifdef TUNNEL_SINK
873     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
874 #else
875     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
876 #endif
877     pa_tagstruct_putu32(t, tag = u->ctag++);
878     pa_tagstruct_putu32(t, u->channel);
879
880     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
881
882     pa_pstream_send_tagstruct(u->pstream, t);
883     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
884
885     u->ignore_latency_before = tag;
886     u->counter_delta = 0;
887 }
888
889 /* Called from main context */
890 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
891     struct userdata *u = userdata;
892
893     pa_assert(m);
894     pa_assert(e);
895     pa_assert(u);
896
897     request_latency(u);
898
899     pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
900 }
901
902 /* Called from main context */
903 static void update_description(struct userdata *u) {
904     char *d;
905     char un[128], hn[128];
906     pa_tagstruct *t;
907
908     pa_assert(u);
909
910     if (!u->server_fqdn || !u->user_name || !u->device_description)
911         return;
912
913     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
914
915 #ifdef TUNNEL_SINK
916     pa_sink_set_description(u->sink, d);
917     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
918     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
919     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
920 #else
921     pa_source_set_description(u->source, d);
922     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
923     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
924     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
925 #endif
926
927     pa_xfree(d);
928
929     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
930                           pa_get_user_name(un, sizeof(un)),
931                           pa_get_host_name(hn, sizeof(hn)));
932
933     t = pa_tagstruct_new(NULL, 0);
934 #ifdef TUNNEL_SINK
935     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
936 #else
937     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
938 #endif
939     pa_tagstruct_putu32(t, u->ctag++);
940     pa_tagstruct_putu32(t, u->channel);
941     pa_tagstruct_puts(t, d);
942     pa_pstream_send_tagstruct(u->pstream, t);
943
944     pa_xfree(d);
945 }
946
947 /* Called from main context */
948 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
949     struct userdata *u = userdata;
950     pa_sample_spec ss;
951     pa_channel_map cm;
952     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
953     uint32_t cookie;
954
955     pa_assert(pd);
956     pa_assert(u);
957
958     if (command != PA_COMMAND_REPLY) {
959         if (command == PA_COMMAND_ERROR)
960             pa_log("Failed to get info.");
961         else
962             pa_log("Protocol error.");
963         goto fail;
964     }
965
966     if (pa_tagstruct_gets(t, &server_name) < 0 ||
967         pa_tagstruct_gets(t, &server_version) < 0 ||
968         pa_tagstruct_gets(t, &user_name) < 0 ||
969         pa_tagstruct_gets(t, &host_name) < 0 ||
970         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
971         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
972         pa_tagstruct_gets(t, &default_source_name) < 0 ||
973         pa_tagstruct_getu32(t, &cookie) < 0 ||
974         (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
975
976         pa_log("Parse failure");
977         goto fail;
978     }
979
980     if (!pa_tagstruct_eof(t)) {
981         pa_log("Packet too long");
982         goto fail;
983     }
984
985     pa_xfree(u->server_fqdn);
986     u->server_fqdn = pa_xstrdup(host_name);
987
988     pa_xfree(u->user_name);
989     u->user_name = pa_xstrdup(user_name);
990
991     update_description(u);
992
993     return;
994
995 fail:
996     pa_module_unload_request(u->module, TRUE);
997 }
998
999 #ifdef TUNNEL_SINK
1000
1001 /* Called from main context */
1002 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1003     struct userdata *u = userdata;
1004     uint32_t idx, owner_module, monitor_source, flags;
1005     const char *name, *description, *monitor_source_name, *driver;
1006     pa_sample_spec ss;
1007     pa_channel_map cm;
1008     pa_cvolume volume;
1009     pa_bool_t mute;
1010     pa_usec_t latency;
1011     pa_proplist *pl;
1012
1013     pa_assert(pd);
1014     pa_assert(u);
1015
1016     pl = pa_proplist_new();
1017
1018     if (command != PA_COMMAND_REPLY) {
1019         if (command == PA_COMMAND_ERROR)
1020             pa_log("Failed to get info.");
1021         else
1022             pa_log("Protocol error.");
1023         goto fail;
1024     }
1025
1026     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1027         pa_tagstruct_gets(t, &name) < 0 ||
1028         pa_tagstruct_gets(t, &description) < 0 ||
1029         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1030         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1031         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1032         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1033         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1034         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1035         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1036         pa_tagstruct_get_usec(t, &latency) < 0 ||
1037         pa_tagstruct_gets(t, &driver) < 0 ||
1038         pa_tagstruct_getu32(t, &flags) < 0) {
1039
1040         pa_log("Parse failure");
1041         goto fail;
1042     }
1043
1044     if (u->version >= 13) {
1045         pa_usec_t configured_latency;
1046
1047         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1048             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1049
1050             pa_log("Parse failure");
1051             goto fail;
1052         }
1053     }
1054
1055     if (u->version >= 15) {
1056         pa_volume_t base_volume;
1057         uint32_t state, n_volume_steps, card;
1058
1059         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1060             pa_tagstruct_getu32(t, &state) < 0 ||
1061             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1062             pa_tagstruct_getu32(t, &card) < 0) {
1063
1064             pa_log("Parse failure");
1065             goto fail;
1066         }
1067     }
1068
1069     if (u->version >= 16) {
1070         uint32_t n_ports;
1071         const char *s;
1072
1073         if (pa_tagstruct_getu32(t, &n_ports)) {
1074             pa_log("Parse failure");
1075             goto fail;
1076         }
1077
1078         for (uint32_t j = 0; j < n_ports; j++) {
1079             uint32_t priority;
1080
1081             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1082                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1083                 pa_tagstruct_getu32(t, &priority) < 0) {
1084
1085                 pa_log("Parse failure");
1086                 goto fail;
1087             }
1088         }
1089
1090         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1091             pa_log("Parse failure");
1092             goto fail;
1093         }
1094     }
1095
1096     if (u->version >= 21) {
1097         uint8_t n_formats;
1098         pa_format_info format;
1099
1100         if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1101             pa_log("Parse failure");
1102             goto fail;
1103         }
1104
1105         for (uint8_t j = 0; j < n_formats; j++) {
1106             if (pa_tagstruct_get_format_info(t, &format)) { /* format info */
1107                 pa_log("Parse failure");
1108                 goto fail;
1109             }
1110         }
1111     }
1112
1113     if (!pa_tagstruct_eof(t)) {
1114         pa_log("Packet too long");
1115         goto fail;
1116     }
1117
1118     pa_proplist_free(pl);
1119
1120     if (!u->sink_name || strcmp(name, u->sink_name))
1121         return;
1122
1123     pa_xfree(u->device_description);
1124     u->device_description = pa_xstrdup(description);
1125
1126     update_description(u);
1127
1128     return;
1129
1130 fail:
1131     pa_module_unload_request(u->module, TRUE);
1132     pa_proplist_free(pl);
1133 }
1134
1135 /* Called from main context */
1136 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1137     struct userdata *u = userdata;
1138     uint32_t idx, owner_module, client, sink;
1139     pa_usec_t buffer_usec, sink_usec;
1140     const char *name, *driver, *resample_method;
1141     pa_bool_t mute = FALSE;
1142     pa_sample_spec sample_spec;
1143     pa_channel_map channel_map;
1144     pa_cvolume volume;
1145     pa_proplist *pl;
1146     pa_bool_t b;
1147
1148     pa_assert(pd);
1149     pa_assert(u);
1150
1151     pl = pa_proplist_new();
1152
1153     if (command != PA_COMMAND_REPLY) {
1154         if (command == PA_COMMAND_ERROR)
1155             pa_log("Failed to get info.");
1156         else
1157             pa_log("Protocol error.");
1158         goto fail;
1159     }
1160
1161     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1162         pa_tagstruct_gets(t, &name) < 0 ||
1163         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1164         pa_tagstruct_getu32(t, &client) < 0 ||
1165         pa_tagstruct_getu32(t, &sink) < 0 ||
1166         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1167         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1168         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1169         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1170         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1171         pa_tagstruct_gets(t, &resample_method) < 0 ||
1172         pa_tagstruct_gets(t, &driver) < 0) {
1173
1174         pa_log("Parse failure");
1175         goto fail;
1176     }
1177
1178     if (u->version >= 11) {
1179         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1180
1181             pa_log("Parse failure");
1182             goto fail;
1183         }
1184     }
1185
1186     if (u->version >= 13) {
1187         if (pa_tagstruct_get_proplist(t, pl) < 0) {
1188
1189             pa_log("Parse failure");
1190             goto fail;
1191         }
1192     }
1193
1194     if (u->version >= 19) {
1195         if (pa_tagstruct_get_boolean(t, &b) < 0) {
1196
1197             pa_log("Parse failure");
1198             goto fail;
1199         }
1200     }
1201
1202     if (u->version >= 20) {
1203         if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1204             pa_tagstruct_get_boolean(t, &b) < 0) {
1205
1206             pa_log("Parse failure");
1207             goto fail;
1208         }
1209     }
1210
1211     if (u->version >= 21) {
1212         pa_format_info format;
1213
1214         if (pa_tagstruct_get_format_info(t, &format) < 0) {
1215
1216             pa_log("Parse failure");
1217             goto fail;
1218         }
1219     }
1220
1221     if (!pa_tagstruct_eof(t)) {
1222         pa_log("Packet too long");
1223         goto fail;
1224     }
1225
1226     pa_proplist_free(pl);
1227
1228     if (idx != u->device_index)
1229         return;
1230
1231     pa_assert(u->sink);
1232
1233     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1234         pa_cvolume_equal(&volume, &u->sink->real_volume))
1235         return;
1236
1237     pa_sink_volume_changed(u->sink, &volume);
1238
1239     if (u->version >= 11)
1240         pa_sink_mute_changed(u->sink, mute);
1241
1242     return;
1243
1244 fail:
1245     pa_module_unload_request(u->module, TRUE);
1246     pa_proplist_free(pl);
1247 }
1248
1249 #else
1250
1251 /* Called from main context */
1252 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1253     struct userdata *u = userdata;
1254     uint32_t idx, owner_module, monitor_of_sink, flags;
1255     const char *name, *description, *monitor_of_sink_name, *driver;
1256     pa_sample_spec ss;
1257     pa_channel_map cm;
1258     pa_cvolume volume;
1259     pa_bool_t mute;
1260     pa_usec_t latency, configured_latency;
1261     pa_proplist *pl;
1262
1263     pa_assert(pd);
1264     pa_assert(u);
1265
1266     pl = pa_proplist_new();
1267
1268     if (command != PA_COMMAND_REPLY) {
1269         if (command == PA_COMMAND_ERROR)
1270             pa_log("Failed to get info.");
1271         else
1272             pa_log("Protocol error.");
1273         goto fail;
1274     }
1275
1276     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1277         pa_tagstruct_gets(t, &name) < 0 ||
1278         pa_tagstruct_gets(t, &description) < 0 ||
1279         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1280         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1281         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1282         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1283         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1284         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1285         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1286         pa_tagstruct_get_usec(t, &latency) < 0 ||
1287         pa_tagstruct_gets(t, &driver) < 0 ||
1288         pa_tagstruct_getu32(t, &flags) < 0) {
1289
1290         pa_log("Parse failure");
1291         goto fail;
1292     }
1293
1294     if (u->version >= 13) {
1295         if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1296             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1297
1298             pa_log("Parse failure");
1299             goto fail;
1300         }
1301     }
1302
1303     if (u->version >= 15) {
1304         pa_volume_t base_volume;
1305         uint32_t state, n_volume_steps, card;
1306
1307         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1308             pa_tagstruct_getu32(t, &state) < 0 ||
1309             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1310             pa_tagstruct_getu32(t, &card) < 0) {
1311
1312             pa_log("Parse failure");
1313             goto fail;
1314         }
1315     }
1316
1317     if (u->version >= 16) {
1318         uint32_t n_ports;
1319         const char *s;
1320
1321         if (pa_tagstruct_getu32(t, &n_ports)) {
1322             pa_log("Parse failure");
1323             goto fail;
1324         }
1325
1326         for (uint32_t j = 0; j < n_ports; j++) {
1327             uint32_t priority;
1328
1329             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1330                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1331                 pa_tagstruct_getu32(t, &priority) < 0) {
1332
1333                 pa_log("Parse failure");
1334                 goto fail;
1335             }
1336         }
1337
1338         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1339             pa_log("Parse failure");
1340             goto fail;
1341         }
1342     }
1343
1344     if (!pa_tagstruct_eof(t)) {
1345         pa_log("Packet too long");
1346         goto fail;
1347     }
1348
1349     pa_proplist_free(pl);
1350
1351     if (!u->source_name || strcmp(name, u->source_name))
1352         return;
1353
1354     pa_xfree(u->device_description);
1355     u->device_description = pa_xstrdup(description);
1356
1357     update_description(u);
1358
1359     return;
1360
1361 fail:
1362     pa_module_unload_request(u->module, TRUE);
1363     pa_proplist_free(pl);
1364 }
1365
1366 #endif
1367
1368 /* Called from main context */
1369 static void request_info(struct userdata *u) {
1370     pa_tagstruct *t;
1371     uint32_t tag;
1372     pa_assert(u);
1373
1374     t = pa_tagstruct_new(NULL, 0);
1375     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1376     pa_tagstruct_putu32(t, tag = u->ctag++);
1377     pa_pstream_send_tagstruct(u->pstream, t);
1378     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1379
1380 #ifdef TUNNEL_SINK
1381     t = pa_tagstruct_new(NULL, 0);
1382     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1383     pa_tagstruct_putu32(t, tag = u->ctag++);
1384     pa_tagstruct_putu32(t, u->device_index);
1385     pa_pstream_send_tagstruct(u->pstream, t);
1386     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1387
1388     if (u->sink_name) {
1389         t = pa_tagstruct_new(NULL, 0);
1390         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1391         pa_tagstruct_putu32(t, tag = u->ctag++);
1392         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1393         pa_tagstruct_puts(t, u->sink_name);
1394         pa_pstream_send_tagstruct(u->pstream, t);
1395         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1396     }
1397 #else
1398     if (u->source_name) {
1399         t = pa_tagstruct_new(NULL, 0);
1400         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1401         pa_tagstruct_putu32(t, tag = u->ctag++);
1402         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1403         pa_tagstruct_puts(t, u->source_name);
1404         pa_pstream_send_tagstruct(u->pstream, t);
1405         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1406     }
1407 #endif
1408 }
1409
1410 /* Called from main context */
1411 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1412     struct userdata *u = userdata;
1413     pa_subscription_event_type_t e;
1414     uint32_t idx;
1415
1416     pa_assert(pd);
1417     pa_assert(t);
1418     pa_assert(u);
1419     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1420
1421     if (pa_tagstruct_getu32(t, &e) < 0 ||
1422         pa_tagstruct_getu32(t, &idx) < 0) {
1423         pa_log("Invalid protocol reply");
1424         pa_module_unload_request(u->module, TRUE);
1425         return;
1426     }
1427
1428     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1429 #ifdef TUNNEL_SINK
1430         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1431         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1432 #else
1433         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1434 #endif
1435         )
1436         return;
1437
1438     request_info(u);
1439 }
1440
1441 /* Called from main context */
1442 static void start_subscribe(struct userdata *u) {
1443     pa_tagstruct *t;
1444     pa_assert(u);
1445
1446     t = pa_tagstruct_new(NULL, 0);
1447     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1448     pa_tagstruct_putu32(t, u->ctag++);
1449     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1450 #ifdef TUNNEL_SINK
1451                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1452 #else
1453                         PA_SUBSCRIPTION_MASK_SOURCE
1454 #endif
1455                         );
1456
1457     pa_pstream_send_tagstruct(u->pstream, t);
1458 }
1459
1460 /* Called from main context */
1461 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1462     struct userdata *u = userdata;
1463 #ifdef TUNNEL_SINK
1464     uint32_t bytes;
1465 #endif
1466
1467     pa_assert(pd);
1468     pa_assert(u);
1469     pa_assert(u->pdispatch == pd);
1470
1471     if (command != PA_COMMAND_REPLY) {
1472         if (command == PA_COMMAND_ERROR)
1473             pa_log("Failed to create stream.");
1474         else
1475             pa_log("Protocol error.");
1476         goto fail;
1477     }
1478
1479     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1480         pa_tagstruct_getu32(t, &u->device_index) < 0
1481 #ifdef TUNNEL_SINK
1482         || pa_tagstruct_getu32(t, &bytes) < 0
1483 #endif
1484         )
1485         goto parse_error;
1486
1487     if (u->version >= 9) {
1488 #ifdef TUNNEL_SINK
1489         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1490             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1491             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1492             pa_tagstruct_getu32(t, &u->minreq) < 0)
1493             goto parse_error;
1494 #else
1495         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1496             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1497             goto parse_error;
1498 #endif
1499     }
1500
1501     if (u->version >= 12) {
1502         pa_sample_spec ss;
1503         pa_channel_map cm;
1504         uint32_t device_index;
1505         const char *dn;
1506         pa_bool_t suspended;
1507
1508         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1509             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1510             pa_tagstruct_getu32(t, &device_index) < 0 ||
1511             pa_tagstruct_gets(t, &dn) < 0 ||
1512             pa_tagstruct_get_boolean(t, &suspended) < 0)
1513             goto parse_error;
1514
1515 #ifdef TUNNEL_SINK
1516         pa_xfree(u->sink_name);
1517         u->sink_name = pa_xstrdup(dn);
1518 #else
1519         pa_xfree(u->source_name);
1520         u->source_name = pa_xstrdup(dn);
1521 #endif
1522     }
1523
1524     if (u->version >= 13) {
1525         pa_usec_t usec;
1526
1527         if (pa_tagstruct_get_usec(t, &usec) < 0)
1528             goto parse_error;
1529
1530 /* #ifdef TUNNEL_SINK */
1531 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1532 /* #else */
1533 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1534 /* #endif */
1535     }
1536
1537     if (u->version >= 21) {
1538         pa_format_info format;
1539
1540         if (pa_tagstruct_get_format_info(t, &format) < 0)
1541             goto parse_error;
1542     }
1543
1544     if (!pa_tagstruct_eof(t))
1545         goto parse_error;
1546
1547     start_subscribe(u);
1548     request_info(u);
1549
1550     pa_assert(!u->time_event);
1551     u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1552
1553     request_latency(u);
1554
1555     pa_log_debug("Stream created.");
1556
1557 #ifdef TUNNEL_SINK
1558     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1559 #endif
1560
1561     return;
1562
1563 parse_error:
1564     pa_log("Invalid reply. (Create stream)");
1565
1566 fail:
1567     pa_module_unload_request(u->module, TRUE);
1568
1569 }
1570
1571 /* Called from main context */
1572 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1573     struct userdata *u = userdata;
1574     pa_tagstruct *reply;
1575     char name[256], un[128], hn[128];
1576 #ifdef TUNNEL_SINK
1577     pa_cvolume volume;
1578 #endif
1579
1580     pa_assert(pd);
1581     pa_assert(u);
1582     pa_assert(u->pdispatch == pd);
1583
1584     if (command != PA_COMMAND_REPLY ||
1585         pa_tagstruct_getu32(t, &u->version) < 0 ||
1586         !pa_tagstruct_eof(t)) {
1587
1588         if (command == PA_COMMAND_ERROR)
1589             pa_log("Failed to authenticate");
1590         else
1591             pa_log("Protocol error.");
1592
1593         goto fail;
1594     }
1595
1596     /* Minimum supported protocol version */
1597     if (u->version < 8) {
1598         pa_log("Incompatible protocol version");
1599         goto fail;
1600     }
1601
1602     /* Starting with protocol version 13 the MSB of the version tag
1603     reflects if shm is enabled for this connection or not. We don't
1604     support SHM here at all, so we just ignore this. */
1605
1606     if (u->version >= 13)
1607         u->version &= 0x7FFFFFFFU;
1608
1609     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1610
1611 #ifdef TUNNEL_SINK
1612     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1613     pa_sink_update_proplist(u->sink, 0, NULL);
1614
1615     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1616                 u->sink_name,
1617                 pa_get_user_name(un, sizeof(un)),
1618                 pa_get_host_name(hn, sizeof(hn)));
1619 #else
1620     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1621     pa_source_update_proplist(u->source, 0, NULL);
1622
1623     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1624                 u->source_name,
1625                 pa_get_user_name(un, sizeof(un)),
1626                 pa_get_host_name(hn, sizeof(hn)));
1627 #endif
1628
1629     reply = pa_tagstruct_new(NULL, 0);
1630     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1631     pa_tagstruct_putu32(reply, u->ctag++);
1632
1633     if (u->version >= 13) {
1634         pa_proplist *pl;
1635         pl = pa_proplist_new();
1636         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1637         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1638         pa_init_proplist(pl);
1639         pa_tagstruct_put_proplist(reply, pl);
1640         pa_proplist_free(pl);
1641     } else
1642         pa_tagstruct_puts(reply, "PulseAudio");
1643
1644     pa_pstream_send_tagstruct(u->pstream, reply);
1645     /* We ignore the server's reply here */
1646
1647     reply = pa_tagstruct_new(NULL, 0);
1648
1649     if (u->version < 13)
1650         /* Only for older PA versions we need to fill in the maxlength */
1651         u->maxlength = 4*1024*1024;
1652
1653 #ifdef TUNNEL_SINK
1654     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1655     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1656     u->prebuf = u->tlength;
1657 #else
1658     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1659 #endif
1660
1661 #ifdef TUNNEL_SINK
1662     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1663     pa_tagstruct_putu32(reply, tag = u->ctag++);
1664
1665     if (u->version < 13)
1666         pa_tagstruct_puts(reply, name);
1667
1668     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1669     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1670     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1671     pa_tagstruct_puts(reply, u->sink_name);
1672     pa_tagstruct_putu32(reply, u->maxlength);
1673     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1674     pa_tagstruct_putu32(reply, u->tlength);
1675     pa_tagstruct_putu32(reply, u->prebuf);
1676     pa_tagstruct_putu32(reply, u->minreq);
1677     pa_tagstruct_putu32(reply, 0);
1678     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1679     pa_tagstruct_put_cvolume(reply, &volume);
1680 #else
1681     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1682     pa_tagstruct_putu32(reply, tag = u->ctag++);
1683
1684     if (u->version < 13)
1685         pa_tagstruct_puts(reply, name);
1686
1687     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1688     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1689     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1690     pa_tagstruct_puts(reply, u->source_name);
1691     pa_tagstruct_putu32(reply, u->maxlength);
1692     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1693     pa_tagstruct_putu32(reply, u->fragsize);
1694 #endif
1695
1696     if (u->version >= 12) {
1697         pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1698         pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1699         pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1700         pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1701         pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1702         pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1703         pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1704     }
1705
1706     if (u->version >= 13) {
1707         pa_proplist *pl;
1708
1709         pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1710         pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1711
1712         pl = pa_proplist_new();
1713         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1714         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1715         pa_tagstruct_put_proplist(reply, pl);
1716         pa_proplist_free(pl);
1717
1718 #ifndef TUNNEL_SINK
1719         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1720 #endif
1721     }
1722
1723     if (u->version >= 14) {
1724 #ifdef TUNNEL_SINK
1725         pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1726 #endif
1727         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1728     }
1729
1730     if (u->version >= 15) {
1731 #ifdef TUNNEL_SINK
1732         pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1733 #endif
1734         pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1735         pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1736     }
1737
1738 #ifdef TUNNEL_SINK
1739     if (u->version >= 17)
1740         pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1741
1742     if (u->version >= 18)
1743         pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1744 #endif
1745
1746 #ifdef TUNNEL_SINK
1747     if (u->version >= 21) {
1748         /* We're not using the extended API, so n_formats = 0 and that's that */
1749         pa_tagstruct_putu8(t, 0);
1750     }
1751 #endif
1752
1753     pa_pstream_send_tagstruct(u->pstream, reply);
1754     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1755
1756     pa_log_debug("Connection authenticated, creating stream ...");
1757
1758     return;
1759
1760 fail:
1761     pa_module_unload_request(u->module, TRUE);
1762 }
1763
1764 /* Called from main context */
1765 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1766     struct userdata *u = userdata;
1767
1768     pa_assert(p);
1769     pa_assert(u);
1770
1771     pa_log_warn("Stream died.");
1772     pa_module_unload_request(u->module, TRUE);
1773 }
1774
1775 /* Called from main context */
1776 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1777     struct userdata *u = userdata;
1778
1779     pa_assert(p);
1780     pa_assert(packet);
1781     pa_assert(u);
1782
1783     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1784         pa_log("Invalid packet");
1785         pa_module_unload_request(u->module, TRUE);
1786         return;
1787     }
1788 }
1789
1790 #ifndef TUNNEL_SINK
1791 /* Called from main context */
1792 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1793     struct userdata *u = userdata;
1794
1795     pa_assert(p);
1796     pa_assert(chunk);
1797     pa_assert(u);
1798
1799     if (channel != u->channel) {
1800         pa_log("Received memory block on bad channel.");
1801         pa_module_unload_request(u->module, TRUE);
1802         return;
1803     }
1804
1805     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1806
1807     u->counter_delta += (int64_t) chunk->length;
1808 }
1809 #endif
1810
1811 /* Called from main context */
1812 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1813     struct userdata *u = userdata;
1814     pa_tagstruct *t;
1815     uint32_t tag;
1816
1817     pa_assert(sc);
1818     pa_assert(u);
1819     pa_assert(u->client == sc);
1820
1821     pa_socket_client_unref(u->client);
1822     u->client = NULL;
1823
1824     if (!io) {
1825         pa_log("Connection failed: %s", pa_cstrerror(errno));
1826         pa_module_unload_request(u->module, TRUE);
1827         return;
1828     }
1829
1830     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1831     u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1832
1833     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1834     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1835 #ifndef TUNNEL_SINK
1836     pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1837 #endif
1838
1839     t = pa_tagstruct_new(NULL, 0);
1840     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1841     pa_tagstruct_putu32(t, tag = u->ctag++);
1842     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1843
1844     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1845
1846 #ifdef HAVE_CREDS
1847 {
1848     pa_creds ucred;
1849
1850     if (pa_iochannel_creds_supported(io))
1851         pa_iochannel_creds_enable(io);
1852
1853     ucred.uid = getuid();
1854     ucred.gid = getgid();
1855
1856     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1857 }
1858 #else
1859     pa_pstream_send_tagstruct(u->pstream, t);
1860 #endif
1861
1862     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1863
1864     pa_log_debug("Connection established, authenticating ...");
1865 }
1866
1867 #ifdef TUNNEL_SINK
1868
1869 /* Called from main context */
1870 static void sink_set_volume(pa_sink *sink) {
1871     struct userdata *u;
1872     pa_tagstruct *t;
1873
1874     pa_assert(sink);
1875     u = sink->userdata;
1876     pa_assert(u);
1877
1878     t = pa_tagstruct_new(NULL, 0);
1879     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1880     pa_tagstruct_putu32(t, u->ctag++);
1881     pa_tagstruct_putu32(t, u->device_index);
1882     pa_tagstruct_put_cvolume(t, &sink->real_volume);
1883     pa_pstream_send_tagstruct(u->pstream, t);
1884 }
1885
1886 /* Called from main context */
1887 static void sink_set_mute(pa_sink *sink) {
1888     struct userdata *u;
1889     pa_tagstruct *t;
1890
1891     pa_assert(sink);
1892     u = sink->userdata;
1893     pa_assert(u);
1894
1895     if (u->version < 11)
1896         return;
1897
1898     t = pa_tagstruct_new(NULL, 0);
1899     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1900     pa_tagstruct_putu32(t, u->ctag++);
1901     pa_tagstruct_putu32(t, u->device_index);
1902     pa_tagstruct_put_boolean(t, !!sink->muted);
1903     pa_pstream_send_tagstruct(u->pstream, t);
1904 }
1905
1906 #endif
1907
1908 int pa__init(pa_module*m) {
1909     pa_modargs *ma = NULL;
1910     struct userdata *u = NULL;
1911     pa_sample_spec ss;
1912     pa_channel_map map;
1913     char *dn = NULL;
1914 #ifdef TUNNEL_SINK
1915     pa_sink_new_data data;
1916 #else
1917     pa_source_new_data data;
1918 #endif
1919
1920     pa_assert(m);
1921
1922     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1923         pa_log("Failed to parse module arguments");
1924         goto fail;
1925     }
1926
1927     m->userdata = u = pa_xnew0(struct userdata, 1);
1928     u->core = m->core;
1929     u->module = m;
1930     u->client = NULL;
1931     u->pdispatch = NULL;
1932     u->pstream = NULL;
1933     u->server_name = NULL;
1934 #ifdef TUNNEL_SINK
1935     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1936     u->sink = NULL;
1937     u->requested_bytes = 0;
1938 #else
1939     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1940     u->source = NULL;
1941 #endif
1942     u->smoother = pa_smoother_new(
1943             PA_USEC_PER_SEC,
1944             PA_USEC_PER_SEC*2,
1945             TRUE,
1946             TRUE,
1947             10,
1948             pa_rtclock_now(),
1949             FALSE);
1950     u->ctag = 1;
1951     u->device_index = u->channel = PA_INVALID_INDEX;
1952     u->time_event = NULL;
1953     u->ignore_latency_before = 0;
1954     u->transport_usec = u->thread_transport_usec = 0;
1955     u->remote_suspended = u->remote_corked = FALSE;
1956     u->counter = u->counter_delta = 0;
1957
1958     u->rtpoll = pa_rtpoll_new();
1959     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1960
1961     if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1962         goto fail;
1963
1964     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1965         pa_log("No server specified.");
1966         goto fail;
1967     }
1968
1969     ss = m->core->default_sample_spec;
1970     map = m->core->default_channel_map;
1971     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1972         pa_log("Invalid sample format specification");
1973         goto fail;
1974     }
1975
1976     if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1977         pa_log("Failed to connect to server '%s'", u->server_name);
1978         goto fail;
1979     }
1980
1981     pa_socket_client_set_callback(u->client, on_connection, u);
1982
1983 #ifdef TUNNEL_SINK
1984
1985     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1986         dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1987
1988     pa_sink_new_data_init(&data);
1989     data.driver = __FILE__;
1990     data.module = m;
1991     data.namereg_fail = TRUE;
1992     pa_sink_new_data_set_name(&data, dn);
1993     pa_sink_new_data_set_sample_spec(&data, &ss);
1994     pa_sink_new_data_set_channel_map(&data, &map);
1995     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1996     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1997     if (u->sink_name)
1998         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1999
2000     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2001         pa_log("Invalid properties");
2002         pa_sink_new_data_done(&data);
2003         goto fail;
2004     }
2005
2006     u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
2007     pa_sink_new_data_done(&data);
2008
2009     if (!u->sink) {
2010         pa_log("Failed to create sink.");
2011         goto fail;
2012     }
2013
2014     u->sink->parent.process_msg = sink_process_msg;
2015     u->sink->userdata = u;
2016     u->sink->set_state = sink_set_state;
2017     u->sink->set_volume = sink_set_volume;
2018     u->sink->set_mute = sink_set_mute;
2019
2020     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
2021
2022 /*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2023
2024     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2025     pa_sink_set_rtpoll(u->sink, u->rtpoll);
2026
2027 #else
2028
2029     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2030         dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2031
2032     pa_source_new_data_init(&data);
2033     data.driver = __FILE__;
2034     data.module = m;
2035     data.namereg_fail = TRUE;
2036     pa_source_new_data_set_name(&data, dn);
2037     pa_source_new_data_set_sample_spec(&data, &ss);
2038     pa_source_new_data_set_channel_map(&data, &map);
2039     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2040     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2041     if (u->source_name)
2042         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2043
2044     if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2045         pa_log("Invalid properties");
2046         pa_source_new_data_done(&data);
2047         goto fail;
2048     }
2049
2050     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2051     pa_source_new_data_done(&data);
2052
2053     if (!u->source) {
2054         pa_log("Failed to create source.");
2055         goto fail;
2056     }
2057
2058     u->source->parent.process_msg = source_process_msg;
2059     u->source->set_state = source_set_state;
2060     u->source->userdata = u;
2061
2062 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2063
2064     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2065     pa_source_set_rtpoll(u->source, u->rtpoll);
2066
2067     u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2068 #endif
2069
2070     pa_xfree(dn);
2071
2072     u->time_event = NULL;
2073
2074     u->maxlength = (uint32_t) -1;
2075 #ifdef TUNNEL_SINK
2076     u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2077 #else
2078     u->fragsize = (uint32_t) -1;
2079 #endif
2080
2081     if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2082         pa_log("Failed to create thread.");
2083         goto fail;
2084     }
2085
2086 #ifdef TUNNEL_SINK
2087     pa_sink_put(u->sink);
2088 #else
2089     pa_source_put(u->source);
2090 #endif
2091
2092     pa_modargs_free(ma);
2093
2094     return 0;
2095
2096 fail:
2097     pa__done(m);
2098
2099     if (ma)
2100         pa_modargs_free(ma);
2101
2102     pa_xfree(dn);
2103
2104     return -1;
2105 }
2106
2107 void pa__done(pa_module*m) {
2108     struct userdata* u;
2109
2110     pa_assert(m);
2111
2112     if (!(u = m->userdata))
2113         return;
2114
2115 #ifdef TUNNEL_SINK
2116     if (u->sink)
2117         pa_sink_unlink(u->sink);
2118 #else
2119     if (u->source)
2120         pa_source_unlink(u->source);
2121 #endif
2122
2123     if (u->thread) {
2124         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2125         pa_thread_free(u->thread);
2126     }
2127
2128     pa_thread_mq_done(&u->thread_mq);
2129
2130 #ifdef TUNNEL_SINK
2131     if (u->sink)
2132         pa_sink_unref(u->sink);
2133 #else
2134     if (u->source)
2135         pa_source_unref(u->source);
2136 #endif
2137
2138     if (u->rtpoll)
2139         pa_rtpoll_free(u->rtpoll);
2140
2141     if (u->pstream) {
2142         pa_pstream_unlink(u->pstream);
2143         pa_pstream_unref(u->pstream);
2144     }
2145
2146     if (u->pdispatch)
2147         pa_pdispatch_unref(u->pdispatch);
2148
2149     if (u->client)
2150         pa_socket_client_unref(u->client);
2151
2152     if (u->auth_cookie)
2153         pa_auth_cookie_unref(u->auth_cookie);
2154
2155     if (u->smoother)
2156         pa_smoother_free(u->smoother);
2157
2158     if (u->time_event)
2159         u->core->mainloop->time_free(u->time_event);
2160
2161 #ifndef TUNNEL_SINK
2162     if (u->mcalign)
2163         pa_mcalign_free(u->mcalign);
2164 #endif
2165
2166 #ifdef TUNNEL_SINK
2167     pa_xfree(u->sink_name);
2168 #else
2169     pa_xfree(u->source_name);
2170 #endif
2171     pa_xfree(u->server_name);
2172
2173     pa_xfree(u->device_description);
2174     pa_xfree(u->server_fqdn);
2175     pa_xfree(u->user_name);
2176
2177     pa_xfree(u);
2178 }