protocol-native: log explicitly each time a client triggers a volume change
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/native-common.h>
40 #include <pulsecore/packet.h>
41 #include <pulsecore/client.h>
42 #include <pulsecore/source-output.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/pstream.h>
45 #include <pulsecore/tagstruct.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/llist.h>
57 #include <pulsecore/creds.h>
58 #include <pulsecore/core-util.h>
59 #include <pulsecore/ipacl.h>
60 #include <pulsecore/thread-mq.h>
61
62 #include "protocol-native.h"
63
64 /* Kick a client if it doesn't authenticate within this time */
65 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
66
67 /* Don't accept more connections than this */
68 #define MAX_CONNECTIONS 64
69
/* Upper bound for a stream's buffer_attr.maxlength; record streams are
 * clamped to it in fix_record_buffer_attr_pre(). */
70 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
71 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
72 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
/* Fragment size (in ms) used by fix_record_buffer_attr_pre() when a record
 * stream leaves buffer_attr.fragsize at (uint32_t) -1. */
73 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
74
75 struct pa_native_protocol;
76
/* State kept for one record stream of a native-protocol client. Instances
 * are created by record_stream_new() and released by record_stream_free(). */
77 typedef struct record_stream {
78     pa_msgobject parent;
79
    /* Owning connection; set to NULL by record_stream_unlink(). */
80     pa_native_connection *connection;
    /* Slot assigned by pa_idxset_put() in connection->record_streams. */
81     uint32_t index;
82
    /* Created in record_stream_new(); unlinked and unref'd in
     * record_stream_unlink(). */
83     pa_source_output *source_output;
    /* Queue that RECORD_STREAM_MESSAGE_POST_DATA chunks are pushed into. */
84     pa_memblockq *memblockq;
85
    /* These two flags select how fix_record_buffer_attr_pre() derives the
     * requested source latency from the fragment size. */
86     pa_bool_t adjust_latency:1;
87     pa_bool_t early_requests:1;
88
89     pa_buffer_attr buffer_attr;
90
    /* Bytes currently on the fly; decremented by the chunk length when
     * RECORD_STREAM_MESSAGE_POST_DATA is processed. */
91     pa_atomic_t on_the_fly;
    /* Result of pa_source_output_set_requested_latency(), or 0 when no
     * latency was requested (see fix_record_buffer_attr_pre()). */
92     pa_usec_t configured_source_latency;
93     size_t drop_initial;
94
95     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
96     size_t on_the_fly_snapshot;
97     pa_usec_t current_monitor_latency;
98     pa_usec_t current_source_latency;
99 } record_stream;
100
101 PA_DECLARE_CLASS(record_stream);
102 #define RECORD_STREAM(o) (record_stream_cast(o))
103 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
104
/* Common base embedded as the first member ("parent") of both
 * playback_stream and upload_stream; both kinds are stored in a
 * connection's output_streams idxset. */
105 typedef struct output_stream {
106     pa_msgobject parent;
107 } output_stream;
108
109 PA_DECLARE_CLASS(output_stream);
110 #define OUTPUT_STREAM(o) (output_stream_cast(o))
111 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
112
/* State kept for one playback stream of a native-protocol client. */
113 typedef struct playback_stream {
114     output_stream parent;
115
    /* Owning connection; set to NULL by playback_stream_unlink(). */
116     pa_native_connection *connection;
    /* Stream index sent to the client in REQUEST/UNDERFLOW/OVERFLOW/STARTED
     * messages. */
117     uint32_t index;
118
    /* Unlinked and unref'd in playback_stream_unlink(). */
119     pa_sink_input *sink_input;
    /* Freed in playback_stream_free(). */
120     pa_memblockq *memblockq;
121
122     pa_bool_t adjust_latency:1;
123     pa_bool_t early_requests:1;
124
125     pa_bool_t is_underrun:1;
    /* While drain_request is set, unlinking the stream sends a
     * PA_ERR_NOENTITY error to the client using drain_tag. */
126     pa_bool_t drain_request:1;
127     uint32_t drain_tag;
128     uint32_t syncid;
129
    /* Read and reset to 0 (compare-and-exchange) when
     * PLAYBACK_STREAM_MESSAGE_REQUEST_DATA is handled; the value read is
     * sent to the client in a PA_COMMAND_REQUEST message. */
130     pa_atomic_t missing;
131     pa_usec_t configured_sink_latency;
132     pa_buffer_attr buffer_attr;
133
134     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
135     int64_t read_index, write_index;
136     size_t render_memblockq_length;
137     pa_usec_t current_sink_latency;
138     uint64_t playing_for, underrun_for;
139 } playback_stream;
140
141 PA_DECLARE_CLASS(playback_stream);
142 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
143 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
144
/* State kept for one upload stream of a native-protocol client. Instances
 * are created by upload_stream_new() and released by upload_stream_free(). */
145 typedef struct upload_stream {
146     output_stream parent;
147
    /* Owning connection; set to NULL by upload_stream_unlink(). */
148     pa_native_connection *connection;
    /* Slot assigned by pa_idxset_put() in connection->output_streams. */
149     uint32_t index;
150
    /* Reset in upload_stream_new(); its memblock, if any, is unref'd in
     * upload_stream_free(). */
151     pa_memchunk memchunk;
    /* Length passed to upload_stream_new() (asserted to be > 0). */
152     size_t length;
    /* Private copy made with pa_xstrdup(); freed in upload_stream_free(). */
153     char *name;
154     pa_sample_spec sample_spec;
155     pa_channel_map channel_map;
    /* Copy of the caller's proplist merged with the client's proplist. */
156     pa_proplist *proplist;
157 } upload_stream;
158
159 PA_DECLARE_CLASS(upload_stream);
160 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
161 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
162
/* Per-client connection state of the native protocol. */
163 struct pa_native_connection {
164     pa_msgobject parent;
165     pa_native_protocol *protocol;
166     pa_native_options *options;
167     pa_bool_t authorized:1;
168     pa_bool_t is_local:1;
    /* Checked before sending newer messages, e.g. PA_COMMAND_STARTED is
     * only sent when version >= 13. */
169     uint32_t version;
170     pa_client *client;
    /* Stream used to send tagstructs, errors and acks to the client. */
171     pa_pstream *pstream;
172     pa_pdispatch *pdispatch;
    /* record_streams holds record_stream objects; output_streams holds both
     * playback_stream and upload_stream objects. */
173     pa_idxset *record_streams, *output_streams;
174     uint32_t rrobin_index;
175     pa_subscription *subscription;
    /* NOTE(review): presumably the timer enforcing AUTH_TIMEOUT -- confirm
     * against the connection setup code. */
176     pa_time_event *auth_timeout_event;
177 };
178
179 PA_DECLARE_CLASS(pa_native_connection);
180 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
181 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
182
/* Reference-counted state of the native protocol, reachable from every
 * connection through pa_native_connection.protocol. */
183 struct pa_native_protocol {
184     PA_REFCNT_DECLARE;
185
    /* Core passed to pa_source_output_new() when record streams are
     * created. */
186     pa_core *core;
187     pa_idxset *connections;
188
189     pa_strlist *servers;
    /* One hook per PA_NATIVE_HOOK_* slot. */
190     pa_hook hooks[PA_NATIVE_HOOK_MAX];
191
192     pa_hashmap *extensions;
193 };
194
/* Extra message code for source outputs; numbering starts at
 * PA_SOURCE_OUTPUT_MESSAGE_MAX. */
195 enum {
196     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
197 };
198
/* Extra message codes for sink inputs; numbering starts at
 * PA_SINK_INPUT_MESSAGE_MAX. */
199 enum {
200     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
201     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
202     SINK_INPUT_MESSAGE_FLUSH,
203     SINK_INPUT_MESSAGE_TRIGGER,
204     SINK_INPUT_MESSAGE_SEEK,
205     SINK_INPUT_MESSAGE_PREBUF_FORCE,
206     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
207     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
208 };
209
/* Message codes handled by playback_stream_process_msg(). */
210 enum {
211     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
212     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
213     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
214     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
215     PLAYBACK_STREAM_MESSAGE_STARTED,
216     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
217 };
218
/* Message codes handled by record_stream_process_msg(). */
219 enum {
220     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
221 };
222
/* Message codes for pa_native_connection objects. */
223 enum {
224     CONNECTION_MESSAGE_RELEASE,
225     CONNECTION_MESSAGE_REVOKE
226 };
227
/* Forward declarations of the sink-input callbacks.
 * NOTE(review): presumably installed by the playback stream constructor,
 * which is outside this part of the file -- confirm. */
228 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
229 static void sink_input_kill_cb(pa_sink_input *i);
230 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
231 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
232 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
235 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
236
237 static void native_connection_send_memblock(pa_native_connection *c);
238 static void playback_stream_request_bytes(struct playback_stream*s);
239
/* Forward declarations of the source-output callbacks that
 * record_stream_new() installs on the source output it creates. */
240 static void source_output_kill_cb(pa_source_output *o);
241 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
242 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
243 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
244 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
245 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
246
247 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
249
/* Forward declarations of the command handlers referenced from
 * command_table; they all share the pa_pdispatch callback signature. */
250 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289
/* Dispatch table indexed by PA_COMMAND_* code. Related commands share one
 * handler, which receives the command code as an argument. Entries that are
 * explicitly NULL (error/timeout/reply/request and the obsolete autoload
 * commands) have no handler in this table. */
