format: Avoid some code duplication
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38 #include <pulse/internal.h>
39
40 #include <pulsecore/native-common.h>
41 #include <pulsecore/packet.h>
42 #include <pulsecore/client.h>
43 #include <pulsecore/source-output.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pstream.h>
46 #include <pulsecore/tagstruct.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/authkey.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/core-scache.h>
52 #include <pulsecore/core-subscribe.h>
53 #include <pulsecore/log.h>
54 #include <pulsecore/strlist.h>
55 #include <pulsecore/shared.h>
56 #include <pulsecore/sample-util.h>
57 #include <pulsecore/llist.h>
58 #include <pulsecore/creds.h>
59 #include <pulsecore/core-util.h>
60 #include <pulsecore/ipacl.h>
61 #include <pulsecore/thread-mq.h>
62
63 #include "protocol-native.h"
64
/* How long a client may stay connected without authenticating before
 * it gets kicked */
#define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)

/* Don't accept more connections than this */
#define MAX_CONNECTIONS 64

/* Limits and defaults used when fixing up stream buffer attributes */
#define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
#define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
#define DEFAULT_PROCESS_MSEC 20   /* 20ms */
#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC

struct pa_native_protocol;
77
78 typedef struct record_stream {
79     pa_msgobject parent;
80
81     pa_native_connection *connection;
82     uint32_t index;
83
84     pa_source_output *source_output;
85     pa_memblockq *memblockq;
86
87     pa_bool_t adjust_latency:1;
88     pa_bool_t early_requests:1;
89
90     pa_buffer_attr buffer_attr;
91
92     pa_atomic_t on_the_fly;
93     pa_usec_t configured_source_latency;
94     size_t drop_initial;
95
96     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
97     size_t on_the_fly_snapshot;
98     pa_usec_t current_monitor_latency;
99     pa_usec_t current_source_latency;
100 } record_stream;
101
102 #define RECORD_STREAM(o) (record_stream_cast(o))
103 PA_DEFINE_PRIVATE_CLASS(record_stream, pa_msgobject);
104
105 typedef struct output_stream {
106     pa_msgobject parent;
107 } output_stream;
108
109 #define OUTPUT_STREAM(o) (output_stream_cast(o))
110 PA_DEFINE_PRIVATE_CLASS(output_stream, pa_msgobject);
111
112 typedef struct playback_stream {
113     output_stream parent;
114
115     pa_native_connection *connection;
116     uint32_t index;
117
118     pa_sink_input *sink_input;
119     pa_memblockq *memblockq;
120
121     pa_bool_t adjust_latency:1;
122     pa_bool_t early_requests:1;
123
124     pa_bool_t is_underrun:1;
125     pa_bool_t drain_request:1;
126     uint32_t drain_tag;
127     uint32_t syncid;
128
129     /* Optimization to avoid too many rewinds with a lot of small blocks */
130     pa_atomic_t seek_or_post_in_queue;
131     int64_t seek_windex;
132
133     pa_atomic_t missing;
134     pa_usec_t configured_sink_latency;
135     pa_buffer_attr buffer_attr;
136
137     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
138     int64_t read_index, write_index;
139     size_t render_memblockq_length;
140     pa_usec_t current_sink_latency;
141     uint64_t playing_for, underrun_for;
142 } playback_stream;
143
144 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
145 PA_DEFINE_PRIVATE_CLASS(playback_stream, output_stream);
146
147 typedef struct upload_stream {
148     output_stream parent;
149
150     pa_native_connection *connection;
151     uint32_t index;
152
153     pa_memchunk memchunk;
154     size_t length;
155     char *name;
156     pa_sample_spec sample_spec;
157     pa_channel_map channel_map;
158     pa_proplist *proplist;
159 } upload_stream;
160
161 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
162 PA_DEFINE_PRIVATE_CLASS(upload_stream, output_stream);
163
164 struct pa_native_connection {
165     pa_msgobject parent;
166     pa_native_protocol *protocol;
167     pa_native_options *options;
168     pa_bool_t authorized:1;
169     pa_bool_t is_local:1;
170     uint32_t version;
171     pa_client *client;
172     pa_pstream *pstream;
173     pa_pdispatch *pdispatch;
174     pa_idxset *record_streams, *output_streams;
175     uint32_t rrobin_index;
176     pa_subscription *subscription;
177     pa_time_event *auth_timeout_event;
178 };
179
180 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
181 PA_DEFINE_PRIVATE_CLASS(pa_native_connection, pa_msgobject);
182
183 struct pa_native_protocol {
184     PA_REFCNT_DECLARE;
185
186     pa_core *core;
187     pa_idxset *connections;
188
189     pa_strlist *servers;
190     pa_hook hooks[PA_NATIVE_HOOK_MAX];
191
192     pa_hashmap *extensions;
193 };
194
195 enum {
196     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
197 };
198
199 enum {
200     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
201     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
202     SINK_INPUT_MESSAGE_FLUSH,
203     SINK_INPUT_MESSAGE_TRIGGER,
204     SINK_INPUT_MESSAGE_SEEK,
205     SINK_INPUT_MESSAGE_PREBUF_FORCE,
206     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
207     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
208 };
209
210 enum {
211     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
212     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
213     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
214     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
215     PLAYBACK_STREAM_MESSAGE_STARTED,
216     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
217 };
218
219 enum {
220     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
221 };
222
223 enum {
224     CONNECTION_MESSAGE_RELEASE,
225     CONNECTION_MESSAGE_REVOKE
226 };
227
228 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
229 static void sink_input_kill_cb(pa_sink_input *i);
230 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
231 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
232 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
235 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
236
237 static void native_connection_send_memblock(pa_native_connection *c);
238 static void playback_stream_request_bytes(struct playback_stream*s);
239
240 static void source_output_kill_cb(pa_source_output *o);
241 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
242 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
243 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
244 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
245 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
246
247 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
249
250 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289
290 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
291     [PA_COMMAND_ERROR] = NULL,
292     [PA_COMMAND_TIMEOUT] = NULL,
293     [PA_COMMAND_REPLY] = NULL,
294     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
295     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
296     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
297     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
298     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
299     [PA_COMMAND_AUTH] = command_auth,
300     [PA_COMMAND_REQUEST] = NULL,
301     [PA_COMMAND_EXIT] = command_exit,
302     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
303     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
304     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
305     [PA_COMMAND_STAT] = command_stat,
306     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
307     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
308     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
309     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
310     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
311     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
312     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
313     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
314     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
315     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
316     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
317     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
318     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
319     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
320     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
321     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
329     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
330     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
331
332     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
333     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
334     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
335
336     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
337     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
338     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
339
340     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
341     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
342
343     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
344     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
346     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
347
348     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
349     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
350
351     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
352     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
353     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
354     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
355     [PA_COMMAND_KILL_CLIENT] = command_kill,
356     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
357     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
358     [PA_COMMAND_LOAD_MODULE] = command_load_module,
359     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
360
361     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
362     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
363     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
364     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
365
366     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
367     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
368
369     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
370     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
371
372     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
373     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
374
375     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
376     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
377     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
378
379     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
380     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
381     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
382
383     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
384
385     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
386     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
387
388     [PA_COMMAND_EXTENSION] = command_extension
389 };
390
391 /* structure management */
392
393 /* Called from main context */
394 static void upload_stream_unlink(upload_stream *s) {
395     pa_assert(s);
396
397     if (!s->connection)
398         return;
399
400     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
401     s->connection = NULL;
402     upload_stream_unref(s);
403 }
404
405 /* Called from main context */
406 static void upload_stream_free(pa_object *o) {
407     upload_stream *s = UPLOAD_STREAM(o);
408     pa_assert(s);
409
410     upload_stream_unlink(s);
411
412     pa_xfree(s->name);
413
414     if (s->proplist)
415         pa_proplist_free(s->proplist);
416
417     if (s->memchunk.memblock)
418         pa_memblock_unref(s->memchunk.memblock);
419
420     pa_xfree(s);
421 }
422
423 /* Called from main context */
424 static upload_stream* upload_stream_new(
425         pa_native_connection *c,
426         const pa_sample_spec *ss,
427         const pa_channel_map *map,
428         const char *name,
429         size_t length,
430         pa_proplist *p) {
431
432     upload_stream *s;
433
434     pa_assert(c);
435     pa_assert(ss);
436     pa_assert(name);
437     pa_assert(length > 0);
438     pa_assert(p);
439
440     s = pa_msgobject_new(upload_stream);
441     s->parent.parent.parent.free = upload_stream_free;
442     s->connection = c;
443     s->sample_spec = *ss;
444     s->channel_map = *map;
445     s->name = pa_xstrdup(name);
446     pa_memchunk_reset(&s->memchunk);
447     s->length = length;
448     s->proplist = pa_proplist_copy(p);
449     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
450
451     pa_idxset_put(c->output_streams, s, &s->index);
452
453     return s;
454 }
455
456 /* Called from main context */
457 static void record_stream_unlink(record_stream *s) {
458     pa_assert(s);
459
460     if (!s->connection)
461         return;
462
463     if (s->source_output) {
464         pa_source_output_unlink(s->source_output);
465         pa_source_output_unref(s->source_output);
466         s->source_output = NULL;
467     }
468
469     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
470     s->connection = NULL;
471     record_stream_unref(s);
472 }
473
474 /* Called from main context */
475 static void record_stream_free(pa_object *o) {
476     record_stream *s = RECORD_STREAM(o);
477     pa_assert(s);
478
479     record_stream_unlink(s);
480
481     pa_memblockq_free(s->memblockq);
482     pa_xfree(s);
483 }
484
485 /* Called from main context */
486 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
487     record_stream *s = RECORD_STREAM(o);
488     record_stream_assert_ref(s);
489
490     if (!s->connection)
491         return -1;
492
493     switch (code) {
494
495         case RECORD_STREAM_MESSAGE_POST_DATA:
496
497             /* We try to keep up to date with how many bytes are
498              * currently on the fly */
499             pa_atomic_sub(&s->on_the_fly, chunk->length);
500
501             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
502 /*                 pa_log_warn("Failed to push data into output queue."); */
503                 return -1;
504             }
505
506             if (!pa_pstream_is_pending(s->connection->pstream))
507                 native_connection_send_memblock(s->connection);
508
509             break;
510     }
511
512     return 0;
513 }
514
515 /* Called from main context */
516 static void fix_record_buffer_attr_pre(record_stream *s) {
517
518     size_t frame_size;
519     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
520
521     pa_assert(s);
522
523     /* This function will be called from the main thread, before as
524      * well as after the source output has been activated using
525      * pa_source_output_put()! That means it may not touch any
526      * ->thread_info data! */
527
528     frame_size = pa_frame_size(&s->source_output->sample_spec);
529
530     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
531         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
532     if (s->buffer_attr.maxlength <= 0)
533         s->buffer_attr.maxlength = (uint32_t) frame_size;
534
535     if (s->buffer_attr.fragsize == (uint32_t) -1)
536         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
537     if (s->buffer_attr.fragsize <= 0)
538         s->buffer_attr.fragsize = (uint32_t) frame_size;
539
540     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
541
542     if (s->early_requests) {
543
544         /* In early request mode we need to emulate the classic
545          * fragment-based playback model. We do this setting the source
546          * latency to the fragment size. */
547
548         source_usec = fragsize_usec;
549
550     } else if (s->adjust_latency) {
551
552         /* So, the user asked us to adjust the latency according to
553          * what the source can provide. Half the latency will be
554          * spent on the hw buffer, half of it in the async buffer
555          * queue we maintain for each client. */
556
557         source_usec = fragsize_usec/2;
558
559     } else {
560
561         /* Ok, the user didn't ask us to adjust the latency, hence we
562          * don't */
563
564         source_usec = (pa_usec_t) -1;
565     }
566
567     if (source_usec != (pa_usec_t) -1)
568         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
569     else
570         s->configured_source_latency = 0;
571
572     if (s->early_requests) {
573
574         /* Ok, we didn't necessarily get what we were asking for, so
575          * let's tell the user */
576
577         fragsize_usec = s->configured_source_latency;
578
579     } else if (s->adjust_latency) {
580
581         /* Now subtract what we actually got */
582
583         if (fragsize_usec >= s->configured_source_latency*2)
584             fragsize_usec -= s->configured_source_latency;
585         else
586             fragsize_usec = s->configured_source_latency;
587     }
588
589     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
590         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
591
592         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
593
594     if (s->buffer_attr.fragsize <= 0)
595         s->buffer_attr.fragsize = (uint32_t) frame_size;
596 }
597
598 /* Called from main context */
599 static void fix_record_buffer_attr_post(record_stream *s) {
600     size_t base;
601
602     pa_assert(s);
603
604     /* This function will be called from the main thread, before as
605      * well as after the source output has been activated using
606      * pa_source_output_put()! That means it may not touch and
607      * ->thread_info data! */
608
609     base = pa_frame_size(&s->source_output->sample_spec);
610
611     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
612     if (s->buffer_attr.fragsize <= 0)
613         s->buffer_attr.fragsize = base;
614
615     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
616         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
617 }
618
619 /* Called from main context */
620 static record_stream* record_stream_new(
621         pa_native_connection *c,
622         pa_source *source,
623         pa_sample_spec *ss,
624         pa_channel_map *map,
625         pa_bool_t peak_detect,
626         pa_buffer_attr *attr,
627         pa_source_output_flags_t flags,
628         pa_proplist *p,
629         pa_bool_t adjust_latency,
630         pa_sink_input *direct_on_input,
631         pa_bool_t early_requests,
632         int *ret) {
633
634     record_stream *s;
635     pa_source_output *source_output = NULL;
636     pa_source_output_new_data data;
637
638     pa_assert(c);
639     pa_assert(ss);
640     pa_assert(p);
641     pa_assert(ret);
642
643     pa_source_output_new_data_init(&data);
644
645     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
646     data.driver = __FILE__;
647     data.module = c->options->module;
648     data.client = c->client;
649     data.source = source;
650     data.direct_on_input = direct_on_input;
651     pa_source_output_new_data_set_sample_spec(&data, ss);
652     pa_source_output_new_data_set_channel_map(&data, map);
653     if (peak_detect)
654         data.resample_method = PA_RESAMPLER_PEAKS;
655     data.flags = flags;
656
657     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data);
658
659     pa_source_output_new_data_done(&data);
660
661     if (!source_output)
662         return NULL;
663
664     s = pa_msgobject_new(record_stream);
665     s->parent.parent.free = record_stream_free;
666     s->parent.process_msg = record_stream_process_msg;
667     s->connection = c;
668     s->source_output = source_output;
669     s->buffer_attr = *attr;
670     s->adjust_latency = adjust_latency;
671     s->early_requests = early_requests;
672     pa_atomic_store(&s->on_the_fly, 0);
673
674     s->source_output->parent.process_msg = source_output_process_msg;
675     s->source_output->push = source_output_push_cb;
676     s->source_output->kill = source_output_kill_cb;
677     s->source_output->get_latency = source_output_get_latency_cb;
678     s->source_output->moving = source_output_moving_cb;
679     s->source_output->suspend = source_output_suspend_cb;
680     s->source_output->send_event = source_output_send_event_cb;
681     s->source_output->userdata = s;
682
683     fix_record_buffer_attr_pre(s);
684
685     s->memblockq = pa_memblockq_new(
686             0,
687             s->buffer_attr.maxlength,
688             0,
689             pa_frame_size(&source_output->sample_spec),
690             1,
691             0,
692             0,
693             NULL);
694
695     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
696     fix_record_buffer_attr_post(s);
697
698     *ss = s->source_output->sample_spec;
699     *map = s->source_output->channel_map;
700
701     pa_idxset_put(c->record_streams, s, &s->index);
702
703     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
704                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
705                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
706                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
707
708     pa_source_output_put(s->source_output);
709     return s;
710 }
711
712 /* Called from main context */
713 static void record_stream_send_killed(record_stream *r) {
714     pa_tagstruct *t;
715     record_stream_assert_ref(r);
716
717     t = pa_tagstruct_new(NULL, 0);
718     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
719     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
720     pa_tagstruct_putu32(t, r->index);
721     pa_pstream_send_tagstruct(r->connection->pstream, t);
722 }
723
724 /* Called from main context */
725 static void playback_stream_unlink(playback_stream *s) {
726     pa_assert(s);
727
728     if (!s->connection)
729         return;
730
731     if (s->sink_input) {
732         pa_sink_input_unlink(s->sink_input);
733         pa_sink_input_unref(s->sink_input);
734         s->sink_input = NULL;
735     }
736
737     if (s->drain_request)
738         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
739
740     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
741     s->connection = NULL;
742     playback_stream_unref(s);
743 }
744
745 /* Called from main context */
746 static void playback_stream_free(pa_object* o) {
747     playback_stream *s = PLAYBACK_STREAM(o);
748     pa_assert(s);
749
750     playback_stream_unlink(s);
751
752     pa_memblockq_free(s->memblockq);
753     pa_xfree(s);
754 }
755
756 /* Called from main context */
757 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
758     playback_stream *s = PLAYBACK_STREAM(o);
759     playback_stream_assert_ref(s);
760
761     if (!s->connection)
762         return -1;
763
764     switch (code) {
765
766         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
767             pa_tagstruct *t;
768             int l = 0;
769
770             for (;;) {
771                 if ((l = pa_atomic_load(&s->missing)) <= 0)
772                     return 0;
773
774                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
775                     break;
776             }
777
778             t = pa_tagstruct_new(NULL, 0);
779             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
780             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
781             pa_tagstruct_putu32(t, s->index);
782             pa_tagstruct_putu32(t, (uint32_t) l);
783             pa_pstream_send_tagstruct(s->connection->pstream, t);
784
785 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
786             break;
787         }
788
789         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
790             pa_tagstruct *t;
791
792 /*             pa_log("signalling underflow"); */
793
794             /* Report that we're empty */
795             t = pa_tagstruct_new(NULL, 0);
796             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
797             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
798             pa_tagstruct_putu32(t, s->index);
799             pa_pstream_send_tagstruct(s->connection->pstream, t);
800             break;
801         }
802
803         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
804             pa_tagstruct *t;
805
806             /* Notify the user we're overflowed*/
807             t = pa_tagstruct_new(NULL, 0);
808             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
809             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
810             pa_tagstruct_putu32(t, s->index);
811             pa_pstream_send_tagstruct(s->connection->pstream, t);
812             break;
813         }
814
815         case PLAYBACK_STREAM_MESSAGE_STARTED:
816
817             if (s->connection->version >= 13) {
818                 pa_tagstruct *t;
819
820                 /* Notify the user we started playback */
821                 t = pa_tagstruct_new(NULL, 0);
822                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
823                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
824                 pa_tagstruct_putu32(t, s->index);
825                 pa_pstream_send_tagstruct(s->connection->pstream, t);
826             }
827
828             break;
829
830         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
831             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
832             break;
833
834         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH:
835
836             s->buffer_attr.tlength = (uint32_t) offset;
837
838             if (s->connection->version >= 15) {
839                 pa_tagstruct *t;
840
841                 t = pa_tagstruct_new(NULL, 0);
842                 pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
843                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
844                 pa_tagstruct_putu32(t, s->index);
845                 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
846                 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
847                 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
848                 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
849                 pa_tagstruct_put_usec(t, s->configured_sink_latency);
850                 pa_pstream_send_tagstruct(s->connection->pstream, t);
851             }
852
853             break;
854     }
855
856     return 0;
857 }
858
859 /* Called from main context */
860 static void fix_playback_buffer_attr(playback_stream *s) {
861     size_t frame_size, max_prebuf;
862     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
863
864     pa_assert(s);
865
866     /* pa_log("Client requested: maxlength=%li bytes tlength=%li bytes minreq=%li bytes prebuf=%li bytes", */
867     /*        (long) s->buffer_attr.maxlength, */
868     /*        (long) s->buffer_attr.tlength, */
869     /*        (long) s->buffer_attr.minreq, */
870     /*        (long) s->buffer_attr.prebuf); */
871
872     /* pa_log("Client requested: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms", */
873     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
874     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
875     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
876     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC)); */
877
878     /* This function will be called from the main thread, before as
879      * well as after the sink input has been activated using
880      * pa_sink_input_put()! That means it may not touch any
881      * ->thread_info data, such as the memblockq! */
882
883     frame_size = pa_frame_size(&s->sink_input->sample_spec);
884
885     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
886         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
887     if (s->buffer_attr.maxlength <= 0)
888         s->buffer_attr.maxlength = (uint32_t) frame_size;
889
890     if (s->buffer_attr.tlength == (uint32_t) -1)
891         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
892     if (s->buffer_attr.tlength <= 0)
893         s->buffer_attr.tlength = (uint32_t) frame_size;
894
895     if (s->buffer_attr.minreq == (uint32_t) -1)
896         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
897     if (s->buffer_attr.minreq <= 0)
898         s->buffer_attr.minreq = (uint32_t) frame_size;
899
900     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
901         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
902
903     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
904     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
905
906     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
907                 (double) tlength_usec / PA_USEC_PER_MSEC,
908                 (double) minreq_usec / PA_USEC_PER_MSEC);
909
910     if (s->early_requests) {
911
912         /* In early request mode we need to emulate the classic
913          * fragment-based playback model. We do this setting the sink
914          * latency to the fragment size. */
915
916         sink_usec = minreq_usec;
917         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
918
919     } else if (s->adjust_latency) {
920
921         /* So, the user asked us to adjust the latency of the stream
922          * buffer according to the what the sink can provide. The
923          * tlength passed in shall be the overall latency. Roughly
924          * half the latency will be spent on the hw buffer, the other
925          * half of it in the async buffer queue we maintain for each
926          * client. In between we'll have a safety space of size
927          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
928          * empty and needs to be filled, then our buffer must have
929          * enough data to fulfill this request immediatly and thus
930          * have at least the same tlength as the size of the hw
931          * buffer. It additionally needs space for 2 times minreq
932          * because if the buffer ran empty and a partial fillup
933          * happens immediately on the next iteration we need to be
934          * able to fulfill it and give the application also minreq
935          * time to fill it up again for the next request Makes 2 times
936          * minreq in plus.. */
937
938         if (tlength_usec > minreq_usec*2)
939             sink_usec = (tlength_usec - minreq_usec*2)/2;
940         else
941             sink_usec = 0;
942
943         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
944
945     } else {
946
947         /* Ok, the user didn't ask us to adjust the latency, but we
948          * still need to make sure that the parameters from the user
949          * do make sense. */
950
951         if (tlength_usec > minreq_usec*2)
952             sink_usec = (tlength_usec - minreq_usec*2);
953         else
954             sink_usec = 0;
955
956         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
957     }
958
959     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
960
961     if (s->early_requests) {
962
963         /* Ok, we didn't necessarily get what we were asking for, so
964          * let's tell the user */
965
966         minreq_usec = s->configured_sink_latency;
967
968     } else if (s->adjust_latency) {
969
970         /* Ok, we didn't necessarily get what we were asking for, so
971          * let's subtract from what we asked for for the remaining
972          * buffer space */
973
974         if (tlength_usec >= s->configured_sink_latency)
975             tlength_usec -= s->configured_sink_latency;
976     }
977
978     /* FIXME: This is actually larger than necessary, since not all of
979      * the sink latency is actually rewritable. */
980     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
981         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
982
983     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
984         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
985         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
986
987     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
988         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
989         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
990
991     if (s->buffer_attr.minreq <= 0) {
992         s->buffer_attr.minreq = (uint32_t) frame_size;
993         s->buffer_attr.tlength += (uint32_t) frame_size*2;
994     }
995
996     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
997         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
998
999     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
1000
1001     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
1002         s->buffer_attr.prebuf > max_prebuf)
1003         s->buffer_attr.prebuf = max_prebuf;
1004
1005     /* pa_log("Client accepted: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms", */
1006     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
1007     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
1008     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
1009     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC)); */
1010 }
1011
1012 /* Called from main context */
1013 static playback_stream* playback_stream_new(
1014         pa_native_connection *c,
1015         pa_sink *sink,
1016         pa_sample_spec *ss,
1017         pa_channel_map *map,
1018         pa_idxset *formats,
1019         pa_buffer_attr *a,
1020         pa_cvolume *volume,
1021         pa_bool_t muted,
1022         pa_bool_t muted_set,
1023         uint32_t syncid,
1024         uint32_t *missing,
1025         pa_sink_input_flags_t flags,
1026         pa_proplist *p,
1027         pa_bool_t adjust_latency,
1028         pa_bool_t early_requests,
1029         pa_bool_t relative_volume,
1030         int *ret) {
1031
1032     playback_stream *s, *ssync;
1033     pa_sink_input *sink_input = NULL;
1034     pa_memchunk silence;
1035     uint32_t idx;
1036     int64_t start_index;
1037     pa_sink_input_new_data data;
1038
1039     pa_assert(c);
1040     pa_assert(ss);
1041     pa_assert(missing);
1042     pa_assert(p);
1043     pa_assert(ret);
1044
1045     /* Find syncid group */
1046     PA_IDXSET_FOREACH(ssync, c->output_streams, idx) {
1047
1048         if (!playback_stream_isinstance(ssync))
1049             continue;
1050
1051         if (ssync->syncid == syncid)
1052             break;
1053     }
1054
1055     /* Synced streams must connect to the same sink */
1056     if (ssync) {
1057
1058         if (!sink)
1059             sink = ssync->sink_input->sink;
1060         else if (sink != ssync->sink_input->sink) {
1061             *ret = PA_ERR_INVALID;
1062             return NULL;
1063         }
1064     }
1065
1066     pa_sink_input_new_data_init(&data);
1067
1068     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1069     data.driver = __FILE__;
1070     data.module = c->options->module;
1071     data.client = c->client;
1072     if (sink)
1073         pa_sink_input_new_data_set_sink(&data, sink, TRUE);
1074     if (pa_sample_spec_valid(ss))
1075         pa_sink_input_new_data_set_sample_spec(&data, ss);
1076     if (pa_channel_map_valid(map))
1077         pa_sink_input_new_data_set_channel_map(&data, map);
1078     if (formats)
1079         pa_sink_input_new_data_set_formats(&data, formats);
1080     if (volume) {
1081         pa_sink_input_new_data_set_volume(&data, volume);
1082         data.volume_is_absolute = !relative_volume;
1083         data.save_volume = TRUE;
1084     }
1085     if (muted_set) {
1086         pa_sink_input_new_data_set_muted(&data, muted);
1087         data.save_muted = TRUE;
1088     }
1089     data.sync_base = ssync ? ssync->sink_input : NULL;
1090     data.flags = flags;
1091
1092     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data);
1093
1094     pa_sink_input_new_data_done(&data);
1095
1096     if (!sink_input)
1097         return NULL;
1098
1099     s = pa_msgobject_new(playback_stream);
1100     s->parent.parent.parent.free = playback_stream_free;
1101     s->parent.parent.process_msg = playback_stream_process_msg;
1102     s->connection = c;
1103     s->syncid = syncid;
1104     s->sink_input = sink_input;
1105     s->is_underrun = TRUE;
1106     s->drain_request = FALSE;
1107     pa_atomic_store(&s->missing, 0);
1108     s->buffer_attr = *a;
1109     s->adjust_latency = adjust_latency;
1110     s->early_requests = early_requests;
1111     pa_atomic_store(&s->seek_or_post_in_queue, 0);
1112     s->seek_windex = -1;
1113
1114     s->sink_input->parent.process_msg = sink_input_process_msg;
1115     s->sink_input->pop = sink_input_pop_cb;
1116     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1117     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1118     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1119     s->sink_input->kill = sink_input_kill_cb;
1120     s->sink_input->moving = sink_input_moving_cb;
1121     s->sink_input->suspend = sink_input_suspend_cb;
1122     s->sink_input->send_event = sink_input_send_event_cb;
1123     s->sink_input->userdata = s;
1124
1125     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1126
1127     fix_playback_buffer_attr(s);
1128
1129     pa_sink_input_get_silence(sink_input, &silence);
1130     s->memblockq = pa_memblockq_new(
1131             start_index,
1132             s->buffer_attr.maxlength,
1133             s->buffer_attr.tlength,
1134             pa_frame_size(&sink_input->sample_spec),
1135             s->buffer_attr.prebuf,
1136             s->buffer_attr.minreq,
1137             0,
1138             &silence);
1139     pa_memblock_unref(silence.memblock);
1140
1141     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1142
1143     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1144
1145     /* pa_log("missing original: %li", (long int) *missing); */
1146
1147     *ss = s->sink_input->sample_spec;
1148     *map = s->sink_input->channel_map;
1149
1150     pa_idxset_put(c->output_streams, s, &s->index);
1151
1152     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1153                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1154                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1155                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1156                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1157
1158     pa_sink_input_put(s->sink_input);
1159     return s;
1160 }
1161
1162 /* Called from IO context */
1163 static void playback_stream_request_bytes(playback_stream *s) {
1164     size_t m, minreq;
1165     int previous_missing;
1166
1167     playback_stream_assert_ref(s);
1168
1169     m = pa_memblockq_pop_missing(s->memblockq);
1170
1171     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu really missing=%lli)", */
1172     /*        (unsigned long) m, */
1173     /*        pa_memblockq_get_tlength(s->memblockq), */
1174     /*        pa_memblockq_get_minreq(s->memblockq), */
1175     /*        pa_memblockq_get_length(s->memblockq), */
1176     /*        (long long) pa_memblockq_get_tlength(s->memblockq) - (long long) pa_memblockq_get_length(s->memblockq)); */
1177
1178     if (m <= 0)
1179         return;
1180
1181 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1182
1183     previous_missing = pa_atomic_add(&s->missing, (int) m);
1184     minreq = pa_memblockq_get_minreq(s->memblockq);
1185
1186     if (pa_memblockq_prebuf_active(s->memblockq) ||
1187         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1188         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1189 }
1190
1191 /* Called from main context */
1192 static void playback_stream_send_killed(playback_stream *p) {
1193     pa_tagstruct *t;
1194     playback_stream_assert_ref(p);
1195
1196     t = pa_tagstruct_new(NULL, 0);
1197     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1198     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1199     pa_tagstruct_putu32(t, p->index);
1200     pa_pstream_send_tagstruct(p->connection->pstream, t);
1201 }
1202
1203 /* Called from main context */
1204 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1205     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1206     pa_native_connection_assert_ref(c);
1207
1208     if (!c->protocol)
1209         return -1;
1210
1211     switch (code) {
1212
1213         case CONNECTION_MESSAGE_REVOKE:
1214             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1215             break;
1216
1217         case CONNECTION_MESSAGE_RELEASE:
1218             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1219             break;
1220     }
1221
1222     return 0;
1223 }
1224
1225 /* Called from main context */
1226 static void native_connection_unlink(pa_native_connection *c) {
1227     record_stream *r;
1228     output_stream *o;
1229
1230     pa_assert(c);
1231
1232     if (!c->protocol)
1233         return;
1234
1235     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1236
1237     if (c->options)
1238         pa_native_options_unref(c->options);
1239
1240     while ((r = pa_idxset_first(c->record_streams, NULL)))
1241         record_stream_unlink(r);
1242
1243     while ((o = pa_idxset_first(c->output_streams, NULL)))
1244         if (playback_stream_isinstance(o))
1245             playback_stream_unlink(PLAYBACK_STREAM(o));
1246         else
1247             upload_stream_unlink(UPLOAD_STREAM(o));
1248
1249     if (c->subscription)
1250         pa_subscription_free(c->subscription);
1251
1252     if (c->pstream)
1253         pa_pstream_unlink(c->pstream);
1254
1255     if (c->auth_timeout_event) {
1256         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1257         c->auth_timeout_event = NULL;
1258     }
1259
1260     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1261     c->protocol = NULL;
1262     pa_native_connection_unref(c);
1263 }
1264
1265 /* Called from main context */
1266 static void native_connection_free(pa_object *o) {
1267     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1268
1269     pa_assert(c);
1270
1271     native_connection_unlink(c);
1272
1273     pa_idxset_free(c->record_streams, NULL, NULL);
1274     pa_idxset_free(c->output_streams, NULL, NULL);
1275
1276     pa_pdispatch_unref(c->pdispatch);
1277     pa_pstream_unref(c->pstream);
1278     pa_client_free(c->client);
1279
1280     pa_xfree(c);
1281 }
1282
1283 /* Called from main context */
1284 static void native_connection_send_memblock(pa_native_connection *c) {
1285     uint32_t start;
1286     record_stream *r;
1287
1288     start = PA_IDXSET_INVALID;
1289     for (;;) {
1290         pa_memchunk chunk;
1291
1292         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1293             return;
1294
1295         if (start == PA_IDXSET_INVALID)
1296             start = c->rrobin_index;
1297         else if (start == c->rrobin_index)
1298             return;
1299
1300         if (pa_memblockq_peek(r->memblockq, &chunk) >= 0) {
1301             pa_memchunk schunk = chunk;
1302
1303             if (schunk.length > r->buffer_attr.fragsize)
1304                 schunk.length = r->buffer_attr.fragsize;
1305
1306             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1307
1308             pa_memblockq_drop(r->memblockq, schunk.length);
1309             pa_memblock_unref(schunk.memblock);
1310
1311             return;
1312         }
1313     }
1314 }
1315
1316 /*** sink input callbacks ***/
1317
1318 /* Called from thread context */
1319 static void handle_seek(playback_stream *s, int64_t indexw) {
1320     playback_stream_assert_ref(s);
1321
1322 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1323
1324     if (s->sink_input->thread_info.underrun_for > 0) {
1325
1326 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1327
1328         if (pa_memblockq_is_readable(s->memblockq)) {
1329
1330             /* We just ended an underrun, let's ask the sink
1331              * for a complete rewind rewrite */
1332
1333             pa_log_debug("Requesting rewind due to end of underrun.");
1334             pa_sink_input_request_rewind(s->sink_input,
1335                                          (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 :
1336                                                    s->sink_input->thread_info.underrun_for),
1337                                          FALSE, TRUE, FALSE);
1338         }
1339
1340     } else {
1341         int64_t indexr;
1342
1343         indexr = pa_memblockq_get_read_index(s->memblockq);
1344
1345         if (indexw < indexr) {
1346             /* OK, the sink already asked for this data, so
1347              * let's have it usk us again */
1348
1349             pa_log_debug("Requesting rewind due to rewrite.");
1350             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1351         }
1352     }
1353
1354     playback_stream_request_bytes(s);
1355 }
1356
1357 static void flush_write_no_account(pa_memblockq *q) {
1358     pa_memblockq_flush_write(q, FALSE);
1359 }
1360
1361 /* Called from thread context */
1362 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1363     pa_sink_input *i = PA_SINK_INPUT(o);
1364     playback_stream *s;
1365
1366     pa_sink_input_assert_ref(i);
1367     s = PLAYBACK_STREAM(i->userdata);
1368     playback_stream_assert_ref(s);
1369
1370     switch (code) {
1371
1372         case SINK_INPUT_MESSAGE_SEEK:
1373         case SINK_INPUT_MESSAGE_POST_DATA: {
1374             int64_t windex = pa_memblockq_get_write_index(s->memblockq);
1375
1376             if (code == SINK_INPUT_MESSAGE_SEEK) {
1377                 /* The client side is incapable of accounting correctly
1378                  * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1379                  * able to deal with that. */
1380
1381                 pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1382                 windex = PA_MIN(windex, pa_memblockq_get_write_index(s->memblockq));
1383             }
1384
1385             if (chunk && pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1386                 if (pa_log_ratelimit(PA_LOG_WARN))
1387                     pa_log_warn("Failed to push data into queue");
1388                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1389                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1390             }
1391
1392             /* If more data is in queue, we rewind later instead. */
1393             if (s->seek_windex != -1)
1394                 windex = PA_MIN(windex, s->seek_windex);
1395             if (pa_atomic_dec(&s->seek_or_post_in_queue) > 1)
1396                 s->seek_windex = windex;
1397             else {
1398                 s->seek_windex = -1;
1399                 handle_seek(s, windex);
1400             }
1401             return 0;
1402         }
1403
1404         case SINK_INPUT_MESSAGE_DRAIN:
1405         case SINK_INPUT_MESSAGE_FLUSH:
1406         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1407         case SINK_INPUT_MESSAGE_TRIGGER: {
1408
1409             int64_t windex;
1410             pa_sink_input *isync;
1411             void (*func)(pa_memblockq *bq);
1412
1413             switch (code) {
1414                 case SINK_INPUT_MESSAGE_FLUSH:
1415                     func = flush_write_no_account;
1416                     break;
1417
1418                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1419                     func = pa_memblockq_prebuf_force;
1420                     break;
1421
1422                 case SINK_INPUT_MESSAGE_DRAIN:
1423                 case SINK_INPUT_MESSAGE_TRIGGER:
1424                     func = pa_memblockq_prebuf_disable;
1425                     break;
1426
1427                 default:
1428                     pa_assert_not_reached();
1429             }
1430
1431             windex = pa_memblockq_get_write_index(s->memblockq);
1432             func(s->memblockq);
1433             handle_seek(s, windex);
1434
1435             /* Do the same for all other members in the sync group */
1436             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1437                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1438                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1439                 func(ssync->memblockq);
1440                 handle_seek(ssync, windex);
1441             }
1442
1443             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1444                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1445                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1446                 func(ssync->memblockq);
1447                 handle_seek(ssync, windex);
1448             }
1449
1450             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1451                 if (!pa_memblockq_is_readable(s->memblockq))
1452                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1453                 else {
1454                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1455                     s->drain_request = TRUE;
1456                 }
1457             }
1458
1459             return 0;
1460         }
1461
1462         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1463             /* Atomically get a snapshot of all timing parameters... */
1464             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1465             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1466             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1467             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1468             s->underrun_for = s->sink_input->thread_info.underrun_for;
1469             s->playing_for = s->sink_input->thread_info.playing_for;
1470
1471             return 0;
1472
1473         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1474             int64_t windex;
1475
1476             windex = pa_memblockq_get_write_index(s->memblockq);
1477
1478             pa_memblockq_prebuf_force(s->memblockq);
1479
1480             handle_seek(s, windex);
1481
1482             /* Fall through to the default handler */
1483             break;
1484         }
1485
1486         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1487             pa_usec_t *r = userdata;
1488
1489             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1490
1491             /* Fall through, the default handler will add in the extra
1492              * latency added by the resampler */
1493             break;
1494         }
1495
1496         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1497             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1498             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1499             return 0;
1500         }
1501     }
1502
1503     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1504 }
1505
1506 /* Called from thread context */
1507 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1508     playback_stream *s;
1509
1510     pa_sink_input_assert_ref(i);
1511     s = PLAYBACK_STREAM(i->userdata);
1512     playback_stream_assert_ref(s);
1513     pa_assert(chunk);
1514
1515 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1516
1517     if (pa_memblockq_is_readable(s->memblockq))
1518         s->is_underrun = FALSE;
1519     else {
1520         if (!s->is_underrun)
1521             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1522
1523         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1524             s->drain_request = FALSE;
1525             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1526         } else if (!s->is_underrun)
1527             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1528
1529         s->is_underrun = TRUE;
1530
1531         playback_stream_request_bytes(s);
1532     }
1533
1534     /* This call will not fail with prebuf=0, hence we check for
1535        underrun explicitly above */
1536     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1537         return -1;
1538
1539     chunk->length = PA_MIN(nbytes, chunk->length);
1540
1541     if (i->thread_info.underrun_for > 0)
1542         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1543
1544     pa_memblockq_drop(s->memblockq, chunk->length);
1545     playback_stream_request_bytes(s);
1546
1547     return 0;
1548 }
1549
1550 /* Called from thread context */
1551 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1552     playback_stream *s;
1553
1554     pa_sink_input_assert_ref(i);
1555     s = PLAYBACK_STREAM(i->userdata);
1556     playback_stream_assert_ref(s);
1557
1558     /* If we are in an underrun, then we don't rewind */
1559     if (i->thread_info.underrun_for > 0)
1560         return;
1561
1562     pa_memblockq_rewind(s->memblockq, nbytes);
1563 }
1564
1565 /* Called from thread context */
1566 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1567     playback_stream *s;
1568
1569     pa_sink_input_assert_ref(i);
1570     s = PLAYBACK_STREAM(i->userdata);
1571     playback_stream_assert_ref(s);
1572
1573     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1574 }
1575
1576 /* Called from thread context */
1577 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1578     playback_stream *s;
1579     size_t new_tlength, old_tlength;
1580
1581     pa_sink_input_assert_ref(i);
1582     s = PLAYBACK_STREAM(i->userdata);
1583     playback_stream_assert_ref(s);
1584
1585     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1586     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1587
1588     if (old_tlength < new_tlength) {
1589         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1590         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1591         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1592
1593         if (new_tlength == old_tlength)
1594             pa_log_debug("Failed to increase tlength");
1595         else {
1596             pa_log_debug("Notifying client about increased tlength");
1597             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1598         }
1599     }
1600 }
1601
1602 /* Called from main context */
1603 static void sink_input_kill_cb(pa_sink_input *i) {
1604     playback_stream *s;
1605
1606     pa_sink_input_assert_ref(i);
1607     s = PLAYBACK_STREAM(i->userdata);
1608     playback_stream_assert_ref(s);
1609
1610     playback_stream_send_killed(s);
1611     playback_stream_unlink(s);
1612 }
1613
1614 /* Called from main context */
1615 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1616     playback_stream *s;
1617     pa_tagstruct *t;
1618
1619     pa_sink_input_assert_ref(i);
1620     s = PLAYBACK_STREAM(i->userdata);
1621     playback_stream_assert_ref(s);
1622
1623     if (s->connection->version < 15)
1624       return;
1625
1626     t = pa_tagstruct_new(NULL, 0);
1627     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1628     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1629     pa_tagstruct_putu32(t, s->index);
1630     pa_tagstruct_puts(t, event);
1631     pa_tagstruct_put_proplist(t, pl);
1632     pa_pstream_send_tagstruct(s->connection->pstream, t);
1633 }
1634
1635 /* Called from main context */
1636 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1637     playback_stream *s;
1638     pa_tagstruct *t;
1639
1640     pa_sink_input_assert_ref(i);
1641     s = PLAYBACK_STREAM(i->userdata);
1642     playback_stream_assert_ref(s);
1643
1644     if (s->connection->version < 12)
1645       return;
1646
1647     t = pa_tagstruct_new(NULL, 0);
1648     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1649     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1650     pa_tagstruct_putu32(t, s->index);
1651     pa_tagstruct_put_boolean(t, suspend);
1652     pa_pstream_send_tagstruct(s->connection->pstream, t);
1653 }
1654
1655 /* Called from main context */
1656 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1657     playback_stream *s;
1658     pa_tagstruct *t;
1659
1660     pa_sink_input_assert_ref(i);
1661     s = PLAYBACK_STREAM(i->userdata);
1662     playback_stream_assert_ref(s);
1663
1664     if (!dest)
1665         return;
1666
1667     fix_playback_buffer_attr(s);
1668     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1669     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1670
1671     if (s->connection->version < 12)
1672       return;
1673
1674     t = pa_tagstruct_new(NULL, 0);
1675     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1676     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1677     pa_tagstruct_putu32(t, s->index);
1678     pa_tagstruct_putu32(t, dest->index);
1679     pa_tagstruct_puts(t, dest->name);
1680     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1681
1682     if (s->connection->version >= 13) {
1683         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1684         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1685         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1686         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1687         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1688     }
1689
1690     pa_pstream_send_tagstruct(s->connection->pstream, t);
1691 }
1692
1693 /*** source_output callbacks ***/
1694
1695 /* Called from thread context */
1696 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1697     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1698     record_stream *s;
1699
1700     pa_source_output_assert_ref(o);
1701     s = RECORD_STREAM(o->userdata);
1702     record_stream_assert_ref(s);
1703
1704     switch (code) {
1705         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1706             /* Atomically get a snapshot of all timing parameters... */
1707             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1708             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1709             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1710             return 0;
1711     }
1712
1713     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1714 }
1715
1716 /* Called from thread context */
1717 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1718     record_stream *s;
1719
1720     pa_source_output_assert_ref(o);
1721     s = RECORD_STREAM(o->userdata);
1722     record_stream_assert_ref(s);
1723     pa_assert(chunk);
1724
1725     pa_atomic_add(&s->on_the_fly, chunk->length);
1726     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1727 }
1728
1729 static void source_output_kill_cb(pa_source_output *o) {
1730     record_stream *s;
1731
1732     pa_source_output_assert_ref(o);
1733     s = RECORD_STREAM(o->userdata);
1734     record_stream_assert_ref(s);
1735
1736     record_stream_send_killed(s);
1737     record_stream_unlink(s);
1738 }
1739
1740 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1741     record_stream *s;
1742
1743     pa_source_output_assert_ref(o);
1744     s = RECORD_STREAM(o->userdata);
1745     record_stream_assert_ref(s);
1746
1747     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1748
1749     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1750 }
1751
1752 /* Called from main context */
1753 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1754     record_stream *s;
1755     pa_tagstruct *t;
1756
1757     pa_source_output_assert_ref(o);
1758     s = RECORD_STREAM(o->userdata);
1759     record_stream_assert_ref(s);
1760
1761     if (s->connection->version < 15)
1762       return;
1763
1764     t = pa_tagstruct_new(NULL, 0);
1765     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1766     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1767     pa_tagstruct_putu32(t, s->index);
1768     pa_tagstruct_puts(t, event);
1769     pa_tagstruct_put_proplist(t, pl);
1770     pa_pstream_send_tagstruct(s->connection->pstream, t);
1771 }
1772
1773 /* Called from main context */
1774 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1775     record_stream *s;
1776     pa_tagstruct *t;
1777
1778     pa_source_output_assert_ref(o);
1779     s = RECORD_STREAM(o->userdata);
1780     record_stream_assert_ref(s);
1781
1782     if (s->connection->version < 12)
1783       return;
1784
1785     t = pa_tagstruct_new(NULL, 0);
1786     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1787     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1788     pa_tagstruct_putu32(t, s->index);
1789     pa_tagstruct_put_boolean(t, suspend);
1790     pa_pstream_send_tagstruct(s->connection->pstream, t);
1791 }
1792
1793 /* Called from main context */
1794 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1795     record_stream *s;
1796     pa_tagstruct *t;
1797
1798     pa_source_output_assert_ref(o);
1799     s = RECORD_STREAM(o->userdata);
1800     record_stream_assert_ref(s);
1801
1802     if (!dest)
1803         return;
1804
1805     fix_record_buffer_attr_pre(s);
1806     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1807     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1808     fix_record_buffer_attr_post(s);
1809
1810     if (s->connection->version < 12)
1811       return;
1812
1813     t = pa_tagstruct_new(NULL, 0);
1814     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1815     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1816     pa_tagstruct_putu32(t, s->index);
1817     pa_tagstruct_putu32(t, dest->index);
1818     pa_tagstruct_puts(t, dest->name);
1819     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1820
1821     if (s->connection->version >= 13) {
1822         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1823         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1824         pa_tagstruct_put_usec(t, s->configured_source_latency);
1825     }
1826
1827     pa_pstream_send_tagstruct(s->connection->pstream, t);
1828 }
1829
1830 /*** pdispatch callbacks ***/
1831
1832 static void protocol_error(pa_native_connection *c) {
1833     pa_log("protocol error, kicking client");
1834     native_connection_unlink(c);
1835 }
1836
/* If expression is false, send the given error for tag on pstream and
 * return from the *calling* function. Only usable in functions returning
 * void, and only where no cleanup is pending, since the return bypasses
 * any cleanup code further down in the caller.
 *
 * NOTE(review): the trailing ';' after while(0) defeats the usual
 * do/while(0) idiom (an unbraced "if (x) CHECK_VALIDITY(...); else ..."
 * would not compile). It is kept here because callers outside this part
 * of the file may rely on it -- confirm before removing. */
