split polypcore/util.[ch] into polypcore/core-util.[ch] and polyp/util.[ch]
[profile/ivi/pulseaudio.git] / src / modules / module-tunnel.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <unistd.h>
27 #include <assert.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33
34 #include <polyp/version.h>
35 #include <polyp/xmalloc.h>
36
37 #include <polypcore/module.h>
38 #include <polypcore/core-util.h>
39 #include <polypcore/modargs.h>
40 #include <polypcore/log.h>
41 #include <polypcore/core-subscribe.h>
42 #include <polypcore/sink-input.h>
43 #include <polypcore/pdispatch.h>
44 #include <polypcore/pstream.h>
45 #include <polypcore/pstream-util.h>
46 #include <polypcore/authkey.h>
47 #include <polypcore/socket-client.h>
48 #include <polypcore/socket-util.h>
49 #include <polypcore/authkey-prop.h>
50
/* Module metadata. This one source file builds two modules: with
 * TUNNEL_SINK defined it is the sink tunnel, otherwise the source tunnel.
 * Each variant includes its own symdef header and advertises its own
 * description and argument list; the argument names listed here are the
 * same ones accepted by valid_modargs[] further down. */
#ifdef TUNNEL_SINK
#include "module-tunnel-sink-symdef.h"
PA_MODULE_DESCRIPTION("Tunnel module for sinks")
PA_MODULE_USAGE(
        "server=<address> "
        "sink=<remote sink name> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "sink_name=<name for the local sink> "
        "channel_map=<channel map>")
#else
#include "module-tunnel-source-symdef.h"
PA_MODULE_DESCRIPTION("Tunnel module for sources")
PA_MODULE_USAGE(
        "server=<address> "
        "source=<remote source name> "
        "cookie=<filename> "
        "format=<sample format> "
        "channels=<number of channels> "
        "rate=<sample rate> "
        "source_name=<name for the local source> "
        "channel_map=<channel map>")
#endif

PA_MODULE_AUTHOR("Lennart Poettering")
PA_MODULE_VERSION(PACKAGE_VERSION)
79
/* Fallback names for the local device when the "sink_name" /
 * "source_name" module argument is not given (see pa__init). */
#define DEFAULT_SINK_NAME "tunnel"
#define DEFAULT_SOURCE_NAME "tunnel"

/* Buffer metrics sent to the remote server when the stream is created
 * (see setup_complete_callback). All values are byte counts.
 * NOTE(review): 44100*2*2/10 looks like 100 ms of 44.1 kHz 16-bit stereo
 * audio; it is not adapted to the sample spec actually in use - confirm. */
#define DEFAULT_TLENGTH (44100*2*2/10)  //(10240*8)
#define DEFAULT_MAXLENGTH ((DEFAULT_TLENGTH*3)/2)
#define DEFAULT_MINREQ 512
#define DEFAULT_PREBUF (DEFAULT_TLENGTH-DEFAULT_MINREQ)
/* Sent only for record streams (source tunnel). */
#define DEFAULT_FRAGSIZE 1024

/* Timeout handed to pa_pdispatch_register_reply() for every reply we wait
 * for. NOTE(review): presumably seconds - confirm against pdispatch. */
#define DEFAULT_TIMEOUT 5

/* Seconds between two latency queries; added to tv_sec when the timer is
 * (re)armed. */
#define LATENCY_INTERVAL 10
92
/* NULL-terminated list of module argument names handed to
 * pa_modargs_new() in pa__init. The device-specific names depend on
 * whether this is built as the sink or the source tunnel. */
static const char* const valid_modargs[] = {
    "server",
    "cookie",
    "format",
    "channels",
    "rate",
#ifdef TUNNEL_SINK
    "sink_name",
    "sink",
#else
    "source_name",
    "source",
#endif
    "channel_map",
    NULL,
};
109
/* Handlers for unsolicited commands sent by the remote server. */
static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);

#ifdef TUNNEL_SINK
static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
#endif

/* Dispatch table given to pa_pdispatch_new() in on_connection. Entries
 * not listed here stay NULL. PA_COMMAND_REQUEST is only handled by the
 * sink tunnel, which is the variant that sends audio data. */
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
#ifdef TUNNEL_SINK
    [PA_COMMAND_REQUEST] = command_request,