290 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
291     [PA_COMMAND_ERROR] = NULL,
292     [PA_COMMAND_TIMEOUT] = NULL,
293     [PA_COMMAND_REPLY] = NULL,
294     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
295     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
296     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
297     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
298     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
299     [PA_COMMAND_AUTH] = command_auth,
300     [PA_COMMAND_REQUEST] = NULL,
301     [PA_COMMAND_EXIT] = command_exit,
302     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
303     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
304     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
305     [PA_COMMAND_STAT] = command_stat,
306     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
307     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
308     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
309     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
310     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
311     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
312     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
313     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
314     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
315     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
316     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
317     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
318     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
319     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
320     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
321     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
329     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
330     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
331
332     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
333     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
334     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
335
336     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
337     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
338     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
339
340     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
341     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
342
343     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
344     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
346     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
347
348     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
349     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
350
351     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
352     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
353     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
354     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
355     [PA_COMMAND_KILL_CLIENT] = command_kill,
356     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
357     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
358     [PA_COMMAND_LOAD_MODULE] = command_load_module,
359     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
360
361     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
362     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
363     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
364     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
365
366     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
367     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
368
369     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
370     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
371
372     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
373     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
374
375     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
376     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
377     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
378
379     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
380     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
381     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
382
383     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
384
385     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
386     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
387
388     [PA_COMMAND_EXTENSION] = command_extension
389 };
390
391 /* structure management */
392
393 /* Called from main context */
394 static void upload_stream_unlink(upload_stream *s) {
395     pa_assert(s);
396
397     if (!s->connection)
398         return;
399
400     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
401     s->connection = NULL;
402     upload_stream_unref(s);
403 }
404
405 /* Called from main context */
406 static void upload_stream_free(pa_object *o) {
407     upload_stream *s = UPLOAD_STREAM(o);
408     pa_assert(s);
409
410     upload_stream_unlink(s);
411
412     pa_xfree(s->name);
413
414     if (s->proplist)
415         pa_proplist_free(s->proplist);
416
417     if (s->memchunk.memblock)
418         pa_memblock_unref(s->memchunk.memblock);
419
420     pa_xfree(s);
421 }
422
423 /* Called from main context */
424 static upload_stream* upload_stream_new(
425         pa_native_connection *c,
426         const pa_sample_spec *ss,
427         const pa_channel_map *map,
428         const char *name,
429         size_t length,
430         pa_proplist *p) {
431
432     upload_stream *s;
433
434     pa_assert(c);
435     pa_assert(ss);
436     pa_assert(name);
437     pa_assert(length > 0);
438     pa_assert(p);
439
440     s = pa_msgobject_new(upload_stream);
441     s->parent.parent.parent.free = upload_stream_free;
442     s->connection = c;
443     s->sample_spec = *ss;
444     s->channel_map = *map;
445     s->name = pa_xstrdup(name);
446     pa_memchunk_reset(&s->memchunk);
447     s->length = length;
448     s->proplist = pa_proplist_copy(p);
449     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
450
451     pa_idxset_put(c->output_streams, s, &s->index);
452
453     return s;
454 }
455
456 /* Called from main context */
457 static void record_stream_unlink(record_stream *s) {
458     pa_assert(s);
459
460     if (!s->connection)
461         return;
462
463     if (s->source_output) {
464         pa_source_output_unlink(s->source_output);
465         pa_source_output_unref(s->source_output);
466         s->source_output = NULL;
467     }
468
469     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
470     s->connection = NULL;
471     record_stream_unref(s);
472 }
473
474 /* Called from main context */
475 static void record_stream_free(pa_object *o) {
476     record_stream *s = RECORD_STREAM(o);
477     pa_assert(s);
478
479     record_stream_unlink(s);
480
481     pa_memblockq_free(s->memblockq);
482     pa_xfree(s);
483 }
484
485 /* Called from main context */
486 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
487     record_stream *s = RECORD_STREAM(o);
488     record_stream_assert_ref(s);
489
490     if (!s->connection)
491         return -1;
492
493     switch (code) {
494
495         case RECORD_STREAM_MESSAGE_POST_DATA:
496
497             /* We try to keep up to date with how many bytes are
498              * currently on the fly */
499             pa_atomic_sub(&s->on_the_fly, chunk->length);
500
501             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
502 /*                 pa_log_warn("Failed to push data into output queue."); */
503                 return -1;
504             }
505
506             if (!pa_pstream_is_pending(s->connection->pstream))
507                 native_connection_send_memblock(s->connection);
508
509             break;
510     }
511
512     return 0;
513 }
514
515 /* Called from main context */
516 static void fix_record_buffer_attr_pre(record_stream *s) {
517
518     size_t frame_size;
519     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
520
521     pa_assert(s);
522
523     /* This function will be called from the main thread, before as
524      * well as after the source output has been activated using
525      * pa_source_output_put()! That means it may not touch any
526      * ->thread_info data! */
527
528     frame_size = pa_frame_size(&s->source_output->sample_spec);
529
530     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
531         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
532     if (s->buffer_attr.maxlength <= 0)
533         s->buffer_attr.maxlength = (uint32_t) frame_size;
534
535     if (s->buffer_attr.fragsize == (uint32_t) -1)
536         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
537     if (s->buffer_attr.fragsize <= 0)
538         s->buffer_attr.fragsize = (uint32_t) frame_size;
539
540     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
541
542     if (s->early_requests) {
543
544         /* In early request mode we need to emulate the classic
545          * fragment-based playback model. We do this setting the source
546          * latency to the fragment size. */
547
548         source_usec = fragsize_usec;
549
550     } else if (s->adjust_latency) {
551
552         /* So, the user asked us to adjust the latency according to
553          * what the source can provide. Half the latency will be
554          * spent on the hw buffer, half of it in the async buffer
555          * queue we maintain for each client. */
556
557         source_usec = fragsize_usec/2;
558
559     } else {
560
561         /* Ok, the user didn't ask us to adjust the latency, hence we
562          * don't */
563
564         source_usec = (pa_usec_t) -1;
565     }
566
567     if (source_usec != (pa_usec_t) -1)
568         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
569     else
570         s->configured_source_latency = 0;
571
572     if (s->early_requests) {
573
574         /* Ok, we didn't necessarily get what we were asking for, so
575          * let's tell the user */
576
577         fragsize_usec = s->configured_source_latency;
578
579     } else if (s->adjust_latency) {
580
581         /* Now subtract what we actually got */
582
583         if (fragsize_usec >= s->configured_source_latency*2)
584             fragsize_usec -= s->configured_source_latency;
585         else
586             fragsize_usec = s->configured_source_latency;
587     }
588
589     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
590         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
591
592         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
593
594     if (s->buffer_attr.fragsize <= 0)
595         s->buffer_attr.fragsize = (uint32_t) frame_size;
596 }
597
598 /* Called from main context */
599 static void fix_record_buffer_attr_post(record_stream *s) {
600     size_t base;
601
602     pa_assert(s);
603
604     /* This function will be called from the main thread, before as
605      * well as after the source output has been activated using
606      * pa_source_output_put()! That means it may not touch and
607      * ->thread_info data! */
608
609     base = pa_frame_size(&s->source_output->sample_spec);
610
611     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
612     if (s->buffer_attr.fragsize <= 0)
613         s->buffer_attr.fragsize = base;
614
615     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
616         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
617 }
618
619 /* Called from main context */
620 static record_stream* record_stream_new(
621         pa_native_connection *c,
622         pa_source *source,
623         pa_sample_spec *ss,
624         pa_channel_map *map,
625         pa_bool_t peak_detect,
626         pa_buffer_attr *attr,
627         pa_source_output_flags_t flags,
628         pa_proplist *p,
629         pa_bool_t adjust_latency,
630         pa_sink_input *direct_on_input,
631         pa_bool_t early_requests,
632         int *ret) {
633
634     record_stream *s;
635     pa_source_output *source_output = NULL;
636     size_t base;
637     pa_source_output_new_data data;
638
639     pa_assert(c);
640     pa_assert(ss);
641     pa_assert(p);
642     pa_assert(ret);
643
644     pa_source_output_new_data_init(&data);
645
646     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
647     data.driver = __FILE__;
648     data.module = c->options->module;
649     data.client = c->client;
650     data.source = source;
651     data.direct_on_input = direct_on_input;
652     pa_source_output_new_data_set_sample_spec(&data, ss);
653     pa_source_output_new_data_set_channel_map(&data, map);
654     if (peak_detect)
655         data.resample_method = PA_RESAMPLER_PEAKS;
656
657     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
658
659     pa_source_output_new_data_done(&data);
660
661     if (!source_output)
662         return NULL;
663
664     s = pa_msgobject_new(record_stream);
665     s->parent.parent.free = record_stream_free;
666     s->parent.process_msg = record_stream_process_msg;
667     s->connection = c;
668     s->source_output = source_output;
669     s->buffer_attr = *attr;
670     s->adjust_latency = adjust_latency;
671     s->early_requests = early_requests;
672     pa_atomic_store(&s->on_the_fly, 0);
673
674     s->source_output->parent.process_msg = source_output_process_msg;
675     s->source_output->push = source_output_push_cb;
676     s->source_output->kill = source_output_kill_cb;
677     s->source_output->get_latency = source_output_get_latency_cb;
678     s->source_output->moving = source_output_moving_cb;
679     s->source_output->suspend = source_output_suspend_cb;
680     s->source_output->send_event = source_output_send_event_cb;
681     s->source_output->userdata = s;
682
683     fix_record_buffer_attr_pre(s);
684
685     s->memblockq = pa_memblockq_new(
686             0,
687             s->buffer_attr.maxlength,
688             0,
689             base = pa_frame_size(&source_output->sample_spec),
690             1,
691             0,
692             0,
693             NULL);
694
695     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
696     fix_record_buffer_attr_post(s);
697
698     *ss = s->source_output->sample_spec;
699     *map = s->source_output->channel_map;
700
701     pa_idxset_put(c->record_streams, s, &s->index);
702
703     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
704                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
705                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
706                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
707
708     pa_source_output_put(s->source_output);
709     return s;
710 }
711
712 /* Called from main context */
713 static void record_stream_send_killed(record_stream *r) {
714     pa_tagstruct *t;
715     record_stream_assert_ref(r);
716
717     t = pa_tagstruct_new(NULL, 0);
718     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
719     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
720     pa_tagstruct_putu32(t, r->index);
721     pa_pstream_send_tagstruct(r->connection->pstream, t);
722 }
723
724 /* Called from main context */
725 static void playback_stream_unlink(playback_stream *s) {
726     pa_assert(s);
727
728     if (!s->connection)
729         return;
730
731     if (s->sink_input) {
732         pa_sink_input_unlink(s->sink_input);
733         pa_sink_input_unref(s->sink_input);
734         s->sink_input = NULL;
735     }
736
737     if (s->drain_request)
738         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
739
740     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
741     s->connection = NULL;
742     playback_stream_unref(s);
743 }
744
745 /* Called from main context */
746 static void playback_stream_free(pa_object* o) {
747     playback_stream *s = PLAYBACK_STREAM(o);
748     pa_assert(s);
749
750     playback_stream_unlink(s);
751
752     pa_memblockq_free(s->memblockq);
753     pa_xfree(s);
754 }
755
756 /* Called from main context */
757 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
758     playback_stream *s = PLAYBACK_STREAM(o);
759     playback_stream_assert_ref(s);
760
761     if (!s->connection)
762         return -1;
763
764     switch (code) {
765         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
766             pa_tagstruct *t;
767             int l = 0;
768
769             for (;;) {
770                 if ((l = pa_atomic_load(&s->missing)) <= 0)
771                     return 0;
772
773                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
774                     break;
775             }
776
777             t = pa_tagstruct_new(NULL, 0);
778             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
779             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
780             pa_tagstruct_putu32(t, s->index);
781             pa_tagstruct_putu32(t, (uint32_t) l);
782             pa_pstream_send_tagstruct(s->connection->pstream, t);
783
784 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
785             break;
786         }
787
788         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
789             pa_tagstruct *t;
790
791 /*             pa_log("signalling underflow"); */
792
793             /* Report that we're empty */
794             t = pa_tagstruct_new(NULL, 0);
795             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
796             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
797             pa_tagstruct_putu32(t, s->index);
798             pa_pstream_send_tagstruct(s->connection->pstream, t);
799             break;
800         }
801
802         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
803             pa_tagstruct *t;
804
805             /* Notify the user we're overflowed*/
806             t = pa_tagstruct_new(NULL, 0);
807             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
808             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
809             pa_tagstruct_putu32(t, s->index);
810             pa_pstream_send_tagstruct(s->connection->pstream, t);
811             break;
812         }
813
814         case PLAYBACK_STREAM_MESSAGE_STARTED:
815
816             if (s->connection->version >= 13) {
817                 pa_tagstruct *t;
818
819                 /* Notify the user we started playback */
820                 t = pa_tagstruct_new(NULL, 0);
821                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
822                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
823                 pa_tagstruct_putu32(t, s->index);
824                 pa_pstream_send_tagstruct(s->connection->pstream, t);
825             }
826
827             break;
828
829         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
830             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
831             break;
832
833         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
834             pa_tagstruct *t;
835
836             s->buffer_attr.tlength = (uint32_t) offset;
837
838             t = pa_tagstruct_new(NULL, 0);
839             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
840             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
841             pa_tagstruct_putu32(t, s->index);
842             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
843             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
844             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
845             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
846             pa_tagstruct_put_usec(t, s->configured_sink_latency);
847             pa_pstream_send_tagstruct(s->connection->pstream, t);
848
849             break;
850         }
851     }
852
853     return 0;
854 }
855
856 /* Called from main context */
857 static void fix_playback_buffer_attr(playback_stream *s) {
858     size_t frame_size, max_prebuf;
859     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
860
861     pa_assert(s);
862
863     /* This function will be called from the main thread, before as
864      * well as after the sink input has been activated using
865      * pa_sink_input_put()! That means it may not touch any
866      * ->thread_info data, such as the memblockq! */
867
868     frame_size = pa_frame_size(&s->sink_input->sample_spec);
869
870     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
871         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
872     if (s->buffer_attr.maxlength <= 0)
873         s->buffer_attr.maxlength = (uint32_t) frame_size;
874
875     if (s->buffer_attr.tlength == (uint32_t) -1)
876         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
877     if (s->buffer_attr.tlength <= 0)
878         s->buffer_attr.tlength = (uint32_t) frame_size;
879
880     if (s->buffer_attr.minreq == (uint32_t) -1)
881         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
882     if (s->buffer_attr.minreq <= 0)
883         s->buffer_attr.minreq = (uint32_t) frame_size;
884
885     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
886         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
887
888     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
889     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
890
891     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
892                 (double) tlength_usec / PA_USEC_PER_MSEC,
893                 (double) minreq_usec / PA_USEC_PER_MSEC);
894
895     if (s->early_requests) {
896
897         /* In early request mode we need to emulate the classic
898          * fragment-based playback model. We do this setting the sink
899          * latency to the fragment size. */
900
901         sink_usec = minreq_usec;
902         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
903
904     } else if (s->adjust_latency) {
905
906         /* So, the user asked us to adjust the latency of the stream
907          * buffer according to the what the sink can provide. The
908          * tlength passed in shall be the overall latency. Roughly
909          * half the latency will be spent on the hw buffer, the other
910          * half of it in the async buffer queue we maintain for each
911          * client. In between we'll have a safety space of size
912          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
913          * empty and needs to be filled, then our buffer must have
914          * enough data to fulfill this request immediatly and thus
915          * have at least the same tlength as the size of the hw
916          * buffer. It additionally needs space for 2 times minreq
917          * because if the buffer ran empty and a partial fillup
918          * happens immediately on the next iteration we need to be
919          * able to fulfill it and give the application also minreq
920          * time to fill it up again for the next request Makes 2 times
921          * minreq in plus.. */
922
923         if (tlength_usec > minreq_usec*2)
924             sink_usec = (tlength_usec - minreq_usec*2)/2;
925         else
926             sink_usec = 0;
927
928         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
929
930     } else {
931
932         /* Ok, the user didn't ask us to adjust the latency, but we
933          * still need to make sure that the parameters from the user
934          * do make sense. */
935
936         if (tlength_usec > minreq_usec*2)
937             sink_usec = (tlength_usec - minreq_usec*2);
938         else
939             sink_usec = 0;
940
941         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
942     }
943
944     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
945
946     if (s->early_requests) {
947
948         /* Ok, we didn't necessarily get what we were asking for, so
949          * let's tell the user */
950
951         minreq_usec = s->configured_sink_latency;
952
953     } else if (s->adjust_latency) {
954
955         /* Ok, we didn't necessarily get what we were asking for, so
956          * let's subtract from what we asked for for the remaining
957          * buffer space */
958
959         if (tlength_usec >= s->configured_sink_latency)
960             tlength_usec -= s->configured_sink_latency;
961     }
962
963     /* FIXME: This is actually larger than necessary, since not all of
964      * the sink latency is actually rewritable. */
965     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
966         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
967
968     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
969         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
970         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
971
972     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
973         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
974         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
975
976     if (s->buffer_attr.minreq <= 0) {
977         s->buffer_attr.minreq = (uint32_t) frame_size;
978         s->buffer_attr.tlength += (uint32_t) frame_size*2;
979     }
980
981     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
982         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
983
984     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
985
986     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
987         s->buffer_attr.prebuf > max_prebuf)
988         s->buffer_attr.prebuf = max_prebuf;
989 }
990
991 /* Called from main context */
992 static playback_stream* playback_stream_new(
993         pa_native_connection *c,
994         pa_sink *sink,
995         pa_sample_spec *ss,
996         pa_channel_map *map,
997         pa_buffer_attr *a,
998         pa_cvolume *volume,
999         pa_bool_t muted,
1000         pa_bool_t muted_set,
1001         uint32_t syncid,
1002         uint32_t *missing,
1003         pa_sink_input_flags_t flags,
1004         pa_proplist *p,
1005         pa_bool_t adjust_latency,
1006         pa_bool_t early_requests,
1007         int *ret) {
1008
1009     playback_stream *s, *ssync;
1010     pa_sink_input *sink_input = NULL;
1011     pa_memchunk silence;
1012     uint32_t idx;
1013     int64_t start_index;
1014     pa_sink_input_new_data data;
1015
1016     pa_assert(c);
1017     pa_assert(ss);
1018     pa_assert(missing);
1019     pa_assert(p);
1020     pa_assert(ret);
1021
1022     /* Find syncid group */
1023     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1024
1025         if (!playback_stream_isinstance(ssync))
1026             continue;
1027
1028         if (ssync->syncid == syncid)
1029             break;
1030     }
1031
1032     /* Synced streams must connect to the same sink */
1033     if (ssync) {
1034
1035         if (!sink)
1036             sink = ssync->sink_input->sink;
1037         else if (sink != ssync->sink_input->sink) {
1038             *ret = PA_ERR_INVALID;
1039             return NULL;
1040         }
1041     }
1042
1043     pa_sink_input_new_data_init(&data);
1044
1045     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1046     data.driver = __FILE__;
1047     data.module = c->options->module;
1048     data.client = c->client;
1049     data.sink = sink;
1050     pa_sink_input_new_data_set_sample_spec(&data, ss);
1051     pa_sink_input_new_data_set_channel_map(&data, map);
1052     if (volume)
1053         pa_sink_input_new_data_set_volume(&data, volume);
1054     if (muted_set)
1055         pa_sink_input_new_data_set_muted(&data, muted);
1056     data.sync_base = ssync ? ssync->sink_input : NULL;
1057
1058     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1059
1060     pa_sink_input_new_data_done(&data);
1061
1062     if (!sink_input)
1063         return NULL;
1064
1065     s = pa_msgobject_new(playback_stream);
1066     s->parent.parent.parent.free = playback_stream_free;
1067     s->parent.parent.process_msg = playback_stream_process_msg;
1068     s->connection = c;
1069     s->syncid = syncid;
1070     s->sink_input = sink_input;
1071     s->is_underrun = TRUE;
1072     s->drain_request = FALSE;
1073     pa_atomic_store(&s->missing, 0);
1074     s->buffer_attr = *a;
1075     s->adjust_latency = adjust_latency;
1076     s->early_requests = early_requests;
1077
1078     s->sink_input->parent.process_msg = sink_input_process_msg;
1079     s->sink_input->pop = sink_input_pop_cb;
1080     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1081     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1082     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1083     s->sink_input->kill = sink_input_kill_cb;
1084     s->sink_input->moving = sink_input_moving_cb;
1085     s->sink_input->suspend = sink_input_suspend_cb;
1086     s->sink_input->send_event = sink_input_send_event_cb;
1087     s->sink_input->userdata = s;
1088
1089     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1090
1091     fix_playback_buffer_attr(s);
1092
1093     pa_sink_input_get_silence(sink_input, &silence);
1094     s->memblockq = pa_memblockq_new(
1095             start_index,
1096             s->buffer_attr.maxlength,
1097             s->buffer_attr.tlength,
1098             pa_frame_size(&sink_input->sample_spec),
1099             s->buffer_attr.prebuf,
1100             s->buffer_attr.minreq,
1101             0,
1102             &silence);
1103     pa_memblock_unref(silence.memblock);
1104
1105     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1106
1107     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1108
1109     *ss = s->sink_input->sample_spec;
1110     *map = s->sink_input->channel_map;
1111
1112     pa_idxset_put(c->output_streams, s, &s->index);
1113
1114     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1115                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1116                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1117                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1118                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1119
1120     pa_sink_input_put(s->sink_input);
1121     return s;
1122 }
1123
1124 /* Called from IO context */
1125 static void playback_stream_request_bytes(playback_stream *s) {
1126     size_t m, minreq;
1127     int previous_missing;
1128
1129     playback_stream_assert_ref(s);
1130
1131     m = pa_memblockq_pop_missing(s->memblockq);
1132
1133     if (m <= 0)
1134         return;
1135
1136 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1137
1138     previous_missing = pa_atomic_add(&s->missing, (int) m);
1139     minreq = pa_memblockq_get_minreq(s->memblockq);
1140
1141     if (pa_memblockq_prebuf_active(s->memblockq) ||
1142         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1143         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1144 }
1145
1146
1147 /* Called from main context */
1148 static void playback_stream_send_killed(playback_stream *p) {
1149     pa_tagstruct *t;
1150     playback_stream_assert_ref(p);
1151
1152     t = pa_tagstruct_new(NULL, 0);
1153     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1154     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1155     pa_tagstruct_putu32(t, p->index);
1156     pa_pstream_send_tagstruct(p->connection->pstream, t);
1157 }
1158
1159 /* Called from main context */
1160 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1161     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1162     pa_native_connection_assert_ref(c);
1163
1164     if (!c->protocol)
1165         return -1;
1166
1167     switch (code) {
1168
1169         case CONNECTION_MESSAGE_REVOKE:
1170             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1171             break;
1172
1173         case CONNECTION_MESSAGE_RELEASE:
1174             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1175             break;
1176     }
1177
1178     return 0;
1179 }
1180
1181 /* Called from main context */
1182 static void native_connection_unlink(pa_native_connection *c) {
1183     record_stream *r;
1184     output_stream *o;
1185
1186     pa_assert(c);
1187
1188     if (!c->protocol)
1189         return;
1190
1191     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1192
1193     if (c->options)
1194         pa_native_options_unref(c->options);
1195
1196     while ((r = pa_idxset_first(c->record_streams, NULL)))
1197         record_stream_unlink(r);
1198
1199     while ((o = pa_idxset_first(c->output_streams, NULL)))
1200         if (playback_stream_isinstance(o))
1201             playback_stream_unlink(PLAYBACK_STREAM(o));
1202         else
1203             upload_stream_unlink(UPLOAD_STREAM(o));
1204
1205     if (c->subscription)
1206         pa_subscription_free(c->subscription);
1207
1208     if (c->pstream)
1209         pa_pstream_unlink(c->pstream);
1210
1211     if (c->auth_timeout_event) {
1212         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1213         c->auth_timeout_event = NULL;
1214     }
1215
1216     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1217     c->protocol = NULL;
1218     pa_native_connection_unref(c);
1219 }
1220
1221 /* Called from main context */
1222 static void native_connection_free(pa_object *o) {
1223     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1224
1225     pa_assert(c);
1226
1227     native_connection_unlink(c);
1228
1229     pa_idxset_free(c->record_streams, NULL, NULL);
1230     pa_idxset_free(c->output_streams, NULL, NULL);
1231
1232     pa_pdispatch_unref(c->pdispatch);
1233     pa_pstream_unref(c->pstream);
1234     pa_client_free(c->client);
1235
1236     pa_xfree(c);
1237 }
1238
1239 /* Called from main context */
1240 static void native_connection_send_memblock(pa_native_connection *c) {
1241     uint32_t start;
1242     record_stream *r;
1243
1244     start = PA_IDXSET_INVALID;
1245     for (;;) {
1246         pa_memchunk chunk;
1247
1248         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1249             return;
1250
1251         if (start == PA_IDXSET_INVALID)
1252             start = c->rrobin_index;
1253         else if (start == c->rrobin_index)
1254             return;
1255
1256         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1257             pa_memchunk schunk = chunk;
1258
1259             if (schunk.length > r->buffer_attr.fragsize)
1260                 schunk.length = r->buffer_attr.fragsize;
1261
1262             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1263
1264             pa_memblockq_drop(r->memblockq, schunk.length);
1265             pa_memblock_unref(schunk.memblock);
1266
1267             return;
1268         }
1269     }
1270 }
1271
1272 /*** sink input callbacks ***/
1273
1274 /* Called from thread context */
1275 static void handle_seek(playback_stream *s, int64_t indexw) {
1276     playback_stream_assert_ref(s);
1277
1278 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1279
1280     if (s->sink_input->thread_info.underrun_for > 0) {
1281
1282 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1283
1284         if (pa_memblockq_is_readable(s->memblockq)) {
1285
1286             /* We just ended an underrun, let's ask the sink
1287              * for a complete rewind rewrite */
1288
1289             pa_log_debug("Requesting rewind due to end of underrun.");
1290             pa_sink_input_request_rewind(s->sink_input,
1291                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1292                                          FALSE, TRUE, FALSE);
1293         }
1294
1295     } else {
1296         int64_t indexr;
1297
1298         indexr = pa_memblockq_get_read_index(s->memblockq);
1299
1300         if (indexw < indexr) {
1301             /* OK, the sink already asked for this data, so
1302              * let's have it usk us again */
1303
1304             pa_log_debug("Requesting rewind due to rewrite.");
1305             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1306         }
1307     }
1308
1309     playback_stream_request_bytes(s);
1310 }
1311
1312 /* Called from thread context */
1313 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1314     pa_sink_input *i = PA_SINK_INPUT(o);
1315     playback_stream *s;
1316
1317     pa_sink_input_assert_ref(i);
1318     s = PLAYBACK_STREAM(i->userdata);
1319     playback_stream_assert_ref(s);
1320
1321     switch (code) {
1322
1323         case SINK_INPUT_MESSAGE_SEEK: {
1324             int64_t windex;
1325
1326             windex = pa_memblockq_get_write_index(s->memblockq);
1327
1328             /* The client side is incapable of accounting correctly
1329              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1330              * able to deal with that. */
1331
1332             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1333
1334             handle_seek(s, windex);
1335             return 0;
1336         }
1337
1338         case SINK_INPUT_MESSAGE_POST_DATA: {
1339             int64_t windex;
1340
1341             pa_assert(chunk);
1342
1343             windex = pa_memblockq_get_write_index(s->memblockq);
1344
1345 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1346
1347             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1348                 pa_log_warn("Failed to push data into queue");
1349                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1350                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1351             }
1352
1353             handle_seek(s, windex);
1354
1355 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1356
1357             return 0;
1358         }
1359
1360         case SINK_INPUT_MESSAGE_DRAIN:
1361         case SINK_INPUT_MESSAGE_FLUSH:
1362         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1363         case SINK_INPUT_MESSAGE_TRIGGER: {
1364
1365             int64_t windex;
1366             pa_sink_input *isync;
1367             void (*func)(pa_memblockq *bq);
1368
1369             switch  (code) {
1370                 case SINK_INPUT_MESSAGE_FLUSH:
1371                     func = pa_memblockq_flush_write;
1372                     break;
1373
1374                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1375                     func = pa_memblockq_prebuf_force;
1376                     break;
1377
1378                 case SINK_INPUT_MESSAGE_DRAIN:
1379                 case SINK_INPUT_MESSAGE_TRIGGER:
1380                     func = pa_memblockq_prebuf_disable;
1381                     break;
1382
1383                 default:
1384                     pa_assert_not_reached();
1385             }
1386
1387             windex = pa_memblockq_get_write_index(s->memblockq);
1388             func(s->memblockq);
1389             handle_seek(s, windex);
1390
1391             /* Do the same for all other members in the sync group */
1392             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1393                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1394                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1395                 func(ssync->memblockq);
1396                 handle_seek(ssync, windex);
1397             }
1398
1399             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1400                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1401                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1402                 func(ssync->memblockq);
1403                 handle_seek(ssync, windex);
1404             }
1405
1406             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1407                 if (!pa_memblockq_is_readable(s->memblockq))
1408                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1409                 else {
1410                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1411                     s->drain_request = TRUE;
1412                 }
1413             }
1414
1415             return 0;
1416         }
1417
1418         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1419             /* Atomically get a snapshot of all timing parameters... */
1420             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1421             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1422             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1423             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1424             s->underrun_for = s->sink_input->thread_info.underrun_for;
1425             s->playing_for = s->sink_input->thread_info.playing_for;
1426
1427             return 0;
1428
1429         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1430             int64_t windex;
1431
1432             windex = pa_memblockq_get_write_index(s->memblockq);
1433
1434             pa_memblockq_prebuf_force(s->memblockq);
1435
1436             handle_seek(s, windex);
1437
1438             /* Fall through to the default handler */
1439             break;
1440         }
1441
1442         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1443             pa_usec_t *r = userdata;
1444
1445             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1446
1447             /* Fall through, the default handler will add in the extra
1448              * latency added by the resampler */
1449             break;
1450         }
1451
1452         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1453             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1454             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1455             return 0;
1456         }
1457     }
1458
1459     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1460 }
1461
1462 /* Called from thread context */
1463 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1464     playback_stream *s;
1465
1466     pa_sink_input_assert_ref(i);
1467     s = PLAYBACK_STREAM(i->userdata);
1468     playback_stream_assert_ref(s);
1469     pa_assert(chunk);
1470
1471 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1472
1473     if (pa_memblockq_is_readable(s->memblockq))
1474         s->is_underrun = FALSE;
1475     else {
1476         if (!s->is_underrun)
1477             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1478
1479         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1480             s->drain_request = FALSE;
1481             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1482         } else if (!s->is_underrun)
1483             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1484
1485         s->is_underrun = TRUE;
1486
1487         playback_stream_request_bytes(s);
1488     }
1489
1490     /* This call will not fail with prebuf=0, hence we check for
1491        underrun explicitly above */
1492     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1493         return -1;
1494
1495     chunk->length = PA_MIN(nbytes, chunk->length);
1496
1497     if (i->thread_info.underrun_for > 0)
1498         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1499
1500     pa_memblockq_drop(s->memblockq, chunk->length);
1501     playback_stream_request_bytes(s);
1502
1503     return 0;
1504 }
1505
1506 /* Called from thread context */
1507 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1508     playback_stream *s;
1509
1510     pa_sink_input_assert_ref(i);
1511     s = PLAYBACK_STREAM(i->userdata);
1512     playback_stream_assert_ref(s);
1513
1514     /* If we are in an underrun, then we don't rewind */
1515     if (i->thread_info.underrun_for > 0)
1516         return;
1517
1518     pa_memblockq_rewind(s->memblockq, nbytes);
1519 }
1520
1521 /* Called from thread context */
1522 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1523     playback_stream *s;
1524
1525     pa_sink_input_assert_ref(i);
1526     s = PLAYBACK_STREAM(i->userdata);
1527     playback_stream_assert_ref(s);
1528
1529     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1530 }
1531
1532 /* Called from thread context */
1533 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1534     playback_stream *s;
1535     size_t new_tlength, old_tlength;
1536
1537     pa_sink_input_assert_ref(i);
1538     s = PLAYBACK_STREAM(i->userdata);
1539     playback_stream_assert_ref(s);
1540
1541     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1542     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1543
1544     if (old_tlength < new_tlength) {
1545         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1546         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1547         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1548
1549         if (new_tlength == old_tlength)
1550             pa_log_debug("Failed to increase tlength");
1551         else {
1552             pa_log_debug("Notifying client about increased tlength");
1553             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1554         }
1555     }
1556 }
1557
1558 /* Called from main context */
1559 static void sink_input_kill_cb(pa_sink_input *i) {
1560     playback_stream *s;
1561
1562     pa_sink_input_assert_ref(i);
1563     s = PLAYBACK_STREAM(i->userdata);
1564     playback_stream_assert_ref(s);
1565
1566     playback_stream_send_killed(s);
1567     playback_stream_unlink(s);
1568 }
1569
1570 /* Called from main context */
1571 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1572     playback_stream *s;
1573     pa_tagstruct *t;
1574
1575     pa_sink_input_assert_ref(i);
1576     s = PLAYBACK_STREAM(i->userdata);
1577     playback_stream_assert_ref(s);
1578
1579     if (s->connection->version < 15)
1580       return;
1581
1582     t = pa_tagstruct_new(NULL, 0);
1583     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1584     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1585     pa_tagstruct_putu32(t, s->index);
1586     pa_tagstruct_puts(t, event);
1587     pa_tagstruct_put_proplist(t, pl);
1588     pa_pstream_send_tagstruct(s->connection->pstream, t);
1589 }
1590
1591 /* Called from main context */
1592 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1593     playback_stream *s;
1594     pa_tagstruct *t;
1595
1596     pa_sink_input_assert_ref(i);
1597     s = PLAYBACK_STREAM(i->userdata);
1598     playback_stream_assert_ref(s);
1599
1600     if (s->connection->version < 12)
1601       return;
1602
1603     t = pa_tagstruct_new(NULL, 0);
1604     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1605     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1606     pa_tagstruct_putu32(t, s->index);
1607     pa_tagstruct_put_boolean(t, suspend);
1608     pa_pstream_send_tagstruct(s->connection->pstream, t);
1609 }
1610
1611 /* Called from main context */
1612 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1613     playback_stream *s;
1614     pa_tagstruct *t;
1615
1616     pa_sink_input_assert_ref(i);
1617     s = PLAYBACK_STREAM(i->userdata);
1618     playback_stream_assert_ref(s);
1619
1620     fix_playback_buffer_attr(s);
1621     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1622     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1623
1624     if (s->connection->version < 12)
1625       return;
1626
1627     t = pa_tagstruct_new(NULL, 0);
1628     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1629     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1630     pa_tagstruct_putu32(t, s->index);
1631     pa_tagstruct_putu32(t, dest->index);
1632     pa_tagstruct_puts(t, dest->name);
1633     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1634
1635     if (s->connection->version >= 13) {
1636         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1637         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1638         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1639         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1640         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1641     }
1642
1643     pa_pstream_send_tagstruct(s->connection->pstream, t);
1644 }
1645
1646 /*** source_output callbacks ***/
1647
1648 /* Called from thread context */
1649 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1650     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1651     record_stream *s;
1652
1653     pa_source_output_assert_ref(o);
1654     s = RECORD_STREAM(o->userdata);
1655     record_stream_assert_ref(s);
1656
1657     switch (code) {
1658         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1659             /* Atomically get a snapshot of all timing parameters... */
1660             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1661             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1662             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1663             return 0;
1664     }
1665
1666     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1667 }
1668
1669 /* Called from thread context */
1670 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1671     record_stream *s;
1672
1673     pa_source_output_assert_ref(o);
1674     s = RECORD_STREAM(o->userdata);
1675     record_stream_assert_ref(s);
1676     pa_assert(chunk);
1677
1678     pa_atomic_add(&s->on_the_fly, chunk->length);
1679     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1680 }
1681
1682 static void source_output_kill_cb(pa_source_output *o) {
1683     record_stream *s;
1684
1685     pa_source_output_assert_ref(o);
1686     s = RECORD_STREAM(o->userdata);
1687     record_stream_assert_ref(s);
1688
1689     record_stream_send_killed(s);
1690     record_stream_unlink(s);
1691 }
1692
1693 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1694     record_stream *s;
1695
1696     pa_source_output_assert_ref(o);
1697     s = RECORD_STREAM(o->userdata);
1698     record_stream_assert_ref(s);
1699
1700     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1701
1702     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1703 }
1704
1705 /* Called from main context */
1706 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1707     record_stream *s;
1708     pa_tagstruct *t;
1709
1710     pa_source_output_assert_ref(o);
1711     s = RECORD_STREAM(o->userdata);
1712     record_stream_assert_ref(s);
1713
1714     if (s->connection->version < 15)
1715       return;
1716
1717     t = pa_tagstruct_new(NULL, 0);
1718     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1719     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1720     pa_tagstruct_putu32(t, s->index);
1721     pa_tagstruct_puts(t, event);
1722     pa_tagstruct_put_proplist(t, pl);
1723     pa_pstream_send_tagstruct(s->connection->pstream, t);
1724 }
1725
1726 /* Called from main context */
1727 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1728     record_stream *s;
1729     pa_tagstruct *t;
1730
1731     pa_source_output_assert_ref(o);
1732     s = RECORD_STREAM(o->userdata);
1733     record_stream_assert_ref(s);
1734
1735     if (s->connection->version < 12)
1736       return;
1737
1738     t = pa_tagstruct_new(NULL, 0);
1739     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1740     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1741     pa_tagstruct_putu32(t, s->index);
1742     pa_tagstruct_put_boolean(t, suspend);
1743     pa_pstream_send_tagstruct(s->connection->pstream, t);
1744 }
1745
1746 /* Called from main context */
1747 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1748     record_stream *s;
1749     pa_tagstruct *t;
1750
1751     pa_source_output_assert_ref(o);
1752     s = RECORD_STREAM(o->userdata);
1753     record_stream_assert_ref(s);
1754
1755     fix_record_buffer_attr_pre(s);
1756     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1757     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1758     fix_record_buffer_attr_post(s);
1759
1760     if (s->connection->version < 12)
1761       return;
1762
1763     t = pa_tagstruct_new(NULL, 0);
1764     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1765     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1766     pa_tagstruct_putu32(t, s->index);
1767     pa_tagstruct_putu32(t, dest->index);
1768     pa_tagstruct_puts(t, dest->name);
1769     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1770
1771     if (s->connection->version >= 13) {
1772         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1773         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1774         pa_tagstruct_put_usec(t, s->configured_source_latency);
1775     }
1776
1777     pa_pstream_send_tagstruct(s->connection->pstream, t);
1778 }
1779
1780 /*** pdispatch callbacks ***/
1781
1782 static void protocol_error(pa_native_connection *c) {
1783     pa_log("protocol error, kicking client");
1784     native_connection_unlink(c);
1785 }
1786
/* If `expression` is false, send `error` as the reply to `tag` on `pstream`
 * and return from the *calling* function (which therefore has to return
 * void). `expression` is evaluated exactly once.
 *
 * The expansion deliberately does not end in a semicolon: the caller
 * supplies it, so that "CHECK_VALIDITY(...);" is a single statement and
 * the macro stays safe inside unbraced if/else bodies. */