#define CHECK_VALIDITY(pstream, expression, tag, error) do {    \
        if (!(expression)) {                                    \
            pa_pstream_send_error((pstream), (tag), (error));   \
            return;                                             \
        }                                                       \
    } while(0);
1843
1844 static pa_tagstruct *reply_new(uint32_t tag) {
1845     pa_tagstruct *reply;
1846
1847     reply = pa_tagstruct_new(NULL, 0);
1848     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1849     pa_tagstruct_putu32(reply, tag);
1850     return reply;
1851 }
1852
1853 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1854     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1855     playback_stream *s;
1856     uint32_t sink_index, syncid, missing;
1857     pa_buffer_attr attr;
1858     const char *name = NULL, *sink_name;
1859     pa_sample_spec ss;
1860     pa_channel_map map;
1861     pa_tagstruct *reply;
1862     pa_sink *sink = NULL;
1863     pa_cvolume volume;
1864     pa_bool_t
1865         corked = FALSE,
1866         no_remap = FALSE,
1867         no_remix = FALSE,
1868         fix_format = FALSE,
1869         fix_rate = FALSE,
1870         fix_channels = FALSE,
1871         no_move = FALSE,
1872         variable_rate = FALSE,
1873         muted = FALSE,
1874         adjust_latency = FALSE,
1875         early_requests = FALSE,
1876         dont_inhibit_auto_suspend = FALSE,
1877         muted_set = FALSE,
1878         fail_on_suspend = FALSE,
1879         relative_volume = FALSE,
1880         passthrough = FALSE;
1881
1882     pa_sink_input_flags_t flags = 0;
1883     pa_proplist *p = NULL;
1884     pa_bool_t volume_set = TRUE;
1885     int ret = PA_ERR_INVALID;
1886     uint8_t n_formats = 0;
1887     pa_format_info *format;
1888     pa_idxset *formats = NULL;
1889     uint32_t i;
1890
1891     pa_native_connection_assert_ref(c);
1892     pa_assert(t);
1893     memset(&attr, 0, sizeof(attr));
1894
1895     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1896         pa_tagstruct_get(
1897                 t,
1898                 PA_TAG_SAMPLE_SPEC, &ss,
1899                 PA_TAG_CHANNEL_MAP, &map,
1900                 PA_TAG_U32, &sink_index,
1901                 PA_TAG_STRING, &sink_name,
1902                 PA_TAG_U32, &attr.maxlength,
1903                 PA_TAG_BOOLEAN, &corked,
1904                 PA_TAG_U32, &attr.tlength,
1905                 PA_TAG_U32, &attr.prebuf,
1906                 PA_TAG_U32, &attr.minreq,
1907                 PA_TAG_U32, &syncid,
1908                 PA_TAG_CVOLUME, &volume,
1909                 PA_TAG_INVALID) < 0) {
1910
1911         protocol_error(c);
1912         goto error;
1913     }
1914
1915     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1916     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID);
1917     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1918     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1919     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1920
1921     p = pa_proplist_new();
1922
1923     if (name)
1924         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1925
1926     if (c->version >= 12) {
1927         /* Since 0.9.8 the user can ask for a couple of additional flags */
1928
1929         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1930             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1931             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1932             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1933             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1934             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1935             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1936
1937             protocol_error(c);
1938             goto error;
1939         }
1940     }
1941
1942     if (c->version >= 13) {
1943
1944         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1945             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1946             pa_tagstruct_get_proplist(t, p) < 0) {
1947
1948             protocol_error(c);
1949             goto error;
1950         }
1951     }
1952
1953     if (c->version >= 14) {
1954
1955         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1956             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1957
1958             protocol_error(c);
1959             goto error;
1960         }
1961     }
1962
1963     if (c->version >= 15) {
1964
1965         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1966             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1967             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1968
1969             protocol_error(c);
1970             goto error;
1971         }
1972     }
1973
1974     if (c->version >= 17) {
1975
1976         if (pa_tagstruct_get_boolean(t, &relative_volume) < 0) {
1977
1978             protocol_error(c);
1979             goto error;
1980         }
1981     }
1982
1983     if (c->version >= 18) {
1984
1985         if (pa_tagstruct_get_boolean(t, &passthrough) < 0 ) {
1986             protocol_error(c);
1987             goto error;
1988         }
1989     }
1990
1991     if (c->version >= 21) {
1992
1993         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
1994             protocol_error(c);
1995             goto error;
1996         }
1997
1998         if (n_formats)
1999             formats = pa_idxset_new(NULL, NULL);
2000
2001         for (i = 0; i < n_formats; i++) {
2002             format = pa_format_info_new();
2003             if (pa_tagstruct_get_format_info(t, format) < 0) {
2004                 protocol_error(c);
2005                 goto error;
2006             }
2007             pa_idxset_put(formats, format, NULL);
2008         }
2009     }
2010
2011     CHECK_VALIDITY(c->pstream, n_formats > 0 || pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2012     CHECK_VALIDITY(c->pstream, n_formats > 0 || (map.channels == ss.channels && volume.channels == ss.channels), tag, PA_ERR_INVALID);
2013     CHECK_VALIDITY(c->pstream, n_formats > 0 || pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2014     /* XXX: add checks on formats. At least inverse checks of the 3 above */
2015
2016     if (!pa_tagstruct_eof(t)) {
2017         protocol_error(c);
2018         goto error;
2019     }
2020
2021     if (sink_index != PA_INVALID_INDEX) {
2022
2023         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
2024             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2025             goto error;
2026         }
2027
2028     } else if (sink_name) {
2029
2030         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
2031             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2032             goto error;
2033         }
2034     }
2035
2036     flags =
2037         (corked ? PA_SINK_INPUT_START_CORKED : 0) |
2038         (no_remap ? PA_SINK_INPUT_NO_REMAP : 0) |
2039         (no_remix ? PA_SINK_INPUT_NO_REMIX : 0) |
2040         (fix_format ? PA_SINK_INPUT_FIX_FORMAT : 0) |
2041         (fix_rate ? PA_SINK_INPUT_FIX_RATE : 0) |
2042         (fix_channels ? PA_SINK_INPUT_FIX_CHANNELS : 0) |
2043         (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) |
2044         (variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0) |
2045         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2046         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0) |
2047         (passthrough ? PA_SINK_INPUT_PASSTHROUGH : 0);
2048
2049     /* Only since protocol version 15 there's a seperate muted_set
2050      * flag. For older versions we synthesize it here */
2051     muted_set = muted_set || muted;
2052
2053     s = playback_stream_new(c, sink, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, relative_volume, &ret);
2054     pa_proplist_free(p);
2055
2056     CHECK_VALIDITY(c->pstream, s, tag, ret);
2057
2058     reply = reply_new(tag);
2059     pa_tagstruct_putu32(reply, s->index);
2060     pa_assert(s->sink_input);
2061     pa_tagstruct_putu32(reply, s->sink_input->index);
2062     pa_tagstruct_putu32(reply, missing);
2063
2064 /*     pa_log("initial request is %u", missing); */
2065
2066     if (c->version >= 9) {
2067         /* Since 0.9.0 we support sending the buffer metrics back to the client */
2068
2069         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2070         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
2071         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
2072         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
2073     }
2074
2075     if (c->version >= 12) {
2076         /* Since 0.9.8 we support sending the chosen sample
2077          * spec/channel map/device/suspend status back to the
2078          * client */
2079
2080         pa_tagstruct_put_sample_spec(reply, &ss);
2081         pa_tagstruct_put_channel_map(reply, &map);
2082
2083         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2084         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2085
2086         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2087     }
2088
2089     if (c->version >= 13)
2090         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2091
2092     if (c->version >= 21) {
2093         /* Send back the format we negotiated */
2094         if (s->sink_input->format)
2095             pa_tagstruct_put_format_info(reply, s->sink_input->format);
2096         else {
2097             pa_format_info *f = pa_format_info_new();
2098             pa_tagstruct_put_format_info(reply, f);
2099             pa_format_info_free(f);
2100         }
2101     }
2102
2103     pa_pstream_send_tagstruct(c->pstream, reply);
2104     return;
2105
2106 error:
2107     if (p)
2108         pa_proplist_free(p);
2109     if (formats)
2110         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
2111     return;
2112 }
2113
2114 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2115     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2116     uint32_t channel;
2117
2118     pa_native_connection_assert_ref(c);
2119     pa_assert(t);
2120
2121     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2122         !pa_tagstruct_eof(t)) {
2123         protocol_error(c);
2124         return;
2125     }
2126
2127     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2128
2129     switch (command) {
2130
2131         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2132             playback_stream *s;
2133             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2134                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2135                 return;
2136             }
2137
2138             playback_stream_unlink(s);
2139             break;
2140         }
2141
2142         case PA_COMMAND_DELETE_RECORD_STREAM: {
2143             record_stream *s;
2144             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2145                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2146                 return;
2147             }
2148
2149             record_stream_unlink(s);
2150             break;
2151         }
2152
2153         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2154             upload_stream *s;
2155
2156             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2157                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2158                 return;
2159             }
2160
2161             upload_stream_unlink(s);
2162             break;
2163         }
2164
2165         default:
2166             pa_assert_not_reached();
2167     }
2168
2169     pa_pstream_send_simple_ack(c->pstream, tag);
2170 }
2171
2172 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2173     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2174     record_stream *s;
2175     pa_buffer_attr attr;
2176     uint32_t source_index;
2177     const char *name = NULL, *source_name;
2178     pa_sample_spec ss;
2179     pa_channel_map map;
2180     pa_tagstruct *reply;
2181     pa_source *source = NULL;
2182     pa_bool_t
2183         corked = FALSE,
2184         no_remap = FALSE,
2185         no_remix = FALSE,
2186         fix_format = FALSE,
2187         fix_rate = FALSE,
2188         fix_channels = FALSE,
2189         no_move = FALSE,
2190         variable_rate = FALSE,
2191         adjust_latency = FALSE,
2192         peak_detect = FALSE,
2193         early_requests = FALSE,
2194         dont_inhibit_auto_suspend = FALSE,
2195         fail_on_suspend = FALSE;
2196     pa_source_output_flags_t flags = 0;
2197     pa_proplist *p;
2198     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2199     pa_sink_input *direct_on_input = NULL;
2200     int ret = PA_ERR_INVALID;
2201
2202     pa_native_connection_assert_ref(c);
2203     pa_assert(t);
2204
2205     memset(&attr, 0, sizeof(attr));
2206
2207     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2208         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2209         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2210         pa_tagstruct_getu32(t, &source_index) < 0 ||
2211         pa_tagstruct_gets(t, &source_name) < 0 ||
2212         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2213         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2214         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2215         protocol_error(c);
2216         return;
2217     }
2218
2219     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2220     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name_or_wildcard(source_name, PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
2221     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2222     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2223     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2224     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2225     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2226
2227     p = pa_proplist_new();
2228
2229     if (name)
2230         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2231
2232     if (c->version >= 12) {
2233         /* Since 0.9.8 the user can ask for a couple of additional flags */
2234
2235         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2236             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2237             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2238             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2239             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2240             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2241             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2242
2243             protocol_error(c);
2244             pa_proplist_free(p);
2245             return;
2246         }
2247     }
2248
2249     if (c->version >= 13) {
2250
2251         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2252             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2253             pa_tagstruct_get_proplist(t, p) < 0 ||
2254             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2255             protocol_error(c);
2256             pa_proplist_free(p);
2257             return;
2258         }
2259     }
2260
2261     if (c->version >= 14) {
2262
2263         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2264             protocol_error(c);
2265             pa_proplist_free(p);
2266             return;
2267         }
2268     }
2269
2270     if (c->version >= 15) {
2271
2272         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2273             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2274             protocol_error(c);
2275             pa_proplist_free(p);
2276             return;
2277         }
2278     }
2279
2280     if (!pa_tagstruct_eof(t)) {
2281         protocol_error(c);
2282         pa_proplist_free(p);
2283         return;
2284     }
2285
2286     if (source_index != PA_INVALID_INDEX) {
2287
2288         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2289             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2290             pa_proplist_free(p);
2291             return;
2292         }
2293
2294     } else if (source_name) {
2295
2296         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2297             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2298             pa_proplist_free(p);
2299             return;
2300         }
2301     }
2302
2303     if (direct_on_input_idx != PA_INVALID_INDEX) {
2304
2305         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2306             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2307             pa_proplist_free(p);
2308             return;
2309         }
2310     }
2311
2312     flags =
2313         (corked ? PA_SOURCE_OUTPUT_START_CORKED : 0) |
2314         (no_remap ? PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2315         (no_remix ? PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2316         (fix_format ? PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2317         (fix_rate ? PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2318         (fix_channels ? PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2319         (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2320         (variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2321         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2322         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0);
2323
2324     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2325     pa_proplist_free(p);
2326
2327     CHECK_VALIDITY(c->pstream, s, tag, ret);
2328
2329     reply = reply_new(tag);
2330     pa_tagstruct_putu32(reply, s->index);
2331     pa_assert(s->source_output);
2332     pa_tagstruct_putu32(reply, s->source_output->index);
2333
2334     if (c->version >= 9) {
2335         /* Since 0.9 we support sending the buffer metrics back to the client */
2336
2337         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2338         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2339     }
2340
2341     if (c->version >= 12) {
2342         /* Since 0.9.8 we support sending the chosen sample
2343          * spec/channel map/device/suspend status back to the
2344          * client */
2345
2346         pa_tagstruct_put_sample_spec(reply, &ss);
2347         pa_tagstruct_put_channel_map(reply, &map);
2348
2349         pa_tagstruct_putu32(reply, s->source_output->source->index);
2350         pa_tagstruct_puts(reply, s->source_output->source->name);
2351
2352         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2353     }
2354
2355     if (c->version >= 13)
2356         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2357
2358     pa_pstream_send_tagstruct(c->pstream, reply);
2359 }
2360
2361 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2362     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2363     int ret;
2364
2365     pa_native_connection_assert_ref(c);
2366     pa_assert(t);
2367
2368     if (!pa_tagstruct_eof(t)) {
2369         protocol_error(c);
2370         return;
2371     }
2372
2373     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2374     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2375     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2376
2377     pa_log_debug("Client %s asks us to terminate.", pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY)));
2378
2379     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2380 }
2381
/* Handles the authentication request of a client.
 *
 * Request: protocol version (u32) followed by a cookie of
 * PA_NATIVE_COOKIE_LENGTH bytes. Reply: our own protocol version, with
 * the most significant bit set if shared memory was negotiated.
 *
 * A connection that is not authorized yet is accepted either through the
 * credentials of the peer (HAVE_CREDS builds only) or through a matching
 * cookie. If neither works out PA_ERR_ACCESS is sent. Versions below 8
 * are refused with PA_ERR_VERSION. */
static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
    const void*cookie;
    pa_tagstruct *reply;
    pa_bool_t shm_on_remote = FALSE, do_shm;

    pa_native_connection_assert_ref(c);
    pa_assert(t);

    /* Note that the version is stored in the connection right away,
     * before any of the checks below have run. */
    if (pa_tagstruct_getu32(t, &c->version) < 0 ||
        pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
        !pa_tagstruct_eof(t)) {
        protocol_error(c);
        return;
    }

    /* Minimum supported version */
    if (c->version < 8) {
        pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
        return;
    }

    /* Starting with protocol version 13 the MSB of the version tag
       reflects if shm is available for this pa_native_connection or
       not. */
    if (c->version >= 13) {
        shm_on_remote = !!(c->version & 0x80000000U);
        c->version &= 0x7FFFFFFFU;
    }

    pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);

    /* Expose the negotiated version as a client property */
    pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);

    if (!c->authorized) {
        pa_bool_t success = FALSE;

#ifdef HAVE_CREDS
        const pa_creds *creds;

        if ((creds = pa_pdispatch_creds(pd))) {
            /* A peer running under our own UID is always accepted */
            if (creds->uid == getuid())
                success = TRUE;
            else if (c->options->auth_group) {
                int r;
                gid_t gid;

                /* Otherwise accept peers belonging to the configured
                 * group: first compare the GID from the credentials ... */
                if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
                    pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
                else if (gid == creds->gid)
                    success = TRUE;

                /* ... then check whether the UID is a member of it */
                if (!success) {
                    if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
                        pa_log_warn("Failed to check group membership.");
                    else if (r > 0)
                        success = TRUE;
                }
            }

            pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
                        (unsigned long) creds->uid,
                        (unsigned long) creds->gid,
                        (int) success);
        }
