protocol-native: compare uint64_t variable with (uint64_t) -1 instead of (size_t...
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/native-common.h>
40 #include <pulsecore/packet.h>
41 #include <pulsecore/client.h>
42 #include <pulsecore/source-output.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/pstream.h>
45 #include <pulsecore/tagstruct.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/llist.h>
57 #include <pulsecore/creds.h>
58 #include <pulsecore/core-util.h>
59 #include <pulsecore/ipacl.h>
60 #include <pulsecore/thread-mq.h>
61
62 #include "protocol-native.h"
63
64 /* Kick a client if it doesn't authenticate within this time */
65 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
66
67 /* Don't accept more connection than this */
68 #define MAX_CONNECTIONS 64
69
70 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
71 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
72 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
73 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
74
75 struct pa_native_protocol;
76
77 typedef struct record_stream {
78     pa_msgobject parent;
79
80     pa_native_connection *connection;
81     uint32_t index;
82
83     pa_source_output *source_output;
84     pa_memblockq *memblockq;
85
86     pa_bool_t adjust_latency:1;
87     pa_bool_t early_requests:1;
88
89     pa_buffer_attr buffer_attr;
90
91     pa_atomic_t on_the_fly;
92     pa_usec_t configured_source_latency;
93     size_t drop_initial;
94
95     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
96     size_t on_the_fly_snapshot;
97     pa_usec_t current_monitor_latency;
98     pa_usec_t current_source_latency;
99 } record_stream;
100
101 #define RECORD_STREAM(o) (record_stream_cast(o))
102 PA_DEFINE_PRIVATE_CLASS(record_stream, pa_msgobject);
103
104 typedef struct output_stream {
105     pa_msgobject parent;
106 } output_stream;
107
108 #define OUTPUT_STREAM(o) (output_stream_cast(o))
109 PA_DEFINE_PRIVATE_CLASS(output_stream, pa_msgobject);
110
111 typedef struct playback_stream {
112     output_stream parent;
113
114     pa_native_connection *connection;
115     uint32_t index;
116
117     pa_sink_input *sink_input;
118     pa_memblockq *memblockq;
119
120     pa_bool_t adjust_latency:1;
121     pa_bool_t early_requests:1;
122
123     pa_bool_t is_underrun:1;
124     pa_bool_t drain_request:1;
125     uint32_t drain_tag;
126     uint32_t syncid;
127
128     pa_atomic_t missing;
129     pa_usec_t configured_sink_latency;
130     pa_buffer_attr buffer_attr;
131
132     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
133     int64_t read_index, write_index;
134     size_t render_memblockq_length;
135     pa_usec_t current_sink_latency;
136     uint64_t playing_for, underrun_for;
137 } playback_stream;
138
139 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
140 PA_DEFINE_PRIVATE_CLASS(playback_stream, output_stream);
141
142 typedef struct upload_stream {
143     output_stream parent;
144
145     pa_native_connection *connection;
146     uint32_t index;
147
148     pa_memchunk memchunk;
149     size_t length;
150     char *name;
151     pa_sample_spec sample_spec;
152     pa_channel_map channel_map;
153     pa_proplist *proplist;
154 } upload_stream;
155
156 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
157 PA_DEFINE_PRIVATE_CLASS(upload_stream, output_stream);
158
159 struct pa_native_connection {
160     pa_msgobject parent;
161     pa_native_protocol *protocol;
162     pa_native_options *options;
163     pa_bool_t authorized:1;
164     pa_bool_t is_local:1;
165     uint32_t version;
166     pa_client *client;
167     pa_pstream *pstream;
168     pa_pdispatch *pdispatch;
169     pa_idxset *record_streams, *output_streams;
170     uint32_t rrobin_index;
171     pa_subscription *subscription;
172     pa_time_event *auth_timeout_event;
173 };
174
175 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
176 PA_DEFINE_PRIVATE_CLASS(pa_native_connection, pa_msgobject);
177
178 struct pa_native_protocol {
179     PA_REFCNT_DECLARE;
180
181     pa_core *core;
182     pa_idxset *connections;
183
184     pa_strlist *servers;
185     pa_hook hooks[PA_NATIVE_HOOK_MAX];
186
187     pa_hashmap *extensions;
188 };
189
190 enum {
191     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
192 };
193
194 enum {
195     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
196     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
197     SINK_INPUT_MESSAGE_FLUSH,
198     SINK_INPUT_MESSAGE_TRIGGER,
199     SINK_INPUT_MESSAGE_SEEK,
200     SINK_INPUT_MESSAGE_PREBUF_FORCE,
201     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
202     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
203 };
204
205 enum {
206     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
207     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
208     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
209     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
210     PLAYBACK_STREAM_MESSAGE_STARTED,
211     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
212 };
213
214 enum {
215     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
216 };
217
218 enum {
219     CONNECTION_MESSAGE_RELEASE,
220     CONNECTION_MESSAGE_REVOKE
221 };
222
223 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
224 static void sink_input_kill_cb(pa_sink_input *i);
225 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
226 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
227 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
228 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
229 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
230 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
231
232 static void native_connection_send_memblock(pa_native_connection *c);
233 static void playback_stream_request_bytes(struct playback_stream*s);
234
235 static void source_output_kill_cb(pa_source_output *o);
236 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
237 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
238 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
239 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
240 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
241
242 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
243 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
244
245 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
246 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
247 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
248 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
249 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284
285 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
286     [PA_COMMAND_ERROR] = NULL,
287     [PA_COMMAND_TIMEOUT] = NULL,
288     [PA_COMMAND_REPLY] = NULL,
289     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
290     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
291     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
292     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
293     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
294     [PA_COMMAND_AUTH] = command_auth,
295     [PA_COMMAND_REQUEST] = NULL,
296     [PA_COMMAND_EXIT] = command_exit,
297     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
298     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
299     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
300     [PA_COMMAND_STAT] = command_stat,
301     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
302     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
303     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
304     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
305     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
306     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
307     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
308     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
309     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
310     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
311     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
312     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
313     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
314     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
315     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
316     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
317     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
318     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
319     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
320     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
321     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
325     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
326
327     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
328     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
329     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
330
331     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
332     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
333     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
334
335     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
336     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
337
338     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
339     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
340     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
341     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
342
343     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
344     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
345
346     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
347     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
348     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
349     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
350     [PA_COMMAND_KILL_CLIENT] = command_kill,
351     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
352     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
353     [PA_COMMAND_LOAD_MODULE] = command_load_module,
354     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
355
356     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
357     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
358     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
359     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
360
361     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
362     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
363
364     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
365     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
366
367     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
368     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
369
370     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
371     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
372     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
373
374     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
375     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
376     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
377
378     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
379
380     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
381     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
382
383     [PA_COMMAND_EXTENSION] = command_extension
384 };
385
386 /* structure management */
387
388 /* Called from main context */
389 static void upload_stream_unlink(upload_stream *s) {
390     pa_assert(s);
391
392     if (!s->connection)
393         return;
394
395     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
396     s->connection = NULL;
397     upload_stream_unref(s);
398 }
399
400 /* Called from main context */
401 static void upload_stream_free(pa_object *o) {
402     upload_stream *s = UPLOAD_STREAM(o);
403     pa_assert(s);
404
405     upload_stream_unlink(s);
406
407     pa_xfree(s->name);
408
409     if (s->proplist)
410         pa_proplist_free(s->proplist);
411
412     if (s->memchunk.memblock)
413         pa_memblock_unref(s->memchunk.memblock);
414
415     pa_xfree(s);
416 }
417
418 /* Called from main context */
419 static upload_stream* upload_stream_new(
420         pa_native_connection *c,
421         const pa_sample_spec *ss,
422         const pa_channel_map *map,
423         const char *name,
424         size_t length,
425         pa_proplist *p) {
426
427     upload_stream *s;
428
429     pa_assert(c);
430     pa_assert(ss);
431     pa_assert(name);
432     pa_assert(length > 0);
433     pa_assert(p);
434
435     s = pa_msgobject_new(upload_stream);
436     s->parent.parent.parent.free = upload_stream_free;
437     s->connection = c;
438     s->sample_spec = *ss;
439     s->channel_map = *map;
440     s->name = pa_xstrdup(name);
441     pa_memchunk_reset(&s->memchunk);
442     s->length = length;
443     s->proplist = pa_proplist_copy(p);
444     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
445
446     pa_idxset_put(c->output_streams, s, &s->index);
447
448     return s;
449 }
450
451 /* Called from main context */
452 static void record_stream_unlink(record_stream *s) {
453     pa_assert(s);
454
455     if (!s->connection)
456         return;
457
458     if (s->source_output) {
459         pa_source_output_unlink(s->source_output);
460         pa_source_output_unref(s->source_output);
461         s->source_output = NULL;
462     }
463
464     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
465     s->connection = NULL;
466     record_stream_unref(s);
467 }
468
469 /* Called from main context */
470 static void record_stream_free(pa_object *o) {
471     record_stream *s = RECORD_STREAM(o);
472     pa_assert(s);
473
474     record_stream_unlink(s);
475
476     pa_memblockq_free(s->memblockq);
477     pa_xfree(s);
478 }
479
480 /* Called from main context */
481 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
482     record_stream *s = RECORD_STREAM(o);
483     record_stream_assert_ref(s);
484
485     if (!s->connection)
486         return -1;
487
488     switch (code) {
489
490         case RECORD_STREAM_MESSAGE_POST_DATA:
491
492             /* We try to keep up to date with how many bytes are
493              * currently on the fly */
494             pa_atomic_sub(&s->on_the_fly, chunk->length);
495
496             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
497 /*                 pa_log_warn("Failed to push data into output queue."); */
498                 return -1;
499             }
500
501             if (!pa_pstream_is_pending(s->connection->pstream))
502                 native_connection_send_memblock(s->connection);
503
504             break;
505     }
506
507     return 0;
508 }
509
510 /* Called from main context */
511 static void fix_record_buffer_attr_pre(record_stream *s) {
512
513     size_t frame_size;
514     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
515
516     pa_assert(s);
517
518     /* This function will be called from the main thread, before as
519      * well as after the source output has been activated using
520      * pa_source_output_put()! That means it may not touch any
521      * ->thread_info data! */
522
523     frame_size = pa_frame_size(&s->source_output->sample_spec);
524
525     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
526         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
527     if (s->buffer_attr.maxlength <= 0)
528         s->buffer_attr.maxlength = (uint32_t) frame_size;
529
530     if (s->buffer_attr.fragsize == (uint32_t) -1)
531         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
532     if (s->buffer_attr.fragsize <= 0)
533         s->buffer_attr.fragsize = (uint32_t) frame_size;
534
535     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
536
537     if (s->early_requests) {
538
539         /* In early request mode we need to emulate the classic
540          * fragment-based playback model. We do this setting the source
541          * latency to the fragment size. */
542
543         source_usec = fragsize_usec;
544
545     } else if (s->adjust_latency) {
546
547         /* So, the user asked us to adjust the latency according to
548          * what the source can provide. Half the latency will be
549          * spent on the hw buffer, half of it in the async buffer
550          * queue we maintain for each client. */
551
552         source_usec = fragsize_usec/2;
553
554     } else {
555
556         /* Ok, the user didn't ask us to adjust the latency, hence we
557          * don't */
558
559         source_usec = (pa_usec_t) -1;
560     }
561
562     if (source_usec != (pa_usec_t) -1)
563         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
564     else
565         s->configured_source_latency = 0;
566
567     if (s->early_requests) {
568
569         /* Ok, we didn't necessarily get what we were asking for, so
570          * let's tell the user */
571
572         fragsize_usec = s->configured_source_latency;
573
574     } else if (s->adjust_latency) {
575
576         /* Now subtract what we actually got */
577
578         if (fragsize_usec >= s->configured_source_latency*2)
579             fragsize_usec -= s->configured_source_latency;
580         else
581             fragsize_usec = s->configured_source_latency;
582     }
583
584     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
585         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
586
587         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
588
589     if (s->buffer_attr.fragsize <= 0)
590         s->buffer_attr.fragsize = (uint32_t) frame_size;
591 }
592
593 /* Called from main context */
594 static void fix_record_buffer_attr_post(record_stream *s) {
595     size_t base;
596
597     pa_assert(s);
598
599     /* This function will be called from the main thread, before as
600      * well as after the source output has been activated using
601      * pa_source_output_put()! That means it may not touch and
602      * ->thread_info data! */
603
604     base = pa_frame_size(&s->source_output->sample_spec);
605
606     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
607     if (s->buffer_attr.fragsize <= 0)
608         s->buffer_attr.fragsize = base;
609
610     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
611         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
612 }
613
614 /* Called from main context */
615 static record_stream* record_stream_new(
616         pa_native_connection *c,
617         pa_source *source,
618         pa_sample_spec *ss,
619         pa_channel_map *map,
620         pa_bool_t peak_detect,
621         pa_buffer_attr *attr,
622         pa_source_output_flags_t flags,
623         pa_proplist *p,
624         pa_bool_t adjust_latency,
625         pa_sink_input *direct_on_input,
626         pa_bool_t early_requests,
627         int *ret) {
628
629     record_stream *s;
630     pa_source_output *source_output = NULL;
631     size_t base;
632     pa_source_output_new_data data;
633
634     pa_assert(c);
635     pa_assert(ss);
636     pa_assert(p);
637     pa_assert(ret);
638
639     pa_source_output_new_data_init(&data);
640
641     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
642     data.driver = __FILE__;
643     data.module = c->options->module;
644     data.client = c->client;
645     data.source = source;
646     data.direct_on_input = direct_on_input;
647     pa_source_output_new_data_set_sample_spec(&data, ss);
648     pa_source_output_new_data_set_channel_map(&data, map);
649     if (peak_detect)
650         data.resample_method = PA_RESAMPLER_PEAKS;
651
652     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
653
654     pa_source_output_new_data_done(&data);
655
656     if (!source_output)
657         return NULL;
658
659     s = pa_msgobject_new(record_stream);
660     s->parent.parent.free = record_stream_free;
661     s->parent.process_msg = record_stream_process_msg;
662     s->connection = c;
663     s->source_output = source_output;
664     s->buffer_attr = *attr;
665     s->adjust_latency = adjust_latency;
666     s->early_requests = early_requests;
667     pa_atomic_store(&s->on_the_fly, 0);
668
669     s->source_output->parent.process_msg = source_output_process_msg;
670     s->source_output->push = source_output_push_cb;
671     s->source_output->kill = source_output_kill_cb;
672     s->source_output->get_latency = source_output_get_latency_cb;
673     s->source_output->moving = source_output_moving_cb;
674     s->source_output->suspend = source_output_suspend_cb;
675     s->source_output->send_event = source_output_send_event_cb;
676     s->source_output->userdata = s;
677
678     fix_record_buffer_attr_pre(s);
679
680     s->memblockq = pa_memblockq_new(
681             0,
682             s->buffer_attr.maxlength,
683             0,
684             base = pa_frame_size(&source_output->sample_spec),
685             1,
686             0,
687             0,
688             NULL);
689
690     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
691     fix_record_buffer_attr_post(s);
692
693     *ss = s->source_output->sample_spec;
694     *map = s->source_output->channel_map;
695
696     pa_idxset_put(c->record_streams, s, &s->index);
697
698     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
699                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
700                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
701                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
702
703     pa_source_output_put(s->source_output);
704     return s;
705 }
706
707 /* Called from main context */
708 static void record_stream_send_killed(record_stream *r) {
709     pa_tagstruct *t;
710     record_stream_assert_ref(r);
711
712     t = pa_tagstruct_new(NULL, 0);
713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
715     pa_tagstruct_putu32(t, r->index);
716     pa_pstream_send_tagstruct(r->connection->pstream, t);
717 }
718
719 /* Called from main context */
720 static void playback_stream_unlink(playback_stream *s) {
721     pa_assert(s);
722
723     if (!s->connection)
724         return;
725
726     if (s->sink_input) {
727         pa_sink_input_unlink(s->sink_input);
728         pa_sink_input_unref(s->sink_input);
729         s->sink_input = NULL;
730     }
731
732     if (s->drain_request)
733         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
734
735     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
736     s->connection = NULL;
737     playback_stream_unref(s);
738 }
739
740 /* Called from main context */
741 static void playback_stream_free(pa_object* o) {
742     playback_stream *s = PLAYBACK_STREAM(o);
743     pa_assert(s);
744
745     playback_stream_unlink(s);
746
747     pa_memblockq_free(s->memblockq);
748     pa_xfree(s);
749 }
750
751 /* Called from main context */
752 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
753     playback_stream *s = PLAYBACK_STREAM(o);
754     playback_stream_assert_ref(s);
755
756     if (!s->connection)
757         return -1;
758
759     switch (code) {
760
761         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
762             pa_tagstruct *t;
763             int l = 0;
764
765             for (;;) {
766                 if ((l = pa_atomic_load(&s->missing)) <= 0)
767                     return 0;
768
769                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
770                     break;
771             }
772
773             t = pa_tagstruct_new(NULL, 0);
774             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
775             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
776             pa_tagstruct_putu32(t, s->index);
777             pa_tagstruct_putu32(t, (uint32_t) l);
778             pa_pstream_send_tagstruct(s->connection->pstream, t);
779
780 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
781             break;
782         }
783
784         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
785             pa_tagstruct *t;
786
787 /*             pa_log("signalling underflow"); */
788
789             /* Report that we're empty */
790             t = pa_tagstruct_new(NULL, 0);
791             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
792             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
793             pa_tagstruct_putu32(t, s->index);
794             pa_pstream_send_tagstruct(s->connection->pstream, t);
795             break;
796         }
797
798         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
799             pa_tagstruct *t;
800
801             /* Notify the user we're overflowed*/
802             t = pa_tagstruct_new(NULL, 0);
803             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
804             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
805             pa_tagstruct_putu32(t, s->index);
806             pa_pstream_send_tagstruct(s->connection->pstream, t);
807             break;
808         }
809
810         case PLAYBACK_STREAM_MESSAGE_STARTED:
811
812             if (s->connection->version >= 13) {
813                 pa_tagstruct *t;
814
815                 /* Notify the user we started playback */
816                 t = pa_tagstruct_new(NULL, 0);
817                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
818                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
819                 pa_tagstruct_putu32(t, s->index);
820                 pa_pstream_send_tagstruct(s->connection->pstream, t);
821             }
822
823             break;
824
825         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
826             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
827             break;
828
829         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
830             pa_tagstruct *t;
831
832             s->buffer_attr.tlength = (uint32_t) offset;
833
834             t = pa_tagstruct_new(NULL, 0);
835             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
836             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
837             pa_tagstruct_putu32(t, s->index);
838             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
839             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
840             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
841             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
842             pa_tagstruct_put_usec(t, s->configured_sink_latency);
843             pa_pstream_send_tagstruct(s->connection->pstream, t);
844
845             break;
846         }
847     }
848
849     return 0;
850 }
851
852 /* Called from main context */
853 static void fix_playback_buffer_attr(playback_stream *s) {
854     size_t frame_size, max_prebuf;
855     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
856
857     pa_assert(s);
858
859     /* This function will be called from the main thread, before as
860      * well as after the sink input has been activated using
861      * pa_sink_input_put()! That means it may not touch any
862      * ->thread_info data, such as the memblockq! */
863
864     frame_size = pa_frame_size(&s->sink_input->sample_spec);
865
866     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
867         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
868     if (s->buffer_attr.maxlength <= 0)
869         s->buffer_attr.maxlength = (uint32_t) frame_size;
870
871     if (s->buffer_attr.tlength == (uint32_t) -1)
872         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
873     if (s->buffer_attr.tlength <= 0)
874         s->buffer_attr.tlength = (uint32_t) frame_size;
875
876     if (s->buffer_attr.minreq == (uint32_t) -1)
877         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
878     if (s->buffer_attr.minreq <= 0)
879         s->buffer_attr.minreq = (uint32_t) frame_size;
880
881     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
882         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
883
884     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
885     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
886
887     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
888                 (double) tlength_usec / PA_USEC_PER_MSEC,
889                 (double) minreq_usec / PA_USEC_PER_MSEC);
890
891     if (s->early_requests) {
892
893         /* In early request mode we need to emulate the classic
894          * fragment-based playback model. We do this setting the sink
895          * latency to the fragment size. */
896
897         sink_usec = minreq_usec;
898         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
899
900     } else if (s->adjust_latency) {
901
902         /* So, the user asked us to adjust the latency of the stream
903          * buffer according to the what the sink can provide. The
904          * tlength passed in shall be the overall latency. Roughly
905          * half the latency will be spent on the hw buffer, the other
906          * half of it in the async buffer queue we maintain for each
907          * client. In between we'll have a safety space of size
908          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
909          * empty and needs to be filled, then our buffer must have
910          * enough data to fulfill this request immediatly and thus
911          * have at least the same tlength as the size of the hw
912          * buffer. It additionally needs space for 2 times minreq
913          * because if the buffer ran empty and a partial fillup
914          * happens immediately on the next iteration we need to be
915          * able to fulfill it and give the application also minreq
916          * time to fill it up again for the next request Makes 2 times
917          * minreq in plus.. */
918
919         if (tlength_usec > minreq_usec*2)
920             sink_usec = (tlength_usec - minreq_usec*2)/2;
921         else
922             sink_usec = 0;
923
924         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
925
926     } else {
927
928         /* Ok, the user didn't ask us to adjust the latency, but we
929          * still need to make sure that the parameters from the user
930          * do make sense. */
931
932         if (tlength_usec > minreq_usec*2)
933             sink_usec = (tlength_usec - minreq_usec*2);
934         else
935             sink_usec = 0;
936
937         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
938     }
939
940     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
941
942     if (s->early_requests) {
943
944         /* Ok, we didn't necessarily get what we were asking for, so
945          * let's tell the user */
946
947         minreq_usec = s->configured_sink_latency;
948
949     } else if (s->adjust_latency) {
950
951         /* Ok, we didn't necessarily get what we were asking for, so
952          * let's subtract from what we asked for for the remaining
953          * buffer space */
954
955         if (tlength_usec >= s->configured_sink_latency)
956             tlength_usec -= s->configured_sink_latency;
957     }
958
959     /* FIXME: This is actually larger than necessary, since not all of
960      * the sink latency is actually rewritable. */
961     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
962         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
963
964     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
965         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
966         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
967
968     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
969         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
970         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
971
972     if (s->buffer_attr.minreq <= 0) {
973         s->buffer_attr.minreq = (uint32_t) frame_size;
974         s->buffer_attr.tlength += (uint32_t) frame_size*2;
975     }
976
977     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
978         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
979
980     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
981
982     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
983         s->buffer_attr.prebuf > max_prebuf)
984         s->buffer_attr.prebuf = max_prebuf;
985 }
986
987 /* Called from main context */
988 static playback_stream* playback_stream_new(
989         pa_native_connection *c,
990         pa_sink *sink,
991         pa_sample_spec *ss,
992         pa_channel_map *map,
993         pa_buffer_attr *a,
994         pa_cvolume *volume,
995         pa_bool_t muted,
996         pa_bool_t muted_set,
997         uint32_t syncid,
998         uint32_t *missing,
999         pa_sink_input_flags_t flags,
1000         pa_proplist *p,
1001         pa_bool_t adjust_latency,
1002         pa_bool_t early_requests,
1003         int *ret) {
1004
1005     playback_stream *s, *ssync;
1006     pa_sink_input *sink_input = NULL;
1007     pa_memchunk silence;
1008     uint32_t idx;
1009     int64_t start_index;
1010     pa_sink_input_new_data data;
1011
1012     pa_assert(c);
1013     pa_assert(ss);
1014     pa_assert(missing);
1015     pa_assert(p);
1016     pa_assert(ret);
1017
1018     /* Find syncid group */
1019     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1020
1021         if (!playback_stream_isinstance(ssync))
1022             continue;
1023
1024         if (ssync->syncid == syncid)
1025             break;
1026     }
1027
1028     /* Synced streams must connect to the same sink */
1029     if (ssync) {
1030
1031         if (!sink)
1032             sink = ssync->sink_input->sink;
1033         else if (sink != ssync->sink_input->sink) {
1034             *ret = PA_ERR_INVALID;
1035             return NULL;
1036         }
1037     }
1038
1039     pa_sink_input_new_data_init(&data);
1040
1041     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1042     data.driver = __FILE__;
1043     data.module = c->options->module;
1044     data.client = c->client;
1045     data.sink = sink;
1046     pa_sink_input_new_data_set_sample_spec(&data, ss);
1047     pa_sink_input_new_data_set_channel_map(&data, map);
1048     if (volume)
1049         pa_sink_input_new_data_set_volume(&data, volume);
1050     if (muted_set)
1051         pa_sink_input_new_data_set_muted(&data, muted);
1052     data.sync_base = ssync ? ssync->sink_input : NULL;
1053
1054     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1055
1056     pa_sink_input_new_data_done(&data);
1057
1058     if (!sink_input)
1059         return NULL;
1060
1061     s = pa_msgobject_new(playback_stream);
1062     s->parent.parent.parent.free = playback_stream_free;
1063     s->parent.parent.process_msg = playback_stream_process_msg;
1064     s->connection = c;
1065     s->syncid = syncid;
1066     s->sink_input = sink_input;
1067     s->is_underrun = TRUE;
1068     s->drain_request = FALSE;
1069     pa_atomic_store(&s->missing, 0);
1070     s->buffer_attr = *a;
1071     s->adjust_latency = adjust_latency;
1072     s->early_requests = early_requests;
1073
1074     s->sink_input->parent.process_msg = sink_input_process_msg;
1075     s->sink_input->pop = sink_input_pop_cb;
1076     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1077     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1078     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1079     s->sink_input->kill = sink_input_kill_cb;
1080     s->sink_input->moving = sink_input_moving_cb;
1081     s->sink_input->suspend = sink_input_suspend_cb;
1082     s->sink_input->send_event = sink_input_send_event_cb;
1083     s->sink_input->userdata = s;
1084
1085     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1086
1087     fix_playback_buffer_attr(s);
1088
1089     pa_sink_input_get_silence(sink_input, &silence);
1090     s->memblockq = pa_memblockq_new(
1091             start_index,
1092             s->buffer_attr.maxlength,
1093             s->buffer_attr.tlength,
1094             pa_frame_size(&sink_input->sample_spec),
1095             s->buffer_attr.prebuf,
1096             s->buffer_attr.minreq,
1097             0,
1098             &silence);
1099     pa_memblock_unref(silence.memblock);
1100
1101     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1102
1103     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1104
1105     *ss = s->sink_input->sample_spec;
1106     *map = s->sink_input->channel_map;
1107
1108     pa_idxset_put(c->output_streams, s, &s->index);
1109
1110     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1111                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1112                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1113                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1114                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1115
1116     pa_sink_input_put(s->sink_input);
1117     return s;
1118 }
1119
1120 /* Called from IO context */
1121 static void playback_stream_request_bytes(playback_stream *s) {
1122     size_t m, minreq;
1123     int previous_missing;
1124
1125     playback_stream_assert_ref(s);
1126
1127     m = pa_memblockq_pop_missing(s->memblockq);
1128
1129     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu)", */
1130     /*        (unsigned long) m, */
1131     /*        pa_memblockq_get_tlength(s->memblockq), */
1132     /*        pa_memblockq_get_minreq(s->memblockq), */
1133     /*        pa_memblockq_get_length(s->memblockq)); */
1134
1135     if (m <= 0)
1136         return;
1137
1138 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1139
1140     previous_missing = pa_atomic_add(&s->missing, (int) m);
1141     minreq = pa_memblockq_get_minreq(s->memblockq);
1142
1143     if (pa_memblockq_prebuf_active(s->memblockq) ||
1144         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1145         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1146 }
1147
1148 /* Called from main context */
1149 static void playback_stream_send_killed(playback_stream *p) {
1150     pa_tagstruct *t;
1151     playback_stream_assert_ref(p);
1152
1153     t = pa_tagstruct_new(NULL, 0);
1154     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1155     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1156     pa_tagstruct_putu32(t, p->index);
1157     pa_pstream_send_tagstruct(p->connection->pstream, t);
1158 }
1159
1160 /* Called from main context */
1161 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1162     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1163     pa_native_connection_assert_ref(c);
1164
1165     if (!c->protocol)
1166         return -1;
1167
1168     switch (code) {
1169
1170         case CONNECTION_MESSAGE_REVOKE:
1171             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1172             break;
1173
1174         case CONNECTION_MESSAGE_RELEASE:
1175             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1176             break;
1177     }
1178
1179     return 0;
1180 }
1181
1182 /* Called from main context */
1183 static void native_connection_unlink(pa_native_connection *c) {
1184     record_stream *r;
1185     output_stream *o;
1186
1187     pa_assert(c);
1188
1189     if (!c->protocol)
1190         return;
1191
1192     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1193
1194     if (c->options)
1195         pa_native_options_unref(c->options);
1196
1197     while ((r = pa_idxset_first(c->record_streams, NULL)))
1198         record_stream_unlink(r);
1199
1200     while ((o = pa_idxset_first(c->output_streams, NULL)))
1201         if (playback_stream_isinstance(o))
1202             playback_stream_unlink(PLAYBACK_STREAM(o));
1203         else
1204             upload_stream_unlink(UPLOAD_STREAM(o));
1205
1206     if (c->subscription)
1207         pa_subscription_free(c->subscription);
1208
1209     if (c->pstream)
1210         pa_pstream_unlink(c->pstream);
1211
1212     if (c->auth_timeout_event) {
1213         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1214         c->auth_timeout_event = NULL;
1215     }
1216
1217     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1218     c->protocol = NULL;
1219     pa_native_connection_unref(c);
1220 }
1221
1222 /* Called from main context */
1223 static void native_connection_free(pa_object *o) {
1224     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1225
1226     pa_assert(c);
1227
1228     native_connection_unlink(c);
1229
1230     pa_idxset_free(c->record_streams, NULL, NULL);
1231     pa_idxset_free(c->output_streams, NULL, NULL);
1232
1233     pa_pdispatch_unref(c->pdispatch);
1234     pa_pstream_unref(c->pstream);
1235     pa_client_free(c->client);
1236
1237     pa_xfree(c);
1238 }
1239
1240 /* Called from main context */
1241 static void native_connection_send_memblock(pa_native_connection *c) {
1242     uint32_t start;
1243     record_stream *r;
1244
1245     start = PA_IDXSET_INVALID;
1246     for (;;) {
1247         pa_memchunk chunk;
1248
1249         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1250             return;
1251
1252         if (start == PA_IDXSET_INVALID)
1253             start = c->rrobin_index;
1254         else if (start == c->rrobin_index)
1255             return;
1256
1257         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1258             pa_memchunk schunk = chunk;
1259
1260             if (schunk.length > r->buffer_attr.fragsize)
1261                 schunk.length = r->buffer_attr.fragsize;
1262
1263             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1264
1265             pa_memblockq_drop(r->memblockq, schunk.length);
1266             pa_memblock_unref(schunk.memblock);
1267
1268             return;
1269         }
1270     }
1271 }
1272
1273 /*** sink input callbacks ***/
1274
1275 /* Called from thread context */
1276 static void handle_seek(playback_stream *s, int64_t indexw) {
1277     playback_stream_assert_ref(s);
1278
1279 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1280
1281     if (s->sink_input->thread_info.underrun_for > 0) {
1282
1283 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1284
1285         if (pa_memblockq_is_readable(s->memblockq)) {
1286
1287             /* We just ended an underrun, let's ask the sink
1288              * for a complete rewind rewrite */
1289
1290             pa_log_debug("Requesting rewind due to end of underrun.");
1291             pa_sink_input_request_rewind(s->sink_input,
1292                                          (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 :
1293                                                    s->sink_input->thread_info.underrun_for),
1294                                          FALSE, TRUE, FALSE);
1295         }
1296
1297     } else {
1298         int64_t indexr;
1299
1300         indexr = pa_memblockq_get_read_index(s->memblockq);
1301
1302         if (indexw < indexr) {
1303             /* OK, the sink already asked for this data, so
1304              * let's have it usk us again */
1305
1306             pa_log_debug("Requesting rewind due to rewrite.");
1307             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1308         }
1309     }
1310
1311     playback_stream_request_bytes(s);
1312 }
1313
1314 /* Called from thread context */
1315 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1316     pa_sink_input *i = PA_SINK_INPUT(o);
1317     playback_stream *s;
1318
1319     pa_sink_input_assert_ref(i);
1320     s = PLAYBACK_STREAM(i->userdata);
1321     playback_stream_assert_ref(s);
1322
1323     switch (code) {
1324
1325         case SINK_INPUT_MESSAGE_SEEK: {
1326             int64_t windex;
1327
1328             windex = pa_memblockq_get_write_index(s->memblockq);
1329
1330             /* The client side is incapable of accounting correctly
1331              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1332              * able to deal with that. */
1333
1334             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1335
1336             handle_seek(s, windex);
1337             return 0;
1338         }
1339
1340         case SINK_INPUT_MESSAGE_POST_DATA: {
1341             int64_t windex;
1342
1343             pa_assert(chunk);
1344
1345             windex = pa_memblockq_get_write_index(s->memblockq);
1346
1347 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1348
1349             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1350
1351                 if (pa_log_ratelimit())
1352                     pa_log_warn("Failed to push data into queue");
1353                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1354                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1355             }
1356
1357             handle_seek(s, windex);
1358
1359 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1360
1361             return 0;
1362         }
1363
1364         case SINK_INPUT_MESSAGE_DRAIN:
1365         case SINK_INPUT_MESSAGE_FLUSH:
1366         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1367         case SINK_INPUT_MESSAGE_TRIGGER: {
1368
1369             int64_t windex;
1370             pa_sink_input *isync;
1371             void (*func)(pa_memblockq *bq);
1372
1373             switch  (code) {
1374                 case SINK_INPUT_MESSAGE_FLUSH:
1375                     func = pa_memblockq_flush_write;
1376                     break;
1377
1378                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1379                     func = pa_memblockq_prebuf_force;
1380                     break;
1381
1382                 case SINK_INPUT_MESSAGE_DRAIN:
1383                 case SINK_INPUT_MESSAGE_TRIGGER:
1384                     func = pa_memblockq_prebuf_disable;
1385                     break;
1386
1387                 default:
1388                     pa_assert_not_reached();
1389             }
1390
1391             windex = pa_memblockq_get_write_index(s->memblockq);
1392             func(s->memblockq);
1393             handle_seek(s, windex);
1394
1395             /* Do the same for all other members in the sync group */
1396             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1397                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1398                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1399                 func(ssync->memblockq);
1400                 handle_seek(ssync, windex);
1401             }
1402
1403             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1404                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1405                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1406                 func(ssync->memblockq);
1407                 handle_seek(ssync, windex);
1408             }
1409
1410             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1411                 if (!pa_memblockq_is_readable(s->memblockq))
1412                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1413                 else {
1414                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1415                     s->drain_request = TRUE;
1416                 }
1417             }
1418
1419             return 0;
1420         }
1421
1422         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1423             /* Atomically get a snapshot of all timing parameters... */
1424             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1425             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1426             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1427             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1428             s->underrun_for = s->sink_input->thread_info.underrun_for;
1429             s->playing_for = s->sink_input->thread_info.playing_for;
1430
1431             return 0;
1432
1433         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1434             int64_t windex;
1435
1436             windex = pa_memblockq_get_write_index(s->memblockq);
1437
1438             pa_memblockq_prebuf_force(s->memblockq);
1439
1440             handle_seek(s, windex);
1441
1442             /* Fall through to the default handler */
1443             break;
1444         }
1445
1446         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1447             pa_usec_t *r = userdata;
1448
1449             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1450
1451             /* Fall through, the default handler will add in the extra
1452              * latency added by the resampler */
1453             break;
1454         }
1455
1456         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1457             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1458             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1459             return 0;
1460         }
1461     }
1462
1463     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1464 }
1465
1466 /* Called from thread context */
1467 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1468     playback_stream *s;
1469
1470     pa_sink_input_assert_ref(i);
1471     s = PLAYBACK_STREAM(i->userdata);
1472     playback_stream_assert_ref(s);
1473     pa_assert(chunk);
1474
1475 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1476
1477     if (pa_memblockq_is_readable(s->memblockq))
1478         s->is_underrun = FALSE;
1479     else {
1480         if (!s->is_underrun)
1481             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1482
1483         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1484             s->drain_request = FALSE;
1485             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1486         } else if (!s->is_underrun)
1487             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1488
1489         s->is_underrun = TRUE;
1490
1491         playback_stream_request_bytes(s);
1492     }
1493
1494     /* This call will not fail with prebuf=0, hence we check for
1495        underrun explicitly above */
1496     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1497         return -1;
1498
1499     chunk->length = PA_MIN(nbytes, chunk->length);
1500
1501     if (i->thread_info.underrun_for > 0)
1502         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1503
1504     pa_memblockq_drop(s->memblockq, chunk->length);
1505     playback_stream_request_bytes(s);
1506
1507     return 0;
1508 }
1509
1510 /* Called from thread context */
1511 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1512     playback_stream *s;
1513
1514     pa_sink_input_assert_ref(i);
1515     s = PLAYBACK_STREAM(i->userdata);
1516     playback_stream_assert_ref(s);
1517
1518     /* If we are in an underrun, then we don't rewind */
1519     if (i->thread_info.underrun_for > 0)
1520         return;
1521
1522     pa_memblockq_rewind(s->memblockq, nbytes);
1523 }
1524
1525 /* Called from thread context */
1526 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1527     playback_stream *s;
1528
1529     pa_sink_input_assert_ref(i);
1530     s = PLAYBACK_STREAM(i->userdata);
1531     playback_stream_assert_ref(s);
1532
1533     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1534 }
1535
1536 /* Called from thread context */
1537 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1538     playback_stream *s;
1539     size_t new_tlength, old_tlength;
1540
1541     pa_sink_input_assert_ref(i);
1542     s = PLAYBACK_STREAM(i->userdata);
1543     playback_stream_assert_ref(s);
1544
1545     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1546     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1547
1548     if (old_tlength < new_tlength) {
1549         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1550         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1551         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1552
1553         if (new_tlength == old_tlength)
1554             pa_log_debug("Failed to increase tlength");
1555         else {
1556             pa_log_debug("Notifying client about increased tlength");
1557             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1558         }
1559     }
1560 }
1561
1562 /* Called from main context */
1563 static void sink_input_kill_cb(pa_sink_input *i) {
1564     playback_stream *s;
1565
1566     pa_sink_input_assert_ref(i);
1567     s = PLAYBACK_STREAM(i->userdata);
1568     playback_stream_assert_ref(s);
1569
1570     playback_stream_send_killed(s);
1571     playback_stream_unlink(s);
1572 }
1573
1574 /* Called from main context */
1575 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1576     playback_stream *s;
1577     pa_tagstruct *t;
1578
1579     pa_sink_input_assert_ref(i);
1580     s = PLAYBACK_STREAM(i->userdata);
1581     playback_stream_assert_ref(s);
1582
1583     if (s->connection->version < 15)
1584       return;
1585
1586     t = pa_tagstruct_new(NULL, 0);
1587     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1588     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1589     pa_tagstruct_putu32(t, s->index);
1590     pa_tagstruct_puts(t, event);
1591     pa_tagstruct_put_proplist(t, pl);
1592     pa_pstream_send_tagstruct(s->connection->pstream, t);
1593 }
1594
1595 /* Called from main context */
1596 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1597     playback_stream *s;
1598     pa_tagstruct *t;
1599
1600     pa_sink_input_assert_ref(i);
1601     s = PLAYBACK_STREAM(i->userdata);
1602     playback_stream_assert_ref(s);
1603
1604     if (s->connection->version < 12)
1605       return;
1606
1607     t = pa_tagstruct_new(NULL, 0);
1608     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1609     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1610     pa_tagstruct_putu32(t, s->index);
1611     pa_tagstruct_put_boolean(t, suspend);
1612     pa_pstream_send_tagstruct(s->connection->pstream, t);
1613 }
1614
1615 /* Called from main context */
1616 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1617     playback_stream *s;
1618     pa_tagstruct *t;
1619
1620     pa_sink_input_assert_ref(i);
1621     s = PLAYBACK_STREAM(i->userdata);
1622     playback_stream_assert_ref(s);
1623
1624     if (!dest)
1625         return;
1626
1627     fix_playback_buffer_attr(s);
1628     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1629     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1630
1631     if (s->connection->version < 12)
1632       return;
1633
1634     t = pa_tagstruct_new(NULL, 0);
1635     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1636     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1637     pa_tagstruct_putu32(t, s->index);
1638     pa_tagstruct_putu32(t, dest->index);
1639     pa_tagstruct_puts(t, dest->name);
1640     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1641
1642     if (s->connection->version >= 13) {
1643         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1644         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1645         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1646         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1647         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1648     }
1649
1650     pa_pstream_send_tagstruct(s->connection->pstream, t);
1651 }
1652
1653 /*** source_output callbacks ***/
1654
1655 /* Called from thread context */
1656 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1657     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1658     record_stream *s;
1659
1660     pa_source_output_assert_ref(o);
1661     s = RECORD_STREAM(o->userdata);
1662     record_stream_assert_ref(s);
1663
1664     switch (code) {
1665         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1666             /* Atomically get a snapshot of all timing parameters... */
1667             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1668             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1669             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1670             return 0;
1671     }
1672
1673     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1674 }
1675
1676 /* Called from thread context */
1677 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1678     record_stream *s;
1679
1680     pa_source_output_assert_ref(o);
1681     s = RECORD_STREAM(o->userdata);
1682     record_stream_assert_ref(s);
1683     pa_assert(chunk);
1684
1685     pa_atomic_add(&s->on_the_fly, chunk->length);
1686     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1687 }
1688
1689 static void source_output_kill_cb(pa_source_output *o) {
1690     record_stream *s;
1691
1692     pa_source_output_assert_ref(o);
1693     s = RECORD_STREAM(o->userdata);
1694     record_stream_assert_ref(s);
1695
1696     record_stream_send_killed(s);
1697     record_stream_unlink(s);
1698 }
1699
1700 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1701     record_stream *s;
1702
1703     pa_source_output_assert_ref(o);
1704     s = RECORD_STREAM(o->userdata);
1705     record_stream_assert_ref(s);
1706
1707     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1708
1709     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1710 }
1711
1712 /* Called from main context */
1713 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1714     record_stream *s;
1715     pa_tagstruct *t;
1716
1717     pa_source_output_assert_ref(o);
1718     s = RECORD_STREAM(o->userdata);
1719     record_stream_assert_ref(s);
1720
1721     if (s->connection->version < 15)
1722       return;
1723
1724     t = pa_tagstruct_new(NULL, 0);
1725     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1726     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1727     pa_tagstruct_putu32(t, s->index);
1728     pa_tagstruct_puts(t, event);
1729     pa_tagstruct_put_proplist(t, pl);
1730     pa_pstream_send_tagstruct(s->connection->pstream, t);
1731 }
1732
1733 /* Called from main context */
1734 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1735     record_stream *s;
1736     pa_tagstruct *t;
1737
1738     pa_source_output_assert_ref(o);
1739     s = RECORD_STREAM(o->userdata);
1740     record_stream_assert_ref(s);
1741
1742     if (s->connection->version < 12)
1743       return;
1744
1745     t = pa_tagstruct_new(NULL, 0);
1746     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1747     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1748     pa_tagstruct_putu32(t, s->index);
1749     pa_tagstruct_put_boolean(t, suspend);
1750     pa_pstream_send_tagstruct(s->connection->pstream, t);
1751 }
1752
1753 /* Called from main context */
1754 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1755     record_stream *s;
1756     pa_tagstruct *t;
1757
1758     pa_source_output_assert_ref(o);
1759     s = RECORD_STREAM(o->userdata);
1760     record_stream_assert_ref(s);
1761
1762     if (!dest)
1763         return;
1764
1765     fix_record_buffer_attr_pre(s);
1766     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1767     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1768     fix_record_buffer_attr_post(s);
1769
1770     if (s->connection->version < 12)
1771       return;
1772
1773     t = pa_tagstruct_new(NULL, 0);
1774     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1775     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1776     pa_tagstruct_putu32(t, s->index);
1777     pa_tagstruct_putu32(t, dest->index);
1778     pa_tagstruct_puts(t, dest->name);
1779     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1780
1781     if (s->connection->version >= 13) {
1782         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1783         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1784         pa_tagstruct_put_usec(t, s->configured_source_latency);
1785     }
1786
1787     pa_pstream_send_tagstruct(s->connection->pstream, t);
1788 }
1789
1790 /*** pdispatch callbacks ***/
1791
1792 static void protocol_error(pa_native_connection *c) {
1793     pa_log("protocol error, kicking client");
1794     native_connection_unlink(c);
1795 }
1796
1797 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1798 if (!(expression)) { \
1799     pa_pstream_send_error((pstream), (tag), (error)); \
1800     return; \
1801 } \
1802 } while(0);
1803
1804 static pa_tagstruct *reply_new(uint32_t tag) {
1805     pa_tagstruct *reply;
1806
1807     reply = pa_tagstruct_new(NULL, 0);
1808     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1809     pa_tagstruct_putu32(reply, tag);
1810     return reply;
1811 }
1812
1813 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1814     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1815     playback_stream *s;
1816     uint32_t sink_index, syncid, missing;
1817     pa_buffer_attr attr;
1818     const char *name = NULL, *sink_name;
1819     pa_sample_spec ss;
1820     pa_channel_map map;
1821     pa_tagstruct *reply;
1822     pa_sink *sink = NULL;
1823     pa_cvolume volume;
1824     pa_bool_t
1825         corked = FALSE,
1826         no_remap = FALSE,
1827         no_remix = FALSE,
1828         fix_format = FALSE,
1829         fix_rate = FALSE,
1830         fix_channels = FALSE,
1831         no_move = FALSE,
1832         variable_rate = FALSE,
1833         muted = FALSE,
1834         adjust_latency = FALSE,
1835         early_requests = FALSE,
1836         dont_inhibit_auto_suspend = FALSE,
1837         muted_set = FALSE,
1838         fail_on_suspend = FALSE;
1839     pa_sink_input_flags_t flags = 0;
1840     pa_proplist *p;
1841     pa_bool_t volume_set = TRUE;
1842     int ret = PA_ERR_INVALID;
1843
1844     pa_native_connection_assert_ref(c);
1845     pa_assert(t);
1846     memset(&attr, 0, sizeof(attr));
1847
1848     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1849         pa_tagstruct_get(
1850                 t,
1851                 PA_TAG_SAMPLE_SPEC, &ss,
1852                 PA_TAG_CHANNEL_MAP, &map,
1853                 PA_TAG_U32, &sink_index,
1854                 PA_TAG_STRING, &sink_name,
1855                 PA_TAG_U32, &attr.maxlength,
1856                 PA_TAG_BOOLEAN, &corked,
1857                 PA_TAG_U32, &attr.tlength,
1858                 PA_TAG_U32, &attr.prebuf,
1859                 PA_TAG_U32, &attr.minreq,
1860                 PA_TAG_U32, &syncid,
1861                 PA_TAG_CVOLUME, &volume,
1862                 PA_TAG_INVALID) < 0) {
1863
1864         protocol_error(c);
1865         return;
1866     }
1867
1868     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1869     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1870     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1871     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1872     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1873     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1874     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1875     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1876
1877     p = pa_proplist_new();
1878
1879     if (name)
1880         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1881
1882     if (c->version >= 12)  {
1883         /* Since 0.9.8 the user can ask for a couple of additional flags */
1884
1885         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1886             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1887             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1888             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1889             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1890             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1891             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1892
1893             protocol_error(c);
1894             pa_proplist_free(p);
1895             return;
1896         }
1897     }
1898
1899     if (c->version >= 13) {
1900
1901         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1902             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1903             pa_tagstruct_get_proplist(t, p) < 0) {
1904             protocol_error(c);
1905             pa_proplist_free(p);
1906             return;
1907         }
1908     }
1909
1910     if (c->version >= 14) {
1911
1912         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1913             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1914             protocol_error(c);
1915             pa_proplist_free(p);
1916             return;
1917         }
1918     }
1919
1920     if (c->version >= 15) {
1921
1922         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1923             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1924             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1925             protocol_error(c);
1926             pa_proplist_free(p);
1927             return;
1928         }
1929     }
1930
1931     if (!pa_tagstruct_eof(t)) {
1932         protocol_error(c);
1933         pa_proplist_free(p);
1934         return;
1935     }
1936
1937     if (sink_index != PA_INVALID_INDEX) {
1938
1939         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1940             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1941             pa_proplist_free(p);
1942             return;
1943         }
1944
1945     } else if (sink_name) {
1946
1947         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1948             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1949             pa_proplist_free(p);
1950             return;
1951         }
1952     }
1953
1954     flags =
1955         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1956         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1957         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1958         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1959         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1960         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1961         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1962         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1963         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1964         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0);
1965
1966     /* Only since protocol version 15 there's a seperate muted_set
1967      * flag. For older versions we synthesize it here */
1968     muted_set = muted_set || muted;
1969
1970     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1971     pa_proplist_free(p);
1972
1973     CHECK_VALIDITY(c->pstream, s, tag, ret);
1974
1975     reply = reply_new(tag);
1976     pa_tagstruct_putu32(reply, s->index);
1977     pa_assert(s->sink_input);
1978     pa_tagstruct_putu32(reply, s->sink_input->index);
1979     pa_tagstruct_putu32(reply, missing);
1980
1981 /*     pa_log("initial request is %u", missing); */
1982
1983     if (c->version >= 9) {
1984         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1985
1986         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1987         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1988         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1989         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1990     }
1991
1992     if (c->version >= 12) {
1993         /* Since 0.9.8 we support sending the chosen sample
1994          * spec/channel map/device/suspend status back to the
1995          * client */
1996
1997         pa_tagstruct_put_sample_spec(reply, &ss);
1998         pa_tagstruct_put_channel_map(reply, &map);
1999
2000         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2001         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2002
2003         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2004     }
2005
2006     if (c->version >= 13)
2007         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2008
2009     pa_pstream_send_tagstruct(c->pstream, reply);
2010 }
2011
2012 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2013     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2014     uint32_t channel;
2015
2016     pa_native_connection_assert_ref(c);
2017     pa_assert(t);
2018
2019     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2020         !pa_tagstruct_eof(t)) {
2021         protocol_error(c);
2022         return;
2023     }
2024
2025     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2026
2027     switch (command) {
2028
2029         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2030             playback_stream *s;
2031             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2032                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2033                 return;
2034             }
2035
2036             playback_stream_unlink(s);
2037             break;
2038         }
2039
2040         case PA_COMMAND_DELETE_RECORD_STREAM: {
2041             record_stream *s;
2042             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2043                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2044                 return;
2045             }
2046
2047             record_stream_unlink(s);
2048             break;
2049         }
2050
2051         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2052             upload_stream *s;
2053
2054             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2055                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2056                 return;
2057             }
2058
2059             upload_stream_unlink(s);
2060             break;
2061         }
2062
2063         default:
2064             pa_assert_not_reached();
2065     }
2066
2067     pa_pstream_send_simple_ack(c->pstream, tag);
2068 }
2069
2070 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2071     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2072     record_stream *s;
2073     pa_buffer_attr attr;
2074     uint32_t source_index;
2075     const char *name = NULL, *source_name;
2076     pa_sample_spec ss;
2077     pa_channel_map map;
2078     pa_tagstruct *reply;
2079     pa_source *source = NULL;
2080     pa_bool_t
2081         corked = FALSE,
2082         no_remap = FALSE,
2083         no_remix = FALSE,
2084         fix_format = FALSE,
2085         fix_rate = FALSE,
2086         fix_channels = FALSE,
2087         no_move = FALSE,
2088         variable_rate = FALSE,
2089         adjust_latency = FALSE,
2090         peak_detect = FALSE,
2091         early_requests = FALSE,
2092         dont_inhibit_auto_suspend = FALSE,
2093         fail_on_suspend = FALSE;
2094     pa_source_output_flags_t flags = 0;
2095     pa_proplist *p;
2096     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2097     pa_sink_input *direct_on_input = NULL;
2098     int ret = PA_ERR_INVALID;
2099
2100     pa_native_connection_assert_ref(c);
2101     pa_assert(t);
2102
2103     memset(&attr, 0, sizeof(attr));
2104
2105     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2106         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2107         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2108         pa_tagstruct_getu32(t, &source_index) < 0 ||
2109         pa_tagstruct_gets(t, &source_name) < 0 ||
2110         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2111         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2112         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2113         protocol_error(c);
2114         return;
2115     }
2116
2117     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2118     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2119     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2120     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2121     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2122     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2123     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2124
2125     p = pa_proplist_new();
2126
2127     if (name)
2128         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2129
2130     if (c->version >= 12)  {
2131         /* Since 0.9.8 the user can ask for a couple of additional flags */
2132
2133         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2134             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2135             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2136             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2137             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2138             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2139             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2140
2141             protocol_error(c);
2142             pa_proplist_free(p);
2143             return;
2144         }
2145     }
2146
2147     if (c->version >= 13) {
2148
2149         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2150             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2151             pa_tagstruct_get_proplist(t, p) < 0 ||
2152             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2153             protocol_error(c);
2154             pa_proplist_free(p);
2155             return;
2156         }
2157     }
2158
2159     if (c->version >= 14) {
2160
2161         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2162             protocol_error(c);
2163             pa_proplist_free(p);
2164             return;
2165         }
2166     }
2167
2168     if (c->version >= 15) {
2169
2170         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2171             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2172             protocol_error(c);
2173             pa_proplist_free(p);
2174             return;
2175         }
2176     }
2177
2178     if (!pa_tagstruct_eof(t)) {
2179         protocol_error(c);
2180         pa_proplist_free(p);
2181         return;
2182     }
2183
2184     if (source_index != PA_INVALID_INDEX) {
2185
2186         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2187             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2188             pa_proplist_free(p);
2189             return;
2190         }
2191
2192     } else if (source_name) {
2193
2194         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2195             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2196             pa_proplist_free(p);
2197             return;
2198         }
2199     }
2200
2201     if (direct_on_input_idx != PA_INVALID_INDEX) {
2202
2203         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2204             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2205             pa_proplist_free(p);
2206             return;
2207         }
2208     }
2209
2210     flags =
2211         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2212         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2213         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2214         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2215         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2216         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2217         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2218         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2219         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2220         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0);
2221
2222     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2223     pa_proplist_free(p);
2224
2225     CHECK_VALIDITY(c->pstream, s, tag, ret);
2226
2227     reply = reply_new(tag);
2228     pa_tagstruct_putu32(reply, s->index);
2229     pa_assert(s->source_output);
2230     pa_tagstruct_putu32(reply, s->source_output->index);
2231
2232     if (c->version >= 9) {
2233         /* Since 0.9 we support sending the buffer metrics back to the client */
2234
2235         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2236         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2237     }
2238
2239     if (c->version >= 12) {
2240         /* Since 0.9.8 we support sending the chosen sample
2241          * spec/channel map/device/suspend status back to the
2242          * client */
2243
2244         pa_tagstruct_put_sample_spec(reply, &ss);
2245         pa_tagstruct_put_channel_map(reply, &map);
2246
2247         pa_tagstruct_putu32(reply, s->source_output->source->index);
2248         pa_tagstruct_puts(reply, s->source_output->source->name);
2249
2250         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2251     }
2252
2253     if (c->version >= 13)
2254         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2255
2256     pa_pstream_send_tagstruct(c->pstream, reply);
2257 }
2258
2259 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2260     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2261     int ret;
2262
2263     pa_native_connection_assert_ref(c);
2264     pa_assert(t);
2265
2266     if (!pa_tagstruct_eof(t)) {
2267         protocol_error(c);
2268         return;
2269     }
2270
2271     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2272     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2273     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2274
2275     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2276 }
2277
2278 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2279     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2280     const void*cookie;
2281     pa_tagstruct *reply;
2282     pa_bool_t shm_on_remote = FALSE, do_shm;
2283
2284     pa_native_connection_assert_ref(c);
2285     pa_assert(t);
2286
2287     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2288         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2289         !pa_tagstruct_eof(t)) {
2290         protocol_error(c);
2291         return;
2292     }
2293
2294     /* Minimum supported version */
2295     if (c->version < 8) {
2296         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2297         return;
2298     }
2299
2300     /* Starting with protocol version 13 the MSB of the version tag
2301        reflects if shm is available for this pa_native_connection or
2302        not. */
2303     if (c->version >= 13) {
2304         shm_on_remote = !!(c->version & 0x80000000U);
2305         c->version &= 0x7FFFFFFFU;
2306     }
2307
2308     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2309
2310     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2311
2312     if (!c->authorized) {
2313         pa_bool_t success = FALSE;
2314
2315 #ifdef HAVE_CREDS
2316         const pa_creds *creds;
2317
2318         if ((creds = pa_pdispatch_creds(pd))) {
2319             if (creds->uid == getuid())
2320                 success = TRUE;
2321             else if (c->options->auth_group) {
2322                 int r;
2323                 gid_t gid;
2324
2325                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2326                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2327                 else if (gid == creds->gid)
2328                     success = TRUE;
2329
2330                 if (!success) {
2331                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2332                         pa_log_warn("Failed to check group membership.");
2333                     else if (r > 0)
2334                         success = TRUE;
2335                 }
2336             }
2337
2338             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2339                         (unsigned long) creds->uid,
2340                         (unsigned long) creds->gid,
2341                         (int) success);
2342         }
2343 #endif
2344
2345         if (!success && c->options->auth_cookie) {
2346             const uint8_t *ac;
2347
2348             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2349                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2350                     success = TRUE;
2351         }
2352
2353         if (!success) {
2354             pa_log_warn("Denied access to client with invalid authorization data.");
2355             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2356             return;
2357         }
2358
2359         c->authorized = TRUE;
2360         if (c->auth_timeout_event) {
2361             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2362             c->auth_timeout_event = NULL;
2363         }
2364     }
2365
2366     /* Enable shared memory support if possible */
2367     do_shm =
2368         pa_mempool_is_shared(c->protocol->core->mempool) &&
2369         c->is_local;
2370
2371     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2372
2373     if (do_shm)
2374         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2375             do_shm = FALSE;
2376
2377 #ifdef HAVE_CREDS
2378     if (do_shm) {
2379         /* Only enable SHM if both sides are owned by the same
2380          * user. This is a security measure because otherwise data
2381          * private to the user might leak. */
2382
2383         const pa_creds *creds;
2384         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2385             do_shm = FALSE;
2386     }
2387 #endif
2388
2389     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2390     pa_pstream_enable_shm(c->pstream, do_shm);
2391
2392     reply = reply_new(tag);
2393     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2394
2395 #ifdef HAVE_CREDS
2396 {
2397     /* SHM support is only enabled after both sides made sure they are the same user. */
2398
2399     pa_creds ucred;
2400
2401     ucred.uid = getuid();
2402     ucred.gid = getgid();
2403
2404     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2405 }
2406 #else
2407     pa_pstream_send_tagstruct(c->pstream, reply);
2408 #endif
2409 }
2410
2411 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2412     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2413     const char *name = NULL;
2414     pa_proplist *p;
2415     pa_tagstruct *reply;
2416
2417     pa_native_connection_assert_ref(c);
2418     pa_assert(t);
2419
2420     p = pa_proplist_new();
2421
2422     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2423         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2424         !pa_tagstruct_eof(t)) {
2425
2426         protocol_error(c);
2427         pa_proplist_free(p);
2428         return;
2429     }
2430
2431     if (name)
2432         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2433             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2434             pa_proplist_free(p);
2435             return;
2436         }
2437
2438     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2439     pa_proplist_free(p);
2440
2441     reply = reply_new(tag);
2442
2443     if (c->version >= 13)
2444         pa_tagstruct_putu32(reply, c->client->index);
2445
2446     pa_pstream_send_tagstruct(c->pstream, reply);
2447 }
2448
2449 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2450     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2451     const char *name;
2452     uint32_t idx = PA_IDXSET_INVALID;
2453
2454     pa_native_connection_assert_ref(c);
2455     pa_assert(t);
2456
2457     if (pa_tagstruct_gets(t, &name) < 0 ||
2458         !pa_tagstruct_eof(t)) {
2459         protocol_error(c);
2460         return;
2461     }
2462
2463     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2464     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2465
2466     if (command == PA_COMMAND_LOOKUP_SINK) {
2467         pa_sink *sink;
2468         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2469             idx = sink->index;
2470     } else {
2471         pa_source *source;
2472         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2473         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2474             idx = source->index;
2475     }
2476
2477     if (idx == PA_IDXSET_INVALID)
2478         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2479     else {
2480         pa_tagstruct *reply;
2481         reply = reply_new(tag);
2482         pa_tagstruct_putu32(reply, idx);
2483         pa_pstream_send_tagstruct(c->pstream, reply);
2484     }
2485 }
2486
2487 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2488     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2489     uint32_t idx;
2490     playback_stream *s;
2491
2492     pa_native_connection_assert_ref(c);
2493     pa_assert(t);
2494
2495     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2496         !pa_tagstruct_eof(t)) {
2497         protocol_error(c);
2498         return;
2499     }
2500
2501     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2502     s = pa_idxset_get_by_index(c->output_streams, idx);
2503     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2504     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2505
2506     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2507 }
2508
2509 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2510     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2511     pa_tagstruct *reply;
2512     const pa_mempool_stat *stat;
2513
2514     pa_native_connection_assert_ref(c);
2515     pa_assert(t);
2516
2517     if (!pa_tagstruct_eof(t)) {
2518         protocol_error(c);
2519         return;
2520     }
2521
2522     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2523
2524     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2525
2526     reply = reply_new(tag);
2527     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2528     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2529     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2530     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2531     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2532     pa_pstream_send_tagstruct(c->pstream, reply);
2533 }
2534
2535 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2536     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2537     pa_tagstruct *reply;
2538     playback_stream *s;
2539     struct timeval tv, now;
2540     uint32_t idx;
2541
2542     pa_native_connection_assert_ref(c);
2543     pa_assert(t);
2544
2545     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2546         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2547         !pa_tagstruct_eof(t)) {
2548         protocol_error(c);
2549         return;
2550     }
2551
2552     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2553     s = pa_idxset_get_by_index(c->output_streams, idx);
2554     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2555     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2556
2557     /* Get an atomic snapshot of all timing parameters */
2558     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2559
2560     reply = reply_new(tag);
2561     pa_tagstruct_put_usec(reply,
2562                           s->current_sink_latency +
2563                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2564     pa_tagstruct_put_usec(reply, 0);
2565     pa_tagstruct_put_boolean(reply,
2566                              s->playing_for > 0 &&
2567                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2568                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2569     pa_tagstruct_put_timeval(reply, &tv);
2570     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2571     pa_tagstruct_puts64(reply, s->write_index);
2572     pa_tagstruct_puts64(reply, s->read_index);
2573
2574     if (c->version >= 13) {
2575         pa_tagstruct_putu64(reply, s->underrun_for);
2576         pa_tagstruct_putu64(reply, s->playing_for);
2577     }
2578
2579     pa_pstream_send_tagstruct(c->pstream, reply);
2580 }
2581
2582 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2583     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2584     pa_tagstruct *reply;
2585     record_stream *s;
2586     struct timeval tv, now;
2587     uint32_t idx;
2588
2589     pa_native_connection_assert_ref(c);
2590     pa_assert(t);
2591
2592     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2593         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2594         !pa_tagstruct_eof(t)) {
2595         protocol_error(c);
2596         return;
2597     }
2598
2599     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2600     s = pa_idxset_get_by_index(c->record_streams, idx);
2601     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2602
2603     /* Get an atomic snapshot of all timing parameters */
2604     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2605
2606     reply = reply_new(tag);
2607     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2608     pa_tagstruct_put_usec(reply,
2609                           s->current_source_latency +
2610                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2611     pa_tagstruct_put_boolean(reply,
2612                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2613                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2614     pa_tagstruct_put_timeval(reply, &tv);
2615     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2616     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2617     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2618     pa_pstream_send_tagstruct(c->pstream, reply);
2619 }
2620
2621 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2622     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2623     upload_stream *s;
2624     uint32_t length;
2625     const char *name = NULL;
2626     pa_sample_spec ss;
2627     pa_channel_map map;
2628     pa_tagstruct *reply;
2629     pa_proplist *p;
2630
2631     pa_native_connection_assert_ref(c);
2632     pa_assert(t);
2633
2634     if (pa_tagstruct_gets(t, &name) < 0 ||
2635         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2636         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2637         pa_tagstruct_getu32(t, &length) < 0) {
2638         protocol_error(c);
2639         return;
2640     }
2641
2642     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2643     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2644     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2645     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2646     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2647     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2648
2649     p = pa_proplist_new();
2650
2651     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2652         !pa_tagstruct_eof(t)) {
2653
2654         protocol_error(c);
2655         pa_proplist_free(p);
2656         return;
2657     }
2658
2659     if (c->version < 13)
2660         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2661     else if (!name)
2662         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2663             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2664
2665     if (!name || !pa_namereg_is_valid_name(name)) {
2666         pa_proplist_free(p);
2667         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2668     }
2669
2670     s = upload_stream_new(c, &ss, &map, name, length, p);
2671     pa_proplist_free(p);
2672
2673     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2674
2675     reply = reply_new(tag);
2676     pa_tagstruct_putu32(reply, s->index);
2677     pa_tagstruct_putu32(reply, length);
2678     pa_pstream_send_tagstruct(c->pstream, reply);
2679 }
2680
2681 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2682     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2683     uint32_t channel;
2684     upload_stream *s;
2685     uint32_t idx;
2686
2687     pa_native_connection_assert_ref(c);
2688     pa_assert(t);
2689
2690     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2691         !pa_tagstruct_eof(t)) {
2692         protocol_error(c);
2693         return;
2694     }
2695
2696     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2697
2698     s = pa_idxset_get_by_index(c->output_streams, channel);
2699     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2700     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2701
2702     if (!s->memchunk.memblock)
2703         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2704     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2705         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2706     else
2707         pa_pstream_send_simple_ack(c->pstream, tag);
2708
2709     upload_stream_unlink(s);
2710 }
2711
2712 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2713     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2714     uint32_t sink_index;
2715     pa_volume_t volume;
2716     pa_sink *sink;
2717     const char *name, *sink_name;
2718     uint32_t idx;
2719     pa_proplist *p;
2720     pa_tagstruct *reply;
2721
2722     pa_native_connection_assert_ref(c);
2723     pa_assert(t);
2724
2725     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2726
2727     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2728         pa_tagstruct_gets(t, &sink_name) < 0 ||
2729         pa_tagstruct_getu32(t, &volume) < 0 ||
2730         pa_tagstruct_gets(t, &name) < 0) {
2731         protocol_error(c);
2732         return;
2733     }
2734
2735     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2736     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2737     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2738     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2739
2740     if (sink_index != PA_INVALID_INDEX)
2741         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2742     else
2743         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2744
2745     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2746
2747     p = pa_proplist_new();
2748
2749     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2750         !pa_tagstruct_eof(t)) {
2751         protocol_error(c);
2752         pa_proplist_free(p);
2753         return;
2754     }
2755
2756     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2757
2758     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2759         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2760         pa_proplist_free(p);
2761         return;
2762     }
2763
2764     pa_proplist_free(p);
2765
2766     reply = reply_new(tag);
2767
2768     if (c->version >= 13)
2769         pa_tagstruct_putu32(reply, idx);
2770
2771     pa_pstream_send_tagstruct(c->pstream, reply);
2772 }
2773
2774 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2775     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2776     const char *name;
2777
2778     pa_native_connection_assert_ref(c);
2779     pa_assert(t);
2780
2781     if (pa_tagstruct_gets(t, &name) < 0 ||
2782         !pa_tagstruct_eof(t)) {
2783         protocol_error(c);
2784         return;
2785     }
2786
2787     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2788     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2789
2790     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2791         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2792         return;
2793     }
2794
2795     pa_pstream_send_simple_ack(c->pstream, tag);
2796 }
2797
2798 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2799     pa_assert(c);
2800     pa_assert(fixed);
2801     pa_assert(original);
2802
2803     *fixed = *original;
2804
2805     if (c->version < 12) {
2806         /* Before protocol version 12 we didn't support S32 samples,
2807          * so we need to lie about this to the client */
2808
2809         if (fixed->format == PA_SAMPLE_S32LE)
2810             fixed->format = PA_SAMPLE_FLOAT32LE;
2811         if (fixed->format == PA_SAMPLE_S32BE)
2812             fixed->format = PA_SAMPLE_FLOAT32BE;
2813     }
2814
2815     if (c->version < 15) {
2816         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2817             fixed->format = PA_SAMPLE_FLOAT32LE;
2818         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2819             fixed->format = PA_SAMPLE_FLOAT32BE;
2820     }
2821 }
2822
2823 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2824     pa_sample_spec fixed_ss;
2825
2826     pa_assert(t);
2827     pa_sink_assert_ref(sink);
2828
2829     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2830
2831     pa_tagstruct_put(
2832         t,
2833         PA_TAG_U32, sink->index,
2834         PA_TAG_STRING, sink->name,
2835         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2836         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2837         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2838         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2839         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2840         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2841         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2842         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2843         PA_TAG_USEC, pa_sink_get_latency(sink),
2844         PA_TAG_STRING, sink->driver,
2845         PA_TAG_U32, sink->flags,
2846         PA_TAG_INVALID);
2847
2848     if (c->version >= 13) {
2849         pa_tagstruct_put_proplist(t, sink->proplist);
2850         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2851     }
2852
2853     if (c->version >= 15) {
2854         pa_tagstruct_put_volume(t, sink->base_volume);
2855         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2856             pa_log_error("Internal sink state is invalid.");
2857         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2858         pa_tagstruct_putu32(t, sink->n_volume_steps);
2859         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2860     }
2861
2862     if (c->version >= 16) {
2863         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2864
2865         if (sink->ports) {
2866             void *state;
2867             pa_device_port *p;
2868
2869             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2870                 pa_tagstruct_puts(t, p->name);
2871                 pa_tagstruct_puts(t, p->description);
2872                 pa_tagstruct_putu32(t, p->priority);
2873             }
2874         }
2875
2876         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2877     }
2878 }
2879
2880 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2881     pa_sample_spec fixed_ss;
2882
2883     pa_assert(t);
2884     pa_source_assert_ref(source);
2885
2886     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2887
2888     pa_tagstruct_put(
2889         t,
2890         PA_TAG_U32, source->index,
2891         PA_TAG_STRING, source->name,
2892         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2893         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2894         PA_TAG_CHANNEL_MAP, &source->channel_map,
2895         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2896         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2897         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2898         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2899         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2900         PA_TAG_USEC, pa_source_get_latency(source),
2901         PA_TAG_STRING, source->driver,
2902         PA_TAG_U32, source->flags,
2903         PA_TAG_INVALID);
2904
2905     if (c->version >= 13) {
2906         pa_tagstruct_put_proplist(t, source->proplist);
2907         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2908     }
2909
2910     if (c->version >= 15) {
2911         pa_tagstruct_put_volume(t, source->base_volume);
2912         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2913             pa_log_error("Internal source state is invalid.");
2914         pa_tagstruct_putu32(t, pa_source_get_state(source));
2915         pa_tagstruct_putu32(t, source->n_volume_steps);
2916         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2917     }
2918
2919     if (c->version >= 16) {
2920
2921         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
2922
2923         if (source->ports) {
2924             void *state;
2925             pa_device_port *p;
2926
2927             PA_HASHMAP_FOREACH(p, source->ports, state) {
2928                 pa_tagstruct_puts(t, p->name);
2929                 pa_tagstruct_puts(t, p->description);
2930                 pa_tagstruct_putu32(t, p->priority);
2931             }
2932         }
2933
2934         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
2935     }
2936 }
2937
2938 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2939     pa_assert(t);
2940     pa_assert(client);
2941
2942     pa_tagstruct_putu32(t, client->index);
2943     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2944     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2945     pa_tagstruct_puts(t, client->driver);
2946
2947     if (c->version >= 13)
2948         pa_tagstruct_put_proplist(t, client->proplist);
2949 }
2950
2951 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2952     void *state = NULL;
2953     pa_card_profile *p;
2954
2955     pa_assert(t);
2956     pa_assert(card);
2957
2958     pa_tagstruct_putu32(t, card->index);
2959     pa_tagstruct_puts(t, card->name);
2960     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2961     pa_tagstruct_puts(t, card->driver);
2962
2963     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2964
2965     if (card->profiles) {
2966         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2967             pa_tagstruct_puts(t, p->name);
2968             pa_tagstruct_puts(t, p->description);
2969             pa_tagstruct_putu32(t, p->n_sinks);
2970             pa_tagstruct_putu32(t, p->n_sources);
2971             pa_tagstruct_putu32(t, p->priority);
2972         }
2973     }
2974
2975     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2976     pa_tagstruct_put_proplist(t, card->proplist);
2977 }
2978
2979 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2980     pa_assert(t);
2981     pa_assert(module);
2982
2983     pa_tagstruct_putu32(t, module->index);
2984     pa_tagstruct_puts(t, module->name);
2985     pa_tagstruct_puts(t, module->argument);
2986     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2987
2988     if (c->version < 15)
2989         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2990
2991     if (c->version >= 15)
2992         pa_tagstruct_put_proplist(t, module->proplist);
2993 }
2994
2995 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2996     pa_sample_spec fixed_ss;
2997     pa_usec_t sink_latency;
2998     pa_cvolume v;
2999
3000     pa_assert(t);
3001     pa_sink_input_assert_ref(s);
3002
3003     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3004
3005     pa_tagstruct_putu32(t, s->index);
3006     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3007     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3008     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3009     pa_tagstruct_putu32(t, s->sink->index);
3010     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3011     pa_tagstruct_put_channel_map(t, &s->channel_map);
3012     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
3013     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3014     pa_tagstruct_put_usec(t, sink_latency);
3015     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3016     pa_tagstruct_puts(t, s->driver);
3017     if (c->version >= 11)
3018         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3019     if (c->version >= 13)
3020         pa_tagstruct_put_proplist(t, s->proplist);
3021 }
3022
3023 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3024     pa_sample_spec fixed_ss;
3025     pa_usec_t source_latency;
3026
3027     pa_assert(t);
3028     pa_source_output_assert_ref(s);
3029
3030     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3031
3032     pa_tagstruct_putu32(t, s->index);
3033     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3034     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3035     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3036     pa_tagstruct_putu32(t, s->source->index);
3037     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3038     pa_tagstruct_put_channel_map(t, &s->channel_map);
3039     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3040     pa_tagstruct_put_usec(t, source_latency);
3041     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3042     pa_tagstruct_puts(t, s->driver);
3043
3044     if (c->version >= 13)
3045         pa_tagstruct_put_proplist(t, s->proplist);
3046 }
3047
3048 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3049     pa_sample_spec fixed_ss;
3050     pa_cvolume v;
3051
3052     pa_assert(t);
3053     pa_assert(e);
3054
3055     if (e->memchunk.memblock)
3056         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3057     else
3058         memset(&fixed_ss, 0, sizeof(fixed_ss));
3059
3060     pa_tagstruct_putu32(t, e->index);
3061     pa_tagstruct_puts(t, e->name);
3062
3063     if (e->volume_is_set)
3064         v = e->volume;
3065     else
3066         pa_cvolume_init(&v);
3067
3068     pa_tagstruct_put_cvolume(t, &v);
3069     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3070     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3071     pa_tagstruct_put_channel_map(t, &e->channel_map);
3072     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3073     pa_tagstruct_put_boolean(t, e->lazy);
3074     pa_tagstruct_puts(t, e->filename);
3075
3076     if (c->version >= 13)
3077         pa_tagstruct_put_proplist(t, e->proplist);
3078 }
3079
3080 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3081     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3082     uint32_t idx;
3083     pa_sink *sink = NULL;
3084     pa_source *source = NULL;
3085     pa_client *client = NULL;
3086     pa_card *card = NULL;
3087     pa_module *module = NULL;
3088     pa_sink_input *si = NULL;
3089     pa_source_output *so = NULL;
3090     pa_scache_entry *sce = NULL;
3091     const char *name = NULL;
3092     pa_tagstruct *reply;
3093
3094     pa_native_connection_assert_ref(c);
3095     pa_assert(t);
3096
3097     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3098         (command != PA_COMMAND_GET_CLIENT_INFO &&
3099          command != PA_COMMAND_GET_MODULE_INFO &&
3100          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3101          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3102          pa_tagstruct_gets(t, &name) < 0) ||
3103         !pa_tagstruct_eof(t)) {
3104         protocol_error(c);
3105         return;
3106     }
3107
3108     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3109     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3110     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3111     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3112     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3113
3114     if (command == PA_COMMAND_GET_SINK_INFO) {
3115         if (idx != PA_INVALID_INDEX)
3116             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3117         else
3118             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3119     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3120         if (idx != PA_INVALID_INDEX)
3121             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3122         else
3123             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3124     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3125         if (idx != PA_INVALID_INDEX)
3126             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3127         else
3128             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3129     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3130         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3131     else if (command == PA_COMMAND_GET_MODULE_INFO)
3132         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3133     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3134         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3135     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3136         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3137     else {
3138         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3139         if (idx != PA_INVALID_INDEX)
3140             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3141         else
3142             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3143     }
3144
3145     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3146         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3147         return;
3148     }
3149
3150     reply = reply_new(tag);
3151     if (sink)
3152         sink_fill_tagstruct(c, reply, sink);
3153     else if (source)
3154         source_fill_tagstruct(c, reply, source);
3155     else if (client)
3156         client_fill_tagstruct(c, reply, client);
3157     else if (card)
3158         card_fill_tagstruct(c, reply, card);
3159     else if (module)
3160         module_fill_tagstruct(c, reply, module);
3161     else if (si)
3162         sink_input_fill_tagstruct(c, reply, si);
3163     else if (so)
3164         source_output_fill_tagstruct(c, reply, so);
3165     else
3166         scache_fill_tagstruct(c, reply, sce);
3167     pa_pstream_send_tagstruct(c->pstream, reply);
3168 }
3169
3170 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3171     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3172     pa_idxset *i;
3173     uint32_t idx;
3174     void *p;
3175     pa_tagstruct *reply;
3176
3177     pa_native_connection_assert_ref(c);
3178     pa_assert(t);
3179
3180     if (!pa_tagstruct_eof(t)) {
3181         protocol_error(c);
3182         return;
3183     }
3184
3185     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3186
3187     reply = reply_new(tag);
3188
3189     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3190         i = c->protocol->core->sinks;
3191     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3192         i = c->protocol->core->sources;
3193     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3194         i = c->protocol->core->clients;
3195     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3196         i = c->protocol->core->cards;
3197     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3198         i = c->protocol->core->modules;
3199     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3200         i = c->protocol->core->sink_inputs;
3201     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3202         i = c->protocol->core->source_outputs;
3203     else {
3204         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3205         i = c->protocol->core->scache;
3206     }
3207
3208     if (i) {
3209         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3210             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3211                 sink_fill_tagstruct(c, reply, p);
3212             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3213                 source_fill_tagstruct(c, reply, p);
3214             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3215                 client_fill_tagstruct(c, reply, p);
3216             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3217                 card_fill_tagstruct(c, reply, p);
3218             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3219                 module_fill_tagstruct(c, reply, p);
3220             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3221                 sink_input_fill_tagstruct(c, reply, p);
3222             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3223                 source_output_fill_tagstruct(c, reply, p);
3224             else {
3225                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3226                 scache_fill_tagstruct(c, reply, p);
3227             }
3228         }
3229     }
3230
3231     pa_pstream_send_tagstruct(c->pstream, reply);
3232 }
3233
3234 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3235     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3236     pa_tagstruct *reply;
3237     pa_sink *def_sink;
3238     pa_source *def_source;
3239     pa_sample_spec fixed_ss;
3240     char *h, *u;
3241
3242     pa_native_connection_assert_ref(c);
3243     pa_assert(t);
3244
3245     if (!pa_tagstruct_eof(t)) {
3246         protocol_error(c);
3247         return;
3248     }
3249
3250     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3251
3252     reply = reply_new(tag);
3253     pa_tagstruct_puts(reply, PACKAGE_NAME);
3254     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3255
3256     u = pa_get_user_name_malloc();
3257     pa_tagstruct_puts(reply, u);
3258     pa_xfree(u);
3259
3260     h = pa_get_host_name_malloc();
3261     pa_tagstruct_puts(reply, h);
3262     pa_xfree(h);
3263
3264     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3265     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3266
3267     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3268     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3269     def_source = pa_namereg_get_default_source(c->protocol->core);
3270     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3271
3272     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3273
3274     if (c->version >= 15)
3275         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3276
3277     pa_pstream_send_tagstruct(c->pstream, reply);
3278 }
3279
3280 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3281     pa_tagstruct *t;
3282     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3283
3284     pa_native_connection_assert_ref(c);
3285
3286     t = pa_tagstruct_new(NULL, 0);
3287     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3288     pa_tagstruct_putu32(t, (uint32_t) -1);
3289     pa_tagstruct_putu32(t, e);
3290     pa_tagstruct_putu32(t, idx);
3291     pa_pstream_send_tagstruct(c->pstream, t);
3292 }
3293
3294 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3295     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3296     pa_subscription_mask_t m;
3297
3298     pa_native_connection_assert_ref(c);
3299     pa_assert(t);
3300
3301     if (pa_tagstruct_getu32(t, &m) < 0 ||
3302         !pa_tagstruct_eof(t)) {
3303         protocol_error(c);
3304         return;
3305     }
3306
3307     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3308     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3309
3310     if (c->subscription)
3311         pa_subscription_free(c->subscription);
3312
3313     if (m != 0) {
3314         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3315         pa_assert(c->subscription);
3316     } else
3317         c->subscription = NULL;
3318
3319     pa_pstream_send_simple_ack(c->pstream, tag);
3320 }
3321
3322 static void command_set_volume(
3323         pa_pdispatch *pd,
3324         uint32_t command,
3325         uint32_t tag,
3326         pa_tagstruct *t,
3327         void *userdata) {
3328
3329     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3330     uint32_t idx;
3331     pa_cvolume volume;
3332     pa_sink *sink = NULL;
3333     pa_source *source = NULL;
3334     pa_sink_input *si = NULL;
3335     const char *name = NULL;
3336     const char *client_name;
3337
3338     pa_native_connection_assert_ref(c);
3339     pa_assert(t);
3340
3341     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3342         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3343         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3344         pa_tagstruct_get_cvolume(t, &volume) ||
3345         !pa_tagstruct_eof(t)) {
3346         protocol_error(c);
3347         return;
3348     }
3349
3350     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3351     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3352     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3353     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3354     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3355     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3356
3357     switch (command) {
3358
3359         case PA_COMMAND_SET_SINK_VOLUME:
3360             if (idx != PA_INVALID_INDEX)
3361                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3362             else
3363                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3364             break;
3365
3366         case PA_COMMAND_SET_SOURCE_VOLUME:
3367             if (idx != PA_INVALID_INDEX)
3368                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3369             else
3370                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3371             break;
3372
3373         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3374             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3375             break;
3376
3377         default:
3378             pa_assert_not_reached();
3379     }
3380
3381     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3382
3383     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3384
3385     if (sink) {
3386         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3387         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3388     } else if (source) {
3389         pa_log_debug("Client %s changes volume of sink %s.", client_name, source->name);
3390         pa_source_set_volume(source, &volume, TRUE);
3391     } else if (si) {
3392         pa_log_debug("Client %s changes volume of sink %s.",
3393                      client_name,
3394                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3395         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3396     }
3397
3398     pa_pstream_send_simple_ack(c->pstream, tag);
3399 }
3400
3401 static void command_set_mute(
3402         pa_pdispatch *pd,
3403         uint32_t command,
3404         uint32_t tag,
3405         pa_tagstruct *t,
3406         void *userdata) {
3407
3408     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3409     uint32_t idx;
3410     pa_bool_t mute;
3411     pa_sink *sink = NULL;
3412     pa_source *source = NULL;
3413     pa_sink_input *si = NULL;
3414     const char *name = NULL;
3415
3416     pa_native_connection_assert_ref(c);
3417     pa_assert(t);
3418
3419     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3420         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3421         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3422         pa_tagstruct_get_boolean(t, &mute) ||
3423         !pa_tagstruct_eof(t)) {
3424         protocol_error(c);
3425         return;
3426     }
3427
3428     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3429     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3430     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3431     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3432     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3433
3434     switch (command) {
3435
3436         case PA_COMMAND_SET_SINK_MUTE:
3437
3438             if (idx != PA_INVALID_INDEX)
3439                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3440             else
3441                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3442
3443             break;
3444
3445         case PA_COMMAND_SET_SOURCE_MUTE:
3446             if (idx != PA_INVALID_INDEX)
3447                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3448             else
3449                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3450
3451             break;
3452
3453         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3454             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3455             break;
3456
3457         default:
3458             pa_assert_not_reached();
3459     }
3460
3461     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3462
3463     if (sink)
3464         pa_sink_set_mute(sink, mute, TRUE);
3465     else if (source)
3466         pa_source_set_mute(source, mute, TRUE);
3467     else if (si)
3468         pa_sink_input_set_mute(si, mute, TRUE);
3469
3470     pa_pstream_send_simple_ack(c->pstream, tag);
3471 }
3472
3473 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3474     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3475     uint32_t idx;
3476     pa_bool_t b;
3477     playback_stream *s;
3478
3479     pa_native_connection_assert_ref(c);
3480     pa_assert(t);
3481
3482     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3483         pa_tagstruct_get_boolean(t, &b) < 0 ||
3484         !pa_tagstruct_eof(t)) {
3485         protocol_error(c);
3486         return;
3487     }
3488
3489     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3490     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3491     s = pa_idxset_get_by_index(c->output_streams, idx);
3492     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3493     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3494
3495     pa_sink_input_cork(s->sink_input, b);
3496
3497     if (b)
3498         s->is_underrun = TRUE;
3499
3500     pa_pstream_send_simple_ack(c->pstream, tag);
3501 }
3502
3503 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3504     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3505     uint32_t idx;
3506     playback_stream *s;
3507
3508     pa_native_connection_assert_ref(c);
3509     pa_assert(t);
3510
3511     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3512         !pa_tagstruct_eof(t)) {
3513         protocol_error(c);
3514         return;
3515     }
3516
3517     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3518     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3519     s = pa_idxset_get_by_index(c->output_streams, idx);
3520     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3521     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3522
3523     switch (command) {
3524         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3525             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3526             break;
3527
3528         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3529             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3530             break;
3531
3532         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3533             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3534             break;
3535
3536         default:
3537             pa_assert_not_reached();
3538     }
3539
3540     pa_pstream_send_simple_ack(c->pstream, tag);
3541 }
3542
3543 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3544     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3545     uint32_t idx;
3546     record_stream *s;
3547     pa_bool_t b;
3548
3549     pa_native_connection_assert_ref(c);
3550     pa_assert(t);
3551
3552     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3553         pa_tagstruct_get_boolean(t, &b) < 0 ||
3554         !pa_tagstruct_eof(t)) {
3555         protocol_error(c);
3556         return;
3557     }
3558
3559     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3560     s = pa_idxset_get_by_index(c->record_streams, idx);
3561     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3562
3563     pa_source_output_cork(s->source_output, b);
3564     pa_memblockq_prebuf_force(s->memblockq);
3565     pa_pstream_send_simple_ack(c->pstream, tag);
3566 }
3567
3568 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3569     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3570     uint32_t idx;
3571     record_stream *s;
3572
3573     pa_native_connection_assert_ref(c);
3574     pa_assert(t);
3575
3576     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3577         !pa_tagstruct_eof(t)) {
3578         protocol_error(c);
3579         return;
3580     }
3581
3582     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3583     s = pa_idxset_get_by_index(c->record_streams, idx);
3584     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3585
3586     pa_memblockq_flush_read(s->memblockq);
3587     pa_pstream_send_simple_ack(c->pstream, tag);
3588 }
3589
3590 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3591     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3592     uint32_t idx;
3593     pa_buffer_attr a;
3594     pa_tagstruct *reply;
3595
3596     pa_native_connection_assert_ref(c);
3597     pa_assert(t);
3598
3599     memset(&a, 0, sizeof(a));
3600
3601     if (pa_tagstruct_getu32(t, &idx) < 0) {
3602         protocol_error(c);
3603         return;
3604     }
3605
3606     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3607
3608     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3609         playback_stream *s;
3610         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3611
3612         s = pa_idxset_get_by_index(c->output_streams, idx);
3613         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3614         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3615
3616         if (pa_tagstruct_get(
3617                     t,
3618                     PA_TAG_U32, &a.maxlength,
3619                     PA_TAG_U32, &a.tlength,
3620                     PA_TAG_U32, &a.prebuf,
3621                     PA_TAG_U32, &a.minreq,
3622                     PA_TAG_INVALID) < 0 ||
3623             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3624             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3625             !pa_tagstruct_eof(t)) {
3626             protocol_error(c);
3627             return;
3628         }
3629
3630         s->adjust_latency = adjust_latency;
3631         s->early_requests = early_requests;
3632         s->buffer_attr = a;
3633
3634         fix_playback_buffer_attr(s);
3635         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3636
3637         reply = reply_new(tag);
3638         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3639         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3640         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3641         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3642
3643         if (c->version >= 13)
3644             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3645
3646     } else {
3647         record_stream *s;
3648         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3649         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3650
3651         s = pa_idxset_get_by_index(c->record_streams, idx);
3652         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3653
3654         if (pa_tagstruct_get(
3655                     t,
3656                     PA_TAG_U32, &a.maxlength,
3657                     PA_TAG_U32, &a.fragsize,
3658                     PA_TAG_INVALID) < 0 ||
3659             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3660             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3661             !pa_tagstruct_eof(t)) {
3662             protocol_error(c);
3663             return;
3664         }
3665
3666         s->adjust_latency = adjust_latency;
3667         s->early_requests = early_requests;
3668         s->buffer_attr = a;
3669
3670         fix_record_buffer_attr_pre(s);
3671         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3672         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3673         fix_record_buffer_attr_post(s);
3674
3675         reply = reply_new(tag);
3676         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3677         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3678
3679         if (c->version >= 13)
3680             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3681     }
3682
3683     pa_pstream_send_tagstruct(c->pstream, reply);
3684 }
3685
3686 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3687     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3688     uint32_t idx;
3689     uint32_t rate;
3690
3691     pa_native_connection_assert_ref(c);
3692     pa_assert(t);
3693
3694     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3695         pa_tagstruct_getu32(t, &rate) < 0 ||
3696         !pa_tagstruct_eof(t)) {
3697         protocol_error(c);
3698         return;
3699     }
3700
3701     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3702     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3703
3704     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3705         playback_stream *s;
3706
3707         s = pa_idxset_get_by_index(c->output_streams, idx);
3708         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3709         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3710
3711         pa_sink_input_set_rate(s->sink_input, rate);
3712
3713     } else {
3714         record_stream *s;
3715         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3716
3717         s = pa_idxset_get_by_index(c->record_streams, idx);
3718         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3719
3720         pa_source_output_set_rate(s->source_output, rate);
3721     }
3722
3723     pa_pstream_send_simple_ack(c->pstream, tag);
3724 }
3725
3726 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3727     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3728     uint32_t idx;
3729     uint32_t mode;
3730     pa_proplist *p;
3731
3732     pa_native_connection_assert_ref(c);
3733     pa_assert(t);
3734
3735     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3736
3737     p = pa_proplist_new();
3738
3739     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3740
3741         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3742             pa_tagstruct_get_proplist(t, p) < 0 ||
3743             !pa_tagstruct_eof(t)) {
3744             protocol_error(c);
3745             pa_proplist_free(p);
3746             return;
3747         }
3748
3749     } else {
3750
3751         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3752             pa_tagstruct_getu32(t, &mode) < 0 ||
3753             pa_tagstruct_get_proplist(t, p) < 0 ||
3754             !pa_tagstruct_eof(t)) {
3755             protocol_error(c);
3756             pa_proplist_free(p);
3757             return;
3758         }
3759     }
3760
3761     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3762         pa_proplist_free(p);
3763         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3764     }
3765
3766     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3767         playback_stream *s;
3768
3769         s = pa_idxset_get_by_index(c->output_streams, idx);
3770         if (!s || !playback_stream_isinstance(s)) {
3771             pa_proplist_free(p);
3772             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3773         }
3774         pa_sink_input_update_proplist(s->sink_input, mode, p);
3775
3776     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3777         record_stream *s;
3778
3779         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3780             pa_proplist_free(p);
3781             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3782         }
3783         pa_source_output_update_proplist(s->source_output, mode, p);
3784
3785     } else {
3786         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3787
3788         pa_client_update_proplist(c->client, mode, p);
3789     }
3790
3791     pa_pstream_send_simple_ack(c->pstream, tag);
3792     pa_proplist_free(p);
3793 }
3794
3795 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3796     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3797     uint32_t idx;
3798     unsigned changed = 0;
3799     pa_proplist *p;
3800     pa_strlist *l = NULL;
3801
3802     pa_native_connection_assert_ref(c);
3803     pa_assert(t);
3804
3805     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3806
3807     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3808
3809         if (pa_tagstruct_getu32(t, &idx) < 0) {
3810             protocol_error(c);
3811             return;
3812         }
3813     }
3814
3815     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3816         playback_stream *s;
3817
3818         s = pa_idxset_get_by_index(c->output_streams, idx);
3819         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3820         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3821
3822         p = s->sink_input->proplist;
3823
3824     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3825         record_stream *s;
3826
3827         s = pa_idxset_get_by_index(c->record_streams, idx);
3828         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3829
3830         p = s->source_output->proplist;
3831     } else {
3832         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3833
3834         p = c->client->proplist;
3835     }
3836
3837     for (;;) {
3838         const char *k;
3839
3840         if (pa_tagstruct_gets(t, &k) < 0) {
3841             protocol_error(c);
3842             pa_strlist_free(l);
3843             return;
3844         }
3845
3846         if (!k)
3847             break;
3848
3849         l = pa_strlist_prepend(l, k);
3850     }
3851
3852     if (!pa_tagstruct_eof(t)) {
3853         protocol_error(c);
3854         pa_strlist_free(l);
3855         return;
3856     }
3857
3858     for (;;) {
3859         char *z;
3860
3861         l = pa_strlist_pop(l, &z);
3862
3863         if (!z)
3864             break;
3865
3866         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3867         pa_xfree(z);
3868     }
3869
3870     pa_pstream_send_simple_ack(c->pstream, tag);
3871
3872     if (changed) {
3873         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3874             playback_stream *s;
3875
3876             s = pa_idxset_get_by_index(c->output_streams, idx);
3877             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3878
3879         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3880             record_stream *s;
3881
3882             s = pa_idxset_get_by_index(c->record_streams, idx);
3883             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3884
3885         } else {
3886             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3887             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3888         }
3889     }
3890 }
3891
3892 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3893     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3894     const char *s;
3895
3896     pa_native_connection_assert_ref(c);
3897     pa_assert(t);
3898
3899     if (pa_tagstruct_gets(t, &s) < 0 ||
3900         !pa_tagstruct_eof(t)) {
3901         protocol_error(c);
3902         return;
3903     }
3904
3905     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3906     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3907
3908     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3909         pa_source *source;
3910
3911         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3912         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3913
3914         pa_namereg_set_default_source(c->protocol->core, source);
3915     } else {
3916         pa_sink *sink;
3917         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3918
3919         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3920         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3921
3922         pa_namereg_set_default_sink(c->protocol->core, sink);
3923     }
3924
3925     pa_pstream_send_simple_ack(c->pstream, tag);
3926 }
3927
3928 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3929     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3930     uint32_t idx;
3931     const char *name;
3932
3933     pa_native_connection_assert_ref(c);
3934     pa_assert(t);
3935
3936     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3937         pa_tagstruct_gets(t, &name) < 0 ||
3938         !pa_tagstruct_eof(t)) {
3939         protocol_error(c);
3940         return;
3941     }
3942
3943     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3944     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3945
3946     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3947         playback_stream *s;
3948
3949         s = pa_idxset_get_by_index(c->output_streams, idx);
3950         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3951         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3952
3953         pa_sink_input_set_name(s->sink_input, name);
3954
3955     } else {
3956         record_stream *s;
3957         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3958
3959         s = pa_idxset_get_by_index(c->record_streams, idx);
3960         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3961
3962         pa_source_output_set_name(s->source_output, name);
3963     }
3964
3965     pa_pstream_send_simple_ack(c->pstream, tag);
3966 }
3967
3968 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3969     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3970     uint32_t idx;
3971
3972     pa_native_connection_assert_ref(c);
3973     pa_assert(t);
3974
3975     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3976         !pa_tagstruct_eof(t)) {
3977         protocol_error(c);
3978         return;
3979     }
3980
3981     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3982
3983     if (command == PA_COMMAND_KILL_CLIENT) {
3984         pa_client *client;
3985
3986         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3987         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3988
3989         pa_native_connection_ref(c);
3990         pa_client_kill(client);
3991
3992     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3993         pa_sink_input *s;
3994
3995         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3996         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3997
3998         pa_native_connection_ref(c);
3999         pa_sink_input_kill(s);
4000     } else {
4001         pa_source_output *s;
4002
4003         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4004
4005         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4006         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4007
4008         pa_native_connection_ref(c);
4009         pa_source_output_kill(s);
4010     }
4011
4012     pa_pstream_send_simple_ack(c->pstream, tag);
4013     pa_native_connection_unref(c);
4014 }
4015
4016 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4017     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4018     pa_module *m;
4019     const char *name, *argument;
4020     pa_tagstruct *reply;
4021
4022     pa_native_connection_assert_ref(c);
4023     pa_assert(t);
4024
4025     if (pa_tagstruct_gets(t, &name) < 0 ||
4026         pa_tagstruct_gets(t, &argument) < 0 ||
4027         !pa_tagstruct_eof(t)) {
4028         protocol_error(c);
4029         return;
4030     }
4031
4032     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4033     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4034     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4035
4036     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4037         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4038         return;
4039     }
4040
4041     reply = reply_new(tag);
4042     pa_tagstruct_putu32(reply, m->index);
4043     pa_pstream_send_tagstruct(c->pstream, reply);
4044 }
4045
4046 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4047     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4048     uint32_t idx;
4049     pa_module *m;
4050
4051     pa_native_connection_assert_ref(c);
4052     pa_assert(t);
4053
4054     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4055         !pa_tagstruct_eof(t)) {
4056         protocol_error(c);
4057         return;
4058     }
4059
4060     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4061     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4062     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4063
4064     pa_module_unload_request(m, FALSE);
4065     pa_pstream_send_simple_ack(c->pstream, tag);
4066 }
4067
4068 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4069     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4070     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4071     const char *name_device = NULL;
4072
4073     pa_native_connection_assert_ref(c);
4074     pa_assert(t);
4075
4076     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4077         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4078         pa_tagstruct_gets(t, &name_device) < 0 ||
4079         !pa_tagstruct_eof(t)) {
4080         protocol_error(c);
4081         return;
4082     }
4083
4084     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4085     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4086
4087     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4088     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4089     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4090     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4091
4092     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4093         pa_sink_input *si = NULL;
4094         pa_sink *sink = NULL;
4095
4096         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4097
4098         if (idx_device != PA_INVALID_INDEX)
4099             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4100         else
4101             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4102
4103         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4104
4105         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4106             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4107             return;
4108         }
4109     } else {
4110         pa_source_output *so = NULL;
4111         pa_source *source;
4112
4113         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4114
4115         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4116
4117         if (idx_device != PA_INVALID_INDEX)
4118             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4119         else
4120             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4121
4122         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4123
4124         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4125             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4126             return;
4127         }
4128     }
4129
4130     pa_pstream_send_simple_ack(c->pstream, tag);
4131 }
4132
4133 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4134     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4135     uint32_t idx = PA_INVALID_INDEX;
4136     const char *name = NULL;
4137     pa_bool_t b;
4138
4139     pa_native_connection_assert_ref(c);
4140     pa_assert(t);
4141
4142     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4143         pa_tagstruct_gets(t, &name) < 0 ||
4144         pa_tagstruct_get_boolean(t, &b) < 0 ||
4145         !pa_tagstruct_eof(t)) {
4146         protocol_error(c);
4147         return;
4148     }
4149
4150     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4151     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4152     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4153     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4154     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4155
4156     if (command == PA_COMMAND_SUSPEND_SINK) {
4157
4158         if (idx == PA_INVALID_INDEX && name && !*name) {
4159
4160             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4161
4162             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4163                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4164                 return;
4165             }
4166         } else {
4167             pa_sink *sink = NULL;
4168
4169             if (idx != PA_INVALID_INDEX)
4170                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4171             else
4172                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4173
4174             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4175
4176             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4177                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4178                 return;
4179             }
4180         }
4181     } else {
4182
4183         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4184
4185         if (idx == PA_INVALID_INDEX && name && !*name) {
4186
4187             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4188
4189             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4190                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4191                 return;
4192             }
4193
4194         } else {
4195             pa_source *source;
4196
4197             if (idx != PA_INVALID_INDEX)
4198                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4199             else
4200                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4201
4202             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4203
4204             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4205                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4206                 return;
4207             }
4208         }
4209     }
4210
4211     pa_pstream_send_simple_ack(c->pstream, tag);
4212 }
4213
4214 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4215     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4216     uint32_t idx = PA_INVALID_INDEX;
4217     const char *name = NULL;
4218     pa_module *m;
4219     pa_native_protocol_ext_cb_t cb;
4220
4221     pa_native_connection_assert_ref(c);
4222     pa_assert(t);
4223
4224     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4225         pa_tagstruct_gets(t, &name) < 0) {
4226         protocol_error(c);
4227         return;
4228     }
4229
4230     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4231     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4232     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4233     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4234     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4235
4236     if (idx != PA_INVALID_INDEX)
4237         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4238     else {
4239         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4240             if (strcmp(name, m->name) == 0)
4241                 break;
4242     }
4243
4244     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4245     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4246
4247     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4248     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4249
4250     if (cb(c->protocol, m, c, tag, t) < 0)
4251         protocol_error(c);
4252 }
4253
4254 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4255     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4256     uint32_t idx = PA_INVALID_INDEX;
4257     const char *name = NULL, *profile = NULL;
4258     pa_card *card = NULL;
4259     int ret;
4260
4261     pa_native_connection_assert_ref(c);
4262     pa_assert(t);
4263
4264     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4265         pa_tagstruct_gets(t, &name) < 0 ||
4266         pa_tagstruct_gets(t, &profile) < 0 ||
4267         !pa_tagstruct_eof(t)) {
4268         protocol_error(c);
4269         return;
4270     }
4271
4272     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4273     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4274     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4275     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4276     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4277
4278     if (idx != PA_INVALID_INDEX)
4279         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4280     else
4281         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4282
4283     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4284
4285     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4286         pa_pstream_send_error(c->pstream, tag, -ret);
4287         return;
4288     }
4289
4290     pa_pstream_send_simple_ack(c->pstream, tag);
4291 }
4292
4293 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4294     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4295     uint32_t idx = PA_INVALID_INDEX;
4296     const char *name = NULL, *port = NULL;
4297     int ret;
4298
4299     pa_native_connection_assert_ref(c);
4300     pa_assert(t);
4301
4302     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4303         pa_tagstruct_gets(t, &name) < 0 ||
4304         pa_tagstruct_gets(t, &port) < 0 ||
4305         !pa_tagstruct_eof(t)) {
4306         protocol_error(c);
4307         return;
4308     }
4309
4310     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4311     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4312     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4313     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4314     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4315
4316     if (command == PA_COMMAND_SET_SINK_PORT) {
4317         pa_sink *sink;
4318
4319         if (idx != PA_INVALID_INDEX)
4320             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4321         else
4322             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4323
4324         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4325
4326         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4327             pa_pstream_send_error(c->pstream, tag, -ret);
4328             return;
4329         }
4330     } else {
4331         pa_source *source;
4332
4333         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4334
4335         if (idx != PA_INVALID_INDEX)
4336             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4337         else
4338             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4339
4340         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4341
4342         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4343             pa_pstream_send_error(c->pstream, tag, -ret);
4344             return;
4345         }
4346     }
4347
4348     pa_pstream_send_simple_ack(c->pstream, tag);
4349 }
4350
4351 /*** pstream callbacks ***/
4352
4353 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4354     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4355
4356     pa_assert(p);
4357     pa_assert(packet);
4358     pa_native_connection_assert_ref(c);
4359
4360     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4361         pa_log("invalid packet.");
4362         native_connection_unlink(c);
4363     }
4364 }
4365
4366 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4367     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4368     output_stream *stream;
4369
4370     pa_assert(p);
4371     pa_assert(chunk);
4372     pa_native_connection_assert_ref(c);
4373
4374     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4375         pa_log_debug("Client sent block for invalid stream.");
4376         /* Ignoring */
4377         return;
4378     }
4379
4380 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4381
4382     if (playback_stream_isinstance(stream)) {
4383         playback_stream *ps = PLAYBACK_STREAM(stream);
4384
4385         if (chunk->memblock) {
4386             if (seek != PA_SEEK_RELATIVE || offset != 0)
4387                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4388
4389             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4390         } else
4391             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4392
4393     } else {
4394         upload_stream *u = UPLOAD_STREAM(stream);
4395         size_t l;
4396
4397         if (!u->memchunk.memblock) {
4398             if (u->length == chunk->length && chunk->memblock) {
4399                 u->memchunk = *chunk;
4400                 pa_memblock_ref(u->memchunk.memblock);
4401                 u->length = 0;
4402             } else {
4403                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4404                 u->memchunk.index = u->memchunk.length = 0;
4405             }
4406         }
4407
4408         pa_assert(u->memchunk.memblock);
4409
4410         l = u->length;
4411         if (l > chunk->length)
4412             l = chunk->length;
4413
4414         if (l > 0) {
4415             void *dst;
4416             dst = pa_memblock_acquire(u->memchunk.memblock);
4417
4418             if (chunk->memblock) {
4419                 void *src;
4420                 src = pa_memblock_acquire(chunk->memblock);
4421
4422                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4423                        (uint8_t*) src + chunk->index, l);
4424
4425                 pa_memblock_release(chunk->memblock);
4426             } else
4427                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4428
4429             pa_memblock_release(u->memchunk.memblock);
4430
4431             u->memchunk.length += l;
4432             u->length -= l;
4433         }
4434     }
4435 }
4436
4437 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4438     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4439
4440     pa_assert(p);
4441     pa_native_connection_assert_ref(c);
4442
4443     native_connection_unlink(c);
4444     pa_log_info("Connection died.");
4445 }
4446
4447 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4448     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4449
4450     pa_assert(p);
4451     pa_native_connection_assert_ref(c);
4452
4453     native_connection_send_memblock(c);
4454 }
4455
4456 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4457     pa_thread_mq *q;
4458
4459     if (!(q = pa_thread_mq_get()))
4460         pa_pstream_send_revoke(p, block_id);
4461     else
4462         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4463 }
4464
4465 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4466     pa_thread_mq *q;
4467
4468     if (!(q = pa_thread_mq_get()))
4469         pa_pstream_send_release(p, block_id);
4470     else
4471         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4472 }
4473
4474 /*** client callbacks ***/
4475
4476 static void client_kill_cb(pa_client *c) {
4477     pa_assert(c);
4478
4479     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4480     pa_log_info("Connection killed.");
4481 }
4482
4483 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4484     pa_tagstruct *t;
4485     pa_native_connection *c;
4486
4487     pa_assert(client);
4488     c = PA_NATIVE_CONNECTION(client->userdata);
4489     pa_native_connection_assert_ref(c);
4490
4491     if (c->version < 15)
4492       return;
4493
4494     t = pa_tagstruct_new(NULL, 0);
4495     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4496     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4497     pa_tagstruct_puts(t, event);
4498     pa_tagstruct_put_proplist(t, pl);
4499     pa_pstream_send_tagstruct(c->pstream, t);
4500 }
4501
4502 /*** module entry points ***/
4503
4504 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4505     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4506
4507     pa_assert(m);
4508     pa_native_connection_assert_ref(c);
4509     pa_assert(c->auth_timeout_event == e);
4510
4511     if (!c->authorized) {
4512         native_connection_unlink(c);
4513         pa_log_info("Connection terminated due to authentication timeout.");
4514     }
4515 }
4516
4517 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4518     pa_native_connection *c;
4519     char pname[128];
4520     pa_client *client;
4521     pa_client_new_data data;
4522
4523     pa_assert(p);
4524     pa_assert(io);
4525     pa_assert(o);
4526
4527     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4528         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4529         pa_iochannel_free(io);
4530         return;
4531     }
4532
4533     pa_client_new_data_init(&data);
4534     data.module = o->module;
4535     data.driver = __FILE__;
4536     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4537     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4538     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4539     client = pa_client_new(p->core, &data);
4540     pa_client_new_data_done(&data);
4541
4542     if (!client)
4543         return;
4544
4545     c = pa_msgobject_new(pa_native_connection);
4546     c->parent.parent.free = native_connection_free;
4547     c->parent.process_msg = native_connection_process_msg;
4548     c->protocol = p;
4549     c->options = pa_native_options_ref(o);
4550     c->authorized = FALSE;
4551
4552     if (o->auth_anonymous) {
4553         pa_log_info("Client authenticated anonymously.");
4554         c->authorized = TRUE;
4555     }
4556
4557     if (!c->authorized &&
4558         o->auth_ip_acl &&
4559         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4560
4561         pa_log_info("Client authenticated by IP ACL.");
4562         c->authorized = TRUE;
4563     }
4564
4565     if (!c->authorized)
4566         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4567     else
4568         c->auth_timeout_event = NULL;
4569
4570     c->is_local = pa_iochannel_socket_is_local(io);
4571     c->version = 8;
4572
4573     c->client = client;
4574     c->client->kill = client_kill_cb;
4575     c->client->send_event = client_send_event_cb;
4576     c->client->userdata = c;
4577
4578     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4579     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4580     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4581     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4582     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4583     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4584     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4585
4586     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4587
4588     c->record_streams = pa_idxset_new(NULL, NULL);
4589     c->output_streams = pa_idxset_new(NULL, NULL);
4590
4591     c->rrobin_index = PA_IDXSET_INVALID;
4592     c->subscription = NULL;
4593
4594     pa_idxset_put(p->connections, c, NULL);
4595
4596 #ifdef HAVE_CREDS
4597     if (pa_iochannel_creds_supported(io))
4598         pa_iochannel_creds_enable(io);
4599 #endif
4600
4601     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4602 }
4603
4604 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4605     pa_native_connection *c;
4606     void *state = NULL;
4607
4608     pa_assert(p);
4609     pa_assert(m);
4610
4611     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4612         if (c->options->module == m)
4613             native_connection_unlink(c);
4614 }
4615
4616 static pa_native_protocol* native_protocol_new(pa_core *c) {
4617     pa_native_protocol *p;
4618     pa_native_hook_t h;
4619
4620     pa_assert(c);
4621
4622     p = pa_xnew(pa_native_protocol, 1);
4623     PA_REFCNT_INIT(p);
4624     p->core = c;
4625     p->connections = pa_idxset_new(NULL, NULL);
4626
4627     p->servers = NULL;
4628
4629     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4630
4631     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4632         pa_hook_init(&p->hooks[h], p);
4633
4634     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4635
4636     return p;
4637 }
4638
4639 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4640     pa_native_protocol *p;
4641
4642     if ((p = pa_shared_get(c, "native-protocol")))
4643         return pa_native_protocol_ref(p);
4644
4645     return native_protocol_new(c);
4646 }
4647
4648 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4649     pa_assert(p);
4650     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4651
4652     PA_REFCNT_INC(p);
4653
4654     return p;
4655 }
4656
4657 void pa_native_protocol_unref(pa_native_protocol *p) {
4658     pa_native_connection *c;
4659     pa_native_hook_t h;
4660
4661     pa_assert(p);
4662     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4663
4664     if (PA_REFCNT_DEC(p) > 0)
4665         return;
4666
4667     while ((c = pa_idxset_first(p->connections, NULL)))
4668         native_connection_unlink(c);
4669
4670     pa_idxset_free(p->connections, NULL, NULL);
4671
4672     pa_strlist_free(p->servers);
4673
4674     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4675         pa_hook_done(&p->hooks[h]);
4676
4677     pa_hashmap_free(p->extensions, NULL, NULL);
4678
4679     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4680
4681     pa_xfree(p);
4682 }
4683
4684 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4685     pa_assert(p);
4686     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4687     pa_assert(name);
4688
4689     p->servers = pa_strlist_prepend(p->servers, name);
4690
4691     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4692 }
4693
4694 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4695     pa_assert(p);
4696     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4697     pa_assert(name);
4698
4699     p->servers = pa_strlist_remove(p->servers, name);
4700
4701     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4702 }
4703
4704 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4705     pa_assert(p);
4706     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4707
4708     return p->hooks;
4709 }
4710
4711 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4712     pa_assert(p);
4713     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4714
4715     return p->servers;
4716 }
4717
4718 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4719     pa_assert(p);
4720     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4721     pa_assert(m);
4722     pa_assert(cb);
4723     pa_assert(!pa_hashmap_get(p->extensions, m));
4724
4725     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4726     return 0;
4727 }
4728
4729 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4730     pa_assert(p);
4731     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4732     pa_assert(m);
4733
4734     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4735 }
4736
4737 pa_native_options* pa_native_options_new(void) {
4738     pa_native_options *o;
4739
4740     o = pa_xnew0(pa_native_options, 1);
4741     PA_REFCNT_INIT(o);
4742
4743     return o;
4744 }
4745
4746 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4747     pa_assert(o);
4748     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4749
4750     PA_REFCNT_INC(o);
4751
4752     return o;
4753 }
4754
4755 void pa_native_options_unref(pa_native_options *o) {
4756     pa_assert(o);
4757     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4758
4759     if (PA_REFCNT_DEC(o) > 0)
4760         return;
4761
4762     pa_xfree(o->auth_group);
4763
4764     if (o->auth_ip_acl)
4765         pa_ip_acl_free(o->auth_ip_acl);
4766
4767     if (o->auth_cookie)
4768         pa_auth_cookie_unref(o->auth_cookie);
4769
4770     pa_xfree(o);
4771 }
4772
4773 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4774     pa_bool_t enabled;
4775     const char *acl;
4776
4777     pa_assert(o);
4778     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4779     pa_assert(ma);
4780
4781     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4782         pa_log("auth-anonymous= expects a boolean argument.");
4783         return -1;
4784     }
4785
4786     enabled = TRUE;
4787     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4788         pa_log("auth-group-enabled= expects a boolean argument.");
4789         return -1;
4790     }
4791
4792     pa_xfree(o->auth_group);
4793     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4794
4795 #ifndef HAVE_CREDS
4796     if (o->auth_group)
4797         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4798 #endif
4799
4800     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4801         pa_ip_acl *ipa;
4802
4803         if (!(ipa = pa_ip_acl_new(acl))) {
4804             pa_log("Failed to parse IP ACL '%s'", acl);
4805             return -1;
4806         }
4807
4808         if (o->auth_ip_acl)
4809             pa_ip_acl_free(o->auth_ip_acl);
4810
4811         o->auth_ip_acl = ipa;
4812     }
4813
4814     enabled = TRUE;
4815     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4816         pa_log("auth-cookie-enabled= expects a boolean argument.");
4817         return -1;
4818     }
4819
4820     if (o->auth_cookie)
4821         pa_auth_cookie_unref(o->auth_cookie);
4822
4823     if (enabled) {
4824         const char *cn;
4825
4826         /* The new name for this is 'auth-cookie', for compat reasons
4827          * we check the old name too */
4828         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4829             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4830                 cn = PA_NATIVE_COOKIE_FILE;
4831
4832         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4833             return -1;
4834
4835     } else
4836           o->auth_cookie = NULL;
4837
4838     return 0;
4839 }
4840
4841 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4842     pa_native_connection_assert_ref(c);
4843
4844     return c->pstream;
4845 }