native: handle moving() callback with NULL destination properly
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/native-common.h>
40 #include <pulsecore/packet.h>
41 #include <pulsecore/client.h>
42 #include <pulsecore/source-output.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/pstream.h>
45 #include <pulsecore/tagstruct.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/llist.h>
57 #include <pulsecore/creds.h>
58 #include <pulsecore/core-util.h>
59 #include <pulsecore/ipacl.h>
60 #include <pulsecore/thread-mq.h>
61
62 #include "protocol-native.h"
63
64 /* Kick a client if it doesn't authenticate within this time */
65 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
66
67 /* Don't accept more connection than this */
68 #define MAX_CONNECTIONS 64
69
70 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
71 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
72 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
73 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
74
75 struct pa_native_protocol;
76
77 typedef struct record_stream {
78     pa_msgobject parent;
79
80     pa_native_connection *connection;
81     uint32_t index;
82
83     pa_source_output *source_output;
84     pa_memblockq *memblockq;
85
86     pa_bool_t adjust_latency:1;
87     pa_bool_t early_requests:1;
88
89     pa_buffer_attr buffer_attr;
90
91     pa_atomic_t on_the_fly;
92     pa_usec_t configured_source_latency;
93     size_t drop_initial;
94
95     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
96     size_t on_the_fly_snapshot;
97     pa_usec_t current_monitor_latency;
98     pa_usec_t current_source_latency;
99 } record_stream;
100
101 PA_DECLARE_CLASS(record_stream);
102 #define RECORD_STREAM(o) (record_stream_cast(o))
103 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
104
105 typedef struct output_stream {
106     pa_msgobject parent;
107 } output_stream;
108
109 PA_DECLARE_CLASS(output_stream);
110 #define OUTPUT_STREAM(o) (output_stream_cast(o))
111 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
112
113 typedef struct playback_stream {
114     output_stream parent;
115
116     pa_native_connection *connection;
117     uint32_t index;
118
119     pa_sink_input *sink_input;
120     pa_memblockq *memblockq;
121
122     pa_bool_t adjust_latency:1;
123     pa_bool_t early_requests:1;
124
125     pa_bool_t is_underrun:1;
126     pa_bool_t drain_request:1;
127     uint32_t drain_tag;
128     uint32_t syncid;
129
130     pa_atomic_t missing;
131     pa_usec_t configured_sink_latency;
132     pa_buffer_attr buffer_attr;
133
134     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
135     int64_t read_index, write_index;
136     size_t render_memblockq_length;
137     pa_usec_t current_sink_latency;
138     uint64_t playing_for, underrun_for;
139 } playback_stream;
140
141 PA_DECLARE_CLASS(playback_stream);
142 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
143 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
144
145 typedef struct upload_stream {
146     output_stream parent;
147
148     pa_native_connection *connection;
149     uint32_t index;
150
151     pa_memchunk memchunk;
152     size_t length;
153     char *name;
154     pa_sample_spec sample_spec;
155     pa_channel_map channel_map;
156     pa_proplist *proplist;
157 } upload_stream;
158
159 PA_DECLARE_CLASS(upload_stream);
160 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
161 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
162
163 struct pa_native_connection {
164     pa_msgobject parent;
165     pa_native_protocol *protocol;
166     pa_native_options *options;
167     pa_bool_t authorized:1;
168     pa_bool_t is_local:1;
169     uint32_t version;
170     pa_client *client;
171     pa_pstream *pstream;
172     pa_pdispatch *pdispatch;
173     pa_idxset *record_streams, *output_streams;
174     uint32_t rrobin_index;
175     pa_subscription *subscription;
176     pa_time_event *auth_timeout_event;
177 };
178
179 PA_DECLARE_CLASS(pa_native_connection);
180 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
181 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
182
183 struct pa_native_protocol {
184     PA_REFCNT_DECLARE;
185
186     pa_core *core;
187     pa_idxset *connections;
188
189     pa_strlist *servers;
190     pa_hook hooks[PA_NATIVE_HOOK_MAX];
191
192     pa_hashmap *extensions;
193 };
194
195 enum {
196     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
197 };
198
199 enum {
200     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
201     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
202     SINK_INPUT_MESSAGE_FLUSH,
203     SINK_INPUT_MESSAGE_TRIGGER,
204     SINK_INPUT_MESSAGE_SEEK,
205     SINK_INPUT_MESSAGE_PREBUF_FORCE,
206     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
207     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
208 };
209
210 enum {
211     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
212     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
213     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
214     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
215     PLAYBACK_STREAM_MESSAGE_STARTED,
216     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
217 };
218
219 enum {
220     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
221 };
222
223 enum {
224     CONNECTION_MESSAGE_RELEASE,
225     CONNECTION_MESSAGE_REVOKE
226 };
227
228 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
229 static void sink_input_kill_cb(pa_sink_input *i);
230 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
231 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
232 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
235 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
236
237 static void native_connection_send_memblock(pa_native_connection *c);
238 static void playback_stream_request_bytes(struct playback_stream*s);
239
240 static void source_output_kill_cb(pa_source_output *o);
241 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
242 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
243 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
244 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
245 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
246
247 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
249
250 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289
290 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
291     [PA_COMMAND_ERROR] = NULL,
292     [PA_COMMAND_TIMEOUT] = NULL,
293     [PA_COMMAND_REPLY] = NULL,
294     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
295     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
296     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
297     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
298     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
299     [PA_COMMAND_AUTH] = command_auth,
300     [PA_COMMAND_REQUEST] = NULL,
301     [PA_COMMAND_EXIT] = command_exit,
302     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
303     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
304     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
305     [PA_COMMAND_STAT] = command_stat,
306     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
307     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
308     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
309     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
310     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
311     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
312     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
313     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
314     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
315     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
316     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
317     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
318     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
319     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
320     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
321     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
329     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
330     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
331
332     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
333     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
334     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
335
336     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
337     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
338     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
339
340     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
341     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
342
343     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
344     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
346     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
347
348     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
349     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
350
351     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
352     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
353     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
354     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
355     [PA_COMMAND_KILL_CLIENT] = command_kill,
356     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
357     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
358     [PA_COMMAND_LOAD_MODULE] = command_load_module,
359     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
360
361     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
362     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
363     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
364     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
365
366     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
367     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
368
369     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
370     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
371
372     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
373     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
374
375     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
376     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
377     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
378
379     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
380     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
381     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
382
383     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
384
385     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
386     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
387
388     [PA_COMMAND_EXTENSION] = command_extension
389 };
390
391 /* structure management */
392
393 /* Called from main context */
394 static void upload_stream_unlink(upload_stream *s) {
395     pa_assert(s);
396
397     if (!s->connection)
398         return;
399
400     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
401     s->connection = NULL;
402     upload_stream_unref(s);
403 }
404
405 /* Called from main context */
406 static void upload_stream_free(pa_object *o) {
407     upload_stream *s = UPLOAD_STREAM(o);
408     pa_assert(s);
409
410     upload_stream_unlink(s);
411
412     pa_xfree(s->name);
413
414     if (s->proplist)
415         pa_proplist_free(s->proplist);
416
417     if (s->memchunk.memblock)
418         pa_memblock_unref(s->memchunk.memblock);
419
420     pa_xfree(s);
421 }
422
423 /* Called from main context */
424 static upload_stream* upload_stream_new(
425         pa_native_connection *c,
426         const pa_sample_spec *ss,
427         const pa_channel_map *map,
428         const char *name,
429         size_t length,
430         pa_proplist *p) {
431
432     upload_stream *s;
433
434     pa_assert(c);
435     pa_assert(ss);
436     pa_assert(name);
437     pa_assert(length > 0);
438     pa_assert(p);
439
440     s = pa_msgobject_new(upload_stream);
441     s->parent.parent.parent.free = upload_stream_free;
442     s->connection = c;
443     s->sample_spec = *ss;
444     s->channel_map = *map;
445     s->name = pa_xstrdup(name);
446     pa_memchunk_reset(&s->memchunk);
447     s->length = length;
448     s->proplist = pa_proplist_copy(p);
449     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
450
451     pa_idxset_put(c->output_streams, s, &s->index);
452
453     return s;
454 }
455
456 /* Called from main context */
457 static void record_stream_unlink(record_stream *s) {
458     pa_assert(s);
459
460     if (!s->connection)
461         return;
462
463     if (s->source_output) {
464         pa_source_output_unlink(s->source_output);
465         pa_source_output_unref(s->source_output);
466         s->source_output = NULL;
467     }
468
469     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
470     s->connection = NULL;
471     record_stream_unref(s);
472 }
473
474 /* Called from main context */
475 static void record_stream_free(pa_object *o) {
476     record_stream *s = RECORD_STREAM(o);
477     pa_assert(s);
478
479     record_stream_unlink(s);
480
481     pa_memblockq_free(s->memblockq);
482     pa_xfree(s);
483 }
484
485 /* Called from main context */
486 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
487     record_stream *s = RECORD_STREAM(o);
488     record_stream_assert_ref(s);
489
490     if (!s->connection)
491         return -1;
492
493     switch (code) {
494
495         case RECORD_STREAM_MESSAGE_POST_DATA:
496
497             /* We try to keep up to date with how many bytes are
498              * currently on the fly */
499             pa_atomic_sub(&s->on_the_fly, chunk->length);
500
501             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
502 /*                 pa_log_warn("Failed to push data into output queue."); */
503                 return -1;
504             }
505
506             if (!pa_pstream_is_pending(s->connection->pstream))
507                 native_connection_send_memblock(s->connection);
508
509             break;
510     }
511
512     return 0;
513 }
514
515 /* Called from main context */
516 static void fix_record_buffer_attr_pre(record_stream *s) {
517
518     size_t frame_size;
519     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
520
521     pa_assert(s);
522
523     /* This function will be called from the main thread, before as
524      * well as after the source output has been activated using
525      * pa_source_output_put()! That means it may not touch any
526      * ->thread_info data! */
527
528     frame_size = pa_frame_size(&s->source_output->sample_spec);
529
530     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
531         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
532     if (s->buffer_attr.maxlength <= 0)
533         s->buffer_attr.maxlength = (uint32_t) frame_size;
534
535     if (s->buffer_attr.fragsize == (uint32_t) -1)
536         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
537     if (s->buffer_attr.fragsize <= 0)
538         s->buffer_attr.fragsize = (uint32_t) frame_size;
539
540     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
541
542     if (s->early_requests) {
543
544         /* In early request mode we need to emulate the classic
545          * fragment-based playback model. We do this setting the source
546          * latency to the fragment size. */
547
548         source_usec = fragsize_usec;
549
550     } else if (s->adjust_latency) {
551
552         /* So, the user asked us to adjust the latency according to
553          * what the source can provide. Half the latency will be
554          * spent on the hw buffer, half of it in the async buffer
555          * queue we maintain for each client. */
556
557         source_usec = fragsize_usec/2;
558
559     } else {
560
561         /* Ok, the user didn't ask us to adjust the latency, hence we
562          * don't */
563
564         source_usec = (pa_usec_t) -1;
565     }
566
567     if (source_usec != (pa_usec_t) -1)
568         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
569     else
570         s->configured_source_latency = 0;
571
572     if (s->early_requests) {
573
574         /* Ok, we didn't necessarily get what we were asking for, so
575          * let's tell the user */
576
577         fragsize_usec = s->configured_source_latency;
578
579     } else if (s->adjust_latency) {
580
581         /* Now subtract what we actually got */
582
583         if (fragsize_usec >= s->configured_source_latency*2)
584             fragsize_usec -= s->configured_source_latency;
585         else
586             fragsize_usec = s->configured_source_latency;
587     }
588
589     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
590         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
591
592         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
593
594     if (s->buffer_attr.fragsize <= 0)
595         s->buffer_attr.fragsize = (uint32_t) frame_size;
596 }
597
598 /* Called from main context */
599 static void fix_record_buffer_attr_post(record_stream *s) {
600     size_t base;
601
602     pa_assert(s);
603
604     /* This function will be called from the main thread, before as
605      * well as after the source output has been activated using
606      * pa_source_output_put()! That means it may not touch and
607      * ->thread_info data! */
608
609     base = pa_frame_size(&s->source_output->sample_spec);
610
611     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
612     if (s->buffer_attr.fragsize <= 0)
613         s->buffer_attr.fragsize = base;
614
615     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
616         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
617 }
618
619 /* Called from main context */
620 static record_stream* record_stream_new(
621         pa_native_connection *c,
622         pa_source *source,
623         pa_sample_spec *ss,
624         pa_channel_map *map,
625         pa_bool_t peak_detect,
626         pa_buffer_attr *attr,
627         pa_source_output_flags_t flags,
628         pa_proplist *p,
629         pa_bool_t adjust_latency,
630         pa_sink_input *direct_on_input,
631         pa_bool_t early_requests,
632         int *ret) {
633
634     record_stream *s;
635     pa_source_output *source_output = NULL;
636     size_t base;
637     pa_source_output_new_data data;
638
639     pa_assert(c);
640     pa_assert(ss);
641     pa_assert(p);
642     pa_assert(ret);
643
644     pa_source_output_new_data_init(&data);
645
646     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
647     data.driver = __FILE__;
648     data.module = c->options->module;
649     data.client = c->client;
650     data.source = source;
651     data.direct_on_input = direct_on_input;
652     pa_source_output_new_data_set_sample_spec(&data, ss);
653     pa_source_output_new_data_set_channel_map(&data, map);
654     if (peak_detect)
655         data.resample_method = PA_RESAMPLER_PEAKS;
656
657     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
658
659     pa_source_output_new_data_done(&data);
660
661     if (!source_output)
662         return NULL;
663
664     s = pa_msgobject_new(record_stream);
665     s->parent.parent.free = record_stream_free;
666     s->parent.process_msg = record_stream_process_msg;
667     s->connection = c;
668     s->source_output = source_output;
669     s->buffer_attr = *attr;
670     s->adjust_latency = adjust_latency;
671     s->early_requests = early_requests;
672     pa_atomic_store(&s->on_the_fly, 0);
673
674     s->source_output->parent.process_msg = source_output_process_msg;
675     s->source_output->push = source_output_push_cb;
676     s->source_output->kill = source_output_kill_cb;
677     s->source_output->get_latency = source_output_get_latency_cb;
678     s->source_output->moving = source_output_moving_cb;
679     s->source_output->suspend = source_output_suspend_cb;
680     s->source_output->send_event = source_output_send_event_cb;
681     s->source_output->userdata = s;
682
683     fix_record_buffer_attr_pre(s);
684
685     s->memblockq = pa_memblockq_new(
686             0,
687             s->buffer_attr.maxlength,
688             0,
689             base = pa_frame_size(&source_output->sample_spec),
690             1,
691             0,
692             0,
693             NULL);
694
695     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
696     fix_record_buffer_attr_post(s);
697
698     *ss = s->source_output->sample_spec;
699     *map = s->source_output->channel_map;
700
701     pa_idxset_put(c->record_streams, s, &s->index);
702
703     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
704                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
705                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
706                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
707
708     pa_source_output_put(s->source_output);
709     return s;
710 }
711
712 /* Called from main context */
713 static void record_stream_send_killed(record_stream *r) {
714     pa_tagstruct *t;
715     record_stream_assert_ref(r);
716
717     t = pa_tagstruct_new(NULL, 0);
718     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
719     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
720     pa_tagstruct_putu32(t, r->index);
721     pa_pstream_send_tagstruct(r->connection->pstream, t);
722 }
723
724 /* Called from main context */
725 static void playback_stream_unlink(playback_stream *s) {
726     pa_assert(s);
727
728     if (!s->connection)
729         return;
730
731     if (s->sink_input) {
732         pa_sink_input_unlink(s->sink_input);
733         pa_sink_input_unref(s->sink_input);
734         s->sink_input = NULL;
735     }
736
737     if (s->drain_request)
738         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
739
740     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
741     s->connection = NULL;
742     playback_stream_unref(s);
743 }
744
745 /* Called from main context */
746 static void playback_stream_free(pa_object* o) {
747     playback_stream *s = PLAYBACK_STREAM(o);
748     pa_assert(s);
749
750     playback_stream_unlink(s);
751
752     pa_memblockq_free(s->memblockq);
753     pa_xfree(s);
754 }
755
756 /* Called from main context */
757 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
758     playback_stream *s = PLAYBACK_STREAM(o);
759     playback_stream_assert_ref(s);
760
761     if (!s->connection)
762         return -1;
763
764     switch (code) {
765
766         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
767             pa_tagstruct *t;
768             int l = 0;
769
770             for (;;) {
771                 if ((l = pa_atomic_load(&s->missing)) <= 0)
772                     return 0;
773
774                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
775                     break;
776             }
777
778             t = pa_tagstruct_new(NULL, 0);
779             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
780             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
781             pa_tagstruct_putu32(t, s->index);
782             pa_tagstruct_putu32(t, (uint32_t) l);
783             pa_pstream_send_tagstruct(s->connection->pstream, t);
784
785 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
786             break;
787         }
788
789         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
790             pa_tagstruct *t;
791
792 /*             pa_log("signalling underflow"); */
793
794             /* Report that we're empty */
795             t = pa_tagstruct_new(NULL, 0);
796             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
797             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
798             pa_tagstruct_putu32(t, s->index);
799             pa_pstream_send_tagstruct(s->connection->pstream, t);
800             break;
801         }
802
803         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
804             pa_tagstruct *t;
805
806             /* Notify the user we're overflowed*/
807             t = pa_tagstruct_new(NULL, 0);
808             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
809             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
810             pa_tagstruct_putu32(t, s->index);
811             pa_pstream_send_tagstruct(s->connection->pstream, t);
812             break;
813         }
814
815         case PLAYBACK_STREAM_MESSAGE_STARTED:
816
817             if (s->connection->version >= 13) {
818                 pa_tagstruct *t;
819
820                 /* Notify the user we started playback */
821                 t = pa_tagstruct_new(NULL, 0);
822                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
823                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
824                 pa_tagstruct_putu32(t, s->index);
825                 pa_pstream_send_tagstruct(s->connection->pstream, t);
826             }
827
828             break;
829
830         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
831             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
832             break;
833
834         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
835             pa_tagstruct *t;
836
837             s->buffer_attr.tlength = (uint32_t) offset;
838
839             t = pa_tagstruct_new(NULL, 0);
840             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
841             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
842             pa_tagstruct_putu32(t, s->index);
843             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
844             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
845             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
846             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
847             pa_tagstruct_put_usec(t, s->configured_sink_latency);
848             pa_pstream_send_tagstruct(s->connection->pstream, t);
849
850             break;
851         }
852     }
853
854     return 0;
855 }
856
857 /* Called from main context */
858 static void fix_playback_buffer_attr(playback_stream *s) {
859     size_t frame_size, max_prebuf;
860     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
861
862     pa_assert(s);
863
864     /* This function will be called from the main thread, before as
865      * well as after the sink input has been activated using
866      * pa_sink_input_put()! That means it may not touch any
867      * ->thread_info data, such as the memblockq! */
868
869     frame_size = pa_frame_size(&s->sink_input->sample_spec);
870
871     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
872         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
873     if (s->buffer_attr.maxlength <= 0)
874         s->buffer_attr.maxlength = (uint32_t) frame_size;
875
876     if (s->buffer_attr.tlength == (uint32_t) -1)
877         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
878     if (s->buffer_attr.tlength <= 0)
879         s->buffer_attr.tlength = (uint32_t) frame_size;
880
881     if (s->buffer_attr.minreq == (uint32_t) -1)
882         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
883     if (s->buffer_attr.minreq <= 0)
884         s->buffer_attr.minreq = (uint32_t) frame_size;
885
886     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
887         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
888
889     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
890     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
891
892     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
893                 (double) tlength_usec / PA_USEC_PER_MSEC,
894                 (double) minreq_usec / PA_USEC_PER_MSEC);
895
896     if (s->early_requests) {
897
898         /* In early request mode we need to emulate the classic
899          * fragment-based playback model. We do this setting the sink
900          * latency to the fragment size. */
901
902         sink_usec = minreq_usec;
903         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
904
905     } else if (s->adjust_latency) {
906
907         /* So, the user asked us to adjust the latency of the stream
908          * buffer according to the what the sink can provide. The
909          * tlength passed in shall be the overall latency. Roughly
910          * half the latency will be spent on the hw buffer, the other
911          * half of it in the async buffer queue we maintain for each
912          * client. In between we'll have a safety space of size
913          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
914          * empty and needs to be filled, then our buffer must have
915          * enough data to fulfill this request immediatly and thus
916          * have at least the same tlength as the size of the hw
917          * buffer. It additionally needs space for 2 times minreq
918          * because if the buffer ran empty and a partial fillup
919          * happens immediately on the next iteration we need to be
920          * able to fulfill it and give the application also minreq
921          * time to fill it up again for the next request Makes 2 times
922          * minreq in plus.. */
923
924         if (tlength_usec > minreq_usec*2)
925             sink_usec = (tlength_usec - minreq_usec*2)/2;
926         else
927             sink_usec = 0;
928
929         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
930
931     } else {
932
933         /* Ok, the user didn't ask us to adjust the latency, but we
934          * still need to make sure that the parameters from the user
935          * do make sense. */
936
937         if (tlength_usec > minreq_usec*2)
938             sink_usec = (tlength_usec - minreq_usec*2);
939         else
940             sink_usec = 0;
941
942         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
943     }
944
945     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
946
947     if (s->early_requests) {
948
949         /* Ok, we didn't necessarily get what we were asking for, so
950          * let's tell the user */
951
952         minreq_usec = s->configured_sink_latency;
953
954     } else if (s->adjust_latency) {
955
956         /* Ok, we didn't necessarily get what we were asking for, so
957          * let's subtract from what we asked for for the remaining
958          * buffer space */
959
960         if (tlength_usec >= s->configured_sink_latency)
961             tlength_usec -= s->configured_sink_latency;
962     }
963
964     /* FIXME: This is actually larger than necessary, since not all of
965      * the sink latency is actually rewritable. */
966     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
967         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
968
969     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
970         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
971         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
972
973     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
974         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
975         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
976
977     if (s->buffer_attr.minreq <= 0) {
978         s->buffer_attr.minreq = (uint32_t) frame_size;
979         s->buffer_attr.tlength += (uint32_t) frame_size*2;
980     }
981
982     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
983         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
984
985     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
986
987     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
988         s->buffer_attr.prebuf > max_prebuf)
989         s->buffer_attr.prebuf = max_prebuf;
990 }
991
992 /* Called from main context */
993 static playback_stream* playback_stream_new(
994         pa_native_connection *c,
995         pa_sink *sink,
996         pa_sample_spec *ss,
997         pa_channel_map *map,
998         pa_buffer_attr *a,
999         pa_cvolume *volume,
1000         pa_bool_t muted,
1001         pa_bool_t muted_set,
1002         uint32_t syncid,
1003         uint32_t *missing,
1004         pa_sink_input_flags_t flags,
1005         pa_proplist *p,
1006         pa_bool_t adjust_latency,
1007         pa_bool_t early_requests,
1008         int *ret) {
1009
1010     playback_stream *s, *ssync;
1011     pa_sink_input *sink_input = NULL;
1012     pa_memchunk silence;
1013     uint32_t idx;
1014     int64_t start_index;
1015     pa_sink_input_new_data data;
1016
1017     pa_assert(c);
1018     pa_assert(ss);
1019     pa_assert(missing);
1020     pa_assert(p);
1021     pa_assert(ret);
1022
1023     /* Find syncid group */
1024     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1025
1026         if (!playback_stream_isinstance(ssync))
1027             continue;
1028
1029         if (ssync->syncid == syncid)
1030             break;
1031     }
1032
1033     /* Synced streams must connect to the same sink */
1034     if (ssync) {
1035
1036         if (!sink)
1037             sink = ssync->sink_input->sink;
1038         else if (sink != ssync->sink_input->sink) {
1039             *ret = PA_ERR_INVALID;
1040             return NULL;
1041         }
1042     }
1043
1044     pa_sink_input_new_data_init(&data);
1045
1046     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1047     data.driver = __FILE__;
1048     data.module = c->options->module;
1049     data.client = c->client;
1050     data.sink = sink;
1051     pa_sink_input_new_data_set_sample_spec(&data, ss);
1052     pa_sink_input_new_data_set_channel_map(&data, map);
1053     if (volume)
1054         pa_sink_input_new_data_set_volume(&data, volume);
1055     if (muted_set)
1056         pa_sink_input_new_data_set_muted(&data, muted);
1057     data.sync_base = ssync ? ssync->sink_input : NULL;
1058
1059     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1060
1061     pa_sink_input_new_data_done(&data);
1062
1063     if (!sink_input)
1064         return NULL;
1065
1066     s = pa_msgobject_new(playback_stream);
1067     s->parent.parent.parent.free = playback_stream_free;
1068     s->parent.parent.process_msg = playback_stream_process_msg;
1069     s->connection = c;
1070     s->syncid = syncid;
1071     s->sink_input = sink_input;
1072     s->is_underrun = TRUE;
1073     s->drain_request = FALSE;
1074     pa_atomic_store(&s->missing, 0);
1075     s->buffer_attr = *a;
1076     s->adjust_latency = adjust_latency;
1077     s->early_requests = early_requests;
1078
1079     s->sink_input->parent.process_msg = sink_input_process_msg;
1080     s->sink_input->pop = sink_input_pop_cb;
1081     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1082     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1083     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1084     s->sink_input->kill = sink_input_kill_cb;
1085     s->sink_input->moving = sink_input_moving_cb;
1086     s->sink_input->suspend = sink_input_suspend_cb;
1087     s->sink_input->send_event = sink_input_send_event_cb;
1088     s->sink_input->userdata = s;
1089
1090     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1091
1092     fix_playback_buffer_attr(s);
1093
1094     pa_sink_input_get_silence(sink_input, &silence);
1095     s->memblockq = pa_memblockq_new(
1096             start_index,
1097             s->buffer_attr.maxlength,
1098             s->buffer_attr.tlength,
1099             pa_frame_size(&sink_input->sample_spec),
1100             s->buffer_attr.prebuf,
1101             s->buffer_attr.minreq,
1102             0,
1103             &silence);
1104     pa_memblock_unref(silence.memblock);
1105
1106     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1107
1108     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1109
1110     *ss = s->sink_input->sample_spec;
1111     *map = s->sink_input->channel_map;
1112
1113     pa_idxset_put(c->output_streams, s, &s->index);
1114
1115     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1116                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1117                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1118                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1119                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1120
1121     pa_sink_input_put(s->sink_input);
1122     return s;
1123 }
1124
1125 /* Called from IO context */
1126 static void playback_stream_request_bytes(playback_stream *s) {
1127     size_t m, minreq;
1128     int previous_missing;
1129
1130     playback_stream_assert_ref(s);
1131
1132     m = pa_memblockq_pop_missing(s->memblockq);
1133
1134     if (m <= 0)
1135         return;
1136
1137 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1138
1139     previous_missing = pa_atomic_add(&s->missing, (int) m);
1140     minreq = pa_memblockq_get_minreq(s->memblockq);
1141
1142     if (pa_memblockq_prebuf_active(s->memblockq) ||
1143         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1144         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1145 }
1146
1147 /* Called from main context */
1148 static void playback_stream_send_killed(playback_stream *p) {
1149     pa_tagstruct *t;
1150     playback_stream_assert_ref(p);
1151
1152     t = pa_tagstruct_new(NULL, 0);
1153     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1154     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1155     pa_tagstruct_putu32(t, p->index);
1156     pa_pstream_send_tagstruct(p->connection->pstream, t);
1157 }
1158
1159 /* Called from main context */
1160 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1161     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1162     pa_native_connection_assert_ref(c);
1163
1164     if (!c->protocol)
1165         return -1;
1166
1167     switch (code) {
1168
1169         case CONNECTION_MESSAGE_REVOKE:
1170             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1171             break;
1172
1173         case CONNECTION_MESSAGE_RELEASE:
1174             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1175             break;
1176     }
1177
1178     return 0;
1179 }
1180
1181 /* Called from main context */
1182 static void native_connection_unlink(pa_native_connection *c) {
1183     record_stream *r;
1184     output_stream *o;
1185
1186     pa_assert(c);
1187
1188     if (!c->protocol)
1189         return;
1190
1191     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1192
1193     if (c->options)
1194         pa_native_options_unref(c->options);
1195
1196     while ((r = pa_idxset_first(c->record_streams, NULL)))
1197         record_stream_unlink(r);
1198
1199     while ((o = pa_idxset_first(c->output_streams, NULL)))
1200         if (playback_stream_isinstance(o))
1201             playback_stream_unlink(PLAYBACK_STREAM(o));
1202         else
1203             upload_stream_unlink(UPLOAD_STREAM(o));
1204
1205     if (c->subscription)
1206         pa_subscription_free(c->subscription);
1207
1208     if (c->pstream)
1209         pa_pstream_unlink(c->pstream);
1210
1211     if (c->auth_timeout_event) {
1212         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1213         c->auth_timeout_event = NULL;
1214     }
1215
1216     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1217     c->protocol = NULL;
1218     pa_native_connection_unref(c);
1219 }
1220
1221 /* Called from main context */
1222 static void native_connection_free(pa_object *o) {
1223     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1224
1225     pa_assert(c);
1226
1227     native_connection_unlink(c);
1228
1229     pa_idxset_free(c->record_streams, NULL, NULL);
1230     pa_idxset_free(c->output_streams, NULL, NULL);
1231
1232     pa_pdispatch_unref(c->pdispatch);
1233     pa_pstream_unref(c->pstream);
1234     pa_client_free(c->client);
1235
1236     pa_xfree(c);
1237 }
1238
1239 /* Called from main context */
1240 static void native_connection_send_memblock(pa_native_connection *c) {
1241     uint32_t start;
1242     record_stream *r;
1243
1244     start = PA_IDXSET_INVALID;
1245     for (;;) {
1246         pa_memchunk chunk;
1247
1248         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1249             return;
1250
1251         if (start == PA_IDXSET_INVALID)
1252             start = c->rrobin_index;
1253         else if (start == c->rrobin_index)
1254             return;
1255
1256         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1257             pa_memchunk schunk = chunk;
1258
1259             if (schunk.length > r->buffer_attr.fragsize)
1260                 schunk.length = r->buffer_attr.fragsize;
1261
1262             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1263
1264             pa_memblockq_drop(r->memblockq, schunk.length);
1265             pa_memblock_unref(schunk.memblock);
1266
1267             return;
1268         }
1269     }
1270 }
1271
1272 /*** sink input callbacks ***/
1273
1274 /* Called from thread context */
1275 static void handle_seek(playback_stream *s, int64_t indexw) {
1276     playback_stream_assert_ref(s);
1277
1278 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1279
1280     if (s->sink_input->thread_info.underrun_for > 0) {
1281
1282 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1283
1284         if (pa_memblockq_is_readable(s->memblockq)) {
1285
1286             /* We just ended an underrun, let's ask the sink
1287              * for a complete rewind rewrite */
1288
1289             pa_log_debug("Requesting rewind due to end of underrun.");
1290             pa_sink_input_request_rewind(s->sink_input,
1291                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1292                                          FALSE, TRUE, FALSE);
1293         }
1294
1295     } else {
1296         int64_t indexr;
1297
1298         indexr = pa_memblockq_get_read_index(s->memblockq);
1299
1300         if (indexw < indexr) {
1301             /* OK, the sink already asked for this data, so
1302              * let's have it usk us again */
1303
1304             pa_log_debug("Requesting rewind due to rewrite.");
1305             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1306         }
1307     }
1308
1309     playback_stream_request_bytes(s);
1310 }
1311
1312 /* Called from thread context */
1313 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1314     pa_sink_input *i = PA_SINK_INPUT(o);
1315     playback_stream *s;
1316
1317     pa_sink_input_assert_ref(i);
1318     s = PLAYBACK_STREAM(i->userdata);
1319     playback_stream_assert_ref(s);
1320
1321     switch (code) {
1322
1323         case SINK_INPUT_MESSAGE_SEEK: {
1324             int64_t windex;
1325
1326             windex = pa_memblockq_get_write_index(s->memblockq);
1327
1328             /* The client side is incapable of accounting correctly
1329              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1330              * able to deal with that. */
1331
1332             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1333
1334             handle_seek(s, windex);
1335             return 0;
1336         }
1337
1338         case SINK_INPUT_MESSAGE_POST_DATA: {
1339             int64_t windex;
1340
1341             pa_assert(chunk);
1342
1343             windex = pa_memblockq_get_write_index(s->memblockq);
1344
1345 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1346
1347             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1348                 pa_log_warn("Failed to push data into queue");
1349                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1350                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1351             }
1352
1353             handle_seek(s, windex);
1354
1355 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1356
1357             return 0;
1358         }
1359
1360         case SINK_INPUT_MESSAGE_DRAIN:
1361         case SINK_INPUT_MESSAGE_FLUSH:
1362         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1363         case SINK_INPUT_MESSAGE_TRIGGER: {
1364
1365             int64_t windex;
1366             pa_sink_input *isync;
1367             void (*func)(pa_memblockq *bq);
1368
1369             switch  (code) {
1370                 case SINK_INPUT_MESSAGE_FLUSH:
1371                     func = pa_memblockq_flush_write;
1372                     break;
1373
1374                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1375                     func = pa_memblockq_prebuf_force;
1376                     break;
1377
1378                 case SINK_INPUT_MESSAGE_DRAIN:
1379                 case SINK_INPUT_MESSAGE_TRIGGER:
1380                     func = pa_memblockq_prebuf_disable;
1381                     break;
1382
1383                 default:
1384                     pa_assert_not_reached();
1385             }
1386
1387             windex = pa_memblockq_get_write_index(s->memblockq);
1388             func(s->memblockq);
1389             handle_seek(s, windex);
1390
1391             /* Do the same for all other members in the sync group */
1392             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1393                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1394                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1395                 func(ssync->memblockq);
1396                 handle_seek(ssync, windex);
1397             }
1398
1399             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1400                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1401                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1402                 func(ssync->memblockq);
1403                 handle_seek(ssync, windex);
1404             }
1405
1406             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1407                 if (!pa_memblockq_is_readable(s->memblockq))
1408                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1409                 else {
1410                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1411                     s->drain_request = TRUE;
1412                 }
1413             }
1414
1415             return 0;
1416         }
1417
1418         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1419             /* Atomically get a snapshot of all timing parameters... */
1420             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1421             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1422             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1423             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1424             s->underrun_for = s->sink_input->thread_info.underrun_for;
1425             s->playing_for = s->sink_input->thread_info.playing_for;
1426
1427             return 0;
1428
1429         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1430             int64_t windex;
1431
1432             windex = pa_memblockq_get_write_index(s->memblockq);
1433
1434             pa_memblockq_prebuf_force(s->memblockq);
1435
1436             handle_seek(s, windex);
1437
1438             /* Fall through to the default handler */
1439             break;
1440         }
1441
1442         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1443             pa_usec_t *r = userdata;
1444
1445             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1446
1447             /* Fall through, the default handler will add in the extra
1448              * latency added by the resampler */
1449             break;
1450         }
1451
1452         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1453             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1454             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1455             return 0;
1456         }
1457     }
1458
1459     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1460 }
1461
1462 /* Called from thread context */
1463 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1464     playback_stream *s;
1465
1466     pa_sink_input_assert_ref(i);
1467     s = PLAYBACK_STREAM(i->userdata);
1468     playback_stream_assert_ref(s);
1469     pa_assert(chunk);
1470
1471 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1472
1473     if (pa_memblockq_is_readable(s->memblockq))
1474         s->is_underrun = FALSE;
1475     else {
1476         if (!s->is_underrun)
1477             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1478
1479         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1480             s->drain_request = FALSE;
1481             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1482         } else if (!s->is_underrun)
1483             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1484
1485         s->is_underrun = TRUE;
1486
1487         playback_stream_request_bytes(s);
1488     }
1489
1490     /* This call will not fail with prebuf=0, hence we check for
1491        underrun explicitly above */
1492     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1493         return -1;
1494
1495     chunk->length = PA_MIN(nbytes, chunk->length);
1496
1497     if (i->thread_info.underrun_for > 0)
1498         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1499
1500     pa_memblockq_drop(s->memblockq, chunk->length);
1501     playback_stream_request_bytes(s);
1502
1503     return 0;
1504 }
1505
1506 /* Called from thread context */
1507 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1508     playback_stream *s;
1509
1510     pa_sink_input_assert_ref(i);
1511     s = PLAYBACK_STREAM(i->userdata);
1512     playback_stream_assert_ref(s);
1513
1514     /* If we are in an underrun, then we don't rewind */
1515     if (i->thread_info.underrun_for > 0)
1516         return;
1517
1518     pa_memblockq_rewind(s->memblockq, nbytes);
1519 }
1520
1521 /* Called from thread context */
1522 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1523     playback_stream *s;
1524
1525     pa_sink_input_assert_ref(i);
1526     s = PLAYBACK_STREAM(i->userdata);
1527     playback_stream_assert_ref(s);
1528
1529     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1530 }
1531
1532 /* Called from thread context */
1533 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1534     playback_stream *s;
1535     size_t new_tlength, old_tlength;
1536
1537     pa_sink_input_assert_ref(i);
1538     s = PLAYBACK_STREAM(i->userdata);
1539     playback_stream_assert_ref(s);
1540
1541     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1542     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1543
1544     if (old_tlength < new_tlength) {
1545         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1546         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1547         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1548
1549         if (new_tlength == old_tlength)
1550             pa_log_debug("Failed to increase tlength");
1551         else {
1552             pa_log_debug("Notifying client about increased tlength");
1553             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1554         }
1555     }
1556 }
1557
1558 /* Called from main context */
1559 static void sink_input_kill_cb(pa_sink_input *i) {
1560     playback_stream *s;
1561
1562     pa_sink_input_assert_ref(i);
1563     s = PLAYBACK_STREAM(i->userdata);
1564     playback_stream_assert_ref(s);
1565
1566     playback_stream_send_killed(s);
1567     playback_stream_unlink(s);
1568 }
1569
1570 /* Called from main context */
1571 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1572     playback_stream *s;
1573     pa_tagstruct *t;
1574
1575     pa_sink_input_assert_ref(i);
1576     s = PLAYBACK_STREAM(i->userdata);
1577     playback_stream_assert_ref(s);
1578
1579     if (s->connection->version < 15)
1580       return;
1581
1582     t = pa_tagstruct_new(NULL, 0);
1583     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1584     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1585     pa_tagstruct_putu32(t, s->index);
1586     pa_tagstruct_puts(t, event);
1587     pa_tagstruct_put_proplist(t, pl);
1588     pa_pstream_send_tagstruct(s->connection->pstream, t);
1589 }
1590
1591 /* Called from main context */
1592 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1593     playback_stream *s;
1594     pa_tagstruct *t;
1595
1596     pa_sink_input_assert_ref(i);
1597     s = PLAYBACK_STREAM(i->userdata);
1598     playback_stream_assert_ref(s);
1599
1600     if (s->connection->version < 12)
1601       return;
1602
1603     t = pa_tagstruct_new(NULL, 0);
1604     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1605     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1606     pa_tagstruct_putu32(t, s->index);
1607     pa_tagstruct_put_boolean(t, suspend);
1608     pa_pstream_send_tagstruct(s->connection->pstream, t);
1609 }
1610
1611 /* Called from main context */
1612 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1613     playback_stream *s;
1614     pa_tagstruct *t;
1615
1616     pa_sink_input_assert_ref(i);
1617     s = PLAYBACK_STREAM(i->userdata);
1618     playback_stream_assert_ref(s);
1619
1620     if (!dest)
1621         return;
1622
1623     fix_playback_buffer_attr(s);
1624     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1625     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1626
1627     if (s->connection->version < 12)
1628       return;
1629
1630     t = pa_tagstruct_new(NULL, 0);
1631     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1632     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1633     pa_tagstruct_putu32(t, s->index);
1634     pa_tagstruct_putu32(t, dest->index);
1635     pa_tagstruct_puts(t, dest->name);
1636     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1637
1638     if (s->connection->version >= 13) {
1639         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1640         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1641         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1642         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1643         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1644     }
1645
1646     pa_pstream_send_tagstruct(s->connection->pstream, t);
1647 }
1648
1649 /*** source_output callbacks ***/
1650
1651 /* Called from thread context */
1652 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1653     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1654     record_stream *s;
1655
1656     pa_source_output_assert_ref(o);
1657     s = RECORD_STREAM(o->userdata);
1658     record_stream_assert_ref(s);
1659
1660     switch (code) {
1661         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1662             /* Atomically get a snapshot of all timing parameters... */
1663             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1664             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1665             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1666             return 0;
1667     }
1668
1669     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1670 }
1671
1672 /* Called from thread context */
1673 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1674     record_stream *s;
1675
1676     pa_source_output_assert_ref(o);
1677     s = RECORD_STREAM(o->userdata);
1678     record_stream_assert_ref(s);
1679     pa_assert(chunk);
1680
1681     pa_atomic_add(&s->on_the_fly, chunk->length);
1682     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1683 }
1684
1685 static void source_output_kill_cb(pa_source_output *o) {
1686     record_stream *s;
1687
1688     pa_source_output_assert_ref(o);
1689     s = RECORD_STREAM(o->userdata);
1690     record_stream_assert_ref(s);
1691
1692     record_stream_send_killed(s);
1693     record_stream_unlink(s);
1694 }
1695
1696 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1697     record_stream *s;
1698
1699     pa_source_output_assert_ref(o);
1700     s = RECORD_STREAM(o->userdata);
1701     record_stream_assert_ref(s);
1702
1703     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1704
1705     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1706 }
1707
1708 /* Called from main context */
1709 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1710     record_stream *s;
1711     pa_tagstruct *t;
1712
1713     pa_source_output_assert_ref(o);
1714     s = RECORD_STREAM(o->userdata);
1715     record_stream_assert_ref(s);
1716
1717     if (s->connection->version < 15)
1718       return;
1719
1720     t = pa_tagstruct_new(NULL, 0);
1721     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1722     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1723     pa_tagstruct_putu32(t, s->index);
1724     pa_tagstruct_puts(t, event);
1725     pa_tagstruct_put_proplist(t, pl);
1726     pa_pstream_send_tagstruct(s->connection->pstream, t);
1727 }
1728
1729 /* Called from main context */
1730 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1731     record_stream *s;
1732     pa_tagstruct *t;
1733
1734     pa_source_output_assert_ref(o);
1735     s = RECORD_STREAM(o->userdata);
1736     record_stream_assert_ref(s);
1737
1738     if (s->connection->version < 12)
1739       return;
1740
1741     t = pa_tagstruct_new(NULL, 0);
1742     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1743     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1744     pa_tagstruct_putu32(t, s->index);
1745     pa_tagstruct_put_boolean(t, suspend);
1746     pa_pstream_send_tagstruct(s->connection->pstream, t);
1747 }
1748
1749 /* Called from main context */
1750 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1751     record_stream *s;
1752     pa_tagstruct *t;
1753
1754     pa_source_output_assert_ref(o);
1755     s = RECORD_STREAM(o->userdata);
1756     record_stream_assert_ref(s);
1757
1758     if (!dest)
1759         return;
1760
1761     fix_record_buffer_attr_pre(s);
1762     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1763     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1764     fix_record_buffer_attr_post(s);
1765
1766     if (s->connection->version < 12)
1767       return;
1768
1769     t = pa_tagstruct_new(NULL, 0);
1770     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1771     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1772     pa_tagstruct_putu32(t, s->index);
1773     pa_tagstruct_putu32(t, dest->index);
1774     pa_tagstruct_puts(t, dest->name);
1775     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1776
1777     if (s->connection->version >= 13) {
1778         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1779         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1780         pa_tagstruct_put_usec(t, s->configured_source_latency);
1781     }
1782
1783     pa_pstream_send_tagstruct(s->connection->pstream, t);
1784 }
1785
1786 /*** pdispatch callbacks ***/
1787
1788 static void protocol_error(pa_native_connection *c) {
1789     pa_log("protocol error, kicking client");
1790     native_connection_unlink(c);
1791 }
1792
1793 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1794 if (!(expression)) { \
1795     pa_pstream_send_error((pstream), (tag), (error)); \
1796     return; \
1797 } \
1798 } while(0);
1799
1800 static pa_tagstruct *reply_new(uint32_t tag) {
1801     pa_tagstruct *reply;
1802
1803     reply = pa_tagstruct_new(NULL, 0);
1804     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1805     pa_tagstruct_putu32(reply, tag);
1806     return reply;
1807 }
1808
1809 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1810     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1811     playback_stream *s;
1812     uint32_t sink_index, syncid, missing;
1813     pa_buffer_attr attr;
1814     const char *name = NULL, *sink_name;
1815     pa_sample_spec ss;
1816     pa_channel_map map;
1817     pa_tagstruct *reply;
1818     pa_sink *sink = NULL;
1819     pa_cvolume volume;
1820     pa_bool_t
1821         corked = FALSE,
1822         no_remap = FALSE,
1823         no_remix = FALSE,
1824         fix_format = FALSE,
1825         fix_rate = FALSE,
1826         fix_channels = FALSE,
1827         no_move = FALSE,
1828         variable_rate = FALSE,
1829         muted = FALSE,
1830         adjust_latency = FALSE,
1831         early_requests = FALSE,
1832         dont_inhibit_auto_suspend = FALSE,
1833         muted_set = FALSE,
1834         fail_on_suspend = FALSE;
1835     pa_sink_input_flags_t flags = 0;
1836     pa_proplist *p;
1837     pa_bool_t volume_set = TRUE;
1838     int ret = PA_ERR_INVALID;
1839
1840     pa_native_connection_assert_ref(c);
1841     pa_assert(t);
1842     memset(&attr, 0, sizeof(attr));
1843
1844     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1845         pa_tagstruct_get(
1846                 t,
1847                 PA_TAG_SAMPLE_SPEC, &ss,
1848                 PA_TAG_CHANNEL_MAP, &map,
1849                 PA_TAG_U32, &sink_index,
1850                 PA_TAG_STRING, &sink_name,
1851                 PA_TAG_U32, &attr.maxlength,
1852                 PA_TAG_BOOLEAN, &corked,
1853                 PA_TAG_U32, &attr.tlength,
1854                 PA_TAG_U32, &attr.prebuf,
1855                 PA_TAG_U32, &attr.minreq,
1856                 PA_TAG_U32, &syncid,
1857                 PA_TAG_CVOLUME, &volume,
1858                 PA_TAG_INVALID) < 0) {
1859
1860         protocol_error(c);
1861         return;
1862     }
1863
1864     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1865     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1866     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1867     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1868     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1869     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1870     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1871     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1872
1873     p = pa_proplist_new();
1874
1875     if (name)
1876         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1877
1878     if (c->version >= 12)  {
1879         /* Since 0.9.8 the user can ask for a couple of additional flags */
1880
1881         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1882             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1883             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1884             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1885             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1886             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1887             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1888
1889             protocol_error(c);
1890             pa_proplist_free(p);
1891             return;
1892         }
1893     }
1894
1895     if (c->version >= 13) {
1896
1897         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1898             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1899             pa_tagstruct_get_proplist(t, p) < 0) {
1900             protocol_error(c);
1901             pa_proplist_free(p);
1902             return;
1903         }
1904     }
1905
1906     if (c->version >= 14) {
1907
1908         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1909             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1910             protocol_error(c);
1911             pa_proplist_free(p);
1912             return;
1913         }
1914     }
1915
1916     if (c->version >= 15) {
1917
1918         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1919             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1920             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1921             protocol_error(c);
1922             pa_proplist_free(p);
1923             return;
1924         }
1925     }
1926
1927     if (!pa_tagstruct_eof(t)) {
1928         protocol_error(c);
1929         pa_proplist_free(p);
1930         return;
1931     }
1932
1933     if (sink_index != PA_INVALID_INDEX) {
1934
1935         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1936             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1937             pa_proplist_free(p);
1938             return;
1939         }
1940
1941     } else if (sink_name) {
1942
1943         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1944             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1945             pa_proplist_free(p);
1946             return;
1947         }
1948     }
1949
1950     flags =
1951         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1952         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1953         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1954         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1955         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1956         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1957         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1958         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1959         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1960         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1961
1962     /* Only since protocol version 15 there's a seperate muted_set
1963      * flag. For older versions we synthesize it here */
1964     muted_set = muted_set || muted;
1965
1966     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1967     pa_proplist_free(p);
1968
1969     CHECK_VALIDITY(c->pstream, s, tag, ret);
1970
1971     reply = reply_new(tag);
1972     pa_tagstruct_putu32(reply, s->index);
1973     pa_assert(s->sink_input);
1974     pa_tagstruct_putu32(reply, s->sink_input->index);
1975     pa_tagstruct_putu32(reply, missing);
1976
1977 /*     pa_log("initial request is %u", missing); */
1978
1979     if (c->version >= 9) {
1980         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1981
1982         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1983         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1984         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1985         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1986     }
1987
1988     if (c->version >= 12) {
1989         /* Since 0.9.8 we support sending the chosen sample
1990          * spec/channel map/device/suspend status back to the
1991          * client */
1992
1993         pa_tagstruct_put_sample_spec(reply, &ss);
1994         pa_tagstruct_put_channel_map(reply, &map);
1995
1996         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1997         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1998
1999         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2000     }
2001
2002     if (c->version >= 13)
2003         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2004
2005     pa_pstream_send_tagstruct(c->pstream, reply);
2006 }
2007
2008 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2009     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2010     uint32_t channel;
2011
2012     pa_native_connection_assert_ref(c);
2013     pa_assert(t);
2014
2015     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2016         !pa_tagstruct_eof(t)) {
2017         protocol_error(c);
2018         return;
2019     }
2020
2021     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2022
2023     switch (command) {
2024
2025         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2026             playback_stream *s;
2027             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2028                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2029                 return;
2030             }
2031
2032             playback_stream_unlink(s);
2033             break;
2034         }
2035
2036         case PA_COMMAND_DELETE_RECORD_STREAM: {
2037             record_stream *s;
2038             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2039                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2040                 return;
2041             }
2042
2043             record_stream_unlink(s);
2044             break;
2045         }
2046
2047         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2048             upload_stream *s;
2049
2050             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2051                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2052                 return;
2053             }
2054
2055             upload_stream_unlink(s);
2056             break;
2057         }
2058
2059         default:
2060             pa_assert_not_reached();
2061     }
2062
2063     pa_pstream_send_simple_ack(c->pstream, tag);
2064 }
2065
2066 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2067     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2068     record_stream *s;
2069     pa_buffer_attr attr;
2070     uint32_t source_index;
2071     const char *name = NULL, *source_name;
2072     pa_sample_spec ss;
2073     pa_channel_map map;
2074     pa_tagstruct *reply;
2075     pa_source *source = NULL;
2076     pa_bool_t
2077         corked = FALSE,
2078         no_remap = FALSE,
2079         no_remix = FALSE,
2080         fix_format = FALSE,
2081         fix_rate = FALSE,
2082         fix_channels = FALSE,
2083         no_move = FALSE,
2084         variable_rate = FALSE,
2085         adjust_latency = FALSE,
2086         peak_detect = FALSE,
2087         early_requests = FALSE,
2088         dont_inhibit_auto_suspend = FALSE,
2089         fail_on_suspend = FALSE;
2090     pa_source_output_flags_t flags = 0;
2091     pa_proplist *p;
2092     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2093     pa_sink_input *direct_on_input = NULL;
2094     int ret = PA_ERR_INVALID;
2095
2096     pa_native_connection_assert_ref(c);
2097     pa_assert(t);
2098
2099     memset(&attr, 0, sizeof(attr));
2100
2101     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2102         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2103         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2104         pa_tagstruct_getu32(t, &source_index) < 0 ||
2105         pa_tagstruct_gets(t, &source_name) < 0 ||
2106         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2107         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2108         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2109         protocol_error(c);
2110         return;
2111     }
2112
2113     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2114     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2115     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2116     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2117     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2118     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2119     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2120
2121     p = pa_proplist_new();
2122
2123     if (name)
2124         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2125
2126     if (c->version >= 12)  {
2127         /* Since 0.9.8 the user can ask for a couple of additional flags */
2128
2129         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2130             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2131             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2132             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2133             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2134             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2135             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2136
2137             protocol_error(c);
2138             pa_proplist_free(p);
2139             return;
2140         }
2141     }
2142
2143     if (c->version >= 13) {
2144
2145         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2146             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2147             pa_tagstruct_get_proplist(t, p) < 0 ||
2148             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2149             protocol_error(c);
2150             pa_proplist_free(p);
2151             return;
2152         }
2153     }
2154
2155     if (c->version >= 14) {
2156
2157         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2158             protocol_error(c);
2159             pa_proplist_free(p);
2160             return;
2161         }
2162     }
2163
2164     if (c->version >= 15) {
2165
2166         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2167             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2168             protocol_error(c);
2169             pa_proplist_free(p);
2170             return;
2171         }
2172     }
2173
2174     if (!pa_tagstruct_eof(t)) {
2175         protocol_error(c);
2176         pa_proplist_free(p);
2177         return;
2178     }
2179
2180     if (source_index != PA_INVALID_INDEX) {
2181
2182         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2183             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2184             pa_proplist_free(p);
2185             return;
2186         }
2187
2188     } else if (source_name) {
2189
2190         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2191             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2192             pa_proplist_free(p);
2193             return;
2194         }
2195     }
2196
2197     if (direct_on_input_idx != PA_INVALID_INDEX) {
2198
2199         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2200             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2201             pa_proplist_free(p);
2202             return;
2203         }
2204     }
2205
2206     flags =
2207         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2208         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2209         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2210         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2211         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2212         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2213         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2214         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2215         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2216         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2217
2218     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2219     pa_proplist_free(p);
2220
2221     CHECK_VALIDITY(c->pstream, s, tag, ret);
2222
2223     reply = reply_new(tag);
2224     pa_tagstruct_putu32(reply, s->index);
2225     pa_assert(s->source_output);
2226     pa_tagstruct_putu32(reply, s->source_output->index);
2227
2228     if (c->version >= 9) {
2229         /* Since 0.9 we support sending the buffer metrics back to the client */
2230
2231         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2232         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2233     }
2234
2235     if (c->version >= 12) {
2236         /* Since 0.9.8 we support sending the chosen sample
2237          * spec/channel map/device/suspend status back to the
2238          * client */
2239
2240         pa_tagstruct_put_sample_spec(reply, &ss);
2241         pa_tagstruct_put_channel_map(reply, &map);
2242
2243         pa_tagstruct_putu32(reply, s->source_output->source->index);
2244         pa_tagstruct_puts(reply, s->source_output->source->name);
2245
2246         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2247     }
2248
2249     if (c->version >= 13)
2250         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2251
2252     pa_pstream_send_tagstruct(c->pstream, reply);
2253 }
2254
2255 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2256     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2257     int ret;
2258
2259     pa_native_connection_assert_ref(c);
2260     pa_assert(t);
2261
2262     if (!pa_tagstruct_eof(t)) {
2263         protocol_error(c);
2264         return;
2265     }
2266
2267     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2268     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2269     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2270
2271     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2272 }
2273
2274 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2275     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2276     const void*cookie;
2277     pa_tagstruct *reply;
2278     pa_bool_t shm_on_remote = FALSE, do_shm;
2279
2280     pa_native_connection_assert_ref(c);
2281     pa_assert(t);
2282
2283     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2284         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2285         !pa_tagstruct_eof(t)) {
2286         protocol_error(c);
2287         return;
2288     }
2289
2290     /* Minimum supported version */
2291     if (c->version < 8) {
2292         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2293         return;
2294     }
2295
2296     /* Starting with protocol version 13 the MSB of the version tag
2297        reflects if shm is available for this pa_native_connection or
2298        not. */
2299     if (c->version >= 13) {
2300         shm_on_remote = !!(c->version & 0x80000000U);
2301         c->version &= 0x7FFFFFFFU;
2302     }
2303
2304     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2305
2306     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2307
2308     if (!c->authorized) {
2309         pa_bool_t success = FALSE;
2310
2311 #ifdef HAVE_CREDS
2312         const pa_creds *creds;
2313
2314         if ((creds = pa_pdispatch_creds(pd))) {
2315             if (creds->uid == getuid())
2316                 success = TRUE;
2317             else if (c->options->auth_group) {
2318                 int r;
2319                 gid_t gid;
2320
2321                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2322                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2323                 else if (gid == creds->gid)
2324                     success = TRUE;
2325
2326                 if (!success) {
2327                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2328                         pa_log_warn("Failed to check group membership.");
2329                     else if (r > 0)
2330                         success = TRUE;
2331                 }
2332             }
2333
2334             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2335                         (unsigned long) creds->uid,
2336                         (unsigned long) creds->gid,
2337                         (int) success);
2338         }
2339 #endif
2340
2341         if (!success && c->options->auth_cookie) {
2342             const uint8_t *ac;
2343
2344             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2345                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2346                     success = TRUE;
2347         }
2348
2349         if (!success) {
2350             pa_log_warn("Denied access to client with invalid authorization data.");
2351             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2352             return;
2353         }
2354
2355         c->authorized = TRUE;
2356         if (c->auth_timeout_event) {
2357             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2358             c->auth_timeout_event = NULL;
2359         }
2360     }
2361
2362     /* Enable shared memory support if possible */
2363     do_shm =
2364         pa_mempool_is_shared(c->protocol->core->mempool) &&
2365         c->is_local;
2366
2367     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2368
2369     if (do_shm)
2370         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2371             do_shm = FALSE;
2372
2373 #ifdef HAVE_CREDS
2374     if (do_shm) {
2375         /* Only enable SHM if both sides are owned by the same
2376          * user. This is a security measure because otherwise data
2377          * private to the user might leak. */
2378
2379         const pa_creds *creds;
2380         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2381             do_shm = FALSE;
2382     }
2383 #endif
2384
2385     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2386     pa_pstream_enable_shm(c->pstream, do_shm);
2387
2388     reply = reply_new(tag);
2389     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2390
2391 #ifdef HAVE_CREDS
2392 {
2393     /* SHM support is only enabled after both sides made sure they are the same user. */
2394
2395     pa_creds ucred;
2396
2397     ucred.uid = getuid();
2398     ucred.gid = getgid();
2399
2400     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2401 }
2402 #else
2403     pa_pstream_send_tagstruct(c->pstream, reply);
2404 #endif
2405 }
2406
2407 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2408     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2409     const char *name = NULL;
2410     pa_proplist *p;
2411     pa_tagstruct *reply;
2412
2413     pa_native_connection_assert_ref(c);
2414     pa_assert(t);
2415
2416     p = pa_proplist_new();
2417
2418     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2419         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2420         !pa_tagstruct_eof(t)) {
2421
2422         protocol_error(c);
2423         pa_proplist_free(p);
2424         return;
2425     }
2426
2427     if (name)
2428         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2429             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2430             pa_proplist_free(p);
2431             return;
2432         }
2433
2434     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2435     pa_proplist_free(p);
2436
2437     reply = reply_new(tag);
2438
2439     if (c->version >= 13)
2440         pa_tagstruct_putu32(reply, c->client->index);
2441
2442     pa_pstream_send_tagstruct(c->pstream, reply);
2443 }
2444
2445 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2446     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2447     const char *name;
2448     uint32_t idx = PA_IDXSET_INVALID;
2449
2450     pa_native_connection_assert_ref(c);
2451     pa_assert(t);
2452
2453     if (pa_tagstruct_gets(t, &name) < 0 ||
2454         !pa_tagstruct_eof(t)) {
2455         protocol_error(c);
2456         return;
2457     }
2458
2459     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2460     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2461
2462     if (command == PA_COMMAND_LOOKUP_SINK) {
2463         pa_sink *sink;
2464         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2465             idx = sink->index;
2466     } else {
2467         pa_source *source;
2468         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2469         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2470             idx = source->index;
2471     }
2472
2473     if (idx == PA_IDXSET_INVALID)
2474         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2475     else {
2476         pa_tagstruct *reply;
2477         reply = reply_new(tag);
2478         pa_tagstruct_putu32(reply, idx);
2479         pa_pstream_send_tagstruct(c->pstream, reply);
2480     }
2481 }
2482
2483 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2484     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2485     uint32_t idx;
2486     playback_stream *s;
2487
2488     pa_native_connection_assert_ref(c);
2489     pa_assert(t);
2490
2491     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2492         !pa_tagstruct_eof(t)) {
2493         protocol_error(c);
2494         return;
2495     }
2496
2497     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2498     s = pa_idxset_get_by_index(c->output_streams, idx);
2499     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2500     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2501
2502     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2503 }
2504
2505 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2506     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2507     pa_tagstruct *reply;
2508     const pa_mempool_stat *stat;
2509
2510     pa_native_connection_assert_ref(c);
2511     pa_assert(t);
2512
2513     if (!pa_tagstruct_eof(t)) {
2514         protocol_error(c);
2515         return;
2516     }
2517
2518     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2519
2520     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2521
2522     reply = reply_new(tag);
2523     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2524     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2525     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2526     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2527     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2528     pa_pstream_send_tagstruct(c->pstream, reply);
2529 }
2530
2531 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2532     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2533     pa_tagstruct *reply;
2534     playback_stream *s;
2535     struct timeval tv, now;
2536     uint32_t idx;
2537
2538     pa_native_connection_assert_ref(c);
2539     pa_assert(t);
2540
2541     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2542         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2543         !pa_tagstruct_eof(t)) {
2544         protocol_error(c);
2545         return;
2546     }
2547
2548     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2549     s = pa_idxset_get_by_index(c->output_streams, idx);
2550     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2551     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2552
2553     /* Get an atomic snapshot of all timing parameters */
2554     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2555
2556     reply = reply_new(tag);
2557     pa_tagstruct_put_usec(reply,
2558                           s->current_sink_latency +
2559                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2560     pa_tagstruct_put_usec(reply, 0);
2561     pa_tagstruct_put_boolean(reply,
2562                              s->playing_for > 0 &&
2563                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2564                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2565     pa_tagstruct_put_timeval(reply, &tv);
2566     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2567     pa_tagstruct_puts64(reply, s->write_index);
2568     pa_tagstruct_puts64(reply, s->read_index);
2569
2570     if (c->version >= 13) {
2571         pa_tagstruct_putu64(reply, s->underrun_for);
2572         pa_tagstruct_putu64(reply, s->playing_for);
2573     }
2574
2575     pa_pstream_send_tagstruct(c->pstream, reply);
2576 }
2577
2578 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2579     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2580     pa_tagstruct *reply;
2581     record_stream *s;
2582     struct timeval tv, now;
2583     uint32_t idx;
2584
2585     pa_native_connection_assert_ref(c);
2586     pa_assert(t);
2587
2588     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2589         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2590         !pa_tagstruct_eof(t)) {
2591         protocol_error(c);
2592         return;
2593     }
2594
2595     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2596     s = pa_idxset_get_by_index(c->record_streams, idx);
2597     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2598
2599     /* Get an atomic snapshot of all timing parameters */
2600     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2601
2602     reply = reply_new(tag);
2603     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2604     pa_tagstruct_put_usec(reply,
2605                           s->current_source_latency +
2606                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2607     pa_tagstruct_put_boolean(reply,
2608                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2609                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2610     pa_tagstruct_put_timeval(reply, &tv);
2611     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2612     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2613     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2614     pa_pstream_send_tagstruct(c->pstream, reply);
2615 }
2616
2617 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2618     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2619     upload_stream *s;
2620     uint32_t length;
2621     const char *name = NULL;
2622     pa_sample_spec ss;
2623     pa_channel_map map;
2624     pa_tagstruct *reply;
2625     pa_proplist *p;
2626
2627     pa_native_connection_assert_ref(c);
2628     pa_assert(t);
2629
2630     if (pa_tagstruct_gets(t, &name) < 0 ||
2631         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2632         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2633         pa_tagstruct_getu32(t, &length) < 0) {
2634         protocol_error(c);
2635         return;
2636     }
2637
2638     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2639     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2640     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2641     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2642     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2643     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2644
2645     p = pa_proplist_new();
2646
2647     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2648         !pa_tagstruct_eof(t)) {
2649
2650         protocol_error(c);
2651         pa_proplist_free(p);
2652         return;
2653     }
2654
2655     if (c->version < 13)
2656         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2657     else if (!name)
2658         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2659             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2660
2661     if (!name || !pa_namereg_is_valid_name(name)) {
2662         pa_proplist_free(p);
2663         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2664     }
2665
2666     s = upload_stream_new(c, &ss, &map, name, length, p);
2667     pa_proplist_free(p);
2668
2669     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2670
2671     reply = reply_new(tag);
2672     pa_tagstruct_putu32(reply, s->index);
2673     pa_tagstruct_putu32(reply, length);
2674     pa_pstream_send_tagstruct(c->pstream, reply);
2675 }
2676
2677 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2678     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2679     uint32_t channel;
2680     upload_stream *s;
2681     uint32_t idx;
2682
2683     pa_native_connection_assert_ref(c);
2684     pa_assert(t);
2685
2686     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2687         !pa_tagstruct_eof(t)) {
2688         protocol_error(c);
2689         return;
2690     }
2691
2692     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2693
2694     s = pa_idxset_get_by_index(c->output_streams, channel);
2695     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2696     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2697
2698     if (!s->memchunk.memblock)
2699         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2700     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2701         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2702     else
2703         pa_pstream_send_simple_ack(c->pstream, tag);
2704
2705     upload_stream_unlink(s);
2706 }
2707
2708 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2709     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2710     uint32_t sink_index;
2711     pa_volume_t volume;
2712     pa_sink *sink;
2713     const char *name, *sink_name;
2714     uint32_t idx;
2715     pa_proplist *p;
2716     pa_tagstruct *reply;
2717
2718     pa_native_connection_assert_ref(c);
2719     pa_assert(t);
2720
2721     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2722
2723     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2724         pa_tagstruct_gets(t, &sink_name) < 0 ||
2725         pa_tagstruct_getu32(t, &volume) < 0 ||
2726         pa_tagstruct_gets(t, &name) < 0) {
2727         protocol_error(c);
2728         return;
2729     }
2730
2731     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2732     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2733     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2734     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2735
2736     if (sink_index != PA_INVALID_INDEX)
2737         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2738     else
2739         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2740
2741     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2742
2743     p = pa_proplist_new();
2744
2745     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2746         !pa_tagstruct_eof(t)) {
2747         protocol_error(c);
2748         pa_proplist_free(p);
2749         return;
2750     }
2751
2752     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2753
2754     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2755         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2756         pa_proplist_free(p);
2757         return;
2758     }
2759
2760     pa_proplist_free(p);
2761
2762     reply = reply_new(tag);
2763
2764     if (c->version >= 13)
2765         pa_tagstruct_putu32(reply, idx);
2766
2767     pa_pstream_send_tagstruct(c->pstream, reply);
2768 }
2769
2770 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2771     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2772     const char *name;
2773
2774     pa_native_connection_assert_ref(c);
2775     pa_assert(t);
2776
2777     if (pa_tagstruct_gets(t, &name) < 0 ||
2778         !pa_tagstruct_eof(t)) {
2779         protocol_error(c);
2780         return;
2781     }
2782
2783     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2784     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2785
2786     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2787         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2788         return;
2789     }
2790
2791     pa_pstream_send_simple_ack(c->pstream, tag);
2792 }
2793
2794 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2795     pa_assert(c);
2796     pa_assert(fixed);
2797     pa_assert(original);
2798
2799     *fixed = *original;
2800
2801     if (c->version < 12) {
2802         /* Before protocol version 12 we didn't support S32 samples,
2803          * so we need to lie about this to the client */
2804
2805         if (fixed->format == PA_SAMPLE_S32LE)
2806             fixed->format = PA_SAMPLE_FLOAT32LE;
2807         if (fixed->format == PA_SAMPLE_S32BE)
2808             fixed->format = PA_SAMPLE_FLOAT32BE;
2809     }
2810
2811     if (c->version < 15) {
2812         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2813             fixed->format = PA_SAMPLE_FLOAT32LE;
2814         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2815             fixed->format = PA_SAMPLE_FLOAT32BE;
2816     }
2817 }
2818
2819 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2820     pa_sample_spec fixed_ss;
2821
2822     pa_assert(t);
2823     pa_sink_assert_ref(sink);
2824
2825     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2826
2827     pa_tagstruct_put(
2828         t,
2829         PA_TAG_U32, sink->index,
2830         PA_TAG_STRING, sink->name,
2831         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2832         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2833         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2834         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2835         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE, FALSE),
2836         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2837         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2838         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2839         PA_TAG_USEC, pa_sink_get_latency(sink),
2840         PA_TAG_STRING, sink->driver,
2841         PA_TAG_U32, sink->flags,
2842         PA_TAG_INVALID);
2843
2844     if (c->version >= 13) {
2845         pa_tagstruct_put_proplist(t, sink->proplist);
2846         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2847     }
2848
2849     if (c->version >= 15) {
2850         pa_tagstruct_put_volume(t, sink->base_volume);
2851         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2852             pa_log_error("Internal sink state is invalid.");
2853         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2854         pa_tagstruct_putu32(t, sink->n_volume_steps);
2855         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2856     }
2857
2858     if (c->version >= 16) {
2859         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2860
2861         if (sink->ports) {
2862             void *state;
2863             pa_device_port *p;
2864
2865             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2866                 pa_tagstruct_puts(t, p->name);
2867                 pa_tagstruct_puts(t, p->description);
2868                 pa_tagstruct_putu32(t, p->priority);
2869             }
2870         }
2871
2872         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2873     }
2874 }
2875
2876 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2877     pa_sample_spec fixed_ss;
2878
2879     pa_assert(t);
2880     pa_source_assert_ref(source);
2881
2882     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2883
2884     pa_tagstruct_put(
2885         t,
2886         PA_TAG_U32, source->index,
2887         PA_TAG_STRING, source->name,
2888         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2889         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2890         PA_TAG_CHANNEL_MAP, &source->channel_map,
2891         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2892         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2893         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2894         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2895         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2896         PA_TAG_USEC, pa_source_get_latency(source),
2897         PA_TAG_STRING, source->driver,
2898         PA_TAG_U32, source->flags,
2899         PA_TAG_INVALID);
2900
2901     if (c->version >= 13) {
2902         pa_tagstruct_put_proplist(t, source->proplist);
2903         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2904     }
2905
2906     if (c->version >= 15) {
2907         pa_tagstruct_put_volume(t, source->base_volume);
2908         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2909             pa_log_error("Internal source state is invalid.");
2910         pa_tagstruct_putu32(t, pa_source_get_state(source));
2911         pa_tagstruct_putu32(t, source->n_volume_steps);
2912         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2913     }
2914
2915     if (c->version >= 16) {
2916
2917         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
2918
2919         if (source->ports) {
2920             void *state;
2921             pa_device_port *p;
2922
2923             PA_HASHMAP_FOREACH(p, source->ports, state) {
2924                 pa_tagstruct_puts(t, p->name);
2925                 pa_tagstruct_puts(t, p->description);
2926                 pa_tagstruct_putu32(t, p->priority);
2927             }
2928         }
2929
2930         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
2931     }
2932 }
2933
2934 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2935     pa_assert(t);
2936     pa_assert(client);
2937
2938     pa_tagstruct_putu32(t, client->index);
2939     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2940     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2941     pa_tagstruct_puts(t, client->driver);
2942
2943     if (c->version >= 13)
2944         pa_tagstruct_put_proplist(t, client->proplist);
2945 }
2946
2947 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2948     void *state = NULL;
2949     pa_card_profile *p;
2950
2951     pa_assert(t);
2952     pa_assert(card);
2953
2954     pa_tagstruct_putu32(t, card->index);
2955     pa_tagstruct_puts(t, card->name);
2956     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2957     pa_tagstruct_puts(t, card->driver);
2958
2959     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2960
2961     if (card->profiles) {
2962         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2963             pa_tagstruct_puts(t, p->name);
2964             pa_tagstruct_puts(t, p->description);
2965             pa_tagstruct_putu32(t, p->n_sinks);
2966             pa_tagstruct_putu32(t, p->n_sources);
2967             pa_tagstruct_putu32(t, p->priority);
2968         }
2969     }
2970
2971     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2972     pa_tagstruct_put_proplist(t, card->proplist);
2973 }
2974
2975 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2976     pa_assert(t);
2977     pa_assert(module);
2978
2979     pa_tagstruct_putu32(t, module->index);
2980     pa_tagstruct_puts(t, module->name);
2981     pa_tagstruct_puts(t, module->argument);
2982     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2983
2984     if (c->version < 15)
2985         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2986
2987     if (c->version >= 15)
2988         pa_tagstruct_put_proplist(t, module->proplist);
2989 }
2990
2991 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2992     pa_sample_spec fixed_ss;
2993     pa_usec_t sink_latency;
2994     pa_cvolume v;
2995
2996     pa_assert(t);
2997     pa_sink_input_assert_ref(s);
2998
2999     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3000
3001     pa_tagstruct_putu32(t, s->index);
3002     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3003     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3004     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3005     pa_tagstruct_putu32(t, s->sink->index);
3006     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3007     pa_tagstruct_put_channel_map(t, &s->channel_map);
3008     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
3009     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3010     pa_tagstruct_put_usec(t, sink_latency);
3011     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3012     pa_tagstruct_puts(t, s->driver);
3013     if (c->version >= 11)
3014         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3015     if (c->version >= 13)
3016         pa_tagstruct_put_proplist(t, s->proplist);
3017 }
3018
3019 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3020     pa_sample_spec fixed_ss;
3021     pa_usec_t source_latency;
3022
3023     pa_assert(t);
3024     pa_source_output_assert_ref(s);
3025
3026     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3027
3028     pa_tagstruct_putu32(t, s->index);
3029     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3030     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3031     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3032     pa_tagstruct_putu32(t, s->source->index);
3033     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3034     pa_tagstruct_put_channel_map(t, &s->channel_map);
3035     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3036     pa_tagstruct_put_usec(t, source_latency);
3037     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3038     pa_tagstruct_puts(t, s->driver);
3039
3040     if (c->version >= 13)
3041         pa_tagstruct_put_proplist(t, s->proplist);
3042 }
3043
3044 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3045     pa_sample_spec fixed_ss;
3046     pa_cvolume v;
3047
3048     pa_assert(t);
3049     pa_assert(e);
3050
3051     if (e->memchunk.memblock)
3052         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3053     else
3054         memset(&fixed_ss, 0, sizeof(fixed_ss));
3055
3056     pa_tagstruct_putu32(t, e->index);
3057     pa_tagstruct_puts(t, e->name);
3058
3059     if (e->volume_is_set)
3060         v = e->volume;
3061     else
3062         pa_cvolume_init(&v);
3063
3064     pa_tagstruct_put_cvolume(t, &v);
3065     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3066     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3067     pa_tagstruct_put_channel_map(t, &e->channel_map);
3068     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3069     pa_tagstruct_put_boolean(t, e->lazy);
3070     pa_tagstruct_puts(t, e->filename);
3071
3072     if (c->version >= 13)
3073         pa_tagstruct_put_proplist(t, e->proplist);
3074 }
3075
3076 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3077     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3078     uint32_t idx;
3079     pa_sink *sink = NULL;
3080     pa_source *source = NULL;
3081     pa_client *client = NULL;
3082     pa_card *card = NULL;
3083     pa_module *module = NULL;
3084     pa_sink_input *si = NULL;
3085     pa_source_output *so = NULL;
3086     pa_scache_entry *sce = NULL;
3087     const char *name = NULL;
3088     pa_tagstruct *reply;
3089
3090     pa_native_connection_assert_ref(c);
3091     pa_assert(t);
3092
3093     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3094         (command != PA_COMMAND_GET_CLIENT_INFO &&
3095          command != PA_COMMAND_GET_MODULE_INFO &&
3096          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3097          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3098          pa_tagstruct_gets(t, &name) < 0) ||
3099         !pa_tagstruct_eof(t)) {
3100         protocol_error(c);
3101         return;
3102     }
3103
3104     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3105     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3106     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3107     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3108     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3109
3110     if (command == PA_COMMAND_GET_SINK_INFO) {
3111         if (idx != PA_INVALID_INDEX)
3112             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3113         else
3114             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3115     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3116         if (idx != PA_INVALID_INDEX)
3117             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3118         else
3119             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3120     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3121         if (idx != PA_INVALID_INDEX)
3122             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3123         else
3124             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3125     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3126         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3127     else if (command == PA_COMMAND_GET_MODULE_INFO)
3128         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3129     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3130         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3131     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3132         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3133     else {
3134         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3135         if (idx != PA_INVALID_INDEX)
3136             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3137         else
3138             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3139     }
3140
3141     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3142         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3143         return;
3144     }
3145
3146     reply = reply_new(tag);
3147     if (sink)
3148         sink_fill_tagstruct(c, reply, sink);
3149     else if (source)
3150         source_fill_tagstruct(c, reply, source);
3151     else if (client)
3152         client_fill_tagstruct(c, reply, client);
3153     else if (card)
3154         card_fill_tagstruct(c, reply, card);
3155     else if (module)
3156         module_fill_tagstruct(c, reply, module);
3157     else if (si)
3158         sink_input_fill_tagstruct(c, reply, si);
3159     else if (so)
3160         source_output_fill_tagstruct(c, reply, so);
3161     else
3162         scache_fill_tagstruct(c, reply, sce);
3163     pa_pstream_send_tagstruct(c->pstream, reply);
3164 }
3165
3166 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3167     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3168     pa_idxset *i;
3169     uint32_t idx;
3170     void *p;
3171     pa_tagstruct *reply;
3172
3173     pa_native_connection_assert_ref(c);
3174     pa_assert(t);
3175
3176     if (!pa_tagstruct_eof(t)) {
3177         protocol_error(c);
3178         return;
3179     }
3180
3181     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3182
3183     reply = reply_new(tag);
3184
3185     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3186         i = c->protocol->core->sinks;
3187     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3188         i = c->protocol->core->sources;
3189     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3190         i = c->protocol->core->clients;
3191     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3192         i = c->protocol->core->cards;
3193     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3194         i = c->protocol->core->modules;
3195     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3196         i = c->protocol->core->sink_inputs;
3197     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3198         i = c->protocol->core->source_outputs;
3199     else {
3200         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3201         i = c->protocol->core->scache;
3202     }
3203
3204     if (i) {
3205         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3206             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3207                 sink_fill_tagstruct(c, reply, p);
3208             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3209                 source_fill_tagstruct(c, reply, p);
3210             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3211                 client_fill_tagstruct(c, reply, p);
3212             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3213                 card_fill_tagstruct(c, reply, p);
3214             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3215                 module_fill_tagstruct(c, reply, p);
3216             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3217                 sink_input_fill_tagstruct(c, reply, p);
3218             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3219                 source_output_fill_tagstruct(c, reply, p);
3220             else {
3221                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3222                 scache_fill_tagstruct(c, reply, p);
3223             }
3224         }
3225     }
3226
3227     pa_pstream_send_tagstruct(c->pstream, reply);
3228 }
3229
3230 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3231     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3232     pa_tagstruct *reply;
3233     pa_sink *def_sink;
3234     pa_source *def_source;
3235     pa_sample_spec fixed_ss;
3236     char *h, *u;
3237
3238     pa_native_connection_assert_ref(c);
3239     pa_assert(t);
3240
3241     if (!pa_tagstruct_eof(t)) {
3242         protocol_error(c);
3243         return;
3244     }
3245
3246     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3247
3248     reply = reply_new(tag);
3249     pa_tagstruct_puts(reply, PACKAGE_NAME);
3250     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3251
3252     u = pa_get_user_name_malloc();
3253     pa_tagstruct_puts(reply, u);
3254     pa_xfree(u);
3255
3256     h = pa_get_host_name_malloc();
3257     pa_tagstruct_puts(reply, h);
3258     pa_xfree(h);
3259
3260     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3261     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3262
3263     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3264     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3265     def_source = pa_namereg_get_default_source(c->protocol->core);
3266     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3267
3268     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3269
3270     if (c->version >= 15)
3271         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3272
3273     pa_pstream_send_tagstruct(c->pstream, reply);
3274 }
3275
3276 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3277     pa_tagstruct *t;
3278     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3279
3280     pa_native_connection_assert_ref(c);
3281
3282     t = pa_tagstruct_new(NULL, 0);
3283     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3284     pa_tagstruct_putu32(t, (uint32_t) -1);
3285     pa_tagstruct_putu32(t, e);
3286     pa_tagstruct_putu32(t, idx);
3287     pa_pstream_send_tagstruct(c->pstream, t);
3288 }
3289
3290 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3291     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3292     pa_subscription_mask_t m;
3293
3294     pa_native_connection_assert_ref(c);
3295     pa_assert(t);
3296
3297     if (pa_tagstruct_getu32(t, &m) < 0 ||
3298         !pa_tagstruct_eof(t)) {
3299         protocol_error(c);
3300         return;
3301     }
3302
3303     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3304     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3305
3306     if (c->subscription)
3307         pa_subscription_free(c->subscription);
3308
3309     if (m != 0) {
3310         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3311         pa_assert(c->subscription);
3312     } else
3313         c->subscription = NULL;
3314
3315     pa_pstream_send_simple_ack(c->pstream, tag);
3316 }
3317
3318 static void command_set_volume(
3319         pa_pdispatch *pd,
3320         uint32_t command,
3321         uint32_t tag,
3322         pa_tagstruct *t,
3323         void *userdata) {
3324
3325     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3326     uint32_t idx;
3327     pa_cvolume volume;
3328     pa_sink *sink = NULL;
3329     pa_source *source = NULL;
3330     pa_sink_input *si = NULL;
3331     const char *name = NULL;
3332     const char *client_name;
3333
3334     pa_native_connection_assert_ref(c);
3335     pa_assert(t);
3336
3337     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3338         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3339         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3340         pa_tagstruct_get_cvolume(t, &volume) ||
3341         !pa_tagstruct_eof(t)) {
3342         protocol_error(c);
3343         return;
3344     }
3345
3346     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3347     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3348     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3349     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3350     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3351     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3352
3353     switch (command) {
3354
3355         case PA_COMMAND_SET_SINK_VOLUME:
3356             if (idx != PA_INVALID_INDEX)
3357                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3358             else
3359                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3360             break;
3361
3362         case PA_COMMAND_SET_SOURCE_VOLUME:
3363             if (idx != PA_INVALID_INDEX)
3364                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3365             else
3366                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3367             break;
3368
3369         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3370             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3371             break;
3372
3373         default:
3374             pa_assert_not_reached();
3375     }
3376
3377     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3378
3379     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3380
3381     if (sink) {
3382         pa_log("Client %s changes volume of sink %s.", client_name, sink->name);
3383         pa_sink_set_volume(sink, &volume, TRUE, TRUE, TRUE, TRUE);
3384     } else if (source) {
3385         pa_log("Client %s changes volume of sink %s.", client_name, source->name);
3386         pa_source_set_volume(source, &volume, TRUE);
3387     } else if (si) {
3388         pa_log("Client %s changes volume of sink %s.",
3389                      client_name,
3390                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3391         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3392     }
3393
3394     pa_pstream_send_simple_ack(c->pstream, tag);
3395 }
3396
3397 static void command_set_mute(
3398         pa_pdispatch *pd,
3399         uint32_t command,
3400         uint32_t tag,
3401         pa_tagstruct *t,
3402         void *userdata) {
3403
3404     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3405     uint32_t idx;
3406     pa_bool_t mute;
3407     pa_sink *sink = NULL;
3408     pa_source *source = NULL;
3409     pa_sink_input *si = NULL;
3410     const char *name = NULL;
3411
3412     pa_native_connection_assert_ref(c);
3413     pa_assert(t);
3414
3415     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3416         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3417         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3418         pa_tagstruct_get_boolean(t, &mute) ||
3419         !pa_tagstruct_eof(t)) {
3420         protocol_error(c);
3421         return;
3422     }
3423
3424     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3425     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3426     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3427     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3428     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3429
3430     switch (command) {
3431
3432         case PA_COMMAND_SET_SINK_MUTE:
3433
3434             if (idx != PA_INVALID_INDEX)
3435                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3436             else
3437                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3438
3439             break;
3440
3441         case PA_COMMAND_SET_SOURCE_MUTE:
3442             if (idx != PA_INVALID_INDEX)
3443                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3444             else
3445                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3446
3447             break;
3448
3449         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3450             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3451             break;
3452
3453         default:
3454             pa_assert_not_reached();
3455     }
3456
3457     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3458
3459     if (sink)
3460         pa_sink_set_mute(sink, mute, TRUE);
3461     else if (source)
3462         pa_source_set_mute(source, mute, TRUE);
3463     else if (si)
3464         pa_sink_input_set_mute(si, mute, TRUE);
3465
3466     pa_pstream_send_simple_ack(c->pstream, tag);
3467 }
3468
3469 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3470     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3471     uint32_t idx;
3472     pa_bool_t b;
3473     playback_stream *s;
3474
3475     pa_native_connection_assert_ref(c);
3476     pa_assert(t);
3477
3478     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3479         pa_tagstruct_get_boolean(t, &b) < 0 ||
3480         !pa_tagstruct_eof(t)) {
3481         protocol_error(c);
3482         return;
3483     }
3484
3485     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3486     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3487     s = pa_idxset_get_by_index(c->output_streams, idx);
3488     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3489     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3490
3491     pa_sink_input_cork(s->sink_input, b);
3492
3493     if (b)
3494         s->is_underrun = TRUE;
3495
3496     pa_pstream_send_simple_ack(c->pstream, tag);
3497 }
3498
3499 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3500     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3501     uint32_t idx;
3502     playback_stream *s;
3503
3504     pa_native_connection_assert_ref(c);
3505     pa_assert(t);
3506
3507     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3508         !pa_tagstruct_eof(t)) {
3509         protocol_error(c);
3510         return;
3511     }
3512
3513     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3514     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3515     s = pa_idxset_get_by_index(c->output_streams, idx);
3516     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3517     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3518
3519     switch (command) {
3520         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3521             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3522             break;
3523
3524         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3525             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3526             break;
3527
3528         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3529             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3530             break;
3531
3532         default:
3533             pa_assert_not_reached();
3534     }
3535
3536     pa_pstream_send_simple_ack(c->pstream, tag);
3537 }
3538
3539 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3540     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3541     uint32_t idx;
3542     record_stream *s;
3543     pa_bool_t b;
3544
3545     pa_native_connection_assert_ref(c);
3546     pa_assert(t);
3547
3548     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3549         pa_tagstruct_get_boolean(t, &b) < 0 ||
3550         !pa_tagstruct_eof(t)) {
3551         protocol_error(c);
3552         return;
3553     }
3554
3555     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3556     s = pa_idxset_get_by_index(c->record_streams, idx);
3557     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3558
3559     pa_source_output_cork(s->source_output, b);
3560     pa_memblockq_prebuf_force(s->memblockq);
3561     pa_pstream_send_simple_ack(c->pstream, tag);
3562 }
3563
3564 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3565     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3566     uint32_t idx;
3567     record_stream *s;
3568
3569     pa_native_connection_assert_ref(c);
3570     pa_assert(t);
3571
3572     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3573         !pa_tagstruct_eof(t)) {
3574         protocol_error(c);
3575         return;
3576     }
3577
3578     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3579     s = pa_idxset_get_by_index(c->record_streams, idx);
3580     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3581
3582     pa_memblockq_flush_read(s->memblockq);
3583     pa_pstream_send_simple_ack(c->pstream, tag);
3584 }
3585
3586 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3587     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3588     uint32_t idx;
3589     pa_buffer_attr a;
3590     pa_tagstruct *reply;
3591
3592     pa_native_connection_assert_ref(c);
3593     pa_assert(t);
3594
3595     memset(&a, 0, sizeof(a));
3596
3597     if (pa_tagstruct_getu32(t, &idx) < 0) {
3598         protocol_error(c);
3599         return;
3600     }
3601
3602     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3603
3604     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3605         playback_stream *s;
3606         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3607
3608         s = pa_idxset_get_by_index(c->output_streams, idx);
3609         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3610         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3611
3612         if (pa_tagstruct_get(
3613                     t,
3614                     PA_TAG_U32, &a.maxlength,
3615                     PA_TAG_U32, &a.tlength,
3616                     PA_TAG_U32, &a.prebuf,
3617                     PA_TAG_U32, &a.minreq,
3618                     PA_TAG_INVALID) < 0 ||
3619             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3620             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3621             !pa_tagstruct_eof(t)) {
3622             protocol_error(c);
3623             return;
3624         }
3625
3626         s->adjust_latency = adjust_latency;
3627         s->early_requests = early_requests;
3628         s->buffer_attr = a;
3629
3630         fix_playback_buffer_attr(s);
3631         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3632
3633         reply = reply_new(tag);
3634         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3635         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3636         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3637         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3638
3639         if (c->version >= 13)
3640             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3641
3642     } else {
3643         record_stream *s;
3644         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3645         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3646
3647         s = pa_idxset_get_by_index(c->record_streams, idx);
3648         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3649
3650         if (pa_tagstruct_get(
3651                     t,
3652                     PA_TAG_U32, &a.maxlength,
3653                     PA_TAG_U32, &a.fragsize,
3654                     PA_TAG_INVALID) < 0 ||
3655             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3656             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3657             !pa_tagstruct_eof(t)) {
3658             protocol_error(c);
3659             return;
3660         }
3661
3662         s->adjust_latency = adjust_latency;
3663         s->early_requests = early_requests;
3664         s->buffer_attr = a;
3665
3666         fix_record_buffer_attr_pre(s);
3667         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3668         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3669         fix_record_buffer_attr_post(s);
3670
3671         reply = reply_new(tag);
3672         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3673         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3674
3675         if (c->version >= 13)
3676             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3677     }
3678
3679     pa_pstream_send_tagstruct(c->pstream, reply);
3680 }
3681
3682 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3683     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3684     uint32_t idx;
3685     uint32_t rate;
3686
3687     pa_native_connection_assert_ref(c);
3688     pa_assert(t);
3689
3690     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3691         pa_tagstruct_getu32(t, &rate) < 0 ||
3692         !pa_tagstruct_eof(t)) {
3693         protocol_error(c);
3694         return;
3695     }
3696
3697     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3698     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3699
3700     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3701         playback_stream *s;
3702
3703         s = pa_idxset_get_by_index(c->output_streams, idx);
3704         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3705         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3706
3707         pa_sink_input_set_rate(s->sink_input, rate);
3708
3709     } else {
3710         record_stream *s;
3711         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3712
3713         s = pa_idxset_get_by_index(c->record_streams, idx);
3714         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3715
3716         pa_source_output_set_rate(s->source_output, rate);
3717     }
3718
3719     pa_pstream_send_simple_ack(c->pstream, tag);
3720 }
3721
3722 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3723     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3724     uint32_t idx;
3725     uint32_t mode;
3726     pa_proplist *p;
3727
3728     pa_native_connection_assert_ref(c);
3729     pa_assert(t);
3730
3731     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3732
3733     p = pa_proplist_new();
3734
3735     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3736
3737         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3738             pa_tagstruct_get_proplist(t, p) < 0 ||
3739             !pa_tagstruct_eof(t)) {
3740             protocol_error(c);
3741             pa_proplist_free(p);
3742             return;
3743         }
3744
3745     } else {
3746
3747         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3748             pa_tagstruct_getu32(t, &mode) < 0 ||
3749             pa_tagstruct_get_proplist(t, p) < 0 ||
3750             !pa_tagstruct_eof(t)) {
3751             protocol_error(c);
3752             pa_proplist_free(p);
3753             return;
3754         }
3755     }
3756
3757     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3758         pa_proplist_free(p);
3759         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3760     }
3761
3762     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3763         playback_stream *s;
3764
3765         s = pa_idxset_get_by_index(c->output_streams, idx);
3766         if (!s || !playback_stream_isinstance(s)) {
3767             pa_proplist_free(p);
3768             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3769         }
3770         pa_sink_input_update_proplist(s->sink_input, mode, p);
3771
3772     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3773         record_stream *s;
3774
3775         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3776             pa_proplist_free(p);
3777             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3778         }
3779         pa_source_output_update_proplist(s->source_output, mode, p);
3780
3781     } else {
3782         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3783
3784         pa_client_update_proplist(c->client, mode, p);
3785     }
3786
3787     pa_pstream_send_simple_ack(c->pstream, tag);
3788     pa_proplist_free(p);
3789 }
3790
3791 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3792     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3793     uint32_t idx;
3794     unsigned changed = 0;
3795     pa_proplist *p;
3796     pa_strlist *l = NULL;
3797
3798     pa_native_connection_assert_ref(c);
3799     pa_assert(t);
3800
3801     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3802
3803     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3804
3805         if (pa_tagstruct_getu32(t, &idx) < 0) {
3806             protocol_error(c);
3807             return;
3808         }
3809     }
3810
3811     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3812         playback_stream *s;
3813
3814         s = pa_idxset_get_by_index(c->output_streams, idx);
3815         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3816         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3817
3818         p = s->sink_input->proplist;
3819
3820     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3821         record_stream *s;
3822
3823         s = pa_idxset_get_by_index(c->record_streams, idx);
3824         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3825
3826         p = s->source_output->proplist;
3827     } else {
3828         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3829
3830         p = c->client->proplist;
3831     }
3832
3833     for (;;) {
3834         const char *k;
3835
3836         if (pa_tagstruct_gets(t, &k) < 0) {
3837             protocol_error(c);
3838             pa_strlist_free(l);
3839             return;
3840         }
3841
3842         if (!k)
3843             break;
3844
3845         l = pa_strlist_prepend(l, k);
3846     }
3847
3848     if (!pa_tagstruct_eof(t)) {
3849         protocol_error(c);
3850         pa_strlist_free(l);
3851         return;
3852     }
3853
3854     for (;;) {
3855         char *z;
3856
3857         l = pa_strlist_pop(l, &z);
3858
3859         if (!z)
3860             break;
3861
3862         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3863         pa_xfree(z);
3864     }
3865
3866     pa_pstream_send_simple_ack(c->pstream, tag);
3867
3868     if (changed) {
3869         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3870             playback_stream *s;
3871
3872             s = pa_idxset_get_by_index(c->output_streams, idx);
3873             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3874
3875         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3876             record_stream *s;
3877
3878             s = pa_idxset_get_by_index(c->record_streams, idx);
3879             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3880
3881         } else {
3882             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3883             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3884         }
3885     }
3886 }
3887
3888 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3889     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3890     const char *s;
3891
3892     pa_native_connection_assert_ref(c);
3893     pa_assert(t);
3894
3895     if (pa_tagstruct_gets(t, &s) < 0 ||
3896         !pa_tagstruct_eof(t)) {
3897         protocol_error(c);
3898         return;
3899     }
3900
3901     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3902     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3903
3904     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3905         pa_source *source;
3906
3907         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3908         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3909
3910         pa_namereg_set_default_source(c->protocol->core, source);
3911     } else {
3912         pa_sink *sink;
3913         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3914
3915         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3916         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3917
3918         pa_namereg_set_default_sink(c->protocol->core, sink);
3919     }
3920
3921     pa_pstream_send_simple_ack(c->pstream, tag);
3922 }
3923
3924 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3925     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3926     uint32_t idx;
3927     const char *name;
3928
3929     pa_native_connection_assert_ref(c);
3930     pa_assert(t);
3931
3932     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3933         pa_tagstruct_gets(t, &name) < 0 ||
3934         !pa_tagstruct_eof(t)) {
3935         protocol_error(c);
3936         return;
3937     }
3938
3939     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3940     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3941
3942     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3943         playback_stream *s;
3944
3945         s = pa_idxset_get_by_index(c->output_streams, idx);
3946         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3947         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3948
3949         pa_sink_input_set_name(s->sink_input, name);
3950
3951     } else {
3952         record_stream *s;
3953         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3954
3955         s = pa_idxset_get_by_index(c->record_streams, idx);
3956         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3957
3958         pa_source_output_set_name(s->source_output, name);
3959     }
3960
3961     pa_pstream_send_simple_ack(c->pstream, tag);
3962 }
3963
3964 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3965     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3966     uint32_t idx;
3967
3968     pa_native_connection_assert_ref(c);
3969     pa_assert(t);
3970
3971     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3972         !pa_tagstruct_eof(t)) {
3973         protocol_error(c);
3974         return;
3975     }
3976
3977     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3978
3979     if (command == PA_COMMAND_KILL_CLIENT) {
3980         pa_client *client;
3981
3982         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3983         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3984
3985         pa_native_connection_ref(c);
3986         pa_client_kill(client);
3987
3988     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3989         pa_sink_input *s;
3990
3991         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3992         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3993
3994         pa_native_connection_ref(c);
3995         pa_sink_input_kill(s);
3996     } else {
3997         pa_source_output *s;
3998
3999         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4000
4001         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4002         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4003
4004         pa_native_connection_ref(c);
4005         pa_source_output_kill(s);
4006     }
4007
4008     pa_pstream_send_simple_ack(c->pstream, tag);
4009     pa_native_connection_unref(c);
4010 }
4011
4012 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4013     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4014     pa_module *m;
4015     const char *name, *argument;
4016     pa_tagstruct *reply;
4017
4018     pa_native_connection_assert_ref(c);
4019     pa_assert(t);
4020
4021     if (pa_tagstruct_gets(t, &name) < 0 ||
4022         pa_tagstruct_gets(t, &argument) < 0 ||
4023         !pa_tagstruct_eof(t)) {
4024         protocol_error(c);
4025         return;
4026     }
4027
4028     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4029     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4030     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4031
4032     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4033         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4034         return;
4035     }
4036
4037     reply = reply_new(tag);
4038     pa_tagstruct_putu32(reply, m->index);
4039     pa_pstream_send_tagstruct(c->pstream, reply);
4040 }
4041
4042 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4043     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4044     uint32_t idx;
4045     pa_module *m;
4046
4047     pa_native_connection_assert_ref(c);
4048     pa_assert(t);
4049
4050     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4051         !pa_tagstruct_eof(t)) {
4052         protocol_error(c);
4053         return;
4054     }
4055
4056     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4057     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4058     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4059
4060     pa_module_unload_request(m, FALSE);
4061     pa_pstream_send_simple_ack(c->pstream, tag);
4062 }
4063
4064 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4065     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4066     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4067     const char *name_device = NULL;
4068
4069     pa_native_connection_assert_ref(c);
4070     pa_assert(t);
4071
4072     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4073         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4074         pa_tagstruct_gets(t, &name_device) < 0 ||
4075         !pa_tagstruct_eof(t)) {
4076         protocol_error(c);
4077         return;
4078     }
4079
4080     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4081     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4082
4083     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4084     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4085     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4086     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4087
4088     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4089         pa_sink_input *si = NULL;
4090         pa_sink *sink = NULL;
4091
4092         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4093
4094         if (idx_device != PA_INVALID_INDEX)
4095             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4096         else
4097             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4098
4099         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4100
4101         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4102             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4103             return;
4104         }
4105     } else {
4106         pa_source_output *so = NULL;
4107         pa_source *source;
4108
4109         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4110
4111         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4112
4113         if (idx_device != PA_INVALID_INDEX)
4114             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4115         else
4116             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4117
4118         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4119
4120         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4121             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4122             return;
4123         }
4124     }
4125
4126     pa_pstream_send_simple_ack(c->pstream, tag);
4127 }
4128
4129 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4130     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4131     uint32_t idx = PA_INVALID_INDEX;
4132     const char *name = NULL;
4133     pa_bool_t b;
4134
4135     pa_native_connection_assert_ref(c);
4136     pa_assert(t);
4137
4138     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4139         pa_tagstruct_gets(t, &name) < 0 ||
4140         pa_tagstruct_get_boolean(t, &b) < 0 ||
4141         !pa_tagstruct_eof(t)) {
4142         protocol_error(c);
4143         return;
4144     }
4145
4146     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4147     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4148     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4149     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4150     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4151
4152     if (command == PA_COMMAND_SUSPEND_SINK) {
4153
4154         if (idx == PA_INVALID_INDEX && name && !*name) {
4155
4156             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4157
4158             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4159                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4160                 return;
4161             }
4162         } else {
4163             pa_sink *sink = NULL;
4164
4165             if (idx != PA_INVALID_INDEX)
4166                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4167             else
4168                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4169
4170             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4171
4172             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4173                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4174                 return;
4175             }
4176         }
4177     } else {
4178
4179         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4180
4181         if (idx == PA_INVALID_INDEX && name && !*name) {
4182
4183             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4184
4185             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4186                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4187                 return;
4188             }
4189
4190         } else {
4191             pa_source *source;
4192
4193             if (idx != PA_INVALID_INDEX)
4194                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4195             else
4196                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4197
4198             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4199
4200             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4201                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4202                 return;
4203             }
4204         }
4205     }
4206
4207     pa_pstream_send_simple_ack(c->pstream, tag);
4208 }
4209
4210 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4211     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4212     uint32_t idx = PA_INVALID_INDEX;
4213     const char *name = NULL;
4214     pa_module *m;
4215     pa_native_protocol_ext_cb_t cb;
4216
4217     pa_native_connection_assert_ref(c);
4218     pa_assert(t);
4219
4220     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4221         pa_tagstruct_gets(t, &name) < 0) {
4222         protocol_error(c);
4223         return;
4224     }
4225
4226     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4227     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4228     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4229     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4230     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4231
4232     if (idx != PA_INVALID_INDEX)
4233         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4234     else {
4235         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4236             if (strcmp(name, m->name) == 0)
4237                 break;
4238     }
4239
4240     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4241     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4242
4243     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4244     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4245
4246     if (cb(c->protocol, m, c, tag, t) < 0)
4247         protocol_error(c);
4248 }
4249
4250 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4251     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4252     uint32_t idx = PA_INVALID_INDEX;
4253     const char *name = NULL, *profile = NULL;
4254     pa_card *card = NULL;
4255     int ret;
4256
4257     pa_native_connection_assert_ref(c);
4258     pa_assert(t);
4259
4260     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4261         pa_tagstruct_gets(t, &name) < 0 ||
4262         pa_tagstruct_gets(t, &profile) < 0 ||
4263         !pa_tagstruct_eof(t)) {
4264         protocol_error(c);
4265         return;
4266     }
4267
4268     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4269     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4270     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4271     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4272     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4273
4274     if (idx != PA_INVALID_INDEX)
4275         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4276     else
4277         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4278
4279     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4280
4281     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4282         pa_pstream_send_error(c->pstream, tag, -ret);
4283         return;
4284     }
4285
4286     pa_pstream_send_simple_ack(c->pstream, tag);
4287 }
4288
4289 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4290     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4291     uint32_t idx = PA_INVALID_INDEX;
4292     const char *name = NULL, *port = NULL;
4293     int ret;
4294
4295     pa_native_connection_assert_ref(c);
4296     pa_assert(t);
4297
4298     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4299         pa_tagstruct_gets(t, &name) < 0 ||
4300         pa_tagstruct_gets(t, &port) < 0 ||
4301         !pa_tagstruct_eof(t)) {
4302         protocol_error(c);
4303         return;
4304     }
4305
4306     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4307     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4308     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4309     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4310     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4311
4312     if (command == PA_COMMAND_SET_SINK_PORT) {
4313         pa_sink *sink;
4314
4315         if (idx != PA_INVALID_INDEX)
4316             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4317         else
4318             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4319
4320         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4321
4322         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4323             pa_pstream_send_error(c->pstream, tag, -ret);
4324             return;
4325         }
4326     } else {
4327         pa_source *source;
4328
4329         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4330
4331         if (idx != PA_INVALID_INDEX)
4332             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4333         else
4334             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4335
4336         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4337
4338         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4339             pa_pstream_send_error(c->pstream, tag, -ret);
4340             return;
4341         }
4342     }
4343
4344     pa_pstream_send_simple_ack(c->pstream, tag);
4345 }
4346
4347 /*** pstream callbacks ***/
4348
4349 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4350     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4351
4352     pa_assert(p);
4353     pa_assert(packet);
4354     pa_native_connection_assert_ref(c);
4355
4356     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4357         pa_log("invalid packet.");
4358         native_connection_unlink(c);
4359     }
4360 }
4361
4362 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4363     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4364     output_stream *stream;
4365
4366     pa_assert(p);
4367     pa_assert(chunk);
4368     pa_native_connection_assert_ref(c);
4369
4370     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4371         pa_log_debug("Client sent block for invalid stream.");
4372         /* Ignoring */
4373         return;
4374     }
4375
4376 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4377
4378     if (playback_stream_isinstance(stream)) {
4379         playback_stream *ps = PLAYBACK_STREAM(stream);
4380
4381         if (chunk->memblock) {
4382             if (seek != PA_SEEK_RELATIVE || offset != 0)
4383                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4384
4385             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4386         } else
4387             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4388
4389     } else {
4390         upload_stream *u = UPLOAD_STREAM(stream);
4391         size_t l;
4392
4393         if (!u->memchunk.memblock) {
4394             if (u->length == chunk->length && chunk->memblock) {
4395                 u->memchunk = *chunk;
4396                 pa_memblock_ref(u->memchunk.memblock);
4397                 u->length = 0;
4398             } else {
4399                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4400                 u->memchunk.index = u->memchunk.length = 0;
4401             }
4402         }
4403
4404         pa_assert(u->memchunk.memblock);
4405
4406         l = u->length;
4407         if (l > chunk->length)
4408             l = chunk->length;
4409
4410         if (l > 0) {
4411             void *dst;
4412             dst = pa_memblock_acquire(u->memchunk.memblock);
4413
4414             if (chunk->memblock) {
4415                 void *src;
4416                 src = pa_memblock_acquire(chunk->memblock);
4417
4418                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4419                        (uint8_t*) src + chunk->index, l);
4420
4421                 pa_memblock_release(chunk->memblock);
4422             } else
4423                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4424
4425             pa_memblock_release(u->memchunk.memblock);
4426
4427             u->memchunk.length += l;
4428             u->length -= l;
4429         }
4430     }
4431 }
4432
4433 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4434     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4435
4436     pa_assert(p);
4437     pa_native_connection_assert_ref(c);
4438
4439     native_connection_unlink(c);
4440     pa_log_info("Connection died.");
4441 }
4442
4443 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4444     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4445
4446     pa_assert(p);
4447     pa_native_connection_assert_ref(c);
4448
4449     native_connection_send_memblock(c);
4450 }
4451
4452 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4453     pa_thread_mq *q;
4454
4455     if (!(q = pa_thread_mq_get()))
4456         pa_pstream_send_revoke(p, block_id);
4457     else
4458         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4459 }
4460
4461 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4462     pa_thread_mq *q;
4463
4464     if (!(q = pa_thread_mq_get()))
4465         pa_pstream_send_release(p, block_id);
4466     else
4467         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4468 }
4469
4470 /*** client callbacks ***/
4471
4472 static void client_kill_cb(pa_client *c) {
4473     pa_assert(c);
4474
4475     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4476     pa_log_info("Connection killed.");
4477 }
4478
4479 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4480     pa_tagstruct *t;
4481     pa_native_connection *c;
4482
4483     pa_assert(client);
4484     c = PA_NATIVE_CONNECTION(client->userdata);
4485     pa_native_connection_assert_ref(c);
4486
4487     if (c->version < 15)
4488       return;
4489
4490     t = pa_tagstruct_new(NULL, 0);
4491     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4492     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4493     pa_tagstruct_puts(t, event);
4494     pa_tagstruct_put_proplist(t, pl);
4495     pa_pstream_send_tagstruct(c->pstream, t);
4496 }
4497
4498 /*** module entry points ***/
4499
4500 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4501     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4502
4503     pa_assert(m);
4504     pa_native_connection_assert_ref(c);
4505     pa_assert(c->auth_timeout_event == e);
4506
4507     if (!c->authorized) {
4508         native_connection_unlink(c);
4509         pa_log_info("Connection terminated due to authentication timeout.");
4510     }
4511 }
4512
4513 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4514     pa_native_connection *c;
4515     char pname[128];
4516     pa_client *client;
4517     pa_client_new_data data;
4518
4519     pa_assert(p);
4520     pa_assert(io);
4521     pa_assert(o);
4522
4523     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4524         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4525         pa_iochannel_free(io);
4526         return;
4527     }
4528
4529     pa_client_new_data_init(&data);
4530     data.module = o->module;
4531     data.driver = __FILE__;
4532     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4533     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4534     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4535     client = pa_client_new(p->core, &data);
4536     pa_client_new_data_done(&data);
4537
4538     if (!client)
4539         return;
4540
4541     c = pa_msgobject_new(pa_native_connection);
4542     c->parent.parent.free = native_connection_free;
4543     c->parent.process_msg = native_connection_process_msg;
4544     c->protocol = p;
4545     c->options = pa_native_options_ref(o);
4546     c->authorized = FALSE;
4547
4548     if (o->auth_anonymous) {
4549         pa_log_info("Client authenticated anonymously.");
4550         c->authorized = TRUE;
4551     }
4552
4553     if (!c->authorized &&
4554         o->auth_ip_acl &&
4555         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4556
4557         pa_log_info("Client authenticated by IP ACL.");
4558         c->authorized = TRUE;
4559     }
4560
4561     if (!c->authorized)
4562         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4563     else
4564         c->auth_timeout_event = NULL;
4565
4566     c->is_local = pa_iochannel_socket_is_local(io);
4567     c->version = 8;
4568
4569     c->client = client;
4570     c->client->kill = client_kill_cb;
4571     c->client->send_event = client_send_event_cb;
4572     c->client->userdata = c;
4573
4574     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4575     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4576     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4577     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4578     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4579     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4580     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4581
4582     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4583
4584     c->record_streams = pa_idxset_new(NULL, NULL);
4585     c->output_streams = pa_idxset_new(NULL, NULL);
4586
4587     c->rrobin_index = PA_IDXSET_INVALID;
4588     c->subscription = NULL;
4589
4590     pa_idxset_put(p->connections, c, NULL);
4591
4592 #ifdef HAVE_CREDS
4593     if (pa_iochannel_creds_supported(io))
4594         pa_iochannel_creds_enable(io);
4595 #endif
4596
4597     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4598 }
4599
4600 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4601     pa_native_connection *c;
4602     void *state = NULL;
4603
4604     pa_assert(p);
4605     pa_assert(m);
4606
4607     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4608         if (c->options->module == m)
4609             native_connection_unlink(c);
4610 }
4611
4612 static pa_native_protocol* native_protocol_new(pa_core *c) {
4613     pa_native_protocol *p;
4614     pa_native_hook_t h;
4615
4616     pa_assert(c);
4617
4618     p = pa_xnew(pa_native_protocol, 1);
4619     PA_REFCNT_INIT(p);
4620     p->core = c;
4621     p->connections = pa_idxset_new(NULL, NULL);
4622
4623     p->servers = NULL;
4624
4625     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4626
4627     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4628         pa_hook_init(&p->hooks[h], p);
4629
4630     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4631
4632     return p;
4633 }
4634
4635 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4636     pa_native_protocol *p;
4637
4638     if ((p = pa_shared_get(c, "native-protocol")))
4639         return pa_native_protocol_ref(p);
4640
4641     return native_protocol_new(c);
4642 }
4643
4644 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4645     pa_assert(p);
4646     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4647
4648     PA_REFCNT_INC(p);
4649
4650     return p;
4651 }
4652
4653 void pa_native_protocol_unref(pa_native_protocol *p) {
4654     pa_native_connection *c;
4655     pa_native_hook_t h;
4656
4657     pa_assert(p);
4658     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4659
4660     if (PA_REFCNT_DEC(p) > 0)
4661         return;
4662
4663     while ((c = pa_idxset_first(p->connections, NULL)))
4664         native_connection_unlink(c);
4665
4666     pa_idxset_free(p->connections, NULL, NULL);
4667
4668     pa_strlist_free(p->servers);
4669
4670     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4671         pa_hook_done(&p->hooks[h]);
4672
4673     pa_hashmap_free(p->extensions, NULL, NULL);
4674
4675     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4676
4677     pa_xfree(p);
4678 }
4679
4680 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4681     pa_assert(p);
4682     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4683     pa_assert(name);
4684
4685     p->servers = pa_strlist_prepend(p->servers, name);
4686
4687     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4688 }
4689
4690 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4691     pa_assert(p);
4692     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4693     pa_assert(name);
4694
4695     p->servers = pa_strlist_remove(p->servers, name);
4696
4697     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4698 }
4699
4700 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4701     pa_assert(p);
4702     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4703
4704     return p->hooks;
4705 }
4706
4707 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4708     pa_assert(p);
4709     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4710
4711     return p->servers;
4712 }
4713
4714 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4715     pa_assert(p);
4716     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4717     pa_assert(m);
4718     pa_assert(cb);
4719     pa_assert(!pa_hashmap_get(p->extensions, m));
4720
4721     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4722     return 0;
4723 }
4724
4725 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4726     pa_assert(p);
4727     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4728     pa_assert(m);
4729
4730     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4731 }
4732
4733 pa_native_options* pa_native_options_new(void) {
4734     pa_native_options *o;
4735
4736     o = pa_xnew0(pa_native_options, 1);
4737     PA_REFCNT_INIT(o);
4738
4739     return o;
4740 }
4741
4742 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4743     pa_assert(o);
4744     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4745
4746     PA_REFCNT_INC(o);
4747
4748     return o;
4749 }
4750
4751 void pa_native_options_unref(pa_native_options *o) {
4752     pa_assert(o);
4753     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4754
4755     if (PA_REFCNT_DEC(o) > 0)
4756         return;
4757
4758     pa_xfree(o->auth_group);
4759
4760     if (o->auth_ip_acl)
4761         pa_ip_acl_free(o->auth_ip_acl);
4762
4763     if (o->auth_cookie)
4764         pa_auth_cookie_unref(o->auth_cookie);
4765
4766     pa_xfree(o);
4767 }
4768
4769 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4770     pa_bool_t enabled;
4771     const char *acl;
4772
4773     pa_assert(o);
4774     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4775     pa_assert(ma);
4776
4777     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4778         pa_log("auth-anonymous= expects a boolean argument.");
4779         return -1;
4780     }
4781
4782     enabled = TRUE;
4783     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4784         pa_log("auth-group-enabled= expects a boolean argument.");
4785         return -1;
4786     }
4787
4788     pa_xfree(o->auth_group);
4789     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4790
4791 #ifndef HAVE_CREDS
4792     if (o->auth_group)
4793         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4794 #endif
4795
4796     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4797         pa_ip_acl *ipa;
4798
4799         if (!(ipa = pa_ip_acl_new(acl))) {
4800             pa_log("Failed to parse IP ACL '%s'", acl);
4801             return -1;
4802         }
4803
4804         if (o->auth_ip_acl)
4805             pa_ip_acl_free(o->auth_ip_acl);
4806
4807         o->auth_ip_acl = ipa;
4808     }
4809
4810     enabled = TRUE;
4811     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4812         pa_log("auth-cookie-enabled= expects a boolean argument.");
4813         return -1;
4814     }
4815
4816     if (o->auth_cookie)
4817         pa_auth_cookie_unref(o->auth_cookie);
4818
4819     if (enabled) {
4820         const char *cn;
4821
4822         /* The new name for this is 'auth-cookie', for compat reasons
4823          * we check the old name too */
4824         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4825             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4826                 cn = PA_NATIVE_COOKIE_FILE;
4827
4828         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4829             return -1;
4830
4831     } else
4832           o->auth_cookie = NULL;
4833
4834     return 0;
4835 }
4836
4837 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4838     pa_native_connection_assert_ref(c);
4839
4840     return c->pstream;
4841 }