#endif

        /* Fall back to comparing the cookie the client sent with the
         * configured one, if there is one */
        if (!success && c->options->auth_cookie) {
            const uint8_t *ac;

            if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
                if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
                    success = TRUE;
        }

        if (!success) {
            pa_log_warn("Denied access to client with invalid authorization data.");
            pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
            return;
        }

        /* Authorized now; release the pending auth timeout event, if any */
        c->authorized = TRUE;
        if (c->auth_timeout_event) {
            c->protocol->core->mainloop->time_free(c->auth_timeout_event);
            c->auth_timeout_event = NULL;
        }
    }

    /* Enable shared memory support if possible */
    do_shm =
        pa_mempool_is_shared(c->protocol->core->mempool) &&
        c->is_local;

    pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));

    /* No SHM for clients below version 10. From version 13 on the
     * client additionally has to announce SHM in its version tag. */
    if (do_shm)
        if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
            do_shm = FALSE;

#ifdef HAVE_CREDS
    if (do_shm) {
        /* Only enable SHM if both sides are owned by the same
         * user. This is a security measure because otherwise data
         * private to the user might leak. */

        const pa_creds *creds;
        if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
            do_shm = FALSE;
    }
#endif

    pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
    pa_pstream_enable_shm(c->pstream, do_shm);

    /* Tell the client our version; the MSB signals the SHM decision */
    reply = reply_new(tag);
    pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));

