echo-cancel-test: Enable debug log level
[platform/upstream/pulseaudio.git] / src / modules / module-tunnel.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <unistd.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <pulse/rtclock.h>
35 #include <pulse/timeval.h>
36 #include <pulse/util.h>
37 #include <pulse/version.h>
38 #include <pulse/xmalloc.h>
39
40 #include <pulsecore/module.h>
41 #include <pulsecore/core-util.h>
42 #include <pulsecore/modargs.h>
43 #include <pulsecore/log.h>
44 #include <pulsecore/core-subscribe.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/time-smoother.h>
50 #include <pulsecore/thread.h>
51 #include <pulsecore/thread-mq.h>
52 #include <pulsecore/core-rtclock.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/proplist-util.h>
55 #include <pulsecore/auth-cookie.h>
56 #include <pulsecore/mcalign.h>
57
58 #ifdef TUNNEL_SINK
59 #include "module-tunnel-sink-symdef.h"
60 #else
61 #include "module-tunnel-source-symdef.h"
62 #endif
63
64 #ifdef TUNNEL_SINK
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
66 PA_MODULE_USAGE(
67         "sink_name=<name for the local sink> "
68         "sink_properties=<properties for the local sink> "
69         "server=<address> "
70         "sink=<remote sink name> "
71         "cookie=<filename> "
72         "format=<sample format> "
73         "channels=<number of channels> "
74         "rate=<sample rate> "
75         "channel_map=<channel map>");
76 #else
77 PA_MODULE_DESCRIPTION("Tunnel module for sources");
78 PA_MODULE_USAGE(
79         "source_name=<name for the local source> "
80         "source_properties=<properties for the local source> "
81         "server=<address> "
82         "source=<remote source name> "
83         "cookie=<filename> "
84         "format=<sample format> "
85         "channels=<number of channels> "
86         "rate=<sample rate> "
87         "channel_map=<channel map>");
88 #endif
89
90 PA_MODULE_AUTHOR("Lennart Poettering");
91 PA_MODULE_VERSION(PACKAGE_VERSION);
92 PA_MODULE_LOAD_ONCE(FALSE);
93
94 static const char* const valid_modargs[] = {
95     "server",
96     "cookie",
97     "format",
98     "channels",
99     "rate",
100 #ifdef TUNNEL_SINK
101     "sink_name",
102     "sink_properties",
103     "sink",
104 #else
105     "source_name",
106     "source_properties",
107     "source",
108 #endif
109     "channel_map",
110     NULL,
111 };
112
113 #define DEFAULT_TIMEOUT 5
114
115 #define LATENCY_INTERVAL (10*PA_USEC_PER_SEC)
116
117 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118
119 #ifdef TUNNEL_SINK
120
121 enum {
122     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
123     SINK_MESSAGE_REMOTE_SUSPEND,
124     SINK_MESSAGE_UPDATE_LATENCY,
125     SINK_MESSAGE_POST
126 };
127
128 #define DEFAULT_TLENGTH_MSEC 150
129 #define DEFAULT_MINREQ_MSEC 25
130
131 #else
132
133 enum {
134     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
135     SOURCE_MESSAGE_REMOTE_SUSPEND,
136     SOURCE_MESSAGE_UPDATE_LATENCY
137 };
138
139 #define DEFAULT_FRAGSIZE_MSEC 25
140
141 #endif
142
143 #ifdef TUNNEL_SINK
144 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 #endif
147 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
150 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
152 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
153 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
154
155 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
156 #ifdef TUNNEL_SINK
157     [PA_COMMAND_REQUEST] = command_request,
158     [PA_COMMAND_STARTED] = command_started,
159 #endif
160     [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
161     [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
162     [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
163     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
164     [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
165     [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
166     [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
167     [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
168     [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
169     [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
170     [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
171     [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
172     [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
173     [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
174 };
175
176 struct userdata {
177     pa_core *core;
178     pa_module *module;
179
180     pa_thread_mq thread_mq;
181     pa_rtpoll *rtpoll;
182     pa_thread *thread;
183
184     pa_socket_client *client;
185     pa_pstream *pstream;
186     pa_pdispatch *pdispatch;
187
188     char *server_name;
189 #ifdef TUNNEL_SINK
190     char *sink_name;
191     pa_sink *sink;
192     size_t requested_bytes;
193 #else
194     char *source_name;
195     pa_source *source;
196     pa_mcalign *mcalign;
197 #endif
198
199     pa_auth_cookie *auth_cookie;
200
201     uint32_t version;
202     uint32_t ctag;
203     uint32_t device_index;
204     uint32_t channel;
205
206     int64_t counter, counter_delta;
207
208     pa_bool_t remote_corked:1;
209     pa_bool_t remote_suspended:1;
210
211     pa_usec_t transport_usec; /* maintained in the main thread */
212     pa_usec_t thread_transport_usec; /* maintained in the IO thread */
213
214     uint32_t ignore_latency_before;
215
216     pa_time_event *time_event;
217
218     pa_smoother *smoother;
219
220     char *device_description;
221     char *server_fqdn;
222     char *user_name;
223
224     uint32_t maxlength;
225 #ifdef TUNNEL_SINK
226     uint32_t tlength;
227     uint32_t minreq;
228     uint32_t prebuf;
229 #else
230     uint32_t fragsize;
231 #endif
232 };
233
234 static void request_latency(struct userdata *u);
235
236 /* Called from main context */
237 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238     pa_log_debug("Got stream or client event.");
239 }
240
241 /* Called from main context */
242 static void command_stream_killed(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
243     struct userdata *u = userdata;
244
245     pa_assert(pd);
246     pa_assert(t);
247     pa_assert(u);
248     pa_assert(u->pdispatch == pd);
249
250     pa_log_warn("Stream killed");
251     pa_module_unload_request(u->module, TRUE);
252 }
253
254 /* Called from main context */
255 static void command_overflow_or_underflow(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
256     struct userdata *u = userdata;
257
258     pa_assert(pd);
259     pa_assert(t);
260     pa_assert(u);
261     pa_assert(u->pdispatch == pd);
262
263     pa_log_info("Server signalled buffer overrun/underrun.");
264     request_latency(u);
265 }
266
267 /* Called from main context */
268 static void command_suspended(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
269     struct userdata *u = userdata;
270     uint32_t channel;
271     pa_bool_t suspended;
272
273     pa_assert(pd);
274     pa_assert(t);
275     pa_assert(u);
276     pa_assert(u->pdispatch == pd);
277
278     if (pa_tagstruct_getu32(t, &channel) < 0 ||
279         pa_tagstruct_get_boolean(t, &suspended) < 0 ||
280         !pa_tagstruct_eof(t)) {
281
282         pa_log("Invalid packet.");
283         pa_module_unload_request(u->module, TRUE);
284         return;
285     }
286
287     pa_log_debug("Server reports device suspend.");
288
289 #ifdef TUNNEL_SINK
290     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
291 #else
292     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 #endif
294
295     request_latency(u);
296 }
297
298 /* Called from main context */
299 static void command_moved(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
300     struct userdata *u = userdata;
301     uint32_t channel, di;
302     const char *dn;
303     pa_bool_t suspended;
304
305     pa_assert(pd);
306     pa_assert(t);
307     pa_assert(u);
308     pa_assert(u->pdispatch == pd);
309
310     if (pa_tagstruct_getu32(t, &channel) < 0 ||
311         pa_tagstruct_getu32(t, &di) < 0 ||
312         pa_tagstruct_gets(t, &dn) < 0 ||
313         pa_tagstruct_get_boolean(t, &suspended) < 0) {
314
315         pa_log_error("Invalid packet.");
316         pa_module_unload_request(u->module, TRUE);
317         return;
318     }
319
320     pa_log_debug("Server reports a stream move.");
321
322 #ifdef TUNNEL_SINK
323     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
324 #else
325     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 #endif
327
328     request_latency(u);
329 }
330
331 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
332     struct userdata *u = userdata;
333     uint32_t channel, maxlength, tlength = 0, fragsize, prebuf, minreq;
334     pa_usec_t usec;
335
336     pa_assert(pd);
337     pa_assert(t);
338     pa_assert(u);
339     pa_assert(u->pdispatch == pd);
340
341     if (pa_tagstruct_getu32(t, &channel) < 0 ||
342         pa_tagstruct_getu32(t, &maxlength) < 0) {
343
344         pa_log_error("Invalid packet.");
345         pa_module_unload_request(u->module, TRUE);
346         return;
347     }
348
349     if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
350         if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
351             pa_tagstruct_get_usec(t, &usec) < 0) {
352
353             pa_log_error("Invalid packet.");
354             pa_module_unload_request(u->module, TRUE);
355             return;
356         }
357     } else {
358         if (pa_tagstruct_getu32(t, &tlength) < 0 ||
359             pa_tagstruct_getu32(t, &prebuf) < 0 ||
360             pa_tagstruct_getu32(t, &minreq) < 0 ||
361             pa_tagstruct_get_usec(t, &usec) < 0) {
362
363             pa_log_error("Invalid packet.");
364             pa_module_unload_request(u->module, TRUE);
365             return;
366         }
367     }
368
369 #ifdef TUNNEL_SINK
370     pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
371 #endif
372
373     request_latency(u);
374 }
375
376 #ifdef TUNNEL_SINK
377
378 /* Called from main context */
379 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
380     struct userdata *u = userdata;
381
382     pa_assert(pd);
383     pa_assert(t);
384     pa_assert(u);
385     pa_assert(u->pdispatch == pd);
386
387     pa_log_debug("Server reports playback started.");
388     request_latency(u);
389 }
390
391 #endif
392
393 /* Called from IO thread context */
394 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
395     pa_usec_t x;
396
397     pa_assert(u);
398
399     x = pa_rtclock_now();
400
401     /* Correct by the time the requested issued needs to travel to the
402      * other side.  This is a valid thread-safe access, because the
403      * main thread is waiting for us */
404
405     if (past)
406         x -= u->thread_transport_usec;
407     else
408         x += u->thread_transport_usec;
409
410     if (u->remote_suspended || u->remote_corked)
411         pa_smoother_pause(u->smoother, x);
412     else
413         pa_smoother_resume(u->smoother, x, TRUE);
414 }
415
416 /* Called from IO thread context */
417 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
418     pa_assert(u);
419
420     if (u->remote_corked == cork)
421         return;
422
423     u->remote_corked = cork;
424     check_smoother_status(u, FALSE);
425 }
426
427 /* Called from main context */
428 static void stream_cork(struct userdata *u, pa_bool_t cork) {
429     pa_tagstruct *t;
430     pa_assert(u);
431
432     if (!u->pstream)
433         return;
434
435     t = pa_tagstruct_new(NULL, 0);
436 #ifdef TUNNEL_SINK
437     pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
438 #else
439     pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
440 #endif
441     pa_tagstruct_putu32(t, u->ctag++);
442     pa_tagstruct_putu32(t, u->channel);
443     pa_tagstruct_put_boolean(t, !!cork);
444     pa_pstream_send_tagstruct(u->pstream, t);
445
446     request_latency(u);
447 }
448
449 /* Called from IO thread context */
450 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
451     pa_assert(u);
452
453     if (u->remote_suspended == suspend)
454         return;
455
456     u->remote_suspended = suspend;
457     check_smoother_status(u, TRUE);
458 }
459
460 #ifdef TUNNEL_SINK
461
462 /* Called from IO thread context */
463 static void send_data(struct userdata *u) {
464     pa_assert(u);
465
466     while (u->requested_bytes > 0) {
467         pa_memchunk memchunk;
468
469         pa_sink_render(u->sink, u->requested_bytes, &memchunk);
470         pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
471         pa_memblock_unref(memchunk.memblock);
472
473         u->requested_bytes -= memchunk.length;
474
475         u->counter += (int64_t) memchunk.length;
476     }
477 }
478
479 /* This function is called from IO context -- except when it is not. */
480 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
481     struct userdata *u = PA_SINK(o)->userdata;
482
483     switch (code) {
484
485         case PA_SINK_MESSAGE_SET_STATE: {
486             int r;
487
488             /* First, change the state, because otherwise pa_sink_render() would fail */
489             if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
490
491                 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
492
493                 if (PA_SINK_IS_OPENED(u->sink->state))
494                     send_data(u);
495             }
496
497             return r;
498         }
499
500         case PA_SINK_MESSAGE_GET_LATENCY: {
501             pa_usec_t yl, yr, *usec = data;
502
503             yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
504             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
505
506             *usec = yl > yr ? yl - yr : 0;
507             return 0;
508         }
509
510         case SINK_MESSAGE_REQUEST:
511
512             pa_assert(offset > 0);
513             u->requested_bytes += (size_t) offset;
514
515             if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516                 send_data(u);
517
518             return 0;
519
520
521         case SINK_MESSAGE_REMOTE_SUSPEND:
522
523             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
524             return 0;
525
526
527         case SINK_MESSAGE_UPDATE_LATENCY: {
528             pa_usec_t y;
529
530             y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
531
532             if (y > (pa_usec_t) offset)
533                 y -= (pa_usec_t) offset;
534             else
535                 y = 0;
536
537             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
538
539             /* We can access this freely here, since the main thread is waiting for us */
540             u->thread_transport_usec = u->transport_usec;
541
542             return 0;
543         }
544
545         case SINK_MESSAGE_POST:
546
547             /* OK, This might be a bit confusing. This message is
548              * delivered to us from the main context -- NOT from the
549              * IO thread context where the rest of the messages are
550              * dispatched. Yeah, ugly, but I am a lazy bastard. */
551
552             pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
553
554             u->counter_delta += (int64_t) chunk->length;
555
556             return 0;
557     }
558
559     return pa_sink_process_msg(o, code, data, offset, chunk);
560 }
561
562 /* Called from main context */
563 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
564     struct userdata *u;
565     pa_sink_assert_ref(s);
566     u = s->userdata;
567
568     switch ((pa_sink_state_t) state) {
569
570         case PA_SINK_SUSPENDED:
571             pa_assert(PA_SINK_IS_OPENED(s->state));
572             stream_cork(u, TRUE);
573             break;
574
575         case PA_SINK_IDLE:
576         case PA_SINK_RUNNING:
577             if (s->state == PA_SINK_SUSPENDED)
578                 stream_cork(u, FALSE);
579             break;
580
581         case PA_SINK_UNLINKED:
582         case PA_SINK_INIT:
583         case PA_SINK_INVALID_STATE:
584             ;
585     }
586
587     return 0;
588 }
589
590 #else
591
592 /* This function is called from IO context -- except when it is not. */
593 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
594     struct userdata *u = PA_SOURCE(o)->userdata;
595
596     switch (code) {
597
598         case PA_SOURCE_MESSAGE_SET_STATE: {
599             int r;
600
601             if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
602                 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
603
604             return r;
605         }
606
607         case PA_SOURCE_MESSAGE_GET_LATENCY: {
608             pa_usec_t yr, yl, *usec = data;
609
610             yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
611             yr = pa_smoother_get(u->smoother, pa_rtclock_now());
612
613             *usec = yr > yl ? yr - yl : 0;
614             return 0;
615         }
616
617         case SOURCE_MESSAGE_POST: {
618             pa_memchunk c;
619
620             pa_mcalign_push(u->mcalign, chunk);
621
622             while (pa_mcalign_pop(u->mcalign, &c) >= 0) {
623
624                 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
625                     pa_source_post(u->source, &c);
626
627                 pa_memblock_unref(c.memblock);
628
629                 u->counter += (int64_t) c.length;
630             }
631
632             return 0;
633         }
634
635         case SOURCE_MESSAGE_REMOTE_SUSPEND:
636
637             stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
638             return 0;
639
640         case SOURCE_MESSAGE_UPDATE_LATENCY: {
641             pa_usec_t y;
642
643             y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
644             y += (pa_usec_t) offset;
645
646             pa_smoother_put(u->smoother, pa_rtclock_now(), y);
647
648             /* We can access this freely here, since the main thread is waiting for us */
649             u->thread_transport_usec = u->transport_usec;
650
651             return 0;
652         }
653     }
654
655     return pa_source_process_msg(o, code, data, offset, chunk);
656 }
657
658 /* Called from main context */
659 static int source_set_state(pa_source *s, pa_source_state_t state) {
660     struct userdata *u;
661     pa_source_assert_ref(s);
662     u = s->userdata;
663
664     switch ((pa_source_state_t) state) {
665
666         case PA_SOURCE_SUSPENDED:
667             pa_assert(PA_SOURCE_IS_OPENED(s->state));
668             stream_cork(u, TRUE);
669             break;
670
671         case PA_SOURCE_IDLE:
672         case PA_SOURCE_RUNNING:
673             if (s->state == PA_SOURCE_SUSPENDED)
674                 stream_cork(u, FALSE);
675             break;
676
677         case PA_SOURCE_UNLINKED:
678         case PA_SOURCE_INIT:
679         case PA_SINK_INVALID_STATE:
680             ;
681     }
682
683     return 0;
684 }
685
686 #endif
687
688 static void thread_func(void *userdata) {
689     struct userdata *u = userdata;
690
691     pa_assert(u);
692
693     pa_log_debug("Thread starting up");
694
695     pa_thread_mq_install(&u->thread_mq);
696
697     for (;;) {
698         int ret;
699
700 #ifdef TUNNEL_SINK
701         if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
702             pa_sink_process_rewind(u->sink, 0);
703 #endif
704
705         if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
706             goto fail;
707
708         if (ret == 0)
709             goto finish;
710     }
711
712 fail:
713     /* If this was no regular exit from the loop we have to continue
714      * processing messages until we received PA_MESSAGE_SHUTDOWN */
715     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
716     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
717
718 finish:
719     pa_log_debug("Thread shutting down");
720 }
721
722 #ifdef TUNNEL_SINK
723 /* Called from main context */
724 static void command_request(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
725     struct userdata *u = userdata;
726     uint32_t bytes, channel;
727
728     pa_assert(pd);
729     pa_assert(command == PA_COMMAND_REQUEST);
730     pa_assert(t);
731     pa_assert(u);
732     pa_assert(u->pdispatch == pd);
733
734     if (pa_tagstruct_getu32(t, &channel) < 0 ||
735         pa_tagstruct_getu32(t, &bytes) < 0) {
736         pa_log("Invalid protocol reply");
737         goto fail;
738     }
739
740     if (channel != u->channel) {
741         pa_log("Received data for invalid channel");
742         goto fail;
743     }
744
745     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
746     return;
747
748 fail:
749     pa_module_unload_request(u->module, TRUE);
750 }
751
752 #endif
753
754 /* Called from main context */
755 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
756     struct userdata *u = userdata;
757     pa_usec_t sink_usec, source_usec;
758     pa_bool_t playing;
759     int64_t write_index, read_index;
760     struct timeval local, remote, now;
761     pa_sample_spec *ss;
762     int64_t delay;
763
764     pa_assert(pd);
765     pa_assert(u);
766
767     if (command != PA_COMMAND_REPLY) {
768         if (command == PA_COMMAND_ERROR)
769             pa_log("Failed to get latency.");
770         else
771             pa_log("Protocol error.");
772         goto fail;
773     }
774
775     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
776         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
777         pa_tagstruct_get_boolean(t, &playing) < 0 ||
778         pa_tagstruct_get_timeval(t, &local) < 0 ||
779         pa_tagstruct_get_timeval(t, &remote) < 0 ||
780         pa_tagstruct_gets64(t, &write_index) < 0 ||
781         pa_tagstruct_gets64(t, &read_index) < 0) {
782         pa_log("Invalid reply.");
783         goto fail;
784     }
785
786 #ifdef TUNNEL_SINK
787     if (u->version >= 13) {
788         uint64_t underrun_for = 0, playing_for = 0;
789
790         if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
791             pa_tagstruct_getu64(t, &playing_for) < 0) {
792             pa_log("Invalid reply.");
793             goto fail;
794         }
795     }
796 #endif
797
798     if (!pa_tagstruct_eof(t)) {
799         pa_log("Invalid reply.");
800         goto fail;
801     }
802
803     if (tag < u->ignore_latency_before) {
804         return;
805     }
806
807     pa_gettimeofday(&now);
808
809     /* Calculate transport usec */
810     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
811         /* local and remote seem to have synchronized clocks */
812 #ifdef TUNNEL_SINK
813         u->transport_usec = pa_timeval_diff(&remote, &local);
814 #else
815         u->transport_usec = pa_timeval_diff(&now, &remote);
816 #endif
817     } else
818         u->transport_usec = pa_timeval_diff(&now, &local)/2;
819
820     /* First, take the device's delay */
821 #ifdef TUNNEL_SINK
822     delay = (int64_t) sink_usec;
823     ss = &u->sink->sample_spec;
824 #else
825     delay = (int64_t) source_usec;
826     ss = &u->source->sample_spec;
827 #endif
828
829     /* Add the length of our server-side buffer */
830     if (write_index >= read_index)
831         delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
832     else
833         delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
834
835     /* Our measurements are already out of date, hence correct by the     *
836      * transport latency */
837 #ifdef TUNNEL_SINK
838     delay -= (int64_t) u->transport_usec;
839 #else
840     delay += (int64_t) u->transport_usec;
841 #endif
842
843     /* Now correct by what we have have read/written since we requested the update */
844 #ifdef TUNNEL_SINK
845     delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
846 #else
847     delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
848 #endif
849
850 #ifdef TUNNEL_SINK
851     pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
852 #else
853     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
854 #endif
855
856     return;
857
858 fail:
859
860     pa_module_unload_request(u->module, TRUE);
861 }
862
863 /* Called from main context */
864 static void request_latency(struct userdata *u) {
865     pa_tagstruct *t;
866     struct timeval now;
867     uint32_t tag;
868     pa_assert(u);
869
870     t = pa_tagstruct_new(NULL, 0);
871 #ifdef TUNNEL_SINK
872     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
873 #else
874     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
875 #endif
876     pa_tagstruct_putu32(t, tag = u->ctag++);
877     pa_tagstruct_putu32(t, u->channel);
878
879     pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
880
881     pa_pstream_send_tagstruct(u->pstream, t);
882     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
883
884     u->ignore_latency_before = tag;
885     u->counter_delta = 0;
886 }
887
888 /* Called from main context */
889 static void timeout_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
890     struct userdata *u = userdata;
891
892     pa_assert(m);
893     pa_assert(e);
894     pa_assert(u);
895
896     request_latency(u);
897
898     pa_core_rttime_restart(u->core, e, pa_rtclock_now() + LATENCY_INTERVAL);
899 }
900
901 /* Called from main context */
902 static void update_description(struct userdata *u) {
903     char *d;
904     char un[128], hn[128];
905     pa_tagstruct *t;
906
907     pa_assert(u);
908
909     if (!u->server_fqdn || !u->user_name || !u->device_description)
910         return;
911
912     d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
913
914 #ifdef TUNNEL_SINK
915     pa_sink_set_description(u->sink, d);
916     pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
917     pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
918     pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
919 #else
920     pa_source_set_description(u->source, d);
921     pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
922     pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
923     pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
924 #endif
925
926     pa_xfree(d);
927
928     d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
929                           pa_get_user_name(un, sizeof(un)),
930                           pa_get_host_name(hn, sizeof(hn)));
931
932     t = pa_tagstruct_new(NULL, 0);
933 #ifdef TUNNEL_SINK
934     pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
935 #else
936     pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
937 #endif
938     pa_tagstruct_putu32(t, u->ctag++);
939     pa_tagstruct_putu32(t, u->channel);
940     pa_tagstruct_puts(t, d);
941     pa_pstream_send_tagstruct(u->pstream, t);
942
943     pa_xfree(d);
944 }
945
946 /* Called from main context */
947 static void server_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
948     struct userdata *u = userdata;
949     pa_sample_spec ss;
950     pa_channel_map cm;
951     const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
952     uint32_t cookie;
953
954     pa_assert(pd);
955     pa_assert(u);
956
957     if (command != PA_COMMAND_REPLY) {
958         if (command == PA_COMMAND_ERROR)
959             pa_log("Failed to get info.");
960         else
961             pa_log("Protocol error.");
962         goto fail;
963     }
964
965     if (pa_tagstruct_gets(t, &server_name) < 0 ||
966         pa_tagstruct_gets(t, &server_version) < 0 ||
967         pa_tagstruct_gets(t, &user_name) < 0 ||
968         pa_tagstruct_gets(t, &host_name) < 0 ||
969         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
970         pa_tagstruct_gets(t, &default_sink_name) < 0 ||
971         pa_tagstruct_gets(t, &default_source_name) < 0 ||
972         pa_tagstruct_getu32(t, &cookie) < 0 ||
973         (u->version >= 15 && pa_tagstruct_get_channel_map(t, &cm) < 0)) {
974
975         pa_log("Parse failure");
976         goto fail;
977     }
978
979     if (!pa_tagstruct_eof(t)) {
980         pa_log("Packet too long");
981         goto fail;
982     }
983
984     pa_xfree(u->server_fqdn);
985     u->server_fqdn = pa_xstrdup(host_name);
986
987     pa_xfree(u->user_name);
988     u->user_name = pa_xstrdup(user_name);
989
990     update_description(u);
991
992     return;
993
994 fail:
995     pa_module_unload_request(u->module, TRUE);
996 }
997
998 static int read_ports(struct userdata *u, pa_tagstruct *t)
999 {
1000     if (u->version >= 16) {
1001         uint32_t n_ports;
1002         const char *s;
1003
1004         if (pa_tagstruct_getu32(t, &n_ports)) {
1005             pa_log("Parse failure");
1006             return -PA_ERR_PROTOCOL;
1007         }
1008
1009         for (uint32_t j = 0; j < n_ports; j++) {
1010             uint32_t priority;
1011
1012             if (pa_tagstruct_gets(t, &s) < 0 || /* name */
1013                 pa_tagstruct_gets(t, &s) < 0 || /* description */
1014                 pa_tagstruct_getu32(t, &priority) < 0) {
1015
1016                 pa_log("Parse failure");
1017                 return -PA_ERR_PROTOCOL;
1018             }
1019             if (u->version >= 24 && pa_tagstruct_getu32(t, &priority) < 0) { /* available */
1020                 pa_log("Parse failure");
1021                 return -PA_ERR_PROTOCOL;
1022             }
1023         }
1024
1025         if (pa_tagstruct_gets(t, &s) < 0) { /* active port */
1026             pa_log("Parse failure");
1027             return -PA_ERR_PROTOCOL;
1028         }
1029     }
1030     return 0;
1031 }
1032
1033
1034 static int read_formats(struct userdata *u, pa_tagstruct *t) {
1035     uint8_t n_formats;
1036     pa_format_info *format;
1037
1038     if (pa_tagstruct_getu8(t, &n_formats) < 0) { /* no. of formats */
1039         pa_log("Parse failure");
1040         return -PA_ERR_PROTOCOL;
1041     }
1042
1043     for (uint8_t j = 0; j < n_formats; j++) {
1044         format = pa_format_info_new();
1045         if (pa_tagstruct_get_format_info(t, format)) { /* format info */
1046             pa_format_info_free(format);
1047             pa_log("Parse failure");
1048             return -PA_ERR_PROTOCOL;
1049         }
1050         pa_format_info_free(format);
1051     }
1052     return 0;
1053 }
1054
1055 #ifdef TUNNEL_SINK
1056
1057 /* Called from main context */
1058 static void sink_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1059     struct userdata *u = userdata;
1060     uint32_t idx, owner_module, monitor_source, flags;
1061     const char *name, *description, *monitor_source_name, *driver;
1062     pa_sample_spec ss;
1063     pa_channel_map cm;
1064     pa_cvolume volume;
1065     pa_bool_t mute;
1066     pa_usec_t latency;
1067
1068     pa_assert(pd);
1069     pa_assert(u);
1070
1071     if (command != PA_COMMAND_REPLY) {
1072         if (command == PA_COMMAND_ERROR)
1073             pa_log("Failed to get info.");
1074         else
1075             pa_log("Protocol error.");
1076         goto fail;
1077     }
1078
1079     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1080         pa_tagstruct_gets(t, &name) < 0 ||
1081         pa_tagstruct_gets(t, &description) < 0 ||
1082         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1083         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1084         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1085         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1086         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1087         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1088         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1089         pa_tagstruct_get_usec(t, &latency) < 0 ||
1090         pa_tagstruct_gets(t, &driver) < 0 ||
1091         pa_tagstruct_getu32(t, &flags) < 0) {
1092
1093         pa_log("Parse failure");
1094         goto fail;
1095     }
1096
1097     if (u->version >= 13) {
1098         pa_usec_t configured_latency;
1099
1100         if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1101             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1102
1103             pa_log("Parse failure");
1104             goto fail;
1105         }
1106     }
1107
1108     if (u->version >= 15) {
1109         pa_volume_t base_volume;
1110         uint32_t state, n_volume_steps, card;
1111
1112         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1113             pa_tagstruct_getu32(t, &state) < 0 ||
1114             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1115             pa_tagstruct_getu32(t, &card) < 0) {
1116
1117             pa_log("Parse failure");
1118             goto fail;
1119         }
1120     }
1121
1122     if (read_ports(u, t) < 0)
1123         goto fail;
1124
1125     if (u->version >= 21 && read_formats(u, t) < 0)
1126         goto fail;
1127
1128     if (!pa_tagstruct_eof(t)) {
1129         pa_log("Packet too long");
1130         goto fail;
1131     }
1132
1133     if (!u->sink_name || !pa_streq(name, u->sink_name))
1134         return;
1135
1136     pa_xfree(u->device_description);
1137     u->device_description = pa_xstrdup(description);
1138
1139     update_description(u);
1140
1141     return;
1142
1143 fail:
1144     pa_module_unload_request(u->module, TRUE);
1145 }
1146
1147 /* Called from main context */
1148 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1149     struct userdata *u = userdata;
1150     uint32_t idx, owner_module, client, sink;
1151     pa_usec_t buffer_usec, sink_usec;
1152     const char *name, *driver, *resample_method;
1153     pa_bool_t mute = FALSE;
1154     pa_sample_spec sample_spec;
1155     pa_channel_map channel_map;
1156     pa_cvolume volume;
1157     pa_bool_t b;
1158
1159     pa_assert(pd);
1160     pa_assert(u);
1161
1162     if (command != PA_COMMAND_REPLY) {
1163         if (command == PA_COMMAND_ERROR)
1164             pa_log("Failed to get info.");
1165         else
1166             pa_log("Protocol error.");
1167         goto fail;
1168     }
1169
1170     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1171         pa_tagstruct_gets(t, &name) < 0 ||
1172         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1173         pa_tagstruct_getu32(t, &client) < 0 ||
1174         pa_tagstruct_getu32(t, &sink) < 0 ||
1175         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1176         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1177         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1178         pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1179         pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1180         pa_tagstruct_gets(t, &resample_method) < 0 ||
1181         pa_tagstruct_gets(t, &driver) < 0) {
1182
1183         pa_log("Parse failure");
1184         goto fail;
1185     }
1186
1187     if (u->version >= 11) {
1188         if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1189
1190             pa_log("Parse failure");
1191             goto fail;
1192         }
1193     }
1194
1195     if (u->version >= 13) {
1196         if (pa_tagstruct_get_proplist(t, NULL) < 0) {
1197
1198             pa_log("Parse failure");
1199             goto fail;
1200         }
1201     }
1202
1203     if (u->version >= 19) {
1204         if (pa_tagstruct_get_boolean(t, &b) < 0) {
1205
1206             pa_log("Parse failure");
1207             goto fail;
1208         }
1209     }
1210
1211     if (u->version >= 20) {
1212         if (pa_tagstruct_get_boolean(t, &b) < 0 ||
1213             pa_tagstruct_get_boolean(t, &b) < 0) {
1214
1215             pa_log("Parse failure");
1216             goto fail;
1217         }
1218     }
1219
1220     if (u->version >= 21) {
1221         pa_format_info *format = pa_format_info_new();
1222
1223         if (pa_tagstruct_get_format_info(t, format) < 0) {
1224             pa_format_info_free(format);
1225             pa_log("Parse failure");
1226             goto fail;
1227         }
1228         pa_format_info_free(format);
1229     }
1230
1231     if (!pa_tagstruct_eof(t)) {
1232         pa_log("Packet too long");
1233         goto fail;
1234     }
1235
1236     if (idx != u->device_index)
1237         return;
1238
1239     pa_assert(u->sink);
1240
1241     if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1242         pa_cvolume_equal(&volume, &u->sink->real_volume))
1243         return;
1244
1245     pa_sink_volume_changed(u->sink, &volume);
1246
1247     if (u->version >= 11)
1248         pa_sink_mute_changed(u->sink, mute);
1249
1250     return;
1251
1252 fail:
1253     pa_module_unload_request(u->module, TRUE);
1254 }
1255
1256 #else
1257
1258 /* Called from main context */
1259 static void source_info_cb(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1260     struct userdata *u = userdata;
1261     uint32_t idx, owner_module, monitor_of_sink, flags;
1262     const char *name, *description, *monitor_of_sink_name, *driver;
1263     pa_sample_spec ss;
1264     pa_channel_map cm;
1265     pa_cvolume volume;
1266     pa_bool_t mute;
1267     pa_usec_t latency, configured_latency;
1268
1269     pa_assert(pd);
1270     pa_assert(u);
1271
1272     if (command != PA_COMMAND_REPLY) {
1273         if (command == PA_COMMAND_ERROR)
1274             pa_log("Failed to get info.");
1275         else
1276             pa_log("Protocol error.");
1277         goto fail;
1278     }
1279
1280     if (pa_tagstruct_getu32(t, &idx) < 0 ||
1281         pa_tagstruct_gets(t, &name) < 0 ||
1282         pa_tagstruct_gets(t, &description) < 0 ||
1283         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1284         pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1285         pa_tagstruct_getu32(t, &owner_module) < 0 ||
1286         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1287         pa_tagstruct_get_boolean(t, &mute) < 0 ||
1288         pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1289         pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1290         pa_tagstruct_get_usec(t, &latency) < 0 ||
1291         pa_tagstruct_gets(t, &driver) < 0 ||
1292         pa_tagstruct_getu32(t, &flags) < 0) {
1293
1294         pa_log("Parse failure");
1295         goto fail;
1296     }
1297
1298     if (u->version >= 13) {
1299         if (pa_tagstruct_get_proplist(t, NULL) < 0 ||
1300             pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1301
1302             pa_log("Parse failure");
1303             goto fail;
1304         }
1305     }
1306
1307     if (u->version >= 15) {
1308         pa_volume_t base_volume;
1309         uint32_t state, n_volume_steps, card;
1310
1311         if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1312             pa_tagstruct_getu32(t, &state) < 0 ||
1313             pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1314             pa_tagstruct_getu32(t, &card) < 0) {
1315
1316             pa_log("Parse failure");
1317             goto fail;
1318         }
1319     }
1320
1321     if (read_ports(u, t) < 0)
1322         goto fail;
1323
1324     if (u->version >= 22 && read_formats(u, t) < 0)
1325         goto fail;
1326
1327     if (!pa_tagstruct_eof(t)) {
1328         pa_log("Packet too long");
1329         goto fail;
1330     }
1331
1332     if (!u->source_name || !pa_streq(name, u->source_name))
1333         return;
1334
1335     pa_xfree(u->device_description);
1336     u->device_description = pa_xstrdup(description);
1337
1338     update_description(u);
1339
1340     return;
1341
1342 fail:
1343     pa_module_unload_request(u->module, TRUE);
1344 }
1345
1346 #endif
1347
1348 /* Called from main context */
1349 static void request_info(struct userdata *u) {
1350     pa_tagstruct *t;
1351     uint32_t tag;
1352     pa_assert(u);
1353
1354     t = pa_tagstruct_new(NULL, 0);
1355     pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1356     pa_tagstruct_putu32(t, tag = u->ctag++);
1357     pa_pstream_send_tagstruct(u->pstream, t);
1358     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1359
1360 #ifdef TUNNEL_SINK
1361     t = pa_tagstruct_new(NULL, 0);
1362     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1363     pa_tagstruct_putu32(t, tag = u->ctag++);
1364     pa_tagstruct_putu32(t, u->device_index);
1365     pa_pstream_send_tagstruct(u->pstream, t);
1366     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1367
1368     if (u->sink_name) {
1369         t = pa_tagstruct_new(NULL, 0);
1370         pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1371         pa_tagstruct_putu32(t, tag = u->ctag++);
1372         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1373         pa_tagstruct_puts(t, u->sink_name);
1374         pa_pstream_send_tagstruct(u->pstream, t);
1375         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1376     }
1377 #else
1378     if (u->source_name) {
1379         t = pa_tagstruct_new(NULL, 0);
1380         pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1381         pa_tagstruct_putu32(t, tag = u->ctag++);
1382         pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1383         pa_tagstruct_puts(t, u->source_name);
1384         pa_pstream_send_tagstruct(u->pstream, t);
1385         pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1386     }
1387 #endif
1388 }
1389
1390 /* Called from main context */
1391 static void command_subscribe_event(pa_pdispatch *pd,  uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1392     struct userdata *u = userdata;
1393     pa_subscription_event_type_t e;
1394     uint32_t idx;
1395
1396     pa_assert(pd);
1397     pa_assert(t);
1398     pa_assert(u);
1399     pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1400
1401     if (pa_tagstruct_getu32(t, &e) < 0 ||
1402         pa_tagstruct_getu32(t, &idx) < 0) {
1403         pa_log("Invalid protocol reply");
1404         pa_module_unload_request(u->module, TRUE);
1405         return;
1406     }
1407
1408     if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1409 #ifdef TUNNEL_SINK
1410         e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1411         e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1412 #else
1413         e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1414 #endif
1415         )
1416         return;
1417
1418     request_info(u);
1419 }
1420
1421 /* Called from main context */
1422 static void start_subscribe(struct userdata *u) {
1423     pa_tagstruct *t;
1424     pa_assert(u);
1425
1426     t = pa_tagstruct_new(NULL, 0);
1427     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1428     pa_tagstruct_putu32(t, u->ctag++);
1429     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1430 #ifdef TUNNEL_SINK
1431                         PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1432 #else
1433                         PA_SUBSCRIPTION_MASK_SOURCE
1434 #endif
1435                         );
1436
1437     pa_pstream_send_tagstruct(u->pstream, t);
1438 }
1439
1440 /* Called from main context */
1441 static void create_stream_callback(pa_pdispatch *pd, uint32_t command,  uint32_t tag, pa_tagstruct *t, void *userdata) {
1442     struct userdata *u = userdata;
1443 #ifdef TUNNEL_SINK
1444     uint32_t bytes;
1445 #endif
1446
1447     pa_assert(pd);
1448     pa_assert(u);
1449     pa_assert(u->pdispatch == pd);
1450
1451     if (command != PA_COMMAND_REPLY) {
1452         if (command == PA_COMMAND_ERROR)
1453             pa_log("Failed to create stream.");
1454         else
1455             pa_log("Protocol error.");
1456         goto fail;
1457     }
1458
1459     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1460         pa_tagstruct_getu32(t, &u->device_index) < 0
1461 #ifdef TUNNEL_SINK
1462         || pa_tagstruct_getu32(t, &bytes) < 0
1463 #endif
1464         )
1465         goto parse_error;
1466
1467     if (u->version >= 9) {
1468 #ifdef TUNNEL_SINK
1469         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1470             pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1471             pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1472             pa_tagstruct_getu32(t, &u->minreq) < 0)
1473             goto parse_error;
1474 #else
1475         if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1476             pa_tagstruct_getu32(t, &u->fragsize) < 0)
1477             goto parse_error;
1478 #endif
1479     }
1480
1481     if (u->version >= 12) {
1482         pa_sample_spec ss;
1483         pa_channel_map cm;
1484         uint32_t device_index;
1485         const char *dn;
1486         pa_bool_t suspended;
1487
1488         if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1489             pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1490             pa_tagstruct_getu32(t, &device_index) < 0 ||
1491             pa_tagstruct_gets(t, &dn) < 0 ||
1492             pa_tagstruct_get_boolean(t, &suspended) < 0)
1493             goto parse_error;
1494
1495 #ifdef TUNNEL_SINK
1496         pa_xfree(u->sink_name);
1497         u->sink_name = pa_xstrdup(dn);
1498 #else
1499         pa_xfree(u->source_name);
1500         u->source_name = pa_xstrdup(dn);
1501 #endif
1502     }
1503
1504     if (u->version >= 13) {
1505         pa_usec_t usec;
1506
1507         if (pa_tagstruct_get_usec(t, &usec) < 0)
1508             goto parse_error;
1509
1510 /* #ifdef TUNNEL_SINK */
1511 /*         pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1512 /* #else */
1513 /*         pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1514 /* #endif */
1515     }
1516
1517     if (u->version >= 21) {
1518         pa_format_info *format = pa_format_info_new();
1519
1520         if (pa_tagstruct_get_format_info(t, format) < 0) {
1521             pa_format_info_free(format);
1522             goto parse_error;
1523         }
1524
1525         pa_format_info_free(format);
1526     }
1527
1528     if (!pa_tagstruct_eof(t))
1529         goto parse_error;
1530
1531     start_subscribe(u);
1532     request_info(u);
1533
1534     pa_assert(!u->time_event);
1535     u->time_event = pa_core_rttime_new(u->core, pa_rtclock_now() + LATENCY_INTERVAL, timeout_callback, u);
1536
1537     request_latency(u);
1538
1539     pa_log_debug("Stream created.");
1540
1541 #ifdef TUNNEL_SINK
1542     pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1543 #endif
1544
1545     return;
1546
1547 parse_error:
1548     pa_log("Invalid reply. (Create stream)");
1549
1550 fail:
1551     pa_module_unload_request(u->module, TRUE);
1552
1553 }
1554
1555 /* Called from main context */
1556 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1557     struct userdata *u = userdata;
1558     pa_tagstruct *reply;
1559     char name[256], un[128], hn[128];
1560     pa_cvolume volume;
1561
1562     pa_assert(pd);
1563     pa_assert(u);
1564     pa_assert(u->pdispatch == pd);
1565
1566     if (command != PA_COMMAND_REPLY ||
1567         pa_tagstruct_getu32(t, &u->version) < 0 ||
1568         !pa_tagstruct_eof(t)) {
1569
1570         if (command == PA_COMMAND_ERROR)
1571             pa_log("Failed to authenticate");
1572         else
1573             pa_log("Protocol error.");
1574
1575         goto fail;
1576     }
1577
1578     /* Minimum supported protocol version */
1579     if (u->version < 8) {
1580         pa_log("Incompatible protocol version");
1581         goto fail;
1582     }
1583
1584     /* Starting with protocol version 13 the MSB of the version tag
1585     reflects if shm is enabled for this connection or not. We don't
1586     support SHM here at all, so we just ignore this. */
1587
1588     if (u->version >= 13)
1589         u->version &= 0x7FFFFFFFU;
1590
1591     pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1592
1593 #ifdef TUNNEL_SINK
1594     pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1595     pa_sink_update_proplist(u->sink, 0, NULL);
1596
1597     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1598                 u->sink_name,
1599                 pa_get_user_name(un, sizeof(un)),
1600                 pa_get_host_name(hn, sizeof(hn)));
1601 #else
1602     pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1603     pa_source_update_proplist(u->source, 0, NULL);
1604
1605     pa_snprintf(name, sizeof(name), "%s for %s@%s",
1606                 u->source_name,
1607                 pa_get_user_name(un, sizeof(un)),
1608                 pa_get_host_name(hn, sizeof(hn)));
1609 #endif
1610
1611     reply = pa_tagstruct_new(NULL, 0);
1612     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1613     pa_tagstruct_putu32(reply, u->ctag++);
1614
1615     if (u->version >= 13) {
1616         pa_proplist *pl;
1617         pl = pa_proplist_new();
1618         pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1619         pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1620         pa_init_proplist(pl);
1621         pa_tagstruct_put_proplist(reply, pl);
1622         pa_proplist_free(pl);
1623     } else
1624         pa_tagstruct_puts(reply, "PulseAudio");
1625
1626     pa_pstream_send_tagstruct(u->pstream, reply);
1627     /* We ignore the server's reply here */
1628
1629     reply = pa_tagstruct_new(NULL, 0);
1630
1631     if (u->version < 13)
1632         /* Only for older PA versions we need to fill in the maxlength */
1633         u->maxlength = 4*1024*1024;
1634
1635 #ifdef TUNNEL_SINK
1636     u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1637     u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1638     u->prebuf = u->tlength;
1639 #else
1640     u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1641 #endif
1642
1643 #ifdef TUNNEL_SINK
1644     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1645     pa_tagstruct_putu32(reply, tag = u->ctag++);
1646
1647     if (u->version < 13)
1648         pa_tagstruct_puts(reply, name);
1649
1650     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1651     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1652     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1653     pa_tagstruct_puts(reply, u->sink_name);
1654     pa_tagstruct_putu32(reply, u->maxlength);
1655     pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1656     pa_tagstruct_putu32(reply, u->tlength);
1657     pa_tagstruct_putu32(reply, u->prebuf);
1658     pa_tagstruct_putu32(reply, u->minreq);
1659     pa_tagstruct_putu32(reply, 0);
1660     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1661     pa_tagstruct_put_cvolume(reply, &volume);
1662 #else
1663     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1664     pa_tagstruct_putu32(reply, tag = u->ctag++);
1665
1666     if (u->version < 13)
1667         pa_tagstruct_puts(reply, name);
1668
1669     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1670     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1671     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1672     pa_tagstruct_puts(reply, u->source_name);
1673     pa_tagstruct_putu32(reply, u->maxlength);
1674     pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1675     pa_tagstruct_putu32(reply, u->fragsize);
1676 #endif
1677
1678     if (u->version >= 12) {
1679         pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1680         pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1681         pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1682         pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1683         pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1684         pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1685         pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1686     }
1687
1688     if (u->version >= 13) {
1689         pa_proplist *pl;
1690
1691         pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1692         pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1693
1694         pl = pa_proplist_new();
1695         pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1696         pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1697         pa_tagstruct_put_proplist(reply, pl);
1698         pa_proplist_free(pl);
1699
1700 #ifndef TUNNEL_SINK
1701         pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1702 #endif
1703     }
1704
1705     if (u->version >= 14) {
1706 #ifdef TUNNEL_SINK
1707         pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1708 #endif
1709         pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1710     }
1711
1712     if (u->version >= 15) {
1713 #ifdef TUNNEL_SINK
1714         pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1715 #endif
1716         pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1717         pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1718     }
1719
1720 #ifdef TUNNEL_SINK
1721     if (u->version >= 17)
1722         pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1723
1724     if (u->version >= 18)
1725         pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1726 #endif
1727
1728 #ifdef TUNNEL_SINK
1729     if (u->version >= 21) {
1730         /* We're not using the extended API, so n_formats = 0 and that's that */
1731         pa_tagstruct_putu8(reply, 0);
1732     }
1733 #else
1734     if (u->version >= 22) {
1735         /* We're not using the extended API, so n_formats = 0 and that's that */
1736         pa_tagstruct_putu8(reply, 0);
1737         pa_cvolume_reset(&volume, u->source->sample_spec.channels);
1738         pa_tagstruct_put_cvolume(reply, &volume);
1739         pa_tagstruct_put_boolean(reply, FALSE); /* muted */
1740         pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1741         pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1742         pa_tagstruct_put_boolean(reply, FALSE); /* relative volume */
1743         pa_tagstruct_put_boolean(reply, FALSE); /* passthrough stream */
1744     }
1745 #endif
1746
1747     pa_pstream_send_tagstruct(u->pstream, reply);
1748     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1749
1750     pa_log_debug("Connection authenticated, creating stream ...");
1751
1752     return;
1753
1754 fail:
1755     pa_module_unload_request(u->module, TRUE);
1756 }
1757
1758 /* Called from main context */
1759 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1760     struct userdata *u = userdata;
1761
1762     pa_assert(p);
1763     pa_assert(u);
1764
1765     pa_log_warn("Stream died.");
1766     pa_module_unload_request(u->module, TRUE);
1767 }
1768
1769 /* Called from main context */
1770 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1771     struct userdata *u = userdata;
1772
1773     pa_assert(p);
1774     pa_assert(packet);
1775     pa_assert(u);
1776
1777     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1778         pa_log("Invalid packet");
1779         pa_module_unload_request(u->module, TRUE);
1780         return;
1781     }
1782 }
1783
1784 #ifndef TUNNEL_SINK
1785 /* Called from main context */
1786 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1787     struct userdata *u = userdata;
1788
1789     pa_assert(p);
1790     pa_assert(chunk);
1791     pa_assert(u);
1792
1793     if (channel != u->channel) {
1794         pa_log("Received memory block on bad channel.");
1795         pa_module_unload_request(u->module, TRUE);
1796         return;
1797     }
1798
1799     pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1800
1801     u->counter_delta += (int64_t) chunk->length;
1802 }
1803 #endif
1804
1805 /* Called from main context */
1806 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1807     struct userdata *u = userdata;
1808     pa_tagstruct *t;
1809     uint32_t tag;
1810
1811     pa_assert(sc);
1812     pa_assert(u);
1813     pa_assert(u->client == sc);
1814
1815     pa_socket_client_unref(u->client);
1816     u->client = NULL;
1817
1818     if (!io) {
1819         pa_log("Connection failed: %s", pa_cstrerror(errno));
1820         pa_module_unload_request(u->module, TRUE);
1821         return;
1822     }
1823
1824     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1825     u->pdispatch = pa_pdispatch_new(u->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
1826
1827     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1828     pa_pstream_set_receive_packet_callback(u->pstream, pstream_packet_callback, u);
1829 #ifndef TUNNEL_SINK
1830     pa_pstream_set_receive_memblock_callback(u->pstream, pstream_memblock_callback, u);
1831 #endif
1832
1833     t = pa_tagstruct_new(NULL, 0);
1834     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1835     pa_tagstruct_putu32(t, tag = u->ctag++);
1836     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1837
1838     pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1839
1840 #ifdef HAVE_CREDS
1841 {
1842     pa_creds ucred;
1843
1844     if (pa_iochannel_creds_supported(io))
1845         pa_iochannel_creds_enable(io);
1846
1847     ucred.uid = getuid();
1848     ucred.gid = getgid();
1849
1850     pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1851 }
1852 #else
1853     pa_pstream_send_tagstruct(u->pstream, t);
1854 #endif
1855
1856     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1857
1858     pa_log_debug("Connection established, authenticating ...");
1859 }
1860
1861 #ifdef TUNNEL_SINK
1862
1863 /* Called from main context */
1864 static void sink_set_volume(pa_sink *sink) {
1865     struct userdata *u;
1866     pa_tagstruct *t;
1867
1868     pa_assert(sink);
1869     u = sink->userdata;
1870     pa_assert(u);
1871
1872     t = pa_tagstruct_new(NULL, 0);
1873     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1874     pa_tagstruct_putu32(t, u->ctag++);
1875     pa_tagstruct_putu32(t, u->device_index);
1876     pa_tagstruct_put_cvolume(t, &sink->real_volume);
1877     pa_pstream_send_tagstruct(u->pstream, t);
1878 }
1879
1880 /* Called from main context */
1881 static void sink_set_mute(pa_sink *sink) {
1882     struct userdata *u;
1883     pa_tagstruct *t;
1884
1885     pa_assert(sink);
1886     u = sink->userdata;
1887     pa_assert(u);
1888
1889     if (u->version < 11)
1890         return;
1891
1892     t = pa_tagstruct_new(NULL, 0);
1893     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1894     pa_tagstruct_putu32(t, u->ctag++);
1895     pa_tagstruct_putu32(t, u->device_index);
1896     pa_tagstruct_put_boolean(t, !!sink->muted);
1897     pa_pstream_send_tagstruct(u->pstream, t);
1898 }
1899
1900 #endif
1901
1902 int pa__init(pa_module*m) {
1903     pa_modargs *ma = NULL;
1904     struct userdata *u = NULL;
1905     pa_sample_spec ss;
1906     pa_channel_map map;
1907     char *dn = NULL;
1908 #ifdef TUNNEL_SINK
1909     pa_sink_new_data data;
1910 #else
1911     pa_source_new_data data;
1912 #endif
1913
1914     pa_assert(m);
1915
1916     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1917         pa_log("Failed to parse module arguments");
1918         goto fail;
1919     }
1920
1921     m->userdata = u = pa_xnew0(struct userdata, 1);
1922     u->core = m->core;
1923     u->module = m;
1924     u->client = NULL;
1925     u->pdispatch = NULL;
1926     u->pstream = NULL;
1927     u->server_name = NULL;
1928 #ifdef TUNNEL_SINK
1929     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1930     u->sink = NULL;
1931     u->requested_bytes = 0;
1932 #else
1933     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1934     u->source = NULL;
1935 #endif
1936     u->smoother = pa_smoother_new(
1937             PA_USEC_PER_SEC,
1938             PA_USEC_PER_SEC*2,
1939             TRUE,
1940             TRUE,
1941             10,
1942             pa_rtclock_now(),
1943             FALSE);
1944     u->ctag = 1;
1945     u->device_index = u->channel = PA_INVALID_INDEX;
1946     u->time_event = NULL;
1947     u->ignore_latency_before = 0;
1948     u->transport_usec = u->thread_transport_usec = 0;
1949     u->remote_suspended = u->remote_corked = FALSE;
1950     u->counter = u->counter_delta = 0;
1951
1952     u->rtpoll = pa_rtpoll_new();
1953     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1954
1955     if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), TRUE, PA_NATIVE_COOKIE_LENGTH)))
1956         goto fail;
1957
1958     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1959         pa_log("No server specified.");
1960         goto fail;
1961     }
1962
1963     ss = m->core->default_sample_spec;
1964     map = m->core->default_channel_map;
1965     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1966         pa_log("Invalid sample format specification");
1967         goto fail;
1968     }
1969
1970     if (!(u->client = pa_socket_client_new_string(m->core->mainloop, TRUE, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1971         pa_log("Failed to connect to server '%s'", u->server_name);
1972         goto fail;
1973     }
1974
1975     pa_socket_client_set_callback(u->client, on_connection, u);
1976
1977 #ifdef TUNNEL_SINK
1978
1979     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1980         dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
1981
1982     pa_sink_new_data_init(&data);
1983     data.driver = __FILE__;
1984     data.module = m;
1985     data.namereg_fail = TRUE;
1986     pa_sink_new_data_set_name(&data, dn);
1987     pa_sink_new_data_set_sample_spec(&data, &ss);
1988     pa_sink_new_data_set_channel_map(&data, &map);
1989     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1990     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1991     if (u->sink_name)
1992         pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1993
1994     if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
1995         pa_log("Invalid properties");
1996         pa_sink_new_data_done(&data);
1997         goto fail;
1998     }
1999
2000     u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
2001     pa_sink_new_data_done(&data);
2002
2003     if (!u->sink) {
2004         pa_log("Failed to create sink.");
2005         goto fail;
2006     }
2007
2008     u->sink->parent.process_msg = sink_process_msg;
2009     u->sink->userdata = u;
2010     u->sink->set_state = sink_set_state;
2011     pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
2012     pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
2013
2014     u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
2015
2016 /*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
2017
2018     pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
2019     pa_sink_set_rtpoll(u->sink, u->rtpoll);
2020
2021 #else
2022
2023     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
2024         dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
2025
2026     pa_source_new_data_init(&data);
2027     data.driver = __FILE__;
2028     data.module = m;
2029     data.namereg_fail = TRUE;
2030     pa_source_new_data_set_name(&data, dn);
2031     pa_source_new_data_set_sample_spec(&data, &ss);
2032     pa_source_new_data_set_channel_map(&data, &map);
2033     pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
2034     pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
2035     if (u->source_name)
2036         pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
2037
2038     if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
2039         pa_log("Invalid properties");
2040         pa_source_new_data_done(&data);
2041         goto fail;
2042     }
2043
2044     u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
2045     pa_source_new_data_done(&data);
2046
2047     if (!u->source) {
2048         pa_log("Failed to create source.");
2049         goto fail;
2050     }
2051
2052     u->source->parent.process_msg = source_process_msg;
2053     u->source->set_state = source_set_state;
2054     u->source->userdata = u;
2055
2056 /*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
2057
2058     pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
2059     pa_source_set_rtpoll(u->source, u->rtpoll);
2060
2061     u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
2062 #endif
2063
2064     pa_xfree(dn);
2065
2066     u->time_event = NULL;
2067
2068     u->maxlength = (uint32_t) -1;
2069 #ifdef TUNNEL_SINK
2070     u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
2071 #else
2072     u->fragsize = (uint32_t) -1;
2073 #endif
2074
2075     if (!(u->thread = pa_thread_new("module-tunnel", thread_func, u))) {
2076         pa_log("Failed to create thread.");
2077         goto fail;
2078     }
2079
2080 #ifdef TUNNEL_SINK
2081     pa_sink_put(u->sink);
2082 #else
2083     pa_source_put(u->source);
2084 #endif
2085
2086     pa_modargs_free(ma);
2087
2088     return 0;
2089
2090 fail:
2091     pa__done(m);
2092
2093     if (ma)
2094         pa_modargs_free(ma);
2095
2096     pa_xfree(dn);
2097
2098     return -1;
2099 }
2100
2101 void pa__done(pa_module*m) {
2102     struct userdata* u;
2103
2104     pa_assert(m);
2105
2106     if (!(u = m->userdata))
2107         return;
2108
2109 #ifdef TUNNEL_SINK
2110     if (u->sink)
2111         pa_sink_unlink(u->sink);
2112 #else
2113     if (u->source)
2114         pa_source_unlink(u->source);
2115 #endif
2116
2117     if (u->thread) {
2118         pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
2119         pa_thread_free(u->thread);
2120     }
2121
2122     pa_thread_mq_done(&u->thread_mq);
2123
2124 #ifdef TUNNEL_SINK
2125     if (u->sink)
2126         pa_sink_unref(u->sink);
2127 #else
2128     if (u->source)
2129         pa_source_unref(u->source);
2130 #endif
2131
2132     if (u->rtpoll)
2133         pa_rtpoll_free(u->rtpoll);
2134
2135     if (u->pstream) {
2136         pa_pstream_unlink(u->pstream);
2137         pa_pstream_unref(u->pstream);
2138     }
2139
2140     if (u->pdispatch)
2141         pa_pdispatch_unref(u->pdispatch);
2142
2143     if (u->client)
2144         pa_socket_client_unref(u->client);
2145
2146     if (u->auth_cookie)
2147         pa_auth_cookie_unref(u->auth_cookie);
2148
2149     if (u->smoother)
2150         pa_smoother_free(u->smoother);
2151
2152     if (u->time_event)
2153         u->core->mainloop->time_free(u->time_event);
2154
2155 #ifndef TUNNEL_SINK
2156     if (u->mcalign)
2157         pa_mcalign_free(u->mcalign);
2158 #endif
2159
2160 #ifdef TUNNEL_SINK
2161     pa_xfree(u->sink_name);
2162 #else
2163     pa_xfree(u->source_name);
2164 #endif
2165     pa_xfree(u->server_name);
2166
2167     pa_xfree(u->device_description);
2168     pa_xfree(u->server_fqdn);
2169     pa_xfree(u->user_name);
2170
2171     pa_xfree(u);
2172 }