#endif
    [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
    [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
};
125
/* Per-module state, stored in pa_module::userdata by pa__init and released
 * by pa__done. */
struct userdata {
    /* Pending connection attempt; released in on_connection once the
     * attempt has finished. */
    pa_socket_client *client;
    /* Protocol stream and reply dispatcher; both are created in
     * on_connection and are NULL before that and after close_stuff(). */
    pa_pstream *pstream;
    pa_pdispatch *pdispatch;

    /* Copy of the "server" module argument. */
    char *server_name;
#ifdef TUNNEL_SINK
    /* Copy of the "sink" module argument (remote sink), may be NULL. */
    char *sink_name;
    /* The local sink that feeds the tunnel. */
    pa_sink *sink;
    /* Bytes the remote side has asked for that were not sent yet. */
    uint32_t requested_bytes;
#else
    /* Copy of the "source" module argument (remote source), may be NULL. */
    char *source_name;
    /* The local source the received audio is posted to. */
    pa_source *source;
#endif

    pa_module *module;
    pa_core *core;

    /* Authentication cookie sent with PA_COMMAND_AUTH; filled by load_key. */
    uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];

    /* Protocol version read from the server's auth reply. */
    uint32_t version;
    /* Tag for the next outgoing command; starts at 1 and is incremented
     * after every use. */
    uint32_t ctag;
    /* Both are read from the create-stream reply; PA_INVALID_INDEX until
     * then. */
    uint32_t device_index;
    uint32_t channel;

    /* Latest latency estimate, updated by stream_get_latency_callback. */
    pa_usec_t host_latency;

    /* Periodic timer that triggers request_latency(). */
    pa_time_event *time_event;

    /* Non-zero when load_key took a reference on (or created) the shared
     * cookie property; pa__done drops that reference. */
    int auth_cookie_in_property;
};
157
158 static void close_stuff(struct userdata *u) {
159     assert(u);
160     
161     if (u->pstream) {
162         pa_pstream_close(u->pstream);
163         pa_pstream_unref(u->pstream);
164         u->pstream = NULL;
165     }
166
167     if (u->pdispatch) {
168         pa_pdispatch_unref(u->pdispatch);
169         u->pdispatch = NULL;
170     }
171
172     if (u->client) {
173         pa_socket_client_unref(u->client);
174         u->client = NULL;
175     }
176
177 #ifdef TUNNEL_SINK
178     if (u->sink) {
179         pa_sink_disconnect(u->sink);
180         pa_sink_unref(u->sink);
181         u->sink = NULL;
182     }
183 #else
184     if (u->source) {
185         pa_source_disconnect(u->source);
186         pa_source_unref(u->source);
187         u->source = NULL;
188     }
189 #endif
190
191     if (u->time_event) {
192         u->core->mainloop->time_free(u->time_event);
193         u->time_event = NULL;
194     }
195 }
196
197 static void die(struct userdata *u) {
198     assert(u);
199     close_stuff(u);
200     pa_module_unload_request(u->module);
201 }
202
203 static void command_stream_killed(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
204     struct userdata *u = userdata;
205     assert(pd && t && u && u->pdispatch == pd);
206
207     pa_log(__FILE__": stream killed");
208     die(u);
209 }
210
211 static void request_info(struct userdata *u);
212
213 static void command_subscribe_event(pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
214     struct userdata *u = userdata;
215     pa_subscription_event_type_t e;
216     uint32_t idx;
217
218     assert(pd && t && u);
219     assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
220
221     if (pa_tagstruct_getu32(t, &e) < 0 ||
222         pa_tagstruct_getu32(t, &idx) < 0 ||
223         !pa_tagstruct_eof(t)) {
224         pa_log(__FILE__": invalid protocol reply");
225         die(u);
226         return;
227     }
228
229 #ifdef TUNNEL_SINK
230     if (e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE))
231         return;
232 #else
233     if (e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE))
234         return;
235 #endif
236
237     request_info(u);
238 }
239
240 #ifdef TUNNEL_SINK
241 static void send_prebuf_request(struct userdata *u) {
242     pa_tagstruct *t;
243
244     t = pa_tagstruct_new(NULL, 0);
245     pa_tagstruct_putu32(t, PA_COMMAND_PREBUF_PLAYBACK_STREAM);
246     pa_tagstruct_putu32(t, u->ctag++);
247     pa_tagstruct_putu32(t, u->channel);
248     pa_pstream_send_tagstruct(u->pstream, t);
249 }
250
251 static void send_bytes(struct userdata *u) {
252     assert(u);
253
254     if (!u->pstream)
255         return;
256
257     while (u->requested_bytes > 0) {
258         pa_memchunk chunk;
259         if (pa_sink_render(u->sink, u->requested_bytes, &chunk) < 0) {
260             
261             if (u->requested_bytes >= DEFAULT_TLENGTH-DEFAULT_PREBUF) 
262                 send_prebuf_request(u);
263             
264             return;
265         }
266
267         pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, &chunk);
268         pa_memblock_unref(chunk.memblock);
269
270         if (chunk.length > u->requested_bytes)
271             u->requested_bytes = 0;
272         else
273             u->requested_bytes -= chunk.length;
274     }
275 }
276
277 static void command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
278     struct userdata *u = userdata;
279     uint32_t bytes, channel;
280     assert(pd && command == PA_COMMAND_REQUEST && t && u && u->pdispatch == pd);
281
282     if (pa_tagstruct_getu32(t, &channel) < 0 ||
283         pa_tagstruct_getu32(t, &bytes) < 0 ||
284         !pa_tagstruct_eof(t)) {
285         pa_log(__FILE__": invalid protocol reply");
286         die(u);
287         return;
288     }
289
290     if (channel != u->channel) {
291         pa_log(__FILE__": recieved data for invalid channel");
292         die(u);
293         return;
294     }
295     
296     u->requested_bytes += bytes;
297     send_bytes(u);
298 }
299
300 #endif
301
302 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
303     struct userdata *u = userdata;
304     pa_usec_t sink_usec, source_usec, transport_usec;
305     int playing;
306     int64_t write_index, read_index;
307     struct timeval local, remote, now;
308     assert(pd && u);
309
310     if (command != PA_COMMAND_REPLY) {
311         if (command == PA_COMMAND_ERROR)
312             pa_log(__FILE__": failed to get latency.");
313         else
314             pa_log(__FILE__": protocol error.");
315         die(u);
316         return;
317     }
318     
319     if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
320         pa_tagstruct_get_usec(t, &source_usec) < 0 ||
321         pa_tagstruct_get_boolean(t, &playing) < 0 ||
322         pa_tagstruct_get_timeval(t, &local) < 0 ||
323         pa_tagstruct_get_timeval(t, &remote) < 0 ||
324         pa_tagstruct_gets64(t, &write_index) < 0 ||
325         pa_tagstruct_gets64(t, &read_index) < 0 ||
326         !pa_tagstruct_eof(t)) {
327         pa_log(__FILE__": invalid reply. (latency)");
328         die(u);
329         return;
330     }
331
332     pa_gettimeofday(&now);
333
334     if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
335         /* local and remote seem to have synchronized clocks */
336 #ifdef TUNNEL_SINK
337         transport_usec = pa_timeval_diff(&remote, &local);
338 #else
339         transport_usec = pa_timeval_diff(&now, &remote);
340 #endif    
341     } else
342         transport_usec = pa_timeval_diff(&now, &local)/2;
343
344 #ifdef TUNNEL_SINK
345     u->host_latency = sink_usec + transport_usec;
346 #else
347     u->host_latency = source_usec + transport_usec;
348     if (u->host_latency > sink_usec)
349         u->host_latency -= sink_usec;
350     else
351         u->host_latency = 0;
352 #endif
353
354 /*     pa_log(__FILE__": estimated host latency: %0.0f usec", (double) u->host_latency); */
355 }
356
357 static void request_latency(struct userdata *u) {
358     pa_tagstruct *t;
359     struct timeval now;
360     uint32_t tag;
361     assert(u);
362
363     t = pa_tagstruct_new(NULL, 0);
364 #ifdef TUNNEL_SINK    
365     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
366 #else
367     pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
368 #endif
369     pa_tagstruct_putu32(t, tag = u->ctag++);
370     pa_tagstruct_putu32(t, u->channel);
371
372     pa_gettimeofday(&now);
373     pa_tagstruct_put_timeval(t, &now);
374     
375     pa_pstream_send_tagstruct(u->pstream, t);
376     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
377 }
378
379 static void stream_get_info_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
380     struct userdata *u = userdata;
381     uint32_t idx, owner_module, monitor_source;
382     pa_usec_t latency;
383     const char *name, *description, *monitor_source_name, *driver;
384     int mute;
385     uint32_t flags;
386     pa_sample_spec sample_spec;
387     pa_channel_map channel_map;
388     pa_cvolume volume;
389     assert(pd && u);
390
391     if (command != PA_COMMAND_REPLY) {
392         if (command == PA_COMMAND_ERROR)
393             pa_log(__FILE__": failed to get info.");
394         else
395             pa_log(__FILE__": protocol error.");
396         die(u);
397         return;
398     }
399
400     if (pa_tagstruct_getu32(t, &idx) < 0 ||
401         pa_tagstruct_gets(t, &name) < 0 ||
402         pa_tagstruct_gets(t, &description) < 0 ||
403         pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
404         pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
405         pa_tagstruct_getu32(t, &owner_module) < 0 ||
406         pa_tagstruct_get_cvolume(t, &volume) < 0 ||
407         pa_tagstruct_get_boolean(t, &mute) < 0 ||
408         pa_tagstruct_getu32(t, &monitor_source) < 0 ||
409         pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
410         pa_tagstruct_get_usec(t, &latency) < 0 ||
411         pa_tagstruct_gets(t, &driver) < 0 ||
412         pa_tagstruct_getu32(t, &flags) < 0 ||
413         !pa_tagstruct_eof(t)) {
414         pa_log(__FILE__": invalid reply. (get_info)");
415         die(u);
416         return;
417     }
418
419 #ifdef TUNNEL_SINK
420     assert(u->sink);
421     if ((!!mute == !!u->sink->hw_muted) &&
422         pa_cvolume_equal(&volume, &u->sink->hw_volume))
423         return;
424 #else
425     assert(u->source);
426     if ((!!mute == !!u->source->hw_muted) &&
427         pa_cvolume_equal(&volume, &u->source->hw_volume))
428         return;
429 #endif
430
431 #ifdef TUNNEL_SINK
432     memcpy(&u->sink->hw_volume, &volume, sizeof(pa_cvolume));
433     u->sink->hw_muted = !!mute;
434
435     pa_subscription_post(u->sink->core,
436         PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE,
437         u->sink->index);
438 #else
439     memcpy(&u->source->hw_volume, &volume, sizeof(pa_cvolume));
440     u->source->hw_muted = !!mute;
441
442     pa_subscription_post(u->source->core,
443         PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE,
444         u->source->index);
445 #endif
446 }
447
448 static void request_info(struct userdata *u) {
449     pa_tagstruct *t;
450     uint32_t tag;
451     assert(u);
452
453     t = pa_tagstruct_new(NULL, 0);
454 #ifdef TUNNEL_SINK
455     pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
456 #else
457     pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
458 #endif
459     pa_tagstruct_putu32(t, tag = u->ctag++);
460
461     pa_tagstruct_putu32(t, PA_INVALID_INDEX);
462 #ifdef TUNNEL_SINK
463     pa_tagstruct_puts(t, u->sink_name);
464 #else
465     pa_tagstruct_puts(t, u->source_name);
466 #endif
467
468     pa_pstream_send_tagstruct(u->pstream, t);
469     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_info_callback, u, NULL);
470 }
471
472 static void start_subscribe(struct userdata *u) {
473     pa_tagstruct *t;
474     uint32_t tag;
475     assert(u);
476
477     t = pa_tagstruct_new(NULL, 0);
478     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
479     pa_tagstruct_putu32(t, tag = u->ctag++);
480
481 #ifdef TUNNEL_SINK
482     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SINK);
483 #else
484     pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SOURCE);
485 #endif
486
487     pa_pstream_send_tagstruct(u->pstream, t);
488 }
489
490 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
491     struct userdata *u = userdata;
492     assert(pd && u && u->pdispatch == pd);
493
494     if (command != PA_COMMAND_REPLY) {
495         if (command == PA_COMMAND_ERROR)
496             pa_log(__FILE__": failed to create stream.");
497         else
498             pa_log(__FILE__": protocol error.");
499         die(u);
500         return;
501     }
502
503     if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
504         pa_tagstruct_getu32(t, &u->device_index) < 0 ||
505 #ifdef TUNNEL_SINK        
506         pa_tagstruct_getu32(t, &u->requested_bytes) < 0 ||
507 #endif        
508         !pa_tagstruct_eof(t)) {
509         pa_log(__FILE__": invalid reply. (create stream)");
510         die(u);
511         return;
512     }
513
514     start_subscribe(u);
515     request_info(u);
516
517     request_latency(u);
518 #ifdef TUNNEL_SINK
519     send_bytes(u);
520 #endif
521 }
522
523 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
524     struct userdata *u = userdata;
525     pa_tagstruct *reply;
526     char name[256], un[128], hn[128];
527 #ifdef TUNNEL_SINK
528     pa_cvolume volume;
529 #endif
530     assert(pd && u && u->pdispatch == pd);
531
532     if (command != PA_COMMAND_REPLY ||
533         pa_tagstruct_getu32(t, &u->version) < 0 ||
534         !pa_tagstruct_eof(t)) {
535         if (command == PA_COMMAND_ERROR)
536             pa_log(__FILE__": failed to authenticate");
537         else
538             pa_log(__FILE__": protocol error.");
539         die(u);
540         return;
541     }
542
543     /* Minimum supported protocol version */
544     if (u->version < 8) {
545         pa_log(__FILE__": incompatible protocol version");
546         die(u);
547         return;
548     }
549
550 #ifdef TUNNEL_SINK
551     snprintf(name, sizeof(name), "Tunnel from host '%s', user '%s', sink '%s'",
552              pa_get_host_name(hn, sizeof(hn)),
553              pa_get_user_name(un, sizeof(un)),
554              u->sink->name);
555 #else
556     snprintf(name, sizeof(name), "Tunnel from host '%s', user '%s', source '%s'",
557              pa_get_host_name(hn, sizeof(hn)),
558              pa_get_user_name(un, sizeof(un)),
559              u->source->name);
560 #endif
561     
562     reply = pa_tagstruct_new(NULL, 0);
563     pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
564     pa_tagstruct_putu32(reply, tag = u->ctag++);
565     pa_tagstruct_puts(reply, name);
566     pa_pstream_send_tagstruct(u->pstream, reply);
567     /* We ignore the server's reply here */
568
569     reply = pa_tagstruct_new(NULL, 0);
570 #ifdef TUNNEL_SINK    
571     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
572     pa_tagstruct_putu32(reply, tag = u->ctag++);
573     pa_tagstruct_puts(reply, name);
574     pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
575     pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
576     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
577     pa_tagstruct_puts(reply, u->sink_name);
578     pa_tagstruct_putu32(reply, DEFAULT_MAXLENGTH);
579     pa_tagstruct_put_boolean(reply, 0);
580     pa_tagstruct_putu32(reply, DEFAULT_TLENGTH);
581     pa_tagstruct_putu32(reply, DEFAULT_PREBUF);
582     pa_tagstruct_putu32(reply, DEFAULT_MINREQ);
583     pa_tagstruct_putu32(reply, 0);
584     pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
585     pa_tagstruct_put_cvolume(reply, &volume);
586 #else
587     pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
588     pa_tagstruct_putu32(reply, tag = u->ctag++);
589     pa_tagstruct_puts(reply, name);
590     pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
591     pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
592     pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
593     pa_tagstruct_puts(reply, u->source_name);
594     pa_tagstruct_putu32(reply, DEFAULT_MAXLENGTH);
595     pa_tagstruct_put_boolean(reply, 0);
596     pa_tagstruct_putu32(reply, DEFAULT_FRAGSIZE);
597 #endif
598     
599     pa_pstream_send_tagstruct(u->pstream, reply);
600     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
601 }
602
603 static void pstream_die_callback(pa_pstream *p, void *userdata) {
604     struct userdata *u = userdata;
605     assert(p && u);
606
607     pa_log(__FILE__": stream died.");
608     die(u);
609 }
610
611
612 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const void*creds, void *userdata) {
613     struct userdata *u = userdata;
614     assert(p && packet && u);
615
616     if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
617         pa_log(__FILE__": invalid packet");
618         die(u);
619     }
620 }
621
622 #ifndef TUNNEL_SINK
623 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, PA_GCC_UNUSED int64_t offset, PA_GCC_UNUSED pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
624     struct userdata *u = userdata;
625     assert(p && chunk && u);
626
627     if (channel != u->channel) {
628         pa_log(__FILE__": recieved memory block on bad channel.");
629         die(u);
630         return;
631     }
632     
633     pa_source_post(u->source, chunk);
634 }
635 #endif
636
637 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
638     struct userdata *u = userdata;
639     pa_tagstruct *t;
640     uint32_t tag;
641     assert(sc && u && u->client == sc);
642
643     pa_socket_client_unref(u->client);
644     u->client = NULL;
645     
646     if (!io) {
647         pa_log(__FILE__": connection failed.");
648         pa_module_unload_request(u->module);
649         return;
650     }
651
652     u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->memblock_stat);
653     u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
654
655     pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
656     pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
657 #ifndef TUNNEL_SINK
658     pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
659 #endif
660     
661     t = pa_tagstruct_new(NULL, 0);
662     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
663     pa_tagstruct_putu32(t, tag = u->ctag++);
664     pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
665     pa_tagstruct_put_arbitrary(t, u->auth_cookie, sizeof(u->auth_cookie));
666     pa_pstream_send_tagstruct(u->pstream, t);
667     pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
668     
669 }
670
671 #ifdef TUNNEL_SINK
672 static void sink_notify(pa_sink*sink) {
673     struct userdata *u;
674     assert(sink && sink->userdata);
675     u = sink->userdata;
676
677     send_bytes(u);
678 }
679
680 static pa_usec_t sink_get_latency(pa_sink *sink) {
681     struct userdata *u;
682     uint32_t l;
683     pa_usec_t usec = 0;
684     assert(sink && sink->userdata);
685     u = sink->userdata;
686
687     l = DEFAULT_TLENGTH;
688
689     if (l > u->requested_bytes) {
690         l -= u->requested_bytes;
691         usec += pa_bytes_to_usec(l, &u->sink->sample_spec);
692     }
693
694     usec += u->host_latency;
695
696     return usec;
697 }
698
699 static int sink_get_hw_volume(pa_sink *sink) {
700     struct userdata *u;
701     assert(sink && sink->userdata);
702     u = sink->userdata;
703
704     return 0;
705 }
706
707 static int sink_set_hw_volume(pa_sink *sink) {
708     struct userdata *u;
709     pa_tagstruct *t;
710     uint32_t tag;
711     assert(sink && sink->userdata);
712     u = sink->userdata;
713
714     t = pa_tagstruct_new(NULL, 0);
715     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_VOLUME);
716     pa_tagstruct_putu32(t, tag = u->ctag++);
717
718     pa_tagstruct_putu32(t, PA_INVALID_INDEX);
719     pa_tagstruct_puts(t, u->sink_name);
720     pa_tagstruct_put_cvolume(t, &sink->hw_volume);
721     pa_pstream_send_tagstruct(u->pstream, t);
722
723     return 0;
724 }
725
726 static int sink_get_hw_mute(pa_sink *sink) {
727     struct userdata *u;
728     assert(sink && sink->userdata);
729     u = sink->userdata;
730
731     return 0;
732 }
733
734 static int sink_set_hw_mute(pa_sink *sink) {
735     struct userdata *u;
736     pa_tagstruct *t;
737     uint32_t tag;
738     assert(sink && sink->userdata);
739     u = sink->userdata;
740
741     t = pa_tagstruct_new(NULL, 0);
742     pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_MUTE);
743     pa_tagstruct_putu32(t, tag = u->ctag++);
744
745     pa_tagstruct_putu32(t, PA_INVALID_INDEX);
746     pa_tagstruct_puts(t, u->sink_name);
747     pa_tagstruct_put_boolean(t, !!sink->hw_muted);
748     pa_pstream_send_tagstruct(u->pstream, t);
749
750     return 0;
751 }
752 #else
753 static pa_usec_t source_get_latency(pa_source *source) {
754     struct userdata *u;
755     assert(source && source->userdata);
756     u = source->userdata;
757
758     return u->host_latency;
759 }
760
761 static int source_get_hw_volume(pa_source *source) {
762     struct userdata *u;
763     assert(source && source->userdata);
764     u = source->userdata;
765
766     return 0;
767 }
768
769 static int source_set_hw_volume(pa_source *source) {
770     struct userdata *u;
771     pa_tagstruct *t;
772     uint32_t tag;
773     assert(source && source->userdata);
774     u = source->userdata;
775
776     t = pa_tagstruct_new(NULL, 0);
777     pa_tagstruct_putu32(t, PA_COMMAND_SET_SOURCE_VOLUME);
778     pa_tagstruct_putu32(t, tag = u->ctag++);
779
780     pa_tagstruct_putu32(t, PA_INVALID_INDEX);
781     pa_tagstruct_puts(t, u->source_name);
782     pa_tagstruct_put_cvolume(t, &source->hw_volume);
783     pa_pstream_send_tagstruct(u->pstream, t);
784
785     return 0;
786 }
787
788 static int source_get_hw_mute(pa_source *source) {
789     struct userdata *u;
790     assert(source && source->userdata);
791     u = source->userdata;
792
793     return 0;
794 }
795
796 static int source_set_hw_mute(pa_source *source) {
797     struct userdata *u;
798     pa_tagstruct *t;
799     uint32_t tag;
800     assert(source && source->userdata);
801     u = source->userdata;
802
803     t = pa_tagstruct_new(NULL, 0);
804     pa_tagstruct_putu32(t, PA_COMMAND_SET_SOURCE_MUTE);
805     pa_tagstruct_putu32(t, tag = u->ctag++);
806
807     pa_tagstruct_putu32(t, PA_INVALID_INDEX);
808     pa_tagstruct_puts(t, u->source_name);
809     pa_tagstruct_put_boolean(t, !!source->hw_muted);
810     pa_pstream_send_tagstruct(u->pstream, t);
811
812     return 0;
813 }
814 #endif
815
816 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
817     struct userdata *u = userdata;
818     struct timeval ntv;
819     assert(m && e && u);
820
821     request_latency(u);
822     
823     pa_gettimeofday(&ntv);
824     ntv.tv_sec += LATENCY_INTERVAL;
825     m->time_restart(e, &ntv);
826 }
827
828 static int load_key(struct userdata *u, const char*fn) {
829     assert(u);
830
831     u->auth_cookie_in_property = 0;
832     
833     if (!fn && pa_authkey_prop_get(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0) {
834         pa_log_debug(__FILE__": using already loaded auth cookie.");
835         pa_authkey_prop_ref(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
836         u->auth_cookie_in_property = 1;
837         return 0;
838     }
839     
840     if (!fn)
841         fn = PA_NATIVE_COOKIE_FILE;
842
843     if (pa_authkey_load_auto(fn, u->auth_cookie, sizeof(u->auth_cookie)) < 0)
844         return -1;
845
846     pa_log_debug(__FILE__": loading cookie from disk.");
847     
848     if (pa_authkey_prop_put(u->core, PA_NATIVE_COOKIE_PROPERTY_NAME, u->auth_cookie, sizeof(u->auth_cookie)) >= 0)
849         u->auth_cookie_in_property = 1;
850
851     return 0;
852 }
853
854 int pa__init(pa_core *c, pa_module*m) {
855     pa_modargs *ma = NULL;
856     struct userdata *u = NULL;
857     pa_sample_spec ss;
858     pa_channel_map map;
859     struct timeval ntv;
860     assert(c && m);
861
862     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
863         pa_log(__FILE__": failed to parse module arguments");
864         goto fail;
865     }
866
867     u = pa_xmalloc(sizeof(struct userdata));
868     m->userdata = u;
869     u->module = m;
870     u->core = c;
871     u->client = NULL;
872     u->pdispatch = NULL;
873     u->pstream = NULL;
874     u->server_name = NULL;
875 #ifdef TUNNEL_SINK
876     u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
877     u->sink = NULL;
878     u->requested_bytes = 0;
879 #else
880     u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
881     u->source = NULL;
882 #endif
883     u->ctag = 1;
884     u->device_index = u->channel = PA_INVALID_INDEX;
885     u->host_latency = 0;
886     u->auth_cookie_in_property = 0;
887     u->time_event = NULL;
888     
889     if (load_key(u, pa_modargs_get_value(ma, "cookie", NULL)) < 0)
890         goto fail;
891     
892     if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
893         pa_log(__FILE__": no server specified.");
894         goto fail;
895     }
896
897     ss = c->default_sample_spec;
898     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
899         pa_log(__FILE__": invalid sample format specification");
900         goto fail;
901     }
902
903     if (!(u->client = pa_socket_client_new_string(c->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
904         pa_log(__FILE__": failed to connect to server '%s'", u->server_name);
905         goto fail;
906     }
907     
908     if (!u->client)
909         goto fail;
910
911     pa_socket_client_set_callback(u->client, on_connection, u);
912
913 #ifdef TUNNEL_SINK
914     if (!(u->sink = pa_sink_new(c, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {
915         pa_log(__FILE__": failed to create sink.");
916         goto fail;
917     }
918
919     u->sink->notify = sink_notify;
920     u->sink->get_latency = sink_get_latency;
921     u->sink->get_hw_volume = sink_get_hw_volume;
922     u->sink->set_hw_volume = sink_set_hw_volume;
923     u->sink->get_hw_mute = sink_get_hw_mute;
924     u->sink->set_hw_mute = sink_set_hw_mute;
925     u->sink->userdata = u;
926     u->sink->description = pa_sprintf_malloc("Tunnel to '%s%s%s'", u->sink_name ? u->sink_name : "", u->sink_name ? "@" : "", u->server_name);
927
928     pa_sink_set_owner(u->sink, m);
929 #else
930     if (!(u->source = pa_source_new(c, __FILE__, pa_modargs_get_value(ma, "source_name", DEFAULT_SOURCE_NAME), 0, &ss, &map))) {
931         pa_log(__FILE__": failed to create source.");
932         goto fail;
933     }
934
935     u->source->get_latency = source_get_latency;
936     u->source->get_hw_volume = source_get_hw_volume;
937     u->source->set_hw_volume = source_set_hw_volume;
938     u->source->get_hw_mute = source_get_hw_mute;
939     u->source->set_hw_mute = source_set_hw_mute;
940     u->source->userdata = u;
941     u->source->description = pa_sprintf_malloc("Tunnel to '%s%s%s'", u->source_name ? u->source_name : "", u->source_name ? "@" : "", u->server_name);
942
943     pa_source_set_owner(u->source, m);
944 #endif
945     
946     pa_gettimeofday(&ntv);
947     ntv.tv_sec += LATENCY_INTERVAL;
948     u->time_event = c->mainloop->time_new(c->mainloop, &ntv, timeout_callback, u);
949
950     pa_modargs_free(ma);
951
952     return 0;
953     
954 fail:
955     pa__done(c, m);
956
957     if (ma)
958         pa_modargs_free(ma);
959     return  -1;
960 }
961
962 void pa__done(pa_core *c, pa_module*m) {
963     struct userdata* u;
964     assert(c && m);
965
966     if (!(u = m->userdata))
967         return;
968
969     close_stuff(u);
970
971     if (u->auth_cookie_in_property)
972         pa_authkey_prop_unref(c, PA_NATIVE_COOKIE_PROPERTY_NAME);
973     
974 #ifdef TUNNEL_SINK
975     pa_xfree(u->sink_name);
976 #else
977     pa_xfree(u->source_name);
978 #endif
979     pa_xfree(u->server_name);
980
981     pa_xfree(u);
982 }
983
984