#ifdef HAVE_CREDS
{
    /* SHM support is only enabled after both sides made sure they are the same user. */

    pa_creds ucred;

    /* Attach our own UID and GID to the reply */
    ucred.uid = getuid();
    ucred.gid = getgid();

    pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
}
#else
    pa_pstream_send_tagstruct(c->pstream, reply);
#endif
}
2514
2515 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2516     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2517     const char *name = NULL;
2518     pa_proplist *p;
2519     pa_tagstruct *reply;
2520
2521     pa_native_connection_assert_ref(c);
2522     pa_assert(t);
2523
2524     p = pa_proplist_new();
2525
2526     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2527         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2528         !pa_tagstruct_eof(t)) {
2529
2530         protocol_error(c);
2531         pa_proplist_free(p);
2532         return;
2533     }
2534
2535     if (name)
2536         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2537             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2538             pa_proplist_free(p);
2539             return;
2540         }
2541
2542     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2543     pa_proplist_free(p);
2544
2545     reply = reply_new(tag);
2546
2547     if (c->version >= 13)
2548         pa_tagstruct_putu32(reply, c->client->index);
2549
2550     pa_pstream_send_tagstruct(c->pstream, reply);
2551 }
2552
2553 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2554     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2555     const char *name;
2556     uint32_t idx = PA_IDXSET_INVALID;
2557
2558     pa_native_connection_assert_ref(c);
2559     pa_assert(t);
2560
2561     if (pa_tagstruct_gets(t, &name) < 0 ||
2562         !pa_tagstruct_eof(t)) {
2563         protocol_error(c);
2564         return;
2565     }
2566
2567     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2568     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_LOOKUP_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
2569
2570     if (command == PA_COMMAND_LOOKUP_SINK) {
2571         pa_sink *sink;
2572         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2573             idx = sink->index;
2574     } else {
2575         pa_source *source;
2576         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2577         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2578             idx = source->index;
2579     }
2580
2581     if (idx == PA_IDXSET_INVALID)
2582         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2583     else {
2584         pa_tagstruct *reply;
2585         reply = reply_new(tag);
2586         pa_tagstruct_putu32(reply, idx);
2587         pa_pstream_send_tagstruct(c->pstream, reply);
2588     }
2589 }
2590
2591 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2592     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2593     uint32_t idx;
2594     playback_stream *s;
2595
2596     pa_native_connection_assert_ref(c);
2597     pa_assert(t);
2598
2599     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2600         !pa_tagstruct_eof(t)) {
2601         protocol_error(c);
2602         return;
2603     }
2604
2605     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2606     s = pa_idxset_get_by_index(c->output_streams, idx);
2607     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2608     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2609
2610     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2611 }
2612
2613 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2614     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2615     pa_tagstruct *reply;
2616     const pa_mempool_stat *stat;
2617
2618     pa_native_connection_assert_ref(c);
2619     pa_assert(t);
2620
2621     if (!pa_tagstruct_eof(t)) {
2622         protocol_error(c);
2623         return;
2624     }
2625
2626     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2627
2628     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2629
2630     reply = reply_new(tag);
2631     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2632     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2633     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2634     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2635     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2636     pa_pstream_send_tagstruct(c->pstream, reply);
2637 }
2638
2639 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2640     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2641     pa_tagstruct *reply;
2642     playback_stream *s;
2643     struct timeval tv, now;
2644     uint32_t idx;
2645
2646     pa_native_connection_assert_ref(c);
2647     pa_assert(t);
2648
2649     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2650         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2651         !pa_tagstruct_eof(t)) {
2652         protocol_error(c);
2653         return;
2654     }
2655
2656     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2657     s = pa_idxset_get_by_index(c->output_streams, idx);
2658     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2659     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2660
2661     /* Get an atomic snapshot of all timing parameters */
2662     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2663
2664     reply = reply_new(tag);
2665     pa_tagstruct_put_usec(reply,
2666                           s->current_sink_latency +
2667                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2668     pa_tagstruct_put_usec(reply, 0);
2669     pa_tagstruct_put_boolean(reply,
2670                              s->playing_for > 0 &&
2671                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2672                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2673     pa_tagstruct_put_timeval(reply, &tv);
2674     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2675     pa_tagstruct_puts64(reply, s->write_index);
2676     pa_tagstruct_puts64(reply, s->read_index);
2677
2678     if (c->version >= 13) {
2679         pa_tagstruct_putu64(reply, s->underrun_for);
2680         pa_tagstruct_putu64(reply, s->playing_for);
2681     }
2682
2683     pa_pstream_send_tagstruct(c->pstream, reply);
2684 }
2685
2686 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2687     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2688     pa_tagstruct *reply;
2689     record_stream *s;
2690     struct timeval tv, now;
2691     uint32_t idx;
2692
2693     pa_native_connection_assert_ref(c);
2694     pa_assert(t);
2695
2696     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2697         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2698         !pa_tagstruct_eof(t)) {
2699         protocol_error(c);
2700         return;
2701     }
2702
2703     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2704     s = pa_idxset_get_by_index(c->record_streams, idx);
2705     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2706
2707     /* Get an atomic snapshot of all timing parameters */
2708     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2709
2710     reply = reply_new(tag);
2711     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2712     pa_tagstruct_put_usec(reply,
2713                           s->current_source_latency +
2714                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->source->sample_spec));
2715     pa_tagstruct_put_boolean(reply,
2716                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2717                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2718     pa_tagstruct_put_timeval(reply, &tv);
2719     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2720     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2721     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2722     pa_pstream_send_tagstruct(c->pstream, reply);
2723 }
2724
2725 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2726     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2727     upload_stream *s;
2728     uint32_t length;
2729     const char *name = NULL;
2730     pa_sample_spec ss;
2731     pa_channel_map map;
2732     pa_tagstruct *reply;
2733     pa_proplist *p;
2734
2735     pa_native_connection_assert_ref(c);
2736     pa_assert(t);
2737
2738     if (pa_tagstruct_gets(t, &name) < 0 ||
2739         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2740         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2741         pa_tagstruct_getu32(t, &length) < 0) {
2742         protocol_error(c);
2743         return;
2744     }
2745
2746     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2747     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2748     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2749     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2750     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2751     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2752
2753     p = pa_proplist_new();
2754
2755     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2756         !pa_tagstruct_eof(t)) {
2757
2758         protocol_error(c);
2759         pa_proplist_free(p);
2760         return;
2761     }
2762
2763     if (c->version < 13)
2764         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2765     else if (!name)
2766         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2767             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2768
2769     if (!name || !pa_namereg_is_valid_name(name)) {
2770         pa_proplist_free(p);
2771         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2772     }
2773
2774     s = upload_stream_new(c, &ss, &map, name, length, p);
2775     pa_proplist_free(p);
2776
2777     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2778
2779     reply = reply_new(tag);
2780     pa_tagstruct_putu32(reply, s->index);
2781     pa_tagstruct_putu32(reply, length);
2782     pa_pstream_send_tagstruct(c->pstream, reply);
2783 }
2784
2785 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2786     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2787     uint32_t channel;
2788     upload_stream *s;
2789     uint32_t idx;
2790
2791     pa_native_connection_assert_ref(c);
2792     pa_assert(t);
2793
2794     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2795         !pa_tagstruct_eof(t)) {
2796         protocol_error(c);
2797         return;
2798     }
2799
2800     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2801
2802     s = pa_idxset_get_by_index(c->output_streams, channel);
2803     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2804     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2805
2806     if (!s->memchunk.memblock)
2807         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2808     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2809         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2810     else
2811         pa_pstream_send_simple_ack(c->pstream, tag);
2812
2813     upload_stream_unlink(s);
2814 }
2815
2816 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2817     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2818     uint32_t sink_index;
2819     pa_volume_t volume;
2820     pa_sink *sink;
2821     const char *name, *sink_name;
2822     uint32_t idx;
2823     pa_proplist *p;
2824     pa_tagstruct *reply;
2825
2826     pa_native_connection_assert_ref(c);
2827     pa_assert(t);
2828
2829     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2830
2831     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2832         pa_tagstruct_gets(t, &sink_name) < 0 ||
2833         pa_tagstruct_getu32(t, &volume) < 0 ||
2834         pa_tagstruct_gets(t, &name) < 0) {
2835         protocol_error(c);
2836         return;
2837     }
2838
2839     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID);
2840     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2841     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2842     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2843
2844     if (sink_index != PA_INVALID_INDEX)
2845         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2846     else
2847         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2848
2849     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2850
2851     p = pa_proplist_new();
2852
2853     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2854         !pa_tagstruct_eof(t)) {
2855         protocol_error(c);
2856         pa_proplist_free(p);
2857         return;
2858     }
2859
2860     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2861
2862     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2863         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2864         pa_proplist_free(p);
2865         return;
2866     }
2867
2868     pa_proplist_free(p);
2869
2870     reply = reply_new(tag);
2871
2872     if (c->version >= 13)
2873         pa_tagstruct_putu32(reply, idx);
2874
2875     pa_pstream_send_tagstruct(c->pstream, reply);
2876 }
2877
2878 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2879     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2880     const char *name;
2881
2882     pa_native_connection_assert_ref(c);
2883     pa_assert(t);
2884
2885     if (pa_tagstruct_gets(t, &name) < 0 ||
2886         !pa_tagstruct_eof(t)) {
2887         protocol_error(c);
2888         return;
2889     }
2890
2891     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2892     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2893
2894     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2895         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2896         return;
2897     }
2898
2899     pa_pstream_send_simple_ack(c->pstream, tag);
2900 }
2901
2902 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2903     pa_assert(c);
2904     pa_assert(fixed);
2905     pa_assert(original);
2906
2907     *fixed = *original;
2908
2909     if (c->version < 12) {
2910         /* Before protocol version 12 we didn't support S32 samples,
2911          * so we need to lie about this to the client */
2912
2913         if (fixed->format == PA_SAMPLE_S32LE)
2914             fixed->format = PA_SAMPLE_FLOAT32LE;
2915         if (fixed->format == PA_SAMPLE_S32BE)
2916             fixed->format = PA_SAMPLE_FLOAT32BE;
2917     }
2918
2919     if (c->version < 15) {
2920         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2921             fixed->format = PA_SAMPLE_FLOAT32LE;
2922         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2923             fixed->format = PA_SAMPLE_FLOAT32BE;
2924     }
2925 }
2926
2927 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2928     pa_sample_spec fixed_ss;
2929
2930     pa_assert(t);
2931     pa_sink_assert_ref(sink);
2932
2933     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2934
2935     pa_tagstruct_put(
2936         t,
2937         PA_TAG_U32, sink->index,
2938         PA_TAG_STRING, sink->name,
2939         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2940         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2941         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2942         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2943         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2944         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2945         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2946         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2947         PA_TAG_USEC, pa_sink_get_latency(sink),
2948         PA_TAG_STRING, sink->driver,
2949         PA_TAG_U32, sink->flags & ~PA_SINK_SHARE_VOLUME_WITH_MASTER,
2950         PA_TAG_INVALID);
2951
2952     if (c->version >= 13) {
2953         pa_tagstruct_put_proplist(t, sink->proplist);
2954         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2955     }
2956
2957     if (c->version >= 15) {
2958         pa_tagstruct_put_volume(t, sink->base_volume);
2959         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2960             pa_log_error("Internal sink state is invalid.");
2961         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2962         pa_tagstruct_putu32(t, sink->n_volume_steps);
2963         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2964     }
2965
2966     if (c->version >= 16) {
2967         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2968
2969         if (sink->ports) {
2970             void *state;
2971             pa_device_port *p;
2972
2973             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2974                 pa_tagstruct_puts(t, p->name);
2975                 pa_tagstruct_puts(t, p->description);
2976                 pa_tagstruct_putu32(t, p->priority);
2977             }
2978         }
2979
2980         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2981     }
2982 }
2983
2984 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2985     pa_sample_spec fixed_ss;
2986
2987     pa_assert(t);
2988     pa_source_assert_ref(source);
2989
2990     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2991
2992     pa_tagstruct_put(
2993         t,
2994         PA_TAG_U32, source->index,
2995         PA_TAG_STRING, source->name,
2996         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2997         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2998         PA_TAG_CHANNEL_MAP, &source->channel_map,
2999         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
3000         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
3001         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
3002         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
3003         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
3004         PA_TAG_USEC, pa_source_get_latency(source),
3005         PA_TAG_STRING, source->driver,
3006         PA_TAG_U32, source->flags,
3007         PA_TAG_INVALID);
3008
3009     if (c->version >= 13) {
3010         pa_tagstruct_put_proplist(t, source->proplist);
3011         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
3012     }
3013
3014     if (c->version >= 15) {
3015         pa_tagstruct_put_volume(t, source->base_volume);
3016         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
3017             pa_log_error("Internal source state is invalid.");
3018         pa_tagstruct_putu32(t, pa_source_get_state(source));
3019         pa_tagstruct_putu32(t, source->n_volume_steps);
3020         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
3021     }
3022
3023     if (c->version >= 16) {
3024
3025         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
3026
3027         if (source->ports) {
3028             void *state;
3029             pa_device_port *p;
3030
3031             PA_HASHMAP_FOREACH(p, source->ports, state) {
3032                 pa_tagstruct_puts(t, p->name);
3033                 pa_tagstruct_puts(t, p->description);
3034                 pa_tagstruct_putu32(t, p->priority);
3035             }
3036         }
3037
3038         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
3039     }
3040 }
3041
3042 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
3043     pa_assert(t);
3044     pa_assert(client);
3045
3046     pa_tagstruct_putu32(t, client->index);
3047     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
3048     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
3049     pa_tagstruct_puts(t, client->driver);
3050
3051     if (c->version >= 13)
3052         pa_tagstruct_put_proplist(t, client->proplist);
3053 }
3054
3055 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
3056     void *state = NULL;
3057     pa_card_profile *p;
3058
3059     pa_assert(t);
3060     pa_assert(card);
3061
3062     pa_tagstruct_putu32(t, card->index);
3063     pa_tagstruct_puts(t, card->name);
3064     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
3065     pa_tagstruct_puts(t, card->driver);
3066
3067     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
3068
3069     if (card->profiles) {
3070         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
3071             pa_tagstruct_puts(t, p->name);
3072             pa_tagstruct_puts(t, p->description);
3073             pa_tagstruct_putu32(t, p->n_sinks);
3074             pa_tagstruct_putu32(t, p->n_sources);
3075             pa_tagstruct_putu32(t, p->priority);
3076         }
3077     }
3078
3079     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
3080     pa_tagstruct_put_proplist(t, card->proplist);
3081 }
3082
3083 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
3084     pa_assert(t);
3085     pa_assert(module);
3086
3087     pa_tagstruct_putu32(t, module->index);
3088     pa_tagstruct_puts(t, module->name);
3089     pa_tagstruct_puts(t, module->argument);
3090     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
3091
3092     if (c->version < 15)
3093         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
3094
3095     if (c->version >= 15)
3096         pa_tagstruct_put_proplist(t, module->proplist);
3097 }
3098
3099 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
3100     pa_sample_spec fixed_ss;
3101     pa_usec_t sink_latency;
3102     pa_cvolume v;
3103     pa_bool_t has_volume = FALSE;
3104
3105     pa_assert(t);
3106     pa_sink_input_assert_ref(s);
3107
3108     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3109
3110     has_volume = pa_sink_input_is_volume_readable(s);
3111     if (has_volume)
3112         pa_sink_input_get_volume(s, &v, TRUE);
3113     else
3114         pa_cvolume_reset(&v, fixed_ss.channels);
3115
3116     pa_tagstruct_putu32(t, s->index);
3117     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3118     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3119     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3120     pa_tagstruct_putu32(t, s->sink->index);
3121     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3122     pa_tagstruct_put_channel_map(t, &s->channel_map);
3123     pa_tagstruct_put_cvolume(t, &v);
3124     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3125     pa_tagstruct_put_usec(t, sink_latency);
3126     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3127     pa_tagstruct_puts(t, s->driver);
3128     if (c->version >= 11)
3129         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3130     if (c->version >= 13)
3131         pa_tagstruct_put_proplist(t, s->proplist);
3132     if (c->version >= 19)
3133         pa_tagstruct_put_boolean(t, (pa_sink_input_get_state(s) == PA_SINK_INPUT_CORKED));
3134     if (c->version >= 20) {
3135         pa_tagstruct_put_boolean(t, has_volume);
3136         pa_tagstruct_put_boolean(t, s->volume_writable);
3137     }
3138 }
3139
3140 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3141     pa_sample_spec fixed_ss;
3142     pa_usec_t source_latency;
3143
3144     pa_assert(t);
3145     pa_source_output_assert_ref(s);
3146
3147     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3148
3149     pa_tagstruct_putu32(t, s->index);
3150     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3151     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3152     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3153     pa_tagstruct_putu32(t, s->source->index);
3154     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3155     pa_tagstruct_put_channel_map(t, &s->channel_map);
3156     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3157     pa_tagstruct_put_usec(t, source_latency);
3158     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3159     pa_tagstruct_puts(t, s->driver);
3160     if (c->version >= 13)
3161         pa_tagstruct_put_proplist(t, s->proplist);
3162     if (c->version >= 19)
3163         pa_tagstruct_put_boolean(t, (pa_source_output_get_state(s) == PA_SOURCE_OUTPUT_CORKED));
3164 }
3165
3166 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3167     pa_sample_spec fixed_ss;
3168     pa_cvolume v;
3169
3170     pa_assert(t);
3171     pa_assert(e);
3172
3173     if (e->memchunk.memblock)
3174         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3175     else
3176         memset(&fixed_ss, 0, sizeof(fixed_ss));
3177
3178     pa_tagstruct_putu32(t, e->index);
3179     pa_tagstruct_puts(t, e->name);
3180
3181     if (e->volume_is_set)
3182         v = e->volume;
3183     else
3184         pa_cvolume_init(&v);
3185
3186     pa_tagstruct_put_cvolume(t, &v);
3187     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3188     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3189     pa_tagstruct_put_channel_map(t, &e->channel_map);
3190     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3191     pa_tagstruct_put_boolean(t, e->lazy);
3192     pa_tagstruct_puts(t, e->filename);
3193
3194     if (c->version >= 13)
3195         pa_tagstruct_put_proplist(t, e->proplist);
3196 }
3197
3198 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3199     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3200     uint32_t idx;
3201     pa_sink *sink = NULL;
3202     pa_source *source = NULL;
3203     pa_client *client = NULL;
3204     pa_card *card = NULL;
3205     pa_module *module = NULL;
3206     pa_sink_input *si = NULL;
3207     pa_source_output *so = NULL;
3208     pa_scache_entry *sce = NULL;
3209     const char *name = NULL;
3210     pa_tagstruct *reply;
3211
3212     pa_native_connection_assert_ref(c);
3213     pa_assert(t);
3214
3215     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3216         (command != PA_COMMAND_GET_CLIENT_INFO &&
3217          command != PA_COMMAND_GET_MODULE_INFO &&
3218          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3219          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3220          pa_tagstruct_gets(t, &name) < 0) ||
3221         !pa_tagstruct_eof(t)) {
3222         protocol_error(c);
3223         return;
3224     }
3225
3226     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3227     CHECK_VALIDITY(c->pstream, !name ||
3228                    (command == PA_COMMAND_GET_SINK_INFO &&
3229                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SINK)) ||
3230                    (command == PA_COMMAND_GET_SOURCE_INFO &&
3231                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SOURCE)) ||
3232                    pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3233     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3234     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3235     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3236
3237     if (command == PA_COMMAND_GET_SINK_INFO) {
3238         if (idx != PA_INVALID_INDEX)
3239             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3240         else
3241             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3242     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3243         if (idx != PA_INVALID_INDEX)
3244             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3245         else
3246             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3247     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3248         if (idx != PA_INVALID_INDEX)
3249             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3250         else
3251             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3252     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3253         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3254     else if (command == PA_COMMAND_GET_MODULE_INFO)
3255         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3256     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3257         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3258     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3259         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3260     else {
3261         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3262         if (idx != PA_INVALID_INDEX)
3263             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3264         else
3265             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3266     }
3267
3268     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3269         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3270         return;
3271     }
3272
3273     reply = reply_new(tag);
3274     if (sink)
3275         sink_fill_tagstruct(c, reply, sink);
3276     else if (source)
3277         source_fill_tagstruct(c, reply, source);
3278     else if (client)
3279         client_fill_tagstruct(c, reply, client);
3280     else if (card)
3281         card_fill_tagstruct(c, reply, card);
3282     else if (module)
3283         module_fill_tagstruct(c, reply, module);
3284     else if (si)
3285         sink_input_fill_tagstruct(c, reply, si);
3286     else if (so)
3287         source_output_fill_tagstruct(c, reply, so);
3288     else
3289         scache_fill_tagstruct(c, reply, sce);
3290     pa_pstream_send_tagstruct(c->pstream, reply);
3291 }
3292
3293 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3294     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3295     pa_idxset *i;
3296     uint32_t idx;
3297     void *p;
3298     pa_tagstruct *reply;
3299
3300     pa_native_connection_assert_ref(c);
3301     pa_assert(t);
3302
3303     if (!pa_tagstruct_eof(t)) {
3304         protocol_error(c);
3305         return;
3306     }
3307
3308     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3309
3310     reply = reply_new(tag);
3311
3312     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3313         i = c->protocol->core->sinks;
3314     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3315         i = c->protocol->core->sources;
3316     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3317         i = c->protocol->core->clients;
3318     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3319         i = c->protocol->core->cards;
3320     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3321         i = c->protocol->core->modules;
3322     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3323         i = c->protocol->core->sink_inputs;
3324     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3325         i = c->protocol->core->source_outputs;
3326     else {
3327         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3328         i = c->protocol->core->scache;
3329     }
3330
3331     if (i) {
3332         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3333             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3334                 sink_fill_tagstruct(c, reply, p);
3335             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3336                 source_fill_tagstruct(c, reply, p);
3337             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3338                 client_fill_tagstruct(c, reply, p);
3339             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3340                 card_fill_tagstruct(c, reply, p);
3341             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3342                 module_fill_tagstruct(c, reply, p);
3343             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3344                 sink_input_fill_tagstruct(c, reply, p);
3345             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3346                 source_output_fill_tagstruct(c, reply, p);
3347             else {
3348                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3349                 scache_fill_tagstruct(c, reply, p);
3350             }
3351         }
3352     }
3353
3354     pa_pstream_send_tagstruct(c->pstream, reply);
3355 }
3356
3357 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3358     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3359     pa_tagstruct *reply;
3360     pa_sink *def_sink;
3361     pa_source *def_source;
3362     pa_sample_spec fixed_ss;
3363     char *h, *u;
3364
3365     pa_native_connection_assert_ref(c);
3366     pa_assert(t);
3367
3368     if (!pa_tagstruct_eof(t)) {
3369         protocol_error(c);
3370         return;
3371     }
3372
3373     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3374
3375     reply = reply_new(tag);
3376     pa_tagstruct_puts(reply, PACKAGE_NAME);
3377     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3378
3379     u = pa_get_user_name_malloc();
3380     pa_tagstruct_puts(reply, u);
3381     pa_xfree(u);
3382
3383     h = pa_get_host_name_malloc();
3384     pa_tagstruct_puts(reply, h);
3385     pa_xfree(h);
3386
3387     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3388     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3389
3390     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3391     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3392     def_source = pa_namereg_get_default_source(c->protocol->core);
3393     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3394
3395     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3396
3397     if (c->version >= 15)
3398         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3399
3400     pa_pstream_send_tagstruct(c->pstream, reply);
3401 }
3402
3403 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3404     pa_tagstruct *t;
3405     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3406
3407     pa_native_connection_assert_ref(c);
3408
3409     t = pa_tagstruct_new(NULL, 0);
3410     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3411     pa_tagstruct_putu32(t, (uint32_t) -1);
3412     pa_tagstruct_putu32(t, e);
3413     pa_tagstruct_putu32(t, idx);
3414     pa_pstream_send_tagstruct(c->pstream, t);
3415 }
3416
3417 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3418     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3419     pa_subscription_mask_t m;
3420
3421     pa_native_connection_assert_ref(c);
3422     pa_assert(t);
3423
3424     if (pa_tagstruct_getu32(t, &m) < 0 ||
3425         !pa_tagstruct_eof(t)) {
3426         protocol_error(c);
3427         return;
3428     }
3429
3430     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3431     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3432
3433     if (c->subscription)
3434         pa_subscription_free(c->subscription);
3435
3436     if (m != 0) {
3437         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3438         pa_assert(c->subscription);
3439     } else
3440         c->subscription = NULL;
3441
3442     pa_pstream_send_simple_ack(c->pstream, tag);
3443 }
3444
3445 static void command_set_volume(
3446         pa_pdispatch *pd,
3447         uint32_t command,
3448         uint32_t tag,
3449         pa_tagstruct *t,
3450         void *userdata) {
3451
3452     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3453     uint32_t idx;
3454     pa_cvolume volume;
3455     pa_sink *sink = NULL;
3456     pa_source *source = NULL;
3457     pa_sink_input *si = NULL;
3458     const char *name = NULL;
3459     const char *client_name;
3460
3461     pa_native_connection_assert_ref(c);
3462     pa_assert(t);
3463
3464     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3465         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3466         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3467         pa_tagstruct_get_cvolume(t, &volume) ||
3468         !pa_tagstruct_eof(t)) {
3469         protocol_error(c);
3470         return;
3471     }
3472
3473     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3474     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_VOLUME ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3475     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3476     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3477     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3478     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3479
3480     switch (command) {
3481
3482         case PA_COMMAND_SET_SINK_VOLUME:
3483             if (idx != PA_INVALID_INDEX)
3484                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3485             else
3486                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3487             break;
3488
3489         case PA_COMMAND_SET_SOURCE_VOLUME:
3490             if (idx != PA_INVALID_INDEX)
3491                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3492             else
3493                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3494             break;
3495
3496         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3497             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3498             break;
3499
3500         default:
3501             pa_assert_not_reached();
3502     }
3503
3504     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3505
3506     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3507
3508     if (sink) {
3509         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &sink->sample_spec), tag, PA_ERR_INVALID);
3510
3511         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3512         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3513     } else if (source) {
3514         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &source->sample_spec), tag, PA_ERR_INVALID);
3515
3516         pa_log_debug("Client %s changes volume of source %s.", client_name, source->name);
3517         pa_source_set_volume(source, &volume, TRUE);
3518     } else if (si) {
3519         CHECK_VALIDITY(c->pstream, si->volume_writable, tag, PA_ERR_BADSTATE);
3520         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &si->sample_spec), tag, PA_ERR_INVALID);
3521
3522         pa_log_debug("Client %s changes volume of sink input %s.",
3523                      client_name,
3524                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3525         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3526     }
3527
3528     pa_pstream_send_simple_ack(c->pstream, tag);
3529 }
3530
3531 static void command_set_mute(
3532         pa_pdispatch *pd,
3533         uint32_t command,
3534         uint32_t tag,
3535         pa_tagstruct *t,
3536         void *userdata) {
3537
3538     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3539     uint32_t idx;
3540     pa_bool_t mute;
3541     pa_sink *sink = NULL;
3542     pa_source *source = NULL;
3543     pa_sink_input *si = NULL;
3544     const char *name = NULL, *client_name;
3545
3546     pa_native_connection_assert_ref(c);
3547     pa_assert(t);
3548
3549     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3550         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3551         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3552         pa_tagstruct_get_boolean(t, &mute) ||
3553         !pa_tagstruct_eof(t)) {
3554         protocol_error(c);
3555         return;
3556     }
3557
3558     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3559     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_MUTE ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3560     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3561     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3562     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3563
3564     switch (command) {
3565
3566         case PA_COMMAND_SET_SINK_MUTE:
3567             if (idx != PA_INVALID_INDEX)
3568                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3569             else
3570                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3571
3572             break;
3573
3574         case PA_COMMAND_SET_SOURCE_MUTE:
3575             if (idx != PA_INVALID_INDEX)
3576                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3577             else
3578                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3579
3580             break;
3581
3582         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3583             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3584             break;
3585
3586         default:
3587             pa_assert_not_reached();
3588     }
3589
3590     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3591
3592     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3593
3594     if (sink) {
3595         pa_log_debug("Client %s changes mute of sink %s.", client_name, sink->name);
3596         pa_sink_set_mute(sink, mute, TRUE);
3597     } else if (source) {
3598         pa_log_debug("Client %s changes mute of source %s.", client_name, source->name);
3599         pa_source_set_mute(source, mute, TRUE);
3600     } else if (si) {
3601         pa_log_debug("Client %s changes mute of sink input %s.",
3602                      client_name,
3603                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3604         pa_sink_input_set_mute(si, mute, TRUE);
3605     }
3606
3607     pa_pstream_send_simple_ack(c->pstream, tag);
3608 }
3609
3610 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3611     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3612     uint32_t idx;
3613     pa_bool_t b;
3614     playback_stream *s;
3615
3616     pa_native_connection_assert_ref(c);
3617     pa_assert(t);
3618
3619     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3620         pa_tagstruct_get_boolean(t, &b) < 0 ||
3621         !pa_tagstruct_eof(t)) {
3622         protocol_error(c);
3623         return;
3624     }
3625
3626     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3627     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3628     s = pa_idxset_get_by_index(c->output_streams, idx);
3629     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3630     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3631
3632     pa_sink_input_cork(s->sink_input, b);
3633
3634     if (b)
3635         s->is_underrun = TRUE;
3636
3637     pa_pstream_send_simple_ack(c->pstream, tag);
3638 }
3639
3640 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3641     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3642     uint32_t idx;
3643     playback_stream *s;
3644
3645     pa_native_connection_assert_ref(c);
3646     pa_assert(t);
3647
3648     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3649         !pa_tagstruct_eof(t)) {
3650         protocol_error(c);
3651         return;
3652     }
3653
3654     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3655     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3656     s = pa_idxset_get_by_index(c->output_streams, idx);
3657     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3658     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3659
3660     switch (command) {
3661         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3662             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3663             break;
3664
3665         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3666             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3667             break;
3668
3669         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3670             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3671             break;
3672
3673         default:
3674             pa_assert_not_reached();
3675     }
3676
3677     pa_pstream_send_simple_ack(c->pstream, tag);
3678 }
3679
3680 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3681     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3682     uint32_t idx;
3683     record_stream *s;
3684     pa_bool_t b;
3685
3686     pa_native_connection_assert_ref(c);
3687     pa_assert(t);
3688
3689     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3690         pa_tagstruct_get_boolean(t, &b) < 0 ||
3691         !pa_tagstruct_eof(t)) {
3692         protocol_error(c);
3693         return;
3694     }
3695
3696     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3697     s = pa_idxset_get_by_index(c->record_streams, idx);
3698     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3699
3700     pa_source_output_cork(s->source_output, b);
3701     pa_memblockq_prebuf_force(s->memblockq);
3702     pa_pstream_send_simple_ack(c->pstream, tag);
3703 }
3704
3705 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3706     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3707     uint32_t idx;
3708     record_stream *s;
3709
3710     pa_native_connection_assert_ref(c);
3711     pa_assert(t);
3712
3713     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3714         !pa_tagstruct_eof(t)) {
3715         protocol_error(c);
3716         return;
3717     }
3718
3719     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3720     s = pa_idxset_get_by_index(c->record_streams, idx);
3721     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3722
3723     pa_memblockq_flush_read(s->memblockq);
3724     pa_pstream_send_simple_ack(c->pstream, tag);
3725 }
3726
/* Handler for PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR and
 * PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR.
 *
 * The request starts with the stream index. Playback streams then supply
 * maxlength, tlength, prebuf and minreq; record streams supply maxlength and
 * fragsize. Clients of protocol version >= 13 append the adjust_latency flag
 * and clients of version >= 14 the early_requests flag.
 *
 * The reply echoes the buffer attributes as stored in the stream after the
 * fix-up helpers ran and, for version >= 13, the configured sink/source
 * latency. */
