log: place more rate limit invocations
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/native-common.h>
40 #include <pulsecore/packet.h>
41 #include <pulsecore/client.h>
42 #include <pulsecore/source-output.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/pstream.h>
45 #include <pulsecore/tagstruct.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/llist.h>
57 #include <pulsecore/creds.h>
58 #include <pulsecore/core-util.h>
59 #include <pulsecore/ipacl.h>
60 #include <pulsecore/thread-mq.h>
61
62 #include "protocol-native.h"
63
/* A client that has not authenticated within this time gets kicked */
#define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)

/* Upper limit on the number of connections we accept */
#define MAX_CONNECTIONS 64

/* Limit for stream queue sizes and default timing parameters */
#define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
#define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
#define DEFAULT_PROCESS_MSEC 20   /* 20ms */
#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC

/* Forward declaration; the full definition follows further below */
struct pa_native_protocol;
76
77 typedef struct record_stream {
78     pa_msgobject parent;
79
80     pa_native_connection *connection;
81     uint32_t index;
82
83     pa_source_output *source_output;
84     pa_memblockq *memblockq;
85
86     pa_bool_t adjust_latency:1;
87     pa_bool_t early_requests:1;
88
89     pa_buffer_attr buffer_attr;
90
91     pa_atomic_t on_the_fly;
92     pa_usec_t configured_source_latency;
93     size_t drop_initial;
94
95     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
96     size_t on_the_fly_snapshot;
97     pa_usec_t current_monitor_latency;
98     pa_usec_t current_source_latency;
99 } record_stream;
100
101 PA_DECLARE_CLASS(record_stream);
102 #define RECORD_STREAM(o) (record_stream_cast(o))
103 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
104
105 typedef struct output_stream {
106     pa_msgobject parent;
107 } output_stream;
108
109 PA_DECLARE_CLASS(output_stream);
110 #define OUTPUT_STREAM(o) (output_stream_cast(o))
111 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
112
113 typedef struct playback_stream {
114     output_stream parent;
115
116     pa_native_connection *connection;
117     uint32_t index;
118
119     pa_sink_input *sink_input;
120     pa_memblockq *memblockq;
121
122     pa_bool_t adjust_latency:1;
123     pa_bool_t early_requests:1;
124
125     pa_bool_t is_underrun:1;
126     pa_bool_t drain_request:1;
127     uint32_t drain_tag;
128     uint32_t syncid;
129
130     pa_atomic_t missing;
131     pa_usec_t configured_sink_latency;
132     pa_buffer_attr buffer_attr;
133
134     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
135     int64_t read_index, write_index;
136     size_t render_memblockq_length;
137     pa_usec_t current_sink_latency;
138     uint64_t playing_for, underrun_for;
139 } playback_stream;
140
141 PA_DECLARE_CLASS(playback_stream);
142 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
143 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
144
145 typedef struct upload_stream {
146     output_stream parent;
147
148     pa_native_connection *connection;
149     uint32_t index;
150
151     pa_memchunk memchunk;
152     size_t length;
153     char *name;
154     pa_sample_spec sample_spec;
155     pa_channel_map channel_map;
156     pa_proplist *proplist;
157 } upload_stream;
158
159 PA_DECLARE_CLASS(upload_stream);
160 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
161 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
162
163 struct pa_native_connection {
164     pa_msgobject parent;
165     pa_native_protocol *protocol;
166     pa_native_options *options;
167     pa_bool_t authorized:1;
168     pa_bool_t is_local:1;
169     uint32_t version;
170     pa_client *client;
171     pa_pstream *pstream;
172     pa_pdispatch *pdispatch;
173     pa_idxset *record_streams, *output_streams;
174     uint32_t rrobin_index;
175     pa_subscription *subscription;
176     pa_time_event *auth_timeout_event;
177 };
178
179 PA_DECLARE_CLASS(pa_native_connection);
180 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
181 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
182
183 struct pa_native_protocol {
184     PA_REFCNT_DECLARE;
185
186     pa_core *core;
187     pa_idxset *connections;
188
189     pa_strlist *servers;
190     pa_hook hooks[PA_NATIVE_HOOK_MAX];
191
192     pa_hashmap *extensions;
193 };
194
195 enum {
196     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
197 };
198
199 enum {
200     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
201     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
202     SINK_INPUT_MESSAGE_FLUSH,
203     SINK_INPUT_MESSAGE_TRIGGER,
204     SINK_INPUT_MESSAGE_SEEK,
205     SINK_INPUT_MESSAGE_PREBUF_FORCE,
206     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
207     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
208 };
209
210 enum {
211     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
212     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
213     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
214     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
215     PLAYBACK_STREAM_MESSAGE_STARTED,
216     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
217 };
218
219 enum {
220     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
221 };
222
223 enum {
224     CONNECTION_MESSAGE_RELEASE,
225     CONNECTION_MESSAGE_REVOKE
226 };
227
228 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
229 static void sink_input_kill_cb(pa_sink_input *i);
230 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
231 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
232 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
235 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
236
237 static void native_connection_send_memblock(pa_native_connection *c);
238 static void playback_stream_request_bytes(struct playback_stream*s);
239
240 static void source_output_kill_cb(pa_source_output *o);
241 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
242 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
243 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
244 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
245 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
246
247 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
249
250 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289
290 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
291     [PA_COMMAND_ERROR] = NULL,
292     [PA_COMMAND_TIMEOUT] = NULL,
293     [PA_COMMAND_REPLY] = NULL,
294     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
295     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
296     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
297     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
298     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
299     [PA_COMMAND_AUTH] = command_auth,
300     [PA_COMMAND_REQUEST] = NULL,
301     [PA_COMMAND_EXIT] = command_exit,
302     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
303     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
304     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
305     [PA_COMMAND_STAT] = command_stat,
306     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
307     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
308     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
309     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
310     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
311     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
312     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
313     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
314     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
315     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
316     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
317     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
318     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
319     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
320     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
321     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
329     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
330     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
331
332     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
333     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
334     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
335
336     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
337     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
338     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
339
340     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
341     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
342
343     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
344     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
346     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
347
348     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
349     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
350
351     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
352     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
353     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
354     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
355     [PA_COMMAND_KILL_CLIENT] = command_kill,
356     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
357     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
358     [PA_COMMAND_LOAD_MODULE] = command_load_module,
359     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
360
361     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
362     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
363     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
364     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
365
366     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
367     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
368
369     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
370     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
371
372     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
373     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
374
375     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
376     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
377     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
378
379     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
380     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
381     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
382
383     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
384
385     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
386     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
387
388     [PA_COMMAND_EXTENSION] = command_extension
389 };
390
391 /* structure management */
392
393 /* Called from main context */
394 static void upload_stream_unlink(upload_stream *s) {
395     pa_assert(s);
396
397     if (!s->connection)
398         return;
399
400     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
401     s->connection = NULL;
402     upload_stream_unref(s);
403 }
404
405 /* Called from main context */
406 static void upload_stream_free(pa_object *o) {
407     upload_stream *s = UPLOAD_STREAM(o);
408     pa_assert(s);
409
410     upload_stream_unlink(s);
411
412     pa_xfree(s->name);
413
414     if (s->proplist)
415         pa_proplist_free(s->proplist);
416
417     if (s->memchunk.memblock)
418         pa_memblock_unref(s->memchunk.memblock);
419
420     pa_xfree(s);
421 }
422
423 /* Called from main context */
424 static upload_stream* upload_stream_new(
425         pa_native_connection *c,
426         const pa_sample_spec *ss,
427         const pa_channel_map *map,
428         const char *name,
429         size_t length,
430         pa_proplist *p) {
431
432     upload_stream *s;
433
434     pa_assert(c);
435     pa_assert(ss);
436     pa_assert(name);
437     pa_assert(length > 0);
438     pa_assert(p);
439
440     s = pa_msgobject_new(upload_stream);
441     s->parent.parent.parent.free = upload_stream_free;
442     s->connection = c;
443     s->sample_spec = *ss;
444     s->channel_map = *map;
445     s->name = pa_xstrdup(name);
446     pa_memchunk_reset(&s->memchunk);
447     s->length = length;
448     s->proplist = pa_proplist_copy(p);
449     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
450
451     pa_idxset_put(c->output_streams, s, &s->index);
452
453     return s;
454 }
455
456 /* Called from main context */
457 static void record_stream_unlink(record_stream *s) {
458     pa_assert(s);
459
460     if (!s->connection)
461         return;
462
463     if (s->source_output) {
464         pa_source_output_unlink(s->source_output);
465         pa_source_output_unref(s->source_output);
466         s->source_output = NULL;
467     }
468
469     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
470     s->connection = NULL;
471     record_stream_unref(s);
472 }
473
474 /* Called from main context */
475 static void record_stream_free(pa_object *o) {
476     record_stream *s = RECORD_STREAM(o);
477     pa_assert(s);
478
479     record_stream_unlink(s);
480
481     pa_memblockq_free(s->memblockq);
482     pa_xfree(s);
483 }
484
485 /* Called from main context */
486 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
487     record_stream *s = RECORD_STREAM(o);
488     record_stream_assert_ref(s);
489
490     if (!s->connection)
491         return -1;
492
493     switch (code) {
494
495         case RECORD_STREAM_MESSAGE_POST_DATA:
496
497             /* We try to keep up to date with how many bytes are
498              * currently on the fly */
499             pa_atomic_sub(&s->on_the_fly, chunk->length);
500
501             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
502 /*                 pa_log_warn("Failed to push data into output queue."); */
503                 return -1;
504             }
505
506             if (!pa_pstream_is_pending(s->connection->pstream))
507                 native_connection_send_memblock(s->connection);
508
509             break;
510     }
511
512     return 0;
513 }
514
515 /* Called from main context */
516 static void fix_record_buffer_attr_pre(record_stream *s) {
517
518     size_t frame_size;
519     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
520
521     pa_assert(s);
522
523     /* This function will be called from the main thread, before as
524      * well as after the source output has been activated using
525      * pa_source_output_put()! That means it may not touch any
526      * ->thread_info data! */
527
528     frame_size = pa_frame_size(&s->source_output->sample_spec);
529
530     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
531         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
532     if (s->buffer_attr.maxlength <= 0)
533         s->buffer_attr.maxlength = (uint32_t) frame_size;
534
535     if (s->buffer_attr.fragsize == (uint32_t) -1)
536         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
537     if (s->buffer_attr.fragsize <= 0)
538         s->buffer_attr.fragsize = (uint32_t) frame_size;
539
540     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
541
542     if (s->early_requests) {
543
544         /* In early request mode we need to emulate the classic
545          * fragment-based playback model. We do this setting the source
546          * latency to the fragment size. */
547
548         source_usec = fragsize_usec;
549
550     } else if (s->adjust_latency) {
551
552         /* So, the user asked us to adjust the latency according to
553          * what the source can provide. Half the latency will be
554          * spent on the hw buffer, half of it in the async buffer
555          * queue we maintain for each client. */
556
557         source_usec = fragsize_usec/2;
558
559     } else {
560
561         /* Ok, the user didn't ask us to adjust the latency, hence we
562          * don't */
563
564         source_usec = (pa_usec_t) -1;
565     }
566
567     if (source_usec != (pa_usec_t) -1)
568         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
569     else
570         s->configured_source_latency = 0;
571
572     if (s->early_requests) {
573
574         /* Ok, we didn't necessarily get what we were asking for, so
575          * let's tell the user */
576
577         fragsize_usec = s->configured_source_latency;
578
579     } else if (s->adjust_latency) {
580
581         /* Now subtract what we actually got */
582
583         if (fragsize_usec >= s->configured_source_latency*2)
584             fragsize_usec -= s->configured_source_latency;
585         else
586             fragsize_usec = s->configured_source_latency;
587     }
588
589     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
590         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
591
592         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
593
594     if (s->buffer_attr.fragsize <= 0)
595         s->buffer_attr.fragsize = (uint32_t) frame_size;
596 }
597
598 /* Called from main context */
599 static void fix_record_buffer_attr_post(record_stream *s) {
600     size_t base;
601
602     pa_assert(s);
603
604     /* This function will be called from the main thread, before as
605      * well as after the source output has been activated using
606      * pa_source_output_put()! That means it may not touch and
607      * ->thread_info data! */
608
609     base = pa_frame_size(&s->source_output->sample_spec);
610
611     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
612     if (s->buffer_attr.fragsize <= 0)
613         s->buffer_attr.fragsize = base;
614
615     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
616         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
617 }
618
619 /* Called from main context */
620 static record_stream* record_stream_new(
621         pa_native_connection *c,
622         pa_source *source,
623         pa_sample_spec *ss,
624         pa_channel_map *map,
625         pa_bool_t peak_detect,
626         pa_buffer_attr *attr,
627         pa_source_output_flags_t flags,
628         pa_proplist *p,
629         pa_bool_t adjust_latency,
630         pa_sink_input *direct_on_input,
631         pa_bool_t early_requests,
632         int *ret) {
633
634     record_stream *s;
635     pa_source_output *source_output = NULL;
636     size_t base;
637     pa_source_output_new_data data;
638
639     pa_assert(c);
640     pa_assert(ss);
641     pa_assert(p);
642     pa_assert(ret);
643
644     pa_source_output_new_data_init(&data);
645
646     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
647     data.driver = __FILE__;
648     data.module = c->options->module;
649     data.client = c->client;
650     data.source = source;
651     data.direct_on_input = direct_on_input;
652     pa_source_output_new_data_set_sample_spec(&data, ss);
653     pa_source_output_new_data_set_channel_map(&data, map);
654     if (peak_detect)
655         data.resample_method = PA_RESAMPLER_PEAKS;
656
657     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
658
659     pa_source_output_new_data_done(&data);
660
661     if (!source_output)
662         return NULL;
663
664     s = pa_msgobject_new(record_stream);
665     s->parent.parent.free = record_stream_free;
666     s->parent.process_msg = record_stream_process_msg;
667     s->connection = c;
668     s->source_output = source_output;
669     s->buffer_attr = *attr;
670     s->adjust_latency = adjust_latency;
671     s->early_requests = early_requests;
672     pa_atomic_store(&s->on_the_fly, 0);
673
674     s->source_output->parent.process_msg = source_output_process_msg;
675     s->source_output->push = source_output_push_cb;
676     s->source_output->kill = source_output_kill_cb;
677     s->source_output->get_latency = source_output_get_latency_cb;
678     s->source_output->moving = source_output_moving_cb;
679     s->source_output->suspend = source_output_suspend_cb;
680     s->source_output->send_event = source_output_send_event_cb;
681     s->source_output->userdata = s;
682
683     fix_record_buffer_attr_pre(s);
684
685     s->memblockq = pa_memblockq_new(
686             0,
687             s->buffer_attr.maxlength,
688             0,
689             base = pa_frame_size(&source_output->sample_spec),
690             1,
691             0,
692             0,
693             NULL);
694
695     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
696     fix_record_buffer_attr_post(s);
697
698     *ss = s->source_output->sample_spec;
699     *map = s->source_output->channel_map;
700
701     pa_idxset_put(c->record_streams, s, &s->index);
702
703     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
704                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
705                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
706                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
707
708     pa_source_output_put(s->source_output);
709     return s;
710 }
711
712 /* Called from main context */
713 static void record_stream_send_killed(record_stream *r) {
714     pa_tagstruct *t;
715     record_stream_assert_ref(r);
716
717     t = pa_tagstruct_new(NULL, 0);
718     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
719     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
720     pa_tagstruct_putu32(t, r->index);
721     pa_pstream_send_tagstruct(r->connection->pstream, t);
722 }
723
724 /* Called from main context */
725 static void playback_stream_unlink(playback_stream *s) {
726     pa_assert(s);
727
728     if (!s->connection)
729         return;
730
731     if (s->sink_input) {
732         pa_sink_input_unlink(s->sink_input);
733         pa_sink_input_unref(s->sink_input);
734         s->sink_input = NULL;
735     }
736
737     if (s->drain_request)
738         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
739
740     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
741     s->connection = NULL;
742     playback_stream_unref(s);
743 }
744
745 /* Called from main context */
746 static void playback_stream_free(pa_object* o) {
747     playback_stream *s = PLAYBACK_STREAM(o);
748     pa_assert(s);
749
750     playback_stream_unlink(s);
751
752     pa_memblockq_free(s->memblockq);
753     pa_xfree(s);
754 }
755
756 /* Called from main context */
757 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
758     playback_stream *s = PLAYBACK_STREAM(o);
759     playback_stream_assert_ref(s);
760
761     if (!s->connection)
762         return -1;
763
764     switch (code) {
765
766         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
767             pa_tagstruct *t;
768             int l = 0;
769
770             for (;;) {
771                 if ((l = pa_atomic_load(&s->missing)) <= 0)
772                     return 0;
773
774                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
775                     break;
776             }
777
778             t = pa_tagstruct_new(NULL, 0);
779             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
780             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
781             pa_tagstruct_putu32(t, s->index);
782             pa_tagstruct_putu32(t, (uint32_t) l);
783             pa_pstream_send_tagstruct(s->connection->pstream, t);
784
785 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
786             break;
787         }
788
789         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
790             pa_tagstruct *t;
791
792 /*             pa_log("signalling underflow"); */
793
794             /* Report that we're empty */
795             t = pa_tagstruct_new(NULL, 0);
796             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
797             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
798             pa_tagstruct_putu32(t, s->index);
799             pa_pstream_send_tagstruct(s->connection->pstream, t);
800             break;
801         }
802
803         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
804             pa_tagstruct *t;
805
806             /* Notify the user we're overflowed*/
807             t = pa_tagstruct_new(NULL, 0);
808             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
809             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
810             pa_tagstruct_putu32(t, s->index);
811             pa_pstream_send_tagstruct(s->connection->pstream, t);
812             break;
813         }
814
815         case PLAYBACK_STREAM_MESSAGE_STARTED:
816
817             if (s->connection->version >= 13) {
818                 pa_tagstruct *t;
819
820                 /* Notify the user we started playback */
821                 t = pa_tagstruct_new(NULL, 0);
822                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
823                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
824                 pa_tagstruct_putu32(t, s->index);
825                 pa_pstream_send_tagstruct(s->connection->pstream, t);
826             }
827
828             break;
829
830         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
831             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
832             break;
833
834         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
835             pa_tagstruct *t;
836
837             s->buffer_attr.tlength = (uint32_t) offset;
838
839             t = pa_tagstruct_new(NULL, 0);
840             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
841             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
842             pa_tagstruct_putu32(t, s->index);
843             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
844             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
845             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
846             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
847             pa_tagstruct_put_usec(t, s->configured_sink_latency);
848             pa_pstream_send_tagstruct(s->connection->pstream, t);
849
850             break;
851         }
852     }
853
854     return 0;
855 }
856
857 /* Called from main context */
858 static void fix_playback_buffer_attr(playback_stream *s) {
859     size_t frame_size, max_prebuf;
860     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
861
862     pa_assert(s);
863
864     /* This function will be called from the main thread, before as
865      * well as after the sink input has been activated using
866      * pa_sink_input_put()! That means it may not touch any
867      * ->thread_info data, such as the memblockq! */
868
869     frame_size = pa_frame_size(&s->sink_input->sample_spec);
870
871     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
872         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
873     if (s->buffer_attr.maxlength <= 0)
874         s->buffer_attr.maxlength = (uint32_t) frame_size;
875
876     if (s->buffer_attr.tlength == (uint32_t) -1)
877         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
878     if (s->buffer_attr.tlength <= 0)
879         s->buffer_attr.tlength = (uint32_t) frame_size;
880
881     if (s->buffer_attr.minreq == (uint32_t) -1)
882         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
883     if (s->buffer_attr.minreq <= 0)
884         s->buffer_attr.minreq = (uint32_t) frame_size;
885
886     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
887         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
888
889     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
890     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
891
892     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
893                 (double) tlength_usec / PA_USEC_PER_MSEC,
894                 (double) minreq_usec / PA_USEC_PER_MSEC);
895
896     if (s->early_requests) {
897
898         /* In early request mode we need to emulate the classic
899          * fragment-based playback model. We do this setting the sink
900          * latency to the fragment size. */
901
902         sink_usec = minreq_usec;
903         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
904
905     } else if (s->adjust_latency) {
906
907         /* So, the user asked us to adjust the latency of the stream
908          * buffer according to the what the sink can provide. The
909          * tlength passed in shall be the overall latency. Roughly
910          * half the latency will be spent on the hw buffer, the other
911          * half of it in the async buffer queue we maintain for each
912          * client. In between we'll have a safety space of size
913          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
914          * empty and needs to be filled, then our buffer must have
915          * enough data to fulfill this request immediatly and thus
916          * have at least the same tlength as the size of the hw
917          * buffer. It additionally needs space for 2 times minreq
918          * because if the buffer ran empty and a partial fillup
919          * happens immediately on the next iteration we need to be
920          * able to fulfill it and give the application also minreq
921          * time to fill it up again for the next request Makes 2 times
922          * minreq in plus.. */
923
924         if (tlength_usec > minreq_usec*2)
925             sink_usec = (tlength_usec - minreq_usec*2)/2;
926         else
927             sink_usec = 0;
928
929         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
930
931     } else {
932
933         /* Ok, the user didn't ask us to adjust the latency, but we
934          * still need to make sure that the parameters from the user
935          * do make sense. */
936
937         if (tlength_usec > minreq_usec*2)
938             sink_usec = (tlength_usec - minreq_usec*2);
939         else
940             sink_usec = 0;
941
942         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
943     }
944
945     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
946
947     if (s->early_requests) {
948
949         /* Ok, we didn't necessarily get what we were asking for, so
950          * let's tell the user */
951
952         minreq_usec = s->configured_sink_latency;
953
954     } else if (s->adjust_latency) {
955
956         /* Ok, we didn't necessarily get what we were asking for, so
957          * let's subtract from what we asked for for the remaining
958          * buffer space */
959
960         if (tlength_usec >= s->configured_sink_latency)
961             tlength_usec -= s->configured_sink_latency;
962     }
963
964     /* FIXME: This is actually larger than necessary, since not all of
965      * the sink latency is actually rewritable. */
966     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
967         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
968
969     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
970         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
971         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
972
973     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
974         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
975         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
976
977     if (s->buffer_attr.minreq <= 0) {
978         s->buffer_attr.minreq = (uint32_t) frame_size;
979         s->buffer_attr.tlength += (uint32_t) frame_size*2;
980     }
981
982     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
983         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
984
985     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
986
987     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
988         s->buffer_attr.prebuf > max_prebuf)
989         s->buffer_attr.prebuf = max_prebuf;
990 }
991
992 /* Called from main context */
993 static playback_stream* playback_stream_new(
994         pa_native_connection *c,
995         pa_sink *sink,
996         pa_sample_spec *ss,
997         pa_channel_map *map,
998         pa_buffer_attr *a,
999         pa_cvolume *volume,
1000         pa_bool_t muted,
1001         pa_bool_t muted_set,
1002         uint32_t syncid,
1003         uint32_t *missing,
1004         pa_sink_input_flags_t flags,
1005         pa_proplist *p,
1006         pa_bool_t adjust_latency,
1007         pa_bool_t early_requests,
1008         int *ret) {
1009
1010     playback_stream *s, *ssync;
1011     pa_sink_input *sink_input = NULL;
1012     pa_memchunk silence;
1013     uint32_t idx;
1014     int64_t start_index;
1015     pa_sink_input_new_data data;
1016
1017     pa_assert(c);
1018     pa_assert(ss);
1019     pa_assert(missing);
1020     pa_assert(p);
1021     pa_assert(ret);
1022
1023     /* Find syncid group */
1024     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1025
1026         if (!playback_stream_isinstance(ssync))
1027             continue;
1028
1029         if (ssync->syncid == syncid)
1030             break;
1031     }
1032
1033     /* Synced streams must connect to the same sink */
1034     if (ssync) {
1035
1036         if (!sink)
1037             sink = ssync->sink_input->sink;
1038         else if (sink != ssync->sink_input->sink) {
1039             *ret = PA_ERR_INVALID;
1040             return NULL;
1041         }
1042     }
1043
1044     pa_sink_input_new_data_init(&data);
1045
1046     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1047     data.driver = __FILE__;
1048     data.module = c->options->module;
1049     data.client = c->client;
1050     data.sink = sink;
1051     pa_sink_input_new_data_set_sample_spec(&data, ss);
1052     pa_sink_input_new_data_set_channel_map(&data, map);
1053     if (volume)
1054         pa_sink_input_new_data_set_volume(&data, volume);
1055     if (muted_set)
1056         pa_sink_input_new_data_set_muted(&data, muted);
1057     data.sync_base = ssync ? ssync->sink_input : NULL;
1058
1059     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1060
1061     pa_sink_input_new_data_done(&data);
1062
1063     if (!sink_input)
1064         return NULL;
1065
1066     s = pa_msgobject_new(playback_stream);
1067     s->parent.parent.parent.free = playback_stream_free;
1068     s->parent.parent.process_msg = playback_stream_process_msg;
1069     s->connection = c;
1070     s->syncid = syncid;
1071     s->sink_input = sink_input;
1072     s->is_underrun = TRUE;
1073     s->drain_request = FALSE;
1074     pa_atomic_store(&s->missing, 0);
1075     s->buffer_attr = *a;
1076     s->adjust_latency = adjust_latency;
1077     s->early_requests = early_requests;
1078
1079     s->sink_input->parent.process_msg = sink_input_process_msg;
1080     s->sink_input->pop = sink_input_pop_cb;
1081     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1082     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1083     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1084     s->sink_input->kill = sink_input_kill_cb;
1085     s->sink_input->moving = sink_input_moving_cb;
1086     s->sink_input->suspend = sink_input_suspend_cb;
1087     s->sink_input->send_event = sink_input_send_event_cb;
1088     s->sink_input->userdata = s;
1089
1090     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1091
1092     fix_playback_buffer_attr(s);
1093
1094     pa_sink_input_get_silence(sink_input, &silence);
1095     s->memblockq = pa_memblockq_new(
1096             start_index,
1097             s->buffer_attr.maxlength,
1098             s->buffer_attr.tlength,
1099             pa_frame_size(&sink_input->sample_spec),
1100             s->buffer_attr.prebuf,
1101             s->buffer_attr.minreq,
1102             0,
1103             &silence);
1104     pa_memblock_unref(silence.memblock);
1105
1106     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1107
1108     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1109
1110     *ss = s->sink_input->sample_spec;
1111     *map = s->sink_input->channel_map;
1112
1113     pa_idxset_put(c->output_streams, s, &s->index);
1114
1115     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1116                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1117                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1118                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1119                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1120
1121     pa_sink_input_put(s->sink_input);
1122     return s;
1123 }
1124
1125 /* Called from IO context */
1126 static void playback_stream_request_bytes(playback_stream *s) {
1127     size_t m, minreq;
1128     int previous_missing;
1129
1130     playback_stream_assert_ref(s);
1131
1132     m = pa_memblockq_pop_missing(s->memblockq);
1133
1134     if (m <= 0)
1135         return;
1136
1137 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1138
1139     previous_missing = pa_atomic_add(&s->missing, (int) m);
1140     minreq = pa_memblockq_get_minreq(s->memblockq);
1141
1142     if (pa_memblockq_prebuf_active(s->memblockq) ||
1143         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1144         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1145 }
1146
1147 /* Called from main context */
1148 static void playback_stream_send_killed(playback_stream *p) {
1149     pa_tagstruct *t;
1150     playback_stream_assert_ref(p);
1151
1152     t = pa_tagstruct_new(NULL, 0);
1153     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1154     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1155     pa_tagstruct_putu32(t, p->index);
1156     pa_pstream_send_tagstruct(p->connection->pstream, t);
1157 }
1158
1159 /* Called from main context */
1160 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1161     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1162     pa_native_connection_assert_ref(c);
1163
1164     if (!c->protocol)
1165         return -1;
1166
1167     switch (code) {
1168
1169         case CONNECTION_MESSAGE_REVOKE:
1170             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1171             break;
1172
1173         case CONNECTION_MESSAGE_RELEASE:
1174             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1175             break;
1176     }
1177
1178     return 0;
1179 }
1180
1181 /* Called from main context */
1182 static void native_connection_unlink(pa_native_connection *c) {
1183     record_stream *r;
1184     output_stream *o;
1185
1186     pa_assert(c);
1187
1188     if (!c->protocol)
1189         return;
1190
1191     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1192
1193     if (c->options)
1194         pa_native_options_unref(c->options);
1195
1196     while ((r = pa_idxset_first(c->record_streams, NULL)))
1197         record_stream_unlink(r);
1198
1199     while ((o = pa_idxset_first(c->output_streams, NULL)))
1200         if (playback_stream_isinstance(o))
1201             playback_stream_unlink(PLAYBACK_STREAM(o));
1202         else
1203             upload_stream_unlink(UPLOAD_STREAM(o));
1204
1205     if (c->subscription)
1206         pa_subscription_free(c->subscription);
1207
1208     if (c->pstream)
1209         pa_pstream_unlink(c->pstream);
1210
1211     if (c->auth_timeout_event) {
1212         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1213         c->auth_timeout_event = NULL;
1214     }
1215
1216     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1217     c->protocol = NULL;
1218     pa_native_connection_unref(c);
1219 }
1220
1221 /* Called from main context */
1222 static void native_connection_free(pa_object *o) {
1223     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1224
1225     pa_assert(c);
1226
1227     native_connection_unlink(c);
1228
1229     pa_idxset_free(c->record_streams, NULL, NULL);
1230     pa_idxset_free(c->output_streams, NULL, NULL);
1231
1232     pa_pdispatch_unref(c->pdispatch);
1233     pa_pstream_unref(c->pstream);
1234     pa_client_free(c->client);
1235
1236     pa_xfree(c);
1237 }
1238
1239 /* Called from main context */
1240 static void native_connection_send_memblock(pa_native_connection *c) {
1241     uint32_t start;
1242     record_stream *r;
1243
1244     start = PA_IDXSET_INVALID;
1245     for (;;) {
1246         pa_memchunk chunk;
1247
1248         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1249             return;
1250
1251         if (start == PA_IDXSET_INVALID)
1252             start = c->rrobin_index;
1253         else if (start == c->rrobin_index)
1254             return;
1255
1256         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1257             pa_memchunk schunk = chunk;
1258
1259             if (schunk.length > r->buffer_attr.fragsize)
1260                 schunk.length = r->buffer_attr.fragsize;
1261
1262             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1263
1264             pa_memblockq_drop(r->memblockq, schunk.length);
1265             pa_memblock_unref(schunk.memblock);
1266
1267             return;
1268         }
1269     }
1270 }
1271
1272 /*** sink input callbacks ***/
1273
1274 /* Called from thread context */
1275 static void handle_seek(playback_stream *s, int64_t indexw) {
1276     playback_stream_assert_ref(s);
1277
1278 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1279
1280     if (s->sink_input->thread_info.underrun_for > 0) {
1281
1282 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1283
1284         if (pa_memblockq_is_readable(s->memblockq)) {
1285
1286             /* We just ended an underrun, let's ask the sink
1287              * for a complete rewind rewrite */
1288
1289             pa_log_debug("Requesting rewind due to end of underrun.");
1290             pa_sink_input_request_rewind(s->sink_input,
1291                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1292                                          FALSE, TRUE, FALSE);
1293         }
1294
1295     } else {
1296         int64_t indexr;
1297
1298         indexr = pa_memblockq_get_read_index(s->memblockq);
1299
1300         if (indexw < indexr) {
1301             /* OK, the sink already asked for this data, so
1302              * let's have it usk us again */
1303
1304             pa_log_debug("Requesting rewind due to rewrite.");
1305             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1306         }
1307     }
1308
1309     playback_stream_request_bytes(s);
1310 }
1311
1312 /* Called from thread context */
1313 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1314     pa_sink_input *i = PA_SINK_INPUT(o);
1315     playback_stream *s;
1316
1317     pa_sink_input_assert_ref(i);
1318     s = PLAYBACK_STREAM(i->userdata);
1319     playback_stream_assert_ref(s);
1320
1321     switch (code) {
1322
1323         case SINK_INPUT_MESSAGE_SEEK: {
1324             int64_t windex;
1325
1326             windex = pa_memblockq_get_write_index(s->memblockq);
1327
1328             /* The client side is incapable of accounting correctly
1329              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1330              * able to deal with that. */
1331
1332             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1333
1334             handle_seek(s, windex);
1335             return 0;
1336         }
1337
1338         case SINK_INPUT_MESSAGE_POST_DATA: {
1339             int64_t windex;
1340
1341             pa_assert(chunk);
1342
1343             windex = pa_memblockq_get_write_index(s->memblockq);
1344
1345 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1346
1347             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1348
1349                 if (pa_log_ratelimit())
1350                     pa_log_warn("Failed to push data into queue");
1351                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1352                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1353             }
1354
1355             handle_seek(s, windex);
1356
1357 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1358
1359             return 0;
1360         }
1361
1362         case SINK_INPUT_MESSAGE_DRAIN:
1363         case SINK_INPUT_MESSAGE_FLUSH:
1364         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1365         case SINK_INPUT_MESSAGE_TRIGGER: {
1366
1367             int64_t windex;
1368             pa_sink_input *isync;
1369             void (*func)(pa_memblockq *bq);
1370
1371             switch  (code) {
1372                 case SINK_INPUT_MESSAGE_FLUSH:
1373                     func = pa_memblockq_flush_write;
1374                     break;
1375
1376                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1377                     func = pa_memblockq_prebuf_force;
1378                     break;
1379
1380                 case SINK_INPUT_MESSAGE_DRAIN:
1381                 case SINK_INPUT_MESSAGE_TRIGGER:
1382                     func = pa_memblockq_prebuf_disable;
1383                     break;
1384
1385                 default:
1386                     pa_assert_not_reached();
1387             }
1388
1389             windex = pa_memblockq_get_write_index(s->memblockq);
1390             func(s->memblockq);
1391             handle_seek(s, windex);
1392
1393             /* Do the same for all other members in the sync group */
1394             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1395                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1396                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1397                 func(ssync->memblockq);
1398                 handle_seek(ssync, windex);
1399             }
1400
1401             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1402                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1403                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1404                 func(ssync->memblockq);
1405                 handle_seek(ssync, windex);
1406             }
1407
1408             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1409                 if (!pa_memblockq_is_readable(s->memblockq))
1410                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1411                 else {
1412                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1413                     s->drain_request = TRUE;
1414                 }
1415             }
1416
1417             return 0;
1418         }
1419
1420         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1421             /* Atomically get a snapshot of all timing parameters... */
1422             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1423             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1424             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1425             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1426             s->underrun_for = s->sink_input->thread_info.underrun_for;
1427             s->playing_for = s->sink_input->thread_info.playing_for;
1428
1429             return 0;
1430
1431         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1432             int64_t windex;
1433
1434             windex = pa_memblockq_get_write_index(s->memblockq);
1435
1436             pa_memblockq_prebuf_force(s->memblockq);
1437
1438             handle_seek(s, windex);
1439
1440             /* Fall through to the default handler */
1441             break;
1442         }
1443
1444         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1445             pa_usec_t *r = userdata;
1446
1447             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1448
1449             /* Fall through, the default handler will add in the extra
1450              * latency added by the resampler */
1451             break;
1452         }
1453
1454         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1455             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1456             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1457             return 0;
1458         }
1459     }
1460
1461     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1462 }
1463
1464 /* Called from thread context */
1465 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1466     playback_stream *s;
1467
1468     pa_sink_input_assert_ref(i);
1469     s = PLAYBACK_STREAM(i->userdata);
1470     playback_stream_assert_ref(s);
1471     pa_assert(chunk);
1472
1473 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1474
1475     if (pa_memblockq_is_readable(s->memblockq))
1476         s->is_underrun = FALSE;
1477     else {
1478         if (!s->is_underrun)
1479             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1480
1481         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1482             s->drain_request = FALSE;
1483             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1484         } else if (!s->is_underrun)
1485             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1486
1487         s->is_underrun = TRUE;
1488
1489         playback_stream_request_bytes(s);
1490     }
1491
1492     /* This call will not fail with prebuf=0, hence we check for
1493        underrun explicitly above */
1494     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1495         return -1;
1496
1497     chunk->length = PA_MIN(nbytes, chunk->length);
1498
1499     if (i->thread_info.underrun_for > 0)
1500         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1501
1502     pa_memblockq_drop(s->memblockq, chunk->length);
1503     playback_stream_request_bytes(s);
1504
1505     return 0;
1506 }
1507
1508 /* Called from thread context */
1509 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1510     playback_stream *s;
1511
1512     pa_sink_input_assert_ref(i);
1513     s = PLAYBACK_STREAM(i->userdata);
1514     playback_stream_assert_ref(s);
1515
1516     /* If we are in an underrun, then we don't rewind */
1517     if (i->thread_info.underrun_for > 0)
1518         return;
1519
1520     pa_memblockq_rewind(s->memblockq, nbytes);
1521 }
1522
1523 /* Called from thread context */
1524 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1525     playback_stream *s;
1526
1527     pa_sink_input_assert_ref(i);
1528     s = PLAYBACK_STREAM(i->userdata);
1529     playback_stream_assert_ref(s);
1530
1531     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1532 }
1533
1534 /* Called from thread context */
1535 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1536     playback_stream *s;
1537     size_t new_tlength, old_tlength;
1538
1539     pa_sink_input_assert_ref(i);
1540     s = PLAYBACK_STREAM(i->userdata);
1541     playback_stream_assert_ref(s);
1542
1543     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1544     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1545
1546     if (old_tlength < new_tlength) {
1547         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1548         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1549         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1550
1551         if (new_tlength == old_tlength)
1552             pa_log_debug("Failed to increase tlength");
1553         else {
1554             pa_log_debug("Notifying client about increased tlength");
1555             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1556         }
1557     }
1558 }
1559
1560 /* Called from main context */
1561 static void sink_input_kill_cb(pa_sink_input *i) {
1562     playback_stream *s;
1563
1564     pa_sink_input_assert_ref(i);
1565     s = PLAYBACK_STREAM(i->userdata);
1566     playback_stream_assert_ref(s);
1567
1568     playback_stream_send_killed(s);
1569     playback_stream_unlink(s);
1570 }
1571
1572 /* Called from main context */
1573 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1574     playback_stream *s;
1575     pa_tagstruct *t;
1576
1577     pa_sink_input_assert_ref(i);
1578     s = PLAYBACK_STREAM(i->userdata);
1579     playback_stream_assert_ref(s);
1580
1581     if (s->connection->version < 15)
1582       return;
1583
1584     t = pa_tagstruct_new(NULL, 0);
1585     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1586     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1587     pa_tagstruct_putu32(t, s->index);
1588     pa_tagstruct_puts(t, event);
1589     pa_tagstruct_put_proplist(t, pl);
1590     pa_pstream_send_tagstruct(s->connection->pstream, t);
1591 }
1592
1593 /* Called from main context */
1594 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1595     playback_stream *s;
1596     pa_tagstruct *t;
1597
1598     pa_sink_input_assert_ref(i);
1599     s = PLAYBACK_STREAM(i->userdata);
1600     playback_stream_assert_ref(s);
1601
1602     if (s->connection->version < 12)
1603       return;
1604
1605     t = pa_tagstruct_new(NULL, 0);
1606     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1607     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1608     pa_tagstruct_putu32(t, s->index);
1609     pa_tagstruct_put_boolean(t, suspend);
1610     pa_pstream_send_tagstruct(s->connection->pstream, t);
1611 }
1612
1613 /* Called from main context */
1614 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1615     playback_stream *s;
1616     pa_tagstruct *t;
1617
1618     pa_sink_input_assert_ref(i);
1619     s = PLAYBACK_STREAM(i->userdata);
1620     playback_stream_assert_ref(s);
1621
1622     if (!dest)
1623         return;
1624
1625     fix_playback_buffer_attr(s);
1626     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1627     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1628
1629     if (s->connection->version < 12)
1630       return;
1631
1632     t = pa_tagstruct_new(NULL, 0);
1633     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1634     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1635     pa_tagstruct_putu32(t, s->index);
1636     pa_tagstruct_putu32(t, dest->index);
1637     pa_tagstruct_puts(t, dest->name);
1638     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1639
1640     if (s->connection->version >= 13) {
1641         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1642         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1643         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1644         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1645         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1646     }
1647
1648     pa_pstream_send_tagstruct(s->connection->pstream, t);
1649 }
1650
1651 /*** source_output callbacks ***/
1652
1653 /* Called from thread context */
1654 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1655     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1656     record_stream *s;
1657
1658     pa_source_output_assert_ref(o);
1659     s = RECORD_STREAM(o->userdata);
1660     record_stream_assert_ref(s);
1661
1662     switch (code) {
1663         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1664             /* Atomically get a snapshot of all timing parameters... */
1665             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1666             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1667             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1668             return 0;
1669     }
1670
1671     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1672 }
1673
1674 /* Called from thread context */
1675 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1676     record_stream *s;
1677
1678     pa_source_output_assert_ref(o);
1679     s = RECORD_STREAM(o->userdata);
1680     record_stream_assert_ref(s);
1681     pa_assert(chunk);
1682
1683     pa_atomic_add(&s->on_the_fly, chunk->length);
1684     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1685 }
1686
1687 static void source_output_kill_cb(pa_source_output *o) {
1688     record_stream *s;
1689
1690     pa_source_output_assert_ref(o);
1691     s = RECORD_STREAM(o->userdata);
1692     record_stream_assert_ref(s);
1693
1694     record_stream_send_killed(s);
1695     record_stream_unlink(s);
1696 }
1697
1698 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1699     record_stream *s;
1700
1701     pa_source_output_assert_ref(o);
1702     s = RECORD_STREAM(o->userdata);
1703     record_stream_assert_ref(s);
1704
1705     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1706
1707     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1708 }
1709
1710 /* Called from main context */
1711 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1712     record_stream *s;
1713     pa_tagstruct *t;
1714
1715     pa_source_output_assert_ref(o);
1716     s = RECORD_STREAM(o->userdata);
1717     record_stream_assert_ref(s);
1718
1719     if (s->connection->version < 15)
1720       return;
1721
1722     t = pa_tagstruct_new(NULL, 0);
1723     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1724     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1725     pa_tagstruct_putu32(t, s->index);
1726     pa_tagstruct_puts(t, event);
1727     pa_tagstruct_put_proplist(t, pl);
1728     pa_pstream_send_tagstruct(s->connection->pstream, t);
1729 }
1730
1731 /* Called from main context */
1732 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1733     record_stream *s;
1734     pa_tagstruct *t;
1735
1736     pa_source_output_assert_ref(o);
1737     s = RECORD_STREAM(o->userdata);
1738     record_stream_assert_ref(s);
1739
1740     if (s->connection->version < 12)
1741       return;
1742
1743     t = pa_tagstruct_new(NULL, 0);
1744     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1745     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1746     pa_tagstruct_putu32(t, s->index);
1747     pa_tagstruct_put_boolean(t, suspend);
1748     pa_pstream_send_tagstruct(s->connection->pstream, t);
1749 }
1750
1751 /* Called from main context */
1752 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1753     record_stream *s;
1754     pa_tagstruct *t;
1755
1756     pa_source_output_assert_ref(o);
1757     s = RECORD_STREAM(o->userdata);
1758     record_stream_assert_ref(s);
1759
1760     if (!dest)
1761         return;
1762
1763     fix_record_buffer_attr_pre(s);
1764     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1765     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1766     fix_record_buffer_attr_post(s);
1767
1768     if (s->connection->version < 12)
1769       return;
1770
1771     t = pa_tagstruct_new(NULL, 0);
1772     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1773     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1774     pa_tagstruct_putu32(t, s->index);
1775     pa_tagstruct_putu32(t, dest->index);
1776     pa_tagstruct_puts(t, dest->name);
1777     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1778
1779     if (s->connection->version >= 13) {
1780         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1781         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1782         pa_tagstruct_put_usec(t, s->configured_source_latency);
1783     }
1784
1785     pa_pstream_send_tagstruct(s->connection->pstream, t);
1786 }
1787
1788 /*** pdispatch callbacks ***/
1789
/* Reaction to a malformed request: logs the problem and unlinks the
 * connection. The command handlers in this file call it when a request
 * cannot be parsed and return immediately afterwards. */
