introspect: Expose port info per card to clients
[platform/upstream/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38 #include <pulse/internal.h>
39
40 #include <pulsecore/native-common.h>
41 #include <pulsecore/packet.h>
42 #include <pulsecore/client.h>
43 #include <pulsecore/source-output.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pstream.h>
46 #include <pulsecore/tagstruct.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* #define PROTOCOL_NATIVE_DEBUG */
64
65 /* Kick a client if it doesn't authenticate within this time */
66 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
67
68 /* Don't accept more connection than this */
69 #define MAX_CONNECTIONS 64
70
71 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
72 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
73 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
74 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
75
76 struct pa_native_protocol;
77
78 typedef struct record_stream {
79     pa_msgobject parent;
80
81     pa_native_connection *connection;
82     uint32_t index;
83
84     pa_source_output *source_output;
85     pa_memblockq *memblockq;
86
87     pa_bool_t adjust_latency:1;
88     pa_bool_t early_requests:1;
89
90     /* Requested buffer attributes */
91     pa_buffer_attr buffer_attr_req;
92     /* Fixed-up and adjusted buffer attributes */
93     pa_buffer_attr buffer_attr;
94
95     pa_atomic_t on_the_fly;
96     pa_usec_t configured_source_latency;
97     size_t drop_initial;
98
99     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
100     size_t on_the_fly_snapshot;
101     pa_usec_t current_monitor_latency;
102     pa_usec_t current_source_latency;
103 } record_stream;
104
105 #define RECORD_STREAM(o) (record_stream_cast(o))
106 PA_DEFINE_PRIVATE_CLASS(record_stream, pa_msgobject);
107
108 typedef struct output_stream {
109     pa_msgobject parent;
110 } output_stream;
111
112 #define OUTPUT_STREAM(o) (output_stream_cast(o))
113 PA_DEFINE_PRIVATE_CLASS(output_stream, pa_msgobject);
114
115 typedef struct playback_stream {
116     output_stream parent;
117
118     pa_native_connection *connection;
119     uint32_t index;
120
121     pa_sink_input *sink_input;
122     pa_memblockq *memblockq;
123
124     pa_bool_t adjust_latency:1;
125     pa_bool_t early_requests:1;
126
127     pa_bool_t is_underrun:1;
128     pa_bool_t drain_request:1;
129     uint32_t drain_tag;
130     uint32_t syncid;
131
132     /* Optimization to avoid too many rewinds with a lot of small blocks */
133     pa_atomic_t seek_or_post_in_queue;
134     int64_t seek_windex;
135
136     pa_atomic_t missing;
137     pa_usec_t configured_sink_latency;
138     /* Requested buffer attributes */
139     pa_buffer_attr buffer_attr_req;
140     /* Fixed-up and adjusted buffer attributes */
141     pa_buffer_attr buffer_attr;
142
143     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
144     int64_t read_index, write_index;
145     size_t render_memblockq_length;
146     pa_usec_t current_sink_latency;
147     uint64_t playing_for, underrun_for;
148 } playback_stream;
149
150 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
151 PA_DEFINE_PRIVATE_CLASS(playback_stream, output_stream);
152
153 typedef struct upload_stream {
154     output_stream parent;
155
156     pa_native_connection *connection;
157     uint32_t index;
158
159     pa_memchunk memchunk;
160     size_t length;
161     char *name;
162     pa_sample_spec sample_spec;
163     pa_channel_map channel_map;
164     pa_proplist *proplist;
165 } upload_stream;
166
167 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
168 PA_DEFINE_PRIVATE_CLASS(upload_stream, output_stream);
169
170 struct pa_native_connection {
171     pa_msgobject parent;
172     pa_native_protocol *protocol;
173     pa_native_options *options;
174     pa_bool_t authorized:1;
175     pa_bool_t is_local:1;
176     uint32_t version;
177     pa_client *client;
178     pa_pstream *pstream;
179     pa_pdispatch *pdispatch;
180     pa_idxset *record_streams, *output_streams;
181     uint32_t rrobin_index;
182     pa_subscription *subscription;
183     pa_time_event *auth_timeout_event;
184 };
185
186 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
187 PA_DEFINE_PRIVATE_CLASS(pa_native_connection, pa_msgobject);
188
189 struct pa_native_protocol {
190     PA_REFCNT_DECLARE;
191
192     pa_core *core;
193     pa_idxset *connections;
194
195     pa_strlist *servers;
196     pa_hook hooks[PA_NATIVE_HOOK_MAX];
197
198     pa_hashmap *extensions;
199 };
200
201 enum {
202     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
203 };
204
205 enum {
206     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
207     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
208     SINK_INPUT_MESSAGE_FLUSH,
209     SINK_INPUT_MESSAGE_TRIGGER,
210     SINK_INPUT_MESSAGE_SEEK,
211     SINK_INPUT_MESSAGE_PREBUF_FORCE,
212     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
213     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
214 };
215
216 enum {
217     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
218     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
219     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
220     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
221     PLAYBACK_STREAM_MESSAGE_STARTED,
222     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
223 };
224
225 enum {
226     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
227 };
228
229 enum {
230     CONNECTION_MESSAGE_RELEASE,
231     CONNECTION_MESSAGE_REVOKE
232 };
233
234 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
235 static void sink_input_kill_cb(pa_sink_input *i);
236 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
237 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
238 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
239 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
240 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
241 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
242
243 static void native_connection_send_memblock(pa_native_connection *c);
244 static void playback_stream_request_bytes(struct playback_stream*s);
245
246 static void source_output_kill_cb(pa_source_output *o);
247 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
248 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
249 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
250 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
251 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
252
253 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
254 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
255
256 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
290 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
291 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
292 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
293 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
294 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
295
296 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
297     [PA_COMMAND_ERROR] = NULL,
298     [PA_COMMAND_TIMEOUT] = NULL,
299     [PA_COMMAND_REPLY] = NULL,
300     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
301     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
302     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
303     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
304     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
305     [PA_COMMAND_AUTH] = command_auth,
306     [PA_COMMAND_REQUEST] = NULL,
307     [PA_COMMAND_EXIT] = command_exit,
308     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
309     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
310     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
311     [PA_COMMAND_STAT] = command_stat,
312     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
313     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
314     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
315     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
316     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
317     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
318     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
319     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
320     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
321     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
322     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
323     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
324     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
325     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
326     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
327     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
329     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
330     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
331     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
332     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
333     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
334     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
335     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
336     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
337
338     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
339     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
340     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
341     [PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME] = command_set_volume,
342
343     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
344     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
345     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
346     [PA_COMMAND_SET_SOURCE_OUTPUT_MUTE] = command_set_mute,
347
348     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
349     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
350
351     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
352     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
353     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
354     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
355
356     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
357     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
358
359     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
360     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
361     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
362     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
363     [PA_COMMAND_KILL_CLIENT] = command_kill,
364     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
365     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
366     [PA_COMMAND_LOAD_MODULE] = command_load_module,
367     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
368
369     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
370     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
371     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
372     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
373
374     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
375     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
376
377     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
378     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
379
380     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
381     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
382
383     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
384     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
385     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
386
387     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
388     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
389     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
390
391     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
392
393     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
394     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
395
396     [PA_COMMAND_EXTENSION] = command_extension
397 };
398
399 /* structure management */
400
401 /* Called from main context */
402 static void upload_stream_unlink(upload_stream *s) {
403     pa_assert(s);
404
405     if (!s->connection)
406         return;
407
408     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
409     s->connection = NULL;
410     upload_stream_unref(s);
411 }
412
413 /* Called from main context */
414 static void upload_stream_free(pa_object *o) {
415     upload_stream *s = UPLOAD_STREAM(o);
416     pa_assert(s);
417
418     upload_stream_unlink(s);
419
420     pa_xfree(s->name);
421
422     if (s->proplist)
423         pa_proplist_free(s->proplist);
424
425     if (s->memchunk.memblock)
426         pa_memblock_unref(s->memchunk.memblock);
427
428     pa_xfree(s);
429 }
430
431 /* Called from main context */
432 static upload_stream* upload_stream_new(
433         pa_native_connection *c,
434         const pa_sample_spec *ss,
435         const pa_channel_map *map,
436         const char *name,
437         size_t length,
438         pa_proplist *p) {
439
440     upload_stream *s;
441
442     pa_assert(c);
443     pa_assert(ss);
444     pa_assert(name);
445     pa_assert(length > 0);
446     pa_assert(p);
447
448     s = pa_msgobject_new(upload_stream);
449     s->parent.parent.parent.free = upload_stream_free;
450     s->connection = c;
451     s->sample_spec = *ss;
452     s->channel_map = *map;
453     s->name = pa_xstrdup(name);
454     pa_memchunk_reset(&s->memchunk);
455     s->length = length;
456     s->proplist = pa_proplist_copy(p);
457     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
458
459     pa_idxset_put(c->output_streams, s, &s->index);
460
461     return s;
462 }
463
464 /* Called from main context */
465 static void record_stream_unlink(record_stream *s) {
466     pa_assert(s);
467
468     if (!s->connection)
469         return;
470
471     if (s->source_output) {
472         pa_source_output_unlink(s->source_output);
473         pa_source_output_unref(s->source_output);
474         s->source_output = NULL;
475     }
476
477     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
478     s->connection = NULL;
479     record_stream_unref(s);
480 }
481
482 /* Called from main context */
483 static void record_stream_free(pa_object *o) {
484     record_stream *s = RECORD_STREAM(o);
485     pa_assert(s);
486
487     record_stream_unlink(s);
488
489     pa_memblockq_free(s->memblockq);
490     pa_xfree(s);
491 }
492
493 /* Called from main context */
494 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
495     record_stream *s = RECORD_STREAM(o);
496     record_stream_assert_ref(s);
497
498     if (!s->connection)
499         return -1;
500
501     switch (code) {
502
503         case RECORD_STREAM_MESSAGE_POST_DATA:
504
505             /* We try to keep up to date with how many bytes are
506              * currently on the fly */
507             pa_atomic_sub(&s->on_the_fly, chunk->length);
508
509             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
510 /*                 pa_log_warn("Failed to push data into output queue."); */
511                 return -1;
512             }
513
514             if (!pa_pstream_is_pending(s->connection->pstream))
515                 native_connection_send_memblock(s->connection);
516
517             break;
518     }
519
520     return 0;
521 }
522
523 /* Called from main context */
524 static void fix_record_buffer_attr_pre(record_stream *s) {
525
526     size_t frame_size;
527     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
528
529     pa_assert(s);
530
531     /* This function will be called from the main thread, before as
532      * well as after the source output has been activated using
533      * pa_source_output_put()! That means it may not touch any
534      * ->thread_info data! */
535
536     frame_size = pa_frame_size(&s->source_output->sample_spec);
537     s->buffer_attr = s->buffer_attr_req;
538
539     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
540         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
541     if (s->buffer_attr.maxlength <= 0)
542         s->buffer_attr.maxlength = (uint32_t) frame_size;
543
544     if (s->buffer_attr.fragsize == (uint32_t) -1)
545         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
546     if (s->buffer_attr.fragsize <= 0)
547         s->buffer_attr.fragsize = (uint32_t) frame_size;
548
549     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
550
551     if (s->early_requests) {
552
553         /* In early request mode we need to emulate the classic
554          * fragment-based playback model. We do this setting the source
555          * latency to the fragment size. */
556
557         source_usec = fragsize_usec;
558
559     } else if (s->adjust_latency) {
560
561         /* So, the user asked us to adjust the latency according to
562          * what the source can provide. Half the latency will be
563          * spent on the hw buffer, half of it in the async buffer
564          * queue we maintain for each client. */
565
566         source_usec = fragsize_usec/2;
567
568     } else {
569
570         /* Ok, the user didn't ask us to adjust the latency, hence we
571          * don't */
572
573         source_usec = (pa_usec_t) -1;
574     }
575
576     if (source_usec != (pa_usec_t) -1)
577         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
578     else
579         s->configured_source_latency = 0;
580
581     if (s->early_requests) {
582
583         /* Ok, we didn't necessarily get what we were asking for, so
584          * let's tell the user */
585
586         fragsize_usec = s->configured_source_latency;
587
588     } else if (s->adjust_latency) {
589
590         /* Now subtract what we actually got */
591
592         if (fragsize_usec >= s->configured_source_latency*2)
593             fragsize_usec -= s->configured_source_latency;
594         else
595             fragsize_usec = s->configured_source_latency;
596     }
597
598     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
599         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
600
601         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
602
603     if (s->buffer_attr.fragsize <= 0)
604         s->buffer_attr.fragsize = (uint32_t) frame_size;
605 }
606
607 /* Called from main context */
608 static void fix_record_buffer_attr_post(record_stream *s) {
609     size_t base;
610
611     pa_assert(s);
612
613     /* This function will be called from the main thread, before as
614      * well as after the source output has been activated using
615      * pa_source_output_put()! That means it may not touch and
616      * ->thread_info data! */
617
618     base = pa_frame_size(&s->source_output->sample_spec);
619
620     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
621     if (s->buffer_attr.fragsize <= 0)
622         s->buffer_attr.fragsize = base;
623
624     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
625         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
626 }
627
628 /* Called from main context */
629 static record_stream* record_stream_new(
630         pa_native_connection *c,
631         pa_source *source,
632         pa_sample_spec *ss,
633         pa_channel_map *map,
634         pa_idxset *formats,
635         pa_buffer_attr *attr,
636         pa_cvolume *volume,
637         pa_bool_t muted,
638         pa_bool_t muted_set,
639         pa_source_output_flags_t flags,
640         pa_proplist *p,
641         pa_bool_t adjust_latency,
642         pa_bool_t early_requests,
643         pa_bool_t relative_volume,
644         pa_bool_t peak_detect,
645         pa_sink_input *direct_on_input,
646         int *ret) {
647
648     record_stream *s;
649     pa_source_output *source_output = NULL;
650     pa_source_output_new_data data;
651     char *memblockq_name;
652
653     pa_assert(c);
654     pa_assert(ss);
655     pa_assert(p);
656     pa_assert(ret);
657
658     pa_source_output_new_data_init(&data);
659
660     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
661     data.driver = __FILE__;
662     data.module = c->options->module;
663     data.client = c->client;
664     if (source)
665         pa_source_output_new_data_set_source(&data, source, TRUE);
666     if (pa_sample_spec_valid(ss))
667         pa_source_output_new_data_set_sample_spec(&data, ss);
668     if (pa_channel_map_valid(map))
669         pa_source_output_new_data_set_channel_map(&data, map);
670     if (formats)
671         pa_source_output_new_data_set_formats(&data, formats);
672     data.direct_on_input = direct_on_input;
673     if (volume) {
674         pa_source_output_new_data_set_volume(&data, volume);
675         data.volume_is_absolute = !relative_volume;
676         data.save_volume = TRUE;
677     }
678     if (muted_set) {
679         pa_source_output_new_data_set_muted(&data, muted);
680         data.save_muted = TRUE;
681     }
682     if (peak_detect)
683         data.resample_method = PA_RESAMPLER_PEAKS;
684     data.flags = flags;
685
686     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data);
687
688     pa_source_output_new_data_done(&data);
689
690     if (!source_output)
691         return NULL;
692
693     s = pa_msgobject_new(record_stream);
694     s->parent.parent.free = record_stream_free;
695     s->parent.process_msg = record_stream_process_msg;
696     s->connection = c;
697     s->source_output = source_output;
698     s->buffer_attr_req = *attr;
699     s->adjust_latency = adjust_latency;
700     s->early_requests = early_requests;
701     pa_atomic_store(&s->on_the_fly, 0);
702
703     s->source_output->parent.process_msg = source_output_process_msg;
704     s->source_output->push = source_output_push_cb;
705     s->source_output->kill = source_output_kill_cb;
706     s->source_output->get_latency = source_output_get_latency_cb;
707     s->source_output->moving = source_output_moving_cb;
708     s->source_output->suspend = source_output_suspend_cb;
709     s->source_output->send_event = source_output_send_event_cb;
710     s->source_output->userdata = s;
711
712     fix_record_buffer_attr_pre(s);
713
714     memblockq_name = pa_sprintf_malloc("native protocol record stream memblockq [%u]", s->source_output->index);
715     s->memblockq = pa_memblockq_new(
716             memblockq_name,
717             0,
718             s->buffer_attr.maxlength,
719             0,
720             &source_output->sample_spec,
721             1,
722             0,
723             0,
724             NULL);
725     pa_xfree(memblockq_name);
726
727     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
728     fix_record_buffer_attr_post(s);
729
730     *ss = s->source_output->sample_spec;
731     *map = s->source_output->channel_map;
732
733     pa_idxset_put(c->record_streams, s, &s->index);
734
735     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
736                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
737                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
738                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
739
740     pa_source_output_put(s->source_output);
741     return s;
742 }
743
744 /* Called from main context */
745 static void record_stream_send_killed(record_stream *r) {
746     pa_tagstruct *t;
747     record_stream_assert_ref(r);
748
749     t = pa_tagstruct_new(NULL, 0);
750     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
751     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
752     pa_tagstruct_putu32(t, r->index);
753     pa_pstream_send_tagstruct(r->connection->pstream, t);
754 }
755
756 /* Called from main context */
757 static void playback_stream_unlink(playback_stream *s) {
758     pa_assert(s);
759
760     if (!s->connection)
761         return;
762
763     if (s->sink_input) {
764         pa_sink_input_unlink(s->sink_input);
765         pa_sink_input_unref(s->sink_input);
766         s->sink_input = NULL;
767     }
768
769     if (s->drain_request)
770         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
771
772     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
773     s->connection = NULL;
774     playback_stream_unref(s);
775 }
776
777 /* Called from main context */
778 static void playback_stream_free(pa_object* o) {
779     playback_stream *s = PLAYBACK_STREAM(o);
780     pa_assert(s);
781
782     playback_stream_unlink(s);
783
784     pa_memblockq_free(s->memblockq);
785     pa_xfree(s);
786 }
787
788 /* Called from main context */
789 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
790     playback_stream *s = PLAYBACK_STREAM(o);
791     playback_stream_assert_ref(s);
792
793     if (!s->connection)
794         return -1;
795
796     switch (code) {
797
798         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
799             pa_tagstruct *t;
800             int l = 0;
801
802             for (;;) {
803                 if ((l = pa_atomic_load(&s->missing)) <= 0)
804                     return 0;
805
806                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
807                     break;
808             }
809
810             t = pa_tagstruct_new(NULL, 0);
811             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
812             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
813             pa_tagstruct_putu32(t, s->index);
814             pa_tagstruct_putu32(t, (uint32_t) l);
815             pa_pstream_send_tagstruct(s->connection->pstream, t);
816
817 #ifdef PROTOCOL_NATIVE_DEBUG
818             pa_log("Requesting %lu bytes", (unsigned long) l);
819 #endif
820             break;
821         }
822
823         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
824             pa_tagstruct *t;
825
826 #ifdef PROTOCOL_NATIVE_DEBUG
827             pa_log("signalling underflow");
828 #endif
829
830             /* Report that we're empty */
831             t = pa_tagstruct_new(NULL, 0);
832             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
833             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
834             pa_tagstruct_putu32(t, s->index);
835             if (s->connection->version >= 23)
836                 pa_tagstruct_puts64(t, offset);
837             pa_pstream_send_tagstruct(s->connection->pstream, t);
838             break;
839         }
840
841         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
842             pa_tagstruct *t;
843
844             /* Notify the user we're overflowed*/
845             t = pa_tagstruct_new(NULL, 0);
846             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
847             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
848             pa_tagstruct_putu32(t, s->index);
849             pa_pstream_send_tagstruct(s->connection->pstream, t);
850             break;
851         }
852
853         case PLAYBACK_STREAM_MESSAGE_STARTED:
854
855             if (s->connection->version >= 13) {
856                 pa_tagstruct *t;
857
858                 /* Notify the user we started playback */
859                 t = pa_tagstruct_new(NULL, 0);
860                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
861                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
862                 pa_tagstruct_putu32(t, s->index);
863                 pa_pstream_send_tagstruct(s->connection->pstream, t);
864             }
865
866             break;
867
868         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
869             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
870             break;
871
872         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH:
873
874             s->buffer_attr.tlength = (uint32_t) offset;
875
876             if (s->connection->version >= 15) {
877                 pa_tagstruct *t;
878
879                 t = pa_tagstruct_new(NULL, 0);
880                 pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
881                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
882                 pa_tagstruct_putu32(t, s->index);
883                 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
884                 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
885                 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
886                 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
887                 pa_tagstruct_put_usec(t, s->configured_sink_latency);
888                 pa_pstream_send_tagstruct(s->connection->pstream, t);
889             }
890
891             break;
892     }
893
894     return 0;
895 }
896
897 /* Called from main context */
898 static void fix_playback_buffer_attr(playback_stream *s) {
899     size_t frame_size, max_prebuf;
900     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
901
902     pa_assert(s);
903
904 #ifdef PROTOCOL_NATIVE_DEBUG
905     pa_log("Client requested: maxlength=%li bytes tlength=%li bytes minreq=%li bytes prebuf=%li bytes",
906            (long) s->buffer_attr.maxlength,
907            (long) s->buffer_attr.tlength,
908            (long) s->buffer_attr.minreq,
909            (long) s->buffer_attr.prebuf);
910
911     pa_log("Client requested: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms",
912            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
913            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
914            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
915            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC));
916 #endif
917
918     /* This function will be called from the main thread, before as
919      * well as after the sink input has been activated using
920      * pa_sink_input_put()! That means it may not touch any
921      * ->thread_info data, such as the memblockq! */
922
923     frame_size = pa_frame_size(&s->sink_input->sample_spec);
924     s->buffer_attr = s->buffer_attr_req;
925
926     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
927         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
928     if (s->buffer_attr.maxlength <= 0)
929         s->buffer_attr.maxlength = (uint32_t) frame_size;
930
931     if (s->buffer_attr.tlength == (uint32_t) -1)
932         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
933     if (s->buffer_attr.tlength <= 0)
934         s->buffer_attr.tlength = (uint32_t) frame_size;
935
936     if (s->buffer_attr.minreq == (uint32_t) -1)
937         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
938     if (s->buffer_attr.minreq <= 0)
939         s->buffer_attr.minreq = (uint32_t) frame_size;
940
941     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
942         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
943
944     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
945     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
946
947     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
948                 (double) tlength_usec / PA_USEC_PER_MSEC,
949                 (double) minreq_usec / PA_USEC_PER_MSEC);
950
951     if (s->early_requests) {
952
953         /* In early request mode we need to emulate the classic
954          * fragment-based playback model. We do this setting the sink
955          * latency to the fragment size. */
956
957         sink_usec = minreq_usec;
958         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
959
960     } else if (s->adjust_latency) {
961
962         /* So, the user asked us to adjust the latency of the stream
963          * buffer according to the what the sink can provide. The
964          * tlength passed in shall be the overall latency. Roughly
965          * half the latency will be spent on the hw buffer, the other
966          * half of it in the async buffer queue we maintain for each
967          * client. In between we'll have a safety space of size
968          * 2*minreq. Why the 2*minreq? When the hw buffer is completely
969          * empty and needs to be filled, then our buffer must have
970          * enough data to fulfill this request immediately and thus
971          * have at least the same tlength as the size of the hw
972          * buffer. It additionally needs space for 2 times minreq
973          * because if the buffer ran empty and a partial fillup
974          * happens immediately on the next iteration we need to be
975          * able to fulfill it and give the application also minreq
976          * time to fill it up again for the next request Makes 2 times
977          * minreq in plus.. */
978
979         if (tlength_usec > minreq_usec*2)
980             sink_usec = (tlength_usec - minreq_usec*2)/2;
981         else
982             sink_usec = 0;
983
984         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
985
986     } else {
987
988         /* Ok, the user didn't ask us to adjust the latency, but we
989          * still need to make sure that the parameters from the user
990          * do make sense. */
991
992         if (tlength_usec > minreq_usec*2)
993             sink_usec = (tlength_usec - minreq_usec*2);
994         else
995             sink_usec = 0;
996
997         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
998     }
999
1000     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
1001
1002     if (s->early_requests) {
1003
1004         /* Ok, we didn't necessarily get what we were asking for, so
1005          * let's tell the user */
1006
1007         minreq_usec = s->configured_sink_latency;
1008
1009     } else if (s->adjust_latency) {
1010
1011         /* Ok, we didn't necessarily get what we were asking for, so
1012          * let's subtract from what we asked for for the remaining
1013          * buffer space */
1014
1015         if (tlength_usec >= s->configured_sink_latency)
1016             tlength_usec -= s->configured_sink_latency;
1017     }
1018
1019     pa_log_debug("Requested latency=%0.2f ms, Received latency=%0.2f ms",
1020                  (double) sink_usec / PA_USEC_PER_MSEC,
1021                  (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1022
1023     /* FIXME: This is actually larger than necessary, since not all of
1024      * the sink latency is actually rewritable. */
1025     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
1026         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
1027
1028     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
1029         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
1030         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
1031
1032     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
1033         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
1034         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
1035
1036     if (s->buffer_attr.minreq <= 0) {
1037         s->buffer_attr.minreq = (uint32_t) frame_size;
1038         s->buffer_attr.tlength += (uint32_t) frame_size*2;
1039     }
1040
1041     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
1042         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
1043
1044     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
1045
1046     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
1047         s->buffer_attr.prebuf > max_prebuf)
1048         s->buffer_attr.prebuf = max_prebuf;
1049
1050 #ifdef PROTOCOL_NATIVE_DEBUG
1051     pa_log("Client accepted: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms",
1052            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1053            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1054            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1055            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC));
1056 #endif
1057 }
1058
1059 /* Called from main context */
1060 static playback_stream* playback_stream_new(
1061         pa_native_connection *c,
1062         pa_sink *sink,
1063         pa_sample_spec *ss,
1064         pa_channel_map *map,
1065         pa_idxset *formats,
1066         pa_buffer_attr *a,
1067         pa_cvolume *volume,
1068         pa_bool_t muted,
1069         pa_bool_t muted_set,
1070         pa_sink_input_flags_t flags,
1071         pa_proplist *p,
1072         pa_bool_t adjust_latency,
1073         pa_bool_t early_requests,
1074         pa_bool_t relative_volume,
1075         uint32_t syncid,
1076         uint32_t *missing,
1077         int *ret) {
1078
1079     /* Note: This function takes ownership of the 'formats' param, so we need
1080      * to take extra care to not leak it */
1081
1082     playback_stream *ssync;
1083     playback_stream *s = NULL;
1084     pa_sink_input *sink_input = NULL;
1085     pa_memchunk silence;
1086     uint32_t idx;
1087     int64_t start_index;
1088     pa_sink_input_new_data data;
1089     char *memblockq_name;
1090
1091     pa_assert(c);
1092     pa_assert(ss);
1093     pa_assert(missing);
1094     pa_assert(p);
1095     pa_assert(ret);
1096
1097     /* Find syncid group */
1098     PA_IDXSET_FOREACH(ssync, c->output_streams, idx) {
1099
1100         if (!playback_stream_isinstance(ssync))
1101             continue;
1102
1103         if (ssync->syncid == syncid)
1104             break;
1105     }
1106
1107     /* Synced streams must connect to the same sink */
1108     if (ssync) {
1109
1110         if (!sink)
1111             sink = ssync->sink_input->sink;
1112         else if (sink != ssync->sink_input->sink) {
1113             *ret = PA_ERR_INVALID;
1114             goto out;
1115         }
1116     }
1117
1118     pa_sink_input_new_data_init(&data);
1119
1120     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1121     data.driver = __FILE__;
1122     data.module = c->options->module;
1123     data.client = c->client;
1124     if (sink)
1125         pa_sink_input_new_data_set_sink(&data, sink, TRUE);
1126     if (pa_sample_spec_valid(ss))
1127         pa_sink_input_new_data_set_sample_spec(&data, ss);
1128     if (pa_channel_map_valid(map))
1129         pa_sink_input_new_data_set_channel_map(&data, map);
1130     if (formats) {
1131         pa_sink_input_new_data_set_formats(&data, formats);
1132         /* Ownership transferred to new_data, so we don't free it ourselves */
1133         formats = NULL;
1134     }
1135     if (volume) {
1136         pa_sink_input_new_data_set_volume(&data, volume);
1137         data.volume_is_absolute = !relative_volume;
1138         data.save_volume = TRUE;
1139     }
1140     if (muted_set) {
1141         pa_sink_input_new_data_set_muted(&data, muted);
1142         data.save_muted = TRUE;
1143     }
1144     data.sync_base = ssync ? ssync->sink_input : NULL;
1145     data.flags = flags;
1146
1147     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data);
1148
1149     pa_sink_input_new_data_done(&data);
1150
1151     if (!sink_input)
1152         goto out;
1153
1154     s = pa_msgobject_new(playback_stream);
1155     s->parent.parent.parent.free = playback_stream_free;
1156     s->parent.parent.process_msg = playback_stream_process_msg;
1157     s->connection = c;
1158     s->syncid = syncid;
1159     s->sink_input = sink_input;
1160     s->is_underrun = TRUE;
1161     s->drain_request = FALSE;
1162     pa_atomic_store(&s->missing, 0);
1163     s->buffer_attr_req = *a;
1164     s->adjust_latency = adjust_latency;
1165     s->early_requests = early_requests;
1166     pa_atomic_store(&s->seek_or_post_in_queue, 0);
1167     s->seek_windex = -1;
1168
1169     s->sink_input->parent.process_msg = sink_input_process_msg;
1170     s->sink_input->pop = sink_input_pop_cb;
1171     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1172     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1173     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1174     s->sink_input->kill = sink_input_kill_cb;
1175     s->sink_input->moving = sink_input_moving_cb;
1176     s->sink_input->suspend = sink_input_suspend_cb;
1177     s->sink_input->send_event = sink_input_send_event_cb;
1178     s->sink_input->userdata = s;
1179
1180     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1181
1182     fix_playback_buffer_attr(s);
1183
1184     pa_sink_input_get_silence(sink_input, &silence);
1185     memblockq_name = pa_sprintf_malloc("native protocol playback stream memblockq [%u]", s->sink_input->index);
1186     s->memblockq = pa_memblockq_new(
1187             memblockq_name,
1188             start_index,
1189             s->buffer_attr.maxlength,
1190             s->buffer_attr.tlength,
1191             &sink_input->sample_spec,
1192             s->buffer_attr.prebuf,
1193             s->buffer_attr.minreq,
1194             0,
1195             &silence);
1196     pa_xfree(memblockq_name);
1197     pa_memblock_unref(silence.memblock);
1198
1199     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1200
1201     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1202
1203 #ifdef PROTOCOL_NATIVE_DEBUG
1204     pa_log("missing original: %li", (long int) *missing);
1205 #endif
1206
1207     *ss = s->sink_input->sample_spec;
1208     *map = s->sink_input->channel_map;
1209
1210     pa_idxset_put(c->output_streams, s, &s->index);
1211
1212     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1213                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1214                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1215                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1216                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1217
1218     pa_sink_input_put(s->sink_input);
1219
1220 out:
1221     if (formats)
1222         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
1223
1224     return s;
1225 }
1226
1227 /* Called from IO context */
1228 static void playback_stream_request_bytes(playback_stream *s) {
1229     size_t m, minreq;
1230     int previous_missing;
1231
1232     playback_stream_assert_ref(s);
1233
1234     m = pa_memblockq_pop_missing(s->memblockq);
1235
1236     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu really missing=%lli)", */
1237     /*        (unsigned long) m, */
1238     /*        pa_memblockq_get_tlength(s->memblockq), */
1239     /*        pa_memblockq_get_minreq(s->memblockq), */
1240     /*        pa_memblockq_get_length(s->memblockq), */
1241     /*        (long long) pa_memblockq_get_tlength(s->memblockq) - (long long) pa_memblockq_get_length(s->memblockq)); */
1242
1243     if (m <= 0)
1244         return;
1245
1246 #ifdef PROTOCOL_NATIVE_DEBUG
1247     pa_log("request_bytes(%lu)", (unsigned long) m);
1248 #endif
1249
1250     previous_missing = pa_atomic_add(&s->missing, (int) m);
1251     minreq = pa_memblockq_get_minreq(s->memblockq);
1252
1253     if (pa_memblockq_prebuf_active(s->memblockq) ||
1254         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1255         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1256 }
1257
1258 /* Called from main context */
1259 static void playback_stream_send_killed(playback_stream *p) {
1260     pa_tagstruct *t;
1261     playback_stream_assert_ref(p);
1262
1263     t = pa_tagstruct_new(NULL, 0);
1264     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1265     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1266     pa_tagstruct_putu32(t, p->index);
1267     pa_pstream_send_tagstruct(p->connection->pstream, t);
1268 }
1269
1270 /* Called from main context */
1271 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1272     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1273     pa_native_connection_assert_ref(c);
1274
1275     if (!c->protocol)
1276         return -1;
1277
1278     switch (code) {
1279
1280         case CONNECTION_MESSAGE_REVOKE:
1281             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1282             break;
1283
1284         case CONNECTION_MESSAGE_RELEASE:
1285             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1286             break;
1287     }
1288
1289     return 0;
1290 }
1291
1292 /* Called from main context */
1293 static void native_connection_unlink(pa_native_connection *c) {
1294     record_stream *r;
1295     output_stream *o;
1296
1297     pa_assert(c);
1298
1299     if (!c->protocol)
1300         return;
1301
1302     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1303
1304     if (c->options)
1305         pa_native_options_unref(c->options);
1306
1307     while ((r = pa_idxset_first(c->record_streams, NULL)))
1308         record_stream_unlink(r);
1309
1310     while ((o = pa_idxset_first(c->output_streams, NULL)))
1311         if (playback_stream_isinstance(o))
1312             playback_stream_unlink(PLAYBACK_STREAM(o));
1313         else
1314             upload_stream_unlink(UPLOAD_STREAM(o));
1315
1316     if (c->subscription)
1317         pa_subscription_free(c->subscription);
1318
1319     if (c->pstream)
1320         pa_pstream_unlink(c->pstream);
1321
1322     if (c->auth_timeout_event) {
1323         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1324         c->auth_timeout_event = NULL;
1325     }
1326
1327     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1328     c->protocol = NULL;
1329     pa_native_connection_unref(c);
1330 }
1331
1332 /* Called from main context */
1333 static void native_connection_free(pa_object *o) {
1334     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1335
1336     pa_assert(c);
1337
1338     native_connection_unlink(c);
1339
1340     pa_idxset_free(c->record_streams, NULL, NULL);
1341     pa_idxset_free(c->output_streams, NULL, NULL);
1342
1343     pa_pdispatch_unref(c->pdispatch);
1344     pa_pstream_unref(c->pstream);
1345     pa_client_free(c->client);
1346
1347     pa_xfree(c);
1348 }
1349
1350 /* Called from main context */
1351 static void native_connection_send_memblock(pa_native_connection *c) {
1352     uint32_t start;
1353     record_stream *r;
1354
1355     start = PA_IDXSET_INVALID;
1356     for (;;) {
1357         pa_memchunk chunk;
1358
1359         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1360             return;
1361
1362         if (start == PA_IDXSET_INVALID)
1363             start = c->rrobin_index;
1364         else if (start == c->rrobin_index)
1365             return;
1366
1367         if (pa_memblockq_peek(r->memblockq, &chunk) >= 0) {
1368             pa_memchunk schunk = chunk;
1369
1370             if (schunk.length > r->buffer_attr.fragsize)
1371                 schunk.length = r->buffer_attr.fragsize;
1372
1373             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1374
1375             pa_memblockq_drop(r->memblockq, schunk.length);
1376             pa_memblock_unref(schunk.memblock);
1377
1378             return;
1379         }
1380     }
1381 }
1382
1383 /*** sink input callbacks ***/
1384
1385 /* Called from thread context */
1386 static void handle_seek(playback_stream *s, int64_t indexw) {
1387     playback_stream_assert_ref(s);
1388
1389 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1390
1391     if (s->sink_input->thread_info.underrun_for > 0) {
1392
1393 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1394
1395         if (pa_memblockq_is_readable(s->memblockq)) {
1396
1397             /* We just ended an underrun, let's ask the sink
1398              * for a complete rewind rewrite */
1399
1400             pa_log_debug("Requesting rewind due to end of underrun.");
1401             pa_sink_input_request_rewind(s->sink_input,
1402                                          (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 :
1403                                                    s->sink_input->thread_info.underrun_for),
1404                                          FALSE, TRUE, FALSE);
1405         }
1406
1407     } else {
1408         int64_t indexr;
1409
1410         indexr = pa_memblockq_get_read_index(s->memblockq);
1411
1412         if (indexw < indexr) {
1413             /* OK, the sink already asked for this data, so
1414              * let's have it ask us again */
1415
1416             pa_log_debug("Requesting rewind due to rewrite.");
1417             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1418         }
1419     }
1420
1421     playback_stream_request_bytes(s);
1422 }
1423
1424 static void flush_write_no_account(pa_memblockq *q) {
1425     pa_memblockq_flush_write(q, FALSE);
1426 }
1427
1428 /* Called from thread context */
1429 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1430     pa_sink_input *i = PA_SINK_INPUT(o);
1431     playback_stream *s;
1432
1433     pa_sink_input_assert_ref(i);
1434     s = PLAYBACK_STREAM(i->userdata);
1435     playback_stream_assert_ref(s);
1436
1437     switch (code) {
1438
1439         case SINK_INPUT_MESSAGE_SEEK:
1440         case SINK_INPUT_MESSAGE_POST_DATA: {
1441             int64_t windex = pa_memblockq_get_write_index(s->memblockq);
1442
1443             if (code == SINK_INPUT_MESSAGE_SEEK) {
1444                 /* The client side is incapable of accounting correctly
1445                  * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1446                  * able to deal with that. */
1447
1448                 pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1449                 windex = PA_MIN(windex, pa_memblockq_get_write_index(s->memblockq));
1450             }
1451
1452             if (chunk && pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1453                 if (pa_log_ratelimit(PA_LOG_WARN))
1454                     pa_log_warn("Failed to push data into queue");
1455                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1456                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1457             }
1458
1459             /* If more data is in queue, we rewind later instead. */
1460             if (s->seek_windex != -1)
1461                 windex = PA_MIN(windex, s->seek_windex);
1462             if (pa_atomic_dec(&s->seek_or_post_in_queue) > 1)
1463                 s->seek_windex = windex;
1464             else {
1465                 s->seek_windex = -1;
1466                 handle_seek(s, windex);
1467             }
1468             return 0;
1469         }
1470
1471         case SINK_INPUT_MESSAGE_DRAIN:
1472         case SINK_INPUT_MESSAGE_FLUSH:
1473         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1474         case SINK_INPUT_MESSAGE_TRIGGER: {
1475
1476             int64_t windex;
1477             pa_sink_input *isync;
1478             void (*func)(pa_memblockq *bq);
1479
1480             switch (code) {
1481                 case SINK_INPUT_MESSAGE_FLUSH:
1482                     func = flush_write_no_account;
1483                     break;
1484
1485                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1486                     func = pa_memblockq_prebuf_force;
1487                     break;
1488
1489                 case SINK_INPUT_MESSAGE_DRAIN:
1490                 case SINK_INPUT_MESSAGE_TRIGGER:
1491                     func = pa_memblockq_prebuf_disable;
1492                     break;
1493
1494                 default:
1495                     pa_assert_not_reached();
1496             }
1497
1498             windex = pa_memblockq_get_write_index(s->memblockq);
1499             func(s->memblockq);
1500             handle_seek(s, windex);
1501
1502             /* Do the same for all other members in the sync group */
1503             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1504                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1505                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1506                 func(ssync->memblockq);
1507                 handle_seek(ssync, windex);
1508             }
1509
1510             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1511                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1512                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1513                 func(ssync->memblockq);
1514                 handle_seek(ssync, windex);
1515             }
1516
1517             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1518                 if (!pa_memblockq_is_readable(s->memblockq))
1519                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1520                 else {
1521                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1522                     s->drain_request = TRUE;
1523                 }
1524             }
1525
1526             return 0;
1527         }
1528
1529         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1530             /* Atomically get a snapshot of all timing parameters... */
1531             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1532             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1533             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1534             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1535             s->underrun_for = s->sink_input->thread_info.underrun_for;
1536             s->playing_for = s->sink_input->thread_info.playing_for;
1537
1538             return 0;
1539
1540         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1541             int64_t windex;
1542
1543             windex = pa_memblockq_get_write_index(s->memblockq);
1544
1545             pa_memblockq_prebuf_force(s->memblockq);
1546
1547             handle_seek(s, windex);
1548
1549             /* Fall through to the default handler */
1550             break;
1551         }
1552
1553         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1554             pa_usec_t *r = userdata;
1555
1556             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1557
1558             /* Fall through, the default handler will add in the extra
1559              * latency added by the resampler */
1560             break;
1561         }
1562
1563         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1564             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1565             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1566             return 0;
1567         }
1568     }
1569
1570     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1571 }
1572
1573 /* Called from thread context */
1574 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1575     playback_stream *s;
1576
1577     pa_sink_input_assert_ref(i);
1578     s = PLAYBACK_STREAM(i->userdata);
1579     playback_stream_assert_ref(s);
1580     pa_assert(chunk);
1581
1582 #ifdef PROTOCOL_NATIVE_DEBUG
1583     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq));
1584 #endif
1585
1586     if (pa_memblockq_is_readable(s->memblockq))
1587         s->is_underrun = FALSE;
1588     else {
1589         if (!s->is_underrun)
1590             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1591
1592         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1593             s->drain_request = FALSE;
1594             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1595         } else if (!s->is_underrun)
1596             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
1597
1598         s->is_underrun = TRUE;
1599
1600         playback_stream_request_bytes(s);
1601     }
1602
1603     /* This call will not fail with prebuf=0, hence we check for
1604        underrun explicitly above */
1605     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1606         return -1;
1607
1608     chunk->length = PA_MIN(nbytes, chunk->length);
1609
1610     if (i->thread_info.underrun_for > 0)
1611         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1612
1613     pa_memblockq_drop(s->memblockq, chunk->length);
1614     playback_stream_request_bytes(s);
1615
1616     return 0;
1617 }
1618
1619 /* Called from thread context */
1620 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1621     playback_stream *s;
1622
1623     pa_sink_input_assert_ref(i);
1624     s = PLAYBACK_STREAM(i->userdata);
1625     playback_stream_assert_ref(s);
1626
1627     /* If we are in an underrun, then we don't rewind */
1628     if (i->thread_info.underrun_for > 0)
1629         return;
1630
1631     pa_memblockq_rewind(s->memblockq, nbytes);
1632 }
1633
1634 /* Called from thread context */
1635 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1636     playback_stream *s;
1637
1638     pa_sink_input_assert_ref(i);
1639     s = PLAYBACK_STREAM(i->userdata);
1640     playback_stream_assert_ref(s);
1641
1642     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1643 }
1644
1645 /* Called from thread context */
1646 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1647     playback_stream *s;
1648     size_t new_tlength, old_tlength;
1649
1650     pa_sink_input_assert_ref(i);
1651     s = PLAYBACK_STREAM(i->userdata);
1652     playback_stream_assert_ref(s);
1653
1654     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1655     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1656
1657     if (old_tlength < new_tlength) {
1658         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1659         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1660         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1661
1662         if (new_tlength == old_tlength)
1663             pa_log_debug("Failed to increase tlength");
1664         else {
1665             pa_log_debug("Notifying client about increased tlength");
1666             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1667         }
1668     }
1669 }
1670
1671 /* Called from main context */
1672 static void sink_input_kill_cb(pa_sink_input *i) {
1673     playback_stream *s;
1674
1675     pa_sink_input_assert_ref(i);
1676     s = PLAYBACK_STREAM(i->userdata);
1677     playback_stream_assert_ref(s);
1678
1679     playback_stream_send_killed(s);
1680     playback_stream_unlink(s);
1681 }
1682
1683 /* Called from main context */
1684 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1685     playback_stream *s;
1686     pa_tagstruct *t;
1687
1688     pa_sink_input_assert_ref(i);
1689     s = PLAYBACK_STREAM(i->userdata);
1690     playback_stream_assert_ref(s);
1691
1692     if (s->connection->version < 15)
1693       return;
1694
1695     t = pa_tagstruct_new(NULL, 0);
1696     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1697     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1698     pa_tagstruct_putu32(t, s->index);
1699     pa_tagstruct_puts(t, event);
1700     pa_tagstruct_put_proplist(t, pl);
1701     pa_pstream_send_tagstruct(s->connection->pstream, t);
1702 }
1703
1704 /* Called from main context */
1705 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1706     playback_stream *s;
1707     pa_tagstruct *t;
1708
1709     pa_sink_input_assert_ref(i);
1710     s = PLAYBACK_STREAM(i->userdata);
1711     playback_stream_assert_ref(s);
1712
1713     if (s->connection->version < 12)
1714       return;
1715
1716     t = pa_tagstruct_new(NULL, 0);
1717     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1718     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1719     pa_tagstruct_putu32(t, s->index);
1720     pa_tagstruct_put_boolean(t, suspend);
1721     pa_pstream_send_tagstruct(s->connection->pstream, t);
1722 }
1723
1724 /* Called from main context */
1725 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1726     playback_stream *s;
1727     pa_tagstruct *t;
1728
1729     pa_sink_input_assert_ref(i);
1730     s = PLAYBACK_STREAM(i->userdata);
1731     playback_stream_assert_ref(s);
1732
1733     if (!dest)
1734         return;
1735
1736     fix_playback_buffer_attr(s);
1737     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1738     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1739
1740     if (s->connection->version < 12)
1741       return;
1742
1743     t = pa_tagstruct_new(NULL, 0);
1744     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1745     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1746     pa_tagstruct_putu32(t, s->index);
1747     pa_tagstruct_putu32(t, dest->index);
1748     pa_tagstruct_puts(t, dest->name);
1749     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1750
1751     if (s->connection->version >= 13) {
1752         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1753         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1754         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1755         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1756         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1757     }
1758
1759     pa_pstream_send_tagstruct(s->connection->pstream, t);
1760 }
1761
1762 /*** source_output callbacks ***/
1763
1764 /* Called from thread context */
1765 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1766     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1767     record_stream *s;
1768
1769     pa_source_output_assert_ref(o);
1770     s = RECORD_STREAM(o->userdata);
1771     record_stream_assert_ref(s);
1772
1773     switch (code) {
1774         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1775             /* Atomically get a snapshot of all timing parameters... */
1776             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1777             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1778             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1779             return 0;
1780     }
1781
1782     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1783 }
1784
1785 /* Called from thread context */
1786 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1787     record_stream *s;
1788
1789     pa_source_output_assert_ref(o);
1790     s = RECORD_STREAM(o->userdata);
1791     record_stream_assert_ref(s);
1792     pa_assert(chunk);
1793
1794     pa_atomic_add(&s->on_the_fly, chunk->length);
1795     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1796 }
1797
1798 static void source_output_kill_cb(pa_source_output *o) {
1799     record_stream *s;
1800
1801     pa_source_output_assert_ref(o);
1802     s = RECORD_STREAM(o->userdata);
1803     record_stream_assert_ref(s);
1804
1805     record_stream_send_killed(s);
1806     record_stream_unlink(s);
1807 }
1808
1809 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1810     record_stream *s;
1811
1812     pa_source_output_assert_ref(o);
1813     s = RECORD_STREAM(o->userdata);
1814     record_stream_assert_ref(s);
1815
1816     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1817
1818     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1819 }
1820
1821 /* Called from main context */
1822 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1823     record_stream *s;
1824     pa_tagstruct *t;
1825
1826     pa_source_output_assert_ref(o);
1827     s = RECORD_STREAM(o->userdata);
1828     record_stream_assert_ref(s);
1829
1830     if (s->connection->version < 15)
1831       return;
1832
1833     t = pa_tagstruct_new(NULL, 0);
1834     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1835     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1836     pa_tagstruct_putu32(t, s->index);
1837     pa_tagstruct_puts(t, event);
1838     pa_tagstruct_put_proplist(t, pl);
1839     pa_pstream_send_tagstruct(s->connection->pstream, t);
1840 }
1841
1842 /* Called from main context */
1843 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1844     record_stream *s;
1845     pa_tagstruct *t;
1846
1847     pa_source_output_assert_ref(o);
1848     s = RECORD_STREAM(o->userdata);
1849     record_stream_assert_ref(s);
1850
1851     if (s->connection->version < 12)
1852       return;
1853
1854     t = pa_tagstruct_new(NULL, 0);
1855     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1856     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1857     pa_tagstruct_putu32(t, s->index);
1858     pa_tagstruct_put_boolean(t, suspend);
1859     pa_pstream_send_tagstruct(s->connection->pstream, t);
1860 }
1861
1862 /* Called from main context */
1863 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1864     record_stream *s;
1865     pa_tagstruct *t;
1866
1867     pa_source_output_assert_ref(o);
1868     s = RECORD_STREAM(o->userdata);
1869     record_stream_assert_ref(s);
1870
1871     if (!dest)
1872         return;
1873
1874     fix_record_buffer_attr_pre(s);
1875     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1876     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1877     fix_record_buffer_attr_post(s);
1878
1879     if (s->connection->version < 12)
1880       return;
1881
1882     t = pa_tagstruct_new(NULL, 0);
1883     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1884     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1885     pa_tagstruct_putu32(t, s->index);
1886     pa_tagstruct_putu32(t, dest->index);
1887     pa_tagstruct_puts(t, dest->name);
1888     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1889
1890     if (s->connection->version >= 13) {
1891         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1892         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1893         pa_tagstruct_put_usec(t, s->configured_source_latency);
1894     }
1895
1896     pa_pstream_send_tagstruct(s->connection->pstream, t);
1897 }
1898
1899 /*** pdispatch callbacks ***/
1900
1901 static void protocol_error(pa_native_connection *c) {
1902     pa_log("protocol error, kicking client");
1903     native_connection_unlink(c);
1904 }
1905
1906 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1907 if (!(expression)) { \
1908     pa_pstream_send_error((pstream), (tag), (error)); \
1909     return; \
1910 } \
1911 } while(0);
1912
1913 #define CHECK_VALIDITY_GOTO(pstream, expression, tag, error, label) do { \
1914 if (!(expression)) { \
1915     pa_pstream_send_error((pstream), (tag), (error)); \
1916     goto label; \
1917 } \
1918 } while(0);
1919
1920 static pa_tagstruct *reply_new(uint32_t tag) {
1921     pa_tagstruct *reply;
1922
1923     reply = pa_tagstruct_new(NULL, 0);
1924     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1925     pa_tagstruct_putu32(reply, tag);
1926     return reply;
1927 }
1928
1929 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1930     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1931     playback_stream *s;
1932     uint32_t sink_index, syncid, missing = 0;
1933     pa_buffer_attr attr;
1934     const char *name = NULL, *sink_name;
1935     pa_sample_spec ss;
1936     pa_channel_map map;
1937     pa_tagstruct *reply;
1938     pa_sink *sink = NULL;
1939     pa_cvolume volume;
1940     pa_bool_t
1941         corked = FALSE,
1942         no_remap = FALSE,
1943         no_remix = FALSE,
1944         fix_format = FALSE,
1945         fix_rate = FALSE,
1946         fix_channels = FALSE,
1947         no_move = FALSE,
1948         variable_rate = FALSE,
1949         muted = FALSE,
1950         adjust_latency = FALSE,
1951         early_requests = FALSE,
1952         dont_inhibit_auto_suspend = FALSE,
1953         volume_set = TRUE,
1954         muted_set = FALSE,
1955         fail_on_suspend = FALSE,
1956         relative_volume = FALSE,
1957         passthrough = FALSE;
1958
1959     pa_sink_input_flags_t flags = 0;
1960     pa_proplist *p = NULL;
1961     int ret = PA_ERR_INVALID;
1962     uint8_t n_formats = 0;
1963     pa_format_info *format;
1964     pa_idxset *formats = NULL;
1965     uint32_t i;
1966
1967     pa_native_connection_assert_ref(c);
1968     pa_assert(t);
1969     memset(&attr, 0, sizeof(attr));
1970
1971     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1972         pa_tagstruct_get(
1973                 t,
1974                 PA_TAG_SAMPLE_SPEC, &ss,
1975                 PA_TAG_CHANNEL_MAP, &map,
1976                 PA_TAG_U32, &sink_index,
1977                 PA_TAG_STRING, &sink_name,
1978                 PA_TAG_U32, &attr.maxlength,
1979                 PA_TAG_BOOLEAN, &corked,
1980                 PA_TAG_U32, &attr.tlength,
1981                 PA_TAG_U32, &attr.prebuf,
1982                 PA_TAG_U32, &attr.minreq,
1983                 PA_TAG_U32, &syncid,
1984                 PA_TAG_CVOLUME, &volume,
1985                 PA_TAG_INVALID) < 0) {
1986
1987         protocol_error(c);
1988         goto finish;
1989     }
1990
1991     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
1992     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID, finish);
1993     CHECK_VALIDITY_GOTO(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID, finish);
1994     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
1995     CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
1996
1997     p = pa_proplist_new();
1998
1999     if (name)
2000         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2001
2002     if (c->version >= 12) {
2003         /* Since 0.9.8 the user can ask for a couple of additional flags */
2004
2005         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2006             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2007             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2008             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2009             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2010             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2011             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2012
2013             protocol_error(c);
2014             goto finish;
2015         }
2016     }
2017
2018     if (c->version >= 13) {
2019
2020         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
2021             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2022             pa_tagstruct_get_proplist(t, p) < 0) {
2023
2024             protocol_error(c);
2025             goto finish;
2026         }
2027     }
2028
2029     if (c->version >= 14) {
2030
2031         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2032             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2033
2034             protocol_error(c);
2035             goto finish;
2036         }
2037     }
2038
2039     if (c->version >= 15) {
2040
2041         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2042             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2043             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2044
2045             protocol_error(c);
2046             goto finish;
2047         }
2048     }
2049
2050     if (c->version >= 17) {
2051
2052         if (pa_tagstruct_get_boolean(t, &relative_volume) < 0) {
2053
2054             protocol_error(c);
2055             goto finish;
2056         }
2057     }
2058
2059     if (c->version >= 18) {
2060
2061         if (pa_tagstruct_get_boolean(t, &passthrough) < 0 ) {
2062             protocol_error(c);
2063             goto finish;
2064         }
2065     }
2066
2067     if (c->version >= 21) {
2068
2069         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2070             protocol_error(c);
2071             goto finish;
2072         }
2073
2074         if (n_formats)
2075             formats = pa_idxset_new(NULL, NULL);
2076
2077         for (i = 0; i < n_formats; i++) {
2078             format = pa_format_info_new();
2079             if (pa_tagstruct_get_format_info(t, format) < 0) {
2080                 protocol_error(c);
2081                 goto finish;
2082             }
2083             pa_idxset_put(formats, format, NULL);
2084         }
2085     }
2086
2087     if (n_formats == 0) {
2088         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2089         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2090         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2091     } else {
2092         PA_IDXSET_FOREACH(format, formats, i) {
2093             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2094         }
2095     }
2096
2097     if (!pa_tagstruct_eof(t)) {
2098         protocol_error(c);
2099         goto finish;
2100     }
2101
2102     if (sink_index != PA_INVALID_INDEX) {
2103
2104         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
2105             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2106             goto finish;
2107         }
2108
2109     } else if (sink_name) {
2110
2111         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
2112             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2113             goto finish;
2114         }
2115     }
2116
2117     flags =
2118         (corked ? PA_SINK_INPUT_START_CORKED : 0) |
2119         (no_remap ? PA_SINK_INPUT_NO_REMAP : 0) |
2120         (no_remix ? PA_SINK_INPUT_NO_REMIX : 0) |
2121         (fix_format ? PA_SINK_INPUT_FIX_FORMAT : 0) |
2122         (fix_rate ? PA_SINK_INPUT_FIX_RATE : 0) |
2123         (fix_channels ? PA_SINK_INPUT_FIX_CHANNELS : 0) |
2124         (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) |
2125         (variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0) |
2126         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2127         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0) |
2128         (passthrough ? PA_SINK_INPUT_PASSTHROUGH : 0);
2129
2130     /* Only since protocol version 15 there's a separate muted_set
2131      * flag. For older versions we synthesize it here */
2132     muted_set = muted_set || muted;
2133
2134     s = playback_stream_new(c, sink, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, syncid, &missing, &ret);
2135     /* We no longer own the formats idxset */
2136     formats = NULL;
2137
2138     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2139
2140     reply = reply_new(tag);
2141     pa_tagstruct_putu32(reply, s->index);
2142     pa_assert(s->sink_input);
2143     pa_tagstruct_putu32(reply, s->sink_input->index);
2144     pa_tagstruct_putu32(reply, missing);
2145
2146 #ifdef PROTOCOL_NATIVE_DEBUG
2147     pa_log("initial request is %u", missing);
2148 #endif
2149
2150     if (c->version >= 9) {
2151         /* Since 0.9.0 we support sending the buffer metrics back to the client */
2152
2153         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2154         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
2155         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
2156         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
2157     }
2158
2159     if (c->version >= 12) {
2160         /* Since 0.9.8 we support sending the chosen sample
2161          * spec/channel map/device/suspend status back to the
2162          * client */
2163
2164         pa_tagstruct_put_sample_spec(reply, &ss);
2165         pa_tagstruct_put_channel_map(reply, &map);
2166
2167         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2168         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2169
2170         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2171     }
2172
2173     if (c->version >= 13)
2174         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2175
2176     if (c->version >= 21) {
2177         /* Send back the format we negotiated */
2178         if (s->sink_input->format)
2179             pa_tagstruct_put_format_info(reply, s->sink_input->format);
2180         else {
2181             pa_format_info *f = pa_format_info_new();
2182             pa_tagstruct_put_format_info(reply, f);
2183             pa_format_info_free(f);
2184         }
2185     }
2186
2187     pa_pstream_send_tagstruct(c->pstream, reply);
2188
2189 finish:
2190     if (p)
2191         pa_proplist_free(p);
2192     if (formats)
2193         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
2194 }
2195
2196 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2197     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2198     uint32_t channel;
2199
2200     pa_native_connection_assert_ref(c);
2201     pa_assert(t);
2202
2203     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2204         !pa_tagstruct_eof(t)) {
2205         protocol_error(c);
2206         return;
2207     }
2208
2209     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2210
2211     switch (command) {
2212
2213         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2214             playback_stream *s;
2215             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2216                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2217                 return;
2218             }
2219
2220             playback_stream_unlink(s);
2221             break;
2222         }
2223
2224         case PA_COMMAND_DELETE_RECORD_STREAM: {
2225             record_stream *s;
2226             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2227                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2228                 return;
2229             }
2230
2231             record_stream_unlink(s);
2232             break;
2233         }
2234
2235         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2236             upload_stream *s;
2237
2238             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2239                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2240                 return;
2241             }
2242
2243             upload_stream_unlink(s);
2244             break;
2245         }
2246
2247         default:
2248             pa_assert_not_reached();
2249     }
2250
2251     pa_pstream_send_simple_ack(c->pstream, tag);
2252 }
2253
2254 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2255     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2256     record_stream *s;
2257     pa_buffer_attr attr;
2258     uint32_t source_index;
2259     const char *name = NULL, *source_name;
2260     pa_sample_spec ss;
2261     pa_channel_map map;
2262     pa_tagstruct *reply;
2263     pa_source *source = NULL;
2264     pa_cvolume volume;
2265     pa_bool_t
2266         corked = FALSE,
2267         no_remap = FALSE,
2268         no_remix = FALSE,
2269         fix_format = FALSE,
2270         fix_rate = FALSE,
2271         fix_channels = FALSE,
2272         no_move = FALSE,
2273         variable_rate = FALSE,
2274         muted = FALSE,
2275         adjust_latency = FALSE,
2276         peak_detect = FALSE,
2277         early_requests = FALSE,
2278         dont_inhibit_auto_suspend = FALSE,
2279         volume_set = FALSE,
2280         muted_set = FALSE,
2281         fail_on_suspend = FALSE,
2282         relative_volume = FALSE,
2283         passthrough = FALSE;
2284
2285     pa_source_output_flags_t flags = 0;
2286     pa_proplist *p = NULL;
2287     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2288     pa_sink_input *direct_on_input = NULL;
2289     int ret = PA_ERR_INVALID;
2290     uint8_t n_formats = 0;
2291     pa_format_info *format;
2292     pa_idxset *formats = NULL;
2293     uint32_t i;
2294
2295     pa_native_connection_assert_ref(c);
2296     pa_assert(t);
2297
2298     memset(&attr, 0, sizeof(attr));
2299
2300     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2301         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2302         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2303         pa_tagstruct_getu32(t, &source_index) < 0 ||
2304         pa_tagstruct_gets(t, &source_name) < 0 ||
2305         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2306         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2307         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2308
2309         protocol_error(c);
2310         goto finish;
2311     }
2312
2313     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
2314     CHECK_VALIDITY_GOTO(c->pstream, !source_name || pa_namereg_is_valid_name_or_wildcard(source_name, PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID, finish);
2315     CHECK_VALIDITY_GOTO(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID, finish);
2316     CHECK_VALIDITY_GOTO(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
2317
2318     p = pa_proplist_new();
2319
2320     if (name)
2321         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2322
2323     if (c->version >= 12) {
2324         /* Since 0.9.8 the user can ask for a couple of additional flags */
2325
2326         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2327             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2328             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2329             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2330             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2331             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2332             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2333
2334             protocol_error(c);
2335             goto finish;
2336         }
2337     }
2338
2339     if (c->version >= 13) {
2340
2341         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2342             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2343             pa_tagstruct_get_proplist(t, p) < 0 ||
2344             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2345
2346             protocol_error(c);
2347             goto finish;
2348         }
2349     }
2350
2351     if (c->version >= 14) {
2352
2353         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2354             protocol_error(c);
2355             goto finish;
2356         }
2357     }
2358
2359     if (c->version >= 15) {
2360
2361         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2362             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2363
2364             protocol_error(c);
2365             goto finish;
2366         }
2367     }
2368
2369     if (c->version >= 22) {
2370         /* For newer client versions (with per-source-output volumes), we try
2371          * to make the behaviour for playback and record streams the same. */
2372         volume_set = TRUE;
2373
2374         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2375             protocol_error(c);
2376             goto finish;
2377         }
2378
2379         if (n_formats)
2380             formats = pa_idxset_new(NULL, NULL);
2381
2382         for (i = 0; i < n_formats; i++) {
2383             format = pa_format_info_new();
2384             if (pa_tagstruct_get_format_info(t, format) < 0) {
2385                 protocol_error(c);
2386                 goto finish;
2387             }
2388             pa_idxset_put(formats, format, NULL);
2389         }
2390
2391         if (pa_tagstruct_get_cvolume(t, &volume) < 0 ||
2392             pa_tagstruct_get_boolean(t, &muted) < 0 ||
2393             pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2394             pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2395             pa_tagstruct_get_boolean(t, &relative_volume) < 0 ||
2396             pa_tagstruct_get_boolean(t, &passthrough) < 0) {
2397
2398             protocol_error(c);
2399             goto finish;
2400         }
2401
2402         CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
2403     }
2404
2405     if (n_formats == 0) {
2406         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2407         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2408         CHECK_VALIDITY_GOTO(c->pstream, c->version < 22 || (volume.channels == ss.channels), tag, PA_ERR_INVALID, finish);
2409         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2410     } else {
2411         PA_IDXSET_FOREACH(format, formats, i) {
2412             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2413         }
2414     }
2415
2416
2417     if (!pa_tagstruct_eof(t)) {
2418         protocol_error(c);
2419         goto finish;
2420     }
2421
2422     if (source_index != PA_INVALID_INDEX) {
2423
2424         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2425             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2426             goto finish;
2427         }
2428
2429     } else if (source_name) {
2430
2431         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2432             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2433             goto finish;
2434         }
2435     }
2436
2437     if (direct_on_input_idx != PA_INVALID_INDEX) {
2438
2439         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2440             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2441             goto finish;
2442         }
2443     }
2444
2445     flags =
2446         (corked ? PA_SOURCE_OUTPUT_START_CORKED : 0) |
2447         (no_remap ? PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2448         (no_remix ? PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2449         (fix_format ? PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2450         (fix_rate ? PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2451         (fix_channels ? PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2452         (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2453         (variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2454         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2455         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0) |
2456         (passthrough ? PA_SOURCE_OUTPUT_PASSTHROUGH : 0);
2457
2458     s = record_stream_new(c, source, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, peak_detect, direct_on_input, &ret);
2459
2460     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2461
2462     reply = reply_new(tag);
2463     pa_tagstruct_putu32(reply, s->index);
2464     pa_assert(s->source_output);
2465     pa_tagstruct_putu32(reply, s->source_output->index);
2466
2467     if (c->version >= 9) {
2468         /* Since 0.9 we support sending the buffer metrics back to the client */
2469
2470         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2471         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2472     }
2473
2474     if (c->version >= 12) {
2475         /* Since 0.9.8 we support sending the chosen sample
2476          * spec/channel map/device/suspend status back to the
2477          * client */
2478
2479         pa_tagstruct_put_sample_spec(reply, &ss);
2480         pa_tagstruct_put_channel_map(reply, &map);
2481
2482         pa_tagstruct_putu32(reply, s->source_output->source->index);
2483         pa_tagstruct_puts(reply, s->source_output->source->name);
2484
2485         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2486     }
2487
2488     if (c->version >= 13)
2489         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2490
2491     if (c->version >= 22) {
2492         /* Send back the format we negotiated */
2493         if (s->source_output->format)
2494             pa_tagstruct_put_format_info(reply, s->source_output->format);
2495         else {
2496             pa_format_info *f = pa_format_info_new();
2497             pa_tagstruct_put_format_info(reply, f);
2498             pa_format_info_free(f);
2499         }
2500     }
2501
2502     pa_pstream_send_tagstruct(c->pstream, reply);
2503
2504 finish:
2505     if (p)
2506         pa_proplist_free(p);
2507     if (formats)
2508         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
2509 }
2510
2511 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2512     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2513     int ret;
2514
2515     pa_native_connection_assert_ref(c);
2516     pa_assert(t);
2517
2518     if (!pa_tagstruct_eof(t)) {
2519         protocol_error(c);
2520         return;
2521     }
2522
2523     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2524     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2525     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2526
2527     pa_log_debug("Client %s asks us to terminate.", pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY)));
2528
2529     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2530 }
2531
2532 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2533     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2534     const void*cookie;
2535     pa_tagstruct *reply;
2536     pa_bool_t shm_on_remote = FALSE, do_shm;
2537
2538     pa_native_connection_assert_ref(c);
2539     pa_assert(t);
2540
2541     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2542         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2543         !pa_tagstruct_eof(t)) {
2544         protocol_error(c);
2545         return;
2546     }
2547
2548     /* Minimum supported version */
2549     if (c->version < 8) {
2550         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2551         return;
2552     }
2553
2554     /* Starting with protocol version 13 the MSB of the version tag
2555        reflects if shm is available for this pa_native_connection or
2556        not. */
2557     if (c->version >= 13) {
2558         shm_on_remote = !!(c->version & 0x80000000U);
2559         c->version &= 0x7FFFFFFFU;
2560     }
2561
2562     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2563
2564     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2565
2566     if (!c->authorized) {
2567         pa_bool_t success = FALSE;
2568
2569 #ifdef HAVE_CREDS
2570         const pa_creds *creds;
2571
2572         if ((creds = pa_pdispatch_creds(pd))) {
2573             if (creds->uid == getuid())
2574                 success = TRUE;
2575             else if (c->options->auth_group) {
2576                 int r;
2577                 gid_t gid;
2578
2579                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2580                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2581                 else if (gid == creds->gid)
2582                     success = TRUE;
2583
2584                 if (!success) {
2585                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2586                         pa_log_warn("Failed to check group membership.");
2587                     else if (r > 0)
2588                         success = TRUE;
2589                 }
2590             }
2591
2592             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2593                         (unsigned long) creds->uid,
2594                         (unsigned long) creds->gid,
2595                         (int) success);
2596         }
2597 #endif
2598
2599         if (!success && c->options->auth_cookie) {
2600             const uint8_t *ac;
2601
2602             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2603                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2604                     success = TRUE;
2605         }
2606
2607         if (!success) {
2608             pa_log_warn("Denied access to client with invalid authorization data.");
2609             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2610             return;
2611         }
2612
2613         c->authorized = TRUE;
2614         if (c->auth_timeout_event) {
2615             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2616             c->auth_timeout_event = NULL;
2617         }
2618     }
2619
2620     /* Enable shared memory support if possible */
2621     do_shm =
2622         pa_mempool_is_shared(c->protocol->core->mempool) &&
2623         c->is_local;
2624
2625     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2626
2627     if (do_shm)
2628         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2629             do_shm = FALSE;
2630
2631 #ifdef HAVE_CREDS
2632     if (do_shm) {
2633         /* Only enable SHM if both sides are owned by the same
2634          * user. This is a security measure because otherwise data
2635          * private to the user might leak. */
2636
2637         const pa_creds *creds;
2638         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2639             do_shm = FALSE;
2640     }
2641 #endif
2642
2643     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2644     pa_pstream_enable_shm(c->pstream, do_shm);
2645
2646     reply = reply_new(tag);
2647     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2648
2649 #ifdef HAVE_CREDS
2650 {
2651     /* SHM support is only enabled after both sides made sure they are the same user. */
2652
2653     pa_creds ucred;
2654
2655     ucred.uid = getuid();
2656     ucred.gid = getgid();
2657
2658     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2659 }
2660 #else
2661     pa_pstream_send_tagstruct(c->pstream, reply);
2662 #endif
2663 }
2664
2665 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2666     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2667     const char *name = NULL;
2668     pa_proplist *p;
2669     pa_tagstruct *reply;
2670
2671     pa_native_connection_assert_ref(c);
2672     pa_assert(t);
2673
2674     p = pa_proplist_new();
2675
2676     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2677         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2678         !pa_tagstruct_eof(t)) {
2679
2680         protocol_error(c);
2681         pa_proplist_free(p);
2682         return;
2683     }
2684
2685     if (name)
2686         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2687             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2688             pa_proplist_free(p);
2689             return;
2690         }
2691
2692     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2693     pa_proplist_free(p);
2694
2695     reply = reply_new(tag);
2696
2697     if (c->version >= 13)
2698         pa_tagstruct_putu32(reply, c->client->index);
2699
2700     pa_pstream_send_tagstruct(c->pstream, reply);
2701 }
2702
2703 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2704     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2705     const char *name;
2706     uint32_t idx = PA_IDXSET_INVALID;
2707
2708     pa_native_connection_assert_ref(c);
2709     pa_assert(t);
2710
2711     if (pa_tagstruct_gets(t, &name) < 0 ||
2712         !pa_tagstruct_eof(t)) {
2713         protocol_error(c);
2714         return;
2715     }
2716
2717     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2718     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_LOOKUP_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
2719
2720     if (command == PA_COMMAND_LOOKUP_SINK) {
2721         pa_sink *sink;
2722         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2723             idx = sink->index;
2724     } else {
2725         pa_source *source;
2726         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2727         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2728             idx = source->index;
2729     }
2730
2731     if (idx == PA_IDXSET_INVALID)
2732         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2733     else {
2734         pa_tagstruct *reply;
2735         reply = reply_new(tag);
2736         pa_tagstruct_putu32(reply, idx);
2737         pa_pstream_send_tagstruct(c->pstream, reply);
2738     }
2739 }
2740
2741 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2742     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2743     uint32_t idx;
2744     playback_stream *s;
2745
2746     pa_native_connection_assert_ref(c);
2747     pa_assert(t);
2748
2749     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2750         !pa_tagstruct_eof(t)) {
2751         protocol_error(c);
2752         return;
2753     }
2754
2755     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2756     s = pa_idxset_get_by_index(c->output_streams, idx);
2757     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2758     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2759
2760     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2761 }
2762
2763 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2764     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2765     pa_tagstruct *reply;
2766     const pa_mempool_stat *stat;
2767
2768     pa_native_connection_assert_ref(c);
2769     pa_assert(t);
2770
2771     if (!pa_tagstruct_eof(t)) {
2772         protocol_error(c);
2773         return;
2774     }
2775
2776     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2777
2778     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2779
2780     reply = reply_new(tag);
2781     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2782     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2783     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2784     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2785     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2786     pa_pstream_send_tagstruct(c->pstream, reply);
2787 }
2788
2789 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2790     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2791     pa_tagstruct *reply;
2792     playback_stream *s;
2793     struct timeval tv, now;
2794     uint32_t idx;
2795
2796     pa_native_connection_assert_ref(c);
2797     pa_assert(t);
2798
2799     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2800         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2801         !pa_tagstruct_eof(t)) {
2802         protocol_error(c);
2803         return;
2804     }
2805
2806     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2807     s = pa_idxset_get_by_index(c->output_streams, idx);
2808     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2809     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2810
2811     /* Get an atomic snapshot of all timing parameters */
2812     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2813
2814     reply = reply_new(tag);
2815     pa_tagstruct_put_usec(reply,
2816                           s->current_sink_latency +
2817                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2818     pa_tagstruct_put_usec(reply, 0);
2819     pa_tagstruct_put_boolean(reply,
2820                              s->playing_for > 0 &&
2821                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2822                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2823     pa_tagstruct_put_timeval(reply, &tv);
2824     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2825     pa_tagstruct_puts64(reply, s->write_index);
2826     pa_tagstruct_puts64(reply, s->read_index);
2827
2828     if (c->version >= 13) {
2829         pa_tagstruct_putu64(reply, s->underrun_for);
2830         pa_tagstruct_putu64(reply, s->playing_for);
2831     }
2832
2833     pa_pstream_send_tagstruct(c->pstream, reply);
2834 }
2835
2836 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2837     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2838     pa_tagstruct *reply;
2839     record_stream *s;
2840     struct timeval tv, now;
2841     uint32_t idx;
2842
2843     pa_native_connection_assert_ref(c);
2844     pa_assert(t);
2845
2846     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2847         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2848         !pa_tagstruct_eof(t)) {
2849         protocol_error(c);
2850         return;
2851     }
2852
2853     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2854     s = pa_idxset_get_by_index(c->record_streams, idx);
2855     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2856
2857     /* Get an atomic snapshot of all timing parameters */
2858     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2859
2860     reply = reply_new(tag);
2861     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2862     pa_tagstruct_put_usec(reply,
2863                           s->current_source_latency +
2864                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->source->sample_spec));
2865     pa_tagstruct_put_boolean(reply,
2866                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2867                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2868     pa_tagstruct_put_timeval(reply, &tv);
2869     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2870     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2871     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2872     pa_pstream_send_tagstruct(c->pstream, reply);
2873 }
2874
2875 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2876     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2877     upload_stream *s;
2878     uint32_t length;
2879     const char *name = NULL;
2880     pa_sample_spec ss;
2881     pa_channel_map map;
2882     pa_tagstruct *reply;
2883     pa_proplist *p;
2884
2885     pa_native_connection_assert_ref(c);
2886     pa_assert(t);
2887
2888     if (pa_tagstruct_gets(t, &name) < 0 ||
2889         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2890         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2891         pa_tagstruct_getu32(t, &length) < 0) {
2892         protocol_error(c);
2893         return;
2894     }
2895
2896     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2897     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2898     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2899     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2900     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2901     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2902
2903     p = pa_proplist_new();
2904
2905     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2906         !pa_tagstruct_eof(t)) {
2907
2908         protocol_error(c);
2909         pa_proplist_free(p);
2910         return;
2911     }
2912
2913     if (c->version < 13)
2914         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2915     else if (!name)
2916         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2917             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2918
2919     if (!name || !pa_namereg_is_valid_name(name)) {
2920         pa_proplist_free(p);
2921         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2922     }
2923
2924     s = upload_stream_new(c, &ss, &map, name, length, p);
2925     pa_proplist_free(p);
2926
2927     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2928
2929     reply = reply_new(tag);
2930     pa_tagstruct_putu32(reply, s->index);
2931     pa_tagstruct_putu32(reply, length);
2932     pa_pstream_send_tagstruct(c->pstream, reply);
2933 }
2934
2935 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2936     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2937     uint32_t channel;
2938     upload_stream *s;
2939     uint32_t idx;
2940
2941     pa_native_connection_assert_ref(c);
2942     pa_assert(t);
2943
2944     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2945         !pa_tagstruct_eof(t)) {
2946         protocol_error(c);
2947         return;
2948     }
2949
2950     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2951
2952     s = pa_idxset_get_by_index(c->output_streams, channel);
2953     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2954     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2955
2956     if (!s->memchunk.memblock)
2957         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2958     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2959         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2960     else
2961         pa_pstream_send_simple_ack(c->pstream, tag);
2962
2963     upload_stream_unlink(s);
2964 }
2965
2966 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2967     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2968     uint32_t sink_index;
2969     pa_volume_t volume;
2970     pa_sink *sink;
2971     const char *name, *sink_name;
2972     uint32_t idx;
2973     pa_proplist *p;
2974     pa_tagstruct *reply;
2975
2976     pa_native_connection_assert_ref(c);
2977     pa_assert(t);
2978
2979     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2980
2981     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2982         pa_tagstruct_gets(t, &sink_name) < 0 ||
2983         pa_tagstruct_getu32(t, &volume) < 0 ||
2984         pa_tagstruct_gets(t, &name) < 0) {
2985         protocol_error(c);
2986         return;
2987     }
2988
2989     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID);
2990     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2991     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2992     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2993
2994     if (sink_index != PA_INVALID_INDEX)
2995         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2996     else
2997         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2998
2999     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3000
3001     p = pa_proplist_new();
3002
3003     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
3004         !pa_tagstruct_eof(t)) {
3005         protocol_error(c);
3006         pa_proplist_free(p);
3007         return;
3008     }
3009
3010     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
3011
3012     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
3013         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3014         pa_proplist_free(p);
3015         return;
3016     }
3017
3018     pa_proplist_free(p);
3019
3020     reply = reply_new(tag);
3021
3022     if (c->version >= 13)
3023         pa_tagstruct_putu32(reply, idx);
3024
3025     pa_pstream_send_tagstruct(c->pstream, reply);
3026 }
3027
3028 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3029     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3030     const char *name;
3031
3032     pa_native_connection_assert_ref(c);
3033     pa_assert(t);
3034
3035     if (pa_tagstruct_gets(t, &name) < 0 ||
3036         !pa_tagstruct_eof(t)) {
3037         protocol_error(c);
3038         return;
3039     }
3040
3041     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3042     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3043
3044     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
3045         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3046         return;
3047     }
3048
3049     pa_pstream_send_simple_ack(c->pstream, tag);
3050 }
3051
3052 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
3053     pa_assert(c);
3054     pa_assert(fixed);
3055     pa_assert(original);
3056
3057     *fixed = *original;
3058
3059     if (c->version < 12) {
3060         /* Before protocol version 12 we didn't support S32 samples,
3061          * so we need to lie about this to the client */
3062
3063         if (fixed->format == PA_SAMPLE_S32LE)
3064             fixed->format = PA_SAMPLE_FLOAT32LE;
3065         if (fixed->format == PA_SAMPLE_S32BE)
3066             fixed->format = PA_SAMPLE_FLOAT32BE;
3067     }
3068
3069     if (c->version < 15) {
3070         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
3071             fixed->format = PA_SAMPLE_FLOAT32LE;
3072         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
3073             fixed->format = PA_SAMPLE_FLOAT32BE;
3074     }
3075 }
3076
3077 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
3078     pa_sample_spec fixed_ss;
3079
3080     pa_assert(t);
3081     pa_sink_assert_ref(sink);
3082
3083     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
3084
3085     pa_tagstruct_put(
3086         t,
3087         PA_TAG_U32, sink->index,
3088         PA_TAG_STRING, sink->name,
3089         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3090         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3091         PA_TAG_CHANNEL_MAP, &sink->channel_map,
3092         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
3093         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
3094         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
3095         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
3096         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
3097         PA_TAG_USEC, pa_sink_get_latency(sink),
3098         PA_TAG_STRING, sink->driver,
3099         PA_TAG_U32, sink->flags & PA_SINK_CLIENT_FLAGS_MASK,
3100         PA_TAG_INVALID);
3101
3102     if (c->version >= 13) {
3103         pa_tagstruct_put_proplist(t, sink->proplist);
3104         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
3105     }
3106
3107     if (c->version >= 15) {
3108         pa_tagstruct_put_volume(t, sink->base_volume);
3109         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
3110             pa_log_error("Internal sink state is invalid.");
3111         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
3112         pa_tagstruct_putu32(t, sink->n_volume_steps);
3113         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
3114     }
3115
3116     if (c->version >= 16) {
3117         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
3118
3119         if (sink->ports) {
3120             void *state;
3121             pa_device_port *p;
3122
3123             PA_HASHMAP_FOREACH(p, sink->ports, state) {
3124                 pa_tagstruct_puts(t, p->name);
3125                 pa_tagstruct_puts(t, p->description);
3126                 pa_tagstruct_putu32(t, p->priority);
3127                 if (c->version >= 24)
3128                     pa_tagstruct_putu32(t, p->available);
3129             }
3130         }
3131
3132         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
3133     }
3134
3135     if (c->version >= 21) {
3136         uint32_t i;
3137         pa_format_info *f;
3138         pa_idxset *formats = pa_sink_get_formats(sink);
3139
3140         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3141         PA_IDXSET_FOREACH(f, formats, i) {
3142             pa_tagstruct_put_format_info(t, f);
3143         }
3144
3145         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
3146     }
3147 }
3148
3149 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
3150     pa_sample_spec fixed_ss;
3151
3152     pa_assert(t);
3153     pa_source_assert_ref(source);
3154
3155     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
3156
3157     pa_tagstruct_put(
3158         t,
3159         PA_TAG_U32, source->index,
3160         PA_TAG_STRING, source->name,
3161         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3162         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3163         PA_TAG_CHANNEL_MAP, &source->channel_map,
3164         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
3165         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
3166         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
3167         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
3168         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
3169         PA_TAG_USEC, pa_source_get_latency(source),
3170         PA_TAG_STRING, source->driver,
3171         PA_TAG_U32, source->flags & PA_SOURCE_CLIENT_FLAGS_MASK,
3172         PA_TAG_INVALID);
3173
3174     if (c->version >= 13) {
3175         pa_tagstruct_put_proplist(t, source->proplist);
3176         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
3177     }
3178
3179     if (c->version >= 15) {
3180         pa_tagstruct_put_volume(t, source->base_volume);
3181         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
3182             pa_log_error("Internal source state is invalid.");
3183         pa_tagstruct_putu32(t, pa_source_get_state(source));
3184         pa_tagstruct_putu32(t, source->n_volume_steps);
3185         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
3186     }
3187
3188     if (c->version >= 16) {
3189
3190         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
3191
3192         if (source->ports) {
3193             void *state;
3194             pa_device_port *p;
3195
3196             PA_HASHMAP_FOREACH(p, source->ports, state) {
3197                 pa_tagstruct_puts(t, p->name);
3198                 pa_tagstruct_puts(t, p->description);
3199                 pa_tagstruct_putu32(t, p->priority);
3200                 if (c->version >= 24)
3201                     pa_tagstruct_putu32(t, p->available);
3202             }
3203         }
3204
3205         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
3206     }
3207
3208     if (c->version >= 22) {
3209         uint32_t i;
3210         pa_format_info *f;
3211         pa_idxset *formats = pa_source_get_formats(source);
3212
3213         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3214         PA_IDXSET_FOREACH(f, formats, i) {
3215             pa_tagstruct_put_format_info(t, f);
3216         }
3217
3218         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
3219     }
3220 }
3221
3222 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
3223     pa_assert(t);
3224     pa_assert(client);
3225
3226     pa_tagstruct_putu32(t, client->index);
3227     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
3228     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
3229     pa_tagstruct_puts(t, client->driver);
3230
3231     if (c->version >= 13)
3232         pa_tagstruct_put_proplist(t, client->proplist);
3233 }
3234
3235 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
3236     void *state = NULL;
3237     pa_card_profile *p;
3238
3239     pa_assert(t);
3240     pa_assert(card);
3241
3242     pa_tagstruct_putu32(t, card->index);
3243     pa_tagstruct_puts(t, card->name);
3244     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
3245     pa_tagstruct_puts(t, card->driver);
3246
3247     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
3248
3249     if (card->profiles) {
3250         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
3251             pa_tagstruct_puts(t, p->name);
3252             pa_tagstruct_puts(t, p->description);
3253             pa_tagstruct_putu32(t, p->n_sinks);
3254             pa_tagstruct_putu32(t, p->n_sources);
3255             pa_tagstruct_putu32(t, p->priority);
3256         }
3257     }
3258
3259     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
3260     pa_tagstruct_put_proplist(t, card->proplist);
3261
3262     if (c->version < 26)
3263         return;
3264
3265     if (card->ports) {
3266         pa_device_port* port;
3267         pa_proplist* proplist = pa_proplist_new(); /* For now - push an empty proplist */
3268
3269         pa_tagstruct_putu32(t, pa_hashmap_size(card->ports));
3270
3271         PA_HASHMAP_FOREACH(port, card->ports, state) {
3272             pa_tagstruct_puts(t, port->name);
3273             pa_tagstruct_puts(t, port->description);
3274             pa_tagstruct_putu32(t, port->priority);
3275             pa_tagstruct_putu32(t, port->available);
3276             pa_tagstruct_putu8(t, /* FIXME: port->direction */ (port->is_input ? PA_DIRECTION_INPUT : 0) | (port->is_output ? PA_DIRECTION_OUTPUT : 0));
3277             pa_tagstruct_put_proplist(t, proplist);
3278
3279             if (port->profiles) {
3280                 void* state2;
3281                 pa_tagstruct_putu32(t, pa_hashmap_size(port->profiles));
3282                 PA_HASHMAP_FOREACH(p, port->profiles, state2)
3283                     pa_tagstruct_puts(t, p->name);
3284             } else
3285                 pa_tagstruct_putu32(t, 0);
3286         }
3287
3288         pa_proplist_free(proplist);
3289     } else
3290         pa_tagstruct_putu32(t, 0);
3291 }
3292
3293 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
3294     pa_assert(t);
3295     pa_assert(module);
3296
3297     pa_tagstruct_putu32(t, module->index);
3298     pa_tagstruct_puts(t, module->name);
3299     pa_tagstruct_puts(t, module->argument);
3300     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
3301
3302     if (c->version < 15)
3303         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
3304
3305     if (c->version >= 15)
3306         pa_tagstruct_put_proplist(t, module->proplist);
3307 }
3308
3309 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
3310     pa_sample_spec fixed_ss;
3311     pa_usec_t sink_latency;
3312     pa_cvolume v;
3313     pa_bool_t has_volume = FALSE;
3314
3315     pa_assert(t);
3316     pa_sink_input_assert_ref(s);
3317
3318     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3319
3320     has_volume = pa_sink_input_is_volume_readable(s);
3321     if (has_volume)
3322         pa_sink_input_get_volume(s, &v, TRUE);
3323     else
3324         pa_cvolume_reset(&v, fixed_ss.channels);
3325
3326     pa_tagstruct_putu32(t, s->index);
3327     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3328     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3329     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3330     pa_tagstruct_putu32(t, s->sink->index);
3331     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3332     pa_tagstruct_put_channel_map(t, &s->channel_map);
3333     pa_tagstruct_put_cvolume(t, &v);
3334     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3335     pa_tagstruct_put_usec(t, sink_latency);
3336     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3337     pa_tagstruct_puts(t, s->driver);
3338     if (c->version >= 11)
3339         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3340     if (c->version >= 13)
3341         pa_tagstruct_put_proplist(t, s->proplist);
3342     if (c->version >= 19)
3343         pa_tagstruct_put_boolean(t, (pa_sink_input_get_state(s) == PA_SINK_INPUT_CORKED));
3344     if (c->version >= 20) {
3345         pa_tagstruct_put_boolean(t, has_volume);
3346         pa_tagstruct_put_boolean(t, s->volume_writable);
3347     }
3348     if (c->version >= 21)
3349         pa_tagstruct_put_format_info(t, s->format);
3350 }
3351
3352 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3353     pa_sample_spec fixed_ss;
3354     pa_usec_t source_latency;
3355     pa_cvolume v;
3356     pa_bool_t has_volume = FALSE;
3357
3358     pa_assert(t);
3359     pa_source_output_assert_ref(s);
3360
3361     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3362
3363     has_volume = pa_source_output_is_volume_readable(s);
3364     if (has_volume)
3365         pa_source_output_get_volume(s, &v, TRUE);
3366     else
3367         pa_cvolume_reset(&v, fixed_ss.channels);
3368
3369     pa_tagstruct_putu32(t, s->index);
3370     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3371     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3372     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3373     pa_tagstruct_putu32(t, s->source->index);
3374     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3375     pa_tagstruct_put_channel_map(t, &s->channel_map);
3376     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3377     pa_tagstruct_put_usec(t, source_latency);
3378     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3379     pa_tagstruct_puts(t, s->driver);
3380     if (c->version >= 13)
3381         pa_tagstruct_put_proplist(t, s->proplist);
3382     if (c->version >= 19)
3383         pa_tagstruct_put_boolean(t, (pa_source_output_get_state(s) == PA_SOURCE_OUTPUT_CORKED));
3384     if (c->version >= 22) {
3385         pa_tagstruct_put_cvolume(t, &v);
3386         pa_tagstruct_put_boolean(t, pa_source_output_get_mute(s));
3387         pa_tagstruct_put_boolean(t, has_volume);
3388         pa_tagstruct_put_boolean(t, s->volume_writable);
3389         pa_tagstruct_put_format_info(t, s->format);
3390     }
3391 }
3392
3393 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3394     pa_sample_spec fixed_ss;
3395     pa_cvolume v;
3396
3397     pa_assert(t);
3398     pa_assert(e);
3399
3400     if (e->memchunk.memblock)
3401         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3402     else
3403         memset(&fixed_ss, 0, sizeof(fixed_ss));
3404
3405     pa_tagstruct_putu32(t, e->index);
3406     pa_tagstruct_puts(t, e->name);
3407
3408     if (e->volume_is_set)
3409         v = e->volume;
3410     else
3411         pa_cvolume_init(&v);
3412
3413     pa_tagstruct_put_cvolume(t, &v);
3414     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3415     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3416     pa_tagstruct_put_channel_map(t, &e->channel_map);
3417     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3418     pa_tagstruct_put_boolean(t, e->lazy);
3419     pa_tagstruct_puts(t, e->filename);
3420
3421     if (c->version >= 13)
3422         pa_tagstruct_put_proplist(t, e->proplist);
3423 }
3424
3425 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3426     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3427     uint32_t idx;
3428     pa_sink *sink = NULL;
3429     pa_source *source = NULL;
3430     pa_client *client = NULL;
3431     pa_card *card = NULL;
3432     pa_module *module = NULL;
3433     pa_sink_input *si = NULL;
3434     pa_source_output *so = NULL;
3435     pa_scache_entry *sce = NULL;
3436     const char *name = NULL;
3437     pa_tagstruct *reply;
3438
3439     pa_native_connection_assert_ref(c);
3440     pa_assert(t);
3441
3442     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3443         (command != PA_COMMAND_GET_CLIENT_INFO &&
3444          command != PA_COMMAND_GET_MODULE_INFO &&
3445          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3446          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3447          pa_tagstruct_gets(t, &name) < 0) ||
3448         !pa_tagstruct_eof(t)) {
3449         protocol_error(c);
3450         return;
3451     }
3452
3453     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3454     CHECK_VALIDITY(c->pstream, !name ||
3455                    (command == PA_COMMAND_GET_SINK_INFO &&
3456                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SINK)) ||
3457                    (command == PA_COMMAND_GET_SOURCE_INFO &&
3458                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SOURCE)) ||
3459                    pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3460     CHECK_VALIDITY(c->pstream, command == PA_COMMAND_GET_SINK_INFO ||
3461                    command == PA_COMMAND_GET_SOURCE_INFO ||
3462                    (idx != PA_INVALID_INDEX || name), tag, PA_ERR_INVALID);
3463     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3464     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3465
3466     if (command == PA_COMMAND_GET_SINK_INFO) {
3467         if (idx != PA_INVALID_INDEX)
3468             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3469         else
3470             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3471     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3472         if (idx != PA_INVALID_INDEX)
3473             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3474         else
3475             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3476     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3477         if (idx != PA_INVALID_INDEX)
3478             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3479         else
3480             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3481     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3482         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3483     else if (command == PA_COMMAND_GET_MODULE_INFO)
3484         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3485     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3486         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3487     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3488         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3489     else {
3490         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3491         if (idx != PA_INVALID_INDEX)
3492             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3493         else
3494             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3495     }
3496
3497     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3498         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3499         return;
3500     }
3501
3502     reply = reply_new(tag);
3503     if (sink)
3504         sink_fill_tagstruct(c, reply, sink);
3505     else if (source)
3506         source_fill_tagstruct(c, reply, source);
3507     else if (client)
3508         client_fill_tagstruct(c, reply, client);
3509     else if (card)
3510         card_fill_tagstruct(c, reply, card);
3511     else if (module)
3512         module_fill_tagstruct(c, reply, module);
3513     else if (si)
3514         sink_input_fill_tagstruct(c, reply, si);
3515     else if (so)
3516         source_output_fill_tagstruct(c, reply, so);
3517     else
3518         scache_fill_tagstruct(c, reply, sce);
3519     pa_pstream_send_tagstruct(c->pstream, reply);
3520 }
3521
3522 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3523     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3524     pa_idxset *i;
3525     uint32_t idx;
3526     void *p;
3527     pa_tagstruct *reply;
3528
3529     pa_native_connection_assert_ref(c);
3530     pa_assert(t);
3531
3532     if (!pa_tagstruct_eof(t)) {
3533         protocol_error(c);
3534         return;
3535     }
3536
3537     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3538
3539     reply = reply_new(tag);
3540
3541     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3542         i = c->protocol->core->sinks;
3543     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3544         i = c->protocol->core->sources;
3545     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3546         i = c->protocol->core->clients;
3547     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3548         i = c->protocol->core->cards;
3549     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3550         i = c->protocol->core->modules;
3551     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3552         i = c->protocol->core->sink_inputs;
3553     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3554         i = c->protocol->core->source_outputs;
3555     else {
3556         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3557         i = c->protocol->core->scache;
3558     }
3559
3560     if (i) {
3561         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3562             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3563                 sink_fill_tagstruct(c, reply, p);
3564             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3565                 source_fill_tagstruct(c, reply, p);
3566             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3567                 client_fill_tagstruct(c, reply, p);
3568             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3569                 card_fill_tagstruct(c, reply, p);
3570             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3571                 module_fill_tagstruct(c, reply, p);
3572             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3573                 sink_input_fill_tagstruct(c, reply, p);
3574             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3575                 source_output_fill_tagstruct(c, reply, p);
3576             else {
3577                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3578                 scache_fill_tagstruct(c, reply, p);
3579             }
3580         }
3581     }
3582
3583     pa_pstream_send_tagstruct(c->pstream, reply);
3584 }
3585
3586 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3587     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3588     pa_tagstruct *reply;
3589     pa_sink *def_sink;
3590     pa_source *def_source;
3591     pa_sample_spec fixed_ss;
3592     char *h, *u;
3593
3594     pa_native_connection_assert_ref(c);
3595     pa_assert(t);
3596
3597     if (!pa_tagstruct_eof(t)) {
3598         protocol_error(c);
3599         return;
3600     }
3601
3602     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3603
3604     reply = reply_new(tag);
3605     pa_tagstruct_puts(reply, PACKAGE_NAME);
3606     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3607
3608     u = pa_get_user_name_malloc();
3609     pa_tagstruct_puts(reply, u);
3610     pa_xfree(u);
3611
3612     h = pa_get_host_name_malloc();
3613     pa_tagstruct_puts(reply, h);
3614     pa_xfree(h);
3615
3616     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3617     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3618
3619     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3620     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3621     def_source = pa_namereg_get_default_source(c->protocol->core);
3622     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3623
3624     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3625
3626     if (c->version >= 15)
3627         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3628
3629     pa_pstream_send_tagstruct(c->pstream, reply);
3630 }
3631
3632 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3633     pa_tagstruct *t;
3634     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3635
3636     pa_native_connection_assert_ref(c);
3637
3638     t = pa_tagstruct_new(NULL, 0);
3639     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3640     pa_tagstruct_putu32(t, (uint32_t) -1);
3641     pa_tagstruct_putu32(t, e);
3642     pa_tagstruct_putu32(t, idx);
3643     pa_pstream_send_tagstruct(c->pstream, t);
3644 }
3645
3646 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3647     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3648     pa_subscription_mask_t m;
3649
3650     pa_native_connection_assert_ref(c);
3651     pa_assert(t);
3652
3653     if (pa_tagstruct_getu32(t, &m) < 0 ||
3654         !pa_tagstruct_eof(t)) {
3655         protocol_error(c);
3656         return;
3657     }
3658
3659     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3660     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3661
3662     if (c->subscription)
3663         pa_subscription_free(c->subscription);
3664
3665     if (m != 0) {
3666         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3667         pa_assert(c->subscription);
3668     } else
3669         c->subscription = NULL;
3670
3671     pa_pstream_send_simple_ack(c->pstream, tag);
3672 }
3673
3674 static void command_set_volume(
3675         pa_pdispatch *pd,
3676         uint32_t command,
3677         uint32_t tag,
3678         pa_tagstruct *t,
3679         void *userdata) {
3680
3681     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3682     uint32_t idx;
3683     pa_cvolume volume;
3684     pa_sink *sink = NULL;
3685     pa_source *source = NULL;
3686     pa_sink_input *si = NULL;
3687     pa_source_output *so = NULL;
3688     const char *name = NULL;
3689     const char *client_name;
3690
3691     pa_native_connection_assert_ref(c);
3692     pa_assert(t);
3693
3694     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3695         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3696         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3697         pa_tagstruct_get_cvolume(t, &volume) ||
3698         !pa_tagstruct_eof(t)) {
3699         protocol_error(c);
3700         return;
3701     }
3702
3703     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3704     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_VOLUME ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3705     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3706     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3707     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3708     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3709
3710     switch (command) {
3711
3712         case PA_COMMAND_SET_SINK_VOLUME:
3713             if (idx != PA_INVALID_INDEX)
3714                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3715             else
3716                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3717             break;
3718
3719         case PA_COMMAND_SET_SOURCE_VOLUME:
3720             if (idx != PA_INVALID_INDEX)
3721                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3722             else
3723                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3724             break;
3725
3726         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3727             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3728             break;
3729
3730         case PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME:
3731             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3732             break;
3733
3734         default:
3735             pa_assert_not_reached();
3736     }
3737
3738     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3739
3740     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3741
3742     if (sink) {
3743         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &sink->sample_spec), tag, PA_ERR_INVALID);
3744
3745         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3746         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3747     } else if (source) {
3748         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &source->sample_spec), tag, PA_ERR_INVALID);
3749
3750         pa_log_debug("Client %s changes volume of source %s.", client_name, source->name);
3751         pa_source_set_volume(source, &volume, TRUE, TRUE);
3752     } else if (si) {
3753         CHECK_VALIDITY(c->pstream, si->volume_writable, tag, PA_ERR_BADSTATE);
3754         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &si->sample_spec), tag, PA_ERR_INVALID);
3755
3756         pa_log_debug("Client %s changes volume of sink input %s.",
3757                      client_name,
3758                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3759         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3760     } else if (so) {
3761         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &so->sample_spec), tag, PA_ERR_INVALID);
3762
3763         pa_log_debug("Client %s changes volume of source output %s.",
3764                      client_name,
3765                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3766         pa_source_output_set_volume(so, &volume, TRUE, TRUE);
3767     }
3768
3769     pa_pstream_send_simple_ack(c->pstream, tag);
3770 }
3771
3772 static void command_set_mute(
3773         pa_pdispatch *pd,
3774         uint32_t command,
3775         uint32_t tag,
3776         pa_tagstruct *t,
3777         void *userdata) {
3778
3779     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3780     uint32_t idx;
3781     pa_bool_t mute;
3782     pa_sink *sink = NULL;
3783     pa_source *source = NULL;
3784     pa_sink_input *si = NULL;
3785     pa_source_output *so = NULL;
3786     const char *name = NULL, *client_name;
3787
3788     pa_native_connection_assert_ref(c);
3789     pa_assert(t);
3790
3791     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3792         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3793         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3794         pa_tagstruct_get_boolean(t, &mute) ||
3795         !pa_tagstruct_eof(t)) {
3796         protocol_error(c);
3797         return;
3798     }
3799
3800     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3801     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_MUTE ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3802     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3803     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3804     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3805
3806     switch (command) {
3807
3808         case PA_COMMAND_SET_SINK_MUTE:
3809             if (idx != PA_INVALID_INDEX)
3810                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3811             else
3812                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3813
3814             break;
3815
3816         case PA_COMMAND_SET_SOURCE_MUTE:
3817             if (idx != PA_INVALID_INDEX)
3818                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3819             else
3820                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3821
3822             break;
3823
3824         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3825             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3826             break;
3827
3828         case PA_COMMAND_SET_SOURCE_OUTPUT_MUTE:
3829             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3830             break;
3831
3832         default:
3833             pa_assert_not_reached();
3834     }
3835
3836     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3837
3838     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3839
3840     if (sink) {
3841         pa_log_debug("Client %s changes mute of sink %s.", client_name, sink->name);
3842         pa_sink_set_mute(sink, mute, TRUE);
3843     } else if (source) {
3844         pa_log_debug("Client %s changes mute of source %s.", client_name, source->name);
3845         pa_source_set_mute(source, mute, TRUE);
3846     } else if (si) {
3847         pa_log_debug("Client %s changes mute of sink input %s.",
3848                      client_name,
3849                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3850         pa_sink_input_set_mute(si, mute, TRUE);
3851     } else if (so) {
3852         pa_log_debug("Client %s changes mute of source output %s.",
3853                      client_name,
3854                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3855         pa_source_output_set_mute(so, mute, TRUE);
3856     }
3857
3858     pa_pstream_send_simple_ack(c->pstream, tag);
3859 }
3860
3861 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3862     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3863     uint32_t idx;
3864     pa_bool_t b;
3865     playback_stream *s;
3866
3867     pa_native_connection_assert_ref(c);
3868     pa_assert(t);
3869
3870     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3871         pa_tagstruct_get_boolean(t, &b) < 0 ||
3872         !pa_tagstruct_eof(t)) {
3873         protocol_error(c);
3874         return;
3875     }
3876
3877     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3878     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3879     s = pa_idxset_get_by_index(c->output_streams, idx);
3880     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3881     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3882
3883     pa_sink_input_cork(s->sink_input, b);
3884
3885     if (b)
3886         s->is_underrun = TRUE;
3887
3888     pa_pstream_send_simple_ack(c->pstream, tag);
3889 }
3890
3891 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3892     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3893     uint32_t idx;
3894     playback_stream *s;
3895
3896     pa_native_connection_assert_ref(c);
3897     pa_assert(t);
3898
3899     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3900         !pa_tagstruct_eof(t)) {
3901         protocol_error(c);
3902         return;
3903     }
3904
3905     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3906     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3907     s = pa_idxset_get_by_index(c->output_streams, idx);
3908     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3909     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3910
3911     switch (command) {
3912         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3913             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3914             break;
3915
3916         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3917             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3918             break;
3919
3920         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3921             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3922             break;
3923
3924         default:
3925             pa_assert_not_reached();
3926     }
3927
3928     pa_pstream_send_simple_ack(c->pstream, tag);
3929 }
3930
3931 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3932     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3933     uint32_t idx;
3934     record_stream *s;
3935     pa_bool_t b;
3936
3937     pa_native_connection_assert_ref(c);
3938     pa_assert(t);
3939
3940     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3941         pa_tagstruct_get_boolean(t, &b) < 0 ||
3942         !pa_tagstruct_eof(t)) {
3943         protocol_error(c);
3944         return;
3945     }
3946
3947     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3948     s = pa_idxset_get_by_index(c->record_streams, idx);
3949     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3950
3951     pa_source_output_cork(s->source_output, b);
3952     pa_memblockq_prebuf_force(s->memblockq);
3953     pa_pstream_send_simple_ack(c->pstream, tag);
3954 }
3955
3956 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3957     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3958     uint32_t idx;
3959     record_stream *s;
3960
3961     pa_native_connection_assert_ref(c);
3962     pa_assert(t);
3963
3964     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3965         !pa_tagstruct_eof(t)) {
3966         protocol_error(c);
3967         return;
3968     }
3969
3970     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3971     s = pa_idxset_get_by_index(c->record_streams, idx);
3972     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3973
3974     pa_memblockq_flush_read(s->memblockq);
3975     pa_pstream_send_simple_ack(c->pstream, tag);
3976 }
3977
3978 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3979     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3980     uint32_t idx;
3981     pa_buffer_attr a;
3982     pa_tagstruct *reply;
3983
3984     pa_native_connection_assert_ref(c);
3985     pa_assert(t);
3986
3987     memset(&a, 0, sizeof(a));
3988
3989     if (pa_tagstruct_getu32(t, &idx) < 0) {
3990         protocol_error(c);
3991         return;
3992     }
3993
3994     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3995
3996     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3997         playback_stream *s;
3998         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3999
4000         s = pa_idxset_get_by_index(c->output_streams, idx);
4001         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4002         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4003
4004         if (pa_tagstruct_get(
4005                     t,
4006                     PA_TAG_U32, &a.maxlength,
4007                     PA_TAG_U32, &a.tlength,
4008                     PA_TAG_U32, &a.prebuf,
4009                     PA_TAG_U32, &a.minreq,
4010                     PA_TAG_INVALID) < 0 ||
4011             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
4012             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
4013             !pa_tagstruct_eof(t)) {
4014             protocol_error(c);
4015             return;
4016         }
4017
4018         s->adjust_latency = adjust_latency;
4019         s->early_requests = early_requests;
4020         s->buffer_attr_req = a;
4021
4022         fix_playback_buffer_attr(s);
4023         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
4024
4025         reply = reply_new(tag);
4026         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
4027         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
4028         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
4029         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
4030
4031         if (c->version >= 13)
4032             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
4033
4034     } else {
4035         record_stream *s;
4036         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
4037         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
4038
4039         s = pa_idxset_get_by_index(c->record_streams, idx);
4040         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4041
4042         if (pa_tagstruct_get(
4043                     t,
4044                     PA_TAG_U32, &a.maxlength,
4045                     PA_TAG_U32, &a.fragsize,
4046                     PA_TAG_INVALID) < 0 ||
4047             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
4048             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
4049             !pa_tagstruct_eof(t)) {
4050             protocol_error(c);
4051             return;
4052         }
4053
4054         s->adjust_latency = adjust_latency;
4055         s->early_requests = early_requests;
4056         s->buffer_attr_req = a;
4057
4058         fix_record_buffer_attr_pre(s);
4059         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
4060         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
4061         fix_record_buffer_attr_post(s);
4062
4063         reply = reply_new(tag);
4064         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
4065         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
4066
4067         if (c->version >= 13)
4068             pa_tagstruct_put_usec(reply, s->configured_source_latency);
4069     }
4070
4071     pa_pstream_send_tagstruct(c->pstream, reply);
4072 }
4073
4074 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4075     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4076     uint32_t idx;
4077     uint32_t rate;
4078
4079     pa_native_connection_assert_ref(c);
4080     pa_assert(t);
4081
4082     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4083         pa_tagstruct_getu32(t, &rate) < 0 ||
4084         !pa_tagstruct_eof(t)) {
4085         protocol_error(c);
4086         return;
4087     }
4088
4089     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4090     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
4091
4092     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
4093         playback_stream *s;
4094
4095         s = pa_idxset_get_by_index(c->output_streams, idx);
4096         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4097         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4098
4099         pa_sink_input_set_rate(s->sink_input, rate);
4100
4101     } else {
4102         record_stream *s;
4103         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
4104
4105         s = pa_idxset_get_by_index(c->record_streams, idx);
4106         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4107
4108         pa_source_output_set_rate(s->source_output, rate);
4109     }
4110
4111     pa_pstream_send_simple_ack(c->pstream, tag);
4112 }
4113
4114 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4115     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4116     uint32_t idx;
4117     uint32_t mode;
4118     pa_proplist *p;
4119
4120     pa_native_connection_assert_ref(c);
4121     pa_assert(t);
4122
4123     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4124
4125     p = pa_proplist_new();
4126
4127     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
4128
4129         if (pa_tagstruct_getu32(t, &mode) < 0 ||
4130             pa_tagstruct_get_proplist(t, p) < 0 ||
4131             !pa_tagstruct_eof(t)) {
4132             protocol_error(c);
4133             pa_proplist_free(p);
4134             return;
4135         }
4136
4137     } else {
4138
4139         if (pa_tagstruct_getu32(t, &idx) < 0 ||
4140             pa_tagstruct_getu32(t, &mode) < 0 ||
4141             pa_tagstruct_get_proplist(t, p) < 0 ||
4142             !pa_tagstruct_eof(t)) {
4143             protocol_error(c);
4144             pa_proplist_free(p);
4145             return;
4146         }
4147     }
4148
4149     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
4150         pa_proplist_free(p);
4151         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
4152     }
4153
4154     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
4155         playback_stream *s;
4156
4157         s = pa_idxset_get_by_index(c->output_streams, idx);
4158         if (!s || !playback_stream_isinstance(s)) {
4159             pa_proplist_free(p);
4160             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
4161         }
4162         pa_sink_input_update_proplist(s->sink_input, mode, p);
4163
4164     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
4165         record_stream *s;
4166
4167         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
4168             pa_proplist_free(p);
4169             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
4170         }
4171         pa_source_output_update_proplist(s->source_output, mode, p);
4172
4173     } else {
4174         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
4175
4176         pa_client_update_proplist(c->client, mode, p);
4177     }
4178
4179     pa_pstream_send_simple_ack(c->pstream, tag);
4180     pa_proplist_free(p);
4181 }
4182
4183 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4184     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4185     uint32_t idx;
4186     unsigned changed = 0;
4187     pa_proplist *p;
4188     pa_strlist *l = NULL;
4189
4190     pa_native_connection_assert_ref(c);
4191     pa_assert(t);
4192
4193     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4194
4195     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
4196
4197         if (pa_tagstruct_getu32(t, &idx) < 0) {
4198             protocol_error(c);
4199             return;
4200         }
4201     }
4202
4203     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4204         playback_stream *s;
4205
4206         s = pa_idxset_get_by_index(c->output_streams, idx);
4207         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4208         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4209
4210         p = s->sink_input->proplist;
4211
4212     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4213         record_stream *s;
4214
4215         s = pa_idxset_get_by_index(c->record_streams, idx);
4216         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4217
4218         p = s->source_output->proplist;
4219     } else {
4220         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4221
4222         p = c->client->proplist;
4223     }
4224
4225     for (;;) {
4226         const char *k;
4227
4228         if (pa_tagstruct_gets(t, &k) < 0) {
4229             protocol_error(c);
4230             pa_strlist_free(l);
4231             return;
4232         }
4233
4234         if (!k)
4235             break;
4236
4237         l = pa_strlist_prepend(l, k);
4238     }
4239
4240     if (!pa_tagstruct_eof(t)) {
4241         protocol_error(c);
4242         pa_strlist_free(l);
4243         return;
4244     }
4245
4246     for (;;) {
4247         char *z;
4248
4249         l = pa_strlist_pop(l, &z);
4250
4251         if (!z)
4252             break;
4253
4254         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
4255         pa_xfree(z);
4256     }
4257
4258     pa_pstream_send_simple_ack(c->pstream, tag);
4259
4260     if (changed) {
4261         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4262             playback_stream *s;
4263
4264             s = pa_idxset_get_by_index(c->output_streams, idx);
4265             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
4266
4267         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4268             record_stream *s;
4269
4270             s = pa_idxset_get_by_index(c->record_streams, idx);
4271             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
4272
4273         } else {
4274             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4275             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
4276         }
4277     }
4278 }
4279
4280 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4281     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4282     const char *s;
4283
4284     pa_native_connection_assert_ref(c);
4285     pa_assert(t);
4286
4287     if (pa_tagstruct_gets(t, &s) < 0 ||
4288         !pa_tagstruct_eof(t)) {
4289         protocol_error(c);
4290         return;
4291     }
4292
4293     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4294     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
4295
4296     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
4297         pa_source *source;
4298
4299         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
4300         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4301
4302         pa_namereg_set_default_source(c->protocol->core, source);
4303     } else {
4304         pa_sink *sink;
4305         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
4306
4307         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
4308         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4309
4310         pa_namereg_set_default_sink(c->protocol->core, sink);
4311     }
4312
4313     pa_pstream_send_simple_ack(c->pstream, tag);
4314 }
4315
4316 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4317     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4318     uint32_t idx;
4319     const char *name;
4320
4321     pa_native_connection_assert_ref(c);
4322     pa_assert(t);
4323
4324     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4325         pa_tagstruct_gets(t, &name) < 0 ||
4326         !pa_tagstruct_eof(t)) {
4327         protocol_error(c);
4328         return;
4329     }
4330
4331     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4332     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
4333
4334     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
4335         playback_stream *s;
4336
4337         s = pa_idxset_get_by_index(c->output_streams, idx);
4338         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4339         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4340
4341         pa_sink_input_set_name(s->sink_input, name);
4342
4343     } else {
4344         record_stream *s;
4345         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
4346
4347         s = pa_idxset_get_by_index(c->record_streams, idx);
4348         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4349
4350         pa_source_output_set_name(s->source_output, name);
4351     }
4352
4353     pa_pstream_send_simple_ack(c->pstream, tag);
4354 }
4355
4356 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4357     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4358     uint32_t idx;
4359
4360     pa_native_connection_assert_ref(c);
4361     pa_assert(t);
4362
4363     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4364         !pa_tagstruct_eof(t)) {
4365         protocol_error(c);
4366         return;
4367     }
4368
4369     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4370
4371     if (command == PA_COMMAND_KILL_CLIENT) {
4372         pa_client *client;
4373
4374         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
4375         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
4376
4377         pa_native_connection_ref(c);
4378         pa_client_kill(client);
4379
4380     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
4381         pa_sink_input *s;
4382
4383         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4384         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4385
4386         pa_native_connection_ref(c);
4387         pa_sink_input_kill(s);
4388     } else {
4389         pa_source_output *s;
4390
4391         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4392
4393         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4394         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4395
4396         pa_native_connection_ref(c);
4397         pa_source_output_kill(s);
4398     }
4399
4400     pa_pstream_send_simple_ack(c->pstream, tag);
4401     pa_native_connection_unref(c);
4402 }
4403
4404 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4405     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4406     pa_module *m;
4407     const char *name, *argument;
4408     pa_tagstruct *reply;
4409
4410     pa_native_connection_assert_ref(c);
4411     pa_assert(t);
4412
4413     if (pa_tagstruct_gets(t, &name) < 0 ||
4414         pa_tagstruct_gets(t, &argument) < 0 ||
4415         !pa_tagstruct_eof(t)) {
4416         protocol_error(c);
4417         return;
4418     }
4419
4420     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4421     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4422     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4423
4424     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4425         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4426         return;
4427     }
4428
4429     reply = reply_new(tag);
4430     pa_tagstruct_putu32(reply, m->index);
4431     pa_pstream_send_tagstruct(c->pstream, reply);
4432 }
4433
4434 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4435     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4436     uint32_t idx;
4437     pa_module *m;
4438
4439     pa_native_connection_assert_ref(c);
4440     pa_assert(t);
4441
4442     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4443         !pa_tagstruct_eof(t)) {
4444         protocol_error(c);
4445         return;
4446     }
4447
4448     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4449     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4450     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4451
4452     pa_module_unload_request(m, FALSE);
4453     pa_pstream_send_simple_ack(c->pstream, tag);
4454 }
4455
4456 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4457     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4458     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4459     const char *name_device = NULL;
4460
4461     pa_native_connection_assert_ref(c);
4462     pa_assert(t);
4463
4464     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4465         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4466         pa_tagstruct_gets(t, &name_device) < 0 ||
4467         !pa_tagstruct_eof(t)) {
4468         protocol_error(c);
4469         return;
4470     }
4471
4472     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4473     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4474
4475     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name_or_wildcard(name_device, command == PA_COMMAND_MOVE_SINK_INPUT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4476     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4477     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4478     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4479
4480     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4481         pa_sink_input *si = NULL;
4482         pa_sink *sink = NULL;
4483
4484         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4485
4486         if (idx_device != PA_INVALID_INDEX)
4487             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4488         else
4489             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4490
4491         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4492
4493         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4494             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4495             return;
4496         }
4497     } else {
4498         pa_source_output *so = NULL;
4499         pa_source *source;
4500
4501         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4502
4503         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4504
4505         if (idx_device != PA_INVALID_INDEX)
4506             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4507         else
4508             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4509
4510         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4511
4512         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4513             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4514             return;
4515         }
4516     }
4517
4518     pa_pstream_send_simple_ack(c->pstream, tag);
4519 }
4520
4521 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4522     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4523     uint32_t idx = PA_INVALID_INDEX;
4524     const char *name = NULL;
4525     pa_bool_t b;
4526
4527     pa_native_connection_assert_ref(c);
4528     pa_assert(t);
4529
4530     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4531         pa_tagstruct_gets(t, &name) < 0 ||
4532         pa_tagstruct_get_boolean(t, &b) < 0 ||
4533         !pa_tagstruct_eof(t)) {
4534         protocol_error(c);
4535         return;
4536     }
4537
4538     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4539     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SUSPEND_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE) || *name == 0, tag, PA_ERR_INVALID);
4540     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4541     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4542     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4543
4544     if (command == PA_COMMAND_SUSPEND_SINK) {
4545
4546         if (idx == PA_INVALID_INDEX && name && !*name) {
4547
4548             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4549
4550             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4551                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4552                 return;
4553             }
4554         } else {
4555             pa_sink *sink = NULL;
4556
4557             if (idx != PA_INVALID_INDEX)
4558                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4559             else
4560                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4561
4562             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4563
4564             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4565                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4566                 return;
4567             }
4568         }
4569     } else {
4570
4571         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4572
4573         if (idx == PA_INVALID_INDEX && name && !*name) {
4574
4575             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4576
4577             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4578                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4579                 return;
4580             }
4581
4582         } else {
4583             pa_source *source;
4584
4585             if (idx != PA_INVALID_INDEX)
4586                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4587             else
4588                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4589
4590             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4591
4592             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4593                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4594                 return;
4595             }
4596         }
4597     }
4598
4599     pa_pstream_send_simple_ack(c->pstream, tag);
4600 }
4601
4602 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4603     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4604     uint32_t idx = PA_INVALID_INDEX;
4605     const char *name = NULL;
4606     pa_module *m;
4607     pa_native_protocol_ext_cb_t cb;
4608
4609     pa_native_connection_assert_ref(c);
4610     pa_assert(t);
4611
4612     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4613         pa_tagstruct_gets(t, &name) < 0) {
4614         protocol_error(c);
4615         return;
4616     }
4617
4618     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4619     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4620     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4621     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4622     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4623
4624     if (idx != PA_INVALID_INDEX)
4625         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4626     else {
4627         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4628             if (strcmp(name, m->name) == 0)
4629                 break;
4630     }
4631
4632     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4633     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4634
4635     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4636     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4637
4638     if (cb(c->protocol, m, c, tag, t) < 0)
4639         protocol_error(c);
4640 }
4641
4642 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4643     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4644     uint32_t idx = PA_INVALID_INDEX;
4645     const char *name = NULL, *profile = NULL;
4646     pa_card *card = NULL;
4647     int ret;
4648
4649     pa_native_connection_assert_ref(c);
4650     pa_assert(t);
4651
4652     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4653         pa_tagstruct_gets(t, &name) < 0 ||
4654         pa_tagstruct_gets(t, &profile) < 0 ||
4655         !pa_tagstruct_eof(t)) {
4656         protocol_error(c);
4657         return;
4658     }
4659
4660     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4661     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4662     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4663     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4664     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4665
4666     if (idx != PA_INVALID_INDEX)
4667         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4668     else
4669         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4670
4671     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4672
4673     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4674         pa_pstream_send_error(c->pstream, tag, -ret);
4675         return;
4676     }
4677
4678     pa_pstream_send_simple_ack(c->pstream, tag);
4679 }
4680
4681 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4682     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4683     uint32_t idx = PA_INVALID_INDEX;
4684     const char *name = NULL, *port = NULL;
4685     int ret;
4686
4687     pa_native_connection_assert_ref(c);
4688     pa_assert(t);
4689
4690     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4691         pa_tagstruct_gets(t, &name) < 0 ||
4692         pa_tagstruct_gets(t, &port) < 0 ||
4693         !pa_tagstruct_eof(t)) {
4694         protocol_error(c);
4695         return;
4696     }
4697
4698     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4699     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_PORT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4700     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4701     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4702     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4703
4704     if (command == PA_COMMAND_SET_SINK_PORT) {
4705         pa_sink *sink;
4706
4707         if (idx != PA_INVALID_INDEX)
4708             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4709         else
4710             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4711
4712         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4713
4714         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4715             pa_pstream_send_error(c->pstream, tag, -ret);
4716             return;
4717         }
4718     } else {
4719         pa_source *source;
4720
4721         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4722
4723         if (idx != PA_INVALID_INDEX)
4724             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4725         else
4726             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4727
4728         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4729
4730         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4731             pa_pstream_send_error(c->pstream, tag, -ret);
4732             return;
4733         }
4734     }
4735
4736     pa_pstream_send_simple_ack(c->pstream, tag);
4737 }
4738
4739 /*** pstream callbacks ***/
4740
4741 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4742     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4743
4744     pa_assert(p);
4745     pa_assert(packet);
4746     pa_native_connection_assert_ref(c);
4747
4748     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4749         pa_log("invalid packet.");
4750         native_connection_unlink(c);
4751     }
4752 }
4753
4754 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4755     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4756     output_stream *stream;
4757
4758     pa_assert(p);
4759     pa_assert(chunk);
4760     pa_native_connection_assert_ref(c);
4761
4762     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4763         pa_log_debug("Client sent block for invalid stream.");
4764         /* Ignoring */
4765         return;
4766     }
4767
4768 #ifdef PROTOCOL_NATIVE_DEBUG
4769     pa_log("got %lu bytes from client", (unsigned long) chunk->length);
4770 #endif
4771
4772     if (playback_stream_isinstance(stream)) {
4773         playback_stream *ps = PLAYBACK_STREAM(stream);
4774
4775         pa_atomic_inc(&ps->seek_or_post_in_queue);
4776         if (chunk->memblock) {
4777             if (seek != PA_SEEK_RELATIVE || offset != 0)
4778                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, chunk, NULL);
4779             else
4780                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4781         } else
4782             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4783
4784     } else {
4785         upload_stream *u = UPLOAD_STREAM(stream);
4786         size_t l;
4787
4788         if (!u->memchunk.memblock) {
4789             if (u->length == chunk->length && chunk->memblock) {
4790                 u->memchunk = *chunk;
4791                 pa_memblock_ref(u->memchunk.memblock);
4792                 u->length = 0;
4793             } else {
4794                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4795                 u->memchunk.index = u->memchunk.length = 0;
4796             }
4797         }
4798
4799         pa_assert(u->memchunk.memblock);
4800
4801         l = u->length;
4802         if (l > chunk->length)
4803             l = chunk->length;
4804
4805         if (l > 0) {
4806             void *dst;
4807             dst = pa_memblock_acquire(u->memchunk.memblock);
4808
4809             if (chunk->memblock) {
4810                 void *src;
4811                 src = pa_memblock_acquire(chunk->memblock);
4812
4813                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4814                        (uint8_t*) src + chunk->index, l);
4815
4816                 pa_memblock_release(chunk->memblock);
4817             } else
4818                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4819
4820             pa_memblock_release(u->memchunk.memblock);
4821
4822             u->memchunk.length += l;
4823             u->length -= l;
4824         }
4825     }
4826 }
4827
4828 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4829     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4830
4831     pa_assert(p);
4832     pa_native_connection_assert_ref(c);
4833
4834     native_connection_unlink(c);
4835     pa_log_info("Connection died.");
4836 }
4837
4838 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4839     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4840
4841     pa_assert(p);
4842     pa_native_connection_assert_ref(c);
4843
4844     native_connection_send_memblock(c);
4845 }
4846
4847 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4848     pa_thread_mq *q;
4849
4850     if (!(q = pa_thread_mq_get()))
4851         pa_pstream_send_revoke(p, block_id);
4852     else
4853         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4854 }
4855
4856 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4857     pa_thread_mq *q;
4858
4859     if (!(q = pa_thread_mq_get()))
4860         pa_pstream_send_release(p, block_id);
4861     else
4862         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4863 }
4864
4865 /*** client callbacks ***/
4866
4867 static void client_kill_cb(pa_client *c) {
4868     pa_assert(c);
4869
4870     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4871     pa_log_info("Connection killed.");
4872 }
4873
4874 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4875     pa_tagstruct *t;
4876     pa_native_connection *c;
4877
4878     pa_assert(client);
4879     c = PA_NATIVE_CONNECTION(client->userdata);
4880     pa_native_connection_assert_ref(c);
4881
4882     if (c->version < 15)
4883       return;
4884
4885     t = pa_tagstruct_new(NULL, 0);
4886     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4887     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4888     pa_tagstruct_puts(t, event);
4889     pa_tagstruct_put_proplist(t, pl);
4890     pa_pstream_send_tagstruct(c->pstream, t);
4891 }
4892
4893 /*** module entry points ***/
4894
4895 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4896     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4897
4898     pa_assert(m);
4899     pa_native_connection_assert_ref(c);
4900     pa_assert(c->auth_timeout_event == e);
4901
4902     if (!c->authorized) {
4903         native_connection_unlink(c);
4904         pa_log_info("Connection terminated due to authentication timeout.");
4905     }
4906 }
4907
4908 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4909     pa_native_connection *c;
4910     char pname[128];
4911     pa_client *client;
4912     pa_client_new_data data;
4913
4914     pa_assert(p);
4915     pa_assert(io);
4916     pa_assert(o);
4917
4918     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4919         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4920         pa_iochannel_free(io);
4921         return;
4922     }
4923
4924     pa_client_new_data_init(&data);
4925     data.module = o->module;
4926     data.driver = __FILE__;
4927     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4928     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4929     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4930     client = pa_client_new(p->core, &data);
4931     pa_client_new_data_done(&data);
4932
4933     if (!client)
4934         return;
4935
4936     c = pa_msgobject_new(pa_native_connection);
4937     c->parent.parent.free = native_connection_free;
4938     c->parent.process_msg = native_connection_process_msg;
4939     c->protocol = p;
4940     c->options = pa_native_options_ref(o);
4941     c->authorized = FALSE;
4942
4943     if (o->auth_anonymous) {
4944         pa_log_info("Client authenticated anonymously.");
4945         c->authorized = TRUE;
4946     }
4947
4948     if (!c->authorized &&
4949         o->auth_ip_acl &&
4950         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4951
4952         pa_log_info("Client authenticated by IP ACL.");
4953         c->authorized = TRUE;
4954     }
4955
4956     if (!c->authorized)
4957         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4958     else
4959         c->auth_timeout_event = NULL;
4960
4961     c->is_local = pa_iochannel_socket_is_local(io);
4962     c->version = 8;
4963
4964     c->client = client;
4965     c->client->kill = client_kill_cb;
4966     c->client->send_event = client_send_event_cb;
4967     c->client->userdata = c;
4968
4969     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4970     pa_pstream_set_receive_packet_callback(c->pstream, pstream_packet_callback, c);
4971     pa_pstream_set_receive_memblock_callback(c->pstream, pstream_memblock_callback, c);
4972     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4973     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4974     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4975     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4976
4977     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4978
4979     c->record_streams = pa_idxset_new(NULL, NULL);
4980     c->output_streams = pa_idxset_new(NULL, NULL);
4981
4982     c->rrobin_index = PA_IDXSET_INVALID;
4983     c->subscription = NULL;
4984
4985     pa_idxset_put(p->connections, c, NULL);
4986
4987 #ifdef HAVE_CREDS
4988     if (pa_iochannel_creds_supported(io))
4989         pa_iochannel_creds_enable(io);
4990 #endif
4991
4992     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4993 }
4994
4995 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4996     pa_native_connection *c;
4997     void *state = NULL;
4998
4999     pa_assert(p);
5000     pa_assert(m);
5001
5002     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
5003         if (c->options->module == m)
5004             native_connection_unlink(c);
5005 }
5006
5007 static pa_native_protocol* native_protocol_new(pa_core *c) {
5008     pa_native_protocol *p;
5009     pa_native_hook_t h;
5010
5011     pa_assert(c);
5012
5013     p = pa_xnew(pa_native_protocol, 1);
5014     PA_REFCNT_INIT(p);
5015     p->core = c;
5016     p->connections = pa_idxset_new(NULL, NULL);
5017
5018     p->servers = NULL;
5019
5020     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
5021
5022     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
5023         pa_hook_init(&p->hooks[h], p);
5024
5025     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
5026
5027     return p;
5028 }
5029
5030 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
5031     pa_native_protocol *p;
5032
5033     if ((p = pa_shared_get(c, "native-protocol")))
5034         return pa_native_protocol_ref(p);
5035
5036     return native_protocol_new(c);
5037 }
5038
5039 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
5040     pa_assert(p);
5041     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5042
5043     PA_REFCNT_INC(p);
5044
5045     return p;
5046 }
5047
5048 void pa_native_protocol_unref(pa_native_protocol *p) {
5049     pa_native_connection *c;
5050     pa_native_hook_t h;
5051
5052     pa_assert(p);
5053     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5054
5055     if (PA_REFCNT_DEC(p) > 0)
5056         return;
5057
5058     while ((c = pa_idxset_first(p->connections, NULL)))
5059         native_connection_unlink(c);
5060
5061     pa_idxset_free(p->connections, NULL, NULL);
5062
5063     pa_strlist_free(p->servers);
5064
5065     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
5066         pa_hook_done(&p->hooks[h]);
5067
5068     pa_hashmap_free(p->extensions, NULL, NULL);
5069
5070     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
5071
5072     pa_xfree(p);
5073 }
5074
5075 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
5076     pa_assert(p);
5077     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5078     pa_assert(name);
5079
5080     p->servers = pa_strlist_prepend(p->servers, name);
5081
5082     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5083 }
5084
5085 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
5086     pa_assert(p);
5087     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5088     pa_assert(name);
5089
5090     p->servers = pa_strlist_remove(p->servers, name);
5091
5092     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5093 }
5094
5095 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
5096     pa_assert(p);
5097     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5098
5099     return p->hooks;
5100 }
5101
5102 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
5103     pa_assert(p);
5104     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5105
5106     return p->servers;
5107 }
5108
5109 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
5110     pa_assert(p);
5111     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5112     pa_assert(m);
5113     pa_assert(cb);
5114     pa_assert(!pa_hashmap_get(p->extensions, m));
5115
5116     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
5117     return 0;
5118 }
5119
5120 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
5121     pa_assert(p);
5122     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5123     pa_assert(m);
5124
5125     pa_assert_se(pa_hashmap_remove(p->extensions, m));
5126 }
5127
5128 pa_native_options* pa_native_options_new(void) {
5129     pa_native_options *o;
5130
5131     o = pa_xnew0(pa_native_options, 1);
5132     PA_REFCNT_INIT(o);
5133
5134     return o;
5135 }
5136
5137 pa_native_options* pa_native_options_ref(pa_native_options *o) {
5138     pa_assert(o);
5139     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5140
5141     PA_REFCNT_INC(o);
5142
5143     return o;
5144 }
5145
5146 void pa_native_options_unref(pa_native_options *o) {
5147     pa_assert(o);
5148     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5149
5150     if (PA_REFCNT_DEC(o) > 0)
5151         return;
5152
5153     pa_xfree(o->auth_group);
5154
5155     if (o->auth_ip_acl)
5156         pa_ip_acl_free(o->auth_ip_acl);
5157
5158     if (o->auth_cookie)
5159         pa_auth_cookie_unref(o->auth_cookie);
5160
5161     pa_xfree(o);
5162 }
5163
5164 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
5165     pa_bool_t enabled;
5166     const char *acl;
5167
5168     pa_assert(o);
5169     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5170     pa_assert(ma);
5171
5172     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
5173         pa_log("auth-anonymous= expects a boolean argument.");
5174         return -1;
5175     }
5176
5177     enabled = TRUE;
5178     if (pa_modargs_get_value_boolean(ma, "auth-group-enable", &enabled) < 0) {
5179         pa_log("auth-group-enable= expects a boolean argument.");
5180         return -1;
5181     }
5182
5183     pa_xfree(o->auth_group);
5184     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
5185
5186 #ifndef HAVE_CREDS
5187     if (o->auth_group)
5188         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
5189 #endif
5190
5191     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
5192         pa_ip_acl *ipa;
5193
5194         if (!(ipa = pa_ip_acl_new(acl))) {
5195             pa_log("Failed to parse IP ACL '%s'", acl);
5196             return -1;
5197         }
5198
5199         if (o->auth_ip_acl)
5200             pa_ip_acl_free(o->auth_ip_acl);
5201
5202         o->auth_ip_acl = ipa;
5203     }
5204
5205     enabled = TRUE;
5206     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
5207         pa_log("auth-cookie-enabled= expects a boolean argument.");
5208         return -1;
5209     }
5210
5211     if (o->auth_cookie)
5212         pa_auth_cookie_unref(o->auth_cookie);
5213
5214     if (enabled) {
5215         const char *cn;
5216
5217         /* The new name for this is 'auth-cookie', for compat reasons
5218          * we check the old name too */
5219         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
5220             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
5221                 cn = PA_NATIVE_COOKIE_FILE;
5222
5223         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
5224             return -1;
5225
5226     } else
5227           o->auth_cookie = NULL;
5228
5229     return 0;
5230 }
5231
5232 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
5233     pa_native_connection_assert_ref(c);
5234
5235     return c->pstream;
5236 }
5237
5238 pa_client* pa_native_connection_get_client(pa_native_connection *c) {
5239     pa_native_connection_assert_ref(c);
5240
5241     return c->client;
5242 }