static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
    uint32_t idx;
    pa_buffer_attr a;
    pa_tagstruct *reply;

    pa_native_connection_assert_ref(c);
    pa_assert(t);

    /* Fields that are not part of the request for this stream type stay 0. */
    memset(&a, 0, sizeof(a));

    if (pa_tagstruct_getu32(t, &idx) < 0) {
        protocol_error(c);
        return;
    }

    CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);

    if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
        playback_stream *s;
        pa_bool_t adjust_latency = FALSE, early_requests = FALSE;

        /* The stream is looked up before the rest of the request is parsed. */
        s = pa_idxset_get_by_index(c->output_streams, idx);
        CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
        CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);

        if (pa_tagstruct_get(
                    t,
                    PA_TAG_U32, &a.maxlength,
                    PA_TAG_U32, &a.tlength,
                    PA_TAG_U32, &a.prebuf,
                    PA_TAG_U32, &a.minreq,
                    PA_TAG_INVALID) < 0 ||
            (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
            (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
            !pa_tagstruct_eof(t)) {
            protocol_error(c);
            return;
        }

        s->adjust_latency = adjust_latency;
        s->early_requests = early_requests;
        s->buffer_attr = a;

        /* Adjust the requested values, then tell the sink input about them. */
        fix_playback_buffer_attr(s);
        pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);

        reply = reply_new(tag);
        pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
        pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
        pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
        pa_tagstruct_putu32(reply, s->buffer_attr.minreq);

        if (c->version >= 13)
            pa_tagstruct_put_usec(reply, s->configured_sink_latency);

    } else {
        record_stream *s;
        pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
        pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);

        s = pa_idxset_get_by_index(c->record_streams, idx);
        CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);

        if (pa_tagstruct_get(
                    t,
                    PA_TAG_U32, &a.maxlength,
                    PA_TAG_U32, &a.fragsize,
                    PA_TAG_INVALID) < 0 ||
            (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
            (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
            !pa_tagstruct_eof(t)) {
            protocol_error(c);
            return;
        }

        s->adjust_latency = adjust_latency;
        s->early_requests = early_requests;
        s->buffer_attr = a;

        /* Apply the new maxlength to the queue and read back the attributes
         * the queue actually uses, with a fix-up pass before and after. */
        fix_record_buffer_attr_pre(s);
        pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
        pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
        fix_record_buffer_attr_post(s);

        reply = reply_new(tag);
        pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
        pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);

        if (c->version >= 13)
            pa_tagstruct_put_usec(reply, s->configured_source_latency);
    }

    pa_pstream_send_tagstruct(c->pstream, reply);
}
3822
3823 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3824     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3825     uint32_t idx;
3826     uint32_t rate;
3827
3828     pa_native_connection_assert_ref(c);
3829     pa_assert(t);
3830
3831     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3832         pa_tagstruct_getu32(t, &rate) < 0 ||
3833         !pa_tagstruct_eof(t)) {
3834         protocol_error(c);
3835         return;
3836     }
3837
3838     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3839     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3840
3841     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3842         playback_stream *s;
3843
3844         s = pa_idxset_get_by_index(c->output_streams, idx);
3845         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3846         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3847
3848         pa_sink_input_set_rate(s->sink_input, rate);
3849
3850     } else {
3851         record_stream *s;
3852         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3853
3854         s = pa_idxset_get_by_index(c->record_streams, idx);
3855         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3856
3857         pa_source_output_set_rate(s->source_output, rate);
3858     }
3859
3860     pa_pstream_send_simple_ack(c->pstream, tag);
3861 }
3862
3863 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3864     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3865     uint32_t idx;
3866     uint32_t mode;
3867     pa_proplist *p;
3868
3869     pa_native_connection_assert_ref(c);
3870     pa_assert(t);
3871
3872     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3873
3874     p = pa_proplist_new();
3875
3876     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3877
3878         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3879             pa_tagstruct_get_proplist(t, p) < 0 ||
3880             !pa_tagstruct_eof(t)) {
3881             protocol_error(c);
3882             pa_proplist_free(p);
3883             return;
3884         }
3885
3886     } else {
3887
3888         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3889             pa_tagstruct_getu32(t, &mode) < 0 ||
3890             pa_tagstruct_get_proplist(t, p) < 0 ||
3891             !pa_tagstruct_eof(t)) {
3892             protocol_error(c);
3893             pa_proplist_free(p);
3894             return;
3895         }
3896     }
3897
3898     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3899         pa_proplist_free(p);
3900         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3901     }
3902
3903     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3904         playback_stream *s;
3905
3906         s = pa_idxset_get_by_index(c->output_streams, idx);
3907         if (!s || !playback_stream_isinstance(s)) {
3908             pa_proplist_free(p);
3909             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3910         }
3911         pa_sink_input_update_proplist(s->sink_input, mode, p);
3912
3913     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3914         record_stream *s;
3915
3916         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3917             pa_proplist_free(p);
3918             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3919         }
3920         pa_source_output_update_proplist(s->source_output, mode, p);
3921
3922     } else {
3923         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3924
3925         pa_client_update_proplist(c->client, mode, p);
3926     }
3927
3928     pa_pstream_send_simple_ack(c->pstream, tag);
3929     pa_proplist_free(p);
3930 }
3931
/* Handler for PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST,
 * PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST and
 * PA_COMMAND_REMOVE_CLIENT_PROPLIST.
 *
 * The stream variants carry a stream index first. All variants then carry a
 * sequence of property keys terminated by a NULL string. Each key is unset
 * on the target's property list; if at least one unset succeeded, a CHANGE
 * subscription event is posted for the affected sink input, source output
 * or client. The simple ack is sent before that event is posted. */