static void protocol_error(pa_native_connection *c) {
    pa_log("protocol error, kicking client");
    native_connection_unlink(c);
}
1794
/* Request precondition check for command handlers: when `expression` is
 * false, `error` is sent to the client on `pstream` in reply to request
 * `tag` and the *calling* function returns. Because of the bare `return;`
 * it is only usable inside functions returning void.
 *
 * NOTE(review): the definition ends in "while(0);". With that trailing
 * semicolon, "CHECK_VALIDITY(...);" expands to two statements, so the
 * macro cannot be used as the unbraced body of an `if` that has an
 * `else`. Removing the semicolon needs a check of every call site for
 * callers that omit their own. */
#define CHECK_VALIDITY(pstream, expression, tag, error) do { \
if (!(expression)) { \
    pa_pstream_send_error((pstream), (tag), (error)); \
    return; \
} \
} while(0);
1801
1802 static pa_tagstruct *reply_new(uint32_t tag) {
1803     pa_tagstruct *reply;
1804
1805     reply = pa_tagstruct_new(NULL, 0);
1806     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1807     pa_tagstruct_putu32(reply, tag);
1808     return reply;
1809 }
1810
1811 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1812     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1813     playback_stream *s;
1814     uint32_t sink_index, syncid, missing;
1815     pa_buffer_attr attr;
1816     const char *name = NULL, *sink_name;
1817     pa_sample_spec ss;
1818     pa_channel_map map;
1819     pa_tagstruct *reply;
1820     pa_sink *sink = NULL;
1821     pa_cvolume volume;
1822     pa_bool_t
1823         corked = FALSE,
1824         no_remap = FALSE,
1825         no_remix = FALSE,
1826         fix_format = FALSE,
1827         fix_rate = FALSE,
1828         fix_channels = FALSE,
1829         no_move = FALSE,
1830         variable_rate = FALSE,
1831         muted = FALSE,
1832         adjust_latency = FALSE,
1833         early_requests = FALSE,
1834         dont_inhibit_auto_suspend = FALSE,
1835         muted_set = FALSE,
1836         fail_on_suspend = FALSE;
1837     pa_sink_input_flags_t flags = 0;
1838     pa_proplist *p;
1839     pa_bool_t volume_set = TRUE;
1840     int ret = PA_ERR_INVALID;
1841
1842     pa_native_connection_assert_ref(c);
1843     pa_assert(t);
1844     memset(&attr, 0, sizeof(attr));
1845
1846     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1847         pa_tagstruct_get(
1848                 t,
1849                 PA_TAG_SAMPLE_SPEC, &ss,
1850                 PA_TAG_CHANNEL_MAP, &map,
1851                 PA_TAG_U32, &sink_index,
1852                 PA_TAG_STRING, &sink_name,
1853                 PA_TAG_U32, &attr.maxlength,
1854                 PA_TAG_BOOLEAN, &corked,
1855                 PA_TAG_U32, &attr.tlength,
1856                 PA_TAG_U32, &attr.prebuf,
1857                 PA_TAG_U32, &attr.minreq,
1858                 PA_TAG_U32, &syncid,
1859                 PA_TAG_CVOLUME, &volume,
1860                 PA_TAG_INVALID) < 0) {
1861
1862         protocol_error(c);
1863         return;
1864     }
1865
1866     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1867     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1868     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1869     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1870     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1871     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1872     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1873     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1874
1875     p = pa_proplist_new();
1876
1877     if (name)
1878         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1879
1880     if (c->version >= 12)  {
1881         /* Since 0.9.8 the user can ask for a couple of additional flags */
1882
1883         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1884             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1885             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1886             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1887             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1888             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1889             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1890
1891             protocol_error(c);
1892             pa_proplist_free(p);
1893             return;
1894         }
1895     }
1896
1897     if (c->version >= 13) {
1898
1899         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1900             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1901             pa_tagstruct_get_proplist(t, p) < 0) {
1902             protocol_error(c);
1903             pa_proplist_free(p);
1904             return;
1905         }
1906     }
1907
1908     if (c->version >= 14) {
1909
1910         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1911             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1912             protocol_error(c);
1913             pa_proplist_free(p);
1914             return;
1915         }
1916     }
1917
1918     if (c->version >= 15) {
1919
1920         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1921             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1922             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1923             protocol_error(c);
1924             pa_proplist_free(p);
1925             return;
1926         }
1927     }
1928
1929     if (!pa_tagstruct_eof(t)) {
1930         protocol_error(c);
1931         pa_proplist_free(p);
1932         return;
1933     }
1934
1935     if (sink_index != PA_INVALID_INDEX) {
1936
1937         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1938             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1939             pa_proplist_free(p);
1940             return;
1941         }
1942
1943     } else if (sink_name) {
1944
1945         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1946             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1947             pa_proplist_free(p);
1948             return;
1949         }
1950     }
1951
1952     flags =
1953         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1954         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1955         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1956         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1957         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1958         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1959         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1960         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1961         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1962         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0);
1963
1964     /* Only since protocol version 15 there's a seperate muted_set
1965      * flag. For older versions we synthesize it here */
1966     muted_set = muted_set || muted;
1967
1968     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1969     pa_proplist_free(p);
1970
1971     CHECK_VALIDITY(c->pstream, s, tag, ret);
1972
1973     reply = reply_new(tag);
1974     pa_tagstruct_putu32(reply, s->index);
1975     pa_assert(s->sink_input);
1976     pa_tagstruct_putu32(reply, s->sink_input->index);
1977     pa_tagstruct_putu32(reply, missing);
1978
1979 /*     pa_log("initial request is %u", missing); */
1980
1981     if (c->version >= 9) {
1982         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1983
1984         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1985         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1986         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1987         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1988     }
1989
1990     if (c->version >= 12) {
1991         /* Since 0.9.8 we support sending the chosen sample
1992          * spec/channel map/device/suspend status back to the
1993          * client */
1994
1995         pa_tagstruct_put_sample_spec(reply, &ss);
1996         pa_tagstruct_put_channel_map(reply, &map);
1997
1998         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1999         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2000
2001         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2002     }
2003
2004     if (c->version >= 13)
2005         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2006
2007     pa_pstream_send_tagstruct(c->pstream, reply);
2008 }
2009
2010 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2011     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2012     uint32_t channel;
2013
2014     pa_native_connection_assert_ref(c);
2015     pa_assert(t);
2016
2017     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2018         !pa_tagstruct_eof(t)) {
2019         protocol_error(c);
2020         return;
2021     }
2022
2023     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2024
2025     switch (command) {
2026
2027         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2028             playback_stream *s;
2029             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2030                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2031                 return;
2032             }
2033
2034             playback_stream_unlink(s);
2035             break;
2036         }
2037
2038         case PA_COMMAND_DELETE_RECORD_STREAM: {
2039             record_stream *s;
2040             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2041                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2042                 return;
2043             }
2044
2045             record_stream_unlink(s);
2046             break;
2047         }
2048
2049         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2050             upload_stream *s;
2051
2052             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2053                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2054                 return;
2055             }
2056
2057             upload_stream_unlink(s);
2058             break;
2059         }
2060
2061         default:
2062             pa_assert_not_reached();
2063     }
2064
2065     pa_pstream_send_simple_ack(c->pstream, tag);
2066 }
2067
2068 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2069     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2070     record_stream *s;
2071     pa_buffer_attr attr;
2072     uint32_t source_index;
2073     const char *name = NULL, *source_name;
2074     pa_sample_spec ss;
2075     pa_channel_map map;
2076     pa_tagstruct *reply;
2077     pa_source *source = NULL;
2078     pa_bool_t
2079         corked = FALSE,
2080         no_remap = FALSE,
2081         no_remix = FALSE,
2082         fix_format = FALSE,
2083         fix_rate = FALSE,
2084         fix_channels = FALSE,
2085         no_move = FALSE,
2086         variable_rate = FALSE,
2087         adjust_latency = FALSE,
2088         peak_detect = FALSE,
2089         early_requests = FALSE,
2090         dont_inhibit_auto_suspend = FALSE,
2091         fail_on_suspend = FALSE;
2092     pa_source_output_flags_t flags = 0;
2093     pa_proplist *p;
2094     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2095     pa_sink_input *direct_on_input = NULL;
2096     int ret = PA_ERR_INVALID;
2097
2098     pa_native_connection_assert_ref(c);
2099     pa_assert(t);
2100
2101     memset(&attr, 0, sizeof(attr));
2102
2103     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2104         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2105         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2106         pa_tagstruct_getu32(t, &source_index) < 0 ||
2107         pa_tagstruct_gets(t, &source_name) < 0 ||
2108         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2109         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2110         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2111         protocol_error(c);
2112         return;
2113     }
2114
2115     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2116     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2117     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2118     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2119     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2120     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2121     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2122
2123     p = pa_proplist_new();
2124
2125     if (name)
2126         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2127
2128     if (c->version >= 12)  {
2129         /* Since 0.9.8 the user can ask for a couple of additional flags */
2130
2131         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2132             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2133             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2134             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2135             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2136             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2137             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2138
2139             protocol_error(c);
2140             pa_proplist_free(p);
2141             return;
2142         }
2143     }
2144
2145     if (c->version >= 13) {
2146
2147         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2148             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2149             pa_tagstruct_get_proplist(t, p) < 0 ||
2150             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2151             protocol_error(c);
2152             pa_proplist_free(p);
2153             return;
2154         }
2155     }
2156
2157     if (c->version >= 14) {
2158
2159         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2160             protocol_error(c);
2161             pa_proplist_free(p);
2162             return;
2163         }
2164     }
2165
2166     if (c->version >= 15) {
2167
2168         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2169             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2170             protocol_error(c);
2171             pa_proplist_free(p);
2172             return;
2173         }
2174     }
2175
2176     if (!pa_tagstruct_eof(t)) {
2177         protocol_error(c);
2178         pa_proplist_free(p);
2179         return;
2180     }
2181
2182     if (source_index != PA_INVALID_INDEX) {
2183
2184         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2185             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2186             pa_proplist_free(p);
2187             return;
2188         }
2189
2190     } else if (source_name) {
2191
2192         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2193             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2194             pa_proplist_free(p);
2195             return;
2196         }
2197     }
2198
2199     if (direct_on_input_idx != PA_INVALID_INDEX) {
2200
2201         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2202             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2203             pa_proplist_free(p);
2204             return;
2205         }
2206     }
2207
2208     flags =
2209         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2210         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2211         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2212         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2213         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2214         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2215         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2216         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2217         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2218         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0);
2219
2220     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2221     pa_proplist_free(p);
2222
2223     CHECK_VALIDITY(c->pstream, s, tag, ret);
2224
2225     reply = reply_new(tag);
2226     pa_tagstruct_putu32(reply, s->index);
2227     pa_assert(s->source_output);
2228     pa_tagstruct_putu32(reply, s->source_output->index);
2229
2230     if (c->version >= 9) {
2231         /* Since 0.9 we support sending the buffer metrics back to the client */
2232
2233         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2234         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2235     }
2236
2237     if (c->version >= 12) {
2238         /* Since 0.9.8 we support sending the chosen sample
2239          * spec/channel map/device/suspend status back to the
2240          * client */
2241
2242         pa_tagstruct_put_sample_spec(reply, &ss);
2243         pa_tagstruct_put_channel_map(reply, &map);
2244
2245         pa_tagstruct_putu32(reply, s->source_output->source->index);
2246         pa_tagstruct_puts(reply, s->source_output->source->name);
2247
2248         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2249     }
2250
2251     if (c->version >= 13)
2252         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2253
2254     pa_pstream_send_tagstruct(c->pstream, reply);
2255 }
2256
2257 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2258     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2259     int ret;
2260
2261     pa_native_connection_assert_ref(c);
2262     pa_assert(t);
2263
2264     if (!pa_tagstruct_eof(t)) {
2265         protocol_error(c);
2266         return;
2267     }
2268
2269     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2270     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2271     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2272
2273     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2274 }
2275
2276 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2277     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2278     const void*cookie;
2279     pa_tagstruct *reply;
2280     pa_bool_t shm_on_remote = FALSE, do_shm;
2281
2282     pa_native_connection_assert_ref(c);
2283     pa_assert(t);
2284
2285     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2286         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2287         !pa_tagstruct_eof(t)) {
2288         protocol_error(c);
2289         return;
2290     }
2291
2292     /* Minimum supported version */
2293     if (c->version < 8) {
2294         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2295         return;
2296     }
2297
2298     /* Starting with protocol version 13 the MSB of the version tag
2299        reflects if shm is available for this pa_native_connection or
2300        not. */
2301     if (c->version >= 13) {
2302         shm_on_remote = !!(c->version & 0x80000000U);
2303         c->version &= 0x7FFFFFFFU;
2304     }
2305
2306     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2307
2308     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2309
2310     if (!c->authorized) {
2311         pa_bool_t success = FALSE;
2312
2313 #ifdef HAVE_CREDS
2314         const pa_creds *creds;
2315
2316         if ((creds = pa_pdispatch_creds(pd))) {
2317             if (creds->uid == getuid())
2318                 success = TRUE;
2319             else if (c->options->auth_group) {
2320                 int r;
2321                 gid_t gid;
2322
2323                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2324                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2325                 else if (gid == creds->gid)
2326                     success = TRUE;
2327
2328                 if (!success) {
2329                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2330                         pa_log_warn("Failed to check group membership.");
2331                     else if (r > 0)
2332                         success = TRUE;
2333                 }
2334             }
2335
2336             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2337                         (unsigned long) creds->uid,
2338                         (unsigned long) creds->gid,
2339                         (int) success);
2340         }
2341 #endif
2342
2343         if (!success && c->options->auth_cookie) {
2344             const uint8_t *ac;
2345
2346             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2347                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2348                     success = TRUE;
2349         }
2350
2351         if (!success) {
2352             pa_log_warn("Denied access to client with invalid authorization data.");
2353             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2354             return;
2355         }
2356
2357         c->authorized = TRUE;
2358         if (c->auth_timeout_event) {
2359             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2360             c->auth_timeout_event = NULL;
2361         }
2362     }
2363
2364     /* Enable shared memory support if possible */
2365     do_shm =
2366         pa_mempool_is_shared(c->protocol->core->mempool) &&
2367         c->is_local;
2368
2369     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2370
2371     if (do_shm)
2372         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2373             do_shm = FALSE;
2374
2375 #ifdef HAVE_CREDS
2376     if (do_shm) {
2377         /* Only enable SHM if both sides are owned by the same
2378          * user. This is a security measure because otherwise data
2379          * private to the user might leak. */
2380
2381         const pa_creds *creds;
2382         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2383             do_shm = FALSE;
2384     }
2385 #endif
2386
2387     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2388     pa_pstream_enable_shm(c->pstream, do_shm);
2389
2390     reply = reply_new(tag);
2391     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2392
2393 #ifdef HAVE_CREDS
2394 {
2395     /* SHM support is only enabled after both sides made sure they are the same user. */
2396
2397     pa_creds ucred;
2398
2399     ucred.uid = getuid();
2400     ucred.gid = getgid();
2401
2402     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2403 }
2404 #else
2405     pa_pstream_send_tagstruct(c->pstream, reply);
2406 #endif
2407 }
2408
2409 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2410     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2411     const char *name = NULL;
2412     pa_proplist *p;
2413     pa_tagstruct *reply;
2414
2415     pa_native_connection_assert_ref(c);
2416     pa_assert(t);
2417
2418     p = pa_proplist_new();
2419
2420     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2421         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2422         !pa_tagstruct_eof(t)) {
2423
2424         protocol_error(c);
2425         pa_proplist_free(p);
2426         return;
2427     }
2428
2429     if (name)
2430         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2431             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2432             pa_proplist_free(p);
2433             return;
2434         }
2435
2436     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2437     pa_proplist_free(p);
2438
2439     reply = reply_new(tag);
2440
2441     if (c->version >= 13)
2442         pa_tagstruct_putu32(reply, c->client->index);
2443
2444     pa_pstream_send_tagstruct(c->pstream, reply);
2445 }
2446
2447 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2448     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2449     const char *name;
2450     uint32_t idx = PA_IDXSET_INVALID;
2451
2452     pa_native_connection_assert_ref(c);
2453     pa_assert(t);
2454
2455     if (pa_tagstruct_gets(t, &name) < 0 ||
2456         !pa_tagstruct_eof(t)) {
2457         protocol_error(c);
2458         return;
2459     }
2460
2461     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2462     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2463
2464     if (command == PA_COMMAND_LOOKUP_SINK) {
2465         pa_sink *sink;
2466         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2467             idx = sink->index;
2468     } else {
2469         pa_source *source;
2470         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2471         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2472             idx = source->index;
2473     }
2474
2475     if (idx == PA_IDXSET_INVALID)
2476         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2477     else {
2478         pa_tagstruct *reply;
2479         reply = reply_new(tag);
2480         pa_tagstruct_putu32(reply, idx);
2481         pa_pstream_send_tagstruct(c->pstream, reply);
2482     }
2483 }
2484
2485 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2486     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2487     uint32_t idx;
2488     playback_stream *s;
2489
2490     pa_native_connection_assert_ref(c);
2491     pa_assert(t);
2492
2493     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2494         !pa_tagstruct_eof(t)) {
2495         protocol_error(c);
2496         return;
2497     }
2498
2499     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2500     s = pa_idxset_get_by_index(c->output_streams, idx);
2501     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2502     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2503
2504     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2505 }
2506
2507 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2508     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2509     pa_tagstruct *reply;
2510     const pa_mempool_stat *stat;
2511
2512     pa_native_connection_assert_ref(c);
2513     pa_assert(t);
2514
2515     if (!pa_tagstruct_eof(t)) {
2516         protocol_error(c);
2517         return;
2518     }
2519
2520     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2521
2522     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2523
2524     reply = reply_new(tag);
2525     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2526     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2527     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2528     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2529     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2530     pa_pstream_send_tagstruct(c->pstream, reply);
2531 }
2532
2533 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2534     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2535     pa_tagstruct *reply;
2536     playback_stream *s;
2537     struct timeval tv, now;
2538     uint32_t idx;
2539
2540     pa_native_connection_assert_ref(c);
2541     pa_assert(t);
2542
2543     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2544         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2545         !pa_tagstruct_eof(t)) {
2546         protocol_error(c);
2547         return;
2548     }
2549
2550     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2551     s = pa_idxset_get_by_index(c->output_streams, idx);
2552     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2553     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2554
2555     /* Get an atomic snapshot of all timing parameters */
2556     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2557
2558     reply = reply_new(tag);
2559     pa_tagstruct_put_usec(reply,
2560                           s->current_sink_latency +
2561                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2562     pa_tagstruct_put_usec(reply, 0);
2563     pa_tagstruct_put_boolean(reply,
2564                              s->playing_for > 0 &&
2565                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2566                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2567     pa_tagstruct_put_timeval(reply, &tv);
2568     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2569     pa_tagstruct_puts64(reply, s->write_index);
2570     pa_tagstruct_puts64(reply, s->read_index);
2571
2572     if (c->version >= 13) {
2573         pa_tagstruct_putu64(reply, s->underrun_for);
2574         pa_tagstruct_putu64(reply, s->playing_for);
2575     }
2576
2577     pa_pstream_send_tagstruct(c->pstream, reply);
2578 }
2579
2580 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2581     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2582     pa_tagstruct *reply;
2583     record_stream *s;
2584     struct timeval tv, now;
2585     uint32_t idx;
2586
2587     pa_native_connection_assert_ref(c);
2588     pa_assert(t);
2589
2590     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2591         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2592         !pa_tagstruct_eof(t)) {
2593         protocol_error(c);
2594         return;
2595     }
2596
2597     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2598     s = pa_idxset_get_by_index(c->record_streams, idx);
2599     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2600
2601     /* Get an atomic snapshot of all timing parameters */
2602     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2603
2604     reply = reply_new(tag);
2605     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2606     pa_tagstruct_put_usec(reply,
2607                           s->current_source_latency +
2608                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2609     pa_tagstruct_put_boolean(reply,
2610                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2611                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2612     pa_tagstruct_put_timeval(reply, &tv);
2613     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2614     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2615     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2616     pa_pstream_send_tagstruct(c->pstream, reply);
2617 }
2618
2619 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2620     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2621     upload_stream *s;
2622     uint32_t length;
2623     const char *name = NULL;
2624     pa_sample_spec ss;
2625     pa_channel_map map;
2626     pa_tagstruct *reply;
2627     pa_proplist *p;
2628
2629     pa_native_connection_assert_ref(c);
2630     pa_assert(t);
2631
2632     if (pa_tagstruct_gets(t, &name) < 0 ||
2633         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2634         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2635         pa_tagstruct_getu32(t, &length) < 0) {
2636         protocol_error(c);
2637         return;
2638     }
2639
2640     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2641     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2642     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2643     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2644     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2645     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2646
2647     p = pa_proplist_new();
2648
2649     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2650         !pa_tagstruct_eof(t)) {
2651
2652         protocol_error(c);
2653         pa_proplist_free(p);
2654         return;
2655     }
2656
2657     if (c->version < 13)
2658         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2659     else if (!name)
2660         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2661             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2662
2663     if (!name || !pa_namereg_is_valid_name(name)) {
2664         pa_proplist_free(p);
2665         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2666     }
2667
2668     s = upload_stream_new(c, &ss, &map, name, length, p);
2669     pa_proplist_free(p);
2670
2671     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2672
2673     reply = reply_new(tag);
2674     pa_tagstruct_putu32(reply, s->index);
2675     pa_tagstruct_putu32(reply, length);
2676     pa_pstream_send_tagstruct(c->pstream, reply);
2677 }
2678
2679 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2680     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2681     uint32_t channel;
2682     upload_stream *s;
2683     uint32_t idx;
2684
2685     pa_native_connection_assert_ref(c);
2686     pa_assert(t);
2687
2688     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2689         !pa_tagstruct_eof(t)) {
2690         protocol_error(c);
2691         return;
2692     }
2693
2694     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2695
2696     s = pa_idxset_get_by_index(c->output_streams, channel);
2697     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2698     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2699
2700     if (!s->memchunk.memblock)
2701         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2702     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2703         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2704     else
2705         pa_pstream_send_simple_ack(c->pstream, tag);
2706
2707     upload_stream_unlink(s);
2708 }
2709
2710 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2711     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2712     uint32_t sink_index;
2713     pa_volume_t volume;
2714     pa_sink *sink;
2715     const char *name, *sink_name;
2716     uint32_t idx;
2717     pa_proplist *p;
2718     pa_tagstruct *reply;
2719
2720     pa_native_connection_assert_ref(c);
2721     pa_assert(t);
2722
2723     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2724
2725     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2726         pa_tagstruct_gets(t, &sink_name) < 0 ||
2727         pa_tagstruct_getu32(t, &volume) < 0 ||
2728         pa_tagstruct_gets(t, &name) < 0) {
2729         protocol_error(c);
2730         return;
2731     }
2732
2733     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2734     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2735     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2736     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2737
2738     if (sink_index != PA_INVALID_INDEX)
2739         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2740     else
2741         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2742
2743     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2744
2745     p = pa_proplist_new();
2746
2747     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2748         !pa_tagstruct_eof(t)) {
2749         protocol_error(c);
2750         pa_proplist_free(p);
2751         return;
2752     }
2753
2754     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2755
2756     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2757         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2758         pa_proplist_free(p);
2759         return;
2760     }
2761
2762     pa_proplist_free(p);
2763
2764     reply = reply_new(tag);
2765
2766     if (c->version >= 13)
2767         pa_tagstruct_putu32(reply, idx);
2768
2769     pa_pstream_send_tagstruct(c->pstream, reply);
2770 }
2771
2772 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2773     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2774     const char *name;
2775
2776     pa_native_connection_assert_ref(c);
2777     pa_assert(t);
2778
2779     if (pa_tagstruct_gets(t, &name) < 0 ||
2780         !pa_tagstruct_eof(t)) {
2781         protocol_error(c);
2782         return;
2783     }
2784
2785     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2786     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2787
2788     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2789         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2790         return;
2791     }
2792
2793     pa_pstream_send_simple_ack(c->pstream, tag);
2794 }
2795
2796 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2797     pa_assert(c);
2798     pa_assert(fixed);
2799     pa_assert(original);
2800
2801     *fixed = *original;
2802
2803     if (c->version < 12) {
2804         /* Before protocol version 12 we didn't support S32 samples,
2805          * so we need to lie about this to the client */
2806
2807         if (fixed->format == PA_SAMPLE_S32LE)
2808             fixed->format = PA_SAMPLE_FLOAT32LE;
2809         if (fixed->format == PA_SAMPLE_S32BE)
2810             fixed->format = PA_SAMPLE_FLOAT32BE;
2811     }
2812
2813     if (c->version < 15) {
2814         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2815             fixed->format = PA_SAMPLE_FLOAT32LE;
2816         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2817             fixed->format = PA_SAMPLE_FLOAT32BE;
2818     }
2819 }
2820
2821 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2822     pa_sample_spec fixed_ss;
2823
2824     pa_assert(t);
2825     pa_sink_assert_ref(sink);
2826
2827     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2828
2829     pa_tagstruct_put(
2830         t,
2831         PA_TAG_U32, sink->index,
2832         PA_TAG_STRING, sink->name,
2833         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2834         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2835         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2836         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2837         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE, FALSE),
2838         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2839         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2840         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2841         PA_TAG_USEC, pa_sink_get_latency(sink),
2842         PA_TAG_STRING, sink->driver,
2843         PA_TAG_U32, sink->flags,
2844         PA_TAG_INVALID);
2845
2846     if (c->version >= 13) {
2847         pa_tagstruct_put_proplist(t, sink->proplist);
2848         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2849     }
2850
2851     if (c->version >= 15) {
2852         pa_tagstruct_put_volume(t, sink->base_volume);
2853         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2854             pa_log_error("Internal sink state is invalid.");
2855         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2856         pa_tagstruct_putu32(t, sink->n_volume_steps);
2857         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2858     }
2859
2860     if (c->version >= 16) {
2861         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2862
2863         if (sink->ports) {
2864             void *state;
2865             pa_device_port *p;
2866
2867             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2868                 pa_tagstruct_puts(t, p->name);
2869                 pa_tagstruct_puts(t, p->description);
2870                 pa_tagstruct_putu32(t, p->priority);
2871             }
2872         }
2873
2874         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2875     }
2876 }
2877
2878 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2879     pa_sample_spec fixed_ss;
2880
2881     pa_assert(t);
2882     pa_source_assert_ref(source);
2883
2884     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2885
2886     pa_tagstruct_put(
2887         t,
2888         PA_TAG_U32, source->index,
2889         PA_TAG_STRING, source->name,
2890         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2891         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2892         PA_TAG_CHANNEL_MAP, &source->channel_map,
2893         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2894         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2895         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2896         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2897         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2898         PA_TAG_USEC, pa_source_get_latency(source),
2899         PA_TAG_STRING, source->driver,
2900         PA_TAG_U32, source->flags,
2901         PA_TAG_INVALID);
2902
2903     if (c->version >= 13) {
2904         pa_tagstruct_put_proplist(t, source->proplist);
2905         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2906     }
2907
2908     if (c->version >= 15) {
2909         pa_tagstruct_put_volume(t, source->base_volume);
2910         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2911             pa_log_error("Internal source state is invalid.");
2912         pa_tagstruct_putu32(t, pa_source_get_state(source));
2913         pa_tagstruct_putu32(t, source->n_volume_steps);
2914         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2915     }
2916
2917     if (c->version >= 16) {
2918
2919         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
2920
2921         if (source->ports) {
2922             void *state;
2923             pa_device_port *p;
2924
2925             PA_HASHMAP_FOREACH(p, source->ports, state) {
2926                 pa_tagstruct_puts(t, p->name);
2927                 pa_tagstruct_puts(t, p->description);
2928                 pa_tagstruct_putu32(t, p->priority);
2929             }
2930         }
2931
2932         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
2933     }
2934 }
2935
2936 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2937     pa_assert(t);
2938     pa_assert(client);
2939
2940     pa_tagstruct_putu32(t, client->index);
2941     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2942     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2943     pa_tagstruct_puts(t, client->driver);
2944
2945     if (c->version >= 13)
2946         pa_tagstruct_put_proplist(t, client->proplist);
2947 }
2948
2949 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2950     void *state = NULL;
2951     pa_card_profile *p;
2952
2953     pa_assert(t);
2954     pa_assert(card);
2955
2956     pa_tagstruct_putu32(t, card->index);
2957     pa_tagstruct_puts(t, card->name);
2958     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2959     pa_tagstruct_puts(t, card->driver);
2960
2961     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2962
2963     if (card->profiles) {
2964         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2965             pa_tagstruct_puts(t, p->name);
2966             pa_tagstruct_puts(t, p->description);
2967             pa_tagstruct_putu32(t, p->n_sinks);
2968             pa_tagstruct_putu32(t, p->n_sources);
2969             pa_tagstruct_putu32(t, p->priority);
2970         }
2971     }
2972
2973     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2974     pa_tagstruct_put_proplist(t, card->proplist);
2975 }
2976
2977 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2978     pa_assert(t);
2979     pa_assert(module);
2980
2981     pa_tagstruct_putu32(t, module->index);
2982     pa_tagstruct_puts(t, module->name);
2983     pa_tagstruct_puts(t, module->argument);
2984     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2985
2986     if (c->version < 15)
2987         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2988
2989     if (c->version >= 15)
2990         pa_tagstruct_put_proplist(t, module->proplist);
2991 }
2992
2993 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2994     pa_sample_spec fixed_ss;
2995     pa_usec_t sink_latency;
2996     pa_cvolume v;
2997
2998     pa_assert(t);
2999     pa_sink_input_assert_ref(s);
3000
3001     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3002
3003     pa_tagstruct_putu32(t, s->index);
3004     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3005     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3006     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3007     pa_tagstruct_putu32(t, s->sink->index);
3008     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3009     pa_tagstruct_put_channel_map(t, &s->channel_map);
3010     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
3011     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3012     pa_tagstruct_put_usec(t, sink_latency);
3013     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3014     pa_tagstruct_puts(t, s->driver);
3015     if (c->version >= 11)
3016         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3017     if (c->version >= 13)
3018         pa_tagstruct_put_proplist(t, s->proplist);
3019 }
3020
3021 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3022     pa_sample_spec fixed_ss;
3023     pa_usec_t source_latency;
3024
3025     pa_assert(t);
3026     pa_source_output_assert_ref(s);
3027
3028     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3029
3030     pa_tagstruct_putu32(t, s->index);
3031     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3032     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3033     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3034     pa_tagstruct_putu32(t, s->source->index);
3035     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3036     pa_tagstruct_put_channel_map(t, &s->channel_map);
3037     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3038     pa_tagstruct_put_usec(t, source_latency);
3039     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3040     pa_tagstruct_puts(t, s->driver);
3041
3042     if (c->version >= 13)
3043         pa_tagstruct_put_proplist(t, s->proplist);
3044 }
3045
3046 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3047     pa_sample_spec fixed_ss;
3048     pa_cvolume v;
3049
3050     pa_assert(t);
3051     pa_assert(e);
3052
3053     if (e->memchunk.memblock)
3054         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3055     else
3056         memset(&fixed_ss, 0, sizeof(fixed_ss));
3057
3058     pa_tagstruct_putu32(t, e->index);
3059     pa_tagstruct_puts(t, e->name);
3060
3061     if (e->volume_is_set)
3062         v = e->volume;
3063     else
3064         pa_cvolume_init(&v);
3065
3066     pa_tagstruct_put_cvolume(t, &v);
3067     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3068     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3069     pa_tagstruct_put_channel_map(t, &e->channel_map);
3070     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3071     pa_tagstruct_put_boolean(t, e->lazy);
3072     pa_tagstruct_puts(t, e->filename);
3073
3074     if (c->version >= 13)
3075         pa_tagstruct_put_proplist(t, e->proplist);
3076 }
3077
3078 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3079     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3080     uint32_t idx;
3081     pa_sink *sink = NULL;
3082     pa_source *source = NULL;
3083     pa_client *client = NULL;
3084     pa_card *card = NULL;
3085     pa_module *module = NULL;
3086     pa_sink_input *si = NULL;
3087     pa_source_output *so = NULL;
3088     pa_scache_entry *sce = NULL;
3089     const char *name = NULL;
3090     pa_tagstruct *reply;
3091
3092     pa_native_connection_assert_ref(c);
3093     pa_assert(t);
3094
3095     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3096         (command != PA_COMMAND_GET_CLIENT_INFO &&
3097          command != PA_COMMAND_GET_MODULE_INFO &&
3098          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3099          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3100          pa_tagstruct_gets(t, &name) < 0) ||
3101         !pa_tagstruct_eof(t)) {
3102         protocol_error(c);
3103         return;
3104     }
3105
3106     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3107     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3108     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3109     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3110     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3111
3112     if (command == PA_COMMAND_GET_SINK_INFO) {
3113         if (idx != PA_INVALID_INDEX)
3114             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3115         else
3116             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3117     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3118         if (idx != PA_INVALID_INDEX)
3119             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3120         else
3121             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3122     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3123         if (idx != PA_INVALID_INDEX)
3124             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3125         else
3126             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3127     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3128         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3129     else if (command == PA_COMMAND_GET_MODULE_INFO)
3130         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3131     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3132         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3133     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3134         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3135     else {
3136         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3137         if (idx != PA_INVALID_INDEX)
3138             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3139         else
3140             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3141     }
3142
3143     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3144         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3145         return;
3146     }
3147
3148     reply = reply_new(tag);
3149     if (sink)
3150         sink_fill_tagstruct(c, reply, sink);
3151     else if (source)
3152         source_fill_tagstruct(c, reply, source);
3153     else if (client)
3154         client_fill_tagstruct(c, reply, client);
3155     else if (card)
3156         card_fill_tagstruct(c, reply, card);
3157     else if (module)
3158         module_fill_tagstruct(c, reply, module);
3159     else if (si)
3160         sink_input_fill_tagstruct(c, reply, si);
3161     else if (so)
3162         source_output_fill_tagstruct(c, reply, so);
3163     else
3164         scache_fill_tagstruct(c, reply, sce);
3165     pa_pstream_send_tagstruct(c->pstream, reply);
3166 }
3167
3168 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3169     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3170     pa_idxset *i;
3171     uint32_t idx;
3172     void *p;
3173     pa_tagstruct *reply;
3174
3175     pa_native_connection_assert_ref(c);
3176     pa_assert(t);
3177
3178     if (!pa_tagstruct_eof(t)) {
3179         protocol_error(c);
3180         return;
3181     }
3182
3183     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3184
3185     reply = reply_new(tag);
3186
3187     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3188         i = c->protocol->core->sinks;
3189     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3190         i = c->protocol->core->sources;
3191     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3192         i = c->protocol->core->clients;
3193     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3194         i = c->protocol->core->cards;
3195     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3196         i = c->protocol->core->modules;
3197     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3198         i = c->protocol->core->sink_inputs;
3199     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3200         i = c->protocol->core->source_outputs;
3201     else {
3202         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3203         i = c->protocol->core->scache;
3204     }
3205
3206     if (i) {
3207         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3208             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3209                 sink_fill_tagstruct(c, reply, p);
3210             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3211                 source_fill_tagstruct(c, reply, p);
3212             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3213                 client_fill_tagstruct(c, reply, p);
3214             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3215                 card_fill_tagstruct(c, reply, p);
3216             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3217                 module_fill_tagstruct(c, reply, p);
3218             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3219                 sink_input_fill_tagstruct(c, reply, p);
3220             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3221                 source_output_fill_tagstruct(c, reply, p);
3222             else {
3223                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3224                 scache_fill_tagstruct(c, reply, p);
3225             }
3226         }
3227     }
3228
3229     pa_pstream_send_tagstruct(c->pstream, reply);
3230 }
3231
3232 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3233     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3234     pa_tagstruct *reply;
3235     pa_sink *def_sink;
3236     pa_source *def_source;
3237     pa_sample_spec fixed_ss;
3238     char *h, *u;
3239
3240     pa_native_connection_assert_ref(c);
3241     pa_assert(t);
3242
3243     if (!pa_tagstruct_eof(t)) {
3244         protocol_error(c);
3245         return;
3246     }
3247
3248     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3249
3250     reply = reply_new(tag);
3251     pa_tagstruct_puts(reply, PACKAGE_NAME);
3252     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3253
3254     u = pa_get_user_name_malloc();
3255     pa_tagstruct_puts(reply, u);
3256     pa_xfree(u);
3257
3258     h = pa_get_host_name_malloc();
3259     pa_tagstruct_puts(reply, h);
3260     pa_xfree(h);
3261
3262     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3263     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3264
3265     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3266     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3267     def_source = pa_namereg_get_default_source(c->protocol->core);
3268     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3269
3270     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3271
3272     if (c->version >= 15)
3273         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3274
3275     pa_pstream_send_tagstruct(c->pstream, reply);
3276 }
3277
3278 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3279     pa_tagstruct *t;
3280     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3281
3282     pa_native_connection_assert_ref(c);
3283
3284     t = pa_tagstruct_new(NULL, 0);
3285     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3286     pa_tagstruct_putu32(t, (uint32_t) -1);
3287     pa_tagstruct_putu32(t, e);
3288     pa_tagstruct_putu32(t, idx);
3289     pa_pstream_send_tagstruct(c->pstream, t);
3290 }
3291
3292 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3293     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3294     pa_subscription_mask_t m;
3295
3296     pa_native_connection_assert_ref(c);
3297     pa_assert(t);
3298
3299     if (pa_tagstruct_getu32(t, &m) < 0 ||
3300         !pa_tagstruct_eof(t)) {
3301         protocol_error(c);
3302         return;
3303     }
3304
3305     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3306     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3307
3308     if (c->subscription)
3309         pa_subscription_free(c->subscription);
3310
3311     if (m != 0) {
3312         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3313         pa_assert(c->subscription);
3314     } else
3315         c->subscription = NULL;
3316
3317     pa_pstream_send_simple_ack(c->pstream, tag);
3318 }
3319
3320 static void command_set_volume(
3321         pa_pdispatch *pd,
3322         uint32_t command,
3323         uint32_t tag,
3324         pa_tagstruct *t,
3325         void *userdata) {
3326
3327     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3328     uint32_t idx;
3329     pa_cvolume volume;
3330     pa_sink *sink = NULL;
3331     pa_source *source = NULL;
3332     pa_sink_input *si = NULL;
3333     const char *name = NULL;
3334     const char *client_name;
3335
3336     pa_native_connection_assert_ref(c);
3337     pa_assert(t);
3338
3339     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3340         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3341         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3342         pa_tagstruct_get_cvolume(t, &volume) ||
3343         !pa_tagstruct_eof(t)) {
3344         protocol_error(c);
3345         return;
3346     }
3347
3348     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3349     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3350     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3351     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3352     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3353     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3354
3355     switch (command) {
3356
3357         case PA_COMMAND_SET_SINK_VOLUME:
3358             if (idx != PA_INVALID_INDEX)
3359                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3360             else
3361                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3362             break;
3363
3364         case PA_COMMAND_SET_SOURCE_VOLUME:
3365             if (idx != PA_INVALID_INDEX)
3366                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3367             else
3368                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3369             break;
3370
3371         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3372             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3373             break;
3374
3375         default:
3376             pa_assert_not_reached();
3377     }
3378
3379     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3380
3381     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3382
3383     if (sink) {
3384         pa_log("Client %s changes volume of sink %s.", client_name, sink->name);
3385         pa_sink_set_volume(sink, &volume, TRUE, TRUE, TRUE, TRUE);
3386     } else if (source) {
3387         pa_log("Client %s changes volume of sink %s.", client_name, source->name);
3388         pa_source_set_volume(source, &volume, TRUE);
3389     } else if (si) {
3390         pa_log("Client %s changes volume of sink %s.",
3391                      client_name,
3392                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3393         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3394     }
3395
3396     pa_pstream_send_simple_ack(c->pstream, tag);
3397 }
3398
3399 static void command_set_mute(
3400         pa_pdispatch *pd,
3401         uint32_t command,
3402         uint32_t tag,
3403         pa_tagstruct *t,
3404         void *userdata) {
3405
3406     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3407     uint32_t idx;
3408     pa_bool_t mute;
3409     pa_sink *sink = NULL;
3410     pa_source *source = NULL;
3411     pa_sink_input *si = NULL;
3412     const char *name = NULL;
3413
3414     pa_native_connection_assert_ref(c);
3415     pa_assert(t);
3416
3417     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3418         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3419         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3420         pa_tagstruct_get_boolean(t, &mute) ||
3421         !pa_tagstruct_eof(t)) {
3422         protocol_error(c);
3423         return;
3424     }
3425
3426     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3427     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3428     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3429     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3430     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3431
3432     switch (command) {
3433
3434         case PA_COMMAND_SET_SINK_MUTE:
3435
3436             if (idx != PA_INVALID_INDEX)
3437                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3438             else
3439                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3440
3441             break;
3442
3443         case PA_COMMAND_SET_SOURCE_MUTE:
3444             if (idx != PA_INVALID_INDEX)
3445                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3446             else
3447                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3448
3449             break;
3450
3451         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3452             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3453             break;
3454
3455         default:
3456             pa_assert_not_reached();
3457     }
3458
3459     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3460
3461     if (sink)
3462         pa_sink_set_mute(sink, mute, TRUE);
3463     else if (source)
3464         pa_source_set_mute(source, mute, TRUE);
3465     else if (si)
3466         pa_sink_input_set_mute(si, mute, TRUE);
3467
3468     pa_pstream_send_simple_ack(c->pstream, tag);
3469 }
3470
3471 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3472     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3473     uint32_t idx;
3474     pa_bool_t b;
3475     playback_stream *s;
3476
3477     pa_native_connection_assert_ref(c);
3478     pa_assert(t);
3479
3480     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3481         pa_tagstruct_get_boolean(t, &b) < 0 ||
3482         !pa_tagstruct_eof(t)) {
3483         protocol_error(c);
3484         return;
3485     }
3486
3487     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3488     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3489     s = pa_idxset_get_by_index(c->output_streams, idx);
3490     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3491     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3492
3493     pa_sink_input_cork(s->sink_input, b);
3494
3495     if (b)
3496         s->is_underrun = TRUE;
3497
3498     pa_pstream_send_simple_ack(c->pstream, tag);
3499 }
3500
3501 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3502     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3503     uint32_t idx;
3504     playback_stream *s;
3505
3506     pa_native_connection_assert_ref(c);
3507     pa_assert(t);
3508
3509     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3510         !pa_tagstruct_eof(t)) {
3511         protocol_error(c);
3512         return;
3513     }
3514
3515     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3516     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3517     s = pa_idxset_get_by_index(c->output_streams, idx);
3518     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3519     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3520
3521     switch (command) {
3522         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3523             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3524             break;
3525
3526         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3527             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3528             break;
3529
3530         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3531             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3532             break;
3533
3534         default:
3535             pa_assert_not_reached();
3536     }
3537
3538     pa_pstream_send_simple_ack(c->pstream, tag);
3539 }
3540
3541 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3542     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3543     uint32_t idx;
3544     record_stream *s;
3545     pa_bool_t b;
3546
3547     pa_native_connection_assert_ref(c);
3548     pa_assert(t);
3549
3550     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3551         pa_tagstruct_get_boolean(t, &b) < 0 ||
3552         !pa_tagstruct_eof(t)) {
3553         protocol_error(c);
3554         return;
3555     }
3556
3557     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3558     s = pa_idxset_get_by_index(c->record_streams, idx);
3559     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3560
3561     pa_source_output_cork(s->source_output, b);
3562     pa_memblockq_prebuf_force(s->memblockq);
3563     pa_pstream_send_simple_ack(c->pstream, tag);
3564 }
3565
3566 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3567     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3568     uint32_t idx;
3569     record_stream *s;
3570
3571     pa_native_connection_assert_ref(c);
3572     pa_assert(t);
3573
3574     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3575         !pa_tagstruct_eof(t)) {
3576         protocol_error(c);
3577         return;
3578     }
3579
3580     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3581     s = pa_idxset_get_by_index(c->record_streams, idx);
3582     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3583
3584     pa_memblockq_flush_read(s->memblockq);
3585     pa_pstream_send_simple_ack(c->pstream, tag);
3586 }
3587
3588 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3589     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3590     uint32_t idx;
3591     pa_buffer_attr a;
3592     pa_tagstruct *reply;
3593
3594     pa_native_connection_assert_ref(c);
3595     pa_assert(t);
3596
3597     memset(&a, 0, sizeof(a));
3598
3599     if (pa_tagstruct_getu32(t, &idx) < 0) {
3600         protocol_error(c);
3601         return;
3602     }
3603
3604     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3605
3606     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3607         playback_stream *s;
3608         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3609
3610         s = pa_idxset_get_by_index(c->output_streams, idx);
3611         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3612         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3613
3614         if (pa_tagstruct_get(
3615                     t,
3616                     PA_TAG_U32, &a.maxlength,
3617                     PA_TAG_U32, &a.tlength,
3618                     PA_TAG_U32, &a.prebuf,
3619                     PA_TAG_U32, &a.minreq,
3620                     PA_TAG_INVALID) < 0 ||
3621             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3622             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3623             !pa_tagstruct_eof(t)) {
3624             protocol_error(c);
3625             return;
3626         }
3627
3628         s->adjust_latency = adjust_latency;
3629         s->early_requests = early_requests;
3630         s->buffer_attr = a;
3631
3632         fix_playback_buffer_attr(s);
3633         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3634
3635         reply = reply_new(tag);
3636         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3637         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3638         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3639         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3640
3641         if (c->version >= 13)
3642             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3643
3644     } else {
3645         record_stream *s;
3646         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3647         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3648
3649         s = pa_idxset_get_by_index(c->record_streams, idx);
3650         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3651
3652         if (pa_tagstruct_get(
3653                     t,
3654                     PA_TAG_U32, &a.maxlength,
3655                     PA_TAG_U32, &a.fragsize,
3656                     PA_TAG_INVALID) < 0 ||
3657             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3658             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3659             !pa_tagstruct_eof(t)) {
3660             protocol_error(c);
3661             return;
3662         }
3663
3664         s->adjust_latency = adjust_latency;
3665         s->early_requests = early_requests;
3666         s->buffer_attr = a;
3667
3668         fix_record_buffer_attr_pre(s);
3669         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3670         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3671         fix_record_buffer_attr_post(s);
3672
3673         reply = reply_new(tag);
3674         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3675         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3676
3677         if (c->version >= 13)
3678             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3679     }
3680
3681     pa_pstream_send_tagstruct(c->pstream, reply);
3682 }
3683
3684 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3685     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3686     uint32_t idx;
3687     uint32_t rate;
3688
3689     pa_native_connection_assert_ref(c);
3690     pa_assert(t);
3691
3692     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3693         pa_tagstruct_getu32(t, &rate) < 0 ||
3694         !pa_tagstruct_eof(t)) {
3695         protocol_error(c);
3696         return;
3697     }
3698
3699     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3700     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3701
3702     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3703         playback_stream *s;
3704
3705         s = pa_idxset_get_by_index(c->output_streams, idx);
3706         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3707         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3708
3709         pa_sink_input_set_rate(s->sink_input, rate);
3710
3711     } else {
3712         record_stream *s;
3713         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3714
3715         s = pa_idxset_get_by_index(c->record_streams, idx);
3716         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3717
3718         pa_source_output_set_rate(s->source_output, rate);
3719     }
3720
3721     pa_pstream_send_simple_ack(c->pstream, tag);
3722 }
3723
3724 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3725     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3726     uint32_t idx;
3727     uint32_t mode;
3728     pa_proplist *p;
3729
3730     pa_native_connection_assert_ref(c);
3731     pa_assert(t);
3732
3733     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3734
3735     p = pa_proplist_new();
3736
3737     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3738
3739         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3740             pa_tagstruct_get_proplist(t, p) < 0 ||
3741             !pa_tagstruct_eof(t)) {
3742             protocol_error(c);
3743             pa_proplist_free(p);
3744             return;
3745         }
3746
3747     } else {
3748
3749         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3750             pa_tagstruct_getu32(t, &mode) < 0 ||
3751             pa_tagstruct_get_proplist(t, p) < 0 ||
3752             !pa_tagstruct_eof(t)) {
3753             protocol_error(c);
3754             pa_proplist_free(p);
3755             return;
3756         }
3757     }
3758
3759     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3760         pa_proplist_free(p);
3761         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3762     }
3763
3764     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3765         playback_stream *s;
3766
3767         s = pa_idxset_get_by_index(c->output_streams, idx);
3768         if (!s || !playback_stream_isinstance(s)) {
3769             pa_proplist_free(p);
3770             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3771         }
3772         pa_sink_input_update_proplist(s->sink_input, mode, p);
3773
3774     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3775         record_stream *s;
3776
3777         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3778             pa_proplist_free(p);
3779             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3780         }
3781         pa_source_output_update_proplist(s->source_output, mode, p);
3782
3783     } else {
3784         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3785
3786         pa_client_update_proplist(c->client, mode, p);
3787     }
3788
3789     pa_pstream_send_simple_ack(c->pstream, tag);
3790     pa_proplist_free(p);
3791 }
3792
3793 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3794     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3795     uint32_t idx;
3796     unsigned changed = 0;
3797     pa_proplist *p;
3798     pa_strlist *l = NULL;
3799
3800     pa_native_connection_assert_ref(c);
3801     pa_assert(t);
3802
3803     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3804
3805     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3806
3807         if (pa_tagstruct_getu32(t, &idx) < 0) {
3808             protocol_error(c);
3809             return;
3810         }
3811     }
3812
3813     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3814         playback_stream *s;
3815
3816         s = pa_idxset_get_by_index(c->output_streams, idx);
3817         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3818         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3819
3820         p = s->sink_input->proplist;
3821
3822     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3823         record_stream *s;
3824
3825         s = pa_idxset_get_by_index(c->record_streams, idx);
3826         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3827
3828         p = s->source_output->proplist;
3829     } else {
3830         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3831
3832         p = c->client->proplist;
3833     }
3834
3835     for (;;) {
3836         const char *k;
3837
3838         if (pa_tagstruct_gets(t, &k) < 0) {
3839             protocol_error(c);
3840             pa_strlist_free(l);
3841             return;
3842         }
3843
3844         if (!k)
3845             break;
3846
3847         l = pa_strlist_prepend(l, k);
3848     }
3849
3850     if (!pa_tagstruct_eof(t)) {
3851         protocol_error(c);
3852         pa_strlist_free(l);
3853         return;
3854     }
3855
3856     for (;;) {
3857         char *z;
3858
3859         l = pa_strlist_pop(l, &z);
3860
3861         if (!z)
3862             break;
3863
3864         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3865         pa_xfree(z);
3866     }
3867
3868     pa_pstream_send_simple_ack(c->pstream, tag);
3869
3870     if (changed) {
3871         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3872             playback_stream *s;
3873
3874             s = pa_idxset_get_by_index(c->output_streams, idx);
3875             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3876
3877         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3878             record_stream *s;
3879
3880             s = pa_idxset_get_by_index(c->record_streams, idx);
3881             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3882
3883         } else {
3884             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3885             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3886         }
3887     }
3888 }
3889
3890 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3891     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3892     const char *s;
3893
3894     pa_native_connection_assert_ref(c);
3895     pa_assert(t);
3896
3897     if (pa_tagstruct_gets(t, &s) < 0 ||
3898         !pa_tagstruct_eof(t)) {
3899         protocol_error(c);
3900         return;
3901     }
3902
3903     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3904     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3905
3906     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3907         pa_source *source;
3908
3909         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3910         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3911
3912         pa_namereg_set_default_source(c->protocol->core, source);
3913     } else {
3914         pa_sink *sink;
3915         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3916
3917         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3918         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3919
3920         pa_namereg_set_default_sink(c->protocol->core, sink);
3921     }
3922
3923     pa_pstream_send_simple_ack(c->pstream, tag);
3924 }
3925
3926 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3927     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3928     uint32_t idx;
3929     const char *name;
3930
3931     pa_native_connection_assert_ref(c);
3932     pa_assert(t);
3933
3934     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3935         pa_tagstruct_gets(t, &name) < 0 ||
3936         !pa_tagstruct_eof(t)) {
3937         protocol_error(c);
3938         return;
3939     }
3940
3941     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3942     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3943
3944     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3945         playback_stream *s;
3946
3947         s = pa_idxset_get_by_index(c->output_streams, idx);
3948         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3949         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3950
3951         pa_sink_input_set_name(s->sink_input, name);
3952
3953     } else {
3954         record_stream *s;
3955         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3956
3957         s = pa_idxset_get_by_index(c->record_streams, idx);
3958         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3959
3960         pa_source_output_set_name(s->source_output, name);
3961     }
3962
3963     pa_pstream_send_simple_ack(c->pstream, tag);
3964 }
3965
3966 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3967     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3968     uint32_t idx;
3969
3970     pa_native_connection_assert_ref(c);
3971     pa_assert(t);
3972
3973     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3974         !pa_tagstruct_eof(t)) {
3975         protocol_error(c);
3976         return;
3977     }
3978
3979     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3980
3981     if (command == PA_COMMAND_KILL_CLIENT) {
3982         pa_client *client;
3983
3984         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3985         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3986
3987         pa_native_connection_ref(c);
3988         pa_client_kill(client);
3989
3990     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3991         pa_sink_input *s;
3992
3993         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3994         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3995
3996         pa_native_connection_ref(c);
3997         pa_sink_input_kill(s);
3998     } else {
3999         pa_source_output *s;
4000
4001         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4002
4003         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4004         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4005
4006         pa_native_connection_ref(c);
4007         pa_source_output_kill(s);
4008     }
4009
4010     pa_pstream_send_simple_ack(c->pstream, tag);
4011     pa_native_connection_unref(c);
4012 }
4013
4014 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4015     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4016     pa_module *m;
4017     const char *name, *argument;
4018     pa_tagstruct *reply;
4019
4020     pa_native_connection_assert_ref(c);
4021     pa_assert(t);
4022
4023     if (pa_tagstruct_gets(t, &name) < 0 ||
4024         pa_tagstruct_gets(t, &argument) < 0 ||
4025         !pa_tagstruct_eof(t)) {
4026         protocol_error(c);
4027         return;
4028     }
4029
4030     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4031     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4032     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4033
4034     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4035         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4036         return;
4037     }
4038
4039     reply = reply_new(tag);
4040     pa_tagstruct_putu32(reply, m->index);
4041     pa_pstream_send_tagstruct(c->pstream, reply);
4042 }
4043
4044 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4045     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4046     uint32_t idx;
4047     pa_module *m;
4048
4049     pa_native_connection_assert_ref(c);
4050     pa_assert(t);
4051
4052     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4053         !pa_tagstruct_eof(t)) {
4054         protocol_error(c);
4055         return;
4056     }
4057
4058     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4059     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4060     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4061
4062     pa_module_unload_request(m, FALSE);
4063     pa_pstream_send_simple_ack(c->pstream, tag);
4064 }
4065
4066 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4067     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4068     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4069     const char *name_device = NULL;
4070
4071     pa_native_connection_assert_ref(c);
4072     pa_assert(t);
4073
4074     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4075         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4076         pa_tagstruct_gets(t, &name_device) < 0 ||
4077         !pa_tagstruct_eof(t)) {
4078         protocol_error(c);
4079         return;
4080     }
4081
4082     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4083     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4084
4085     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4086     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4087     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4088     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4089
4090     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4091         pa_sink_input *si = NULL;
4092         pa_sink *sink = NULL;
4093
4094         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4095
4096         if (idx_device != PA_INVALID_INDEX)
4097             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4098         else
4099             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4100
4101         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4102
4103         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4104             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4105             return;
4106         }
4107     } else {
4108         pa_source_output *so = NULL;
4109         pa_source *source;
4110
4111         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4112
4113         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4114
4115         if (idx_device != PA_INVALID_INDEX)
4116             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4117         else
4118             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4119
4120         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4121
4122         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4123             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4124             return;
4125         }
4126     }
4127
4128     pa_pstream_send_simple_ack(c->pstream, tag);
4129 }
4130
4131 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4132     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4133     uint32_t idx = PA_INVALID_INDEX;
4134     const char *name = NULL;
4135     pa_bool_t b;
4136
4137     pa_native_connection_assert_ref(c);
4138     pa_assert(t);
4139
4140     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4141         pa_tagstruct_gets(t, &name) < 0 ||
4142         pa_tagstruct_get_boolean(t, &b) < 0 ||
4143         !pa_tagstruct_eof(t)) {
4144         protocol_error(c);
4145         return;
4146     }
4147
4148     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4149     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4150     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4151     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4152     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4153
4154     if (command == PA_COMMAND_SUSPEND_SINK) {
4155
4156         if (idx == PA_INVALID_INDEX && name && !*name) {
4157
4158             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4159
4160             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4161                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4162                 return;
4163             }
4164         } else {
4165             pa_sink *sink = NULL;
4166
4167             if (idx != PA_INVALID_INDEX)
4168                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4169             else
4170                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4171
4172             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4173
4174             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4175                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4176                 return;
4177             }
4178         }
4179     } else {
4180
4181         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4182
4183         if (idx == PA_INVALID_INDEX && name && !*name) {
4184
4185             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4186
4187             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4188                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4189                 return;
4190             }
4191
4192         } else {
4193             pa_source *source;
4194
4195             if (idx != PA_INVALID_INDEX)
4196                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4197             else
4198                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4199
4200             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4201
4202             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4203                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4204                 return;
4205             }
4206         }
4207     }
4208
4209     pa_pstream_send_simple_ack(c->pstream, tag);
4210 }
4211
4212 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4213     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4214     uint32_t idx = PA_INVALID_INDEX;
4215     const char *name = NULL;
4216     pa_module *m;
4217     pa_native_protocol_ext_cb_t cb;
4218
4219     pa_native_connection_assert_ref(c);
4220     pa_assert(t);
4221
4222     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4223         pa_tagstruct_gets(t, &name) < 0) {
4224         protocol_error(c);
4225         return;
4226     }
4227
4228     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4229     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4230     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4231     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4232     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4233
4234     if (idx != PA_INVALID_INDEX)
4235         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4236     else {
4237         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4238             if (strcmp(name, m->name) == 0)
4239                 break;
4240     }
4241
4242     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4243     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4244
4245     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4246     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4247
4248     if (cb(c->protocol, m, c, tag, t) < 0)
4249         protocol_error(c);
4250 }
4251
4252 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4253     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4254     uint32_t idx = PA_INVALID_INDEX;
4255     const char *name = NULL, *profile = NULL;
4256     pa_card *card = NULL;
4257     int ret;
4258
4259     pa_native_connection_assert_ref(c);
4260     pa_assert(t);
4261
4262     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4263         pa_tagstruct_gets(t, &name) < 0 ||
4264         pa_tagstruct_gets(t, &profile) < 0 ||
4265         !pa_tagstruct_eof(t)) {
4266         protocol_error(c);
4267         return;
4268     }
4269
4270     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4271     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4272     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4273     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4274     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4275
4276     if (idx != PA_INVALID_INDEX)
4277         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4278     else
4279         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4280
4281     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4282
4283     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4284         pa_pstream_send_error(c->pstream, tag, -ret);
4285         return;
4286     }
4287
4288     pa_pstream_send_simple_ack(c->pstream, tag);
4289 }
4290
4291 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4292     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4293     uint32_t idx = PA_INVALID_INDEX;
4294     const char *name = NULL, *port = NULL;
4295     int ret;
4296
4297     pa_native_connection_assert_ref(c);
4298     pa_assert(t);
4299
4300     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4301         pa_tagstruct_gets(t, &name) < 0 ||
4302         pa_tagstruct_gets(t, &port) < 0 ||
4303         !pa_tagstruct_eof(t)) {
4304         protocol_error(c);
4305         return;
4306     }
4307
4308     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4309     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4310     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4311     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4312     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4313
4314     if (command == PA_COMMAND_SET_SINK_PORT) {
4315         pa_sink *sink;
4316
4317         if (idx != PA_INVALID_INDEX)
4318             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4319         else
4320             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4321
4322         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4323
4324         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4325             pa_pstream_send_error(c->pstream, tag, -ret);
4326             return;
4327         }
4328     } else {
4329         pa_source *source;
4330
4331         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4332
4333         if (idx != PA_INVALID_INDEX)
4334             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4335         else
4336             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4337
4338         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4339
4340         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4341             pa_pstream_send_error(c->pstream, tag, -ret);
4342             return;
4343         }
4344     }
4345
4346     pa_pstream_send_simple_ack(c->pstream, tag);
4347 }
4348
4349 /*** pstream callbacks ***/
4350
4351 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4352     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4353
4354     pa_assert(p);
4355     pa_assert(packet);
4356     pa_native_connection_assert_ref(c);
4357
4358     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4359         pa_log("invalid packet.");
4360         native_connection_unlink(c);
4361     }
4362 }
4363
4364 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4365     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4366     output_stream *stream;
4367
4368     pa_assert(p);
4369     pa_assert(chunk);
4370     pa_native_connection_assert_ref(c);
4371
4372     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4373         pa_log_debug("Client sent block for invalid stream.");
4374         /* Ignoring */
4375         return;
4376     }
4377
4378 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4379
4380     if (playback_stream_isinstance(stream)) {
4381         playback_stream *ps = PLAYBACK_STREAM(stream);
4382
4383         if (chunk->memblock) {
4384             if (seek != PA_SEEK_RELATIVE || offset != 0)
4385                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4386
4387             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4388         } else
4389             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4390
4391     } else {
4392         upload_stream *u = UPLOAD_STREAM(stream);
4393         size_t l;
4394
4395         if (!u->memchunk.memblock) {
4396             if (u->length == chunk->length && chunk->memblock) {
4397                 u->memchunk = *chunk;
4398                 pa_memblock_ref(u->memchunk.memblock);
4399                 u->length = 0;
4400             } else {
4401                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4402                 u->memchunk.index = u->memchunk.length = 0;
4403             }
4404         }
4405
4406         pa_assert(u->memchunk.memblock);
4407
4408         l = u->length;
4409         if (l > chunk->length)
4410             l = chunk->length;
4411
4412         if (l > 0) {
4413             void *dst;
4414             dst = pa_memblock_acquire(u->memchunk.memblock);
4415
4416             if (chunk->memblock) {
4417                 void *src;
4418                 src = pa_memblock_acquire(chunk->memblock);
4419
4420                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4421                        (uint8_t*) src + chunk->index, l);
4422
4423                 pa_memblock_release(chunk->memblock);
4424             } else
4425                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4426
4427             pa_memblock_release(u->memchunk.memblock);
4428
4429             u->memchunk.length += l;
4430             u->length -= l;
4431         }
4432     }
4433 }
4434
4435 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4436     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4437
4438     pa_assert(p);
4439     pa_native_connection_assert_ref(c);
4440
4441     native_connection_unlink(c);
4442     pa_log_info("Connection died.");
4443 }
4444
4445 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4446     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4447
4448     pa_assert(p);
4449     pa_native_connection_assert_ref(c);
4450
4451     native_connection_send_memblock(c);
4452 }
4453
4454 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4455     pa_thread_mq *q;
4456
4457     if (!(q = pa_thread_mq_get()))
4458         pa_pstream_send_revoke(p, block_id);
4459     else
4460         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4461 }
4462
4463 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4464     pa_thread_mq *q;
4465
4466     if (!(q = pa_thread_mq_get()))
4467         pa_pstream_send_release(p, block_id);
4468     else
4469         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4470 }
4471
4472 /*** client callbacks ***/
4473
4474 static void client_kill_cb(pa_client *c) {
4475     pa_assert(c);
4476
4477     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4478     pa_log_info("Connection killed.");
4479 }
4480
4481 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4482     pa_tagstruct *t;
4483     pa_native_connection *c;
4484
4485     pa_assert(client);
4486     c = PA_NATIVE_CONNECTION(client->userdata);
4487     pa_native_connection_assert_ref(c);
4488
4489     if (c->version < 15)
4490       return;
4491
4492     t = pa_tagstruct_new(NULL, 0);
4493     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4494     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4495     pa_tagstruct_puts(t, event);
4496     pa_tagstruct_put_proplist(t, pl);
4497     pa_pstream_send_tagstruct(c->pstream, t);
4498 }
4499
4500 /*** module entry points ***/
4501
4502 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4503     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4504
4505     pa_assert(m);
4506     pa_native_connection_assert_ref(c);
4507     pa_assert(c->auth_timeout_event == e);
4508
4509     if (!c->authorized) {
4510         native_connection_unlink(c);
4511         pa_log_info("Connection terminated due to authentication timeout.");
4512     }
4513 }
4514
4515 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4516     pa_native_connection *c;
4517     char pname[128];
4518     pa_client *client;
4519     pa_client_new_data data;
4520
4521     pa_assert(p);
4522     pa_assert(io);
4523     pa_assert(o);
4524
4525     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4526         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4527         pa_iochannel_free(io);
4528         return;
4529     }
4530
4531     pa_client_new_data_init(&data);
4532     data.module = o->module;
4533     data.driver = __FILE__;
4534     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4535     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4536     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4537     client = pa_client_new(p->core, &data);
4538     pa_client_new_data_done(&data);
4539
4540     if (!client)
4541         return;
4542
4543     c = pa_msgobject_new(pa_native_connection);
4544     c->parent.parent.free = native_connection_free;
4545     c->parent.process_msg = native_connection_process_msg;
4546     c->protocol = p;
4547     c->options = pa_native_options_ref(o);
4548     c->authorized = FALSE;
4549
4550     if (o->auth_anonymous) {
4551         pa_log_info("Client authenticated anonymously.");
4552         c->authorized = TRUE;
4553     }
4554
4555     if (!c->authorized &&
4556         o->auth_ip_acl &&
4557         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4558
4559         pa_log_info("Client authenticated by IP ACL.");
4560         c->authorized = TRUE;
4561     }
4562
4563     if (!c->authorized)
4564         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4565     else
4566         c->auth_timeout_event = NULL;
4567
4568     c->is_local = pa_iochannel_socket_is_local(io);
4569     c->version = 8;
4570
4571     c->client = client;
4572     c->client->kill = client_kill_cb;
4573     c->client->send_event = client_send_event_cb;
4574     c->client->userdata = c;
4575
4576     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4577     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4578     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4579     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4580     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4581     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4582     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4583
4584     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4585
4586     c->record_streams = pa_idxset_new(NULL, NULL);
4587     c->output_streams = pa_idxset_new(NULL, NULL);
4588
4589     c->rrobin_index = PA_IDXSET_INVALID;
4590     c->subscription = NULL;
4591
4592     pa_idxset_put(p->connections, c, NULL);
4593
4594 #ifdef HAVE_CREDS
4595     if (pa_iochannel_creds_supported(io))
4596         pa_iochannel_creds_enable(io);
4597 #endif
4598
4599     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4600 }
4601
4602 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4603     pa_native_connection *c;
4604     void *state = NULL;
4605
4606     pa_assert(p);
4607     pa_assert(m);
4608
4609     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4610         if (c->options->module == m)
4611             native_connection_unlink(c);
4612 }
4613
4614 static pa_native_protocol* native_protocol_new(pa_core *c) {
4615     pa_native_protocol *p;
4616     pa_native_hook_t h;
4617
4618     pa_assert(c);
4619
4620     p = pa_xnew(pa_native_protocol, 1);
4621     PA_REFCNT_INIT(p);
4622     p->core = c;
4623     p->connections = pa_idxset_new(NULL, NULL);
4624
4625     p->servers = NULL;
4626
4627     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4628
4629     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4630         pa_hook_init(&p->hooks[h], p);
4631
4632     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4633
4634     return p;
4635 }
4636
4637 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4638     pa_native_protocol *p;
4639
4640     if ((p = pa_shared_get(c, "native-protocol")))
4641         return pa_native_protocol_ref(p);
4642
4643     return native_protocol_new(c);
4644 }
4645
4646 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4647     pa_assert(p);
4648     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4649
4650     PA_REFCNT_INC(p);
4651
4652     return p;
4653 }
4654
4655 void pa_native_protocol_unref(pa_native_protocol *p) {
4656     pa_native_connection *c;
4657     pa_native_hook_t h;
4658
4659     pa_assert(p);
4660     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4661
4662     if (PA_REFCNT_DEC(p) > 0)
4663         return;
4664
4665     while ((c = pa_idxset_first(p->connections, NULL)))
4666         native_connection_unlink(c);
4667
4668     pa_idxset_free(p->connections, NULL, NULL);
4669
4670     pa_strlist_free(p->servers);
4671
4672     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4673         pa_hook_done(&p->hooks[h]);
4674
4675     pa_hashmap_free(p->extensions, NULL, NULL);
4676
4677     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4678
4679     pa_xfree(p);
4680 }
4681
4682 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4683     pa_assert(p);
4684     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4685     pa_assert(name);
4686
4687     p->servers = pa_strlist_prepend(p->servers, name);
4688
4689     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4690 }
4691
4692 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4693     pa_assert(p);
4694     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4695     pa_assert(name);
4696
4697     p->servers = pa_strlist_remove(p->servers, name);
4698
4699     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4700 }
4701
4702 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4703     pa_assert(p);
4704     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4705
4706     return p->hooks;
4707 }
4708
4709 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4710     pa_assert(p);
4711     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4712
4713     return p->servers;
4714 }
4715
4716 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4717     pa_assert(p);
4718     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4719     pa_assert(m);
4720     pa_assert(cb);
4721     pa_assert(!pa_hashmap_get(p->extensions, m));
4722
4723     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4724     return 0;
4725 }
4726
4727 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4728     pa_assert(p);
4729     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4730     pa_assert(m);
4731
4732     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4733 }
4734
4735 pa_native_options* pa_native_options_new(void) {
4736     pa_native_options *o;
4737
4738     o = pa_xnew0(pa_native_options, 1);
4739     PA_REFCNT_INIT(o);
4740
4741     return o;
4742 }
4743
4744 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4745     pa_assert(o);
4746     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4747
4748     PA_REFCNT_INC(o);
4749
4750     return o;
4751 }
4752
4753 void pa_native_options_unref(pa_native_options *o) {
4754     pa_assert(o);
4755     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4756
4757     if (PA_REFCNT_DEC(o) > 0)
4758         return;
4759
4760     pa_xfree(o->auth_group);
4761
4762     if (o->auth_ip_acl)
4763         pa_ip_acl_free(o->auth_ip_acl);
4764
4765     if (o->auth_cookie)
4766         pa_auth_cookie_unref(o->auth_cookie);
4767
4768     pa_xfree(o);
4769 }
4770
4771 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4772     pa_bool_t enabled;
4773     const char *acl;
4774
4775     pa_assert(o);
4776     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4777     pa_assert(ma);
4778
4779     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4780         pa_log("auth-anonymous= expects a boolean argument.");
4781         return -1;
4782     }
4783
4784     enabled = TRUE;
4785     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4786         pa_log("auth-group-enabled= expects a boolean argument.");
4787         return -1;
4788     }
4789
4790     pa_xfree(o->auth_group);
4791     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4792
4793 #ifndef HAVE_CREDS
4794     if (o->auth_group)
4795         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4796 #endif
4797
4798     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4799         pa_ip_acl *ipa;
4800
4801         if (!(ipa = pa_ip_acl_new(acl))) {
4802             pa_log("Failed to parse IP ACL '%s'", acl);
4803             return -1;
4804         }
4805
4806         if (o->auth_ip_acl)
4807             pa_ip_acl_free(o->auth_ip_acl);
4808
4809         o->auth_ip_acl = ipa;
4810     }
4811
4812     enabled = TRUE;
4813     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4814         pa_log("auth-cookie-enabled= expects a boolean argument.");
4815         return -1;
4816     }
4817
4818     if (o->auth_cookie)
4819         pa_auth_cookie_unref(o->auth_cookie);
4820
4821     if (enabled) {
4822         const char *cn;
4823
4824         /* The new name for this is 'auth-cookie', for compat reasons
4825          * we check the old name too */
4826         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4827             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4828                 cn = PA_NATIVE_COOKIE_FILE;
4829
4830         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4831             return -1;
4832
4833     } else
4834           o->auth_cookie = NULL;
4835
4836     return 0;
4837 }
4838
4839 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4840     pa_native_connection_assert_ref(c);
4841
4842     return c->pstream;
4843 }