#define CHECK_VALIDITY(pstream, expression, tag, error) do { \
if (!(expression)) { \
    pa_pstream_send_error((pstream), (tag), (error)); \
    return; \
} \
} while(0)
1793
1794 static pa_tagstruct *reply_new(uint32_t tag) {
1795     pa_tagstruct *reply;
1796
1797     reply = pa_tagstruct_new(NULL, 0);
1798     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1799     pa_tagstruct_putu32(reply, tag);
1800     return reply;
1801 }
1802
1803 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1804     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1805     playback_stream *s;
1806     uint32_t sink_index, syncid, missing;
1807     pa_buffer_attr attr;
1808     const char *name = NULL, *sink_name;
1809     pa_sample_spec ss;
1810     pa_channel_map map;
1811     pa_tagstruct *reply;
1812     pa_sink *sink = NULL;
1813     pa_cvolume volume;
1814     pa_bool_t
1815         corked = FALSE,
1816         no_remap = FALSE,
1817         no_remix = FALSE,
1818         fix_format = FALSE,
1819         fix_rate = FALSE,
1820         fix_channels = FALSE,
1821         no_move = FALSE,
1822         variable_rate = FALSE,
1823         muted = FALSE,
1824         adjust_latency = FALSE,
1825         early_requests = FALSE,
1826         dont_inhibit_auto_suspend = FALSE,
1827         muted_set = FALSE,
1828         fail_on_suspend = FALSE;
1829     pa_sink_input_flags_t flags = 0;
1830     pa_proplist *p;
1831     pa_bool_t volume_set = TRUE;
1832     int ret = PA_ERR_INVALID;
1833
1834     pa_native_connection_assert_ref(c);
1835     pa_assert(t);
1836     memset(&attr, 0, sizeof(attr));
1837
1838     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1839         pa_tagstruct_get(
1840                 t,
1841                 PA_TAG_SAMPLE_SPEC, &ss,
1842                 PA_TAG_CHANNEL_MAP, &map,
1843                 PA_TAG_U32, &sink_index,
1844                 PA_TAG_STRING, &sink_name,
1845                 PA_TAG_U32, &attr.maxlength,
1846                 PA_TAG_BOOLEAN, &corked,
1847                 PA_TAG_U32, &attr.tlength,
1848                 PA_TAG_U32, &attr.prebuf,
1849                 PA_TAG_U32, &attr.minreq,
1850                 PA_TAG_U32, &syncid,
1851                 PA_TAG_CVOLUME, &volume,
1852                 PA_TAG_INVALID) < 0) {
1853
1854         protocol_error(c);
1855         return;
1856     }
1857
1858     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1859     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1860     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1861     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1862     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1863     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1864     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1865     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1866
1867     p = pa_proplist_new();
1868
1869     if (name)
1870         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1871
1872     if (c->version >= 12)  {
1873         /* Since 0.9.8 the user can ask for a couple of additional flags */
1874
1875         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1876             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1877             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1878             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1879             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1880             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1881             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1882
1883             protocol_error(c);
1884             pa_proplist_free(p);
1885             return;
1886         }
1887     }
1888
1889     if (c->version >= 13) {
1890
1891         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1892             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1893             pa_tagstruct_get_proplist(t, p) < 0) {
1894             protocol_error(c);
1895             pa_proplist_free(p);
1896             return;
1897         }
1898     }
1899
1900     if (c->version >= 14) {
1901
1902         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1903             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1904             protocol_error(c);
1905             pa_proplist_free(p);
1906             return;
1907         }
1908     }
1909
1910     if (c->version >= 15) {
1911
1912         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1913             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1914             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1915             protocol_error(c);
1916             pa_proplist_free(p);
1917             return;
1918         }
1919     }
1920
1921     if (!pa_tagstruct_eof(t)) {
1922         protocol_error(c);
1923         pa_proplist_free(p);
1924         return;
1925     }
1926
1927     if (sink_index != PA_INVALID_INDEX) {
1928
1929         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1930             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1931             pa_proplist_free(p);
1932             return;
1933         }
1934
1935     } else if (sink_name) {
1936
1937         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1938             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1939             pa_proplist_free(p);
1940             return;
1941         }
1942     }
1943
1944     flags =
1945         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1946         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1947         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1948         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1949         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1950         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1951         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1952         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1953         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1954         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1955
1956     /* Only since protocol version 15 there's a seperate muted_set
1957      * flag. For older versions we synthesize it here */
1958     muted_set = muted_set || muted;
1959
1960     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1961     pa_proplist_free(p);
1962
1963     CHECK_VALIDITY(c->pstream, s, tag, ret);
1964
1965     reply = reply_new(tag);
1966     pa_tagstruct_putu32(reply, s->index);
1967     pa_assert(s->sink_input);
1968     pa_tagstruct_putu32(reply, s->sink_input->index);
1969     pa_tagstruct_putu32(reply, missing);
1970
1971 /*     pa_log("initial request is %u", missing); */
1972
1973     if (c->version >= 9) {
1974         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1975
1976         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1977         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1978         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1979         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1980     }
1981
1982     if (c->version >= 12) {
1983         /* Since 0.9.8 we support sending the chosen sample
1984          * spec/channel map/device/suspend status back to the
1985          * client */
1986
1987         pa_tagstruct_put_sample_spec(reply, &ss);
1988         pa_tagstruct_put_channel_map(reply, &map);
1989
1990         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1991         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1992
1993         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1994     }
1995
1996     if (c->version >= 13)
1997         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
1998
1999     pa_pstream_send_tagstruct(c->pstream, reply);
2000 }
2001
2002 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2003     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2004     uint32_t channel;
2005
2006     pa_native_connection_assert_ref(c);
2007     pa_assert(t);
2008
2009     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2010         !pa_tagstruct_eof(t)) {
2011         protocol_error(c);
2012         return;
2013     }
2014
2015     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2016
2017     switch (command) {
2018
2019         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2020             playback_stream *s;
2021             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2022                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2023                 return;
2024             }
2025
2026             playback_stream_unlink(s);
2027             break;
2028         }
2029
2030         case PA_COMMAND_DELETE_RECORD_STREAM: {
2031             record_stream *s;
2032             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2033                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2034                 return;
2035             }
2036
2037             record_stream_unlink(s);
2038             break;
2039         }
2040
2041         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2042             upload_stream *s;
2043
2044             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2045                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2046                 return;
2047             }
2048
2049             upload_stream_unlink(s);
2050             break;
2051         }
2052
2053         default:
2054             pa_assert_not_reached();
2055     }
2056
2057     pa_pstream_send_simple_ack(c->pstream, tag);
2058 }
2059
2060 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2061     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2062     record_stream *s;
2063     pa_buffer_attr attr;
2064     uint32_t source_index;
2065     const char *name = NULL, *source_name;
2066     pa_sample_spec ss;
2067     pa_channel_map map;
2068     pa_tagstruct *reply;
2069     pa_source *source = NULL;
2070     pa_bool_t
2071         corked = FALSE,
2072         no_remap = FALSE,
2073         no_remix = FALSE,
2074         fix_format = FALSE,
2075         fix_rate = FALSE,
2076         fix_channels = FALSE,
2077         no_move = FALSE,
2078         variable_rate = FALSE,
2079         adjust_latency = FALSE,
2080         peak_detect = FALSE,
2081         early_requests = FALSE,
2082         dont_inhibit_auto_suspend = FALSE,
2083         fail_on_suspend = FALSE;
2084     pa_source_output_flags_t flags = 0;
2085     pa_proplist *p;
2086     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2087     pa_sink_input *direct_on_input = NULL;
2088     int ret = PA_ERR_INVALID;
2089
2090     pa_native_connection_assert_ref(c);
2091     pa_assert(t);
2092
2093     memset(&attr, 0, sizeof(attr));
2094
2095     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2096         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2097         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2098         pa_tagstruct_getu32(t, &source_index) < 0 ||
2099         pa_tagstruct_gets(t, &source_name) < 0 ||
2100         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2101         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2102         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2103         protocol_error(c);
2104         return;
2105     }
2106
2107     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2108     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2109     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2110     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2111     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2112     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2113     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2114
2115     p = pa_proplist_new();
2116
2117     if (name)
2118         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2119
2120     if (c->version >= 12)  {
2121         /* Since 0.9.8 the user can ask for a couple of additional flags */
2122
2123         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2124             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2125             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2126             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2127             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2128             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2129             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2130
2131             protocol_error(c);
2132             pa_proplist_free(p);
2133             return;
2134         }
2135     }
2136
2137     if (c->version >= 13) {
2138
2139         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2140             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2141             pa_tagstruct_get_proplist(t, p) < 0 ||
2142             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2143             protocol_error(c);
2144             pa_proplist_free(p);
2145             return;
2146         }
2147     }
2148
2149     if (c->version >= 14) {
2150
2151         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2152             protocol_error(c);
2153             pa_proplist_free(p);
2154             return;
2155         }
2156     }
2157
2158     if (c->version >= 15) {
2159
2160         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2161             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2162             protocol_error(c);
2163             pa_proplist_free(p);
2164             return;
2165         }
2166     }
2167
2168     if (!pa_tagstruct_eof(t)) {
2169         protocol_error(c);
2170         pa_proplist_free(p);
2171         return;
2172     }
2173
2174     if (source_index != PA_INVALID_INDEX) {
2175
2176         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2177             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2178             pa_proplist_free(p);
2179             return;
2180         }
2181
2182     } else if (source_name) {
2183
2184         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2185             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2186             pa_proplist_free(p);
2187             return;
2188         }
2189     }
2190
2191     if (direct_on_input_idx != PA_INVALID_INDEX) {
2192
2193         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2194             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2195             pa_proplist_free(p);
2196             return;
2197         }
2198     }
2199
2200     flags =
2201         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2202         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2203         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2204         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2205         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2206         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2207         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2208         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2209         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2210         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2211
2212     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2213     pa_proplist_free(p);
2214
2215     CHECK_VALIDITY(c->pstream, s, tag, ret);
2216
2217     reply = reply_new(tag);
2218     pa_tagstruct_putu32(reply, s->index);
2219     pa_assert(s->source_output);
2220     pa_tagstruct_putu32(reply, s->source_output->index);
2221
2222     if (c->version >= 9) {
2223         /* Since 0.9 we support sending the buffer metrics back to the client */
2224
2225         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2226         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2227     }
2228
2229     if (c->version >= 12) {
2230         /* Since 0.9.8 we support sending the chosen sample
2231          * spec/channel map/device/suspend status back to the
2232          * client */
2233
2234         pa_tagstruct_put_sample_spec(reply, &ss);
2235         pa_tagstruct_put_channel_map(reply, &map);
2236
2237         pa_tagstruct_putu32(reply, s->source_output->source->index);
2238         pa_tagstruct_puts(reply, s->source_output->source->name);
2239
2240         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2241     }
2242
2243     if (c->version >= 13)
2244         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2245
2246     pa_pstream_send_tagstruct(c->pstream, reply);
2247 }
2248
2249 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2250     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2251     int ret;
2252
2253     pa_native_connection_assert_ref(c);
2254     pa_assert(t);
2255
2256     if (!pa_tagstruct_eof(t)) {
2257         protocol_error(c);
2258         return;
2259     }
2260
2261     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2262     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2263     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2264
2265     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2266 }
2267
2268 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2269     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2270     const void*cookie;
2271     pa_tagstruct *reply;
2272     pa_bool_t shm_on_remote = FALSE, do_shm;
2273
2274     pa_native_connection_assert_ref(c);
2275     pa_assert(t);
2276
2277     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2278         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2279         !pa_tagstruct_eof(t)) {
2280         protocol_error(c);
2281         return;
2282     }
2283
2284     /* Minimum supported version */
2285     if (c->version < 8) {
2286         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2287         return;
2288     }
2289
2290     /* Starting with protocol version 13 the MSB of the version tag
2291        reflects if shm is available for this pa_native_connection or
2292        not. */
2293     if (c->version >= 13) {
2294         shm_on_remote = !!(c->version & 0x80000000U);
2295         c->version &= 0x7FFFFFFFU;
2296     }
2297
2298     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2299
2300     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2301
2302     if (!c->authorized) {
2303         pa_bool_t success = FALSE;
2304
2305 #ifdef HAVE_CREDS
2306         const pa_creds *creds;
2307
2308         if ((creds = pa_pdispatch_creds(pd))) {
2309             if (creds->uid == getuid())
2310                 success = TRUE;
2311             else if (c->options->auth_group) {
2312                 int r;
2313                 gid_t gid;
2314
2315                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2316                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2317                 else if (gid == creds->gid)
2318                     success = TRUE;
2319
2320                 if (!success) {
2321                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2322                         pa_log_warn("Failed to check group membership.");
2323                     else if (r > 0)
2324                         success = TRUE;
2325                 }
2326             }
2327
2328             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2329                         (unsigned long) creds->uid,
2330                         (unsigned long) creds->gid,
2331                         (int) success);
2332         }
2333 #endif
2334
2335         if (!success && c->options->auth_cookie) {
2336             const uint8_t *ac;
2337
2338             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2339                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2340                     success = TRUE;
2341         }
2342
2343         if (!success) {
2344             pa_log_warn("Denied access to client with invalid authorization data.");
2345             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2346             return;
2347         }
2348
2349         c->authorized = TRUE;
2350         if (c->auth_timeout_event) {
2351             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2352             c->auth_timeout_event = NULL;
2353         }
2354     }
2355
2356     /* Enable shared memory support if possible */
2357     do_shm =
2358         pa_mempool_is_shared(c->protocol->core->mempool) &&
2359         c->is_local;
2360
2361     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2362
2363     if (do_shm)
2364         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2365             do_shm = FALSE;
2366
2367 #ifdef HAVE_CREDS
2368     if (do_shm) {
2369         /* Only enable SHM if both sides are owned by the same
2370          * user. This is a security measure because otherwise data
2371          * private to the user might leak. */
2372
2373         const pa_creds *creds;
2374         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2375             do_shm = FALSE;
2376     }
2377 #endif
2378
2379     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2380     pa_pstream_enable_shm(c->pstream, do_shm);
2381
2382     reply = reply_new(tag);
2383     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2384
2385 #ifdef HAVE_CREDS
2386 {
2387     /* SHM support is only enabled after both sides made sure they are the same user. */
2388
2389     pa_creds ucred;
2390
2391     ucred.uid = getuid();
2392     ucred.gid = getgid();
2393
2394     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2395 }
2396 #else
2397     pa_pstream_send_tagstruct(c->pstream, reply);
2398 #endif
2399 }
2400
2401 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2402     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2403     const char *name = NULL;
2404     pa_proplist *p;
2405     pa_tagstruct *reply;
2406
2407     pa_native_connection_assert_ref(c);
2408     pa_assert(t);
2409
2410     p = pa_proplist_new();
2411
2412     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2413         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2414         !pa_tagstruct_eof(t)) {
2415
2416         protocol_error(c);
2417         pa_proplist_free(p);
2418         return;
2419     }
2420
2421     if (name)
2422         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2423             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2424             pa_proplist_free(p);
2425             return;
2426         }
2427
2428     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2429     pa_proplist_free(p);
2430
2431     reply = reply_new(tag);
2432
2433     if (c->version >= 13)
2434         pa_tagstruct_putu32(reply, c->client->index);
2435
2436     pa_pstream_send_tagstruct(c->pstream, reply);
2437 }
2438
2439 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2440     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2441     const char *name;
2442     uint32_t idx = PA_IDXSET_INVALID;
2443
2444     pa_native_connection_assert_ref(c);
2445     pa_assert(t);
2446
2447     if (pa_tagstruct_gets(t, &name) < 0 ||
2448         !pa_tagstruct_eof(t)) {
2449         protocol_error(c);
2450         return;
2451     }
2452
2453     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2454     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2455
2456     if (command == PA_COMMAND_LOOKUP_SINK) {
2457         pa_sink *sink;
2458         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2459             idx = sink->index;
2460     } else {
2461         pa_source *source;
2462         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2463         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2464             idx = source->index;
2465     }
2466
2467     if (idx == PA_IDXSET_INVALID)
2468         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2469     else {
2470         pa_tagstruct *reply;
2471         reply = reply_new(tag);
2472         pa_tagstruct_putu32(reply, idx);
2473         pa_pstream_send_tagstruct(c->pstream, reply);
2474     }
2475 }
2476
2477 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2478     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2479     uint32_t idx;
2480     playback_stream *s;
2481
2482     pa_native_connection_assert_ref(c);
2483     pa_assert(t);
2484
2485     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2486         !pa_tagstruct_eof(t)) {
2487         protocol_error(c);
2488         return;
2489     }
2490
2491     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2492     s = pa_idxset_get_by_index(c->output_streams, idx);
2493     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2494     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2495
2496     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2497 }
2498
2499 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2500     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2501     pa_tagstruct *reply;
2502     const pa_mempool_stat *stat;
2503
2504     pa_native_connection_assert_ref(c);
2505     pa_assert(t);
2506
2507     if (!pa_tagstruct_eof(t)) {
2508         protocol_error(c);
2509         return;
2510     }
2511
2512     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2513
2514     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2515
2516     reply = reply_new(tag);
2517     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2518     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2519     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2520     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2521     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2522     pa_pstream_send_tagstruct(c->pstream, reply);
2523 }
2524
2525 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2526     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2527     pa_tagstruct *reply;
2528     playback_stream *s;
2529     struct timeval tv, now;
2530     uint32_t idx;
2531
2532     pa_native_connection_assert_ref(c);
2533     pa_assert(t);
2534
2535     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2536         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2537         !pa_tagstruct_eof(t)) {
2538         protocol_error(c);
2539         return;
2540     }
2541
2542     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2543     s = pa_idxset_get_by_index(c->output_streams, idx);
2544     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2545     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2546
2547     /* Get an atomic snapshot of all timing parameters */
2548     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2549
2550     reply = reply_new(tag);
2551     pa_tagstruct_put_usec(reply,
2552                           s->current_sink_latency +
2553                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2554     pa_tagstruct_put_usec(reply, 0);
2555     pa_tagstruct_put_boolean(reply,
2556                              s->playing_for > 0 &&
2557                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2558                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2559     pa_tagstruct_put_timeval(reply, &tv);
2560     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2561     pa_tagstruct_puts64(reply, s->write_index);
2562     pa_tagstruct_puts64(reply, s->read_index);
2563
2564     if (c->version >= 13) {
2565         pa_tagstruct_putu64(reply, s->underrun_for);
2566         pa_tagstruct_putu64(reply, s->playing_for);
2567     }
2568
2569     pa_pstream_send_tagstruct(c->pstream, reply);
2570 }
2571
2572 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2573     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2574     pa_tagstruct *reply;
2575     record_stream *s;
2576     struct timeval tv, now;
2577     uint32_t idx;
2578
2579     pa_native_connection_assert_ref(c);
2580     pa_assert(t);
2581
2582     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2583         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2584         !pa_tagstruct_eof(t)) {
2585         protocol_error(c);
2586         return;
2587     }
2588
2589     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2590     s = pa_idxset_get_by_index(c->record_streams, idx);
2591     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2592
2593     /* Get an atomic snapshot of all timing parameters */
2594     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2595
2596     reply = reply_new(tag);
2597     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2598     pa_tagstruct_put_usec(reply,
2599                           s->current_source_latency +
2600                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2601     pa_tagstruct_put_boolean(reply,
2602                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2603                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2604     pa_tagstruct_put_timeval(reply, &tv);
2605     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2606     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2607     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2608     pa_pstream_send_tagstruct(c->pstream, reply);
2609 }
2610
2611 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2612     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2613     upload_stream *s;
2614     uint32_t length;
2615     const char *name = NULL;
2616     pa_sample_spec ss;
2617     pa_channel_map map;
2618     pa_tagstruct *reply;
2619     pa_proplist *p;
2620
2621     pa_native_connection_assert_ref(c);
2622     pa_assert(t);
2623
2624     if (pa_tagstruct_gets(t, &name) < 0 ||
2625         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2626         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2627         pa_tagstruct_getu32(t, &length) < 0) {
2628         protocol_error(c);
2629         return;
2630     }
2631
2632     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2633     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2634     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2635     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2636     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2637     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2638
2639     p = pa_proplist_new();
2640
2641     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2642         !pa_tagstruct_eof(t)) {
2643
2644         protocol_error(c);
2645         pa_proplist_free(p);
2646         return;
2647     }
2648
2649     if (c->version < 13)
2650         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2651     else if (!name)
2652         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2653             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2654
2655     if (!name || !pa_namereg_is_valid_name(name)) {
2656         pa_proplist_free(p);
2657         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2658     }
2659
2660     s = upload_stream_new(c, &ss, &map, name, length, p);
2661     pa_proplist_free(p);
2662
2663     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2664
2665     reply = reply_new(tag);
2666     pa_tagstruct_putu32(reply, s->index);
2667     pa_tagstruct_putu32(reply, length);
2668     pa_pstream_send_tagstruct(c->pstream, reply);
2669 }
2670
2671 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2672     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2673     uint32_t channel;
2674     upload_stream *s;
2675     uint32_t idx;
2676
2677     pa_native_connection_assert_ref(c);
2678     pa_assert(t);
2679
2680     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2681         !pa_tagstruct_eof(t)) {
2682         protocol_error(c);
2683         return;
2684     }
2685
2686     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2687
2688     s = pa_idxset_get_by_index(c->output_streams, channel);
2689     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2690     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2691
2692     if (!s->memchunk.memblock)
2693         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2694     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2695         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2696     else
2697         pa_pstream_send_simple_ack(c->pstream, tag);
2698
2699     upload_stream_unlink(s);
2700 }
2701
2702 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2703     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2704     uint32_t sink_index;
2705     pa_volume_t volume;
2706     pa_sink *sink;
2707     const char *name, *sink_name;
2708     uint32_t idx;
2709     pa_proplist *p;
2710     pa_tagstruct *reply;
2711
2712     pa_native_connection_assert_ref(c);
2713     pa_assert(t);
2714
2715     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2716
2717     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2718         pa_tagstruct_gets(t, &sink_name) < 0 ||
2719         pa_tagstruct_getu32(t, &volume) < 0 ||
2720         pa_tagstruct_gets(t, &name) < 0) {
2721         protocol_error(c);
2722         return;
2723     }
2724
2725     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2726     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2727     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2728     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2729
2730     if (sink_index != PA_INVALID_INDEX)
2731         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2732     else
2733         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2734
2735     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2736
2737     p = pa_proplist_new();
2738
2739     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2740         !pa_tagstruct_eof(t)) {
2741         protocol_error(c);
2742         pa_proplist_free(p);
2743         return;
2744     }
2745
2746     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2747
2748     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2749         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2750         pa_proplist_free(p);
2751         return;
2752     }
2753
2754     pa_proplist_free(p);
2755
2756     reply = reply_new(tag);
2757
2758     if (c->version >= 13)
2759         pa_tagstruct_putu32(reply, idx);
2760
2761     pa_pstream_send_tagstruct(c->pstream, reply);
2762 }
2763
2764 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2765     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2766     const char *name;
2767
2768     pa_native_connection_assert_ref(c);
2769     pa_assert(t);
2770
2771     if (pa_tagstruct_gets(t, &name) < 0 ||
2772         !pa_tagstruct_eof(t)) {
2773         protocol_error(c);
2774         return;
2775     }
2776
2777     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2778     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2779
2780     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2781         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2782         return;
2783     }
2784
2785     pa_pstream_send_simple_ack(c->pstream, tag);
2786 }
2787
2788 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2789     pa_assert(c);
2790     pa_assert(fixed);
2791     pa_assert(original);
2792
2793     *fixed = *original;
2794
2795     if (c->version < 12) {
2796         /* Before protocol version 12 we didn't support S32 samples,
2797          * so we need to lie about this to the client */
2798
2799         if (fixed->format == PA_SAMPLE_S32LE)
2800             fixed->format = PA_SAMPLE_FLOAT32LE;
2801         if (fixed->format == PA_SAMPLE_S32BE)
2802             fixed->format = PA_SAMPLE_FLOAT32BE;
2803     }
2804
2805     if (c->version < 15) {
2806         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2807             fixed->format = PA_SAMPLE_FLOAT32LE;
2808         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2809             fixed->format = PA_SAMPLE_FLOAT32BE;
2810     }
2811 }
2812
2813 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2814     pa_sample_spec fixed_ss;
2815
2816     pa_assert(t);
2817     pa_sink_assert_ref(sink);
2818
2819     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2820
2821     pa_tagstruct_put(
2822         t,
2823         PA_TAG_U32, sink->index,
2824         PA_TAG_STRING, sink->name,
2825         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2826         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2827         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2828         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2829         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE, FALSE),
2830         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2831         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2832         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2833         PA_TAG_USEC, pa_sink_get_latency(sink),
2834         PA_TAG_STRING, sink->driver,
2835         PA_TAG_U32, sink->flags,
2836         PA_TAG_INVALID);
2837
2838     if (c->version >= 13) {
2839         pa_tagstruct_put_proplist(t, sink->proplist);
2840         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2841     }
2842
2843     if (c->version >= 15) {
2844         pa_tagstruct_put_volume(t, sink->base_volume);
2845         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2846             pa_log_error("Internal sink state is invalid.");
2847         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2848         pa_tagstruct_putu32(t, sink->n_volume_steps);
2849         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2850     }
2851
2852     if (c->version >= 16) {
2853         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2854
2855         if (sink->ports) {
2856             void *state;
2857             pa_device_port *p;
2858
2859             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2860                 pa_tagstruct_puts(t, p->name);
2861                 pa_tagstruct_puts(t, p->description);
2862                 pa_tagstruct_putu32(t, p->priority);
2863             }
2864         }
2865
2866         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2867     }
2868 }
2869
2870 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2871     pa_sample_spec fixed_ss;
2872
2873     pa_assert(t);
2874     pa_source_assert_ref(source);
2875
2876     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2877
2878     pa_tagstruct_put(
2879         t,
2880         PA_TAG_U32, source->index,
2881         PA_TAG_STRING, source->name,
2882         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2883         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2884         PA_TAG_CHANNEL_MAP, &source->channel_map,
2885         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2886         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2887         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2888         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2889         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2890         PA_TAG_USEC, pa_source_get_latency(source),
2891         PA_TAG_STRING, source->driver,
2892         PA_TAG_U32, source->flags,
2893         PA_TAG_INVALID);
2894
2895     if (c->version >= 13) {
2896         pa_tagstruct_put_proplist(t, source->proplist);
2897         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2898     }
2899
2900     if (c->version >= 15) {
2901         pa_tagstruct_put_volume(t, source->base_volume);
2902         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2903             pa_log_error("Internal source state is invalid.");
2904         pa_tagstruct_putu32(t, pa_source_get_state(source));
2905         pa_tagstruct_putu32(t, source->n_volume_steps);
2906         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2907     }
2908
2909     if (c->version >= 16) {
2910
2911         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
2912
2913         if (source->ports) {
2914             void *state;
2915             pa_device_port *p;
2916
2917             PA_HASHMAP_FOREACH(p, source->ports, state) {
2918                 pa_tagstruct_puts(t, p->name);
2919                 pa_tagstruct_puts(t, p->description);
2920                 pa_tagstruct_putu32(t, p->priority);
2921             }
2922         }
2923
2924         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
2925     }
2926 }
2927
2928 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2929     pa_assert(t);
2930     pa_assert(client);
2931
2932     pa_tagstruct_putu32(t, client->index);
2933     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2934     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2935     pa_tagstruct_puts(t, client->driver);
2936
2937     if (c->version >= 13)
2938         pa_tagstruct_put_proplist(t, client->proplist);
2939 }
2940
2941 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2942     void *state = NULL;
2943     pa_card_profile *p;
2944
2945     pa_assert(t);
2946     pa_assert(card);
2947
2948     pa_tagstruct_putu32(t, card->index);
2949     pa_tagstruct_puts(t, card->name);
2950     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2951     pa_tagstruct_puts(t, card->driver);
2952
2953     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2954
2955     if (card->profiles) {
2956         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2957             pa_tagstruct_puts(t, p->name);
2958             pa_tagstruct_puts(t, p->description);
2959             pa_tagstruct_putu32(t, p->n_sinks);
2960             pa_tagstruct_putu32(t, p->n_sources);
2961             pa_tagstruct_putu32(t, p->priority);
2962         }
2963     }
2964
2965     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2966     pa_tagstruct_put_proplist(t, card->proplist);
2967 }
2968
2969 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2970     pa_assert(t);
2971     pa_assert(module);
2972
2973     pa_tagstruct_putu32(t, module->index);
2974     pa_tagstruct_puts(t, module->name);
2975     pa_tagstruct_puts(t, module->argument);
2976     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2977
2978     if (c->version < 15)
2979         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2980
2981     if (c->version >= 15)
2982         pa_tagstruct_put_proplist(t, module->proplist);
2983 }
2984
2985 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2986     pa_sample_spec fixed_ss;
2987     pa_usec_t sink_latency;
2988     pa_cvolume v;
2989
2990     pa_assert(t);
2991     pa_sink_input_assert_ref(s);
2992
2993     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2994
2995     pa_tagstruct_putu32(t, s->index);
2996     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2997     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2998     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2999     pa_tagstruct_putu32(t, s->sink->index);
3000     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3001     pa_tagstruct_put_channel_map(t, &s->channel_map);
3002     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
3003     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3004     pa_tagstruct_put_usec(t, sink_latency);
3005     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3006     pa_tagstruct_puts(t, s->driver);
3007     if (c->version >= 11)
3008         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3009     if (c->version >= 13)
3010         pa_tagstruct_put_proplist(t, s->proplist);
3011 }
3012
3013 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3014     pa_sample_spec fixed_ss;
3015     pa_usec_t source_latency;
3016
3017     pa_assert(t);
3018     pa_source_output_assert_ref(s);
3019
3020     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3021
3022     pa_tagstruct_putu32(t, s->index);
3023     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3024     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3025     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3026     pa_tagstruct_putu32(t, s->source->index);
3027     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3028     pa_tagstruct_put_channel_map(t, &s->channel_map);
3029     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3030     pa_tagstruct_put_usec(t, source_latency);
3031     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3032     pa_tagstruct_puts(t, s->driver);
3033
3034     if (c->version >= 13)
3035         pa_tagstruct_put_proplist(t, s->proplist);
3036 }
3037
3038 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3039     pa_sample_spec fixed_ss;
3040     pa_cvolume v;
3041
3042     pa_assert(t);
3043     pa_assert(e);
3044
3045     if (e->memchunk.memblock)
3046         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3047     else
3048         memset(&fixed_ss, 0, sizeof(fixed_ss));
3049
3050     pa_tagstruct_putu32(t, e->index);
3051     pa_tagstruct_puts(t, e->name);
3052
3053     if (e->volume_is_set)
3054         v = e->volume;
3055     else
3056         pa_cvolume_init(&v);
3057
3058     pa_tagstruct_put_cvolume(t, &v);
3059     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3060     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3061     pa_tagstruct_put_channel_map(t, &e->channel_map);
3062     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3063     pa_tagstruct_put_boolean(t, e->lazy);
3064     pa_tagstruct_puts(t, e->filename);
3065
3066     if (c->version >= 13)
3067         pa_tagstruct_put_proplist(t, e->proplist);
3068 }
3069
3070 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3071     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3072     uint32_t idx;
3073     pa_sink *sink = NULL;
3074     pa_source *source = NULL;
3075     pa_client *client = NULL;
3076     pa_card *card = NULL;
3077     pa_module *module = NULL;
3078     pa_sink_input *si = NULL;
3079     pa_source_output *so = NULL;
3080     pa_scache_entry *sce = NULL;
3081     const char *name = NULL;
3082     pa_tagstruct *reply;
3083
3084     pa_native_connection_assert_ref(c);
3085     pa_assert(t);
3086
3087     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3088         (command != PA_COMMAND_GET_CLIENT_INFO &&
3089          command != PA_COMMAND_GET_MODULE_INFO &&
3090          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3091          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3092          pa_tagstruct_gets(t, &name) < 0) ||
3093         !pa_tagstruct_eof(t)) {
3094         protocol_error(c);
3095         return;
3096     }
3097
3098     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3099     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3100     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3101     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3102     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3103
3104     if (command == PA_COMMAND_GET_SINK_INFO) {
3105         if (idx != PA_INVALID_INDEX)
3106             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3107         else
3108             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3109     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3110         if (idx != PA_INVALID_INDEX)
3111             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3112         else
3113             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3114     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3115         if (idx != PA_INVALID_INDEX)
3116             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3117         else
3118             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3119     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3120         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3121     else if (command == PA_COMMAND_GET_MODULE_INFO)
3122         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3123     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3124         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3125     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3126         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3127     else {
3128         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3129         if (idx != PA_INVALID_INDEX)
3130             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3131         else
3132             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3133     }
3134
3135     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3136         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3137         return;
3138     }
3139
3140     reply = reply_new(tag);
3141     if (sink)
3142         sink_fill_tagstruct(c, reply, sink);
3143     else if (source)
3144         source_fill_tagstruct(c, reply, source);
3145     else if (client)
3146         client_fill_tagstruct(c, reply, client);
3147     else if (card)
3148         card_fill_tagstruct(c, reply, card);
3149     else if (module)
3150         module_fill_tagstruct(c, reply, module);
3151     else if (si)
3152         sink_input_fill_tagstruct(c, reply, si);
3153     else if (so)
3154         source_output_fill_tagstruct(c, reply, so);
3155     else
3156         scache_fill_tagstruct(c, reply, sce);
3157     pa_pstream_send_tagstruct(c->pstream, reply);
3158 }
3159
3160 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3161     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3162     pa_idxset *i;
3163     uint32_t idx;
3164     void *p;
3165     pa_tagstruct *reply;
3166
3167     pa_native_connection_assert_ref(c);
3168     pa_assert(t);
3169
3170     if (!pa_tagstruct_eof(t)) {
3171         protocol_error(c);
3172         return;
3173     }
3174
3175     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3176
3177     reply = reply_new(tag);
3178
3179     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3180         i = c->protocol->core->sinks;
3181     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3182         i = c->protocol->core->sources;
3183     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3184         i = c->protocol->core->clients;
3185     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3186         i = c->protocol->core->cards;
3187     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3188         i = c->protocol->core->modules;
3189     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3190         i = c->protocol->core->sink_inputs;
3191     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3192         i = c->protocol->core->source_outputs;
3193     else {
3194         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3195         i = c->protocol->core->scache;
3196     }
3197
3198     if (i) {
3199         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3200             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3201                 sink_fill_tagstruct(c, reply, p);
3202             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3203                 source_fill_tagstruct(c, reply, p);
3204             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3205                 client_fill_tagstruct(c, reply, p);
3206             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3207                 card_fill_tagstruct(c, reply, p);
3208             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3209                 module_fill_tagstruct(c, reply, p);
3210             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3211                 sink_input_fill_tagstruct(c, reply, p);
3212             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3213                 source_output_fill_tagstruct(c, reply, p);
3214             else {
3215                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3216                 scache_fill_tagstruct(c, reply, p);
3217             }
3218         }
3219     }
3220
3221     pa_pstream_send_tagstruct(c->pstream, reply);
3222 }
3223
3224 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3225     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3226     pa_tagstruct *reply;
3227     pa_sink *def_sink;
3228     pa_source *def_source;
3229     pa_sample_spec fixed_ss;
3230     char *h, *u;
3231
3232     pa_native_connection_assert_ref(c);
3233     pa_assert(t);
3234
3235     if (!pa_tagstruct_eof(t)) {
3236         protocol_error(c);
3237         return;
3238     }
3239
3240     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3241
3242     reply = reply_new(tag);
3243     pa_tagstruct_puts(reply, PACKAGE_NAME);
3244     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3245
3246     u = pa_get_user_name_malloc();
3247     pa_tagstruct_puts(reply, u);
3248     pa_xfree(u);
3249
3250     h = pa_get_host_name_malloc();
3251     pa_tagstruct_puts(reply, h);
3252     pa_xfree(h);
3253
3254     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3255     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3256
3257     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3258     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3259     def_source = pa_namereg_get_default_source(c->protocol->core);
3260     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3261
3262     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3263
3264     if (c->version >= 15)
3265         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3266
3267     pa_pstream_send_tagstruct(c->pstream, reply);
3268 }
3269
3270 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3271     pa_tagstruct *t;
3272     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3273
3274     pa_native_connection_assert_ref(c);
3275
3276     t = pa_tagstruct_new(NULL, 0);
3277     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3278     pa_tagstruct_putu32(t, (uint32_t) -1);
3279     pa_tagstruct_putu32(t, e);
3280     pa_tagstruct_putu32(t, idx);
3281     pa_pstream_send_tagstruct(c->pstream, t);
3282 }
3283
3284 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3285     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3286     pa_subscription_mask_t m;
3287
3288     pa_native_connection_assert_ref(c);
3289     pa_assert(t);
3290
3291     if (pa_tagstruct_getu32(t, &m) < 0 ||
3292         !pa_tagstruct_eof(t)) {
3293         protocol_error(c);
3294         return;
3295     }
3296
3297     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3298     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3299
3300     if (c->subscription)
3301         pa_subscription_free(c->subscription);
3302
3303     if (m != 0) {
3304         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3305         pa_assert(c->subscription);
3306     } else
3307         c->subscription = NULL;
3308
3309     pa_pstream_send_simple_ack(c->pstream, tag);
3310 }
3311
3312 static void command_set_volume(
3313         pa_pdispatch *pd,
3314         uint32_t command,
3315         uint32_t tag,
3316         pa_tagstruct *t,
3317         void *userdata) {
3318
3319     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3320     uint32_t idx;
3321     pa_cvolume volume;
3322     pa_sink *sink = NULL;
3323     pa_source *source = NULL;
3324     pa_sink_input *si = NULL;
3325     const char *name = NULL;
3326     const char *client_name;
3327
3328     pa_native_connection_assert_ref(c);
3329     pa_assert(t);
3330
3331     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3332         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3333         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3334         pa_tagstruct_get_cvolume(t, &volume) ||
3335         !pa_tagstruct_eof(t)) {
3336         protocol_error(c);
3337         return;
3338     }
3339
3340     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3341     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3342     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3343     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3344     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3345     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3346
3347     switch (command) {
3348
3349         case PA_COMMAND_SET_SINK_VOLUME:
3350             if (idx != PA_INVALID_INDEX)
3351                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3352             else
3353                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3354             break;
3355
3356         case PA_COMMAND_SET_SOURCE_VOLUME:
3357             if (idx != PA_INVALID_INDEX)
3358                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3359             else
3360                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3361             break;
3362
3363         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3364             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3365             break;
3366
3367         default:
3368             pa_assert_not_reached();
3369     }
3370
3371     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3372
3373     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3374
3375     if (sink) {
3376         pa_log("Client %s changes volume of sink %s.", client_name, sink->name);
3377         pa_sink_set_volume(sink, &volume, TRUE, TRUE, TRUE, TRUE);
3378     } else if (source) {
3379         pa_log("Client %s changes volume of sink %s.", client_name, source->name);
3380         pa_source_set_volume(source, &volume, TRUE);
3381     } else if (si) {
3382         pa_log("Client %s changes volume of sink %s.",
3383                      client_name,
3384                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3385         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3386     }
3387
3388     pa_pstream_send_simple_ack(c->pstream, tag);
3389 }
3390
3391 static void command_set_mute(
3392         pa_pdispatch *pd,
3393         uint32_t command,
3394         uint32_t tag,
3395         pa_tagstruct *t,
3396         void *userdata) {
3397
3398     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3399     uint32_t idx;
3400     pa_bool_t mute;
3401     pa_sink *sink = NULL;
3402     pa_source *source = NULL;
3403     pa_sink_input *si = NULL;
3404     const char *name = NULL;
3405
3406     pa_native_connection_assert_ref(c);
3407     pa_assert(t);
3408
3409     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3410         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3411         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3412         pa_tagstruct_get_boolean(t, &mute) ||
3413         !pa_tagstruct_eof(t)) {
3414         protocol_error(c);
3415         return;
3416     }
3417
3418     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3419     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3420     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3421     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3422     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3423
3424     switch (command) {
3425
3426         case PA_COMMAND_SET_SINK_MUTE:
3427
3428             if (idx != PA_INVALID_INDEX)
3429                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3430             else
3431                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3432
3433             break;
3434
3435         case PA_COMMAND_SET_SOURCE_MUTE:
3436             if (idx != PA_INVALID_INDEX)
3437                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3438             else
3439                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3440
3441             break;
3442
3443         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3444             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3445             break;
3446
3447         default:
3448             pa_assert_not_reached();
3449     }
3450
3451     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3452
3453     if (sink)
3454         pa_sink_set_mute(sink, mute, TRUE);
3455     else if (source)
3456         pa_source_set_mute(source, mute, TRUE);
3457     else if (si)
3458         pa_sink_input_set_mute(si, mute, TRUE);
3459
3460     pa_pstream_send_simple_ack(c->pstream, tag);
3461 }
3462
3463 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3464     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3465     uint32_t idx;
3466     pa_bool_t b;
3467     playback_stream *s;
3468
3469     pa_native_connection_assert_ref(c);
3470     pa_assert(t);
3471
3472     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3473         pa_tagstruct_get_boolean(t, &b) < 0 ||
3474         !pa_tagstruct_eof(t)) {
3475         protocol_error(c);
3476         return;
3477     }
3478
3479     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3480     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3481     s = pa_idxset_get_by_index(c->output_streams, idx);
3482     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3483     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3484
3485     pa_sink_input_cork(s->sink_input, b);
3486
3487     if (b)
3488         s->is_underrun = TRUE;
3489
3490     pa_pstream_send_simple_ack(c->pstream, tag);
3491 }
3492
3493 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3494     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3495     uint32_t idx;
3496     playback_stream *s;
3497
3498     pa_native_connection_assert_ref(c);
3499     pa_assert(t);
3500
3501     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3502         !pa_tagstruct_eof(t)) {
3503         protocol_error(c);
3504         return;
3505     }
3506
3507     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3508     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3509     s = pa_idxset_get_by_index(c->output_streams, idx);
3510     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3511     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3512
3513     switch (command) {
3514         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3515             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3516             break;
3517
3518         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3519             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3520             break;
3521
3522         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3523             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3524             break;
3525
3526         default:
3527             pa_assert_not_reached();
3528     }
3529
3530     pa_pstream_send_simple_ack(c->pstream, tag);
3531 }
3532
3533 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3534     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3535     uint32_t idx;
3536     record_stream *s;
3537     pa_bool_t b;
3538
3539     pa_native_connection_assert_ref(c);
3540     pa_assert(t);
3541
3542     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3543         pa_tagstruct_get_boolean(t, &b) < 0 ||
3544         !pa_tagstruct_eof(t)) {
3545         protocol_error(c);
3546         return;
3547     }
3548
3549     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3550     s = pa_idxset_get_by_index(c->record_streams, idx);
3551     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3552
3553     pa_source_output_cork(s->source_output, b);
3554     pa_memblockq_prebuf_force(s->memblockq);
3555     pa_pstream_send_simple_ack(c->pstream, tag);
3556 }
3557
3558 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3559     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3560     uint32_t idx;
3561     record_stream *s;
3562
3563     pa_native_connection_assert_ref(c);
3564     pa_assert(t);
3565
3566     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3567         !pa_tagstruct_eof(t)) {
3568         protocol_error(c);
3569         return;
3570     }
3571
3572     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3573     s = pa_idxset_get_by_index(c->record_streams, idx);
3574     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3575
3576     pa_memblockq_flush_read(s->memblockq);
3577     pa_pstream_send_simple_ack(c->pstream, tag);
3578 }
3579
3580 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3581     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3582     uint32_t idx;
3583     pa_buffer_attr a;
3584     pa_tagstruct *reply;
3585
3586     pa_native_connection_assert_ref(c);
3587     pa_assert(t);
3588
3589     memset(&a, 0, sizeof(a));
3590
3591     if (pa_tagstruct_getu32(t, &idx) < 0) {
3592         protocol_error(c);
3593         return;
3594     }
3595
3596     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3597
3598     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3599         playback_stream *s;
3600         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3601
3602         s = pa_idxset_get_by_index(c->output_streams, idx);
3603         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3604         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3605
3606         if (pa_tagstruct_get(
3607                     t,
3608                     PA_TAG_U32, &a.maxlength,
3609                     PA_TAG_U32, &a.tlength,
3610                     PA_TAG_U32, &a.prebuf,
3611                     PA_TAG_U32, &a.minreq,
3612                     PA_TAG_INVALID) < 0 ||
3613             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3614             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3615             !pa_tagstruct_eof(t)) {
3616             protocol_error(c);
3617             return;
3618         }
3619
3620         s->adjust_latency = adjust_latency;
3621         s->early_requests = early_requests;
3622         s->buffer_attr = a;
3623
3624         fix_playback_buffer_attr(s);
3625         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3626
3627         reply = reply_new(tag);
3628         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3629         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3630         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3631         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3632
3633         if (c->version >= 13)
3634             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3635
3636     } else {
3637         record_stream *s;
3638         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3639         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3640
3641         s = pa_idxset_get_by_index(c->record_streams, idx);
3642         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3643
3644         if (pa_tagstruct_get(
3645                     t,
3646                     PA_TAG_U32, &a.maxlength,
3647                     PA_TAG_U32, &a.fragsize,
3648                     PA_TAG_INVALID) < 0 ||
3649             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3650             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3651             !pa_tagstruct_eof(t)) {
3652             protocol_error(c);
3653             return;
3654         }
3655
3656         s->adjust_latency = adjust_latency;
3657         s->early_requests = early_requests;
3658         s->buffer_attr = a;
3659
3660         fix_record_buffer_attr_pre(s);
3661         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3662         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3663         fix_record_buffer_attr_post(s);
3664
3665         reply = reply_new(tag);
3666         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3667         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3668
3669         if (c->version >= 13)
3670             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3671     }
3672
3673     pa_pstream_send_tagstruct(c->pstream, reply);
3674 }
3675
3676 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3677     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3678     uint32_t idx;
3679     uint32_t rate;
3680
3681     pa_native_connection_assert_ref(c);
3682     pa_assert(t);
3683
3684     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3685         pa_tagstruct_getu32(t, &rate) < 0 ||
3686         !pa_tagstruct_eof(t)) {
3687         protocol_error(c);
3688         return;
3689     }
3690
3691     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3692     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3693
3694     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3695         playback_stream *s;
3696
3697         s = pa_idxset_get_by_index(c->output_streams, idx);
3698         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3699         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3700
3701         pa_sink_input_set_rate(s->sink_input, rate);
3702
3703     } else {
3704         record_stream *s;
3705         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3706
3707         s = pa_idxset_get_by_index(c->record_streams, idx);
3708         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3709
3710         pa_source_output_set_rate(s->source_output, rate);
3711     }
3712
3713     pa_pstream_send_simple_ack(c->pstream, tag);
3714 }
3715
3716 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3717     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3718     uint32_t idx;
3719     uint32_t mode;
3720     pa_proplist *p;
3721
3722     pa_native_connection_assert_ref(c);
3723     pa_assert(t);
3724
3725     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3726
3727     p = pa_proplist_new();
3728
3729     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3730
3731         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3732             pa_tagstruct_get_proplist(t, p) < 0 ||
3733             !pa_tagstruct_eof(t)) {
3734             protocol_error(c);
3735             pa_proplist_free(p);
3736             return;
3737         }
3738
3739     } else {
3740
3741         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3742             pa_tagstruct_getu32(t, &mode) < 0 ||
3743             pa_tagstruct_get_proplist(t, p) < 0 ||
3744             !pa_tagstruct_eof(t)) {
3745             protocol_error(c);
3746             pa_proplist_free(p);
3747             return;
3748         }
3749     }
3750
3751     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3752         pa_proplist_free(p);
3753         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3754     }
3755
3756     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3757         playback_stream *s;
3758
3759         s = pa_idxset_get_by_index(c->output_streams, idx);
3760         if (!s || !playback_stream_isinstance(s)) {
3761             pa_proplist_free(p);
3762             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3763         }
3764         pa_sink_input_update_proplist(s->sink_input, mode, p);
3765
3766     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3767         record_stream *s;
3768
3769         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3770             pa_proplist_free(p);
3771             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3772         }
3773         pa_source_output_update_proplist(s->source_output, mode, p);
3774
3775     } else {
3776         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3777
3778         pa_client_update_proplist(c->client, mode, p);
3779     }
3780
3781     pa_pstream_send_simple_ack(c->pstream, tag);
3782     pa_proplist_free(p);
3783 }
3784
3785 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3786     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3787     uint32_t idx;
3788     unsigned changed = 0;
3789     pa_proplist *p;
3790     pa_strlist *l = NULL;
3791
3792     pa_native_connection_assert_ref(c);
3793     pa_assert(t);
3794
3795     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3796
3797     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3798
3799         if (pa_tagstruct_getu32(t, &idx) < 0) {
3800             protocol_error(c);
3801             return;
3802         }
3803     }
3804
3805     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3806         playback_stream *s;
3807
3808         s = pa_idxset_get_by_index(c->output_streams, idx);
3809         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3810         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3811
3812         p = s->sink_input->proplist;
3813
3814     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3815         record_stream *s;
3816
3817         s = pa_idxset_get_by_index(c->record_streams, idx);
3818         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3819
3820         p = s->source_output->proplist;
3821     } else {
3822         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3823
3824         p = c->client->proplist;
3825     }
3826
3827     for (;;) {
3828         const char *k;
3829
3830         if (pa_tagstruct_gets(t, &k) < 0) {
3831             protocol_error(c);
3832             pa_strlist_free(l);
3833             return;
3834         }
3835
3836         if (!k)
3837             break;
3838
3839         l = pa_strlist_prepend(l, k);
3840     }
3841
3842     if (!pa_tagstruct_eof(t)) {
3843         protocol_error(c);
3844         pa_strlist_free(l);
3845         return;
3846     }
3847
3848     for (;;) {
3849         char *z;
3850
3851         l = pa_strlist_pop(l, &z);
3852
3853         if (!z)
3854             break;
3855
3856         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3857         pa_xfree(z);
3858     }
3859
3860     pa_pstream_send_simple_ack(c->pstream, tag);
3861
3862     if (changed) {
3863         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3864             playback_stream *s;
3865
3866             s = pa_idxset_get_by_index(c->output_streams, idx);
3867             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3868
3869         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3870             record_stream *s;
3871
3872             s = pa_idxset_get_by_index(c->record_streams, idx);
3873             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3874
3875         } else {
3876             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3877             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3878         }
3879     }
3880 }
3881
3882 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3883     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3884     const char *s;
3885
3886     pa_native_connection_assert_ref(c);
3887     pa_assert(t);
3888
3889     if (pa_tagstruct_gets(t, &s) < 0 ||
3890         !pa_tagstruct_eof(t)) {
3891         protocol_error(c);
3892         return;
3893     }
3894
3895     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3896     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3897
3898     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3899         pa_source *source;
3900
3901         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3902         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3903
3904         pa_namereg_set_default_source(c->protocol->core, source);
3905     } else {
3906         pa_sink *sink;
3907         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3908
3909         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3910         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3911
3912         pa_namereg_set_default_sink(c->protocol->core, sink);
3913     }
3914
3915     pa_pstream_send_simple_ack(c->pstream, tag);
3916 }
3917
3918 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3919     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3920     uint32_t idx;
3921     const char *name;
3922
3923     pa_native_connection_assert_ref(c);
3924     pa_assert(t);
3925
3926     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3927         pa_tagstruct_gets(t, &name) < 0 ||
3928         !pa_tagstruct_eof(t)) {
3929         protocol_error(c);
3930         return;
3931     }
3932
3933     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3934     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3935
3936     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3937         playback_stream *s;
3938
3939         s = pa_idxset_get_by_index(c->output_streams, idx);
3940         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3941         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3942
3943         pa_sink_input_set_name(s->sink_input, name);
3944
3945     } else {
3946         record_stream *s;
3947         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3948
3949         s = pa_idxset_get_by_index(c->record_streams, idx);
3950         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3951
3952         pa_source_output_set_name(s->source_output, name);
3953     }
3954
3955     pa_pstream_send_simple_ack(c->pstream, tag);
3956 }
3957
3958 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3959     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3960     uint32_t idx;
3961
3962     pa_native_connection_assert_ref(c);
3963     pa_assert(t);
3964
3965     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3966         !pa_tagstruct_eof(t)) {
3967         protocol_error(c);
3968         return;
3969     }
3970
3971     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3972
3973     if (command == PA_COMMAND_KILL_CLIENT) {
3974         pa_client *client;
3975
3976         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3977         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3978
3979         pa_native_connection_ref(c);
3980         pa_client_kill(client);
3981
3982     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3983         pa_sink_input *s;
3984
3985         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3986         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3987
3988         pa_native_connection_ref(c);
3989         pa_sink_input_kill(s);
3990     } else {
3991         pa_source_output *s;
3992
3993         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3994
3995         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3996         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3997
3998         pa_native_connection_ref(c);
3999         pa_source_output_kill(s);
4000     }
4001
4002     pa_pstream_send_simple_ack(c->pstream, tag);
4003     pa_native_connection_unref(c);
4004 }
4005
4006 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4007     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4008     pa_module *m;
4009     const char *name, *argument;
4010     pa_tagstruct *reply;
4011
4012     pa_native_connection_assert_ref(c);
4013     pa_assert(t);
4014
4015     if (pa_tagstruct_gets(t, &name) < 0 ||
4016         pa_tagstruct_gets(t, &argument) < 0 ||
4017         !pa_tagstruct_eof(t)) {
4018         protocol_error(c);
4019         return;
4020     }
4021
4022     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4023     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4024     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4025
4026     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4027         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4028         return;
4029     }
4030
4031     reply = reply_new(tag);
4032     pa_tagstruct_putu32(reply, m->index);
4033     pa_pstream_send_tagstruct(c->pstream, reply);
4034 }
4035
4036 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4037     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4038     uint32_t idx;
4039     pa_module *m;
4040
4041     pa_native_connection_assert_ref(c);
4042     pa_assert(t);
4043
4044     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4045         !pa_tagstruct_eof(t)) {
4046         protocol_error(c);
4047         return;
4048     }
4049
4050     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4051     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4052     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4053
4054     pa_module_unload_request(m, FALSE);
4055     pa_pstream_send_simple_ack(c->pstream, tag);
4056 }
4057
4058 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4059     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4060     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4061     const char *name_device = NULL;
4062
4063     pa_native_connection_assert_ref(c);
4064     pa_assert(t);
4065
4066     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4067         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4068         pa_tagstruct_gets(t, &name_device) < 0 ||
4069         !pa_tagstruct_eof(t)) {
4070         protocol_error(c);
4071         return;
4072     }
4073
4074     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4075     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4076
4077     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4078     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4079     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4080     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4081
4082     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4083         pa_sink_input *si = NULL;
4084         pa_sink *sink = NULL;
4085
4086         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4087
4088         if (idx_device != PA_INVALID_INDEX)
4089             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4090         else
4091             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4092
4093         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4094
4095         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4096             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4097             return;
4098         }
4099     } else {
4100         pa_source_output *so = NULL;
4101         pa_source *source;
4102
4103         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4104
4105         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4106
4107         if (idx_device != PA_INVALID_INDEX)
4108             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4109         else
4110             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4111
4112         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4113
4114         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4115             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4116             return;
4117         }
4118     }
4119
4120     pa_pstream_send_simple_ack(c->pstream, tag);
4121 }
4122
4123 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4124     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4125     uint32_t idx = PA_INVALID_INDEX;
4126     const char *name = NULL;
4127     pa_bool_t b;
4128
4129     pa_native_connection_assert_ref(c);
4130     pa_assert(t);
4131
4132     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4133         pa_tagstruct_gets(t, &name) < 0 ||
4134         pa_tagstruct_get_boolean(t, &b) < 0 ||
4135         !pa_tagstruct_eof(t)) {
4136         protocol_error(c);
4137         return;
4138     }
4139
4140     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4141     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4142     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4143     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4144     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4145
4146     if (command == PA_COMMAND_SUSPEND_SINK) {
4147
4148         if (idx == PA_INVALID_INDEX && name && !*name) {
4149
4150             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4151
4152             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4153                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4154                 return;
4155             }
4156         } else {
4157             pa_sink *sink = NULL;
4158
4159             if (idx != PA_INVALID_INDEX)
4160                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4161             else
4162                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4163
4164             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4165
4166             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4167                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4168                 return;
4169             }
4170         }
4171     } else {
4172
4173         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4174
4175         if (idx == PA_INVALID_INDEX && name && !*name) {
4176
4177             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4178
4179             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4180                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4181                 return;
4182             }
4183
4184         } else {
4185             pa_source *source;
4186
4187             if (idx != PA_INVALID_INDEX)
4188                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4189             else
4190                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4191
4192             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4193
4194             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4195                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4196                 return;
4197             }
4198         }
4199     }
4200
4201     pa_pstream_send_simple_ack(c->pstream, tag);
4202 }
4203
4204 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4205     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4206     uint32_t idx = PA_INVALID_INDEX;
4207     const char *name = NULL;
4208     pa_module *m;
4209     pa_native_protocol_ext_cb_t cb;
4210
4211     pa_native_connection_assert_ref(c);
4212     pa_assert(t);
4213
4214     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4215         pa_tagstruct_gets(t, &name) < 0) {
4216         protocol_error(c);
4217         return;
4218     }
4219
4220     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4221     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4222     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4223     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4224     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4225
4226     if (idx != PA_INVALID_INDEX)
4227         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4228     else {
4229         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4230             if (strcmp(name, m->name) == 0)
4231                 break;
4232     }
4233
4234     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4235     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4236
4237     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4238     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4239
4240     if (cb(c->protocol, m, c, tag, t) < 0)
4241         protocol_error(c);
4242 }
4243
4244 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4245     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4246     uint32_t idx = PA_INVALID_INDEX;
4247     const char *name = NULL, *profile = NULL;
4248     pa_card *card = NULL;
4249     int ret;
4250
4251     pa_native_connection_assert_ref(c);
4252     pa_assert(t);
4253
4254     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4255         pa_tagstruct_gets(t, &name) < 0 ||
4256         pa_tagstruct_gets(t, &profile) < 0 ||
4257         !pa_tagstruct_eof(t)) {
4258         protocol_error(c);
4259         return;
4260     }
4261
4262     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4263     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4264     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4265     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4266     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4267
4268     if (idx != PA_INVALID_INDEX)
4269         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4270     else
4271         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4272
4273     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4274
4275     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4276         pa_pstream_send_error(c->pstream, tag, -ret);
4277         return;
4278     }
4279
4280     pa_pstream_send_simple_ack(c->pstream, tag);
4281 }
4282
4283 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4284     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4285     uint32_t idx = PA_INVALID_INDEX;
4286     const char *name = NULL, *port = NULL;
4287     int ret;
4288
4289     pa_native_connection_assert_ref(c);
4290     pa_assert(t);
4291
4292     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4293         pa_tagstruct_gets(t, &name) < 0 ||
4294         pa_tagstruct_gets(t, &port) < 0 ||
4295         !pa_tagstruct_eof(t)) {
4296         protocol_error(c);
4297         return;
4298     }
4299
4300     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4301     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4302     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4303     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4304     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4305
4306     if (command == PA_COMMAND_SET_SINK_PORT) {
4307         pa_sink *sink;
4308
4309         if (idx != PA_INVALID_INDEX)
4310             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4311         else
4312             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4313
4314         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4315
4316         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4317             pa_pstream_send_error(c->pstream, tag, -ret);
4318             return;
4319         }
4320     } else {
4321         pa_source *source;
4322
4323         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4324
4325         if (idx != PA_INVALID_INDEX)
4326             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4327         else
4328             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4329
4330         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4331
4332         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4333             pa_pstream_send_error(c->pstream, tag, -ret);
4334             return;
4335         }
4336     }
4337
4338     pa_pstream_send_simple_ack(c->pstream, tag);
4339 }
4340
4341 /*** pstream callbacks ***/
4342
4343 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4344     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4345
4346     pa_assert(p);
4347     pa_assert(packet);
4348     pa_native_connection_assert_ref(c);
4349
4350     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4351         pa_log("invalid packet.");
4352         native_connection_unlink(c);
4353     }
4354 }
4355
4356 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4357     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4358     output_stream *stream;
4359
4360     pa_assert(p);
4361     pa_assert(chunk);
4362     pa_native_connection_assert_ref(c);
4363
4364     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4365         pa_log_debug("Client sent block for invalid stream.");
4366         /* Ignoring */
4367         return;
4368     }
4369
4370 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4371
4372     if (playback_stream_isinstance(stream)) {
4373         playback_stream *ps = PLAYBACK_STREAM(stream);
4374
4375         if (chunk->memblock) {
4376             if (seek != PA_SEEK_RELATIVE || offset != 0)
4377                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4378
4379             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4380         } else
4381             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4382
4383     } else {
4384         upload_stream *u = UPLOAD_STREAM(stream);
4385         size_t l;
4386
4387         if (!u->memchunk.memblock) {
4388             if (u->length == chunk->length && chunk->memblock) {
4389                 u->memchunk = *chunk;
4390                 pa_memblock_ref(u->memchunk.memblock);
4391                 u->length = 0;
4392             } else {
4393                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4394                 u->memchunk.index = u->memchunk.length = 0;
4395             }
4396         }
4397
4398         pa_assert(u->memchunk.memblock);
4399
4400         l = u->length;
4401         if (l > chunk->length)
4402             l = chunk->length;
4403
4404         if (l > 0) {
4405             void *dst;
4406             dst = pa_memblock_acquire(u->memchunk.memblock);
4407
4408             if (chunk->memblock) {
4409                 void *src;
4410                 src = pa_memblock_acquire(chunk->memblock);
4411
4412                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4413                        (uint8_t*) src + chunk->index, l);
4414
4415                 pa_memblock_release(chunk->memblock);
4416             } else
4417                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4418
4419             pa_memblock_release(u->memchunk.memblock);
4420
4421             u->memchunk.length += l;
4422             u->length -= l;
4423         }
4424     }
4425 }
4426
4427 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4428     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4429
4430     pa_assert(p);
4431     pa_native_connection_assert_ref(c);
4432
4433     native_connection_unlink(c);
4434     pa_log_info("Connection died.");
4435 }
4436
4437 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4438     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4439
4440     pa_assert(p);
4441     pa_native_connection_assert_ref(c);
4442
4443     native_connection_send_memblock(c);
4444 }
4445
4446 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4447     pa_thread_mq *q;
4448
4449     if (!(q = pa_thread_mq_get()))
4450         pa_pstream_send_revoke(p, block_id);
4451     else
4452         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4453 }
4454
4455 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4456     pa_thread_mq *q;
4457
4458     if (!(q = pa_thread_mq_get()))
4459         pa_pstream_send_release(p, block_id);
4460     else
4461         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4462 }
4463
4464 /*** client callbacks ***/
4465
4466 static void client_kill_cb(pa_client *c) {
4467     pa_assert(c);
4468
4469     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4470     pa_log_info("Connection killed.");
4471 }
4472
4473 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4474     pa_tagstruct *t;
4475     pa_native_connection *c;
4476
4477     pa_assert(client);
4478     c = PA_NATIVE_CONNECTION(client->userdata);
4479     pa_native_connection_assert_ref(c);
4480
4481     if (c->version < 15)
4482       return;
4483
4484     t = pa_tagstruct_new(NULL, 0);
4485     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4486     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4487     pa_tagstruct_puts(t, event);
4488     pa_tagstruct_put_proplist(t, pl);
4489     pa_pstream_send_tagstruct(c->pstream, t);
4490 }
4491
4492 /*** module entry points ***/
4493
4494 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4495     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4496
4497     pa_assert(m);
4498     pa_native_connection_assert_ref(c);
4499     pa_assert(c->auth_timeout_event == e);
4500
4501     if (!c->authorized) {
4502         native_connection_unlink(c);
4503         pa_log_info("Connection terminated due to authentication timeout.");
4504     }
4505 }
4506
4507 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4508     pa_native_connection *c;
4509     char pname[128];
4510     pa_client *client;
4511     pa_client_new_data data;
4512
4513     pa_assert(p);
4514     pa_assert(io);
4515     pa_assert(o);
4516
4517     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4518         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4519         pa_iochannel_free(io);
4520         return;
4521     }
4522
4523     pa_client_new_data_init(&data);
4524     data.module = o->module;
4525     data.driver = __FILE__;
4526     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4527     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4528     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4529     client = pa_client_new(p->core, &data);
4530     pa_client_new_data_done(&data);
4531
4532     if (!client)
4533         return;
4534
4535     c = pa_msgobject_new(pa_native_connection);
4536     c->parent.parent.free = native_connection_free;
4537     c->parent.process_msg = native_connection_process_msg;
4538     c->protocol = p;
4539     c->options = pa_native_options_ref(o);
4540     c->authorized = FALSE;
4541
4542     if (o->auth_anonymous) {
4543         pa_log_info("Client authenticated anonymously.");
4544         c->authorized = TRUE;
4545     }
4546
4547     if (!c->authorized &&
4548         o->auth_ip_acl &&
4549         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4550
4551         pa_log_info("Client authenticated by IP ACL.");
4552         c->authorized = TRUE;
4553     }
4554
4555     if (!c->authorized)
4556         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4557     else
4558         c->auth_timeout_event = NULL;
4559
4560     c->is_local = pa_iochannel_socket_is_local(io);
4561     c->version = 8;
4562
4563     c->client = client;
4564     c->client->kill = client_kill_cb;
4565     c->client->send_event = client_send_event_cb;
4566     c->client->userdata = c;
4567
4568     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4569     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4570     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4571     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4572     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4573     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4574     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4575
4576     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4577
4578     c->record_streams = pa_idxset_new(NULL, NULL);
4579     c->output_streams = pa_idxset_new(NULL, NULL);
4580
4581     c->rrobin_index = PA_IDXSET_INVALID;
4582     c->subscription = NULL;
4583
4584     pa_idxset_put(p->connections, c, NULL);
4585
4586 #ifdef HAVE_CREDS
4587     if (pa_iochannel_creds_supported(io))
4588         pa_iochannel_creds_enable(io);
4589 #endif
4590
4591     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4592 }
4593
4594 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4595     pa_native_connection *c;
4596     void *state = NULL;
4597
4598     pa_assert(p);
4599     pa_assert(m);
4600
4601     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4602         if (c->options->module == m)
4603             native_connection_unlink(c);
4604 }
4605
4606 static pa_native_protocol* native_protocol_new(pa_core *c) {
4607     pa_native_protocol *p;
4608     pa_native_hook_t h;
4609
4610     pa_assert(c);
4611
4612     p = pa_xnew(pa_native_protocol, 1);
4613     PA_REFCNT_INIT(p);
4614     p->core = c;
4615     p->connections = pa_idxset_new(NULL, NULL);
4616
4617     p->servers = NULL;
4618
4619     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4620
4621     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4622         pa_hook_init(&p->hooks[h], p);
4623
4624     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4625
4626     return p;
4627 }
4628
4629 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4630     pa_native_protocol *p;
4631
4632     if ((p = pa_shared_get(c, "native-protocol")))
4633         return pa_native_protocol_ref(p);
4634
4635     return native_protocol_new(c);
4636 }
4637
4638 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4639     pa_assert(p);
4640     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4641
4642     PA_REFCNT_INC(p);
4643
4644     return p;
4645 }
4646
4647 void pa_native_protocol_unref(pa_native_protocol *p) {
4648     pa_native_connection *c;
4649     pa_native_hook_t h;
4650
4651     pa_assert(p);
4652     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4653
4654     if (PA_REFCNT_DEC(p) > 0)
4655         return;
4656
4657     while ((c = pa_idxset_first(p->connections, NULL)))
4658         native_connection_unlink(c);
4659
4660     pa_idxset_free(p->connections, NULL, NULL);
4661
4662     pa_strlist_free(p->servers);
4663
4664     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4665         pa_hook_done(&p->hooks[h]);
4666
4667     pa_hashmap_free(p->extensions, NULL, NULL);
4668
4669     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4670
4671     pa_xfree(p);
4672 }
4673
4674 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4675     pa_assert(p);
4676     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4677     pa_assert(name);
4678
4679     p->servers = pa_strlist_prepend(p->servers, name);
4680
4681     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4682 }
4683
4684 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4685     pa_assert(p);
4686     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4687     pa_assert(name);
4688
4689     p->servers = pa_strlist_remove(p->servers, name);
4690
4691     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4692 }
4693
4694 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4695     pa_assert(p);
4696     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4697
4698     return p->hooks;
4699 }
4700
4701 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4702     pa_assert(p);
4703     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4704
4705     return p->servers;
4706 }
4707
4708 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4709     pa_assert(p);
4710     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4711     pa_assert(m);
4712     pa_assert(cb);
4713     pa_assert(!pa_hashmap_get(p->extensions, m));
4714
4715     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4716     return 0;
4717 }
4718
4719 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4720     pa_assert(p);
4721     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4722     pa_assert(m);
4723
4724     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4725 }
4726
4727 pa_native_options* pa_native_options_new(void) {
4728     pa_native_options *o;
4729
4730     o = pa_xnew0(pa_native_options, 1);
4731     PA_REFCNT_INIT(o);
4732
4733     return o;
4734 }
4735
4736 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4737     pa_assert(o);
4738     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4739
4740     PA_REFCNT_INC(o);
4741
4742     return o;
4743 }
4744
4745 void pa_native_options_unref(pa_native_options *o) {
4746     pa_assert(o);
4747     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4748
4749     if (PA_REFCNT_DEC(o) > 0)
4750         return;
4751
4752     pa_xfree(o->auth_group);
4753
4754     if (o->auth_ip_acl)
4755         pa_ip_acl_free(o->auth_ip_acl);
4756
4757     if (o->auth_cookie)
4758         pa_auth_cookie_unref(o->auth_cookie);
4759
4760     pa_xfree(o);
4761 }
4762
4763 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4764     pa_bool_t enabled;
4765     const char *acl;
4766
4767     pa_assert(o);
4768     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4769     pa_assert(ma);
4770
4771     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4772         pa_log("auth-anonymous= expects a boolean argument.");
4773         return -1;
4774     }
4775
4776     enabled = TRUE;
4777     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4778         pa_log("auth-group-enabled= expects a boolean argument.");
4779         return -1;
4780     }
4781
4782     pa_xfree(o->auth_group);
4783     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4784
4785 #ifndef HAVE_CREDS
4786     if (o->auth_group)
4787         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4788 #endif
4789
4790     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4791         pa_ip_acl *ipa;
4792
4793         if (!(ipa = pa_ip_acl_new(acl))) {
4794             pa_log("Failed to parse IP ACL '%s'", acl);
4795             return -1;
4796         }
4797
4798         if (o->auth_ip_acl)
4799             pa_ip_acl_free(o->auth_ip_acl);
4800
4801         o->auth_ip_acl = ipa;
4802     }
4803
4804     enabled = TRUE;
4805     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4806         pa_log("auth-cookie-enabled= expects a boolean argument.");
4807         return -1;
4808     }
4809
4810     if (o->auth_cookie)
4811         pa_auth_cookie_unref(o->auth_cookie);
4812
4813     if (enabled) {
4814         const char *cn;
4815
4816         /* The new name for this is 'auth-cookie', for compat reasons
4817          * we check the old name too */
4818         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4819             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4820                 cn = PA_NATIVE_COOKIE_FILE;
4821
4822         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4823             return -1;
4824
4825     } else
4826           o->auth_cookie = NULL;
4827
4828     return 0;
4829 }
4830
4831 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4832     pa_native_connection_assert_ref(c);
4833
4834     return c->pstream;
4835 }