static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
    uint32_t idx;
    unsigned changed = 0;
    pa_proplist *p;
    pa_strlist *l = NULL;

    pa_native_connection_assert_ref(c);
    pa_assert(t);

    CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);

    /* idx is only read, and only used, for the two stream variants. */
    if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {

        if (pa_tagstruct_getu32(t, &idx) < 0) {
            protocol_error(c);
            return;
        }
    }

    /* Resolve the property list to operate on. */
    if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
        playback_stream *s;

        s = pa_idxset_get_by_index(c->output_streams, idx);
        CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
        CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);

        p = s->sink_input->proplist;

    } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
        record_stream *s;

        s = pa_idxset_get_by_index(c->record_streams, idx);
        CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);

        p = s->source_output->proplist;
    } else {
        pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);

        p = c->client->proplist;
    }

    /* Collect all keys first, so that nothing is modified if the request
     * turns out to be malformed. A NULL string ends the key list. */
    for (;;) {
        const char *k;

        if (pa_tagstruct_gets(t, &k) < 0) {
            protocol_error(c);
            pa_strlist_free(l);
            return;
        }

        if (!k)
            break;

        l = pa_strlist_prepend(l, k);
    }

    if (!pa_tagstruct_eof(t)) {
        protocol_error(c);
        pa_strlist_free(l);
        return;
    }

    /* Pop the collected keys one by one and unset them; each popped string
     * is freed here. */
    for (;;) {
        char *z;

        l = pa_strlist_pop(l, &z);

        if (!z)
            break;

        /* Count only the keys whose removal succeeded. */
        changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
        pa_xfree(z);
    }

    pa_pstream_send_simple_ack(c->pstream, tag);

    /* Notify subscribers only if something was actually removed. */
    if (changed) {
        if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
            playback_stream *s;

            s = pa_idxset_get_by_index(c->output_streams, idx);
            pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);

        } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
            record_stream *s;

            s = pa_idxset_get_by_index(c->record_streams, idx);
            pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);

        } else {
            pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
            pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
        }
    }
}
4028
4029 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4030     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4031     const char *s;
4032
4033     pa_native_connection_assert_ref(c);
4034     pa_assert(t);
4035
4036     if (pa_tagstruct_gets(t, &s) < 0 ||
4037         !pa_tagstruct_eof(t)) {
4038         protocol_error(c);
4039         return;
4040     }
4041
4042     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4043     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
4044
4045     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
4046         pa_source *source;
4047
4048         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
4049         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4050
4051         pa_namereg_set_default_source(c->protocol->core, source);
4052     } else {
4053         pa_sink *sink;
4054         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
4055
4056         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
4057         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4058
4059         pa_namereg_set_default_sink(c->protocol->core, sink);
4060     }
4061
4062     pa_pstream_send_simple_ack(c->pstream, tag);
4063 }
4064
4065 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4066     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4067     uint32_t idx;
4068     const char *name;
4069
4070     pa_native_connection_assert_ref(c);
4071     pa_assert(t);
4072
4073     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4074         pa_tagstruct_gets(t, &name) < 0 ||
4075         !pa_tagstruct_eof(t)) {
4076         protocol_error(c);
4077         return;
4078     }
4079
4080     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4081     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
4082
4083     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
4084         playback_stream *s;
4085
4086         s = pa_idxset_get_by_index(c->output_streams, idx);
4087         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4088         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4089
4090         pa_sink_input_set_name(s->sink_input, name);
4091
4092     } else {
4093         record_stream *s;
4094         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
4095
4096         s = pa_idxset_get_by_index(c->record_streams, idx);
4097         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4098
4099         pa_source_output_set_name(s->source_output, name);
4100     }
4101
4102     pa_pstream_send_simple_ack(c->pstream, tag);
4103 }
4104
4105 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4106     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4107     uint32_t idx;
4108
4109     pa_native_connection_assert_ref(c);
4110     pa_assert(t);
4111
4112     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4113         !pa_tagstruct_eof(t)) {
4114         protocol_error(c);
4115         return;
4116     }
4117
4118     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4119
4120     if (command == PA_COMMAND_KILL_CLIENT) {
4121         pa_client *client;
4122
4123         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
4124         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
4125
4126         pa_native_connection_ref(c);
4127         pa_client_kill(client);
4128
4129     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
4130         pa_sink_input *s;
4131
4132         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4133         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4134
4135         pa_native_connection_ref(c);
4136         pa_sink_input_kill(s);
4137     } else {
4138         pa_source_output *s;
4139
4140         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4141
4142         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4143         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4144
4145         pa_native_connection_ref(c);
4146         pa_source_output_kill(s);
4147     }
4148
4149     pa_pstream_send_simple_ack(c->pstream, tag);
4150     pa_native_connection_unref(c);
4151 }
4152
4153 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4154     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4155     pa_module *m;
4156     const char *name, *argument;
4157     pa_tagstruct *reply;
4158
4159     pa_native_connection_assert_ref(c);
4160     pa_assert(t);
4161
4162     if (pa_tagstruct_gets(t, &name) < 0 ||
4163         pa_tagstruct_gets(t, &argument) < 0 ||
4164         !pa_tagstruct_eof(t)) {
4165         protocol_error(c);
4166         return;
4167     }
4168
4169     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4170     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4171     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4172
4173     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4174         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4175         return;
4176     }
4177
4178     reply = reply_new(tag);
4179     pa_tagstruct_putu32(reply, m->index);
4180     pa_pstream_send_tagstruct(c->pstream, reply);
4181 }
4182
4183 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4184     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4185     uint32_t idx;
4186     pa_module *m;
4187
4188     pa_native_connection_assert_ref(c);
4189     pa_assert(t);
4190
4191     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4192         !pa_tagstruct_eof(t)) {
4193         protocol_error(c);
4194         return;
4195     }
4196
4197     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4198     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4199     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4200
4201     pa_module_unload_request(m, FALSE);
4202     pa_pstream_send_simple_ack(c->pstream, tag);
4203 }
4204
4205 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4206     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4207     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4208     const char *name_device = NULL;
4209
4210     pa_native_connection_assert_ref(c);
4211     pa_assert(t);
4212
4213     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4214         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4215         pa_tagstruct_gets(t, &name_device) < 0 ||
4216         !pa_tagstruct_eof(t)) {
4217         protocol_error(c);
4218         return;
4219     }
4220
4221     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4222     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4223
4224     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name_or_wildcard(name_device, command == PA_COMMAND_MOVE_SINK_INPUT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4225     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4226     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4227     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4228
4229     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4230         pa_sink_input *si = NULL;
4231         pa_sink *sink = NULL;
4232
4233         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4234
4235         if (idx_device != PA_INVALID_INDEX)
4236             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4237         else
4238             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4239
4240         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4241
4242         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4243             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4244             return;
4245         }
4246     } else {
4247         pa_source_output *so = NULL;
4248         pa_source *source;
4249
4250         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4251
4252         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4253
4254         if (idx_device != PA_INVALID_INDEX)
4255             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4256         else
4257             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4258
4259         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4260
4261         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4262             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4263             return;
4264         }
4265     }
4266
4267     pa_pstream_send_simple_ack(c->pstream, tag);
4268 }
4269
4270 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4271     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4272     uint32_t idx = PA_INVALID_INDEX;
4273     const char *name = NULL;
4274     pa_bool_t b;
4275
4276     pa_native_connection_assert_ref(c);
4277     pa_assert(t);
4278
4279     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4280         pa_tagstruct_gets(t, &name) < 0 ||
4281         pa_tagstruct_get_boolean(t, &b) < 0 ||
4282         !pa_tagstruct_eof(t)) {
4283         protocol_error(c);
4284         return;
4285     }
4286
4287     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4288     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SUSPEND_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE) || *name == 0, tag, PA_ERR_INVALID);
4289     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4290     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4291     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4292
4293     if (command == PA_COMMAND_SUSPEND_SINK) {
4294
4295         if (idx == PA_INVALID_INDEX && name && !*name) {
4296
4297             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4298
4299             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4300                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4301                 return;
4302             }
4303         } else {
4304             pa_sink *sink = NULL;
4305
4306             if (idx != PA_INVALID_INDEX)
4307                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4308             else
4309                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4310
4311             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4312
4313             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4314                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4315                 return;
4316             }
4317         }
4318     } else {
4319
4320         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4321
4322         if (idx == PA_INVALID_INDEX && name && !*name) {
4323
4324             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4325
4326             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4327                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4328                 return;
4329             }
4330
4331         } else {
4332             pa_source *source;
4333
4334             if (idx != PA_INVALID_INDEX)
4335                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4336             else
4337                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4338
4339             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4340
4341             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4342                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4343                 return;
4344             }
4345         }
4346     }
4347
4348     pa_pstream_send_simple_ack(c->pstream, tag);
4349 }
4350
4351 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4352     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4353     uint32_t idx = PA_INVALID_INDEX;
4354     const char *name = NULL;
4355     pa_module *m;
4356     pa_native_protocol_ext_cb_t cb;
4357
4358     pa_native_connection_assert_ref(c);
4359     pa_assert(t);
4360
4361     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4362         pa_tagstruct_gets(t, &name) < 0) {
4363         protocol_error(c);
4364         return;
4365     }
4366
4367     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4368     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4369     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4370     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4371     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4372
4373     if (idx != PA_INVALID_INDEX)
4374         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4375     else {
4376         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4377             if (strcmp(name, m->name) == 0)
4378                 break;
4379     }
4380
4381     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4382     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4383
4384     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4385     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4386
4387     if (cb(c->protocol, m, c, tag, t) < 0)
4388         protocol_error(c);
4389 }
4390
4391 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4392     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4393     uint32_t idx = PA_INVALID_INDEX;
4394     const char *name = NULL, *profile = NULL;
4395     pa_card *card = NULL;
4396     int ret;
4397
4398     pa_native_connection_assert_ref(c);
4399     pa_assert(t);
4400
4401     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4402         pa_tagstruct_gets(t, &name) < 0 ||
4403         pa_tagstruct_gets(t, &profile) < 0 ||
4404         !pa_tagstruct_eof(t)) {
4405         protocol_error(c);
4406         return;
4407     }
4408
4409     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4410     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4411     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4412     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4413     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4414
4415     if (idx != PA_INVALID_INDEX)
4416         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4417     else
4418         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4419
4420     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4421
4422     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4423         pa_pstream_send_error(c->pstream, tag, -ret);
4424         return;
4425     }
4426
4427     pa_pstream_send_simple_ack(c->pstream, tag);
4428 }
4429
4430 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4431     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4432     uint32_t idx = PA_INVALID_INDEX;
4433     const char *name = NULL, *port = NULL;
4434     int ret;
4435
4436     pa_native_connection_assert_ref(c);
4437     pa_assert(t);
4438
4439     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4440         pa_tagstruct_gets(t, &name) < 0 ||
4441         pa_tagstruct_gets(t, &port) < 0 ||
4442         !pa_tagstruct_eof(t)) {
4443         protocol_error(c);
4444         return;
4445     }
4446
4447     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4448     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_PORT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4449     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4450     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4451     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4452
4453     if (command == PA_COMMAND_SET_SINK_PORT) {
4454         pa_sink *sink;
4455
4456         if (idx != PA_INVALID_INDEX)
4457             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4458         else
4459             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4460
4461         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4462
4463         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4464             pa_pstream_send_error(c->pstream, tag, -ret);
4465             return;
4466         }
4467     } else {
4468         pa_source *source;
4469
4470         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4471
4472         if (idx != PA_INVALID_INDEX)
4473             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4474         else
4475             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4476
4477         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4478
4479         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4480             pa_pstream_send_error(c->pstream, tag, -ret);
4481             return;
4482         }
4483     }
4484
4485     pa_pstream_send_simple_ack(c->pstream, tag);
4486 }
4487
4488 /*** pstream callbacks ***/
4489
4490 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4491     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4492
4493     pa_assert(p);
4494     pa_assert(packet);
4495     pa_native_connection_assert_ref(c);
4496
4497     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4498         pa_log("invalid packet.");
4499         native_connection_unlink(c);
4500     }
4501 }
4502
4503 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4504     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4505     output_stream *stream;
4506
4507     pa_assert(p);
4508     pa_assert(chunk);
4509     pa_native_connection_assert_ref(c);
4510
4511     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4512         pa_log_debug("Client sent block for invalid stream.");
4513         /* Ignoring */
4514         return;
4515     }
4516
4517 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4518
4519     if (playback_stream_isinstance(stream)) {
4520         playback_stream *ps = PLAYBACK_STREAM(stream);
4521
4522         pa_atomic_inc(&ps->seek_or_post_in_queue);
4523         if (chunk->memblock) {
4524             if (seek != PA_SEEK_RELATIVE || offset != 0)
4525                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, chunk, NULL);
4526             else
4527                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4528         } else
4529             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4530
4531     } else {
4532         upload_stream *u = UPLOAD_STREAM(stream);
4533         size_t l;
4534
4535         if (!u->memchunk.memblock) {
4536             if (u->length == chunk->length && chunk->memblock) {
4537                 u->memchunk = *chunk;
4538                 pa_memblock_ref(u->memchunk.memblock);
4539                 u->length = 0;
4540             } else {
4541                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4542                 u->memchunk.index = u->memchunk.length = 0;
4543             }
4544         }
4545
4546         pa_assert(u->memchunk.memblock);
4547
4548         l = u->length;
4549         if (l > chunk->length)
4550             l = chunk->length;
4551
4552         if (l > 0) {
4553             void *dst;
4554             dst = pa_memblock_acquire(u->memchunk.memblock);
4555
4556             if (chunk->memblock) {
4557                 void *src;
4558                 src = pa_memblock_acquire(chunk->memblock);
4559
4560                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4561                        (uint8_t*) src + chunk->index, l);
4562
4563                 pa_memblock_release(chunk->memblock);
4564             } else
4565                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4566
4567             pa_memblock_release(u->memchunk.memblock);
4568
4569             u->memchunk.length += l;
4570             u->length -= l;
4571         }
4572     }
4573 }
4574
4575 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4576     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4577
4578     pa_assert(p);
4579     pa_native_connection_assert_ref(c);
4580
4581     native_connection_unlink(c);
4582     pa_log_info("Connection died.");
4583 }
4584
4585 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4586     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4587
4588     pa_assert(p);
4589     pa_native_connection_assert_ref(c);
4590
4591     native_connection_send_memblock(c);
4592 }
4593
4594 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4595     pa_thread_mq *q;
4596
4597     if (!(q = pa_thread_mq_get()))
4598         pa_pstream_send_revoke(p, block_id);
4599     else
4600         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4601 }
4602
4603 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4604     pa_thread_mq *q;
4605
4606     if (!(q = pa_thread_mq_get()))
4607         pa_pstream_send_release(p, block_id);
4608     else
4609         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4610 }
4611
4612 /*** client callbacks ***/
4613
4614 static void client_kill_cb(pa_client *c) {
4615     pa_assert(c);
4616
4617     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4618     pa_log_info("Connection killed.");
4619 }
4620
4621 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4622     pa_tagstruct *t;
4623     pa_native_connection *c;
4624
4625     pa_assert(client);
4626     c = PA_NATIVE_CONNECTION(client->userdata);
4627     pa_native_connection_assert_ref(c);
4628
4629     if (c->version < 15)
4630       return;
4631
4632     t = pa_tagstruct_new(NULL, 0);
4633     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4634     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4635     pa_tagstruct_puts(t, event);
4636     pa_tagstruct_put_proplist(t, pl);
4637     pa_pstream_send_tagstruct(c->pstream, t);
4638 }
4639
4640 /*** module entry points ***/
4641
4642 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4643     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4644
4645     pa_assert(m);
4646     pa_native_connection_assert_ref(c);
4647     pa_assert(c->auth_timeout_event == e);
4648
4649     if (!c->authorized) {
4650         native_connection_unlink(c);
4651         pa_log_info("Connection terminated due to authentication timeout.");
4652     }
4653 }
4654
4655 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4656     pa_native_connection *c;
4657     char pname[128];
4658     pa_client *client;
4659     pa_client_new_data data;
4660
4661     pa_assert(p);
4662     pa_assert(io);
4663     pa_assert(o);
4664
4665     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4666         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4667         pa_iochannel_free(io);
4668         return;
4669     }
4670
4671     pa_client_new_data_init(&data);
4672     data.module = o->module;
4673     data.driver = __FILE__;
4674     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4675     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4676     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4677     client = pa_client_new(p->core, &data);
4678     pa_client_new_data_done(&data);
4679
4680     if (!client)
4681         return;
4682
4683     c = pa_msgobject_new(pa_native_connection);
4684     c->parent.parent.free = native_connection_free;
4685     c->parent.process_msg = native_connection_process_msg;
4686     c->protocol = p;
4687     c->options = pa_native_options_ref(o);
4688     c->authorized = FALSE;
4689
4690     if (o->auth_anonymous) {
4691         pa_log_info("Client authenticated anonymously.");
4692         c->authorized = TRUE;
4693     }
4694
4695     if (!c->authorized &&
4696         o->auth_ip_acl &&
4697         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4698
4699         pa_log_info("Client authenticated by IP ACL.");
4700         c->authorized = TRUE;
4701     }
4702
4703     if (!c->authorized)
4704         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4705     else
4706         c->auth_timeout_event = NULL;
4707
4708     c->is_local = pa_iochannel_socket_is_local(io);
4709     c->version = 8;
4710
4711     c->client = client;
4712     c->client->kill = client_kill_cb;
4713     c->client->send_event = client_send_event_cb;
4714     c->client->userdata = c;
4715
4716     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4717     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4718     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4719     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4720     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4721     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4722     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4723
4724     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4725
4726     c->record_streams = pa_idxset_new(NULL, NULL);
4727     c->output_streams = pa_idxset_new(NULL, NULL);
4728
4729     c->rrobin_index = PA_IDXSET_INVALID;
4730     c->subscription = NULL;
4731
4732     pa_idxset_put(p->connections, c, NULL);
4733
4734 #ifdef HAVE_CREDS
4735     if (pa_iochannel_creds_supported(io))
4736         pa_iochannel_creds_enable(io);
4737 #endif
4738
4739     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4740 }
4741
4742 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4743     pa_native_connection *c;
4744     void *state = NULL;
4745
4746     pa_assert(p);
4747     pa_assert(m);
4748
4749     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4750         if (c->options->module == m)
4751             native_connection_unlink(c);
4752 }
4753
4754 static pa_native_protocol* native_protocol_new(pa_core *c) {
4755     pa_native_protocol *p;
4756     pa_native_hook_t h;
4757
4758     pa_assert(c);
4759
4760     p = pa_xnew(pa_native_protocol, 1);
4761     PA_REFCNT_INIT(p);
4762     p->core = c;
4763     p->connections = pa_idxset_new(NULL, NULL);
4764
4765     p->servers = NULL;
4766
4767     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4768
4769     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4770         pa_hook_init(&p->hooks[h], p);
4771
4772     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4773
4774     return p;
4775 }
4776
4777 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4778     pa_native_protocol *p;
4779
4780     if ((p = pa_shared_get(c, "native-protocol")))
4781         return pa_native_protocol_ref(p);
4782
4783     return native_protocol_new(c);
4784 }
4785
4786 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4787     pa_assert(p);
4788     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4789
4790     PA_REFCNT_INC(p);
4791
4792     return p;
4793 }
4794
4795 void pa_native_protocol_unref(pa_native_protocol *p) {
4796     pa_native_connection *c;
4797     pa_native_hook_t h;
4798
4799     pa_assert(p);
4800     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4801
4802     if (PA_REFCNT_DEC(p) > 0)
4803         return;
4804
4805     while ((c = pa_idxset_first(p->connections, NULL)))
4806         native_connection_unlink(c);
4807
4808     pa_idxset_free(p->connections, NULL, NULL);
4809
4810     pa_strlist_free(p->servers);
4811
4812     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4813         pa_hook_done(&p->hooks[h]);
4814
4815     pa_hashmap_free(p->extensions, NULL, NULL);
4816
4817     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4818
4819     pa_xfree(p);
4820 }
4821
4822 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4823     pa_assert(p);
4824     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4825     pa_assert(name);
4826
4827     p->servers = pa_strlist_prepend(p->servers, name);
4828
4829     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4830 }
4831
4832 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4833     pa_assert(p);
4834     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4835     pa_assert(name);
4836
4837     p->servers = pa_strlist_remove(p->servers, name);
4838
4839     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4840 }
4841
4842 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4843     pa_assert(p);
4844     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4845
4846     return p->hooks;
4847 }
4848
4849 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4850     pa_assert(p);
4851     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4852
4853     return p->servers;
4854 }
4855
4856 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4857     pa_assert(p);
4858     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4859     pa_assert(m);
4860     pa_assert(cb);
4861     pa_assert(!pa_hashmap_get(p->extensions, m));
4862
4863     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4864     return 0;
4865 }
4866
4867 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4868     pa_assert(p);
4869     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4870     pa_assert(m);
4871
4872     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4873 }
4874
4875 pa_native_options* pa_native_options_new(void) {
4876     pa_native_options *o;
4877
4878     o = pa_xnew0(pa_native_options, 1);
4879     PA_REFCNT_INIT(o);
4880
4881     return o;
4882 }
4883
4884 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4885     pa_assert(o);
4886     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4887
4888     PA_REFCNT_INC(o);
4889
4890     return o;
4891 }
4892
4893 void pa_native_options_unref(pa_native_options *o) {
4894     pa_assert(o);
4895     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4896
4897     if (PA_REFCNT_DEC(o) > 0)
4898         return;
4899
4900     pa_xfree(o->auth_group);
4901
4902     if (o->auth_ip_acl)
4903         pa_ip_acl_free(o->auth_ip_acl);
4904
4905     if (o->auth_cookie)
4906         pa_auth_cookie_unref(o->auth_cookie);
4907
4908     pa_xfree(o);
4909 }
4910
4911 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4912     pa_bool_t enabled;
4913     const char *acl;
4914
4915     pa_assert(o);
4916     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4917     pa_assert(ma);
4918
4919     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4920         pa_log("auth-anonymous= expects a boolean argument.");
4921         return -1;
4922     }
4923
4924     enabled = TRUE;
4925     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4926         pa_log("auth-group-enabled= expects a boolean argument.");
4927         return -1;
4928     }
4929
4930     pa_xfree(o->auth_group);
4931     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4932
4933 #ifndef HAVE_CREDS
4934     if (o->auth_group)
4935         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4936 #endif
4937
4938     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4939         pa_ip_acl *ipa;
4940
4941         if (!(ipa = pa_ip_acl_new(acl))) {
4942             pa_log("Failed to parse IP ACL '%s'", acl);
4943             return -1;
4944         }
4945
4946         if (o->auth_ip_acl)
4947             pa_ip_acl_free(o->auth_ip_acl);
4948
4949         o->auth_ip_acl = ipa;
4950     }
4951
4952     enabled = TRUE;
4953     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4954         pa_log("auth-cookie-enabled= expects a boolean argument.");
4955         return -1;
4956     }
4957
4958     if (o->auth_cookie)
4959         pa_auth_cookie_unref(o->auth_cookie);
4960
4961     if (enabled) {
4962         const char *cn;
4963
4964         /* The new name for this is 'auth-cookie', for compat reasons
4965          * we check the old name too */
4966         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4967             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4968                 cn = PA_NATIVE_COOKIE_FILE;
4969
4970         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4971             return -1;
4972
4973     } else
4974           o->auth_cookie = NULL;
4975
4976     return 0;
4977 }
4978
4979 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4980     pa_native_connection_assert_ref(c);
4981
4982     return c->pstream;
4983 }
4984
4985 pa_client* pa_native_connection_get_client(pa_native_connection *c) {
4986     pa_native_connection_assert_ref(c);
4987
4988     return c->client;
4989 }