2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
30 #include <sys/types.h>
34 #include <pulse/timeval.h>
35 #include <pulse/util.h>
36 #include <pulse/version.h>
37 #include <pulse/xmalloc.h>
39 #include <pulsecore/module.h>
40 #include <pulsecore/core-util.h>
41 #include <pulsecore/modargs.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-subscribe.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/socket-client.h>
49 #include <pulsecore/socket-util.h>
50 #include <pulsecore/time-smoother.h>
51 #include <pulsecore/thread.h>
52 #include <pulsecore/thread-mq.h>
53 #include <pulsecore/rtclock.h>
54 #include <pulsecore/core-error.h>
55 #include <pulsecore/proplist-util.h>
56 #include <pulsecore/auth-cookie.h>
59 #include "module-tunnel-sink-symdef.h"
61 #include "module-tunnel-source-symdef.h"
65 PA_MODULE_DESCRIPTION("Tunnel module for sinks");
68 "sink=<remote sink name> "
70 "format=<sample format> "
71 "channels=<number of channels> "
73 "sink_name=<name for the local sink> "
74 "channel_map=<channel map>");
76 PA_MODULE_DESCRIPTION("Tunnel module for sources");
79 "source=<remote source name> "
81 "format=<sample format> "
82 "channels=<number of channels> "
84 "source_name=<name for the local source> "
85 "channel_map=<channel map>");
88 PA_MODULE_AUTHOR("Lennart Poettering");
89 PA_MODULE_VERSION(PACKAGE_VERSION);
90 PA_MODULE_LOAD_ONCE(FALSE);
92 static const char* const valid_modargs[] = {
109 #define DEFAULT_TIMEOUT 5
111 #define LATENCY_INTERVAL 10
113 #define MIN_NETWORK_LATENCY_USEC (8*PA_USEC_PER_MSEC)
118 SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
119 SINK_MESSAGE_REMOTE_SUSPEND,
120 SINK_MESSAGE_UPDATE_LATENCY,
124 #define DEFAULT_TLENGTH_MSEC 150
125 #define DEFAULT_MINREQ_MSEC 25
130 SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
131 SOURCE_MESSAGE_REMOTE_SUSPEND,
132 SOURCE_MESSAGE_UPDATE_LATENCY
135 #define DEFAULT_FRAGSIZE_MSEC 25
140 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
141 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
143 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
144 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
145 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
146 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
147 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
148 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
149 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
151 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
153 [PA_COMMAND_REQUEST] = command_request,
154 [PA_COMMAND_STARTED] = command_started,
156 [PA_COMMAND_SUBSCRIBE_EVENT] = command_subscribe_event,
157 [PA_COMMAND_OVERFLOW] = command_overflow_or_underflow,
158 [PA_COMMAND_UNDERFLOW] = command_overflow_or_underflow,
159 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = command_stream_killed,
160 [PA_COMMAND_RECORD_STREAM_KILLED] = command_stream_killed,
161 [PA_COMMAND_PLAYBACK_STREAM_SUSPENDED] = command_suspended,
162 [PA_COMMAND_RECORD_STREAM_SUSPENDED] = command_suspended,
163 [PA_COMMAND_PLAYBACK_STREAM_MOVED] = command_moved,
164 [PA_COMMAND_RECORD_STREAM_MOVED] = command_moved,
165 [PA_COMMAND_PLAYBACK_STREAM_EVENT] = command_stream_or_client_event,
166 [PA_COMMAND_RECORD_STREAM_EVENT] = command_stream_or_client_event,
167 [PA_COMMAND_CLIENT_EVENT] = command_stream_or_client_event,
168 [PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed,
169 [PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED] = command_stream_buffer_attr_changed
176 pa_thread_mq thread_mq;
180 pa_socket_client *client;
182 pa_pdispatch *pdispatch;
188 size_t requested_bytes;
194 pa_auth_cookie *auth_cookie;
198 uint32_t device_index;
201 int64_t counter, counter_delta;
203 pa_bool_t remote_corked:1;
204 pa_bool_t remote_suspended:1;
206 pa_usec_t transport_usec; /* maintained in the main thread */
207 pa_usec_t thread_transport_usec; /* maintained in the IO thread */
209 uint32_t ignore_latency_before;
211 pa_time_event *time_event;
213 pa_smoother *smoother;
215 char *device_description;
229 static void request_latency(struct userdata *u);
231 /* Called from main context */
232 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
233 pa_log_debug("Got stream or client event.");
236 /* Called from main context */
237 static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
238 struct userdata *u = userdata;
243 pa_assert(u->pdispatch == pd);
245 pa_log_warn("Stream killed");
246 pa_module_unload_request(u->module, TRUE);
249 /* Called from main context */
250 static void command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
251 struct userdata *u = userdata;
256 pa_assert(u->pdispatch == pd);
258 pa_log_info("Server signalled buffer overrun/underrun.");
262 /* Called from main context */
263 static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
264 struct userdata *u = userdata;
271 pa_assert(u->pdispatch == pd);
273 if (pa_tagstruct_getu32(t, &channel) < 0 ||
274 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
275 !pa_tagstruct_eof(t)) {
277 pa_log("Invalid packet.");
278 pa_module_unload_request(u->module, TRUE);
282 pa_log_debug("Server reports device suspend.");
285 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
287 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
293 /* Called from main context */
294 static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
295 struct userdata *u = userdata;
296 uint32_t channel, di;
303 pa_assert(u->pdispatch == pd);
305 if (pa_tagstruct_getu32(t, &channel) < 0 ||
306 pa_tagstruct_getu32(t, &di) < 0 ||
307 pa_tagstruct_gets(t, &dn) < 0 ||
308 pa_tagstruct_get_boolean(t, &suspended) < 0) {
310 pa_log_error("Invalid packet.");
311 pa_module_unload_request(u->module, TRUE);
315 pa_log_debug("Server reports a stream move.");
318 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
320 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_REMOTE_SUSPEND, PA_UINT32_TO_PTR(!!suspended), 0, NULL);
326 static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
327 struct userdata *u = userdata;
328 uint32_t channel, maxlength, tlength, fragsize, prebuf, minreq;
334 pa_assert(u->pdispatch == pd);
336 if (pa_tagstruct_getu32(t, &channel) < 0 ||
337 pa_tagstruct_getu32(t, &maxlength) < 0) {
339 pa_log_error("Invalid packet.");
340 pa_module_unload_request(u->module, TRUE);
344 if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
345 if (pa_tagstruct_getu32(t, &fragsize) < 0 ||
346 pa_tagstruct_get_usec(t, &usec) < 0) {
348 pa_log_error("Invalid packet.");
349 pa_module_unload_request(u->module, TRUE);
353 if (pa_tagstruct_getu32(t, &tlength) < 0 ||
354 pa_tagstruct_getu32(t, &prebuf) < 0 ||
355 pa_tagstruct_getu32(t, &minreq) < 0 ||
356 pa_tagstruct_get_usec(t, &usec) < 0) {
358 pa_log_error("Invalid packet.");
359 pa_module_unload_request(u->module, TRUE);
365 pa_log_debug("Server reports buffer attrs changed. tlength now at %lu, before %lu.", (unsigned long) tlength, (unsigned long) u->tlength);
373 /* Called from main context */
374 static void command_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
375 struct userdata *u = userdata;
380 pa_assert(u->pdispatch == pd);
382 pa_log_debug("Server reports playback started.");
388 /* Called from IO thread context */
389 static void check_smoother_status(struct userdata *u, pa_bool_t past) {
394 x = pa_rtclock_usec();
396 /* Correct by the time the requested issued needs to travel to the
397 * other side. This is a valid thread-safe access, because the
398 * main thread is waiting for us */
401 x -= u->thread_transport_usec;
403 x += u->thread_transport_usec;
405 if (u->remote_suspended || u->remote_corked)
406 pa_smoother_pause(u->smoother, x);
408 pa_smoother_resume(u->smoother, x, TRUE);
411 /* Called from IO thread context */
412 static void stream_cork_within_thread(struct userdata *u, pa_bool_t cork) {
415 if (u->remote_corked == cork)
418 u->remote_corked = cork;
419 check_smoother_status(u, FALSE);
422 /* Called from main context */
423 static void stream_cork(struct userdata *u, pa_bool_t cork) {
430 t = pa_tagstruct_new(NULL, 0);
432 pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
434 pa_tagstruct_putu32(t, PA_COMMAND_CORK_RECORD_STREAM);
436 pa_tagstruct_putu32(t, u->ctag++);
437 pa_tagstruct_putu32(t, u->channel);
438 pa_tagstruct_put_boolean(t, !!cork);
439 pa_pstream_send_tagstruct(u->pstream, t);
444 /* Called from IO thread context */
445 static void stream_suspend_within_thread(struct userdata *u, pa_bool_t suspend) {
448 if (u->remote_suspended == suspend)
451 u->remote_suspended = suspend;
452 check_smoother_status(u, TRUE);
457 /* Called from IO thread context */
458 static void send_data(struct userdata *u) {
461 while (u->requested_bytes > 0) {
462 pa_memchunk memchunk;
464 pa_sink_render(u->sink, u->requested_bytes, &memchunk);
465 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_POST, NULL, 0, &memchunk, NULL);
466 pa_memblock_unref(memchunk.memblock);
468 u->requested_bytes -= memchunk.length;
470 u->counter += (int64_t) memchunk.length;
474 /* This function is called from IO context -- except when it is not. */
475 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
476 struct userdata *u = PA_SINK(o)->userdata;
480 case PA_SINK_MESSAGE_SET_STATE: {
483 /* First, change the state, because otherwide pa_sink_render() would fail */
484 if ((r = pa_sink_process_msg(o, code, data, offset, chunk)) >= 0) {
486 stream_cork_within_thread(u, u->sink->state == PA_SINK_SUSPENDED);
488 if (PA_SINK_IS_OPENED(u->sink->state))
495 case PA_SINK_MESSAGE_GET_LATENCY: {
496 pa_usec_t yl, yr, *usec = data;
498 yl = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
499 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
501 *usec = yl > yr ? yl - yr : 0;
505 case SINK_MESSAGE_REQUEST:
507 pa_assert(offset > 0);
508 u->requested_bytes += (size_t) offset;
510 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
516 case SINK_MESSAGE_REMOTE_SUSPEND:
518 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
522 case SINK_MESSAGE_UPDATE_LATENCY: {
525 y = pa_bytes_to_usec((uint64_t) u->counter, &u->sink->sample_spec);
527 if (y > (pa_usec_t) offset)
528 y -= (pa_usec_t) offset;
532 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
534 /* We can access this freely here, since the main thread is waiting for us */
535 u->thread_transport_usec = u->transport_usec;
540 case SINK_MESSAGE_POST:
542 /* OK, This might be a bit confusing. This message is
543 * delivered to us from the main context -- NOT from the
544 * IO thread context where the rest of the messages are
545 * dispatched. Yeah, ugly, but I am a lazy bastard. */
547 pa_pstream_send_memblock(u->pstream, u->channel, 0, PA_SEEK_RELATIVE, chunk);
549 u->counter_delta += (int64_t) chunk->length;
554 return pa_sink_process_msg(o, code, data, offset, chunk);
557 /* Called from main context */
558 static int sink_set_state(pa_sink *s, pa_sink_state_t state) {
560 pa_sink_assert_ref(s);
563 switch ((pa_sink_state_t) state) {
565 case PA_SINK_SUSPENDED:
566 pa_assert(PA_SINK_IS_OPENED(s->state));
567 stream_cork(u, TRUE);
571 case PA_SINK_RUNNING:
572 if (s->state == PA_SINK_SUSPENDED)
573 stream_cork(u, FALSE);
576 case PA_SINK_UNLINKED:
578 case PA_SINK_INVALID_STATE:
587 /* This function is called from IO context -- except when it is not. */
588 static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
589 struct userdata *u = PA_SOURCE(o)->userdata;
593 case PA_SOURCE_MESSAGE_SET_STATE: {
596 if ((r = pa_source_process_msg(o, code, data, offset, chunk)) >= 0)
597 stream_cork_within_thread(u, u->source->state == PA_SOURCE_SUSPENDED);
602 case PA_SOURCE_MESSAGE_GET_LATENCY: {
603 pa_usec_t yr, yl, *usec = data;
605 yl = pa_bytes_to_usec((uint64_t) u->counter, &PA_SOURCE(o)->sample_spec);
606 yr = pa_smoother_get(u->smoother, pa_rtclock_usec());
608 *usec = yr > yl ? yr - yl : 0;
612 case SOURCE_MESSAGE_POST:
614 if (PA_SOURCE_IS_OPENED(u->source->thread_info.state))
615 pa_source_post(u->source, chunk);
617 u->counter += (int64_t) chunk->length;
621 case SOURCE_MESSAGE_REMOTE_SUSPEND:
623 stream_suspend_within_thread(u, !!PA_PTR_TO_UINT(data));
626 case SOURCE_MESSAGE_UPDATE_LATENCY: {
629 y = pa_bytes_to_usec((uint64_t) u->counter, &u->source->sample_spec);
630 y += (pa_usec_t) offset;
632 pa_smoother_put(u->smoother, pa_rtclock_usec(), y);
634 /* We can access this freely here, since the main thread is waiting for us */
635 u->thread_transport_usec = u->transport_usec;
641 return pa_source_process_msg(o, code, data, offset, chunk);
644 /* Called from main context */
645 static int source_set_state(pa_source *s, pa_source_state_t state) {
647 pa_source_assert_ref(s);
650 switch ((pa_source_state_t) state) {
652 case PA_SOURCE_SUSPENDED:
653 pa_assert(PA_SOURCE_IS_OPENED(s->state));
654 stream_cork(u, TRUE);
658 case PA_SOURCE_RUNNING:
659 if (s->state == PA_SOURCE_SUSPENDED)
660 stream_cork(u, FALSE);
663 case PA_SOURCE_UNLINKED:
665 case PA_SINK_INVALID_STATE:
674 static void thread_func(void *userdata) {
675 struct userdata *u = userdata;
679 pa_log_debug("Thread starting up");
681 pa_thread_mq_install(&u->thread_mq);
682 pa_rtpoll_install(u->rtpoll);
688 if (PA_SINK_IS_OPENED(u->sink->thread_info.state))
689 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
690 pa_sink_process_rewind(u->sink, 0);
693 if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0)
701 /* If this was no regular exit from the loop we have to continue
702 * processing messages until we received PA_MESSAGE_SHUTDOWN */
703 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
704 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
707 pa_log_debug("Thread shutting down");
711 /* Called from main context */
712 static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
713 struct userdata *u = userdata;
714 uint32_t bytes, channel;
717 pa_assert(command == PA_COMMAND_REQUEST);
720 pa_assert(u->pdispatch == pd);
722 if (pa_tagstruct_getu32(t, &channel) < 0 ||
723 pa_tagstruct_getu32(t, &bytes) < 0) {
724 pa_log("Invalid protocol reply");
728 if (channel != u->channel) {
729 pa_log("Recieved data for invalid channel");
733 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
737 pa_module_unload_request(u->module, TRUE);
742 /* Called from main context */
743 static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
744 struct userdata *u = userdata;
745 pa_usec_t sink_usec, source_usec;
747 int64_t write_index, read_index;
748 struct timeval local, remote, now;
755 if (command != PA_COMMAND_REPLY) {
756 if (command == PA_COMMAND_ERROR)
757 pa_log("Failed to get latency.");
759 pa_log("Protocol error.");
763 if (pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
764 pa_tagstruct_get_usec(t, &source_usec) < 0 ||
765 pa_tagstruct_get_boolean(t, &playing) < 0 ||
766 pa_tagstruct_get_timeval(t, &local) < 0 ||
767 pa_tagstruct_get_timeval(t, &remote) < 0 ||
768 pa_tagstruct_gets64(t, &write_index) < 0 ||
769 pa_tagstruct_gets64(t, &read_index) < 0) {
770 pa_log("Invalid reply.");
775 if (u->version >= 13) {
776 uint64_t underrun_for = 0, playing_for = 0;
778 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
779 pa_tagstruct_getu64(t, &playing_for) < 0) {
780 pa_log("Invalid reply.");
786 if (!pa_tagstruct_eof(t)) {
787 pa_log("Invalid reply.");
791 if (tag < u->ignore_latency_before) {
795 pa_gettimeofday(&now);
797 /* Calculate transport usec */
798 if (pa_timeval_cmp(&local, &remote) < 0 && pa_timeval_cmp(&remote, &now)) {
799 /* local and remote seem to have synchronized clocks */
801 u->transport_usec = pa_timeval_diff(&remote, &local);
803 u->transport_usec = pa_timeval_diff(&now, &remote);
806 u->transport_usec = pa_timeval_diff(&now, &local)/2;
808 /* First, take the device's delay */
810 delay = (int64_t) sink_usec;
811 ss = &u->sink->sample_spec;
813 delay = (int64_t) source_usec;
814 ss = &u->source->sample_spec;
817 /* Add the length of our server-side buffer */
818 if (write_index >= read_index)
819 delay += (int64_t) pa_bytes_to_usec((uint64_t) (write_index-read_index), ss);
821 delay -= (int64_t) pa_bytes_to_usec((uint64_t) (read_index-write_index), ss);
823 /* Our measurements are already out of date, hence correct by the *
824 * transport latency */
826 delay -= (int64_t) u->transport_usec;
828 delay += (int64_t) u->transport_usec;
831 /* Now correct by what we have have read/written since we requested the update */
833 delay += (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
835 delay -= (int64_t) pa_bytes_to_usec((uint64_t) u->counter_delta, ss);
839 pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
841 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_UPDATE_LATENCY, 0, delay, NULL);
848 pa_module_unload_request(u->module, TRUE);
851 /* Called from main context */
852 static void request_latency(struct userdata *u) {
858 t = pa_tagstruct_new(NULL, 0);
860 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
862 pa_tagstruct_putu32(t, PA_COMMAND_GET_RECORD_LATENCY);
864 pa_tagstruct_putu32(t, tag = u->ctag++);
865 pa_tagstruct_putu32(t, u->channel);
867 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
869 pa_pstream_send_tagstruct(u->pstream, t);
870 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, u, NULL);
872 u->ignore_latency_before = tag;
873 u->counter_delta = 0;
876 /* Called from main context */
877 static void timeout_callback(pa_mainloop_api *m, pa_time_event*e, const struct timeval *tv, void *userdata) {
878 struct userdata *u = userdata;
887 pa_gettimeofday(&ntv);
888 ntv.tv_sec += LATENCY_INTERVAL;
889 m->time_restart(e, &ntv);
892 /* Called from main context */
893 static void update_description(struct userdata *u) {
895 char un[128], hn[128];
900 if (!u->server_fqdn || !u->user_name || !u->device_description)
903 d = pa_sprintf_malloc("%s on %s@%s", u->device_description, u->user_name, u->server_fqdn);
906 pa_sink_set_description(u->sink, d);
907 pa_proplist_sets(u->sink->proplist, "tunnel.remote.user", u->user_name);
908 pa_proplist_sets(u->sink->proplist, "tunnel.remote.fqdn", u->server_fqdn);
909 pa_proplist_sets(u->sink->proplist, "tunnel.remote.description", u->device_description);
911 pa_source_set_description(u->source, d);
912 pa_proplist_sets(u->source->proplist, "tunnel.remote.user", u->user_name);
913 pa_proplist_sets(u->source->proplist, "tunnel.remote.fqdn", u->server_fqdn);
914 pa_proplist_sets(u->source->proplist, "tunnel.remote.description", u->device_description);
919 d = pa_sprintf_malloc("%s for %s@%s", u->device_description,
920 pa_get_user_name(un, sizeof(un)),
921 pa_get_host_name(hn, sizeof(hn)));
923 t = pa_tagstruct_new(NULL, 0);
925 pa_tagstruct_putu32(t, PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
927 pa_tagstruct_putu32(t, PA_COMMAND_SET_RECORD_STREAM_NAME);
929 pa_tagstruct_putu32(t, u->ctag++);
930 pa_tagstruct_putu32(t, u->channel);
931 pa_tagstruct_puts(t, d);
932 pa_pstream_send_tagstruct(u->pstream, t);
937 /* Called from main context */
938 static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
939 struct userdata *u = userdata;
942 const char *server_name, *server_version, *user_name, *host_name, *default_sink_name, *default_source_name;
948 if (command != PA_COMMAND_REPLY) {
949 if (command == PA_COMMAND_ERROR)
950 pa_log("Failed to get info.");
952 pa_log("Protocol error.");
956 if (pa_tagstruct_gets(t, &server_name) < 0 ||
957 pa_tagstruct_gets(t, &server_version) < 0 ||
958 pa_tagstruct_gets(t, &user_name) < 0 ||
959 pa_tagstruct_gets(t, &host_name) < 0 ||
960 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
961 pa_tagstruct_gets(t, &default_sink_name) < 0 ||
962 pa_tagstruct_gets(t, &default_source_name) < 0 ||
963 pa_tagstruct_getu32(t, &cookie) < 0 ||
965 pa_tagstruct_get_channel_map(t, &cm) < 0)) {
967 pa_log("Parse failure");
971 if (!pa_tagstruct_eof(t)) {
972 pa_log("Packet too long");
976 pa_xfree(u->server_fqdn);
977 u->server_fqdn = pa_xstrdup(host_name);
979 pa_xfree(u->user_name);
980 u->user_name = pa_xstrdup(user_name);
982 update_description(u);
987 pa_module_unload_request(u->module, TRUE);
992 /* Called from main context */
993 static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
994 struct userdata *u = userdata;
995 uint32_t idx, owner_module, monitor_source, flags;
996 const char *name, *description, *monitor_source_name, *driver;
1007 pl = pa_proplist_new();
1009 if (command != PA_COMMAND_REPLY) {
1010 if (command == PA_COMMAND_ERROR)
1011 pa_log("Failed to get info.");
1013 pa_log("Protocol error.");
1017 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1018 pa_tagstruct_gets(t, &name) < 0 ||
1019 pa_tagstruct_gets(t, &description) < 0 ||
1020 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1021 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1022 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1023 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1024 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1025 pa_tagstruct_getu32(t, &monitor_source) < 0 ||
1026 pa_tagstruct_gets(t, &monitor_source_name) < 0 ||
1027 pa_tagstruct_get_usec(t, &latency) < 0 ||
1028 pa_tagstruct_gets(t, &driver) < 0 ||
1029 pa_tagstruct_getu32(t, &flags) < 0) {
1031 pa_log("Parse failure");
1035 if (u->version >= 13) {
1036 pa_usec_t configured_latency;
1038 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1039 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1041 pa_log("Parse failure");
1046 if (u->version >= 15) {
1047 pa_volume_t base_volume;
1048 uint32_t state, n_volume_steps, card;
1050 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1051 pa_tagstruct_getu32(t, &state) < 0 ||
1052 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1053 pa_tagstruct_getu32(t, &card) < 0) {
1055 pa_log("Parse failure");
1060 if (!pa_tagstruct_eof(t)) {
1061 pa_log("Packet too long");
1065 pa_proplist_free(pl);
1067 if (!u->sink_name || strcmp(name, u->sink_name))
1070 pa_xfree(u->device_description);
1071 u->device_description = pa_xstrdup(description);
1073 update_description(u);
1078 pa_module_unload_request(u->module, TRUE);
1079 pa_proplist_free(pl);
1082 /* Called from main context */
1083 static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1084 struct userdata *u = userdata;
1085 uint32_t idx, owner_module, client, sink;
1086 pa_usec_t buffer_usec, sink_usec;
1087 const char *name, *driver, *resample_method;
1089 pa_sample_spec sample_spec;
1090 pa_channel_map channel_map;
1097 pl = pa_proplist_new();
1099 if (command != PA_COMMAND_REPLY) {
1100 if (command == PA_COMMAND_ERROR)
1101 pa_log("Failed to get info.");
1103 pa_log("Protocol error.");
1107 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1108 pa_tagstruct_gets(t, &name) < 0 ||
1109 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1110 pa_tagstruct_getu32(t, &client) < 0 ||
1111 pa_tagstruct_getu32(t, &sink) < 0 ||
1112 pa_tagstruct_get_sample_spec(t, &sample_spec) < 0 ||
1113 pa_tagstruct_get_channel_map(t, &channel_map) < 0 ||
1114 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1115 pa_tagstruct_get_usec(t, &buffer_usec) < 0 ||
1116 pa_tagstruct_get_usec(t, &sink_usec) < 0 ||
1117 pa_tagstruct_gets(t, &resample_method) < 0 ||
1118 pa_tagstruct_gets(t, &driver) < 0) {
1120 pa_log("Parse failure");
1124 if (u->version >= 11) {
1125 if (pa_tagstruct_get_boolean(t, &mute) < 0) {
1127 pa_log("Parse failure");
1132 if (u->version >= 13) {
1133 if (pa_tagstruct_get_proplist(t, pl) < 0) {
1135 pa_log("Parse failure");
1140 if (!pa_tagstruct_eof(t)) {
1141 pa_log("Packet too long");
1145 pa_proplist_free(pl);
1147 if (idx != u->device_index)
1152 if ((u->version < 11 || !!mute == !!u->sink->muted) &&
1153 pa_cvolume_equal(&volume, &u->sink->virtual_volume))
1156 pa_sink_volume_changed(u->sink, &volume);
1158 if (u->version >= 11)
1159 pa_sink_mute_changed(u->sink, mute);
1164 pa_module_unload_request(u->module, TRUE);
1165 pa_proplist_free(pl);
1170 /* Called from main context */
1171 static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1172 struct userdata *u = userdata;
1173 uint32_t idx, owner_module, monitor_of_sink, flags;
1174 const char *name, *description, *monitor_of_sink_name, *driver;
1179 pa_usec_t latency, configured_latency;
1185 pl = pa_proplist_new();
1187 if (command != PA_COMMAND_REPLY) {
1188 if (command == PA_COMMAND_ERROR)
1189 pa_log("Failed to get info.");
1191 pa_log("Protocol error.");
1195 if (pa_tagstruct_getu32(t, &idx) < 0 ||
1196 pa_tagstruct_gets(t, &name) < 0 ||
1197 pa_tagstruct_gets(t, &description) < 0 ||
1198 pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1199 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1200 pa_tagstruct_getu32(t, &owner_module) < 0 ||
1201 pa_tagstruct_get_cvolume(t, &volume) < 0 ||
1202 pa_tagstruct_get_boolean(t, &mute) < 0 ||
1203 pa_tagstruct_getu32(t, &monitor_of_sink) < 0 ||
1204 pa_tagstruct_gets(t, &monitor_of_sink_name) < 0 ||
1205 pa_tagstruct_get_usec(t, &latency) < 0 ||
1206 pa_tagstruct_gets(t, &driver) < 0 ||
1207 pa_tagstruct_getu32(t, &flags) < 0) {
1209 pa_log("Parse failure");
1213 if (u->version >= 13) {
1214 if (pa_tagstruct_get_proplist(t, pl) < 0 ||
1215 pa_tagstruct_get_usec(t, &configured_latency) < 0) {
1217 pa_log("Parse failure");
1222 if (u->version >= 15) {
1223 pa_volume_t base_volume;
1224 uint32_t state, n_volume_steps, card;
1226 if (pa_tagstruct_get_volume(t, &base_volume) < 0 ||
1227 pa_tagstruct_getu32(t, &state) < 0 ||
1228 pa_tagstruct_getu32(t, &n_volume_steps) < 0 ||
1229 pa_tagstruct_getu32(t, &card) < 0) {
1231 pa_log("Parse failure");
1236 if (!pa_tagstruct_eof(t)) {
1237 pa_log("Packet too long");
1241 pa_proplist_free(pl);
1243 if (!u->source_name || strcmp(name, u->source_name))
1246 pa_xfree(u->device_description);
1247 u->device_description = pa_xstrdup(description);
1249 update_description(u);
1254 pa_module_unload_request(u->module, TRUE);
1255 pa_proplist_free(pl);
1260 /* Called from main context */
1261 static void request_info(struct userdata *u) {
1266 t = pa_tagstruct_new(NULL, 0);
1267 pa_tagstruct_putu32(t, PA_COMMAND_GET_SERVER_INFO);
1268 pa_tagstruct_putu32(t, tag = u->ctag++);
1269 pa_pstream_send_tagstruct(u->pstream, t);
1270 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, server_info_cb, u, NULL);
1273 t = pa_tagstruct_new(NULL, 0);
1274 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INPUT_INFO);
1275 pa_tagstruct_putu32(t, tag = u->ctag++);
1276 pa_tagstruct_putu32(t, u->device_index);
1277 pa_pstream_send_tagstruct(u->pstream, t);
1278 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_input_info_cb, u, NULL);
1281 t = pa_tagstruct_new(NULL, 0);
1282 pa_tagstruct_putu32(t, PA_COMMAND_GET_SINK_INFO);
1283 pa_tagstruct_putu32(t, tag = u->ctag++);
1284 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1285 pa_tagstruct_puts(t, u->sink_name);
1286 pa_pstream_send_tagstruct(u->pstream, t);
1287 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, sink_info_cb, u, NULL);
1290 if (u->source_name) {
1291 t = pa_tagstruct_new(NULL, 0);
1292 pa_tagstruct_putu32(t, PA_COMMAND_GET_SOURCE_INFO);
1293 pa_tagstruct_putu32(t, tag = u->ctag++);
1294 pa_tagstruct_putu32(t, PA_INVALID_INDEX);
1295 pa_tagstruct_puts(t, u->source_name);
1296 pa_pstream_send_tagstruct(u->pstream, t);
1297 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, source_info_cb, u, NULL);
1302 /* Called from main context */
1303 static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1304 struct userdata *u = userdata;
1305 pa_subscription_event_type_t e;
1311 pa_assert(command == PA_COMMAND_SUBSCRIBE_EVENT);
1313 if (pa_tagstruct_getu32(t, &e) < 0 ||
1314 pa_tagstruct_getu32(t, &idx) < 0) {
1315 pa_log("Invalid protocol reply");
1316 pa_module_unload_request(u->module, TRUE);
1320 if (e != (PA_SUBSCRIPTION_EVENT_SERVER|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1322 e != (PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE) &&
1323 e != (PA_SUBSCRIPTION_EVENT_SINK|PA_SUBSCRIPTION_EVENT_CHANGE)
1325 e != (PA_SUBSCRIPTION_EVENT_SOURCE|PA_SUBSCRIPTION_EVENT_CHANGE)
1333 /* Called from main context */
1334 static void start_subscribe(struct userdata *u) {
1339 t = pa_tagstruct_new(NULL, 0);
1340 pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE);
1341 pa_tagstruct_putu32(t, tag = u->ctag++);
1342 pa_tagstruct_putu32(t, PA_SUBSCRIPTION_MASK_SERVER|
1344 PA_SUBSCRIPTION_MASK_SINK_INPUT|PA_SUBSCRIPTION_MASK_SINK
1346 PA_SUBSCRIPTION_MASK_SOURCE
1350 pa_pstream_send_tagstruct(u->pstream, t);
1353 /* Called from main context */
1354 static void create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1355 struct userdata *u = userdata;
1363 pa_assert(u->pdispatch == pd);
1365 if (command != PA_COMMAND_REPLY) {
1366 if (command == PA_COMMAND_ERROR)
1367 pa_log("Failed to create stream.");
1369 pa_log("Protocol error.");
1373 if (pa_tagstruct_getu32(t, &u->channel) < 0 ||
1374 pa_tagstruct_getu32(t, &u->device_index) < 0
1376 || pa_tagstruct_getu32(t, &bytes) < 0
1381 if (u->version >= 9) {
1383 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1384 pa_tagstruct_getu32(t, &u->tlength) < 0 ||
1385 pa_tagstruct_getu32(t, &u->prebuf) < 0 ||
1386 pa_tagstruct_getu32(t, &u->minreq) < 0)
1389 if (pa_tagstruct_getu32(t, &u->maxlength) < 0 ||
1390 pa_tagstruct_getu32(t, &u->fragsize) < 0)
1395 if (u->version >= 12) {
1398 uint32_t device_index;
1400 pa_bool_t suspended;
1402 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1403 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1404 pa_tagstruct_getu32(t, &device_index) < 0 ||
1405 pa_tagstruct_gets(t, &dn) < 0 ||
1406 pa_tagstruct_get_boolean(t, &suspended) < 0)
1410 pa_xfree(u->sink_name);
1411 u->sink_name = pa_xstrdup(dn);
1413 pa_xfree(u->source_name);
1414 u->source_name = pa_xstrdup(dn);
1418 if (u->version >= 13) {
1421 if (pa_tagstruct_get_usec(t, &usec) < 0)
1424 /* #ifdef TUNNEL_SINK */
1425 /* pa_sink_set_latency_range(u->sink, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1427 /* pa_source_set_latency_range(u->source, usec + MIN_NETWORK_LATENCY_USEC, 0); */
1431 if (!pa_tagstruct_eof(t))
1437 pa_assert(!u->time_event);
1438 pa_gettimeofday(&ntv);
1439 ntv.tv_sec += LATENCY_INTERVAL;
1440 u->time_event = u->core->mainloop->time_new(u->core->mainloop, &ntv, timeout_callback, u);
1444 pa_log_debug("Stream created.");
1447 pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_REQUEST, NULL, bytes, NULL, NULL);
1453 pa_log("Invalid reply. (Create stream)");
1456 pa_module_unload_request(u->module, TRUE);
1460 /* Called from main context */
1461 static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1462 struct userdata *u = userdata;
1463 pa_tagstruct *reply;
1464 char name[256], un[128], hn[128];
1471 pa_assert(u->pdispatch == pd);
1473 if (command != PA_COMMAND_REPLY ||
1474 pa_tagstruct_getu32(t, &u->version) < 0 ||
1475 !pa_tagstruct_eof(t)) {
1477 if (command == PA_COMMAND_ERROR)
1478 pa_log("Failed to authenticate");
1480 pa_log("Protocol error.");
1485 /* Minimum supported protocol version */
1486 if (u->version < 8) {
1487 pa_log("Incompatible protocol version");
1491 /* Starting with protocol version 13 the MSB of the version tag
1492 reflects if shm is enabled for this connection or not. We don't
1493 support SHM here at all, so we just ignore this. */
1495 if (u->version >= 13)
1496 u->version &= 0x7FFFFFFFU;
1498 pa_log_debug("Protocol version: remote %u, local %u", u->version, PA_PROTOCOL_VERSION);
1501 pa_proplist_setf(u->sink->proplist, "tunnel.remote_version", "%u", u->version);
1502 pa_sink_update_proplist(u->sink, 0, NULL);
1504 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1506 pa_get_user_name(un, sizeof(un)),
1507 pa_get_host_name(hn, sizeof(hn)));
1509 pa_proplist_setf(u->source->proplist, "tunnel.remote_version", "%u", u->version);
1510 pa_source_update_proplist(u->source, 0, NULL);
1512 pa_snprintf(name, sizeof(name), "%s for %s@%s",
1514 pa_get_user_name(un, sizeof(un)),
1515 pa_get_host_name(hn, sizeof(hn)));
1518 reply = pa_tagstruct_new(NULL, 0);
1519 pa_tagstruct_putu32(reply, PA_COMMAND_SET_CLIENT_NAME);
1520 pa_tagstruct_putu32(reply, tag = u->ctag++);
1522 if (u->version >= 13) {
1524 pl = pa_proplist_new();
1525 pa_proplist_sets(pl, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio");
1526 pa_proplist_sets(pl, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION);
1527 pa_init_proplist(pl);
1528 pa_tagstruct_put_proplist(reply, pl);
1529 pa_proplist_free(pl);
1531 pa_tagstruct_puts(reply, "PulseAudio");
1533 pa_pstream_send_tagstruct(u->pstream, reply);
1534 /* We ignore the server's reply here */
1536 reply = pa_tagstruct_new(NULL, 0);
1538 if (u->version < 13)
1539 /* Only for older PA versions we need to fill in the maxlength */
1540 u->maxlength = 4*1024*1024;
1543 u->tlength = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_TLENGTH_MSEC, &u->sink->sample_spec);
1544 u->minreq = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_MINREQ_MSEC, &u->sink->sample_spec);
1545 u->prebuf = u->tlength;
1547 u->fragsize = (uint32_t) pa_usec_to_bytes(PA_USEC_PER_MSEC * DEFAULT_FRAGSIZE_MSEC, &u->source->sample_spec);
1551 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_PLAYBACK_STREAM);
1552 pa_tagstruct_putu32(reply, tag = u->ctag++);
1554 if (u->version < 13)
1555 pa_tagstruct_puts(reply, name);
1557 pa_tagstruct_put_sample_spec(reply, &u->sink->sample_spec);
1558 pa_tagstruct_put_channel_map(reply, &u->sink->channel_map);
1559 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1560 pa_tagstruct_puts(reply, u->sink_name);
1561 pa_tagstruct_putu32(reply, u->maxlength);
1562 pa_tagstruct_put_boolean(reply, !PA_SINK_IS_OPENED(pa_sink_get_state(u->sink)));
1563 pa_tagstruct_putu32(reply, u->tlength);
1564 pa_tagstruct_putu32(reply, u->prebuf);
1565 pa_tagstruct_putu32(reply, u->minreq);
1566 pa_tagstruct_putu32(reply, 0);
1567 pa_cvolume_reset(&volume, u->sink->sample_spec.channels);
1568 pa_tagstruct_put_cvolume(reply, &volume);
1570 pa_tagstruct_putu32(reply, PA_COMMAND_CREATE_RECORD_STREAM);
1571 pa_tagstruct_putu32(reply, tag = u->ctag++);
1573 if (u->version < 13)
1574 pa_tagstruct_puts(reply, name);
1576 pa_tagstruct_put_sample_spec(reply, &u->source->sample_spec);
1577 pa_tagstruct_put_channel_map(reply, &u->source->channel_map);
1578 pa_tagstruct_putu32(reply, PA_INVALID_INDEX);
1579 pa_tagstruct_puts(reply, u->source_name);
1580 pa_tagstruct_putu32(reply, u->maxlength);
1581 pa_tagstruct_put_boolean(reply, !PA_SOURCE_IS_OPENED(pa_source_get_state(u->source)));
1582 pa_tagstruct_putu32(reply, u->fragsize);
1585 if (u->version >= 12) {
1586 pa_tagstruct_put_boolean(reply, FALSE); /* no_remap */
1587 pa_tagstruct_put_boolean(reply, FALSE); /* no_remix */
1588 pa_tagstruct_put_boolean(reply, FALSE); /* fix_format */
1589 pa_tagstruct_put_boolean(reply, FALSE); /* fix_rate */
1590 pa_tagstruct_put_boolean(reply, FALSE); /* fix_channels */
1591 pa_tagstruct_put_boolean(reply, TRUE); /* no_move */
1592 pa_tagstruct_put_boolean(reply, FALSE); /* variable_rate */
1595 if (u->version >= 13) {
1598 pa_tagstruct_put_boolean(reply, FALSE); /* start muted/peak detect*/
1599 pa_tagstruct_put_boolean(reply, TRUE); /* adjust_latency */
1601 pl = pa_proplist_new();
1602 pa_proplist_sets(pl, PA_PROP_MEDIA_NAME, name);
1603 pa_proplist_sets(pl, PA_PROP_MEDIA_ROLE, "abstract");
1604 pa_tagstruct_put_proplist(reply, pl);
1605 pa_proplist_free(pl);
1608 pa_tagstruct_putu32(reply, PA_INVALID_INDEX); /* direct on input */
1612 if (u->version >= 14) {
1614 pa_tagstruct_put_boolean(reply, FALSE); /* volume_set */
1616 pa_tagstruct_put_boolean(reply, TRUE); /* early rquests */
1619 if (u->version >= 15) {
1621 pa_tagstruct_put_boolean(reply, FALSE); /* muted_set */
1623 pa_tagstruct_put_boolean(reply, FALSE); /* don't inhibit auto suspend */
1624 pa_tagstruct_put_boolean(reply, FALSE); /* fail on suspend */
1627 pa_pstream_send_tagstruct(u->pstream, reply);
1628 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, u, NULL);
1630 pa_log_debug("Connection authenticated, creating stream ...");
1635 pa_module_unload_request(u->module, TRUE);
1638 /* Called from main context */
1639 static void pstream_die_callback(pa_pstream *p, void *userdata) {
1640 struct userdata *u = userdata;
1645 pa_log_warn("Stream died.");
1646 pa_module_unload_request(u->module, TRUE);
1649 /* Called from main context */
1650 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
1651 struct userdata *u = userdata;
1657 if (pa_pdispatch_run(u->pdispatch, packet, creds, u) < 0) {
1658 pa_log("Invalid packet");
1659 pa_module_unload_request(u->module, TRUE);
1665 /* Called from main context */
1666 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
1667 struct userdata *u = userdata;
1673 if (channel != u->channel) {
1674 pa_log("Recieved memory block on bad channel.");
1675 pa_module_unload_request(u->module, TRUE);
1679 pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_POST, PA_UINT_TO_PTR(seek), offset, chunk);
1681 u->counter_delta += (int64_t) chunk->length;
1685 /* Called from main context */
1686 static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
1687 struct userdata *u = userdata;
1693 pa_assert(u->client == sc);
1695 pa_socket_client_unref(u->client);
1699 pa_log("Connection failed: %s", pa_cstrerror(errno));
1700 pa_module_unload_request(u->module, TRUE);
1704 u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
1705 u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
1707 pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
1708 pa_pstream_set_recieve_packet_callback(u->pstream, pstream_packet_callback, u);
1710 pa_pstream_set_recieve_memblock_callback(u->pstream, pstream_memblock_callback, u);
1713 t = pa_tagstruct_new(NULL, 0);
1714 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
1715 pa_tagstruct_putu32(t, tag = u->ctag++);
1716 pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION);
1718 pa_tagstruct_put_arbitrary(t, pa_auth_cookie_read(u->auth_cookie, PA_NATIVE_COOKIE_LENGTH), PA_NATIVE_COOKIE_LENGTH);
1724 if (pa_iochannel_creds_supported(io))
1725 pa_iochannel_creds_enable(io);
1727 ucred.uid = getuid();
1728 ucred.gid = getgid();
1730 pa_pstream_send_tagstruct_with_creds(u->pstream, t, &ucred);
1733 pa_pstream_send_tagstruct(u->pstream, t);
1736 pa_pdispatch_register_reply(u->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, u, NULL);
1738 pa_log_debug("Connection established, authenticating ...");
1743 /* Called from main context */
1744 static void sink_set_volume(pa_sink *sink) {
1753 t = pa_tagstruct_new(NULL, 0);
1754 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_VOLUME);
1755 pa_tagstruct_putu32(t, tag = u->ctag++);
1756 pa_tagstruct_putu32(t, u->device_index);
1757 pa_tagstruct_put_cvolume(t, &sink->virtual_volume);
1758 pa_pstream_send_tagstruct(u->pstream, t);
1761 /* Called from main context */
1762 static void sink_set_mute(pa_sink *sink) {
1771 if (u->version < 11)
1774 t = pa_tagstruct_new(NULL, 0);
1775 pa_tagstruct_putu32(t, PA_COMMAND_SET_SINK_INPUT_MUTE);
1776 pa_tagstruct_putu32(t, tag = u->ctag++);
1777 pa_tagstruct_putu32(t, u->device_index);
1778 pa_tagstruct_put_boolean(t, !!sink->muted);
1779 pa_pstream_send_tagstruct(u->pstream, t);
1784 int pa__init(pa_module*m) {
1785 pa_modargs *ma = NULL;
1786 struct userdata *u = NULL;
1791 pa_sink_new_data data;
1793 pa_source_new_data data;
1798 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1799 pa_log("Failed to parse module arguments");
1803 m->userdata = u = pa_xnew0(struct userdata, 1);
1807 u->pdispatch = NULL;
1809 u->server_name = NULL;
1811 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));;
1813 u->requested_bytes = 0;
1815 u->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));;
1818 u->smoother = pa_smoother_new(
1827 u->device_index = u->channel = PA_INVALID_INDEX;
1828 u->time_event = NULL;
1829 u->ignore_latency_before = 0;
1830 u->transport_usec = u->thread_transport_usec = 0;
1831 u->remote_suspended = u->remote_corked = FALSE;
1832 u->counter = u->counter_delta = 0;
1834 u->rtpoll = pa_rtpoll_new();
1835 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1837 if (!(u->auth_cookie = pa_auth_cookie_get(u->core, pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), PA_NATIVE_COOKIE_LENGTH)))
1840 if (!(u->server_name = pa_xstrdup(pa_modargs_get_value(ma, "server", NULL)))) {
1841 pa_log("No server specified.");
1845 ss = m->core->default_sample_spec;
1846 map = m->core->default_channel_map;
1847 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1848 pa_log("Invalid sample format specification");
1852 if (!(u->client = pa_socket_client_new_string(m->core->mainloop, u->server_name, PA_NATIVE_DEFAULT_PORT))) {
1853 pa_log("Failed to connect to server '%s'", u->server_name);
1857 pa_socket_client_set_callback(u->client, on_connection, u);
1861 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1862 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1864 pa_sink_new_data_init(&data);
1865 data.driver = __FILE__;
1867 data.namereg_fail = TRUE;
1868 pa_sink_new_data_set_name(&data, dn);
1869 pa_sink_new_data_set_sample_spec(&data, &ss);
1870 pa_sink_new_data_set_channel_map(&data, &map);
1871 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
1872 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1874 pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
1876 u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY|PA_SINK_HW_VOLUME_CTRL|PA_SINK_HW_MUTE_CTRL);
1877 pa_sink_new_data_done(&data);
1880 pa_log("Failed to create sink.");
1884 u->sink->parent.process_msg = sink_process_msg;
1885 u->sink->userdata = u;
1886 u->sink->set_state = sink_set_state;
1887 u->sink->set_volume = sink_set_volume;
1888 u->sink->set_mute = sink_set_mute;
1890 u->sink->refresh_volume = u->sink->refresh_muted = FALSE;
1892 /* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
1894 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
1895 pa_sink_set_rtpoll(u->sink, u->rtpoll);
1899 if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1900 dn = pa_sprintf_malloc("tunnel.%s", u->server_name);
1902 pa_source_new_data_init(&data);
1903 data.driver = __FILE__;
1905 data.namereg_fail = TRUE;
1906 pa_source_new_data_set_name(&data, dn);
1907 pa_source_new_data_set_sample_spec(&data, &ss);
1908 pa_source_new_data_set_channel_map(&data, &map);
1909 pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
1910 pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
1912 pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
1914 u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
1915 pa_source_new_data_done(&data);
1918 pa_log("Failed to create source.");
1922 u->source->parent.process_msg = source_process_msg;
1923 u->source->set_state = source_set_state;
1924 u->source->userdata = u;
1926 /* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
1928 pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
1929 pa_source_set_rtpoll(u->source, u->rtpoll);
1934 u->time_event = NULL;
1936 u->maxlength = (uint32_t) -1;
1938 u->tlength = u->minreq = u->prebuf = (uint32_t) -1;
1940 u->fragsize = (uint32_t) -1;
1943 if (!(u->thread = pa_thread_new(thread_func, u))) {
1944 pa_log("Failed to create thread.");
1949 pa_sink_put(u->sink);
1951 pa_source_put(u->source);
1954 pa_modargs_free(ma);
1962 pa_modargs_free(ma);
1969 void pa__done(pa_module*m) {
1974 if (!(u = m->userdata))
1979 pa_sink_unlink(u->sink);
1982 pa_source_unlink(u->source);
1986 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1987 pa_thread_free(u->thread);
1990 pa_thread_mq_done(&u->thread_mq);
1994 pa_sink_unref(u->sink);
1997 pa_source_unref(u->source);
2001 pa_rtpoll_free(u->rtpoll);
2004 pa_pstream_unlink(u->pstream);
2005 pa_pstream_unref(u->pstream);
2009 pa_pdispatch_unref(u->pdispatch);
2012 pa_socket_client_unref(u->client);
2015 pa_auth_cookie_unref(u->auth_cookie);
2018 pa_smoother_free(u->smoother);
2021 u->core->mainloop->time_free(u->time_event);
2024 pa_xfree(u->sink_name);
2026 pa_xfree(u->source_name);
2028 pa_xfree(u->server_name);
2030 pa_xfree(u->device_description);
2031 pa_xfree(u->server_fqdn);
2032 pa_xfree(u->user_name);