protocol-native: Remove written differently but functionally redundant check.
[platform/upstream/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38 #include <pulse/internal.h>
39
40 #include <pulsecore/native-common.h>
41 #include <pulsecore/packet.h>
42 #include <pulsecore/client.h>
43 #include <pulsecore/source-output.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pstream.h>
46 #include <pulsecore/tagstruct.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* #define PROTOCOL_NATIVE_DEBUG */
64
65 /* Kick a client if it doesn't authenticate within this time */
66 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
67
68 /* Don't accept more connection than this */
69 #define MAX_CONNECTIONS 64
70
71 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
72 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
73 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
74 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
75
76 struct pa_native_protocol;
77
78 typedef struct record_stream {
79     pa_msgobject parent;
80
81     pa_native_connection *connection;
82     uint32_t index;
83
84     pa_source_output *source_output;
85     pa_memblockq *memblockq;
86
87     bool adjust_latency:1;
88     bool early_requests:1;
89
90     /* Requested buffer attributes */
91     pa_buffer_attr buffer_attr_req;
92     /* Fixed-up and adjusted buffer attributes */
93     pa_buffer_attr buffer_attr;
94
95     pa_atomic_t on_the_fly;
96     pa_usec_t configured_source_latency;
97     size_t drop_initial;
98
99     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
100     size_t on_the_fly_snapshot;
101     pa_usec_t current_monitor_latency;
102     pa_usec_t current_source_latency;
103 } record_stream;
104
105 #define RECORD_STREAM(o) (record_stream_cast(o))
106 PA_DEFINE_PRIVATE_CLASS(record_stream, pa_msgobject);
107
108 typedef struct output_stream {
109     pa_msgobject parent;
110 } output_stream;
111
112 #define OUTPUT_STREAM(o) (output_stream_cast(o))
113 PA_DEFINE_PRIVATE_CLASS(output_stream, pa_msgobject);
114
115 typedef struct playback_stream {
116     output_stream parent;
117
118     pa_native_connection *connection;
119     uint32_t index;
120
121     pa_sink_input *sink_input;
122     pa_memblockq *memblockq;
123
124     bool adjust_latency:1;
125     bool early_requests:1;
126
127     bool is_underrun:1;
128     bool drain_request:1;
129     uint32_t drain_tag;
130     uint32_t syncid;
131
132     /* Optimization to avoid too many rewinds with a lot of small blocks */
133     pa_atomic_t seek_or_post_in_queue;
134     int64_t seek_windex;
135
136     pa_atomic_t missing;
137     pa_usec_t configured_sink_latency;
138     /* Requested buffer attributes */
139     pa_buffer_attr buffer_attr_req;
140     /* Fixed-up and adjusted buffer attributes */
141     pa_buffer_attr buffer_attr;
142
143     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
144     int64_t read_index, write_index;
145     size_t render_memblockq_length;
146     pa_usec_t current_sink_latency;
147     uint64_t playing_for, underrun_for;
148 } playback_stream;
149
150 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
151 PA_DEFINE_PRIVATE_CLASS(playback_stream, output_stream);
152
153 typedef struct upload_stream {
154     output_stream parent;
155
156     pa_native_connection *connection;
157     uint32_t index;
158
159     pa_memchunk memchunk;
160     size_t length;
161     char *name;
162     pa_sample_spec sample_spec;
163     pa_channel_map channel_map;
164     pa_proplist *proplist;
165 } upload_stream;
166
167 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
168 PA_DEFINE_PRIVATE_CLASS(upload_stream, output_stream);
169
170 struct pa_native_connection {
171     pa_msgobject parent;
172     pa_native_protocol *protocol;
173     pa_native_options *options;
174     bool authorized:1;
175     bool is_local:1;
176     uint32_t version;
177     pa_client *client;
178     pa_pstream *pstream;
179     pa_pdispatch *pdispatch;
180     pa_idxset *record_streams, *output_streams;
181     uint32_t rrobin_index;
182     pa_subscription *subscription;
183     pa_time_event *auth_timeout_event;
184 };
185
186 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
187 PA_DEFINE_PRIVATE_CLASS(pa_native_connection, pa_msgobject);
188
189 struct pa_native_protocol {
190     PA_REFCNT_DECLARE;
191
192     pa_core *core;
193     pa_idxset *connections;
194
195     pa_strlist *servers;
196     pa_hook hooks[PA_NATIVE_HOOK_MAX];
197
198     pa_hashmap *extensions;
199 };
200
201 enum {
202     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
203 };
204
205 enum {
206     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
207     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
208     SINK_INPUT_MESSAGE_FLUSH,
209     SINK_INPUT_MESSAGE_TRIGGER,
210     SINK_INPUT_MESSAGE_SEEK,
211     SINK_INPUT_MESSAGE_PREBUF_FORCE,
212     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
213     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
214 };
215
216 enum {
217     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
218     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
219     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
220     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
221     PLAYBACK_STREAM_MESSAGE_STARTED,
222     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
223 };
224
225 enum {
226     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
227 };
228
229 enum {
230     CONNECTION_MESSAGE_RELEASE,
231     CONNECTION_MESSAGE_REVOKE
232 };
233
234 static bool sink_input_process_underrun_cb(pa_sink_input *i);
235 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
236 static void sink_input_kill_cb(pa_sink_input *i);
237 static void sink_input_suspend_cb(pa_sink_input *i, bool suspend);
238 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
239 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
240 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
241 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
242 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
243
244 static void native_connection_send_memblock(pa_native_connection *c);
245 static void playback_stream_request_bytes(struct playback_stream*s);
246
247 static void source_output_kill_cb(pa_source_output *o);
248 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
249 static void source_output_suspend_cb(pa_source_output *o, bool suspend);
250 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
251 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
252 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
253
254 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
255 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
256
257 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
290 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
291 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
292 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
293 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
294 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
295 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
296 static void command_set_port_latency_offset(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
297
298 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
299     [PA_COMMAND_ERROR] = NULL,
300     [PA_COMMAND_TIMEOUT] = NULL,
301     [PA_COMMAND_REPLY] = NULL,
302     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
303     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
304     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
305     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
306     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
307     [PA_COMMAND_AUTH] = command_auth,
308     [PA_COMMAND_REQUEST] = NULL,
309     [PA_COMMAND_EXIT] = command_exit,
310     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
311     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
312     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
313     [PA_COMMAND_STAT] = command_stat,
314     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
315     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
316     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
317     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
318     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
319     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
320     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
321     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
322     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
323     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
324     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
325     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
326     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
327     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
328     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
329     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
330     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
331     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
332     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
333     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
334     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
335     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
336     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
337     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
338     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
339
340     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
341     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
342     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
343     [PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME] = command_set_volume,
344
345     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
346     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
347     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
348     [PA_COMMAND_SET_SOURCE_OUTPUT_MUTE] = command_set_mute,
349
350     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
351     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
352
353     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
354     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
355     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
356     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
357
358     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
359     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
360
361     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
362     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
363     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
364     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
365     [PA_COMMAND_KILL_CLIENT] = command_kill,
366     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
367     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
368     [PA_COMMAND_LOAD_MODULE] = command_load_module,
369     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
370
371     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
372     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
373     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
374     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
375
376     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
377     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
378
379     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
380     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
381
382     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
383     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
384
385     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
386     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
387     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
388
389     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
390     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
391     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
392
393     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
394
395     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
396     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
397
398     [PA_COMMAND_SET_PORT_LATENCY_OFFSET] = command_set_port_latency_offset,
399
400     [PA_COMMAND_EXTENSION] = command_extension
401 };
402
403 /* structure management */
404
405 /* Called from main context */
406 static void upload_stream_unlink(upload_stream *s) {
407     pa_assert(s);
408
409     if (!s->connection)
410         return;
411
412     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
413     s->connection = NULL;
414     upload_stream_unref(s);
415 }
416
417 /* Called from main context */
418 static void upload_stream_free(pa_object *o) {
419     upload_stream *s = UPLOAD_STREAM(o);
420     pa_assert(s);
421
422     upload_stream_unlink(s);
423
424     pa_xfree(s->name);
425
426     if (s->proplist)
427         pa_proplist_free(s->proplist);
428
429     if (s->memchunk.memblock)
430         pa_memblock_unref(s->memchunk.memblock);
431
432     pa_xfree(s);
433 }
434
435 /* Called from main context */
436 static upload_stream* upload_stream_new(
437         pa_native_connection *c,
438         const pa_sample_spec *ss,
439         const pa_channel_map *map,
440         const char *name,
441         size_t length,
442         pa_proplist *p) {
443
444     upload_stream *s;
445
446     pa_assert(c);
447     pa_assert(ss);
448     pa_assert(name);
449     pa_assert(length > 0);
450     pa_assert(p);
451
452     s = pa_msgobject_new(upload_stream);
453     s->parent.parent.parent.free = upload_stream_free;
454     s->connection = c;
455     s->sample_spec = *ss;
456     s->channel_map = *map;
457     s->name = pa_xstrdup(name);
458     pa_memchunk_reset(&s->memchunk);
459     s->length = length;
460     s->proplist = pa_proplist_copy(p);
461     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
462
463     pa_idxset_put(c->output_streams, s, &s->index);
464
465     return s;
466 }
467
468 /* Called from main context */
469 static void record_stream_unlink(record_stream *s) {
470     pa_assert(s);
471
472     if (!s->connection)
473         return;
474
475     if (s->source_output) {
476         pa_source_output_unlink(s->source_output);
477         pa_source_output_unref(s->source_output);
478         s->source_output = NULL;
479     }
480
481     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
482     s->connection = NULL;
483     record_stream_unref(s);
484 }
485
486 /* Called from main context */
487 static void record_stream_free(pa_object *o) {
488     record_stream *s = RECORD_STREAM(o);
489     pa_assert(s);
490
491     record_stream_unlink(s);
492
493     pa_memblockq_free(s->memblockq);
494     pa_xfree(s);
495 }
496
497 /* Called from main context */
498 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
499     record_stream *s = RECORD_STREAM(o);
500     record_stream_assert_ref(s);
501
502     if (!s->connection)
503         return -1;
504
505     switch (code) {
506
507         case RECORD_STREAM_MESSAGE_POST_DATA:
508
509             /* We try to keep up to date with how many bytes are
510              * currently on the fly */
511             pa_atomic_sub(&s->on_the_fly, chunk->length);
512
513             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
514 /*                 pa_log_warn("Failed to push data into output queue."); */
515                 return -1;
516             }
517
518             if (!pa_pstream_is_pending(s->connection->pstream))
519                 native_connection_send_memblock(s->connection);
520
521             break;
522     }
523
524     return 0;
525 }
526
527 /* Called from main context */
528 static void fix_record_buffer_attr_pre(record_stream *s) {
529
530     size_t frame_size;
531     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
532
533     pa_assert(s);
534
535     /* This function will be called from the main thread, before as
536      * well as after the source output has been activated using
537      * pa_source_output_put()! That means it may not touch any
538      * ->thread_info data! */
539
540     frame_size = pa_frame_size(&s->source_output->sample_spec);
541     s->buffer_attr = s->buffer_attr_req;
542
543     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
544         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
545     if (s->buffer_attr.maxlength <= 0)
546         s->buffer_attr.maxlength = (uint32_t) frame_size;
547
548     if (s->buffer_attr.fragsize == (uint32_t) -1)
549         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
550     if (s->buffer_attr.fragsize <= 0)
551         s->buffer_attr.fragsize = (uint32_t) frame_size;
552
553     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
554
555     if (s->early_requests) {
556
557         /* In early request mode we need to emulate the classic
558          * fragment-based playback model. We do this setting the source
559          * latency to the fragment size. */
560
561         source_usec = fragsize_usec;
562
563     } else if (s->adjust_latency) {
564
565         /* So, the user asked us to adjust the latency according to
566          * what the source can provide. Half the latency will be
567          * spent on the hw buffer, half of it in the async buffer
568          * queue we maintain for each client. */
569
570         source_usec = fragsize_usec/2;
571
572     } else {
573
574         /* Ok, the user didn't ask us to adjust the latency, hence we
575          * don't */
576
577         source_usec = (pa_usec_t) -1;
578     }
579
580     if (source_usec != (pa_usec_t) -1)
581         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
582     else
583         s->configured_source_latency = 0;
584
585     if (s->early_requests) {
586
587         /* Ok, we didn't necessarily get what we were asking for, so
588          * let's tell the user */
589
590         fragsize_usec = s->configured_source_latency;
591
592     } else if (s->adjust_latency) {
593
594         /* Now subtract what we actually got */
595
596         if (fragsize_usec >= s->configured_source_latency*2)
597             fragsize_usec -= s->configured_source_latency;
598         else
599             fragsize_usec = s->configured_source_latency;
600     }
601
602     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
603         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
604
605         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
606
607     if (s->buffer_attr.fragsize <= 0)
608         s->buffer_attr.fragsize = (uint32_t) frame_size;
609 }
610
611 /* Called from main context */
612 static void fix_record_buffer_attr_post(record_stream *s) {
613     size_t base;
614
615     pa_assert(s);
616
617     /* This function will be called from the main thread, before as
618      * well as after the source output has been activated using
619      * pa_source_output_put()! That means it may not touch and
620      * ->thread_info data! */
621
622     base = pa_frame_size(&s->source_output->sample_spec);
623
624     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
625     if (s->buffer_attr.fragsize <= 0)
626         s->buffer_attr.fragsize = base;
627
628     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
629         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
630 }
631
632 /* Called from main context */
633 static record_stream* record_stream_new(
634         pa_native_connection *c,
635         pa_source *source,
636         pa_sample_spec *ss,
637         pa_channel_map *map,
638         pa_idxset *formats,
639         pa_buffer_attr *attr,
640         pa_cvolume *volume,
641         bool muted,
642         bool muted_set,
643         pa_source_output_flags_t flags,
644         pa_proplist *p,
645         bool adjust_latency,
646         bool early_requests,
647         bool relative_volume,
648         bool peak_detect,
649         pa_sink_input *direct_on_input,
650         int *ret) {
651
652     record_stream *s;
653     pa_source_output *source_output = NULL;
654     pa_source_output_new_data data;
655     char *memblockq_name;
656
657     pa_assert(c);
658     pa_assert(ss);
659     pa_assert(p);
660     pa_assert(ret);
661
662     pa_source_output_new_data_init(&data);
663
664     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
665     data.driver = __FILE__;
666     data.module = c->options->module;
667     data.client = c->client;
668     if (source)
669         pa_source_output_new_data_set_source(&data, source, false);
670     if (pa_sample_spec_valid(ss))
671         pa_source_output_new_data_set_sample_spec(&data, ss);
672     if (pa_channel_map_valid(map))
673         pa_source_output_new_data_set_channel_map(&data, map);
674     if (formats)
675         pa_source_output_new_data_set_formats(&data, formats);
676     data.direct_on_input = direct_on_input;
677     if (volume) {
678         pa_source_output_new_data_set_volume(&data, volume);
679         data.volume_is_absolute = !relative_volume;
680         data.save_volume = false;
681     }
682     if (muted_set) {
683         pa_source_output_new_data_set_muted(&data, muted);
684         data.save_muted = false;
685     }
686     if (peak_detect)
687         data.resample_method = PA_RESAMPLER_PEAKS;
688     data.flags = flags;
689
690     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data);
691
692     pa_source_output_new_data_done(&data);
693
694     if (!source_output)
695         return NULL;
696
697     s = pa_msgobject_new(record_stream);
698     s->parent.parent.free = record_stream_free;
699     s->parent.process_msg = record_stream_process_msg;
700     s->connection = c;
701     s->source_output = source_output;
702     s->buffer_attr_req = *attr;
703     s->adjust_latency = adjust_latency;
704     s->early_requests = early_requests;
705     pa_atomic_store(&s->on_the_fly, 0);
706
707     s->source_output->parent.process_msg = source_output_process_msg;
708     s->source_output->push = source_output_push_cb;
709     s->source_output->kill = source_output_kill_cb;
710     s->source_output->get_latency = source_output_get_latency_cb;
711     s->source_output->moving = source_output_moving_cb;
712     s->source_output->suspend = source_output_suspend_cb;
713     s->source_output->send_event = source_output_send_event_cb;
714     s->source_output->userdata = s;
715
716     fix_record_buffer_attr_pre(s);
717
718     memblockq_name = pa_sprintf_malloc("native protocol record stream memblockq [%u]", s->source_output->index);
719     s->memblockq = pa_memblockq_new(
720             memblockq_name,
721             0,
722             s->buffer_attr.maxlength,
723             0,
724             &source_output->sample_spec,
725             1,
726             0,
727             0,
728             NULL);
729     pa_xfree(memblockq_name);
730
731     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
732     fix_record_buffer_attr_post(s);
733
734     *ss = s->source_output->sample_spec;
735     *map = s->source_output->channel_map;
736
737     pa_idxset_put(c->record_streams, s, &s->index);
738
739     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
740                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
741                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
742                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
743
744     pa_source_output_put(s->source_output);
745     return s;
746 }
747
748 /* Called from main context */
749 static void record_stream_send_killed(record_stream *r) {
750     pa_tagstruct *t;
751     record_stream_assert_ref(r);
752
753     t = pa_tagstruct_new(NULL, 0);
754     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
755     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
756     pa_tagstruct_putu32(t, r->index);
757     pa_pstream_send_tagstruct(r->connection->pstream, t);
758 }
759
760 /* Called from main context */
761 static void playback_stream_unlink(playback_stream *s) {
762     pa_assert(s);
763
764     if (!s->connection)
765         return;
766
767     if (s->sink_input) {
768         pa_sink_input_unlink(s->sink_input);
769         pa_sink_input_unref(s->sink_input);
770         s->sink_input = NULL;
771     }
772
773     if (s->drain_request)
774         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
775
776     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
777     s->connection = NULL;
778     playback_stream_unref(s);
779 }
780
781 /* Called from main context */
782 static void playback_stream_free(pa_object* o) {
783     playback_stream *s = PLAYBACK_STREAM(o);
784     pa_assert(s);
785
786     playback_stream_unlink(s);
787
788     pa_memblockq_free(s->memblockq);
789     pa_xfree(s);
790 }
791
792 /* Called from main context */
793 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
794     playback_stream *s = PLAYBACK_STREAM(o);
795     playback_stream_assert_ref(s);
796
797     if (!s->connection)
798         return -1;
799
800     switch (code) {
801
802         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
803             pa_tagstruct *t;
804             int l = 0;
805
806             for (;;) {
807                 if ((l = pa_atomic_load(&s->missing)) <= 0)
808                     return 0;
809
810                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
811                     break;
812             }
813
814             t = pa_tagstruct_new(NULL, 0);
815             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
816             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
817             pa_tagstruct_putu32(t, s->index);
818             pa_tagstruct_putu32(t, (uint32_t) l);
819             pa_pstream_send_tagstruct(s->connection->pstream, t);
820
821 #ifdef PROTOCOL_NATIVE_DEBUG
822             pa_log("Requesting %lu bytes", (unsigned long) l);
823 #endif
824             break;
825         }
826
827         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
828             pa_tagstruct *t;
829
830 #ifdef PROTOCOL_NATIVE_DEBUG
831             pa_log("signalling underflow");
832 #endif
833
834             /* Report that we're empty */
835             t = pa_tagstruct_new(NULL, 0);
836             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
837             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
838             pa_tagstruct_putu32(t, s->index);
839             if (s->connection->version >= 23)
840                 pa_tagstruct_puts64(t, offset);
841             pa_pstream_send_tagstruct(s->connection->pstream, t);
842             break;
843         }
844
845         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
846             pa_tagstruct *t;
847
848             /* Notify the user we're overflowed*/
849             t = pa_tagstruct_new(NULL, 0);
850             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
851             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
852             pa_tagstruct_putu32(t, s->index);
853             pa_pstream_send_tagstruct(s->connection->pstream, t);
854             break;
855         }
856
857         case PLAYBACK_STREAM_MESSAGE_STARTED:
858
859             if (s->connection->version >= 13) {
860                 pa_tagstruct *t;
861
862                 /* Notify the user we started playback */
863                 t = pa_tagstruct_new(NULL, 0);
864                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
865                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
866                 pa_tagstruct_putu32(t, s->index);
867                 pa_pstream_send_tagstruct(s->connection->pstream, t);
868             }
869
870             break;
871
872         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
873             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
874             break;
875
876         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH:
877
878             s->buffer_attr.tlength = (uint32_t) offset;
879
880             if (s->connection->version >= 15) {
881                 pa_tagstruct *t;
882
883                 t = pa_tagstruct_new(NULL, 0);
884                 pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
885                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
886                 pa_tagstruct_putu32(t, s->index);
887                 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
888                 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
889                 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
890                 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
891                 pa_tagstruct_put_usec(t, s->configured_sink_latency);
892                 pa_pstream_send_tagstruct(s->connection->pstream, t);
893             }
894
895             break;
896     }
897
898     return 0;
899 }
900
901 /* Called from main context */
902 static void fix_playback_buffer_attr(playback_stream *s) {
903     size_t frame_size, max_prebuf;
904     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
905
906     pa_assert(s);
907
908 #ifdef PROTOCOL_NATIVE_DEBUG
909     pa_log("Client requested: maxlength=%li bytes tlength=%li bytes minreq=%li bytes prebuf=%li bytes",
910            (long) s->buffer_attr_req.maxlength,
911            (long) s->buffer_attr_req.tlength,
912            (long) s->buffer_attr_req.minreq,
913            (long) s->buffer_attr_req.prebuf);
914
915     pa_log("Client requested: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms",
916            (unsigned long) (pa_bytes_to_usec(s->buffer_attr_req.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
917            (unsigned long) (pa_bytes_to_usec(s->buffer_attr_req.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
918            (unsigned long) (pa_bytes_to_usec(s->buffer_attr_req.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
919            (unsigned long) (pa_bytes_to_usec(s->buffer_attr_req.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC));
920 #endif
921
922     /* This function will be called from the main thread, before as
923      * well as after the sink input has been activated using
924      * pa_sink_input_put()! That means it may not touch any
925      * ->thread_info data, such as the memblockq! */
926
927     frame_size = pa_frame_size(&s->sink_input->sample_spec);
928     s->buffer_attr = s->buffer_attr_req;
929
930     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
931         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
932     if (s->buffer_attr.maxlength <= 0)
933         s->buffer_attr.maxlength = (uint32_t) frame_size;
934
935     if (s->buffer_attr.tlength == (uint32_t) -1)
936         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
937     if (s->buffer_attr.tlength <= 0)
938         s->buffer_attr.tlength = (uint32_t) frame_size;
939     if (s->buffer_attr.tlength > s->buffer_attr.maxlength)
940         s->buffer_attr.tlength = s->buffer_attr.maxlength;
941
942     if (s->buffer_attr.minreq == (uint32_t) -1) {
943         uint32_t process = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
944         /* With low-latency, tlength/4 gives a decent default in all of traditional, adjust latency and early request modes. */
945         uint32_t m = s->buffer_attr.tlength / 4;
946         if (frame_size)
947             m -= m % frame_size;
948         s->buffer_attr.minreq = PA_MIN(process, m);
949     }
950     if (s->buffer_attr.minreq <= 0)
951         s->buffer_attr.minreq = (uint32_t) frame_size;
952
953     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
954         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
955
956     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
957     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
958
959     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
960                 (double) tlength_usec / PA_USEC_PER_MSEC,
961                 (double) minreq_usec / PA_USEC_PER_MSEC);
962
963     if (s->early_requests) {
964
965         /* In early request mode we need to emulate the classic
966          * fragment-based playback model. We do this setting the sink
967          * latency to the fragment size. */
968
969         sink_usec = minreq_usec;
970         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
971
972     } else if (s->adjust_latency) {
973
974         /* So, the user asked us to adjust the latency of the stream
975          * buffer according to the what the sink can provide. The
976          * tlength passed in shall be the overall latency. Roughly
977          * half the latency will be spent on the hw buffer, the other
978          * half of it in the async buffer queue we maintain for each
979          * client. In between we'll have a safety space of size
980          * 2*minreq. Why the 2*minreq? When the hw buffer is completely
981          * empty and needs to be filled, then our buffer must have
982          * enough data to fulfill this request immediately and thus
983          * have at least the same tlength as the size of the hw
984          * buffer. It additionally needs space for 2 times minreq
985          * because if the buffer ran empty and a partial fillup
986          * happens immediately on the next iteration we need to be
987          * able to fulfill it and give the application also minreq
988          * time to fill it up again for the next request Makes 2 times
989          * minreq in plus.. */
990
991         if (tlength_usec > minreq_usec*2)
992             sink_usec = (tlength_usec - minreq_usec*2)/2;
993         else
994             sink_usec = 0;
995
996         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
997
998     } else {
999
1000         /* Ok, the user didn't ask us to adjust the latency, but we
1001          * still need to make sure that the parameters from the user
1002          * do make sense. */
1003
1004         if (tlength_usec > minreq_usec*2)
1005             sink_usec = (tlength_usec - minreq_usec*2);
1006         else
1007             sink_usec = 0;
1008
1009         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
1010     }
1011
1012     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
1013
1014     if (s->early_requests) {
1015
1016         /* Ok, we didn't necessarily get what we were asking for, so
1017          * let's tell the user */
1018
1019         minreq_usec = s->configured_sink_latency;
1020
1021     } else if (s->adjust_latency) {
1022
1023         /* Ok, we didn't necessarily get what we were asking for, so
1024          * let's subtract from what we asked for for the remaining
1025          * buffer space */
1026
1027         if (tlength_usec >= s->configured_sink_latency)
1028             tlength_usec -= s->configured_sink_latency;
1029     }
1030
1031     pa_log_debug("Requested latency=%0.2f ms, Received latency=%0.2f ms",
1032                  (double) sink_usec / PA_USEC_PER_MSEC,
1033                  (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1034
1035     /* FIXME: This is actually larger than necessary, since not all of
1036      * the sink latency is actually rewritable. */
1037     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
1038         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
1039
1040     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
1041         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
1042         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
1043
1044     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
1045         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
1046         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
1047
1048     if (s->buffer_attr.minreq <= 0) {
1049         s->buffer_attr.minreq = (uint32_t) frame_size;
1050         s->buffer_attr.tlength += (uint32_t) frame_size*2;
1051     }
1052
1053     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
1054         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
1055
1056     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
1057
1058     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
1059         s->buffer_attr.prebuf > max_prebuf)
1060         s->buffer_attr.prebuf = max_prebuf;
1061
1062 #ifdef PROTOCOL_NATIVE_DEBUG
1063     pa_log("Client accepted: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms",
1064            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1065            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1066            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1067            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC));
1068 #endif
1069 }
1070
1071 /* Called from main context */
1072 static playback_stream* playback_stream_new(
1073         pa_native_connection *c,
1074         pa_sink *sink,
1075         pa_sample_spec *ss,
1076         pa_channel_map *map,
1077         pa_idxset *formats,
1078         pa_buffer_attr *a,
1079         pa_cvolume *volume,
1080         bool muted,
1081         bool muted_set,
1082         pa_sink_input_flags_t flags,
1083         pa_proplist *p,
1084         bool adjust_latency,
1085         bool early_requests,
1086         bool relative_volume,
1087         uint32_t syncid,
1088         uint32_t *missing,
1089         int *ret) {
1090
1091     /* Note: This function takes ownership of the 'formats' param, so we need
1092      * to take extra care to not leak it */
1093
1094     playback_stream *ssync;
1095     playback_stream *s = NULL;
1096     pa_sink_input *sink_input = NULL;
1097     pa_memchunk silence;
1098     uint32_t idx;
1099     int64_t start_index;
1100     pa_sink_input_new_data data;
1101     char *memblockq_name;
1102
1103     pa_assert(c);
1104     pa_assert(ss);
1105     pa_assert(missing);
1106     pa_assert(p);
1107     pa_assert(ret);
1108
1109     /* Find syncid group */
1110     PA_IDXSET_FOREACH(ssync, c->output_streams, idx) {
1111
1112         if (!playback_stream_isinstance(ssync))
1113             continue;
1114
1115         if (ssync->syncid == syncid)
1116             break;
1117     }
1118
1119     /* Synced streams must connect to the same sink */
1120     if (ssync) {
1121
1122         if (!sink)
1123             sink = ssync->sink_input->sink;
1124         else if (sink != ssync->sink_input->sink) {
1125             *ret = PA_ERR_INVALID;
1126             goto out;
1127         }
1128     }
1129
1130     pa_sink_input_new_data_init(&data);
1131
1132     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1133     data.driver = __FILE__;
1134     data.module = c->options->module;
1135     data.client = c->client;
1136     if (sink)
1137         pa_sink_input_new_data_set_sink(&data, sink, false);
1138     if (pa_sample_spec_valid(ss))
1139         pa_sink_input_new_data_set_sample_spec(&data, ss);
1140     if (pa_channel_map_valid(map))
1141         pa_sink_input_new_data_set_channel_map(&data, map);
1142     if (formats) {
1143         pa_sink_input_new_data_set_formats(&data, formats);
1144         /* Ownership transferred to new_data, so we don't free it ourselves */
1145         formats = NULL;
1146     }
1147     if (volume) {
1148         pa_sink_input_new_data_set_volume(&data, volume);
1149         data.volume_is_absolute = !relative_volume;
1150         data.save_volume = false;
1151     }
1152     if (muted_set) {
1153         pa_sink_input_new_data_set_muted(&data, muted);
1154         data.save_muted = false;
1155     }
1156     data.sync_base = ssync ? ssync->sink_input : NULL;
1157     data.flags = flags;
1158
1159     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data);
1160
1161     pa_sink_input_new_data_done(&data);
1162
1163     if (!sink_input)
1164         goto out;
1165
1166     s = pa_msgobject_new(playback_stream);
1167     s->parent.parent.parent.free = playback_stream_free;
1168     s->parent.parent.process_msg = playback_stream_process_msg;
1169     s->connection = c;
1170     s->syncid = syncid;
1171     s->sink_input = sink_input;
1172     s->is_underrun = true;
1173     s->drain_request = false;
1174     pa_atomic_store(&s->missing, 0);
1175     s->buffer_attr_req = *a;
1176     s->adjust_latency = adjust_latency;
1177     s->early_requests = early_requests;
1178     pa_atomic_store(&s->seek_or_post_in_queue, 0);
1179     s->seek_windex = -1;
1180
1181     s->sink_input->parent.process_msg = sink_input_process_msg;
1182     s->sink_input->pop = sink_input_pop_cb;
1183     s->sink_input->process_underrun = sink_input_process_underrun_cb;
1184     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1185     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1186     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1187     s->sink_input->kill = sink_input_kill_cb;
1188     s->sink_input->moving = sink_input_moving_cb;
1189     s->sink_input->suspend = sink_input_suspend_cb;
1190     s->sink_input->send_event = sink_input_send_event_cb;
1191     s->sink_input->userdata = s;
1192
1193     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1194
1195     fix_playback_buffer_attr(s);
1196
1197     pa_sink_input_get_silence(sink_input, &silence);
1198     memblockq_name = pa_sprintf_malloc("native protocol playback stream memblockq [%u]", s->sink_input->index);
1199     s->memblockq = pa_memblockq_new(
1200             memblockq_name,
1201             start_index,
1202             s->buffer_attr.maxlength,
1203             s->buffer_attr.tlength,
1204             &sink_input->sample_spec,
1205             s->buffer_attr.prebuf,
1206             s->buffer_attr.minreq,
1207             0,
1208             &silence);
1209     pa_xfree(memblockq_name);
1210     pa_memblock_unref(silence.memblock);
1211
1212     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1213
1214     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1215
1216 #ifdef PROTOCOL_NATIVE_DEBUG
1217     pa_log("missing original: %li", (long int) *missing);
1218 #endif
1219
1220     *ss = s->sink_input->sample_spec;
1221     *map = s->sink_input->channel_map;
1222
1223     pa_idxset_put(c->output_streams, s, &s->index);
1224
1225     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1226                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1227                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1228                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1229                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1230
1231     pa_sink_input_put(s->sink_input);
1232
1233 out:
1234     if (formats)
1235         pa_idxset_free(formats, (pa_free_cb_t) pa_format_info_free);
1236
1237     return s;
1238 }
1239
1240 /* Called from IO context */
1241 static void playback_stream_request_bytes(playback_stream *s) {
1242     size_t m, minreq;
1243     int previous_missing;
1244
1245     playback_stream_assert_ref(s);
1246
1247     m = pa_memblockq_pop_missing(s->memblockq);
1248
1249     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu really missing=%lli)", */
1250     /*        (unsigned long) m, */
1251     /*        pa_memblockq_get_tlength(s->memblockq), */
1252     /*        pa_memblockq_get_minreq(s->memblockq), */
1253     /*        pa_memblockq_get_length(s->memblockq), */
1254     /*        (long long) pa_memblockq_get_tlength(s->memblockq) - (long long) pa_memblockq_get_length(s->memblockq)); */
1255
1256     if (m <= 0)
1257         return;
1258
1259 #ifdef PROTOCOL_NATIVE_DEBUG
1260     pa_log("request_bytes(%lu)", (unsigned long) m);
1261 #endif
1262
1263     previous_missing = pa_atomic_add(&s->missing, (int) m);
1264     minreq = pa_memblockq_get_minreq(s->memblockq);
1265
1266     if (pa_memblockq_prebuf_active(s->memblockq) ||
1267         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1268         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1269 }
1270
1271 /* Called from main context */
1272 static void playback_stream_send_killed(playback_stream *p) {
1273     pa_tagstruct *t;
1274     playback_stream_assert_ref(p);
1275
1276     t = pa_tagstruct_new(NULL, 0);
1277     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1278     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1279     pa_tagstruct_putu32(t, p->index);
1280     pa_pstream_send_tagstruct(p->connection->pstream, t);
1281 }
1282
1283 /* Called from main context */
1284 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1285     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1286     pa_native_connection_assert_ref(c);
1287
1288     if (!c->protocol)
1289         return -1;
1290
1291     switch (code) {
1292
1293         case CONNECTION_MESSAGE_REVOKE:
1294             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1295             break;
1296
1297         case CONNECTION_MESSAGE_RELEASE:
1298             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1299             break;
1300     }
1301
1302     return 0;
1303 }
1304
1305 /* Called from main context */
1306 static void native_connection_unlink(pa_native_connection *c) {
1307     record_stream *r;
1308     output_stream *o;
1309
1310     pa_assert(c);
1311
1312     if (!c->protocol)
1313         return;
1314
1315     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1316
1317     if (c->options)
1318         pa_native_options_unref(c->options);
1319
1320     while ((r = pa_idxset_first(c->record_streams, NULL)))
1321         record_stream_unlink(r);
1322
1323     while ((o = pa_idxset_first(c->output_streams, NULL)))
1324         if (playback_stream_isinstance(o))
1325             playback_stream_unlink(PLAYBACK_STREAM(o));
1326         else
1327             upload_stream_unlink(UPLOAD_STREAM(o));
1328
1329     if (c->subscription)
1330         pa_subscription_free(c->subscription);
1331
1332     if (c->pstream)
1333         pa_pstream_unlink(c->pstream);
1334
1335     if (c->auth_timeout_event) {
1336         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1337         c->auth_timeout_event = NULL;
1338     }
1339
1340     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1341     c->protocol = NULL;
1342     pa_native_connection_unref(c);
1343 }
1344
1345 /* Called from main context */
1346 static void native_connection_free(pa_object *o) {
1347     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1348
1349     pa_assert(c);
1350
1351     native_connection_unlink(c);
1352
1353     pa_idxset_free(c->record_streams, NULL);
1354     pa_idxset_free(c->output_streams, NULL);
1355
1356     pa_pdispatch_unref(c->pdispatch);
1357     pa_pstream_unref(c->pstream);
1358     pa_client_free(c->client);
1359
1360     pa_xfree(c);
1361 }
1362
1363 /* Called from main context */
1364 static void native_connection_send_memblock(pa_native_connection *c) {
1365     uint32_t start;
1366     record_stream *r;
1367
1368     start = PA_IDXSET_INVALID;
1369     for (;;) {
1370         pa_memchunk chunk;
1371
1372         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1373             return;
1374
1375         if (start == PA_IDXSET_INVALID)
1376             start = c->rrobin_index;
1377         else if (start == c->rrobin_index)
1378             return;
1379
1380         if (pa_memblockq_peek(r->memblockq, &chunk) >= 0) {
1381             pa_memchunk schunk = chunk;
1382
1383             if (schunk.length > r->buffer_attr.fragsize)
1384                 schunk.length = r->buffer_attr.fragsize;
1385
1386             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1387
1388             pa_memblockq_drop(r->memblockq, schunk.length);
1389             pa_memblock_unref(schunk.memblock);
1390
1391             return;
1392         }
1393     }
1394 }
1395
1396 /*** sink input callbacks ***/
1397
1398 /* Called from thread context */
1399 static void handle_seek(playback_stream *s, int64_t indexw) {
1400     playback_stream_assert_ref(s);
1401
1402 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1403
1404     if (s->sink_input->thread_info.underrun_for > 0) {
1405
1406 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1407
1408         if (pa_memblockq_is_readable(s->memblockq)) {
1409
1410             /* We just ended an underrun, let's ask the sink
1411              * for a complete rewind rewrite */
1412
1413             pa_log_debug("Requesting rewind due to end of underrun.");
1414             pa_sink_input_request_rewind(s->sink_input,
1415                                          (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 :
1416                                                    s->sink_input->thread_info.underrun_for),
1417                                          false, true, false);
1418         }
1419
1420     } else {
1421         int64_t indexr;
1422
1423         indexr = pa_memblockq_get_read_index(s->memblockq);
1424
1425         if (indexw < indexr) {
1426             /* OK, the sink already asked for this data, so
1427              * let's have it ask us again */
1428
1429             pa_log_debug("Requesting rewind due to rewrite.");
1430             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), true, false, false);
1431         }
1432     }
1433
1434     playback_stream_request_bytes(s);
1435 }
1436
1437 static void flush_write_no_account(pa_memblockq *q) {
1438     pa_memblockq_flush_write(q, false);
1439 }
1440
1441 /* Called from thread context */
1442 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1443     pa_sink_input *i = PA_SINK_INPUT(o);
1444     playback_stream *s;
1445
1446     pa_sink_input_assert_ref(i);
1447     s = PLAYBACK_STREAM(i->userdata);
1448     playback_stream_assert_ref(s);
1449
1450     switch (code) {
1451
1452         case SINK_INPUT_MESSAGE_SEEK:
1453         case SINK_INPUT_MESSAGE_POST_DATA: {
1454             int64_t windex = pa_memblockq_get_write_index(s->memblockq);
1455
1456             if (code == SINK_INPUT_MESSAGE_SEEK) {
1457                 /* The client side is incapable of accounting correctly
1458                  * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1459                  * able to deal with that. */
1460
1461                 pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1462                 windex = PA_MIN(windex, pa_memblockq_get_write_index(s->memblockq));
1463             }
1464
1465             if (chunk && pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1466                 if (pa_log_ratelimit(PA_LOG_WARN))
1467                     pa_log_warn("Failed to push data into queue");
1468                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1469                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, true);
1470             }
1471
1472             /* If more data is in queue, we rewind later instead. */
1473             if (s->seek_windex != -1)
1474                 windex = PA_MIN(windex, s->seek_windex);
1475             if (pa_atomic_dec(&s->seek_or_post_in_queue) > 1)
1476                 s->seek_windex = windex;
1477             else {
1478                 s->seek_windex = -1;
1479                 handle_seek(s, windex);
1480             }
1481             return 0;
1482         }
1483
1484         case SINK_INPUT_MESSAGE_DRAIN:
1485         case SINK_INPUT_MESSAGE_FLUSH:
1486         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1487         case SINK_INPUT_MESSAGE_TRIGGER: {
1488
1489             int64_t windex;
1490             pa_sink_input *isync;
1491             void (*func)(pa_memblockq *bq);
1492
1493             switch (code) {
1494                 case SINK_INPUT_MESSAGE_FLUSH:
1495                     func = flush_write_no_account;
1496                     break;
1497
1498                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1499                     func = pa_memblockq_prebuf_force;
1500                     break;
1501
1502                 case SINK_INPUT_MESSAGE_DRAIN:
1503                 case SINK_INPUT_MESSAGE_TRIGGER:
1504                     func = pa_memblockq_prebuf_disable;
1505                     break;
1506
1507                 default:
1508                     pa_assert_not_reached();
1509             }
1510
1511             windex = pa_memblockq_get_write_index(s->memblockq);
1512             func(s->memblockq);
1513             handle_seek(s, windex);
1514
1515             /* Do the same for all other members in the sync group */
1516             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1517                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1518                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1519                 func(ssync->memblockq);
1520                 handle_seek(ssync, windex);
1521             }
1522
1523             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1524                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1525                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1526                 func(ssync->memblockq);
1527                 handle_seek(ssync, windex);
1528             }
1529
1530             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1531                 if (!pa_memblockq_is_readable(s->memblockq))
1532                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1533                 else {
1534                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1535                     s->drain_request = true;
1536                 }
1537             }
1538
1539             return 0;
1540         }
1541
1542         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1543             /* Atomically get a snapshot of all timing parameters... */
1544             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1545             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1546             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1547             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1548             s->underrun_for = s->sink_input->thread_info.underrun_for;
1549             s->playing_for = s->sink_input->thread_info.playing_for;
1550
1551             return 0;
1552
1553         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1554             int64_t windex;
1555
1556             windex = pa_memblockq_get_write_index(s->memblockq);
1557
1558             /* We enable prebuffering so that after CORKED -> RUNNING
1559              * transitions we don't have trouble with underruns in case the
1560              * buffer has too little data. This must not be done when draining
1561              * has been requested, however, otherwise the buffered audio would
1562              * never play. */
1563             if (!s->drain_request)
1564                 pa_memblockq_prebuf_force(s->memblockq);
1565
1566             handle_seek(s, windex);
1567
1568             /* Fall through to the default handler */
1569             break;
1570         }
1571
1572         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1573             pa_usec_t *r = userdata;
1574
1575             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1576
1577             /* Fall through, the default handler will add in the extra
1578              * latency added by the resampler */
1579             break;
1580         }
1581
1582         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1583             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1584             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1585             return 0;
1586         }
1587     }
1588
1589     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1590 }
1591
1592 static bool handle_input_underrun(playback_stream *s, bool force) {
1593     bool send_drain;
1594
1595     if (pa_memblockq_is_readable(s->memblockq))
1596         return false;
1597
1598     if (!s->is_underrun)
1599         pa_log_debug("%s %s of '%s'", force ? "Actual" : "Implicit",
1600             s->drain_request ? "drain" : "underrun", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
1601
1602     send_drain = s->drain_request && (force || pa_sink_input_safe_to_remove(s->sink_input));
1603
1604     if (send_drain) {
1605          s->drain_request = false;
1606          pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1607          pa_log_debug("Drain acknowledged of '%s'", pa_strnull(pa_proplist_gets(s->sink_input->proplist, PA_PROP_MEDIA_NAME)));
1608     } else if (!s->is_underrun) {
1609          pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
1610     }
1611     s->is_underrun = true;
1612     playback_stream_request_bytes(s);
1613     return true;
1614 }
1615
1616 /* Called from thread context */
1617 static bool sink_input_process_underrun_cb(pa_sink_input *i) {
1618     playback_stream *s;
1619
1620     pa_sink_input_assert_ref(i);
1621     s = PLAYBACK_STREAM(i->userdata);
1622     playback_stream_assert_ref(s);
1623
1624     return handle_input_underrun(s, true);
1625 }
1626
1627 /* Called from thread context */
1628 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1629     playback_stream *s;
1630
1631     pa_sink_input_assert_ref(i);
1632     s = PLAYBACK_STREAM(i->userdata);
1633     playback_stream_assert_ref(s);
1634     pa_assert(chunk);
1635
1636 #ifdef PROTOCOL_NATIVE_DEBUG
1637     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq));
1638 #endif
1639
1640     if (!handle_input_underrun(s, false))
1641         s->is_underrun = false;
1642
1643     /* This call will not fail with prebuf=0, hence we check for
1644        underrun explicitly in handle_input_underrun */
1645     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1646         return -1;
1647
1648     chunk->length = PA_MIN(nbytes, chunk->length);
1649
1650     if (i->thread_info.underrun_for > 0)
1651         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1652
1653     pa_memblockq_drop(s->memblockq, chunk->length);
1654     playback_stream_request_bytes(s);
1655
1656     return 0;
1657 }
1658
1659 /* Called from thread context */
1660 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1661     playback_stream *s;
1662
1663     pa_sink_input_assert_ref(i);
1664     s = PLAYBACK_STREAM(i->userdata);
1665     playback_stream_assert_ref(s);
1666
1667     /* If we are in an underrun, then we don't rewind */
1668     if (i->thread_info.underrun_for > 0)
1669         return;
1670
1671     pa_memblockq_rewind(s->memblockq, nbytes);
1672 }
1673
1674 /* Called from thread context */
1675 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1676     playback_stream *s;
1677
1678     pa_sink_input_assert_ref(i);
1679     s = PLAYBACK_STREAM(i->userdata);
1680     playback_stream_assert_ref(s);
1681
1682     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1683 }
1684
1685 /* Called from thread context */
1686 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1687     playback_stream *s;
1688     size_t new_tlength, old_tlength;
1689
1690     pa_sink_input_assert_ref(i);
1691     s = PLAYBACK_STREAM(i->userdata);
1692     playback_stream_assert_ref(s);
1693
1694     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1695     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1696
1697     if (old_tlength < new_tlength) {
1698         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1699         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1700         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1701
1702         if (new_tlength == old_tlength)
1703             pa_log_debug("Failed to increase tlength");
1704         else {
1705             pa_log_debug("Notifying client about increased tlength");
1706             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1707         }
1708     }
1709 }
1710
1711 /* Called from main context */
1712 static void sink_input_kill_cb(pa_sink_input *i) {
1713     playback_stream *s;
1714
1715     pa_sink_input_assert_ref(i);
1716     s = PLAYBACK_STREAM(i->userdata);
1717     playback_stream_assert_ref(s);
1718
1719     playback_stream_send_killed(s);
1720     playback_stream_unlink(s);
1721 }
1722
1723 /* Called from main context */
1724 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1725     playback_stream *s;
1726     pa_tagstruct *t;
1727
1728     pa_sink_input_assert_ref(i);
1729     s = PLAYBACK_STREAM(i->userdata);
1730     playback_stream_assert_ref(s);
1731
1732     if (s->connection->version < 15)
1733       return;
1734
1735     t = pa_tagstruct_new(NULL, 0);
1736     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1737     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1738     pa_tagstruct_putu32(t, s->index);
1739     pa_tagstruct_puts(t, event);
1740     pa_tagstruct_put_proplist(t, pl);
1741     pa_pstream_send_tagstruct(s->connection->pstream, t);
1742 }
1743
1744 /* Called from main context */
1745 static void sink_input_suspend_cb(pa_sink_input *i, bool suspend) {
1746     playback_stream *s;
1747     pa_tagstruct *t;
1748
1749     pa_sink_input_assert_ref(i);
1750     s = PLAYBACK_STREAM(i->userdata);
1751     playback_stream_assert_ref(s);
1752
1753     if (s->connection->version < 12)
1754       return;
1755
1756     t = pa_tagstruct_new(NULL, 0);
1757     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1758     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1759     pa_tagstruct_putu32(t, s->index);
1760     pa_tagstruct_put_boolean(t, suspend);
1761     pa_pstream_send_tagstruct(s->connection->pstream, t);
1762 }
1763
1764 /* Called from main context */
1765 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1766     playback_stream *s;
1767     pa_tagstruct *t;
1768
1769     pa_sink_input_assert_ref(i);
1770     s = PLAYBACK_STREAM(i->userdata);
1771     playback_stream_assert_ref(s);
1772
1773     if (!dest)
1774         return;
1775
1776     fix_playback_buffer_attr(s);
1777     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1778     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1779
1780     if (s->connection->version < 12)
1781       return;
1782
1783     t = pa_tagstruct_new(NULL, 0);
1784     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1785     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1786     pa_tagstruct_putu32(t, s->index);
1787     pa_tagstruct_putu32(t, dest->index);
1788     pa_tagstruct_puts(t, dest->name);
1789     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1790
1791     if (s->connection->version >= 13) {
1792         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1793         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1794         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1795         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1796         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1797     }
1798
1799     pa_pstream_send_tagstruct(s->connection->pstream, t);
1800 }
1801
1802 /*** source_output callbacks ***/
1803
1804 /* Called from thread context */
1805 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1806     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1807     record_stream *s;
1808
1809     pa_source_output_assert_ref(o);
1810     s = RECORD_STREAM(o->userdata);
1811     record_stream_assert_ref(s);
1812
1813     switch (code) {
1814         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1815             /* Atomically get a snapshot of all timing parameters... */
1816             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1817             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1818             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1819             return 0;
1820     }
1821
1822     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1823 }
1824
1825 /* Called from thread context */
1826 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1827     record_stream *s;
1828
1829     pa_source_output_assert_ref(o);
1830     s = RECORD_STREAM(o->userdata);
1831     record_stream_assert_ref(s);
1832     pa_assert(chunk);
1833
1834     pa_atomic_add(&s->on_the_fly, chunk->length);
1835     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1836 }
1837
1838 static void source_output_kill_cb(pa_source_output *o) {
1839     record_stream *s;
1840
1841     pa_source_output_assert_ref(o);
1842     s = RECORD_STREAM(o->userdata);
1843     record_stream_assert_ref(s);
1844
1845     record_stream_send_killed(s);
1846     record_stream_unlink(s);
1847 }
1848
1849 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1850     record_stream *s;
1851
1852     pa_source_output_assert_ref(o);
1853     s = RECORD_STREAM(o->userdata);
1854     record_stream_assert_ref(s);
1855
1856     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1857
1858     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1859 }
1860
1861 /* Called from main context */
1862 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1863     record_stream *s;
1864     pa_tagstruct *t;
1865
1866     pa_source_output_assert_ref(o);
1867     s = RECORD_STREAM(o->userdata);
1868     record_stream_assert_ref(s);
1869
1870     if (s->connection->version < 15)
1871       return;
1872
1873     t = pa_tagstruct_new(NULL, 0);
1874     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1875     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1876     pa_tagstruct_putu32(t, s->index);
1877     pa_tagstruct_puts(t, event);
1878     pa_tagstruct_put_proplist(t, pl);
1879     pa_pstream_send_tagstruct(s->connection->pstream, t);
1880 }
1881
1882 /* Called from main context */
1883 static void source_output_suspend_cb(pa_source_output *o, bool suspend) {
1884     record_stream *s;
1885     pa_tagstruct *t;
1886
1887     pa_source_output_assert_ref(o);
1888     s = RECORD_STREAM(o->userdata);
1889     record_stream_assert_ref(s);
1890
1891     if (s->connection->version < 12)
1892       return;
1893
1894     t = pa_tagstruct_new(NULL, 0);
1895     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1896     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1897     pa_tagstruct_putu32(t, s->index);
1898     pa_tagstruct_put_boolean(t, suspend);
1899     pa_pstream_send_tagstruct(s->connection->pstream, t);
1900 }
1901
1902 /* Called from main context */
1903 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1904     record_stream *s;
1905     pa_tagstruct *t;
1906
1907     pa_source_output_assert_ref(o);
1908     s = RECORD_STREAM(o->userdata);
1909     record_stream_assert_ref(s);
1910
1911     if (!dest)
1912         return;
1913
1914     fix_record_buffer_attr_pre(s);
1915     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1916     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1917     fix_record_buffer_attr_post(s);
1918
1919     if (s->connection->version < 12)
1920       return;
1921
1922     t = pa_tagstruct_new(NULL, 0);
1923     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1924     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1925     pa_tagstruct_putu32(t, s->index);
1926     pa_tagstruct_putu32(t, dest->index);
1927     pa_tagstruct_puts(t, dest->name);
1928     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1929
1930     if (s->connection->version >= 13) {
1931         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1932         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1933         pa_tagstruct_put_usec(t, s->configured_source_latency);
1934     }
1935
1936     pa_pstream_send_tagstruct(s->connection->pstream, t);
1937 }
1938
1939 /*** pdispatch callbacks ***/
1940
1941 static void protocol_error(pa_native_connection *c) {
1942     pa_log("protocol error, kicking client");
1943     native_connection_unlink(c);
1944 }
1945
1946 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1947 if (!(expression)) { \
1948     pa_pstream_send_error((pstream), (tag), (error)); \
1949     return; \
1950 } \
1951 } while(0);
1952
1953 #define CHECK_VALIDITY_GOTO(pstream, expression, tag, error, label) do { \
1954 if (!(expression)) { \
1955     pa_pstream_send_error((pstream), (tag), (error)); \
1956     goto label; \
1957 } \
1958 } while(0);
1959
1960 static pa_tagstruct *reply_new(uint32_t tag) {
1961     pa_tagstruct *reply;
1962
1963     reply = pa_tagstruct_new(NULL, 0);
1964     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1965     pa_tagstruct_putu32(reply, tag);
1966     return reply;
1967 }
1968
1969 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1970     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1971     playback_stream *s;
1972     uint32_t sink_index, syncid, missing = 0;
1973     pa_buffer_attr attr;
1974     const char *name = NULL, *sink_name;
1975     pa_sample_spec ss;
1976     pa_channel_map map;
1977     pa_tagstruct *reply;
1978     pa_sink *sink = NULL;
1979     pa_cvolume volume;
1980     bool
1981         corked = false,
1982         no_remap = false,
1983         no_remix = false,
1984         fix_format = false,
1985         fix_rate = false,
1986         fix_channels = false,
1987         no_move = false,
1988         variable_rate = false,
1989         muted = false,
1990         adjust_latency = false,
1991         early_requests = false,
1992         dont_inhibit_auto_suspend = false,
1993         volume_set = true,
1994         muted_set = false,
1995         fail_on_suspend = false,
1996         relative_volume = false,
1997         passthrough = false;
1998
1999     pa_sink_input_flags_t flags = 0;
2000     pa_proplist *p = NULL;
2001     int ret = PA_ERR_INVALID;
2002     uint8_t n_formats = 0;
2003     pa_format_info *format;
2004     pa_idxset *formats = NULL;
2005     uint32_t i;
2006
2007     pa_native_connection_assert_ref(c);
2008     pa_assert(t);
2009     memset(&attr, 0, sizeof(attr));
2010
2011     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2012         pa_tagstruct_get(
2013                 t,
2014                 PA_TAG_SAMPLE_SPEC, &ss,
2015                 PA_TAG_CHANNEL_MAP, &map,
2016                 PA_TAG_U32, &sink_index,
2017                 PA_TAG_STRING, &sink_name,
2018                 PA_TAG_U32, &attr.maxlength,
2019                 PA_TAG_BOOLEAN, &corked,
2020                 PA_TAG_U32, &attr.tlength,
2021                 PA_TAG_U32, &attr.prebuf,
2022                 PA_TAG_U32, &attr.minreq,
2023                 PA_TAG_U32, &syncid,
2024                 PA_TAG_CVOLUME, &volume,
2025                 PA_TAG_INVALID) < 0) {
2026
2027         protocol_error(c);
2028         goto finish;
2029     }
2030
2031     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
2032     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID, finish);
2033     CHECK_VALIDITY_GOTO(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID, finish);
2034     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
2035     CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
2036
2037     p = pa_proplist_new();
2038
2039     if (name)
2040         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2041
2042     if (c->version >= 12) {
2043         /* Since 0.9.8 the user can ask for a couple of additional flags */
2044
2045         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2046             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2047             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2048             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2049             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2050             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2051             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2052
2053             protocol_error(c);
2054             goto finish;
2055         }
2056     }
2057
2058     if (c->version >= 13) {
2059
2060         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
2061             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2062             pa_tagstruct_get_proplist(t, p) < 0) {
2063
2064             protocol_error(c);
2065             goto finish;
2066         }
2067     }
2068
2069     if (c->version >= 14) {
2070
2071         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2072             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2073
2074             protocol_error(c);
2075             goto finish;
2076         }
2077     }
2078
2079     if (c->version >= 15) {
2080
2081         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2082             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2083             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2084
2085             protocol_error(c);
2086             goto finish;
2087         }
2088     }
2089
2090     if (c->version >= 17) {
2091
2092         if (pa_tagstruct_get_boolean(t, &relative_volume) < 0) {
2093
2094             protocol_error(c);
2095             goto finish;
2096         }
2097     }
2098
2099     if (c->version >= 18) {
2100
2101         if (pa_tagstruct_get_boolean(t, &passthrough) < 0 ) {
2102             protocol_error(c);
2103             goto finish;
2104         }
2105     }
2106
2107     if (c->version >= 21) {
2108
2109         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2110             protocol_error(c);
2111             goto finish;
2112         }
2113
2114         if (n_formats)
2115             formats = pa_idxset_new(NULL, NULL);
2116
2117         for (i = 0; i < n_formats; i++) {
2118             format = pa_format_info_new();
2119             if (pa_tagstruct_get_format_info(t, format) < 0) {
2120                 protocol_error(c);
2121                 goto finish;
2122             }
2123             pa_idxset_put(formats, format, NULL);
2124         }
2125     }
2126
2127     if (n_formats == 0) {
2128         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2129         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2130         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2131     } else {
2132         PA_IDXSET_FOREACH(format, formats, i) {
2133             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2134         }
2135     }
2136
2137     if (!pa_tagstruct_eof(t)) {
2138         protocol_error(c);
2139         goto finish;
2140     }
2141
2142     if (sink_index != PA_INVALID_INDEX) {
2143
2144         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
2145             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2146             goto finish;
2147         }
2148
2149     } else if (sink_name) {
2150
2151         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
2152             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2153             goto finish;
2154         }
2155     }
2156
2157     flags =
2158         (corked ? PA_SINK_INPUT_START_CORKED : 0) |
2159         (no_remap ? PA_SINK_INPUT_NO_REMAP : 0) |
2160         (no_remix ? PA_SINK_INPUT_NO_REMIX : 0) |
2161         (fix_format ? PA_SINK_INPUT_FIX_FORMAT : 0) |
2162         (fix_rate ? PA_SINK_INPUT_FIX_RATE : 0) |
2163         (fix_channels ? PA_SINK_INPUT_FIX_CHANNELS : 0) |
2164         (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) |
2165         (variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0) |
2166         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2167         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0) |
2168         (passthrough ? PA_SINK_INPUT_PASSTHROUGH : 0);
2169
2170     /* Only since protocol version 15 there's a separate muted_set
2171      * flag. For older versions we synthesize it here */
2172     muted_set = muted_set || muted;
2173
2174     s = playback_stream_new(c, sink, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, syncid, &missing, &ret);
2175     /* We no longer own the formats idxset */
2176     formats = NULL;
2177
2178     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2179
2180     reply = reply_new(tag);
2181     pa_tagstruct_putu32(reply, s->index);
2182     pa_assert(s->sink_input);
2183     pa_tagstruct_putu32(reply, s->sink_input->index);
2184     pa_tagstruct_putu32(reply, missing);
2185
2186 #ifdef PROTOCOL_NATIVE_DEBUG
2187     pa_log("initial request is %u", missing);
2188 #endif
2189
2190     if (c->version >= 9) {
2191         /* Since 0.9.0 we support sending the buffer metrics back to the client */
2192
2193         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2194         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
2195         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
2196         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
2197     }
2198
2199     if (c->version >= 12) {
2200         /* Since 0.9.8 we support sending the chosen sample
2201          * spec/channel map/device/suspend status back to the
2202          * client */
2203
2204         pa_tagstruct_put_sample_spec(reply, &ss);
2205         pa_tagstruct_put_channel_map(reply, &map);
2206
2207         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2208         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2209
2210         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2211     }
2212
2213     if (c->version >= 13)
2214         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2215
2216     if (c->version >= 21) {
2217         /* Send back the format we negotiated */
2218         if (s->sink_input->format)
2219             pa_tagstruct_put_format_info(reply, s->sink_input->format);
2220         else {
2221             pa_format_info *f = pa_format_info_new();
2222             pa_tagstruct_put_format_info(reply, f);
2223             pa_format_info_free(f);
2224         }
2225     }
2226
2227     pa_pstream_send_tagstruct(c->pstream, reply);
2228
2229 finish:
2230     if (p)
2231         pa_proplist_free(p);
2232     if (formats)
2233         pa_idxset_free(formats, (pa_free_cb_t) pa_format_info_free);
2234 }
2235
2236 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2237     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2238     uint32_t channel;
2239
2240     pa_native_connection_assert_ref(c);
2241     pa_assert(t);
2242
2243     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2244         !pa_tagstruct_eof(t)) {
2245         protocol_error(c);
2246         return;
2247     }
2248
2249     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2250
2251     switch (command) {
2252
2253         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2254             playback_stream *s;
2255             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2256                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2257                 return;
2258             }
2259
2260             playback_stream_unlink(s);
2261             break;
2262         }
2263
2264         case PA_COMMAND_DELETE_RECORD_STREAM: {
2265             record_stream *s;
2266             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2267                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2268                 return;
2269             }
2270
2271             record_stream_unlink(s);
2272             break;
2273         }
2274
2275         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2276             upload_stream *s;
2277
2278             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2279                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2280                 return;
2281             }
2282
2283             upload_stream_unlink(s);
2284             break;
2285         }
2286
2287         default:
2288             pa_assert_not_reached();
2289     }
2290
2291     pa_pstream_send_simple_ack(c->pstream, tag);
2292 }
2293
2294 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2295     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2296     record_stream *s;
2297     pa_buffer_attr attr;
2298     uint32_t source_index;
2299     const char *name = NULL, *source_name;
2300     pa_sample_spec ss;
2301     pa_channel_map map;
2302     pa_tagstruct *reply;
2303     pa_source *source = NULL;
2304     pa_cvolume volume;
2305     bool
2306         corked = false,
2307         no_remap = false,
2308         no_remix = false,
2309         fix_format = false,
2310         fix_rate = false,
2311         fix_channels = false,
2312         no_move = false,
2313         variable_rate = false,
2314         muted = false,
2315         adjust_latency = false,
2316         peak_detect = false,
2317         early_requests = false,
2318         dont_inhibit_auto_suspend = false,
2319         volume_set = false,
2320         muted_set = false,
2321         fail_on_suspend = false,
2322         relative_volume = false,
2323         passthrough = false;
2324
2325     pa_source_output_flags_t flags = 0;
2326     pa_proplist *p = NULL;
2327     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2328     pa_sink_input *direct_on_input = NULL;
2329     int ret = PA_ERR_INVALID;
2330     uint8_t n_formats = 0;
2331     pa_format_info *format;
2332     pa_idxset *formats = NULL;
2333     uint32_t i;
2334
2335     pa_native_connection_assert_ref(c);
2336     pa_assert(t);
2337
2338     memset(&attr, 0, sizeof(attr));
2339
2340     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2341         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2342         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2343         pa_tagstruct_getu32(t, &source_index) < 0 ||
2344         pa_tagstruct_gets(t, &source_name) < 0 ||
2345         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2346         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2347         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2348
2349         protocol_error(c);
2350         goto finish;
2351     }
2352
2353     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
2354     CHECK_VALIDITY_GOTO(c->pstream, !source_name || pa_namereg_is_valid_name_or_wildcard(source_name, PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID, finish);
2355     CHECK_VALIDITY_GOTO(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID, finish);
2356     CHECK_VALIDITY_GOTO(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
2357
2358     p = pa_proplist_new();
2359
2360     if (name)
2361         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2362
2363     if (c->version >= 12) {
2364         /* Since 0.9.8 the user can ask for a couple of additional flags */
2365
2366         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2367             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2368             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2369             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2370             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2371             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2372             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2373
2374             protocol_error(c);
2375             goto finish;
2376         }
2377     }
2378
2379     if (c->version >= 13) {
2380
2381         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2382             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2383             pa_tagstruct_get_proplist(t, p) < 0 ||
2384             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2385
2386             protocol_error(c);
2387             goto finish;
2388         }
2389     }
2390
2391     if (c->version >= 14) {
2392
2393         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2394             protocol_error(c);
2395             goto finish;
2396         }
2397     }
2398
2399     if (c->version >= 15) {
2400
2401         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2402             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2403
2404             protocol_error(c);
2405             goto finish;
2406         }
2407     }
2408
2409     if (c->version >= 22) {
2410         /* For newer client versions (with per-source-output volumes), we try
2411          * to make the behaviour for playback and record streams the same. */
2412         volume_set = true;
2413
2414         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2415             protocol_error(c);
2416             goto finish;
2417         }
2418
2419         if (n_formats)
2420             formats = pa_idxset_new(NULL, NULL);
2421
2422         for (i = 0; i < n_formats; i++) {
2423             format = pa_format_info_new();
2424             if (pa_tagstruct_get_format_info(t, format) < 0) {
2425                 protocol_error(c);
2426                 goto finish;
2427             }
2428             pa_idxset_put(formats, format, NULL);
2429         }
2430
2431         if (pa_tagstruct_get_cvolume(t, &volume) < 0 ||
2432             pa_tagstruct_get_boolean(t, &muted) < 0 ||
2433             pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2434             pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2435             pa_tagstruct_get_boolean(t, &relative_volume) < 0 ||
2436             pa_tagstruct_get_boolean(t, &passthrough) < 0) {
2437
2438             protocol_error(c);
2439             goto finish;
2440         }
2441
2442         CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
2443     }
2444
2445     if (n_formats == 0) {
2446         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2447         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2448         CHECK_VALIDITY_GOTO(c->pstream, c->version < 22 || (volume.channels == ss.channels), tag, PA_ERR_INVALID, finish);
2449         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2450     } else {
2451         PA_IDXSET_FOREACH(format, formats, i) {
2452             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2453         }
2454     }
2455
2456     if (!pa_tagstruct_eof(t)) {
2457         protocol_error(c);
2458         goto finish;
2459     }
2460
2461     if (source_index != PA_INVALID_INDEX) {
2462
2463         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2464             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2465             goto finish;
2466         }
2467
2468     } else if (source_name) {
2469
2470         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2471             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2472             goto finish;
2473         }
2474     }
2475
2476     if (direct_on_input_idx != PA_INVALID_INDEX) {
2477
2478         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2479             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2480             goto finish;
2481         }
2482     }
2483
2484     flags =
2485         (corked ? PA_SOURCE_OUTPUT_START_CORKED : 0) |
2486         (no_remap ? PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2487         (no_remix ? PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2488         (fix_format ? PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2489         (fix_rate ? PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2490         (fix_channels ? PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2491         (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2492         (variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2493         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2494         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0) |
2495         (passthrough ? PA_SOURCE_OUTPUT_PASSTHROUGH : 0);
2496
2497     s = record_stream_new(c, source, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, peak_detect, direct_on_input, &ret);
2498
2499     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2500
2501     reply = reply_new(tag);
2502     pa_tagstruct_putu32(reply, s->index);
2503     pa_assert(s->source_output);
2504     pa_tagstruct_putu32(reply, s->source_output->index);
2505
2506     if (c->version >= 9) {
2507         /* Since 0.9 we support sending the buffer metrics back to the client */
2508
2509         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2510         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2511     }
2512
2513     if (c->version >= 12) {
2514         /* Since 0.9.8 we support sending the chosen sample
2515          * spec/channel map/device/suspend status back to the
2516          * client */
2517
2518         pa_tagstruct_put_sample_spec(reply, &ss);
2519         pa_tagstruct_put_channel_map(reply, &map);
2520
2521         pa_tagstruct_putu32(reply, s->source_output->source->index);
2522         pa_tagstruct_puts(reply, s->source_output->source->name);
2523
2524         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2525     }
2526
2527     if (c->version >= 13)
2528         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2529
2530     if (c->version >= 22) {
2531         /* Send back the format we negotiated */
2532         if (s->source_output->format)
2533             pa_tagstruct_put_format_info(reply, s->source_output->format);
2534         else {
2535             pa_format_info *f = pa_format_info_new();
2536             pa_tagstruct_put_format_info(reply, f);
2537             pa_format_info_free(f);
2538         }
2539     }
2540
2541     pa_pstream_send_tagstruct(c->pstream, reply);
2542
2543 finish:
2544     if (p)
2545         pa_proplist_free(p);
2546     if (formats)
2547         pa_idxset_free(formats, (pa_free_cb_t) pa_format_info_free);
2548 }
2549
2550 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2551     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2552     int ret;
2553
2554     pa_native_connection_assert_ref(c);
2555     pa_assert(t);
2556
2557     if (!pa_tagstruct_eof(t)) {
2558         protocol_error(c);
2559         return;
2560     }
2561
2562     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2563     ret = pa_core_exit(c->protocol->core, false, 0);
2564     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2565
2566     pa_log_debug("Client %s asks us to terminate.", pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY)));
2567
2568     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2569 }
2570
2571 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2572     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2573     const void*cookie;
2574     pa_tagstruct *reply;
2575     bool shm_on_remote = false, do_shm;
2576
2577     pa_native_connection_assert_ref(c);
2578     pa_assert(t);
2579
2580     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2581         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2582         !pa_tagstruct_eof(t)) {
2583         protocol_error(c);
2584         return;
2585     }
2586
2587     /* Minimum supported version */
2588     if (c->version < 8) {
2589         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2590         return;
2591     }
2592
2593     /* Starting with protocol version 13 the MSB of the version tag
2594        reflects if shm is available for this pa_native_connection or
2595        not. */
2596     if (c->version >= 13) {
2597         shm_on_remote = !!(c->version & 0x80000000U);
2598         c->version &= 0x7FFFFFFFU;
2599     }
2600
2601     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2602
2603     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2604
2605     if (!c->authorized) {
2606         bool success = false;
2607
2608 #ifdef HAVE_CREDS
2609         const pa_creds *creds;
2610
2611         if ((creds = pa_pdispatch_creds(pd))) {
2612             if (creds->uid == getuid())
2613                 success = true;
2614             else if (c->options->auth_group) {
2615                 int r;
2616                 gid_t gid;
2617
2618                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2619                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2620                 else if (gid == creds->gid)
2621                     success = true;
2622
2623                 if (!success) {
2624                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2625                         pa_log_warn("Failed to check group membership.");
2626                     else if (r > 0)
2627                         success = true;
2628                 }
2629             }
2630
2631             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2632                         (unsigned long) creds->uid,
2633                         (unsigned long) creds->gid,
2634                         (int) success);
2635         }
2636 #endif
2637
2638         if (!success && c->options->auth_cookie) {
2639             const uint8_t *ac;
2640
2641             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2642                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2643                     success = true;
2644         }
2645
2646         if (!success) {
2647             pa_log_warn("Denied access to client with invalid authorization data.");
2648             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2649             return;
2650         }
2651
2652         c->authorized = true;
2653         if (c->auth_timeout_event) {
2654             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2655             c->auth_timeout_event = NULL;
2656         }
2657     }
2658
2659     /* Enable shared memory support if possible */
2660     do_shm =
2661         pa_mempool_is_shared(c->protocol->core->mempool) &&
2662         c->is_local;
2663
2664     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2665
2666     if (do_shm)
2667         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2668             do_shm = false;
2669
2670 #ifdef HAVE_CREDS
2671     if (do_shm) {
2672         /* Only enable SHM if both sides are owned by the same
2673          * user. This is a security measure because otherwise data
2674          * private to the user might leak. */
2675
2676         const pa_creds *creds;
2677         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2678             do_shm = false;
2679     }
2680 #endif
2681
2682     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2683     pa_pstream_enable_shm(c->pstream, do_shm);
2684
2685     reply = reply_new(tag);
2686     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2687
2688 #ifdef HAVE_CREDS
2689 {
2690     /* SHM support is only enabled after both sides made sure they are the same user. */
2691
2692     pa_creds ucred;
2693
2694     ucred.uid = getuid();
2695     ucred.gid = getgid();
2696
2697     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2698 }
2699 #else
2700     pa_pstream_send_tagstruct(c->pstream, reply);
2701 #endif
2702 }
2703
2704 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2705     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2706     const char *name = NULL;
2707     pa_proplist *p;
2708     pa_tagstruct *reply;
2709
2710     pa_native_connection_assert_ref(c);
2711     pa_assert(t);
2712
2713     p = pa_proplist_new();
2714
2715     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2716         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2717         !pa_tagstruct_eof(t)) {
2718
2719         protocol_error(c);
2720         pa_proplist_free(p);
2721         return;
2722     }
2723
2724     if (name)
2725         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2726             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2727             pa_proplist_free(p);
2728             return;
2729         }
2730
2731     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2732     pa_proplist_free(p);
2733
2734     reply = reply_new(tag);
2735
2736     if (c->version >= 13)
2737         pa_tagstruct_putu32(reply, c->client->index);
2738
2739     pa_pstream_send_tagstruct(c->pstream, reply);
2740 }
2741
2742 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2743     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2744     const char *name;
2745     uint32_t idx = PA_IDXSET_INVALID;
2746
2747     pa_native_connection_assert_ref(c);
2748     pa_assert(t);
2749
2750     if (pa_tagstruct_gets(t, &name) < 0 ||
2751         !pa_tagstruct_eof(t)) {
2752         protocol_error(c);
2753         return;
2754     }
2755
2756     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2757     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_LOOKUP_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
2758
2759     if (command == PA_COMMAND_LOOKUP_SINK) {
2760         pa_sink *sink;
2761         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2762             idx = sink->index;
2763     } else {
2764         pa_source *source;
2765         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2766         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2767             idx = source->index;
2768     }
2769
2770     if (idx == PA_IDXSET_INVALID)
2771         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2772     else {
2773         pa_tagstruct *reply;
2774         reply = reply_new(tag);
2775         pa_tagstruct_putu32(reply, idx);
2776         pa_pstream_send_tagstruct(c->pstream, reply);
2777     }
2778 }
2779
2780 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2781     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2782     uint32_t idx;
2783     playback_stream *s;
2784
2785     pa_native_connection_assert_ref(c);
2786     pa_assert(t);
2787
2788     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2789         !pa_tagstruct_eof(t)) {
2790         protocol_error(c);
2791         return;
2792     }
2793
2794     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2795     s = pa_idxset_get_by_index(c->output_streams, idx);
2796     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2797     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2798
2799     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2800 }
2801
2802 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2803     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2804     pa_tagstruct *reply;
2805     const pa_mempool_stat *stat;
2806
2807     pa_native_connection_assert_ref(c);
2808     pa_assert(t);
2809
2810     if (!pa_tagstruct_eof(t)) {
2811         protocol_error(c);
2812         return;
2813     }
2814
2815     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2816
2817     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2818
2819     reply = reply_new(tag);
2820     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2821     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2822     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2823     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2824     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2825     pa_pstream_send_tagstruct(c->pstream, reply);
2826 }
2827
2828 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2829     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2830     pa_tagstruct *reply;
2831     playback_stream *s;
2832     struct timeval tv, now;
2833     uint32_t idx;
2834
2835     pa_native_connection_assert_ref(c);
2836     pa_assert(t);
2837
2838     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2839         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2840         !pa_tagstruct_eof(t)) {
2841         protocol_error(c);
2842         return;
2843     }
2844
2845     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2846     s = pa_idxset_get_by_index(c->output_streams, idx);
2847     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2848     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2849
2850     /* Get an atomic snapshot of all timing parameters */
2851     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2852
2853     reply = reply_new(tag);
2854     pa_tagstruct_put_usec(reply,
2855                           s->current_sink_latency +
2856                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2857     pa_tagstruct_put_usec(reply, 0);
2858     pa_tagstruct_put_boolean(reply,
2859                              s->playing_for > 0 &&
2860                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2861                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2862     pa_tagstruct_put_timeval(reply, &tv);
2863     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2864     pa_tagstruct_puts64(reply, s->write_index);
2865     pa_tagstruct_puts64(reply, s->read_index);
2866
2867     if (c->version >= 13) {
2868         pa_tagstruct_putu64(reply, s->underrun_for);
2869         pa_tagstruct_putu64(reply, s->playing_for);
2870     }
2871
2872     pa_pstream_send_tagstruct(c->pstream, reply);
2873 }
2874
2875 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2876     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2877     pa_tagstruct *reply;
2878     record_stream *s;
2879     struct timeval tv, now;
2880     uint32_t idx;
2881
2882     pa_native_connection_assert_ref(c);
2883     pa_assert(t);
2884
2885     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2886         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2887         !pa_tagstruct_eof(t)) {
2888         protocol_error(c);
2889         return;
2890     }
2891
2892     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2893     s = pa_idxset_get_by_index(c->record_streams, idx);
2894     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2895
2896     /* Get an atomic snapshot of all timing parameters */
2897     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2898
2899     reply = reply_new(tag);
2900     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2901     pa_tagstruct_put_usec(reply,
2902                           s->current_source_latency +
2903                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->source->sample_spec));
2904     pa_tagstruct_put_boolean(reply,
2905                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2906                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2907     pa_tagstruct_put_timeval(reply, &tv);
2908     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2909     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2910     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2911     pa_pstream_send_tagstruct(c->pstream, reply);
2912 }
2913
2914 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2915     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2916     upload_stream *s;
2917     uint32_t length;
2918     const char *name = NULL;
2919     pa_sample_spec ss;
2920     pa_channel_map map;
2921     pa_tagstruct *reply;
2922     pa_proplist *p;
2923
2924     pa_native_connection_assert_ref(c);
2925     pa_assert(t);
2926
2927     if (pa_tagstruct_gets(t, &name) < 0 ||
2928         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2929         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2930         pa_tagstruct_getu32(t, &length) < 0) {
2931         protocol_error(c);
2932         return;
2933     }
2934
2935     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2936     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2937     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2938     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2939     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2940     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2941
2942     p = pa_proplist_new();
2943
2944     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2945         !pa_tagstruct_eof(t)) {
2946
2947         protocol_error(c);
2948         pa_proplist_free(p);
2949         return;
2950     }
2951
2952     if (c->version < 13)
2953         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2954     else if (!name)
2955         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2956             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2957
2958     if (!name || !pa_namereg_is_valid_name(name)) {
2959         pa_proplist_free(p);
2960         CHECK_VALIDITY(c->pstream, false, tag, PA_ERR_INVALID);
2961     }
2962
2963     s = upload_stream_new(c, &ss, &map, name, length, p);
2964     pa_proplist_free(p);
2965
2966     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2967
2968     reply = reply_new(tag);
2969     pa_tagstruct_putu32(reply, s->index);
2970     pa_tagstruct_putu32(reply, length);
2971     pa_pstream_send_tagstruct(c->pstream, reply);
2972 }
2973
2974 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2975     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2976     uint32_t channel;
2977     upload_stream *s;
2978     uint32_t idx;
2979
2980     pa_native_connection_assert_ref(c);
2981     pa_assert(t);
2982
2983     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2984         !pa_tagstruct_eof(t)) {
2985         protocol_error(c);
2986         return;
2987     }
2988
2989     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2990
2991     s = pa_idxset_get_by_index(c->output_streams, channel);
2992     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2993     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2994
2995     if (!s->memchunk.memblock)
2996         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2997     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2998         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2999     else
3000         pa_pstream_send_simple_ack(c->pstream, tag);
3001
3002     upload_stream_unlink(s);
3003 }
3004
3005 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3006     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3007     uint32_t sink_index;
3008     pa_volume_t volume;
3009     pa_sink *sink;
3010     const char *name, *sink_name;
3011     uint32_t idx;
3012     pa_proplist *p;
3013     pa_tagstruct *reply;
3014
3015     pa_native_connection_assert_ref(c);
3016     pa_assert(t);
3017
3018     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3019
3020     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
3021         pa_tagstruct_gets(t, &sink_name) < 0 ||
3022         pa_tagstruct_getu32(t, &volume) < 0 ||
3023         pa_tagstruct_gets(t, &name) < 0) {
3024         protocol_error(c);
3025         return;
3026     }
3027
3028     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID);
3029     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
3030     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3031     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3032
3033     if (sink_index != PA_INVALID_INDEX)
3034         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
3035     else
3036         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
3037
3038     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3039
3040     p = pa_proplist_new();
3041
3042     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
3043         !pa_tagstruct_eof(t)) {
3044         protocol_error(c);
3045         pa_proplist_free(p);
3046         return;
3047     }
3048
3049     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
3050
3051     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
3052         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3053         pa_proplist_free(p);
3054         return;
3055     }
3056
3057     pa_proplist_free(p);
3058
3059     reply = reply_new(tag);
3060
3061     if (c->version >= 13)
3062         pa_tagstruct_putu32(reply, idx);
3063
3064     pa_pstream_send_tagstruct(c->pstream, reply);
3065 }
3066
3067 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3068     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3069     const char *name;
3070
3071     pa_native_connection_assert_ref(c);
3072     pa_assert(t);
3073
3074     if (pa_tagstruct_gets(t, &name) < 0 ||
3075         !pa_tagstruct_eof(t)) {
3076         protocol_error(c);
3077         return;
3078     }
3079
3080     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3081     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3082
3083     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
3084         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3085         return;
3086     }
3087
3088     pa_pstream_send_simple_ack(c->pstream, tag);
3089 }
3090
3091 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
3092     pa_assert(c);
3093     pa_assert(fixed);
3094     pa_assert(original);
3095
3096     *fixed = *original;
3097
3098     if (c->version < 12) {
3099         /* Before protocol version 12 we didn't support S32 samples,
3100          * so we need to lie about this to the client */
3101
3102         if (fixed->format == PA_SAMPLE_S32LE)
3103             fixed->format = PA_SAMPLE_FLOAT32LE;
3104         if (fixed->format == PA_SAMPLE_S32BE)
3105             fixed->format = PA_SAMPLE_FLOAT32BE;
3106     }
3107
3108     if (c->version < 15) {
3109         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
3110             fixed->format = PA_SAMPLE_FLOAT32LE;
3111         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
3112             fixed->format = PA_SAMPLE_FLOAT32BE;
3113     }
3114 }
3115
3116 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
3117     pa_sample_spec fixed_ss;
3118
3119     pa_assert(t);
3120     pa_sink_assert_ref(sink);
3121
3122     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
3123
3124     pa_tagstruct_put(
3125         t,
3126         PA_TAG_U32, sink->index,
3127         PA_TAG_STRING, sink->name,
3128         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3129         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3130         PA_TAG_CHANNEL_MAP, &sink->channel_map,
3131         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
3132         PA_TAG_CVOLUME, pa_sink_get_volume(sink, false),
3133         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, false),
3134         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
3135         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
3136         PA_TAG_USEC, pa_sink_get_latency(sink),
3137         PA_TAG_STRING, sink->driver,
3138         PA_TAG_U32, sink->flags & PA_SINK_CLIENT_FLAGS_MASK,
3139         PA_TAG_INVALID);
3140
3141     if (c->version >= 13) {
3142         pa_tagstruct_put_proplist(t, sink->proplist);
3143         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
3144     }
3145
3146     if (c->version >= 15) {
3147         pa_tagstruct_put_volume(t, sink->base_volume);
3148         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
3149             pa_log_error("Internal sink state is invalid.");
3150         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
3151         pa_tagstruct_putu32(t, sink->n_volume_steps);
3152         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
3153     }
3154
3155     if (c->version >= 16) {
3156         void *state;
3157         pa_device_port *p;
3158
3159         pa_tagstruct_putu32(t, pa_hashmap_size(sink->ports));
3160
3161         PA_HASHMAP_FOREACH(p, sink->ports, state) {
3162             pa_tagstruct_puts(t, p->name);
3163             pa_tagstruct_puts(t, p->description);
3164             pa_tagstruct_putu32(t, p->priority);
3165             if (c->version >= 24)
3166                 pa_tagstruct_putu32(t, p->available);
3167         }
3168
3169         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
3170     }
3171
3172     if (c->version >= 21) {
3173         uint32_t i;
3174         pa_format_info *f;
3175         pa_idxset *formats = pa_sink_get_formats(sink);
3176
3177         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3178         PA_IDXSET_FOREACH(f, formats, i) {
3179             pa_tagstruct_put_format_info(t, f);
3180         }
3181
3182         pa_idxset_free(formats, (pa_free_cb_t) pa_format_info_free);
3183     }
3184 }
3185
3186 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
3187     pa_sample_spec fixed_ss;
3188
3189     pa_assert(t);
3190     pa_source_assert_ref(source);
3191
3192     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
3193
3194     pa_tagstruct_put(
3195         t,
3196         PA_TAG_U32, source->index,
3197         PA_TAG_STRING, source->name,
3198         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3199         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3200         PA_TAG_CHANNEL_MAP, &source->channel_map,
3201         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
3202         PA_TAG_CVOLUME, pa_source_get_volume(source, false),
3203         PA_TAG_BOOLEAN, pa_source_get_mute(source, false),
3204         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
3205         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
3206         PA_TAG_USEC, pa_source_get_latency(source),
3207         PA_TAG_STRING, source->driver,
3208         PA_TAG_U32, source->flags & PA_SOURCE_CLIENT_FLAGS_MASK,
3209         PA_TAG_INVALID);
3210
3211     if (c->version >= 13) {
3212         pa_tagstruct_put_proplist(t, source->proplist);
3213         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
3214     }
3215
3216     if (c->version >= 15) {
3217         pa_tagstruct_put_volume(t, source->base_volume);
3218         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
3219             pa_log_error("Internal source state is invalid.");
3220         pa_tagstruct_putu32(t, pa_source_get_state(source));
3221         pa_tagstruct_putu32(t, source->n_volume_steps);
3222         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
3223     }
3224
3225     if (c->version >= 16) {
3226         void *state;
3227         pa_device_port *p;
3228
3229         pa_tagstruct_putu32(t, pa_hashmap_size(source->ports));
3230
3231         PA_HASHMAP_FOREACH(p, source->ports, state) {
3232             pa_tagstruct_puts(t, p->name);
3233             pa_tagstruct_puts(t, p->description);
3234             pa_tagstruct_putu32(t, p->priority);
3235             if (c->version >= 24)
3236                 pa_tagstruct_putu32(t, p->available);
3237         }
3238
3239         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
3240     }
3241
3242     if (c->version >= 22) {
3243         uint32_t i;
3244         pa_format_info *f;
3245         pa_idxset *formats = pa_source_get_formats(source);
3246
3247         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3248         PA_IDXSET_FOREACH(f, formats, i) {
3249             pa_tagstruct_put_format_info(t, f);
3250         }
3251
3252         pa_idxset_free(formats, (pa_free_cb_t) pa_format_info_free);
3253     }
3254 }
3255
3256 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
3257     pa_assert(t);
3258     pa_assert(client);
3259
3260     pa_tagstruct_putu32(t, client->index);
3261     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
3262     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
3263     pa_tagstruct_puts(t, client->driver);
3264
3265     if (c->version >= 13)
3266         pa_tagstruct_put_proplist(t, client->proplist);
3267 }
3268
3269 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
3270     void *state = NULL;
3271     pa_card_profile *p;
3272     pa_device_port *port;
3273
3274     pa_assert(t);
3275     pa_assert(card);
3276
3277     pa_tagstruct_putu32(t, card->index);
3278     pa_tagstruct_puts(t, card->name);
3279     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
3280     pa_tagstruct_puts(t, card->driver);
3281
3282     pa_tagstruct_putu32(t, pa_hashmap_size(card->profiles));
3283
3284     PA_HASHMAP_FOREACH(p, card->profiles, state) {
3285         pa_tagstruct_puts(t, p->name);
3286         pa_tagstruct_puts(t, p->description);
3287         pa_tagstruct_putu32(t, p->n_sinks);
3288         pa_tagstruct_putu32(t, p->n_sources);
3289         pa_tagstruct_putu32(t, p->priority);
3290
3291         if (c->version >= 29)
3292             pa_tagstruct_putu32(t, (p->available != PA_AVAILABLE_NO));
3293     }
3294
3295     pa_tagstruct_puts(t, card->active_profile->name);
3296     pa_tagstruct_put_proplist(t, card->proplist);
3297
3298     if (c->version < 26)
3299         return;
3300
3301     pa_tagstruct_putu32(t, pa_hashmap_size(card->ports));
3302
3303     PA_HASHMAP_FOREACH(port, card->ports, state) {
3304         void *state2;
3305
3306         pa_tagstruct_puts(t, port->name);
3307         pa_tagstruct_puts(t, port->description);
3308         pa_tagstruct_putu32(t, port->priority);
3309         pa_tagstruct_putu32(t, port->available);
3310         pa_tagstruct_putu8(t, port->direction);
3311         pa_tagstruct_put_proplist(t, port->proplist);
3312
3313         pa_tagstruct_putu32(t, pa_hashmap_size(port->profiles));
3314
3315         PA_HASHMAP_FOREACH(p, port->profiles, state2)
3316             pa_tagstruct_puts(t, p->name);
3317
3318         if (c->version >= 27)
3319             pa_tagstruct_puts64(t, port->latency_offset);
3320     }
3321 }
3322
3323 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
3324     pa_assert(t);
3325     pa_assert(module);
3326
3327     pa_tagstruct_putu32(t, module->index);
3328     pa_tagstruct_puts(t, module->name);
3329     pa_tagstruct_puts(t, module->argument);
3330     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
3331
3332     if (c->version < 15)
3333         pa_tagstruct_put_boolean(t, false); /* autoload is obsolete */
3334
3335     if (c->version >= 15)
3336         pa_tagstruct_put_proplist(t, module->proplist);
3337 }
3338
3339 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
3340     pa_sample_spec fixed_ss;
3341     pa_usec_t sink_latency;
3342     pa_cvolume v;
3343     bool has_volume = false;
3344
3345     pa_assert(t);
3346     pa_sink_input_assert_ref(s);
3347
3348     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3349
3350     has_volume = pa_sink_input_is_volume_readable(s);
3351     if (has_volume)
3352         pa_sink_input_get_volume(s, &v, true);
3353     else
3354         pa_cvolume_reset(&v, fixed_ss.channels);
3355
3356     pa_tagstruct_putu32(t, s->index);
3357     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3358     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3359     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3360     pa_tagstruct_putu32(t, s->sink->index);
3361     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3362     pa_tagstruct_put_channel_map(t, &s->channel_map);
3363     pa_tagstruct_put_cvolume(t, &v);
3364     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3365     pa_tagstruct_put_usec(t, sink_latency);
3366     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3367     pa_tagstruct_puts(t, s->driver);
3368     if (c->version >= 11)
3369         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3370     if (c->version >= 13)
3371         pa_tagstruct_put_proplist(t, s->proplist);
3372     if (c->version >= 19)
3373         pa_tagstruct_put_boolean(t, (pa_sink_input_get_state(s) == PA_SINK_INPUT_CORKED));
3374     if (c->version >= 20) {
3375         pa_tagstruct_put_boolean(t, has_volume);
3376         pa_tagstruct_put_boolean(t, s->volume_writable);
3377     }
3378     if (c->version >= 21)
3379         pa_tagstruct_put_format_info(t, s->format);
3380 }
3381
3382 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3383     pa_sample_spec fixed_ss;
3384     pa_usec_t source_latency;
3385     pa_cvolume v;
3386     bool has_volume = false;
3387
3388     pa_assert(t);
3389     pa_source_output_assert_ref(s);
3390
3391     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3392
3393     has_volume = pa_source_output_is_volume_readable(s);
3394     if (has_volume)
3395         pa_source_output_get_volume(s, &v, true);
3396     else
3397         pa_cvolume_reset(&v, fixed_ss.channels);
3398
3399     pa_tagstruct_putu32(t, s->index);
3400     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3401     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3402     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3403     pa_tagstruct_putu32(t, s->source->index);
3404     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3405     pa_tagstruct_put_channel_map(t, &s->channel_map);
3406     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3407     pa_tagstruct_put_usec(t, source_latency);
3408     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3409     pa_tagstruct_puts(t, s->driver);
3410     if (c->version >= 13)
3411         pa_tagstruct_put_proplist(t, s->proplist);
3412     if (c->version >= 19)
3413         pa_tagstruct_put_boolean(t, (pa_source_output_get_state(s) == PA_SOURCE_OUTPUT_CORKED));
3414     if (c->version >= 22) {
3415         pa_tagstruct_put_cvolume(t, &v);
3416         pa_tagstruct_put_boolean(t, pa_source_output_get_mute(s));
3417         pa_tagstruct_put_boolean(t, has_volume);
3418         pa_tagstruct_put_boolean(t, s->volume_writable);
3419         pa_tagstruct_put_format_info(t, s->format);
3420     }
3421 }
3422
3423 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3424     pa_sample_spec fixed_ss;
3425     pa_cvolume v;
3426
3427     pa_assert(t);
3428     pa_assert(e);
3429
3430     if (e->memchunk.memblock)
3431         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3432     else
3433         memset(&fixed_ss, 0, sizeof(fixed_ss));
3434
3435     pa_tagstruct_putu32(t, e->index);
3436     pa_tagstruct_puts(t, e->name);
3437
3438     if (e->volume_is_set)
3439         v = e->volume;
3440     else
3441         pa_cvolume_init(&v);
3442
3443     pa_tagstruct_put_cvolume(t, &v);
3444     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3445     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3446     pa_tagstruct_put_channel_map(t, &e->channel_map);
3447     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3448     pa_tagstruct_put_boolean(t, e->lazy);
3449     pa_tagstruct_puts(t, e->filename);
3450
3451     if (c->version >= 13)
3452         pa_tagstruct_put_proplist(t, e->proplist);
3453 }
3454
3455 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3456     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3457     uint32_t idx;
3458     pa_sink *sink = NULL;
3459     pa_source *source = NULL;
3460     pa_client *client = NULL;
3461     pa_card *card = NULL;
3462     pa_module *module = NULL;
3463     pa_sink_input *si = NULL;
3464     pa_source_output *so = NULL;
3465     pa_scache_entry *sce = NULL;
3466     const char *name = NULL;
3467     pa_tagstruct *reply;
3468
3469     pa_native_connection_assert_ref(c);
3470     pa_assert(t);
3471
3472     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3473         (command != PA_COMMAND_GET_CLIENT_INFO &&
3474          command != PA_COMMAND_GET_MODULE_INFO &&
3475          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3476          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3477          pa_tagstruct_gets(t, &name) < 0) ||
3478         !pa_tagstruct_eof(t)) {
3479         protocol_error(c);
3480         return;
3481     }
3482
3483     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3484     CHECK_VALIDITY(c->pstream, !name ||
3485                    (command == PA_COMMAND_GET_SINK_INFO &&
3486                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SINK)) ||
3487                    (command == PA_COMMAND_GET_SOURCE_INFO &&
3488                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SOURCE)) ||
3489                    pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3490     CHECK_VALIDITY(c->pstream, command == PA_COMMAND_GET_SINK_INFO ||
3491                    command == PA_COMMAND_GET_SOURCE_INFO ||
3492                    (idx != PA_INVALID_INDEX || name), tag, PA_ERR_INVALID);
3493     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3494
3495     if (command == PA_COMMAND_GET_SINK_INFO) {
3496         if (idx != PA_INVALID_INDEX)
3497             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3498         else
3499             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3500     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3501         if (idx != PA_INVALID_INDEX)
3502             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3503         else
3504             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3505     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3506         if (idx != PA_INVALID_INDEX)
3507             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3508         else
3509             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3510     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3511         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3512     else if (command == PA_COMMAND_GET_MODULE_INFO)
3513         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3514     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3515         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3516     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3517         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3518     else {
3519         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3520         if (idx != PA_INVALID_INDEX)
3521             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3522         else
3523             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3524     }
3525
3526     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3527         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3528         return;
3529     }
3530
3531     reply = reply_new(tag);
3532     if (sink)
3533         sink_fill_tagstruct(c, reply, sink);
3534     else if (source)
3535         source_fill_tagstruct(c, reply, source);
3536     else if (client)
3537         client_fill_tagstruct(c, reply, client);
3538     else if (card)
3539         card_fill_tagstruct(c, reply, card);
3540     else if (module)
3541         module_fill_tagstruct(c, reply, module);
3542     else if (si)
3543         sink_input_fill_tagstruct(c, reply, si);
3544     else if (so)
3545         source_output_fill_tagstruct(c, reply, so);
3546     else
3547         scache_fill_tagstruct(c, reply, sce);
3548     pa_pstream_send_tagstruct(c->pstream, reply);
3549 }
3550
3551 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3552     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3553     pa_idxset *i;
3554     uint32_t idx;
3555     void *p;
3556     pa_tagstruct *reply;
3557
3558     pa_native_connection_assert_ref(c);
3559     pa_assert(t);
3560
3561     if (!pa_tagstruct_eof(t)) {
3562         protocol_error(c);
3563         return;
3564     }
3565
3566     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3567
3568     reply = reply_new(tag);
3569
3570     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3571         i = c->protocol->core->sinks;
3572     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3573         i = c->protocol->core->sources;
3574     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3575         i = c->protocol->core->clients;
3576     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3577         i = c->protocol->core->cards;
3578     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3579         i = c->protocol->core->modules;
3580     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3581         i = c->protocol->core->sink_inputs;
3582     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3583         i = c->protocol->core->source_outputs;
3584     else {
3585         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3586         i = c->protocol->core->scache;
3587     }
3588
3589     if (i) {
3590         PA_IDXSET_FOREACH(p, i, idx) {
3591             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3592                 sink_fill_tagstruct(c, reply, p);
3593             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3594                 source_fill_tagstruct(c, reply, p);
3595             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3596                 client_fill_tagstruct(c, reply, p);
3597             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3598                 card_fill_tagstruct(c, reply, p);
3599             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3600                 module_fill_tagstruct(c, reply, p);
3601             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3602                 sink_input_fill_tagstruct(c, reply, p);
3603             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3604                 source_output_fill_tagstruct(c, reply, p);
3605             else {
3606                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3607                 scache_fill_tagstruct(c, reply, p);
3608             }
3609         }
3610     }
3611
3612     pa_pstream_send_tagstruct(c->pstream, reply);
3613 }
3614
3615 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3616     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3617     pa_tagstruct *reply;
3618     pa_sink *def_sink;
3619     pa_source *def_source;
3620     pa_sample_spec fixed_ss;
3621     char *h, *u;
3622
3623     pa_native_connection_assert_ref(c);
3624     pa_assert(t);
3625
3626     if (!pa_tagstruct_eof(t)) {
3627         protocol_error(c);
3628         return;
3629     }
3630
3631     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3632
3633     reply = reply_new(tag);
3634     pa_tagstruct_puts(reply, PACKAGE_NAME);
3635     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3636
3637     u = pa_get_user_name_malloc();
3638     pa_tagstruct_puts(reply, u);
3639     pa_xfree(u);
3640
3641     h = pa_get_host_name_malloc();
3642     pa_tagstruct_puts(reply, h);
3643     pa_xfree(h);
3644
3645     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3646     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3647
3648     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3649     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3650     def_source = pa_namereg_get_default_source(c->protocol->core);
3651     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3652
3653     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3654
3655     if (c->version >= 15)
3656         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3657
3658     pa_pstream_send_tagstruct(c->pstream, reply);
3659 }
3660
3661 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3662     pa_tagstruct *t;
3663     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3664
3665     pa_native_connection_assert_ref(c);
3666
3667     t = pa_tagstruct_new(NULL, 0);
3668     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3669     pa_tagstruct_putu32(t, (uint32_t) -1);
3670     pa_tagstruct_putu32(t, e);
3671     pa_tagstruct_putu32(t, idx);
3672     pa_pstream_send_tagstruct(c->pstream, t);
3673 }
3674
3675 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3676     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3677     pa_subscription_mask_t m;
3678
3679     pa_native_connection_assert_ref(c);
3680     pa_assert(t);
3681
3682     if (pa_tagstruct_getu32(t, &m) < 0 ||
3683         !pa_tagstruct_eof(t)) {
3684         protocol_error(c);
3685         return;
3686     }
3687
3688     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3689     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3690
3691     if (c->subscription)
3692         pa_subscription_free(c->subscription);
3693
3694     if (m != 0) {
3695         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3696         pa_assert(c->subscription);
3697     } else
3698         c->subscription = NULL;
3699
3700     pa_pstream_send_simple_ack(c->pstream, tag);
3701 }
3702
3703 static void command_set_volume(
3704         pa_pdispatch *pd,
3705         uint32_t command,
3706         uint32_t tag,
3707         pa_tagstruct *t,
3708         void *userdata) {
3709
3710     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3711     uint32_t idx;
3712     pa_cvolume volume;
3713     pa_sink *sink = NULL;
3714     pa_source *source = NULL;
3715     pa_sink_input *si = NULL;
3716     pa_source_output *so = NULL;
3717     const char *name = NULL;
3718     const char *client_name;
3719
3720     pa_native_connection_assert_ref(c);
3721     pa_assert(t);
3722
3723     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3724         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3725         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3726         pa_tagstruct_get_cvolume(t, &volume) ||
3727         !pa_tagstruct_eof(t)) {
3728         protocol_error(c);
3729         return;
3730     }
3731
3732     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3733     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_VOLUME ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3734     CHECK_VALIDITY(c->pstream, (idx != PA_INVALID_INDEX) ^ (name != NULL), tag, PA_ERR_INVALID);
3735     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3736
3737     switch (command) {
3738
3739         case PA_COMMAND_SET_SINK_VOLUME:
3740             if (idx != PA_INVALID_INDEX)
3741                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3742             else
3743                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3744             break;
3745
3746         case PA_COMMAND_SET_SOURCE_VOLUME:
3747             if (idx != PA_INVALID_INDEX)
3748                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3749             else
3750                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3751             break;
3752
3753         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3754             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3755             break;
3756
3757         case PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME:
3758             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3759             break;
3760
3761         default:
3762             pa_assert_not_reached();
3763     }
3764
3765     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3766
3767     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3768
3769     if (sink) {
3770         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &sink->sample_spec), tag, PA_ERR_INVALID);
3771
3772         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3773         pa_sink_set_volume(sink, &volume, true, true);
3774     } else if (source) {
3775         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &source->sample_spec), tag, PA_ERR_INVALID);
3776
3777         pa_log_debug("Client %s changes volume of source %s.", client_name, source->name);
3778         pa_source_set_volume(source, &volume, true, true);
3779     } else if (si) {
3780         CHECK_VALIDITY(c->pstream, si->volume_writable, tag, PA_ERR_BADSTATE);
3781         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &si->sample_spec), tag, PA_ERR_INVALID);
3782
3783         pa_log_debug("Client %s changes volume of sink input %s.",
3784                      client_name,
3785                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3786         pa_sink_input_set_volume(si, &volume, true, true);
3787     } else if (so) {
3788         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &so->sample_spec), tag, PA_ERR_INVALID);
3789
3790         pa_log_debug("Client %s changes volume of source output %s.",
3791                      client_name,
3792                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3793         pa_source_output_set_volume(so, &volume, true, true);
3794     }
3795
3796     pa_pstream_send_simple_ack(c->pstream, tag);
3797 }
3798
3799 static void command_set_mute(
3800         pa_pdispatch *pd,
3801         uint32_t command,
3802         uint32_t tag,
3803         pa_tagstruct *t,
3804         void *userdata) {
3805
3806     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3807     uint32_t idx;
3808     bool mute;
3809     pa_sink *sink = NULL;
3810     pa_source *source = NULL;
3811     pa_sink_input *si = NULL;
3812     pa_source_output *so = NULL;
3813     const char *name = NULL, *client_name;
3814
3815     pa_native_connection_assert_ref(c);
3816     pa_assert(t);
3817
3818     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3819         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3820         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3821         pa_tagstruct_get_boolean(t, &mute) ||
3822         !pa_tagstruct_eof(t)) {
3823         protocol_error(c);
3824         return;
3825     }
3826
3827     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3828     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_MUTE ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3829     CHECK_VALIDITY(c->pstream, (idx != PA_INVALID_INDEX) ^ (name != NULL), tag, PA_ERR_INVALID);
3830
3831     switch (command) {
3832
3833         case PA_COMMAND_SET_SINK_MUTE:
3834             if (idx != PA_INVALID_INDEX)
3835                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3836             else
3837                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3838
3839             break;
3840
3841         case PA_COMMAND_SET_SOURCE_MUTE:
3842             if (idx != PA_INVALID_INDEX)
3843                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3844             else
3845                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3846
3847             break;
3848
3849         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3850             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3851             break;
3852
3853         case PA_COMMAND_SET_SOURCE_OUTPUT_MUTE:
3854             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3855             break;
3856
3857         default:
3858             pa_assert_not_reached();
3859     }
3860
3861     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3862
3863     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3864
3865     if (sink) {
3866         pa_log_debug("Client %s changes mute of sink %s.", client_name, sink->name);
3867         pa_sink_set_mute(sink, mute, true);
3868     } else if (source) {
3869         pa_log_debug("Client %s changes mute of source %s.", client_name, source->name);
3870         pa_source_set_mute(source, mute, true);
3871     } else if (si) {
3872         pa_log_debug("Client %s changes mute of sink input %s.",
3873                      client_name,
3874                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3875         pa_sink_input_set_mute(si, mute, true);
3876     } else if (so) {
3877         pa_log_debug("Client %s changes mute of source output %s.",
3878                      client_name,
3879                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3880         pa_source_output_set_mute(so, mute, true);
3881     }
3882
3883     pa_pstream_send_simple_ack(c->pstream, tag);
3884 }
3885
3886 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3887     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3888     uint32_t idx;
3889     bool b;
3890     playback_stream *s;
3891
3892     pa_native_connection_assert_ref(c);
3893     pa_assert(t);
3894
3895     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3896         pa_tagstruct_get_boolean(t, &b) < 0 ||
3897         !pa_tagstruct_eof(t)) {
3898         protocol_error(c);
3899         return;
3900     }
3901
3902     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3903     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3904     s = pa_idxset_get_by_index(c->output_streams, idx);
3905     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3906     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3907
3908     pa_sink_input_cork(s->sink_input, b);
3909
3910     if (b)
3911         s->is_underrun = true;
3912
3913     pa_pstream_send_simple_ack(c->pstream, tag);
3914 }
3915
3916 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3917     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3918     uint32_t idx;
3919     playback_stream *s;
3920
3921     pa_native_connection_assert_ref(c);
3922     pa_assert(t);
3923
3924     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3925         !pa_tagstruct_eof(t)) {
3926         protocol_error(c);
3927         return;
3928     }
3929
3930     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3931     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3932     s = pa_idxset_get_by_index(c->output_streams, idx);
3933     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3934     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3935
3936     switch (command) {
3937         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3938             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3939             break;
3940
3941         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3942             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3943             break;
3944
3945         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3946             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3947             break;
3948
3949         default:
3950             pa_assert_not_reached();
3951     }
3952
3953     pa_pstream_send_simple_ack(c->pstream, tag);
3954 }
3955
3956 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3957     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3958     uint32_t idx;
3959     record_stream *s;
3960     bool b;
3961
3962     pa_native_connection_assert_ref(c);
3963     pa_assert(t);
3964
3965     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3966         pa_tagstruct_get_boolean(t, &b) < 0 ||
3967         !pa_tagstruct_eof(t)) {
3968         protocol_error(c);
3969         return;
3970     }
3971
3972     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3973     s = pa_idxset_get_by_index(c->record_streams, idx);
3974     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3975
3976     pa_source_output_cork(s->source_output, b);
3977     pa_memblockq_prebuf_force(s->memblockq);
3978     pa_pstream_send_simple_ack(c->pstream, tag);
3979 }
3980
3981 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3982     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3983     uint32_t idx;
3984     record_stream *s;
3985
3986     pa_native_connection_assert_ref(c);
3987     pa_assert(t);
3988
3989     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3990         !pa_tagstruct_eof(t)) {
3991         protocol_error(c);
3992         return;
3993     }
3994
3995     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3996     s = pa_idxset_get_by_index(c->record_streams, idx);
3997     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3998
3999     pa_memblockq_flush_read(s->memblockq);
4000     pa_pstream_send_simple_ack(c->pstream, tag);
4001 }
4002
4003 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4004     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4005     uint32_t idx;
4006     pa_buffer_attr a;
4007     pa_tagstruct *reply;
4008
4009     pa_native_connection_assert_ref(c);
4010     pa_assert(t);
4011
4012     memset(&a, 0, sizeof(a));
4013
4014     if (pa_tagstruct_getu32(t, &idx) < 0) {
4015         protocol_error(c);
4016         return;
4017     }
4018
4019     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4020
4021     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
4022         playback_stream *s;
4023         bool adjust_latency = false, early_requests = false;
4024
4025         s = pa_idxset_get_by_index(c->output_streams, idx);
4026         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4027         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4028
4029         if (pa_tagstruct_get(
4030                     t,
4031                     PA_TAG_U32, &a.maxlength,
4032                     PA_TAG_U32, &a.tlength,
4033                     PA_TAG_U32, &a.prebuf,
4034                     PA_TAG_U32, &a.minreq,
4035                     PA_TAG_INVALID) < 0 ||
4036             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
4037             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
4038             !pa_tagstruct_eof(t)) {
4039             protocol_error(c);
4040             return;
4041         }
4042
4043         s->adjust_latency = adjust_latency;
4044         s->early_requests = early_requests;
4045         s->buffer_attr_req = a;
4046
4047         fix_playback_buffer_attr(s);
4048         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
4049
4050         reply = reply_new(tag);
4051         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
4052         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
4053         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
4054         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
4055
4056         if (c->version >= 13)
4057             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
4058
4059     } else {
4060         record_stream *s;
4061         bool adjust_latency = false, early_requests = false;
4062         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
4063
4064         s = pa_idxset_get_by_index(c->record_streams, idx);
4065         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4066
4067         if (pa_tagstruct_get(
4068                     t,
4069                     PA_TAG_U32, &a.maxlength,
4070                     PA_TAG_U32, &a.fragsize,
4071                     PA_TAG_INVALID) < 0 ||
4072             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
4073             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
4074             !pa_tagstruct_eof(t)) {
4075             protocol_error(c);
4076             return;
4077         }
4078
4079         s->adjust_latency = adjust_latency;
4080         s->early_requests = early_requests;
4081         s->buffer_attr_req = a;
4082
4083         fix_record_buffer_attr_pre(s);
4084         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
4085         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
4086         fix_record_buffer_attr_post(s);
4087
4088         reply = reply_new(tag);
4089         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
4090         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
4091
4092         if (c->version >= 13)
4093             pa_tagstruct_put_usec(reply, s->configured_source_latency);
4094     }
4095
4096     pa_pstream_send_tagstruct(c->pstream, reply);
4097 }
4098
4099 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4100     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4101     uint32_t idx;
4102     uint32_t rate;
4103
4104     pa_native_connection_assert_ref(c);
4105     pa_assert(t);
4106
4107     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4108         pa_tagstruct_getu32(t, &rate) < 0 ||
4109         !pa_tagstruct_eof(t)) {
4110         protocol_error(c);
4111         return;
4112     }
4113
4114     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4115     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
4116
4117     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
4118         playback_stream *s;
4119
4120         s = pa_idxset_get_by_index(c->output_streams, idx);
4121         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4122         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4123
4124         pa_sink_input_set_rate(s->sink_input, rate);
4125
4126     } else {
4127         record_stream *s;
4128         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
4129
4130         s = pa_idxset_get_by_index(c->record_streams, idx);
4131         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4132
4133         pa_source_output_set_rate(s->source_output, rate);
4134     }
4135
4136     pa_pstream_send_simple_ack(c->pstream, tag);
4137 }
4138
4139 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4140     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4141     uint32_t idx;
4142     uint32_t mode;
4143     pa_proplist *p;
4144
4145     pa_native_connection_assert_ref(c);
4146     pa_assert(t);
4147
4148     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4149
4150     p = pa_proplist_new();
4151
4152     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
4153
4154         if (pa_tagstruct_getu32(t, &mode) < 0 ||
4155             pa_tagstruct_get_proplist(t, p) < 0 ||
4156             !pa_tagstruct_eof(t)) {
4157             protocol_error(c);
4158             pa_proplist_free(p);
4159             return;
4160         }
4161
4162     } else {
4163
4164         if (pa_tagstruct_getu32(t, &idx) < 0 ||
4165             pa_tagstruct_getu32(t, &mode) < 0 ||
4166             pa_tagstruct_get_proplist(t, p) < 0 ||
4167             !pa_tagstruct_eof(t)) {
4168             protocol_error(c);
4169             pa_proplist_free(p);
4170             return;
4171         }
4172     }
4173
4174     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
4175         pa_proplist_free(p);
4176         CHECK_VALIDITY(c->pstream, false, tag, PA_ERR_INVALID);
4177     }
4178
4179     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
4180         playback_stream *s;
4181
4182         s = pa_idxset_get_by_index(c->output_streams, idx);
4183         if (!s || !playback_stream_isinstance(s)) {
4184             pa_proplist_free(p);
4185             CHECK_VALIDITY(c->pstream, false, tag, PA_ERR_NOENTITY);
4186         }
4187         pa_sink_input_update_proplist(s->sink_input, mode, p);
4188
4189     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
4190         record_stream *s;
4191
4192         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
4193             pa_proplist_free(p);
4194             CHECK_VALIDITY(c->pstream, false, tag, PA_ERR_NOENTITY);
4195         }
4196         pa_source_output_update_proplist(s->source_output, mode, p);
4197
4198     } else {
4199         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
4200
4201         pa_client_update_proplist(c->client, mode, p);
4202     }
4203
4204     pa_pstream_send_simple_ack(c->pstream, tag);
4205     pa_proplist_free(p);
4206 }
4207
4208 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4209     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4210     uint32_t idx;
4211     unsigned changed = 0;
4212     pa_proplist *p;
4213     pa_strlist *l = NULL;
4214
4215     pa_native_connection_assert_ref(c);
4216     pa_assert(t);
4217
4218     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4219
4220     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
4221
4222         if (pa_tagstruct_getu32(t, &idx) < 0) {
4223             protocol_error(c);
4224             return;
4225         }
4226     }
4227
4228     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4229         playback_stream *s;
4230
4231         s = pa_idxset_get_by_index(c->output_streams, idx);
4232         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4233         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4234
4235         p = s->sink_input->proplist;
4236
4237     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4238         record_stream *s;
4239
4240         s = pa_idxset_get_by_index(c->record_streams, idx);
4241         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4242
4243         p = s->source_output->proplist;
4244     } else {
4245         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4246
4247         p = c->client->proplist;
4248     }
4249
4250     for (;;) {
4251         const char *k;
4252
4253         if (pa_tagstruct_gets(t, &k) < 0) {
4254             protocol_error(c);
4255             pa_strlist_free(l);
4256             return;
4257         }
4258
4259         if (!k)
4260             break;
4261
4262         l = pa_strlist_prepend(l, k);
4263     }
4264
4265     if (!pa_tagstruct_eof(t)) {
4266         protocol_error(c);
4267         pa_strlist_free(l);
4268         return;
4269     }
4270
4271     for (;;) {
4272         char *z;
4273
4274         l = pa_strlist_pop(l, &z);
4275
4276         if (!z)
4277             break;
4278
4279         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
4280         pa_xfree(z);
4281     }
4282
4283     pa_pstream_send_simple_ack(c->pstream, tag);
4284
4285     if (changed) {
4286         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4287             playback_stream *s;
4288
4289             s = pa_idxset_get_by_index(c->output_streams, idx);
4290             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
4291
4292         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4293             record_stream *s;
4294
4295             s = pa_idxset_get_by_index(c->record_streams, idx);
4296             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
4297
4298         } else {
4299             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4300             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
4301         }
4302     }
4303 }
4304
4305 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4306     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4307     const char *s;
4308
4309     pa_native_connection_assert_ref(c);
4310     pa_assert(t);
4311
4312     if (pa_tagstruct_gets(t, &s) < 0 ||
4313         !pa_tagstruct_eof(t)) {
4314         protocol_error(c);
4315         return;
4316     }
4317
4318     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4319     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
4320
4321     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
4322         pa_source *source;
4323
4324         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
4325         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4326
4327         pa_namereg_set_default_source(c->protocol->core, source);
4328     } else {
4329         pa_sink *sink;
4330         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
4331
4332         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
4333         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4334
4335         pa_namereg_set_default_sink(c->protocol->core, sink);
4336     }
4337
4338     pa_pstream_send_simple_ack(c->pstream, tag);
4339 }
4340
4341 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4342     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4343     uint32_t idx;
4344     const char *name;
4345
4346     pa_native_connection_assert_ref(c);
4347     pa_assert(t);
4348
4349     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4350         pa_tagstruct_gets(t, &name) < 0 ||
4351         !pa_tagstruct_eof(t)) {
4352         protocol_error(c);
4353         return;
4354     }
4355
4356     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4357     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
4358
4359     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
4360         playback_stream *s;
4361
4362         s = pa_idxset_get_by_index(c->output_streams, idx);
4363         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4364         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4365
4366         pa_sink_input_set_name(s->sink_input, name);
4367
4368     } else {
4369         record_stream *s;
4370         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
4371
4372         s = pa_idxset_get_by_index(c->record_streams, idx);
4373         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4374
4375         pa_source_output_set_name(s->source_output, name);
4376     }
4377
4378     pa_pstream_send_simple_ack(c->pstream, tag);
4379 }
4380
4381 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4382     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4383     uint32_t idx;
4384
4385     pa_native_connection_assert_ref(c);
4386     pa_assert(t);
4387
4388     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4389         !pa_tagstruct_eof(t)) {
4390         protocol_error(c);
4391         return;
4392     }
4393
4394     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4395
4396     if (command == PA_COMMAND_KILL_CLIENT) {
4397         pa_client *client;
4398
4399         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
4400         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
4401
4402         pa_native_connection_ref(c);
4403         pa_client_kill(client);
4404
4405     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
4406         pa_sink_input *s;
4407
4408         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4409         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4410
4411         pa_native_connection_ref(c);
4412         pa_sink_input_kill(s);
4413     } else {
4414         pa_source_output *s;
4415
4416         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4417
4418         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4419         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4420
4421         pa_native_connection_ref(c);
4422         pa_source_output_kill(s);
4423     }
4424
4425     pa_pstream_send_simple_ack(c->pstream, tag);
4426     pa_native_connection_unref(c);
4427 }
4428
4429 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4430     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4431     pa_module *m;
4432     const char *name, *argument;
4433     pa_tagstruct *reply;
4434
4435     pa_native_connection_assert_ref(c);
4436     pa_assert(t);
4437
4438     if (pa_tagstruct_gets(t, &name) < 0 ||
4439         pa_tagstruct_gets(t, &argument) < 0 ||
4440         !pa_tagstruct_eof(t)) {
4441         protocol_error(c);
4442         return;
4443     }
4444
4445     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4446     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4447     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4448
4449     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4450         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4451         return;
4452     }
4453
4454     reply = reply_new(tag);
4455     pa_tagstruct_putu32(reply, m->index);
4456     pa_pstream_send_tagstruct(c->pstream, reply);
4457 }
4458
4459 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4460     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4461     uint32_t idx;
4462     pa_module *m;
4463
4464     pa_native_connection_assert_ref(c);
4465     pa_assert(t);
4466
4467     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4468         !pa_tagstruct_eof(t)) {
4469         protocol_error(c);
4470         return;
4471     }
4472
4473     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4474     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4475     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4476
4477     pa_module_unload_request(m, false);
4478     pa_pstream_send_simple_ack(c->pstream, tag);
4479 }
4480
4481 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4482     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4483     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4484     const char *name_device = NULL;
4485
4486     pa_native_connection_assert_ref(c);
4487     pa_assert(t);
4488
4489     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4490         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4491         pa_tagstruct_gets(t, &name_device) < 0 ||
4492         !pa_tagstruct_eof(t)) {
4493         protocol_error(c);
4494         return;
4495     }
4496
4497     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4498     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4499
4500     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name_or_wildcard(name_device, command == PA_COMMAND_MOVE_SINK_INPUT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4501     CHECK_VALIDITY(c->pstream, (idx_device != PA_INVALID_INDEX) ^ (name_device != NULL), tag, PA_ERR_INVALID);
4502
4503     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4504         pa_sink_input *si = NULL;
4505         pa_sink *sink = NULL;
4506
4507         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4508
4509         if (idx_device != PA_INVALID_INDEX)
4510             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4511         else
4512             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4513
4514         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4515
4516         if (pa_sink_input_move_to(si, sink, true) < 0) {
4517             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4518             return;
4519         }
4520     } else {
4521         pa_source_output *so = NULL;
4522         pa_source *source;
4523
4524         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4525
4526         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4527
4528         if (idx_device != PA_INVALID_INDEX)
4529             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4530         else
4531             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4532
4533         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4534
4535         if (pa_source_output_move_to(so, source, true) < 0) {
4536             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4537             return;
4538         }
4539     }
4540
4541     pa_pstream_send_simple_ack(c->pstream, tag);
4542 }
4543
4544 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4545     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4546     uint32_t idx = PA_INVALID_INDEX;
4547     const char *name = NULL;
4548     bool b;
4549
4550     pa_native_connection_assert_ref(c);
4551     pa_assert(t);
4552
4553     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4554         pa_tagstruct_gets(t, &name) < 0 ||
4555         pa_tagstruct_get_boolean(t, &b) < 0 ||
4556         !pa_tagstruct_eof(t)) {
4557         protocol_error(c);
4558         return;
4559     }
4560
4561     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4562     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SUSPEND_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE) || *name == 0, tag, PA_ERR_INVALID);
4563     CHECK_VALIDITY(c->pstream, (idx != PA_INVALID_INDEX) ^ (name != NULL), tag, PA_ERR_INVALID);
4564
4565     if (command == PA_COMMAND_SUSPEND_SINK) {
4566
4567         if (idx == PA_INVALID_INDEX && name && !*name) {
4568
4569             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4570
4571             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4572                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4573                 return;
4574             }
4575         } else {
4576             pa_sink *sink = NULL;
4577
4578             if (idx != PA_INVALID_INDEX)
4579                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4580             else
4581                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4582
4583             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4584
4585             pa_log_debug("%s of sink %s requested by client %" PRIu32 ".",
4586                          b ? "Suspending" : "Resuming", sink->name, c->client->index);
4587
4588             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4589                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4590                 return;
4591             }
4592         }
4593     } else {
4594
4595         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4596
4597         if (idx == PA_INVALID_INDEX && name && !*name) {
4598
4599             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4600
4601             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4602                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4603                 return;
4604             }
4605
4606         } else {
4607             pa_source *source;
4608
4609             if (idx != PA_INVALID_INDEX)
4610                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4611             else
4612                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4613
4614             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4615
4616             pa_log_debug("%s of source %s requested by client %" PRIu32 ".",
4617                          b ? "Suspending" : "Resuming", source->name, c->client->index);
4618
4619             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4620                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4621                 return;
4622             }
4623         }
4624     }
4625
4626     pa_pstream_send_simple_ack(c->pstream, tag);
4627 }
4628
4629 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4630     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4631     uint32_t idx = PA_INVALID_INDEX;
4632     const char *name = NULL;
4633     pa_module *m;
4634     pa_native_protocol_ext_cb_t cb;
4635
4636     pa_native_connection_assert_ref(c);
4637     pa_assert(t);
4638
4639     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4640         pa_tagstruct_gets(t, &name) < 0) {
4641         protocol_error(c);
4642         return;
4643     }
4644
4645     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4646     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4647     CHECK_VALIDITY(c->pstream, (idx != PA_INVALID_INDEX) ^ (name != NULL), tag, PA_ERR_INVALID);
4648
4649     if (idx != PA_INVALID_INDEX)
4650         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4651     else
4652         PA_IDXSET_FOREACH(m, c->protocol->core->modules, idx)
4653             if (pa_streq(name, m->name))
4654                 break;
4655
4656     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4657     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4658
4659     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4660     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4661
4662     if (cb(c->protocol, m, c, tag, t) < 0)
4663         protocol_error(c);
4664 }
4665
4666 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4667     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4668     uint32_t idx = PA_INVALID_INDEX;
4669     const char *name = NULL, *profile = NULL;
4670     pa_card *card = NULL;
4671     int ret;
4672
4673     pa_native_connection_assert_ref(c);
4674     pa_assert(t);
4675
4676     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4677         pa_tagstruct_gets(t, &name) < 0 ||
4678         pa_tagstruct_gets(t, &profile) < 0 ||
4679         !pa_tagstruct_eof(t)) {
4680         protocol_error(c);
4681         return;
4682     }
4683
4684     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4685     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4686     CHECK_VALIDITY(c->pstream, (idx != PA_INVALID_INDEX) ^ (name != NULL), tag, PA_ERR_INVALID);
4687
4688     if (idx != PA_INVALID_INDEX)
4689         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4690     else
4691         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4692
4693     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4694
4695     if ((ret = pa_card_set_profile(card, profile, true)) < 0) {
4696         pa_pstream_send_error(c->pstream, tag, -ret);
4697         return;
4698     }
4699
4700     pa_pstream_send_simple_ack(c->pstream, tag);
4701 }
4702
4703 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4704     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4705     uint32_t idx = PA_INVALID_INDEX;
4706     const char *name = NULL, *port = NULL;
4707     int ret;
4708
4709     pa_native_connection_assert_ref(c);
4710     pa_assert(t);
4711
4712     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4713         pa_tagstruct_gets(t, &name) < 0 ||
4714         pa_tagstruct_gets(t, &port) < 0 ||
4715         !pa_tagstruct_eof(t)) {
4716         protocol_error(c);
4717         return;
4718     }
4719
4720     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4721     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_PORT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4722     CHECK_VALIDITY(c->pstream, (idx != PA_INVALID_INDEX) ^ (name != NULL), tag, PA_ERR_INVALID);
4723     CHECK_VALIDITY(c->pstream, port, tag, PA_ERR_INVALID);
4724
4725     if (command == PA_COMMAND_SET_SINK_PORT) {
4726         pa_sink *sink;
4727
4728         if (idx != PA_INVALID_INDEX)
4729             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4730         else
4731             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4732
4733         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4734
4735         if ((ret = pa_sink_set_port(sink, port, true)) < 0) {
4736             pa_pstream_send_error(c->pstream, tag, -ret);
4737             return;
4738         }
4739     } else {
4740         pa_source *source;
4741
4742         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4743
4744         if (idx != PA_INVALID_INDEX)
4745             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4746         else
4747             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4748
4749         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4750
4751         if ((ret = pa_source_set_port(source, port, true)) < 0) {
4752             pa_pstream_send_error(c->pstream, tag, -ret);
4753             return;
4754         }
4755     }
4756
4757     pa_pstream_send_simple_ack(c->pstream, tag);
4758 }
4759
4760 static void command_set_port_latency_offset(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4761     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4762     const char *port_name, *card_name;
4763     uint32_t idx = PA_INVALID_INDEX;
4764     int64_t offset;
4765     pa_card *card = NULL;
4766     pa_device_port *port = NULL;
4767
4768     pa_native_connection_assert_ref(c);
4769     pa_assert(t);
4770
4771     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4772         pa_tagstruct_gets(t, &card_name) < 0 ||
4773         pa_tagstruct_gets(t, &port_name) < 0 ||
4774         pa_tagstruct_gets64(t, &offset) < 0 ||
4775         !pa_tagstruct_eof(t)) {
4776         protocol_error(c);
4777     }
4778
4779     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4780     CHECK_VALIDITY(c->pstream, !card_name || pa_namereg_is_valid_name(card_name), tag, PA_ERR_INVALID);
4781     CHECK_VALIDITY(c->pstream, (idx != PA_INVALID_INDEX) ^ (card_name != NULL), tag, PA_ERR_INVALID);
4782     CHECK_VALIDITY(c->pstream, port_name, tag, PA_ERR_INVALID);
4783
4784     if (idx != PA_INVALID_INDEX)
4785         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4786     else
4787         card = pa_namereg_get(c->protocol->core, card_name, PA_NAMEREG_CARD);
4788
4789     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4790
4791     port = pa_hashmap_get(card->ports, port_name);
4792     CHECK_VALIDITY(c->pstream, port, tag, PA_ERR_NOENTITY);
4793
4794     pa_device_port_set_latency_offset(port, offset);
4795
4796     pa_pstream_send_simple_ack(c->pstream, tag);
4797 }
4798
4799 /*** pstream callbacks ***/
4800
4801 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4802     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4803
4804     pa_assert(p);
4805     pa_assert(packet);
4806     pa_native_connection_assert_ref(c);
4807
4808     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4809         pa_log("invalid packet.");
4810         native_connection_unlink(c);
4811     }
4812 }
4813
4814 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4815     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4816     output_stream *stream;
4817
4818     pa_assert(p);
4819     pa_assert(chunk);
4820     pa_native_connection_assert_ref(c);
4821
4822     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4823         pa_log_debug("Client sent block for invalid stream.");
4824         /* Ignoring */
4825         return;
4826     }
4827
4828 #ifdef PROTOCOL_NATIVE_DEBUG
4829     pa_log("got %lu bytes from client", (unsigned long) chunk->length);
4830 #endif
4831
4832     if (playback_stream_isinstance(stream)) {
4833         playback_stream *ps = PLAYBACK_STREAM(stream);
4834
4835         pa_atomic_inc(&ps->seek_or_post_in_queue);
4836         if (chunk->memblock) {
4837             if (seek != PA_SEEK_RELATIVE || offset != 0)
4838                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, chunk, NULL);
4839             else
4840                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4841         } else
4842             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4843
4844     } else {
4845         upload_stream *u = UPLOAD_STREAM(stream);
4846         size_t l;
4847
4848         if (!u->memchunk.memblock) {
4849             if (u->length == chunk->length && chunk->memblock) {
4850                 u->memchunk = *chunk;
4851                 pa_memblock_ref(u->memchunk.memblock);
4852                 u->length = 0;
4853             } else {
4854                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4855                 u->memchunk.index = u->memchunk.length = 0;
4856             }
4857         }
4858
4859         pa_assert(u->memchunk.memblock);
4860
4861         l = u->length;
4862         if (l > chunk->length)
4863             l = chunk->length;
4864
4865         if (l > 0) {
4866             void *dst;
4867             dst = pa_memblock_acquire(u->memchunk.memblock);
4868
4869             if (chunk->memblock) {
4870                 void *src;
4871                 src = pa_memblock_acquire(chunk->memblock);
4872
4873                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4874                        (uint8_t*) src + chunk->index, l);
4875
4876                 pa_memblock_release(chunk->memblock);
4877             } else
4878                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4879
4880             pa_memblock_release(u->memchunk.memblock);
4881
4882             u->memchunk.length += l;
4883             u->length -= l;
4884         }
4885     }
4886 }
4887
4888 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4889     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4890
4891     pa_assert(p);
4892     pa_native_connection_assert_ref(c);
4893
4894     native_connection_unlink(c);
4895     pa_log_info("Connection died.");
4896 }
4897
4898 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4899     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4900
4901     pa_assert(p);
4902     pa_native_connection_assert_ref(c);
4903
4904     native_connection_send_memblock(c);
4905 }
4906
4907 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4908     pa_thread_mq *q;
4909
4910     if (!(q = pa_thread_mq_get()))
4911         pa_pstream_send_revoke(p, block_id);
4912     else
4913         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4914 }
4915
4916 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4917     pa_thread_mq *q;
4918
4919     if (!(q = pa_thread_mq_get()))
4920         pa_pstream_send_release(p, block_id);
4921     else
4922         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4923 }
4924
4925 /*** client callbacks ***/
4926
4927 static void client_kill_cb(pa_client *c) {
4928     pa_assert(c);
4929
4930     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4931     pa_log_info("Connection killed.");
4932 }
4933
4934 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4935     pa_tagstruct *t;
4936     pa_native_connection *c;
4937
4938     pa_assert(client);
4939     c = PA_NATIVE_CONNECTION(client->userdata);
4940     pa_native_connection_assert_ref(c);
4941
4942     if (c->version < 15)
4943       return;
4944
4945     t = pa_tagstruct_new(NULL, 0);
4946     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4947     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4948     pa_tagstruct_puts(t, event);
4949     pa_tagstruct_put_proplist(t, pl);
4950     pa_pstream_send_tagstruct(c->pstream, t);
4951 }
4952
4953 /*** module entry points ***/
4954
4955 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4956     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4957
4958     pa_assert(m);
4959     pa_native_connection_assert_ref(c);
4960     pa_assert(c->auth_timeout_event == e);
4961
4962     if (!c->authorized) {
4963         native_connection_unlink(c);
4964         pa_log_info("Connection terminated due to authentication timeout.");
4965     }
4966 }
4967
4968 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4969     pa_native_connection *c;
4970     char pname[128];
4971     pa_client *client;
4972     pa_client_new_data data;
4973
4974     pa_assert(p);
4975     pa_assert(io);
4976     pa_assert(o);
4977
4978     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4979         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4980         pa_iochannel_free(io);
4981         return;
4982     }
4983
4984     pa_client_new_data_init(&data);
4985     data.module = o->module;
4986     data.driver = __FILE__;
4987     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4988     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4989     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4990     client = pa_client_new(p->core, &data);
4991     pa_client_new_data_done(&data);
4992
4993     if (!client)
4994         return;
4995
4996     c = pa_msgobject_new(pa_native_connection);
4997     c->parent.parent.free = native_connection_free;
4998     c->parent.process_msg = native_connection_process_msg;
4999     c->protocol = p;
5000     c->options = pa_native_options_ref(o);
5001     c->authorized = false;
5002
5003     if (o->auth_anonymous) {
5004         pa_log_info("Client authenticated anonymously.");
5005         c->authorized = true;
5006     }
5007
5008     if (!c->authorized &&
5009         o->auth_ip_acl &&
5010         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
5011
5012         pa_log_info("Client authenticated by IP ACL.");
5013         c->authorized = true;
5014     }
5015
5016     if (!c->authorized)
5017         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
5018     else
5019         c->auth_timeout_event = NULL;
5020
5021     c->is_local = pa_iochannel_socket_is_local(io);
5022     c->version = 8;
5023
5024     c->client = client;
5025     c->client->kill = client_kill_cb;
5026     c->client->send_event = client_send_event_cb;
5027     c->client->userdata = c;
5028
5029     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
5030     pa_pstream_set_receive_packet_callback(c->pstream, pstream_packet_callback, c);
5031     pa_pstream_set_receive_memblock_callback(c->pstream, pstream_memblock_callback, c);
5032     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
5033     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
5034     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
5035     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
5036
5037     c->pdispatch = pa_pdispatch_new(p->core->mainloop, true, command_table, PA_COMMAND_MAX);
5038
5039     c->record_streams = pa_idxset_new(NULL, NULL);
5040     c->output_streams = pa_idxset_new(NULL, NULL);
5041
5042     c->rrobin_index = PA_IDXSET_INVALID;
5043     c->subscription = NULL;
5044
5045     pa_idxset_put(p->connections, c, NULL);
5046
5047 #ifdef HAVE_CREDS
5048     if (pa_iochannel_creds_supported(io))
5049         pa_iochannel_creds_enable(io);
5050 #endif
5051
5052     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
5053 }
5054
5055 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
5056     pa_native_connection *c;
5057     void *state = NULL;
5058
5059     pa_assert(p);
5060     pa_assert(m);
5061
5062     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
5063         if (c->options->module == m)
5064             native_connection_unlink(c);
5065 }
5066
5067 static pa_native_protocol* native_protocol_new(pa_core *c) {
5068     pa_native_protocol *p;
5069     pa_native_hook_t h;
5070
5071     pa_assert(c);
5072
5073     p = pa_xnew(pa_native_protocol, 1);
5074     PA_REFCNT_INIT(p);
5075     p->core = c;
5076     p->connections = pa_idxset_new(NULL, NULL);
5077
5078     p->servers = NULL;
5079
5080     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
5081
5082     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
5083         pa_hook_init(&p->hooks[h], p);
5084
5085     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
5086
5087     return p;
5088 }
5089
5090 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
5091     pa_native_protocol *p;
5092
5093     if ((p = pa_shared_get(c, "native-protocol")))
5094         return pa_native_protocol_ref(p);
5095
5096     return native_protocol_new(c);
5097 }
5098
5099 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
5100     pa_assert(p);
5101     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5102
5103     PA_REFCNT_INC(p);
5104
5105     return p;
5106 }
5107
5108 void pa_native_protocol_unref(pa_native_protocol *p) {
5109     pa_native_connection *c;
5110     pa_native_hook_t h;
5111
5112     pa_assert(p);
5113     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5114
5115     if (PA_REFCNT_DEC(p) > 0)
5116         return;
5117
5118     while ((c = pa_idxset_first(p->connections, NULL)))
5119         native_connection_unlink(c);
5120
5121     pa_idxset_free(p->connections, NULL);
5122
5123     pa_strlist_free(p->servers);
5124
5125     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
5126         pa_hook_done(&p->hooks[h]);
5127
5128     pa_hashmap_free(p->extensions);
5129
5130     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
5131
5132     pa_xfree(p);
5133 }
5134
5135 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
5136     pa_assert(p);
5137     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5138     pa_assert(name);
5139
5140     p->servers = pa_strlist_prepend(p->servers, name);
5141
5142     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5143 }
5144
5145 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
5146     pa_assert(p);
5147     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5148     pa_assert(name);
5149
5150     p->servers = pa_strlist_remove(p->servers, name);
5151
5152     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5153 }
5154
5155 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
5156     pa_assert(p);
5157     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5158
5159     return p->hooks;
5160 }
5161
5162 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
5163     pa_assert(p);
5164     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5165
5166     return p->servers;
5167 }
5168
5169 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
5170     pa_assert(p);
5171     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5172     pa_assert(m);
5173     pa_assert(cb);
5174     pa_assert(!pa_hashmap_get(p->extensions, m));
5175
5176     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
5177     return 0;
5178 }
5179
5180 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
5181     pa_assert(p);
5182     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5183     pa_assert(m);
5184
5185     pa_assert_se(pa_hashmap_remove(p->extensions, m));
5186 }
5187
5188 pa_native_options* pa_native_options_new(void) {
5189     pa_native_options *o;
5190
5191     o = pa_xnew0(pa_native_options, 1);
5192     PA_REFCNT_INIT(o);
5193
5194     return o;
5195 }
5196
5197 pa_native_options* pa_native_options_ref(pa_native_options *o) {
5198     pa_assert(o);
5199     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5200
5201     PA_REFCNT_INC(o);
5202
5203     return o;
5204 }
5205
5206 void pa_native_options_unref(pa_native_options *o) {
5207     pa_assert(o);
5208     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5209
5210     if (PA_REFCNT_DEC(o) > 0)
5211         return;
5212
5213     pa_xfree(o->auth_group);
5214
5215     if (o->auth_ip_acl)
5216         pa_ip_acl_free(o->auth_ip_acl);
5217
5218     if (o->auth_cookie)
5219         pa_auth_cookie_unref(o->auth_cookie);
5220
5221     pa_xfree(o);
5222 }
5223
5224 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
5225     bool enabled;
5226     const char *acl;
5227
5228     pa_assert(o);
5229     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5230     pa_assert(ma);
5231
5232     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
5233         pa_log("auth-anonymous= expects a boolean argument.");
5234         return -1;
5235     }
5236
5237     enabled = true;
5238     if (pa_modargs_get_value_boolean(ma, "auth-group-enable", &enabled) < 0) {
5239         pa_log("auth-group-enable= expects a boolean argument.");
5240         return -1;
5241     }
5242
5243     pa_xfree(o->auth_group);
5244     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
5245
5246 #ifndef HAVE_CREDS
5247     if (o->auth_group)
5248         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
5249 #endif
5250
5251     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
5252         pa_ip_acl *ipa;
5253
5254         if (!(ipa = pa_ip_acl_new(acl))) {
5255             pa_log("Failed to parse IP ACL '%s'", acl);
5256             return -1;
5257         }
5258
5259         if (o->auth_ip_acl)
5260             pa_ip_acl_free(o->auth_ip_acl);
5261
5262         o->auth_ip_acl = ipa;
5263     }
5264
5265     enabled = true;
5266     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
5267         pa_log("auth-cookie-enabled= expects a boolean argument.");
5268         return -1;
5269     }
5270
5271     if (o->auth_cookie)
5272         pa_auth_cookie_unref(o->auth_cookie);
5273
5274     if (enabled) {
5275         const char *cn;
5276
5277         /* The new name for this is 'auth-cookie', for compat reasons
5278          * we check the old name too */
5279         cn = pa_modargs_get_value(ma, "auth-cookie", NULL);
5280         if (!cn)
5281             cn = pa_modargs_get_value(ma, "cookie", NULL);
5282
5283         if (cn)
5284             o->auth_cookie = pa_auth_cookie_get(c, cn, true, PA_NATIVE_COOKIE_LENGTH);
5285         else {
5286             o->auth_cookie = pa_auth_cookie_get(c, PA_NATIVE_COOKIE_FILE, false, PA_NATIVE_COOKIE_LENGTH);
5287             if (!o->auth_cookie) {
5288                 o->auth_cookie = pa_auth_cookie_get(c, PA_NATIVE_COOKIE_FILE_FALLBACK, false, PA_NATIVE_COOKIE_LENGTH);
5289
5290                 if (!o->auth_cookie)
5291                     o->auth_cookie = pa_auth_cookie_get(c, PA_NATIVE_COOKIE_FILE, true, PA_NATIVE_COOKIE_LENGTH);
5292             }
5293         }
5294
5295         if (!o->auth_cookie)
5296             return -1;
5297
5298     } else
5299           o->auth_cookie = NULL;
5300
5301     return 0;
5302 }
5303
5304 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
5305     pa_native_connection_assert_ref(c);
5306
5307     return c->pstream;
5308 }
5309
5310 pa_client* pa_native_connection_get_client(pa_native_connection *c) {
5311     pa_native_connection_assert_ref(c);
5312
5313     return c->client;
5314 }