Merge commit 'origin/master-tx'
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/native-common.h>
40 #include <pulsecore/packet.h>
41 #include <pulsecore/client.h>
42 #include <pulsecore/source-output.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/pstream.h>
45 #include <pulsecore/tagstruct.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/llist.h>
57 #include <pulsecore/creds.h>
58 #include <pulsecore/core-util.h>
59 #include <pulsecore/ipacl.h>
60 #include <pulsecore/thread-mq.h>
61
62 #include "protocol-native.h"
63
64 /* Kick a client if it doesn't authenticate within this time */
65 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
66
67 /* Don't accept more connection than this */
68 #define MAX_CONNECTIONS 64
69
70 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
71 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
72 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
73 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
74
75 struct pa_native_protocol;
76
77 typedef struct record_stream {
78     pa_msgobject parent;
79
80     pa_native_connection *connection;
81     uint32_t index;
82
83     pa_source_output *source_output;
84     pa_memblockq *memblockq;
85
86     pa_bool_t adjust_latency:1;
87     pa_bool_t early_requests:1;
88
89     pa_buffer_attr buffer_attr;
90
91     pa_atomic_t on_the_fly;
92     pa_usec_t configured_source_latency;
93     size_t drop_initial;
94
95     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
96     size_t on_the_fly_snapshot;
97     pa_usec_t current_monitor_latency;
98     pa_usec_t current_source_latency;
99 } record_stream;
100
101 #define RECORD_STREAM(o) (record_stream_cast(o))
102 PA_DEFINE_PRIVATE_CLASS(record_stream, pa_msgobject);
103
104 typedef struct output_stream {
105     pa_msgobject parent;
106 } output_stream;
107
108 #define OUTPUT_STREAM(o) (output_stream_cast(o))
109 PA_DEFINE_PRIVATE_CLASS(output_stream, pa_msgobject);
110
111 typedef struct playback_stream {
112     output_stream parent;
113
114     pa_native_connection *connection;
115     uint32_t index;
116
117     pa_sink_input *sink_input;
118     pa_memblockq *memblockq;
119
120     pa_bool_t adjust_latency:1;
121     pa_bool_t early_requests:1;
122
123     pa_bool_t is_underrun:1;
124     pa_bool_t drain_request:1;
125     uint32_t drain_tag;
126     uint32_t syncid;
127
128     pa_atomic_t missing;
129     pa_usec_t configured_sink_latency;
130     pa_buffer_attr buffer_attr;
131
132     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
133     int64_t read_index, write_index;
134     size_t render_memblockq_length;
135     pa_usec_t current_sink_latency;
136     uint64_t playing_for, underrun_for;
137 } playback_stream;
138
139 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
140 PA_DEFINE_PRIVATE_CLASS(playback_stream, output_stream);
141
142 typedef struct upload_stream {
143     output_stream parent;
144
145     pa_native_connection *connection;
146     uint32_t index;
147
148     pa_memchunk memchunk;
149     size_t length;
150     char *name;
151     pa_sample_spec sample_spec;
152     pa_channel_map channel_map;
153     pa_proplist *proplist;
154 } upload_stream;
155
156 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
157 PA_DEFINE_PRIVATE_CLASS(upload_stream, output_stream);
158
159 struct pa_native_connection {
160     pa_msgobject parent;
161     pa_native_protocol *protocol;
162     pa_native_options *options;
163     pa_bool_t authorized:1;
164     pa_bool_t is_local:1;
165     uint32_t version;
166     pa_client *client;
167     pa_pstream *pstream;
168     pa_pdispatch *pdispatch;
169     pa_idxset *record_streams, *output_streams;
170     uint32_t rrobin_index;
171     pa_subscription *subscription;
172     pa_time_event *auth_timeout_event;
173 };
174
175 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
176 PA_DEFINE_PRIVATE_CLASS(pa_native_connection, pa_msgobject);
177
178 struct pa_native_protocol {
179     PA_REFCNT_DECLARE;
180
181     pa_core *core;
182     pa_idxset *connections;
183
184     pa_strlist *servers;
185     pa_hook hooks[PA_NATIVE_HOOK_MAX];
186
187     pa_hashmap *extensions;
188 };
189
190 enum {
191     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
192 };
193
194 enum {
195     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
196     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
197     SINK_INPUT_MESSAGE_FLUSH,
198     SINK_INPUT_MESSAGE_TRIGGER,
199     SINK_INPUT_MESSAGE_SEEK,
200     SINK_INPUT_MESSAGE_PREBUF_FORCE,
201     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
202     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
203 };
204
205 enum {
206     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
207     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
208     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
209     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
210     PLAYBACK_STREAM_MESSAGE_STARTED,
211     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
212 };
213
214 enum {
215     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
216 };
217
218 enum {
219     CONNECTION_MESSAGE_RELEASE,
220     CONNECTION_MESSAGE_REVOKE
221 };
222
223 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
224 static void sink_input_kill_cb(pa_sink_input *i);
225 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
226 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
227 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
228 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
229 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
230 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
231
232 static void native_connection_send_memblock(pa_native_connection *c);
233 static void playback_stream_request_bytes(struct playback_stream*s);
234
235 static void source_output_kill_cb(pa_source_output *o);
236 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
237 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
238 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
239 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
240 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
241
242 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
243 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
244
245 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
246 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
247 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
248 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
249 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284
285 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
286     [PA_COMMAND_ERROR] = NULL,
287     [PA_COMMAND_TIMEOUT] = NULL,
288     [PA_COMMAND_REPLY] = NULL,
289     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
290     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
291     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
292     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
293     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
294     [PA_COMMAND_AUTH] = command_auth,
295     [PA_COMMAND_REQUEST] = NULL,
296     [PA_COMMAND_EXIT] = command_exit,
297     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
298     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
299     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
300     [PA_COMMAND_STAT] = command_stat,
301     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
302     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
303     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
304     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
305     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
306     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
307     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
308     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
309     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
310     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
311     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
312     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
313     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
314     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
315     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
316     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
317     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
318     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
319     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
320     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
321     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
325     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
326
327     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
328     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
329     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
330
331     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
332     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
333     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
334
335     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
336     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
337
338     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
339     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
340     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
341     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
342
343     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
344     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
345
346     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
347     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
348     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
349     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
350     [PA_COMMAND_KILL_CLIENT] = command_kill,
351     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
352     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
353     [PA_COMMAND_LOAD_MODULE] = command_load_module,
354     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
355
356     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
357     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
358     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
359     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
360
361     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
362     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
363
364     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
365     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
366
367     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
368     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
369
370     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
371     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
372     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
373
374     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
375     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
376     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
377
378     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
379
380     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
381     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
382
383     [PA_COMMAND_EXTENSION] = command_extension
384 };
385
386 /* structure management */
387
388 /* Called from main context */
389 static void upload_stream_unlink(upload_stream *s) {
390     pa_assert(s);
391
392     if (!s->connection)
393         return;
394
395     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
396     s->connection = NULL;
397     upload_stream_unref(s);
398 }
399
400 /* Called from main context */
401 static void upload_stream_free(pa_object *o) {
402     upload_stream *s = UPLOAD_STREAM(o);
403     pa_assert(s);
404
405     upload_stream_unlink(s);
406
407     pa_xfree(s->name);
408
409     if (s->proplist)
410         pa_proplist_free(s->proplist);
411
412     if (s->memchunk.memblock)
413         pa_memblock_unref(s->memchunk.memblock);
414
415     pa_xfree(s);
416 }
417
418 /* Called from main context */
419 static upload_stream* upload_stream_new(
420         pa_native_connection *c,
421         const pa_sample_spec *ss,
422         const pa_channel_map *map,
423         const char *name,
424         size_t length,
425         pa_proplist *p) {
426
427     upload_stream *s;
428
429     pa_assert(c);
430     pa_assert(ss);
431     pa_assert(name);
432     pa_assert(length > 0);
433     pa_assert(p);
434
435     s = pa_msgobject_new(upload_stream);
436     s->parent.parent.parent.free = upload_stream_free;
437     s->connection = c;
438     s->sample_spec = *ss;
439     s->channel_map = *map;
440     s->name = pa_xstrdup(name);
441     pa_memchunk_reset(&s->memchunk);
442     s->length = length;
443     s->proplist = pa_proplist_copy(p);
444     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
445
446     pa_idxset_put(c->output_streams, s, &s->index);
447
448     return s;
449 }
450
451 /* Called from main context */
452 static void record_stream_unlink(record_stream *s) {
453     pa_assert(s);
454
455     if (!s->connection)
456         return;
457
458     if (s->source_output) {
459         pa_source_output_unlink(s->source_output);
460         pa_source_output_unref(s->source_output);
461         s->source_output = NULL;
462     }
463
464     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
465     s->connection = NULL;
466     record_stream_unref(s);
467 }
468
469 /* Called from main context */
470 static void record_stream_free(pa_object *o) {
471     record_stream *s = RECORD_STREAM(o);
472     pa_assert(s);
473
474     record_stream_unlink(s);
475
476     pa_memblockq_free(s->memblockq);
477     pa_xfree(s);
478 }
479
480 /* Called from main context */
481 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
482     record_stream *s = RECORD_STREAM(o);
483     record_stream_assert_ref(s);
484
485     if (!s->connection)
486         return -1;
487
488     switch (code) {
489
490         case RECORD_STREAM_MESSAGE_POST_DATA:
491
492             /* We try to keep up to date with how many bytes are
493              * currently on the fly */
494             pa_atomic_sub(&s->on_the_fly, chunk->length);
495
496             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
497 /*                 pa_log_warn("Failed to push data into output queue."); */
498                 return -1;
499             }
500
501             if (!pa_pstream_is_pending(s->connection->pstream))
502                 native_connection_send_memblock(s->connection);
503
504             break;
505     }
506
507     return 0;
508 }
509
510 /* Called from main context */
511 static void fix_record_buffer_attr_pre(record_stream *s) {
512
513     size_t frame_size;
514     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
515
516     pa_assert(s);
517
518     /* This function will be called from the main thread, before as
519      * well as after the source output has been activated using
520      * pa_source_output_put()! That means it may not touch any
521      * ->thread_info data! */
522
523     frame_size = pa_frame_size(&s->source_output->sample_spec);
524
525     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
526         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
527     if (s->buffer_attr.maxlength <= 0)
528         s->buffer_attr.maxlength = (uint32_t) frame_size;
529
530     if (s->buffer_attr.fragsize == (uint32_t) -1)
531         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
532     if (s->buffer_attr.fragsize <= 0)
533         s->buffer_attr.fragsize = (uint32_t) frame_size;
534
535     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
536
537     if (s->early_requests) {
538
539         /* In early request mode we need to emulate the classic
540          * fragment-based playback model. We do this setting the source
541          * latency to the fragment size. */
542
543         source_usec = fragsize_usec;
544
545     } else if (s->adjust_latency) {
546
547         /* So, the user asked us to adjust the latency according to
548          * what the source can provide. Half the latency will be
549          * spent on the hw buffer, half of it in the async buffer
550          * queue we maintain for each client. */
551
552         source_usec = fragsize_usec/2;
553
554     } else {
555
556         /* Ok, the user didn't ask us to adjust the latency, hence we
557          * don't */
558
559         source_usec = (pa_usec_t) -1;
560     }
561
562     if (source_usec != (pa_usec_t) -1)
563         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
564     else
565         s->configured_source_latency = 0;
566
567     if (s->early_requests) {
568
569         /* Ok, we didn't necessarily get what we were asking for, so
570          * let's tell the user */
571
572         fragsize_usec = s->configured_source_latency;
573
574     } else if (s->adjust_latency) {
575
576         /* Now subtract what we actually got */
577
578         if (fragsize_usec >= s->configured_source_latency*2)
579             fragsize_usec -= s->configured_source_latency;
580         else
581             fragsize_usec = s->configured_source_latency;
582     }
583
584     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
585         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
586
587         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
588
589     if (s->buffer_attr.fragsize <= 0)
590         s->buffer_attr.fragsize = (uint32_t) frame_size;
591 }
592
593 /* Called from main context */
594 static void fix_record_buffer_attr_post(record_stream *s) {
595     size_t base;
596
597     pa_assert(s);
598
599     /* This function will be called from the main thread, before as
600      * well as after the source output has been activated using
601      * pa_source_output_put()! That means it may not touch and
602      * ->thread_info data! */
603
604     base = pa_frame_size(&s->source_output->sample_spec);
605
606     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
607     if (s->buffer_attr.fragsize <= 0)
608         s->buffer_attr.fragsize = base;
609
610     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
611         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
612 }
613
614 /* Called from main context */
615 static record_stream* record_stream_new(
616         pa_native_connection *c,
617         pa_source *source,
618         pa_sample_spec *ss,
619         pa_channel_map *map,
620         pa_bool_t peak_detect,
621         pa_buffer_attr *attr,
622         pa_source_output_flags_t flags,
623         pa_proplist *p,
624         pa_bool_t adjust_latency,
625         pa_sink_input *direct_on_input,
626         pa_bool_t early_requests,
627         int *ret) {
628
629     record_stream *s;
630     pa_source_output *source_output = NULL;
631     size_t base;
632     pa_source_output_new_data data;
633
634     pa_assert(c);
635     pa_assert(ss);
636     pa_assert(p);
637     pa_assert(ret);
638
639     pa_source_output_new_data_init(&data);
640
641     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
642     data.driver = __FILE__;
643     data.module = c->options->module;
644     data.client = c->client;
645     data.source = source;
646     data.direct_on_input = direct_on_input;
647     pa_source_output_new_data_set_sample_spec(&data, ss);
648     pa_source_output_new_data_set_channel_map(&data, map);
649     if (peak_detect)
650         data.resample_method = PA_RESAMPLER_PEAKS;
651
652     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
653
654     pa_source_output_new_data_done(&data);
655
656     if (!source_output)
657         return NULL;
658
659     s = pa_msgobject_new(record_stream);
660     s->parent.parent.free = record_stream_free;
661     s->parent.process_msg = record_stream_process_msg;
662     s->connection = c;
663     s->source_output = source_output;
664     s->buffer_attr = *attr;
665     s->adjust_latency = adjust_latency;
666     s->early_requests = early_requests;
667     pa_atomic_store(&s->on_the_fly, 0);
668
669     s->source_output->parent.process_msg = source_output_process_msg;
670     s->source_output->push = source_output_push_cb;
671     s->source_output->kill = source_output_kill_cb;
672     s->source_output->get_latency = source_output_get_latency_cb;
673     s->source_output->moving = source_output_moving_cb;
674     s->source_output->suspend = source_output_suspend_cb;
675     s->source_output->send_event = source_output_send_event_cb;
676     s->source_output->userdata = s;
677
678     fix_record_buffer_attr_pre(s);
679
680     s->memblockq = pa_memblockq_new(
681             0,
682             s->buffer_attr.maxlength,
683             0,
684             base = pa_frame_size(&source_output->sample_spec),
685             1,
686             0,
687             0,
688             NULL);
689
690     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
691     fix_record_buffer_attr_post(s);
692
693     *ss = s->source_output->sample_spec;
694     *map = s->source_output->channel_map;
695
696     pa_idxset_put(c->record_streams, s, &s->index);
697
698     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
699                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
700                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
701                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
702
703     pa_source_output_put(s->source_output);
704     return s;
705 }
706
707 /* Called from main context */
708 static void record_stream_send_killed(record_stream *r) {
709     pa_tagstruct *t;
710     record_stream_assert_ref(r);
711
712     t = pa_tagstruct_new(NULL, 0);
713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
715     pa_tagstruct_putu32(t, r->index);
716     pa_pstream_send_tagstruct(r->connection->pstream, t);
717 }
718
719 /* Called from main context */
720 static void playback_stream_unlink(playback_stream *s) {
721     pa_assert(s);
722
723     if (!s->connection)
724         return;
725
726     if (s->sink_input) {
727         pa_sink_input_unlink(s->sink_input);
728         pa_sink_input_unref(s->sink_input);
729         s->sink_input = NULL;
730     }
731
732     if (s->drain_request)
733         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
734
735     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
736     s->connection = NULL;
737     playback_stream_unref(s);
738 }
739
740 /* Called from main context */
741 static void playback_stream_free(pa_object* o) {
742     playback_stream *s = PLAYBACK_STREAM(o);
743     pa_assert(s);
744
745     playback_stream_unlink(s);
746
747     pa_memblockq_free(s->memblockq);
748     pa_xfree(s);
749 }
750
751 /* Called from main context */
752 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
753     playback_stream *s = PLAYBACK_STREAM(o);
754     playback_stream_assert_ref(s);
755
756     if (!s->connection)
757         return -1;
758
759     switch (code) {
760
761         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
762             pa_tagstruct *t;
763             int l = 0;
764
765             for (;;) {
766                 if ((l = pa_atomic_load(&s->missing)) <= 0)
767                     return 0;
768
769                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
770                     break;
771             }
772
773             t = pa_tagstruct_new(NULL, 0);
774             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
775             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
776             pa_tagstruct_putu32(t, s->index);
777             pa_tagstruct_putu32(t, (uint32_t) l);
778             pa_pstream_send_tagstruct(s->connection->pstream, t);
779
780 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
781             break;
782         }
783
784         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
785             pa_tagstruct *t;
786
787 /*             pa_log("signalling underflow"); */
788
789             /* Report that we're empty */
790             t = pa_tagstruct_new(NULL, 0);
791             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
792             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
793             pa_tagstruct_putu32(t, s->index);
794             pa_pstream_send_tagstruct(s->connection->pstream, t);
795             break;
796         }
797
798         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
799             pa_tagstruct *t;
800
801             /* Notify the user we're overflowed*/
802             t = pa_tagstruct_new(NULL, 0);
803             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
804             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
805             pa_tagstruct_putu32(t, s->index);
806             pa_pstream_send_tagstruct(s->connection->pstream, t);
807             break;
808         }
809
810         case PLAYBACK_STREAM_MESSAGE_STARTED:
811
812             if (s->connection->version >= 13) {
813                 pa_tagstruct *t;
814
815                 /* Notify the user we started playback */
816                 t = pa_tagstruct_new(NULL, 0);
817                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
818                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
819                 pa_tagstruct_putu32(t, s->index);
820                 pa_pstream_send_tagstruct(s->connection->pstream, t);
821             }
822
823             break;
824
825         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
826             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
827             break;
828
829         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
830             pa_tagstruct *t;
831
832             s->buffer_attr.tlength = (uint32_t) offset;
833
834             t = pa_tagstruct_new(NULL, 0);
835             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
836             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
837             pa_tagstruct_putu32(t, s->index);
838             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
839             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
840             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
841             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
842             pa_tagstruct_put_usec(t, s->configured_sink_latency);
843             pa_pstream_send_tagstruct(s->connection->pstream, t);
844
845             break;
846         }
847     }
848
849     return 0;
850 }
851
852 /* Called from main context */
853 static void fix_playback_buffer_attr(playback_stream *s) {
854     size_t frame_size, max_prebuf;
855     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
856
857     pa_assert(s);
858
859     /* This function will be called from the main thread, before as
860      * well as after the sink input has been activated using
861      * pa_sink_input_put()! That means it may not touch any
862      * ->thread_info data, such as the memblockq! */
863
864     frame_size = pa_frame_size(&s->sink_input->sample_spec);
865
866     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
867         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
868     if (s->buffer_attr.maxlength <= 0)
869         s->buffer_attr.maxlength = (uint32_t) frame_size;
870
871     if (s->buffer_attr.tlength == (uint32_t) -1)
872         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
873     if (s->buffer_attr.tlength <= 0)
874         s->buffer_attr.tlength = (uint32_t) frame_size;
875
876     if (s->buffer_attr.minreq == (uint32_t) -1)
877         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
878     if (s->buffer_attr.minreq <= 0)
879         s->buffer_attr.minreq = (uint32_t) frame_size;
880
881     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
882         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
883
884     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
885     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
886
887     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
888                 (double) tlength_usec / PA_USEC_PER_MSEC,
889                 (double) minreq_usec / PA_USEC_PER_MSEC);
890
891     if (s->early_requests) {
892
893         /* In early request mode we need to emulate the classic
894          * fragment-based playback model. We do this setting the sink
895          * latency to the fragment size. */
896
897         sink_usec = minreq_usec;
898         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
899
900     } else if (s->adjust_latency) {
901
902         /* So, the user asked us to adjust the latency of the stream
903          * buffer according to the what the sink can provide. The
904          * tlength passed in shall be the overall latency. Roughly
905          * half the latency will be spent on the hw buffer, the other
906          * half of it in the async buffer queue we maintain for each
907          * client. In between we'll have a safety space of size
908          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
909          * empty and needs to be filled, then our buffer must have
910          * enough data to fulfill this request immediatly and thus
911          * have at least the same tlength as the size of the hw
912          * buffer. It additionally needs space for 2 times minreq
913          * because if the buffer ran empty and a partial fillup
914          * happens immediately on the next iteration we need to be
915          * able to fulfill it and give the application also minreq
916          * time to fill it up again for the next request Makes 2 times
917          * minreq in plus.. */
918
919         if (tlength_usec > minreq_usec*2)
920             sink_usec = (tlength_usec - minreq_usec*2)/2;
921         else
922             sink_usec = 0;
923
924         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
925
926     } else {
927
928         /* Ok, the user didn't ask us to adjust the latency, but we
929          * still need to make sure that the parameters from the user
930          * do make sense. */
931
932         if (tlength_usec > minreq_usec*2)
933             sink_usec = (tlength_usec - minreq_usec*2);
934         else
935             sink_usec = 0;
936
937         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
938     }
939
940     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
941
942     if (s->early_requests) {
943
944         /* Ok, we didn't necessarily get what we were asking for, so
945          * let's tell the user */
946
947         minreq_usec = s->configured_sink_latency;
948
949     } else if (s->adjust_latency) {
950
951         /* Ok, we didn't necessarily get what we were asking for, so
952          * let's subtract from what we asked for for the remaining
953          * buffer space */
954
955         if (tlength_usec >= s->configured_sink_latency)
956             tlength_usec -= s->configured_sink_latency;
957     }
958
959     /* FIXME: This is actually larger than necessary, since not all of
960      * the sink latency is actually rewritable. */
961     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
962         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
963
964     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
965         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
966         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
967
968     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
969         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
970         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
971
972     if (s->buffer_attr.minreq <= 0) {
973         s->buffer_attr.minreq = (uint32_t) frame_size;
974         s->buffer_attr.tlength += (uint32_t) frame_size*2;
975     }
976
977     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
978         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
979
980     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
981
982     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
983         s->buffer_attr.prebuf > max_prebuf)
984         s->buffer_attr.prebuf = max_prebuf;
985 }
986
987 /* Called from main context */
988 static playback_stream* playback_stream_new(
989         pa_native_connection *c,
990         pa_sink *sink,
991         pa_sample_spec *ss,
992         pa_channel_map *map,
993         pa_buffer_attr *a,
994         pa_cvolume *volume,
995         pa_bool_t muted,
996         pa_bool_t muted_set,
997         uint32_t syncid,
998         uint32_t *missing,
999         pa_sink_input_flags_t flags,
1000         pa_proplist *p,
1001         pa_bool_t adjust_latency,
1002         pa_bool_t early_requests,
1003         int *ret) {
1004
1005     playback_stream *s, *ssync;
1006     pa_sink_input *sink_input = NULL;
1007     pa_memchunk silence;
1008     uint32_t idx;
1009     int64_t start_index;
1010     pa_sink_input_new_data data;
1011
1012     pa_assert(c);
1013     pa_assert(ss);
1014     pa_assert(missing);
1015     pa_assert(p);
1016     pa_assert(ret);
1017
1018     /* Find syncid group */
1019     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1020
1021         if (!playback_stream_isinstance(ssync))
1022             continue;
1023
1024         if (ssync->syncid == syncid)
1025             break;
1026     }
1027
1028     /* Synced streams must connect to the same sink */
1029     if (ssync) {
1030
1031         if (!sink)
1032             sink = ssync->sink_input->sink;
1033         else if (sink != ssync->sink_input->sink) {
1034             *ret = PA_ERR_INVALID;
1035             return NULL;
1036         }
1037     }
1038
1039     pa_sink_input_new_data_init(&data);
1040
1041     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1042     data.driver = __FILE__;
1043     data.module = c->options->module;
1044     data.client = c->client;
1045     data.sink = sink;
1046     pa_sink_input_new_data_set_sample_spec(&data, ss);
1047     pa_sink_input_new_data_set_channel_map(&data, map);
1048     if (volume)
1049         pa_sink_input_new_data_set_volume(&data, volume);
1050     if (muted_set)
1051         pa_sink_input_new_data_set_muted(&data, muted);
1052     data.sync_base = ssync ? ssync->sink_input : NULL;
1053
1054     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1055
1056     pa_sink_input_new_data_done(&data);
1057
1058     if (!sink_input)
1059         return NULL;
1060
1061     s = pa_msgobject_new(playback_stream);
1062     s->parent.parent.parent.free = playback_stream_free;
1063     s->parent.parent.process_msg = playback_stream_process_msg;
1064     s->connection = c;
1065     s->syncid = syncid;
1066     s->sink_input = sink_input;
1067     s->is_underrun = TRUE;
1068     s->drain_request = FALSE;
1069     pa_atomic_store(&s->missing, 0);
1070     s->buffer_attr = *a;
1071     s->adjust_latency = adjust_latency;
1072     s->early_requests = early_requests;
1073
1074     s->sink_input->parent.process_msg = sink_input_process_msg;
1075     s->sink_input->pop = sink_input_pop_cb;
1076     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1077     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1078     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1079     s->sink_input->kill = sink_input_kill_cb;
1080     s->sink_input->moving = sink_input_moving_cb;
1081     s->sink_input->suspend = sink_input_suspend_cb;
1082     s->sink_input->send_event = sink_input_send_event_cb;
1083     s->sink_input->userdata = s;
1084
1085     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1086
1087     fix_playback_buffer_attr(s);
1088
1089     pa_sink_input_get_silence(sink_input, &silence);
1090     s->memblockq = pa_memblockq_new(
1091             start_index,
1092             s->buffer_attr.maxlength,
1093             s->buffer_attr.tlength,
1094             pa_frame_size(&sink_input->sample_spec),
1095             s->buffer_attr.prebuf,
1096             s->buffer_attr.minreq,
1097             0,
1098             &silence);
1099     pa_memblock_unref(silence.memblock);
1100
1101     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1102
1103     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1104
1105     *ss = s->sink_input->sample_spec;
1106     *map = s->sink_input->channel_map;
1107
1108     pa_idxset_put(c->output_streams, s, &s->index);
1109
1110     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1111                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1112                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1113                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1114                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1115
1116     pa_sink_input_put(s->sink_input);
1117     return s;
1118 }
1119
1120 /* Called from IO context */
1121 static void playback_stream_request_bytes(playback_stream *s) {
1122     size_t m, minreq;
1123     int previous_missing;
1124
1125     playback_stream_assert_ref(s);
1126
1127     m = pa_memblockq_pop_missing(s->memblockq);
1128
1129     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu)", */
1130     /*        (unsigned long) m, */
1131     /*        pa_memblockq_get_tlength(s->memblockq), */
1132     /*        pa_memblockq_get_minreq(s->memblockq), */
1133     /*        pa_memblockq_get_length(s->memblockq)); */
1134
1135     if (m <= 0)
1136         return;
1137
1138 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1139
1140     previous_missing = pa_atomic_add(&s->missing, (int) m);
1141     minreq = pa_memblockq_get_minreq(s->memblockq);
1142
1143     if (pa_memblockq_prebuf_active(s->memblockq) ||
1144         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1145         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1146 }
1147
1148 /* Called from main context */
1149 static void playback_stream_send_killed(playback_stream *p) {
1150     pa_tagstruct *t;
1151     playback_stream_assert_ref(p);
1152
1153     t = pa_tagstruct_new(NULL, 0);
1154     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1155     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1156     pa_tagstruct_putu32(t, p->index);
1157     pa_pstream_send_tagstruct(p->connection->pstream, t);
1158 }
1159
1160 /* Called from main context */
1161 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1162     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1163     pa_native_connection_assert_ref(c);
1164
1165     if (!c->protocol)
1166         return -1;
1167
1168     switch (code) {
1169
1170         case CONNECTION_MESSAGE_REVOKE:
1171             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1172             break;
1173
1174         case CONNECTION_MESSAGE_RELEASE:
1175             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1176             break;
1177     }
1178
1179     return 0;
1180 }
1181
1182 /* Called from main context */
1183 static void native_connection_unlink(pa_native_connection *c) {
1184     record_stream *r;
1185     output_stream *o;
1186
1187     pa_assert(c);
1188
1189     if (!c->protocol)
1190         return;
1191
1192     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1193
1194     if (c->options)
1195         pa_native_options_unref(c->options);
1196
1197     while ((r = pa_idxset_first(c->record_streams, NULL)))
1198         record_stream_unlink(r);
1199
1200     while ((o = pa_idxset_first(c->output_streams, NULL)))
1201         if (playback_stream_isinstance(o))
1202             playback_stream_unlink(PLAYBACK_STREAM(o));
1203         else
1204             upload_stream_unlink(UPLOAD_STREAM(o));
1205
1206     if (c->subscription)
1207         pa_subscription_free(c->subscription);
1208
1209     if (c->pstream)
1210         pa_pstream_unlink(c->pstream);
1211
1212     if (c->auth_timeout_event) {
1213         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1214         c->auth_timeout_event = NULL;
1215     }
1216
1217     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1218     c->protocol = NULL;
1219     pa_native_connection_unref(c);
1220 }
1221
1222 /* Called from main context */
1223 static void native_connection_free(pa_object *o) {
1224     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1225
1226     pa_assert(c);
1227
1228     native_connection_unlink(c);
1229
1230     pa_idxset_free(c->record_streams, NULL, NULL);
1231     pa_idxset_free(c->output_streams, NULL, NULL);
1232
1233     pa_pdispatch_unref(c->pdispatch);
1234     pa_pstream_unref(c->pstream);
1235     pa_client_free(c->client);
1236
1237     pa_xfree(c);
1238 }
1239
1240 /* Called from main context */
1241 static void native_connection_send_memblock(pa_native_connection *c) {
1242     uint32_t start;
1243     record_stream *r;
1244
1245     start = PA_IDXSET_INVALID;
1246     for (;;) {
1247         pa_memchunk chunk;
1248
1249         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1250             return;
1251
1252         if (start == PA_IDXSET_INVALID)
1253             start = c->rrobin_index;
1254         else if (start == c->rrobin_index)
1255             return;
1256
1257         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1258             pa_memchunk schunk = chunk;
1259
1260             if (schunk.length > r->buffer_attr.fragsize)
1261                 schunk.length = r->buffer_attr.fragsize;
1262
1263             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1264
1265             pa_memblockq_drop(r->memblockq, schunk.length);
1266             pa_memblock_unref(schunk.memblock);
1267
1268             return;
1269         }
1270     }
1271 }
1272
1273 /*** sink input callbacks ***/
1274
1275 /* Called from thread context */
1276 static void handle_seek(playback_stream *s, int64_t indexw) {
1277     playback_stream_assert_ref(s);
1278
1279 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1280
1281     if (s->sink_input->thread_info.underrun_for > 0) {
1282
1283 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1284
1285         if (pa_memblockq_is_readable(s->memblockq)) {
1286
1287             /* We just ended an underrun, let's ask the sink
1288              * for a complete rewind rewrite */
1289
1290             pa_log_debug("Requesting rewind due to end of underrun.");
1291             pa_sink_input_request_rewind(s->sink_input,
1292                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1293                                          FALSE, TRUE, FALSE);
1294         }
1295
1296     } else {
1297         int64_t indexr;
1298
1299         indexr = pa_memblockq_get_read_index(s->memblockq);
1300
1301         if (indexw < indexr) {
1302             /* OK, the sink already asked for this data, so
1303              * let's have it usk us again */
1304
1305             pa_log_debug("Requesting rewind due to rewrite.");
1306             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1307         }
1308     }
1309
1310     playback_stream_request_bytes(s);
1311 }
1312
1313 /* Called from thread context */
1314 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1315     pa_sink_input *i = PA_SINK_INPUT(o);
1316     playback_stream *s;
1317
1318     pa_sink_input_assert_ref(i);
1319     s = PLAYBACK_STREAM(i->userdata);
1320     playback_stream_assert_ref(s);
1321
1322     switch (code) {
1323
1324         case SINK_INPUT_MESSAGE_SEEK: {
1325             int64_t windex;
1326
1327             windex = pa_memblockq_get_write_index(s->memblockq);
1328
1329             /* The client side is incapable of accounting correctly
1330              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1331              * able to deal with that. */
1332
1333             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1334
1335             handle_seek(s, windex);
1336             return 0;
1337         }
1338
1339         case SINK_INPUT_MESSAGE_POST_DATA: {
1340             int64_t windex;
1341
1342             pa_assert(chunk);
1343
1344             windex = pa_memblockq_get_write_index(s->memblockq);
1345
1346 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1347
1348             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1349
1350                 if (pa_log_ratelimit())
1351                     pa_log_warn("Failed to push data into queue");
1352                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1353                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1354             }
1355
1356             handle_seek(s, windex);
1357
1358 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1359
1360             return 0;
1361         }
1362
1363         case SINK_INPUT_MESSAGE_DRAIN:
1364         case SINK_INPUT_MESSAGE_FLUSH:
1365         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1366         case SINK_INPUT_MESSAGE_TRIGGER: {
1367
1368             int64_t windex;
1369             pa_sink_input *isync;
1370             void (*func)(pa_memblockq *bq);
1371
1372             switch  (code) {
1373                 case SINK_INPUT_MESSAGE_FLUSH:
1374                     func = pa_memblockq_flush_write;
1375                     break;
1376
1377                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1378                     func = pa_memblockq_prebuf_force;
1379                     break;
1380
1381                 case SINK_INPUT_MESSAGE_DRAIN:
1382                 case SINK_INPUT_MESSAGE_TRIGGER:
1383                     func = pa_memblockq_prebuf_disable;
1384                     break;
1385
1386                 default:
1387                     pa_assert_not_reached();
1388             }
1389
1390             windex = pa_memblockq_get_write_index(s->memblockq);
1391             func(s->memblockq);
1392             handle_seek(s, windex);
1393
1394             /* Do the same for all other members in the sync group */
1395             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1396                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1397                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1398                 func(ssync->memblockq);
1399                 handle_seek(ssync, windex);
1400             }
1401
1402             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1403                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1404                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1405                 func(ssync->memblockq);
1406                 handle_seek(ssync, windex);
1407             }
1408
1409             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1410                 if (!pa_memblockq_is_readable(s->memblockq))
1411                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1412                 else {
1413                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1414                     s->drain_request = TRUE;
1415                 }
1416             }
1417
1418             return 0;
1419         }
1420
1421         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1422             /* Atomically get a snapshot of all timing parameters... */
1423             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1424             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1425             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1426             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1427             s->underrun_for = s->sink_input->thread_info.underrun_for;
1428             s->playing_for = s->sink_input->thread_info.playing_for;
1429
1430             return 0;
1431
1432         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1433             int64_t windex;
1434
1435             windex = pa_memblockq_get_write_index(s->memblockq);
1436
1437             pa_memblockq_prebuf_force(s->memblockq);
1438
1439             handle_seek(s, windex);
1440
1441             /* Fall through to the default handler */
1442             break;
1443         }
1444
1445         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1446             pa_usec_t *r = userdata;
1447
1448             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1449
1450             /* Fall through, the default handler will add in the extra
1451              * latency added by the resampler */
1452             break;
1453         }
1454
1455         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1456             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1457             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1458             return 0;
1459         }
1460     }
1461
1462     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1463 }
1464
1465 /* Called from thread context */
1466 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1467     playback_stream *s;
1468
1469     pa_sink_input_assert_ref(i);
1470     s = PLAYBACK_STREAM(i->userdata);
1471     playback_stream_assert_ref(s);
1472     pa_assert(chunk);
1473
1474 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1475
1476     if (pa_memblockq_is_readable(s->memblockq))
1477         s->is_underrun = FALSE;
1478     else {
1479         if (!s->is_underrun)
1480             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1481
1482         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1483             s->drain_request = FALSE;
1484             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1485         } else if (!s->is_underrun)
1486             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1487
1488         s->is_underrun = TRUE;
1489
1490         playback_stream_request_bytes(s);
1491     }
1492
1493     /* This call will not fail with prebuf=0, hence we check for
1494        underrun explicitly above */
1495     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1496         return -1;
1497
1498     chunk->length = PA_MIN(nbytes, chunk->length);
1499
1500     if (i->thread_info.underrun_for > 0)
1501         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1502
1503     pa_memblockq_drop(s->memblockq, chunk->length);
1504     playback_stream_request_bytes(s);
1505
1506     return 0;
1507 }
1508
1509 /* Called from thread context */
1510 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1511     playback_stream *s;
1512
1513     pa_sink_input_assert_ref(i);
1514     s = PLAYBACK_STREAM(i->userdata);
1515     playback_stream_assert_ref(s);
1516
1517     /* If we are in an underrun, then we don't rewind */
1518     if (i->thread_info.underrun_for > 0)
1519         return;
1520
1521     pa_memblockq_rewind(s->memblockq, nbytes);
1522 }
1523
1524 /* Called from thread context */
1525 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1526     playback_stream *s;
1527
1528     pa_sink_input_assert_ref(i);
1529     s = PLAYBACK_STREAM(i->userdata);
1530     playback_stream_assert_ref(s);
1531
1532     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1533 }
1534
1535 /* Called from thread context */
1536 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1537     playback_stream *s;
1538     size_t new_tlength, old_tlength;
1539
1540     pa_sink_input_assert_ref(i);
1541     s = PLAYBACK_STREAM(i->userdata);
1542     playback_stream_assert_ref(s);
1543
1544     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1545     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1546
1547     if (old_tlength < new_tlength) {
1548         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1549         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1550         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1551
1552         if (new_tlength == old_tlength)
1553             pa_log_debug("Failed to increase tlength");
1554         else {
1555             pa_log_debug("Notifying client about increased tlength");
1556             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1557         }
1558     }
1559 }
1560
1561 /* Called from main context */
1562 static void sink_input_kill_cb(pa_sink_input *i) {
1563     playback_stream *s;
1564
1565     pa_sink_input_assert_ref(i);
1566     s = PLAYBACK_STREAM(i->userdata);
1567     playback_stream_assert_ref(s);
1568
1569     playback_stream_send_killed(s);
1570     playback_stream_unlink(s);
1571 }
1572
1573 /* Called from main context */
1574 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1575     playback_stream *s;
1576     pa_tagstruct *t;
1577
1578     pa_sink_input_assert_ref(i);
1579     s = PLAYBACK_STREAM(i->userdata);
1580     playback_stream_assert_ref(s);
1581
1582     if (s->connection->version < 15)
1583       return;
1584
1585     t = pa_tagstruct_new(NULL, 0);
1586     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1587     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1588     pa_tagstruct_putu32(t, s->index);
1589     pa_tagstruct_puts(t, event);
1590     pa_tagstruct_put_proplist(t, pl);
1591     pa_pstream_send_tagstruct(s->connection->pstream, t);
1592 }
1593
1594 /* Called from main context */
1595 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1596     playback_stream *s;
1597     pa_tagstruct *t;
1598
1599     pa_sink_input_assert_ref(i);
1600     s = PLAYBACK_STREAM(i->userdata);
1601     playback_stream_assert_ref(s);
1602
1603     if (s->connection->version < 12)
1604       return;
1605
1606     t = pa_tagstruct_new(NULL, 0);
1607     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1608     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1609     pa_tagstruct_putu32(t, s->index);
1610     pa_tagstruct_put_boolean(t, suspend);
1611     pa_pstream_send_tagstruct(s->connection->pstream, t);
1612 }
1613
1614 /* Called from main context */
1615 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1616     playback_stream *s;
1617     pa_tagstruct *t;
1618
1619     pa_sink_input_assert_ref(i);
1620     s = PLAYBACK_STREAM(i->userdata);
1621     playback_stream_assert_ref(s);
1622
1623     if (!dest)
1624         return;
1625
1626     fix_playback_buffer_attr(s);
1627     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1628     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1629
1630     if (s->connection->version < 12)
1631       return;
1632
1633     t = pa_tagstruct_new(NULL, 0);
1634     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1635     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1636     pa_tagstruct_putu32(t, s->index);
1637     pa_tagstruct_putu32(t, dest->index);
1638     pa_tagstruct_puts(t, dest->name);
1639     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1640
1641     if (s->connection->version >= 13) {
1642         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1643         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1644         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1645         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1646         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1647     }
1648
1649     pa_pstream_send_tagstruct(s->connection->pstream, t);
1650 }
1651
1652 /*** source_output callbacks ***/
1653
1654 /* Called from thread context */
1655 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1656     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1657     record_stream *s;
1658
1659     pa_source_output_assert_ref(o);
1660     s = RECORD_STREAM(o->userdata);
1661     record_stream_assert_ref(s);
1662
1663     switch (code) {
1664         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1665             /* Atomically get a snapshot of all timing parameters... */
1666             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1667             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1668             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1669             return 0;
1670     }
1671
1672     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1673 }
1674
1675 /* Called from thread context */
1676 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1677     record_stream *s;
1678
1679     pa_source_output_assert_ref(o);
1680     s = RECORD_STREAM(o->userdata);
1681     record_stream_assert_ref(s);
1682     pa_assert(chunk);
1683
1684     pa_atomic_add(&s->on_the_fly, chunk->length);
1685     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1686 }
1687
1688 static void source_output_kill_cb(pa_source_output *o) {
1689     record_stream *s;
1690
1691     pa_source_output_assert_ref(o);
1692     s = RECORD_STREAM(o->userdata);
1693     record_stream_assert_ref(s);
1694
1695     record_stream_send_killed(s);
1696     record_stream_unlink(s);
1697 }
1698
1699 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1700     record_stream *s;
1701
1702     pa_source_output_assert_ref(o);
1703     s = RECORD_STREAM(o->userdata);
1704     record_stream_assert_ref(s);
1705
1706     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1707
1708     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1709 }
1710
1711 /* Called from main context */
1712 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1713     record_stream *s;
1714     pa_tagstruct *t;
1715
1716     pa_source_output_assert_ref(o);
1717     s = RECORD_STREAM(o->userdata);
1718     record_stream_assert_ref(s);
1719
1720     if (s->connection->version < 15)
1721       return;
1722
1723     t = pa_tagstruct_new(NULL, 0);
1724     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1725     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1726     pa_tagstruct_putu32(t, s->index);
1727     pa_tagstruct_puts(t, event);
1728     pa_tagstruct_put_proplist(t, pl);
1729     pa_pstream_send_tagstruct(s->connection->pstream, t);
1730 }
1731
1732 /* Called from main context */
1733 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1734     record_stream *s;
1735     pa_tagstruct *t;
1736
1737     pa_source_output_assert_ref(o);
1738     s = RECORD_STREAM(o->userdata);
1739     record_stream_assert_ref(s);
1740
1741     if (s->connection->version < 12)
1742       return;
1743
1744     t = pa_tagstruct_new(NULL, 0);
1745     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1746     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1747     pa_tagstruct_putu32(t, s->index);
1748     pa_tagstruct_put_boolean(t, suspend);
1749     pa_pstream_send_tagstruct(s->connection->pstream, t);
1750 }
1751
1752 /* Called from main context */
1753 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1754     record_stream *s;
1755     pa_tagstruct *t;
1756
1757     pa_source_output_assert_ref(o);
1758     s = RECORD_STREAM(o->userdata);
1759     record_stream_assert_ref(s);
1760
1761     if (!dest)
1762         return;
1763
1764     fix_record_buffer_attr_pre(s);
1765     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1766     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1767     fix_record_buffer_attr_post(s);
1768
1769     if (s->connection->version < 12)
1770       return;
1771
1772     t = pa_tagstruct_new(NULL, 0);
1773     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1774     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1775     pa_tagstruct_putu32(t, s->index);
1776     pa_tagstruct_putu32(t, dest->index);
1777     pa_tagstruct_puts(t, dest->name);
1778     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1779
1780     if (s->connection->version >= 13) {
1781         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1782         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1783         pa_tagstruct_put_usec(t, s->configured_source_latency);
1784     }
1785
1786     pa_pstream_send_tagstruct(s->connection->pstream, t);
1787 }
1788
1789 /*** pdispatch callbacks ***/
1790
1791 static void protocol_error(pa_native_connection *c) {
1792     pa_log("protocol error, kicking client");
1793     native_connection_unlink(c);
1794 }
1795
1796 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1797 if (!(expression)) { \
1798     pa_pstream_send_error((pstream), (tag), (error)); \
1799     return; \
1800 } \
1801 } while(0);
1802
1803 static pa_tagstruct *reply_new(uint32_t tag) {
1804     pa_tagstruct *reply;
1805
1806     reply = pa_tagstruct_new(NULL, 0);
1807     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1808     pa_tagstruct_putu32(reply, tag);
1809     return reply;
1810 }
1811
1812 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1813     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1814     playback_stream *s;
1815     uint32_t sink_index, syncid, missing;
1816     pa_buffer_attr attr;
1817     const char *name = NULL, *sink_name;
1818     pa_sample_spec ss;
1819     pa_channel_map map;
1820     pa_tagstruct *reply;
1821     pa_sink *sink = NULL;
1822     pa_cvolume volume;
1823     pa_bool_t
1824         corked = FALSE,
1825         no_remap = FALSE,
1826         no_remix = FALSE,
1827         fix_format = FALSE,
1828         fix_rate = FALSE,
1829         fix_channels = FALSE,
1830         no_move = FALSE,
1831         variable_rate = FALSE,
1832         muted = FALSE,
1833         adjust_latency = FALSE,
1834         early_requests = FALSE,
1835         dont_inhibit_auto_suspend = FALSE,
1836         muted_set = FALSE,
1837         fail_on_suspend = FALSE;
1838     pa_sink_input_flags_t flags = 0;
1839     pa_proplist *p;
1840     pa_bool_t volume_set = TRUE;
1841     int ret = PA_ERR_INVALID;
1842
1843     pa_native_connection_assert_ref(c);
1844     pa_assert(t);
1845     memset(&attr, 0, sizeof(attr));
1846
1847     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1848         pa_tagstruct_get(
1849                 t,
1850                 PA_TAG_SAMPLE_SPEC, &ss,
1851                 PA_TAG_CHANNEL_MAP, &map,
1852                 PA_TAG_U32, &sink_index,
1853                 PA_TAG_STRING, &sink_name,
1854                 PA_TAG_U32, &attr.maxlength,
1855                 PA_TAG_BOOLEAN, &corked,
1856                 PA_TAG_U32, &attr.tlength,
1857                 PA_TAG_U32, &attr.prebuf,
1858                 PA_TAG_U32, &attr.minreq,
1859                 PA_TAG_U32, &syncid,
1860                 PA_TAG_CVOLUME, &volume,
1861                 PA_TAG_INVALID) < 0) {
1862
1863         protocol_error(c);
1864         return;
1865     }
1866
1867     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1868     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1869     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1870     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1871     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1872     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1873     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1874     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1875
1876     p = pa_proplist_new();
1877
1878     if (name)
1879         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1880
1881     if (c->version >= 12)  {
1882         /* Since 0.9.8 the user can ask for a couple of additional flags */
1883
1884         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1885             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1886             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1887             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1888             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1889             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1890             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1891
1892             protocol_error(c);
1893             pa_proplist_free(p);
1894             return;
1895         }
1896     }
1897
1898     if (c->version >= 13) {
1899
1900         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1901             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1902             pa_tagstruct_get_proplist(t, p) < 0) {
1903             protocol_error(c);
1904             pa_proplist_free(p);
1905             return;
1906         }
1907     }
1908
1909     if (c->version >= 14) {
1910
1911         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1912             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1913             protocol_error(c);
1914             pa_proplist_free(p);
1915             return;
1916         }
1917     }
1918
1919     if (c->version >= 15) {
1920
1921         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1922             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1923             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1924             protocol_error(c);
1925             pa_proplist_free(p);
1926             return;
1927         }
1928     }
1929
1930     if (!pa_tagstruct_eof(t)) {
1931         protocol_error(c);
1932         pa_proplist_free(p);
1933         return;
1934     }
1935
1936     if (sink_index != PA_INVALID_INDEX) {
1937
1938         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1939             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1940             pa_proplist_free(p);
1941             return;
1942         }
1943
1944     } else if (sink_name) {
1945
1946         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1947             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1948             pa_proplist_free(p);
1949             return;
1950         }
1951     }
1952
1953     flags =
1954         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1955         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1956         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1957         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1958         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1959         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1960         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1961         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1962         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1963         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0);
1964
1965     /* Only since protocol version 15 there's a seperate muted_set
1966      * flag. For older versions we synthesize it here */
1967     muted_set = muted_set || muted;
1968
1969     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1970     pa_proplist_free(p);
1971
1972     CHECK_VALIDITY(c->pstream, s, tag, ret);
1973
1974     reply = reply_new(tag);
1975     pa_tagstruct_putu32(reply, s->index);
1976     pa_assert(s->sink_input);
1977     pa_tagstruct_putu32(reply, s->sink_input->index);
1978     pa_tagstruct_putu32(reply, missing);
1979
1980 /*     pa_log("initial request is %u", missing); */
1981
1982     if (c->version >= 9) {
1983         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1984
1985         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1986         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1987         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1988         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1989     }
1990
1991     if (c->version >= 12) {
1992         /* Since 0.9.8 we support sending the chosen sample
1993          * spec/channel map/device/suspend status back to the
1994          * client */
1995
1996         pa_tagstruct_put_sample_spec(reply, &ss);
1997         pa_tagstruct_put_channel_map(reply, &map);
1998
1999         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2000         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2001
2002         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2003     }
2004
2005     if (c->version >= 13)
2006         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2007
2008     pa_pstream_send_tagstruct(c->pstream, reply);
2009 }
2010
2011 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2012     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2013     uint32_t channel;
2014
2015     pa_native_connection_assert_ref(c);
2016     pa_assert(t);
2017
2018     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2019         !pa_tagstruct_eof(t)) {
2020         protocol_error(c);
2021         return;
2022     }
2023
2024     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2025
2026     switch (command) {
2027
2028         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2029             playback_stream *s;
2030             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2031                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2032                 return;
2033             }
2034
2035             playback_stream_unlink(s);
2036             break;
2037         }
2038
2039         case PA_COMMAND_DELETE_RECORD_STREAM: {
2040             record_stream *s;
2041             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2042                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2043                 return;
2044             }
2045
2046             record_stream_unlink(s);
2047             break;
2048         }
2049
2050         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2051             upload_stream *s;
2052
2053             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2054                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2055                 return;
2056             }
2057
2058             upload_stream_unlink(s);
2059             break;
2060         }
2061
2062         default:
2063             pa_assert_not_reached();
2064     }
2065
2066     pa_pstream_send_simple_ack(c->pstream, tag);
2067 }
2068
2069 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2070     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2071     record_stream *s;
2072     pa_buffer_attr attr;
2073     uint32_t source_index;
2074     const char *name = NULL, *source_name;
2075     pa_sample_spec ss;
2076     pa_channel_map map;
2077     pa_tagstruct *reply;
2078     pa_source *source = NULL;
2079     pa_bool_t
2080         corked = FALSE,
2081         no_remap = FALSE,
2082         no_remix = FALSE,
2083         fix_format = FALSE,
2084         fix_rate = FALSE,
2085         fix_channels = FALSE,
2086         no_move = FALSE,
2087         variable_rate = FALSE,
2088         adjust_latency = FALSE,
2089         peak_detect = FALSE,
2090         early_requests = FALSE,
2091         dont_inhibit_auto_suspend = FALSE,
2092         fail_on_suspend = FALSE;
2093     pa_source_output_flags_t flags = 0;
2094     pa_proplist *p;
2095     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2096     pa_sink_input *direct_on_input = NULL;
2097     int ret = PA_ERR_INVALID;
2098
2099     pa_native_connection_assert_ref(c);
2100     pa_assert(t);
2101
2102     memset(&attr, 0, sizeof(attr));
2103
2104     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2105         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2106         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2107         pa_tagstruct_getu32(t, &source_index) < 0 ||
2108         pa_tagstruct_gets(t, &source_name) < 0 ||
2109         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2110         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2111         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2112         protocol_error(c);
2113         return;
2114     }
2115
2116     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2117     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2118     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2119     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2120     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2121     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2122     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2123
2124     p = pa_proplist_new();
2125
2126     if (name)
2127         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2128
2129     if (c->version >= 12)  {
2130         /* Since 0.9.8 the user can ask for a couple of additional flags */
2131
2132         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2133             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2134             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2135             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2136             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2137             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2138             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2139
2140             protocol_error(c);
2141             pa_proplist_free(p);
2142             return;
2143         }
2144     }
2145
2146     if (c->version >= 13) {
2147
2148         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2149             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2150             pa_tagstruct_get_proplist(t, p) < 0 ||
2151             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2152             protocol_error(c);
2153             pa_proplist_free(p);
2154             return;
2155         }
2156     }
2157
2158     if (c->version >= 14) {
2159
2160         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2161             protocol_error(c);
2162             pa_proplist_free(p);
2163             return;
2164         }
2165     }
2166
2167     if (c->version >= 15) {
2168
2169         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2170             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2171             protocol_error(c);
2172             pa_proplist_free(p);
2173             return;
2174         }
2175     }
2176
2177     if (!pa_tagstruct_eof(t)) {
2178         protocol_error(c);
2179         pa_proplist_free(p);
2180         return;
2181     }
2182
2183     if (source_index != PA_INVALID_INDEX) {
2184
2185         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2186             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2187             pa_proplist_free(p);
2188             return;
2189         }
2190
2191     } else if (source_name) {
2192
2193         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2194             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2195             pa_proplist_free(p);
2196             return;
2197         }
2198     }
2199
2200     if (direct_on_input_idx != PA_INVALID_INDEX) {
2201
2202         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2203             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2204             pa_proplist_free(p);
2205             return;
2206         }
2207     }
2208
2209     flags =
2210         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2211         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2212         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2213         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2214         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2215         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2216         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2217         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2218         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2219         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0);
2220
2221     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2222     pa_proplist_free(p);
2223
2224     CHECK_VALIDITY(c->pstream, s, tag, ret);
2225
2226     reply = reply_new(tag);
2227     pa_tagstruct_putu32(reply, s->index);
2228     pa_assert(s->source_output);
2229     pa_tagstruct_putu32(reply, s->source_output->index);
2230
2231     if (c->version >= 9) {
2232         /* Since 0.9 we support sending the buffer metrics back to the client */
2233
2234         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2235         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2236     }
2237
2238     if (c->version >= 12) {
2239         /* Since 0.9.8 we support sending the chosen sample
2240          * spec/channel map/device/suspend status back to the
2241          * client */
2242
2243         pa_tagstruct_put_sample_spec(reply, &ss);
2244         pa_tagstruct_put_channel_map(reply, &map);
2245
2246         pa_tagstruct_putu32(reply, s->source_output->source->index);
2247         pa_tagstruct_puts(reply, s->source_output->source->name);
2248
2249         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2250     }
2251
2252     if (c->version >= 13)
2253         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2254
2255     pa_pstream_send_tagstruct(c->pstream, reply);
2256 }
2257
2258 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2259     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2260     int ret;
2261
2262     pa_native_connection_assert_ref(c);
2263     pa_assert(t);
2264
2265     if (!pa_tagstruct_eof(t)) {
2266         protocol_error(c);
2267         return;
2268     }
2269
2270     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2271     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2272     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2273
2274     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2275 }
2276
2277 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2278     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2279     const void*cookie;
2280     pa_tagstruct *reply;
2281     pa_bool_t shm_on_remote = FALSE, do_shm;
2282
2283     pa_native_connection_assert_ref(c);
2284     pa_assert(t);
2285
2286     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2287         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2288         !pa_tagstruct_eof(t)) {
2289         protocol_error(c);
2290         return;
2291     }
2292
2293     /* Minimum supported version */
2294     if (c->version < 8) {
2295         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2296         return;
2297     }
2298
2299     /* Starting with protocol version 13 the MSB of the version tag
2300        reflects if shm is available for this pa_native_connection or
2301        not. */
2302     if (c->version >= 13) {
2303         shm_on_remote = !!(c->version & 0x80000000U);
2304         c->version &= 0x7FFFFFFFU;
2305     }
2306
2307     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2308
2309     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2310
2311     if (!c->authorized) {
2312         pa_bool_t success = FALSE;
2313
2314 #ifdef HAVE_CREDS
2315         const pa_creds *creds;
2316
2317         if ((creds = pa_pdispatch_creds(pd))) {
2318             if (creds->uid == getuid())
2319                 success = TRUE;
2320             else if (c->options->auth_group) {
2321                 int r;
2322                 gid_t gid;
2323
2324                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2325                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2326                 else if (gid == creds->gid)
2327                     success = TRUE;
2328
2329                 if (!success) {
2330                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2331                         pa_log_warn("Failed to check group membership.");
2332                     else if (r > 0)
2333                         success = TRUE;
2334                 }
2335             }
2336
2337             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2338                         (unsigned long) creds->uid,
2339                         (unsigned long) creds->gid,
2340                         (int) success);
2341         }
2342 #endif
2343
2344         if (!success && c->options->auth_cookie) {
2345             const uint8_t *ac;
2346
2347             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2348                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2349                     success = TRUE;
2350         }
2351
2352         if (!success) {
2353             pa_log_warn("Denied access to client with invalid authorization data.");
2354             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2355             return;
2356         }
2357
2358         c->authorized = TRUE;
2359         if (c->auth_timeout_event) {
2360             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2361             c->auth_timeout_event = NULL;
2362         }
2363     }
2364
2365     /* Enable shared memory support if possible */
2366     do_shm =
2367         pa_mempool_is_shared(c->protocol->core->mempool) &&
2368         c->is_local;
2369
2370     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2371
2372     if (do_shm)
2373         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2374             do_shm = FALSE;
2375
2376 #ifdef HAVE_CREDS
2377     if (do_shm) {
2378         /* Only enable SHM if both sides are owned by the same
2379          * user. This is a security measure because otherwise data
2380          * private to the user might leak. */
2381
2382         const pa_creds *creds;
2383         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2384             do_shm = FALSE;
2385     }
2386 #endif
2387
2388     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2389     pa_pstream_enable_shm(c->pstream, do_shm);
2390
2391     reply = reply_new(tag);
2392     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2393
2394 #ifdef HAVE_CREDS
2395 {
2396     /* SHM support is only enabled after both sides made sure they are the same user. */
2397
2398     pa_creds ucred;
2399
2400     ucred.uid = getuid();
2401     ucred.gid = getgid();
2402
2403     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2404 }
2405 #else
2406     pa_pstream_send_tagstruct(c->pstream, reply);
2407 #endif
2408 }
2409
2410 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2411     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2412     const char *name = NULL;
2413     pa_proplist *p;
2414     pa_tagstruct *reply;
2415
2416     pa_native_connection_assert_ref(c);
2417     pa_assert(t);
2418
2419     p = pa_proplist_new();
2420
2421     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2422         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2423         !pa_tagstruct_eof(t)) {
2424
2425         protocol_error(c);
2426         pa_proplist_free(p);
2427         return;
2428     }
2429
2430     if (name)
2431         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2432             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2433             pa_proplist_free(p);
2434             return;
2435         }
2436
2437     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2438     pa_proplist_free(p);
2439
2440     reply = reply_new(tag);
2441
2442     if (c->version >= 13)
2443         pa_tagstruct_putu32(reply, c->client->index);
2444
2445     pa_pstream_send_tagstruct(c->pstream, reply);
2446 }
2447
2448 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2449     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2450     const char *name;
2451     uint32_t idx = PA_IDXSET_INVALID;
2452
2453     pa_native_connection_assert_ref(c);
2454     pa_assert(t);
2455
2456     if (pa_tagstruct_gets(t, &name) < 0 ||
2457         !pa_tagstruct_eof(t)) {
2458         protocol_error(c);
2459         return;
2460     }
2461
2462     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2463     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2464
2465     if (command == PA_COMMAND_LOOKUP_SINK) {
2466         pa_sink *sink;
2467         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2468             idx = sink->index;
2469     } else {
2470         pa_source *source;
2471         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2472         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2473             idx = source->index;
2474     }
2475
2476     if (idx == PA_IDXSET_INVALID)
2477         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2478     else {
2479         pa_tagstruct *reply;
2480         reply = reply_new(tag);
2481         pa_tagstruct_putu32(reply, idx);
2482         pa_pstream_send_tagstruct(c->pstream, reply);
2483     }
2484 }
2485
2486 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2487     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2488     uint32_t idx;
2489     playback_stream *s;
2490
2491     pa_native_connection_assert_ref(c);
2492     pa_assert(t);
2493
2494     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2495         !pa_tagstruct_eof(t)) {
2496         protocol_error(c);
2497         return;
2498     }
2499
2500     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2501     s = pa_idxset_get_by_index(c->output_streams, idx);
2502     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2503     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2504
2505     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2506 }
2507
2508 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2509     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2510     pa_tagstruct *reply;
2511     const pa_mempool_stat *stat;
2512
2513     pa_native_connection_assert_ref(c);
2514     pa_assert(t);
2515
2516     if (!pa_tagstruct_eof(t)) {
2517         protocol_error(c);
2518         return;
2519     }
2520
2521     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2522
2523     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2524
2525     reply = reply_new(tag);
2526     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2527     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2528     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2529     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2530     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2531     pa_pstream_send_tagstruct(c->pstream, reply);
2532 }
2533
2534 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2535     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2536     pa_tagstruct *reply;
2537     playback_stream *s;
2538     struct timeval tv, now;
2539     uint32_t idx;
2540
2541     pa_native_connection_assert_ref(c);
2542     pa_assert(t);
2543
2544     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2545         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2546         !pa_tagstruct_eof(t)) {
2547         protocol_error(c);
2548         return;
2549     }
2550
2551     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2552     s = pa_idxset_get_by_index(c->output_streams, idx);
2553     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2554     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2555
2556     /* Get an atomic snapshot of all timing parameters */
2557     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2558
2559     reply = reply_new(tag);
2560     pa_tagstruct_put_usec(reply,
2561                           s->current_sink_latency +
2562                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2563     pa_tagstruct_put_usec(reply, 0);
2564     pa_tagstruct_put_boolean(reply,
2565                              s->playing_for > 0 &&
2566                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2567                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2568     pa_tagstruct_put_timeval(reply, &tv);
2569     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2570     pa_tagstruct_puts64(reply, s->write_index);
2571     pa_tagstruct_puts64(reply, s->read_index);
2572
2573     if (c->version >= 13) {
2574         pa_tagstruct_putu64(reply, s->underrun_for);
2575         pa_tagstruct_putu64(reply, s->playing_for);
2576     }
2577
2578     pa_pstream_send_tagstruct(c->pstream, reply);
2579 }
2580
2581 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2582     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2583     pa_tagstruct *reply;
2584     record_stream *s;
2585     struct timeval tv, now;
2586     uint32_t idx;
2587
2588     pa_native_connection_assert_ref(c);
2589     pa_assert(t);
2590
2591     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2592         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2593         !pa_tagstruct_eof(t)) {
2594         protocol_error(c);
2595         return;
2596     }
2597
2598     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2599     s = pa_idxset_get_by_index(c->record_streams, idx);
2600     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2601
2602     /* Get an atomic snapshot of all timing parameters */
2603     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2604
2605     reply = reply_new(tag);
2606     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2607     pa_tagstruct_put_usec(reply,
2608                           s->current_source_latency +
2609                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2610     pa_tagstruct_put_boolean(reply,
2611                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2612                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2613     pa_tagstruct_put_timeval(reply, &tv);
2614     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2615     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2616     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2617     pa_pstream_send_tagstruct(c->pstream, reply);
2618 }
2619
2620 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2621     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2622     upload_stream *s;
2623     uint32_t length;
2624     const char *name = NULL;
2625     pa_sample_spec ss;
2626     pa_channel_map map;
2627     pa_tagstruct *reply;
2628     pa_proplist *p;
2629
2630     pa_native_connection_assert_ref(c);
2631     pa_assert(t);
2632
2633     if (pa_tagstruct_gets(t, &name) < 0 ||
2634         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2635         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2636         pa_tagstruct_getu32(t, &length) < 0) {
2637         protocol_error(c);
2638         return;
2639     }
2640
2641     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2642     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2643     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2644     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2645     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2646     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2647
2648     p = pa_proplist_new();
2649
2650     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2651         !pa_tagstruct_eof(t)) {
2652
2653         protocol_error(c);
2654         pa_proplist_free(p);
2655         return;
2656     }
2657
2658     if (c->version < 13)
2659         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2660     else if (!name)
2661         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2662             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2663
2664     if (!name || !pa_namereg_is_valid_name(name)) {
2665         pa_proplist_free(p);
2666         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2667     }
2668
2669     s = upload_stream_new(c, &ss, &map, name, length, p);
2670     pa_proplist_free(p);
2671
2672     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2673
2674     reply = reply_new(tag);
2675     pa_tagstruct_putu32(reply, s->index);
2676     pa_tagstruct_putu32(reply, length);
2677     pa_pstream_send_tagstruct(c->pstream, reply);
2678 }
2679
2680 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2681     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2682     uint32_t channel;
2683     upload_stream *s;
2684     uint32_t idx;
2685
2686     pa_native_connection_assert_ref(c);
2687     pa_assert(t);
2688
2689     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2690         !pa_tagstruct_eof(t)) {
2691         protocol_error(c);
2692         return;
2693     }
2694
2695     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2696
2697     s = pa_idxset_get_by_index(c->output_streams, channel);
2698     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2699     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2700
2701     if (!s->memchunk.memblock)
2702         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2703     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2704         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2705     else
2706         pa_pstream_send_simple_ack(c->pstream, tag);
2707
2708     upload_stream_unlink(s);
2709 }
2710
2711 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2712     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2713     uint32_t sink_index;
2714     pa_volume_t volume;
2715     pa_sink *sink;
2716     const char *name, *sink_name;
2717     uint32_t idx;
2718     pa_proplist *p;
2719     pa_tagstruct *reply;
2720
2721     pa_native_connection_assert_ref(c);
2722     pa_assert(t);
2723
2724     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2725
2726     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2727         pa_tagstruct_gets(t, &sink_name) < 0 ||
2728         pa_tagstruct_getu32(t, &volume) < 0 ||
2729         pa_tagstruct_gets(t, &name) < 0) {
2730         protocol_error(c);
2731         return;
2732     }
2733
2734     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2735     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2736     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2737     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2738
2739     if (sink_index != PA_INVALID_INDEX)
2740         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2741     else
2742         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2743
2744     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2745
2746     p = pa_proplist_new();
2747
2748     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2749         !pa_tagstruct_eof(t)) {
2750         protocol_error(c);
2751         pa_proplist_free(p);
2752         return;
2753     }
2754
2755     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2756
2757     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2758         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2759         pa_proplist_free(p);
2760         return;
2761     }
2762
2763     pa_proplist_free(p);
2764
2765     reply = reply_new(tag);
2766
2767     if (c->version >= 13)
2768         pa_tagstruct_putu32(reply, idx);
2769
2770     pa_pstream_send_tagstruct(c->pstream, reply);
2771 }
2772
2773 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2774     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2775     const char *name;
2776
2777     pa_native_connection_assert_ref(c);
2778     pa_assert(t);
2779
2780     if (pa_tagstruct_gets(t, &name) < 0 ||
2781         !pa_tagstruct_eof(t)) {
2782         protocol_error(c);
2783         return;
2784     }
2785
2786     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2787     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2788
2789     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2790         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2791         return;
2792     }
2793
2794     pa_pstream_send_simple_ack(c->pstream, tag);
2795 }
2796
2797 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2798     pa_assert(c);
2799     pa_assert(fixed);
2800     pa_assert(original);
2801
2802     *fixed = *original;
2803
2804     if (c->version < 12) {
2805         /* Before protocol version 12 we didn't support S32 samples,
2806          * so we need to lie about this to the client */
2807
2808         if (fixed->format == PA_SAMPLE_S32LE)
2809             fixed->format = PA_SAMPLE_FLOAT32LE;
2810         if (fixed->format == PA_SAMPLE_S32BE)
2811             fixed->format = PA_SAMPLE_FLOAT32BE;
2812     }
2813
2814     if (c->version < 15) {
2815         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2816             fixed->format = PA_SAMPLE_FLOAT32LE;
2817         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2818             fixed->format = PA_SAMPLE_FLOAT32BE;
2819     }
2820 }
2821
2822 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2823     pa_sample_spec fixed_ss;
2824
2825     pa_assert(t);
2826     pa_sink_assert_ref(sink);
2827
2828     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2829
2830     pa_tagstruct_put(
2831         t,
2832         PA_TAG_U32, sink->index,
2833         PA_TAG_STRING, sink->name,
2834         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2835         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2836         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2837         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2838         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2839         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2840         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2841         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2842         PA_TAG_USEC, pa_sink_get_latency(sink),
2843         PA_TAG_STRING, sink->driver,
2844         PA_TAG_U32, sink->flags,
2845         PA_TAG_INVALID);
2846
2847     if (c->version >= 13) {
2848         pa_tagstruct_put_proplist(t, sink->proplist);
2849         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2850     }
2851
2852     if (c->version >= 15) {
2853         pa_tagstruct_put_volume(t, sink->base_volume);
2854         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2855             pa_log_error("Internal sink state is invalid.");
2856         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2857         pa_tagstruct_putu32(t, sink->n_volume_steps);
2858         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2859     }
2860
2861     if (c->version >= 16) {
2862         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2863
2864         if (sink->ports) {
2865             void *state;
2866             pa_device_port *p;
2867
2868             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2869                 pa_tagstruct_puts(t, p->name);
2870                 pa_tagstruct_puts(t, p->description);
2871                 pa_tagstruct_putu32(t, p->priority);
2872             }
2873         }
2874
2875         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2876     }
2877 }
2878
2879 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2880     pa_sample_spec fixed_ss;
2881
2882     pa_assert(t);
2883     pa_source_assert_ref(source);
2884
2885     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2886
2887     pa_tagstruct_put(
2888         t,
2889         PA_TAG_U32, source->index,
2890         PA_TAG_STRING, source->name,
2891         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2892         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2893         PA_TAG_CHANNEL_MAP, &source->channel_map,
2894         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2895         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2896         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2897         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2898         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2899         PA_TAG_USEC, pa_source_get_latency(source),
2900         PA_TAG_STRING, source->driver,
2901         PA_TAG_U32, source->flags,
2902         PA_TAG_INVALID);
2903
2904     if (c->version >= 13) {
2905         pa_tagstruct_put_proplist(t, source->proplist);
2906         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2907     }
2908
2909     if (c->version >= 15) {
2910         pa_tagstruct_put_volume(t, source->base_volume);
2911         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2912             pa_log_error("Internal source state is invalid.");
2913         pa_tagstruct_putu32(t, pa_source_get_state(source));
2914         pa_tagstruct_putu32(t, source->n_volume_steps);
2915         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2916     }
2917
2918     if (c->version >= 16) {
2919
2920         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
2921
2922         if (source->ports) {
2923             void *state;
2924             pa_device_port *p;
2925
2926             PA_HASHMAP_FOREACH(p, source->ports, state) {
2927                 pa_tagstruct_puts(t, p->name);
2928                 pa_tagstruct_puts(t, p->description);
2929                 pa_tagstruct_putu32(t, p->priority);
2930             }
2931         }
2932
2933         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
2934     }
2935 }
2936
2937 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2938     pa_assert(t);
2939     pa_assert(client);
2940
2941     pa_tagstruct_putu32(t, client->index);
2942     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2943     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2944     pa_tagstruct_puts(t, client->driver);
2945
2946     if (c->version >= 13)
2947         pa_tagstruct_put_proplist(t, client->proplist);
2948 }
2949
2950 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2951     void *state = NULL;
2952     pa_card_profile *p;
2953
2954     pa_assert(t);
2955     pa_assert(card);
2956
2957     pa_tagstruct_putu32(t, card->index);
2958     pa_tagstruct_puts(t, card->name);
2959     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2960     pa_tagstruct_puts(t, card->driver);
2961
2962     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2963
2964     if (card->profiles) {
2965         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2966             pa_tagstruct_puts(t, p->name);
2967             pa_tagstruct_puts(t, p->description);
2968             pa_tagstruct_putu32(t, p->n_sinks);
2969             pa_tagstruct_putu32(t, p->n_sources);
2970             pa_tagstruct_putu32(t, p->priority);
2971         }
2972     }
2973
2974     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2975     pa_tagstruct_put_proplist(t, card->proplist);
2976 }
2977
2978 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2979     pa_assert(t);
2980     pa_assert(module);
2981
2982     pa_tagstruct_putu32(t, module->index);
2983     pa_tagstruct_puts(t, module->name);
2984     pa_tagstruct_puts(t, module->argument);
2985     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2986
2987     if (c->version < 15)
2988         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2989
2990     if (c->version >= 15)
2991         pa_tagstruct_put_proplist(t, module->proplist);
2992 }
2993
2994 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2995     pa_sample_spec fixed_ss;
2996     pa_usec_t sink_latency;
2997     pa_cvolume v;
2998
2999     pa_assert(t);
3000     pa_sink_input_assert_ref(s);
3001
3002     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3003
3004     pa_tagstruct_putu32(t, s->index);
3005     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3006     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3007     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3008     pa_tagstruct_putu32(t, s->sink->index);
3009     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3010     pa_tagstruct_put_channel_map(t, &s->channel_map);
3011     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
3012     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3013     pa_tagstruct_put_usec(t, sink_latency);
3014     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3015     pa_tagstruct_puts(t, s->driver);
3016     if (c->version >= 11)
3017         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3018     if (c->version >= 13)
3019         pa_tagstruct_put_proplist(t, s->proplist);
3020 }
3021
3022 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3023     pa_sample_spec fixed_ss;
3024     pa_usec_t source_latency;
3025
3026     pa_assert(t);
3027     pa_source_output_assert_ref(s);
3028
3029     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3030
3031     pa_tagstruct_putu32(t, s->index);
3032     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3033     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3034     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3035     pa_tagstruct_putu32(t, s->source->index);
3036     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3037     pa_tagstruct_put_channel_map(t, &s->channel_map);
3038     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3039     pa_tagstruct_put_usec(t, source_latency);
3040     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3041     pa_tagstruct_puts(t, s->driver);
3042
3043     if (c->version >= 13)
3044         pa_tagstruct_put_proplist(t, s->proplist);
3045 }
3046
3047 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3048     pa_sample_spec fixed_ss;
3049     pa_cvolume v;
3050
3051     pa_assert(t);
3052     pa_assert(e);
3053
3054     if (e->memchunk.memblock)
3055         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3056     else
3057         memset(&fixed_ss, 0, sizeof(fixed_ss));
3058
3059     pa_tagstruct_putu32(t, e->index);
3060     pa_tagstruct_puts(t, e->name);
3061
3062     if (e->volume_is_set)
3063         v = e->volume;
3064     else
3065         pa_cvolume_init(&v);
3066
3067     pa_tagstruct_put_cvolume(t, &v);
3068     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3069     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3070     pa_tagstruct_put_channel_map(t, &e->channel_map);
3071     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3072     pa_tagstruct_put_boolean(t, e->lazy);
3073     pa_tagstruct_puts(t, e->filename);
3074
3075     if (c->version >= 13)
3076         pa_tagstruct_put_proplist(t, e->proplist);
3077 }
3078
3079 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3080     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3081     uint32_t idx;
3082     pa_sink *sink = NULL;
3083     pa_source *source = NULL;
3084     pa_client *client = NULL;
3085     pa_card *card = NULL;
3086     pa_module *module = NULL;
3087     pa_sink_input *si = NULL;
3088     pa_source_output *so = NULL;
3089     pa_scache_entry *sce = NULL;
3090     const char *name = NULL;
3091     pa_tagstruct *reply;
3092
3093     pa_native_connection_assert_ref(c);
3094     pa_assert(t);
3095
3096     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3097         (command != PA_COMMAND_GET_CLIENT_INFO &&
3098          command != PA_COMMAND_GET_MODULE_INFO &&
3099          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3100          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3101          pa_tagstruct_gets(t, &name) < 0) ||
3102         !pa_tagstruct_eof(t)) {
3103         protocol_error(c);
3104         return;
3105     }
3106
3107     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3108     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3109     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3110     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3111     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3112
3113     if (command == PA_COMMAND_GET_SINK_INFO) {
3114         if (idx != PA_INVALID_INDEX)
3115             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3116         else
3117             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3118     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3119         if (idx != PA_INVALID_INDEX)
3120             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3121         else
3122             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3123     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3124         if (idx != PA_INVALID_INDEX)
3125             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3126         else
3127             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3128     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3129         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3130     else if (command == PA_COMMAND_GET_MODULE_INFO)
3131         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3132     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3133         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3134     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3135         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3136     else {
3137         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3138         if (idx != PA_INVALID_INDEX)
3139             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3140         else
3141             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3142     }
3143
3144     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3145         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3146         return;
3147     }
3148
3149     reply = reply_new(tag);
3150     if (sink)
3151         sink_fill_tagstruct(c, reply, sink);
3152     else if (source)
3153         source_fill_tagstruct(c, reply, source);
3154     else if (client)
3155         client_fill_tagstruct(c, reply, client);
3156     else if (card)
3157         card_fill_tagstruct(c, reply, card);
3158     else if (module)
3159         module_fill_tagstruct(c, reply, module);
3160     else if (si)
3161         sink_input_fill_tagstruct(c, reply, si);
3162     else if (so)
3163         source_output_fill_tagstruct(c, reply, so);
3164     else
3165         scache_fill_tagstruct(c, reply, sce);
3166     pa_pstream_send_tagstruct(c->pstream, reply);
3167 }
3168
3169 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3170     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3171     pa_idxset *i;
3172     uint32_t idx;
3173     void *p;
3174     pa_tagstruct *reply;
3175
3176     pa_native_connection_assert_ref(c);
3177     pa_assert(t);
3178
3179     if (!pa_tagstruct_eof(t)) {
3180         protocol_error(c);
3181         return;
3182     }
3183
3184     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3185
3186     reply = reply_new(tag);
3187
3188     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3189         i = c->protocol->core->sinks;
3190     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3191         i = c->protocol->core->sources;
3192     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3193         i = c->protocol->core->clients;
3194     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3195         i = c->protocol->core->cards;
3196     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3197         i = c->protocol->core->modules;
3198     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3199         i = c->protocol->core->sink_inputs;
3200     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3201         i = c->protocol->core->source_outputs;
3202     else {
3203         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3204         i = c->protocol->core->scache;
3205     }
3206
3207     if (i) {
3208         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3209             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3210                 sink_fill_tagstruct(c, reply, p);
3211             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3212                 source_fill_tagstruct(c, reply, p);
3213             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3214                 client_fill_tagstruct(c, reply, p);
3215             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3216                 card_fill_tagstruct(c, reply, p);
3217             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3218                 module_fill_tagstruct(c, reply, p);
3219             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3220                 sink_input_fill_tagstruct(c, reply, p);
3221             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3222                 source_output_fill_tagstruct(c, reply, p);
3223             else {
3224                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3225                 scache_fill_tagstruct(c, reply, p);
3226             }
3227         }
3228     }
3229
3230     pa_pstream_send_tagstruct(c->pstream, reply);
3231 }
3232
3233 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3234     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3235     pa_tagstruct *reply;
3236     pa_sink *def_sink;
3237     pa_source *def_source;
3238     pa_sample_spec fixed_ss;
3239     char *h, *u;
3240
3241     pa_native_connection_assert_ref(c);
3242     pa_assert(t);
3243
3244     if (!pa_tagstruct_eof(t)) {
3245         protocol_error(c);
3246         return;
3247     }
3248
3249     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3250
3251     reply = reply_new(tag);
3252     pa_tagstruct_puts(reply, PACKAGE_NAME);
3253     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3254
3255     u = pa_get_user_name_malloc();
3256     pa_tagstruct_puts(reply, u);
3257     pa_xfree(u);
3258
3259     h = pa_get_host_name_malloc();
3260     pa_tagstruct_puts(reply, h);
3261     pa_xfree(h);
3262
3263     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3264     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3265
3266     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3267     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3268     def_source = pa_namereg_get_default_source(c->protocol->core);
3269     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3270
3271     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3272
3273     if (c->version >= 15)
3274         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3275
3276     pa_pstream_send_tagstruct(c->pstream, reply);
3277 }
3278
3279 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3280     pa_tagstruct *t;
3281     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3282
3283     pa_native_connection_assert_ref(c);
3284
3285     t = pa_tagstruct_new(NULL, 0);
3286     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3287     pa_tagstruct_putu32(t, (uint32_t) -1);
3288     pa_tagstruct_putu32(t, e);
3289     pa_tagstruct_putu32(t, idx);
3290     pa_pstream_send_tagstruct(c->pstream, t);
3291 }
3292
3293 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3294     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3295     pa_subscription_mask_t m;
3296
3297     pa_native_connection_assert_ref(c);
3298     pa_assert(t);
3299
3300     if (pa_tagstruct_getu32(t, &m) < 0 ||
3301         !pa_tagstruct_eof(t)) {
3302         protocol_error(c);
3303         return;
3304     }
3305
3306     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3307     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3308
3309     if (c->subscription)
3310         pa_subscription_free(c->subscription);
3311
3312     if (m != 0) {
3313         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3314         pa_assert(c->subscription);
3315     } else
3316         c->subscription = NULL;
3317
3318     pa_pstream_send_simple_ack(c->pstream, tag);
3319 }
3320
3321 static void command_set_volume(
3322         pa_pdispatch *pd,
3323         uint32_t command,
3324         uint32_t tag,
3325         pa_tagstruct *t,
3326         void *userdata) {
3327
3328     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3329     uint32_t idx;
3330     pa_cvolume volume;
3331     pa_sink *sink = NULL;
3332     pa_source *source = NULL;
3333     pa_sink_input *si = NULL;
3334     const char *name = NULL;
3335     const char *client_name;
3336
3337     pa_native_connection_assert_ref(c);
3338     pa_assert(t);
3339
3340     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3341         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3342         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3343         pa_tagstruct_get_cvolume(t, &volume) ||
3344         !pa_tagstruct_eof(t)) {
3345         protocol_error(c);
3346         return;
3347     }
3348
3349     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3350     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3351     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3352     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3353     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3354     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3355
3356     switch (command) {
3357
3358         case PA_COMMAND_SET_SINK_VOLUME:
3359             if (idx != PA_INVALID_INDEX)
3360                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3361             else
3362                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3363             break;
3364
3365         case PA_COMMAND_SET_SOURCE_VOLUME:
3366             if (idx != PA_INVALID_INDEX)
3367                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3368             else
3369                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3370             break;
3371
3372         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3373             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3374             break;
3375
3376         default:
3377             pa_assert_not_reached();
3378     }
3379
3380     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3381
3382     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3383
3384     if (sink) {
3385         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3386         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3387     } else if (source) {
3388         pa_log_debug("Client %s changes volume of sink %s.", client_name, source->name);
3389         pa_source_set_volume(source, &volume, TRUE);
3390     } else if (si) {
3391         pa_log_debug("Client %s changes volume of sink %s.",
3392                      client_name,
3393                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3394         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3395     }
3396
3397     pa_pstream_send_simple_ack(c->pstream, tag);
3398 }
3399
3400 static void command_set_mute(
3401         pa_pdispatch *pd,
3402         uint32_t command,
3403         uint32_t tag,
3404         pa_tagstruct *t,
3405         void *userdata) {
3406
3407     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3408     uint32_t idx;
3409     pa_bool_t mute;
3410     pa_sink *sink = NULL;
3411     pa_source *source = NULL;
3412     pa_sink_input *si = NULL;
3413     const char *name = NULL;
3414
3415     pa_native_connection_assert_ref(c);
3416     pa_assert(t);
3417
3418     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3419         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3420         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3421         pa_tagstruct_get_boolean(t, &mute) ||
3422         !pa_tagstruct_eof(t)) {
3423         protocol_error(c);
3424         return;
3425     }
3426
3427     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3428     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3429     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3430     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3431     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3432
3433     switch (command) {
3434
3435         case PA_COMMAND_SET_SINK_MUTE:
3436
3437             if (idx != PA_INVALID_INDEX)
3438                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3439             else
3440                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3441
3442             break;
3443
3444         case PA_COMMAND_SET_SOURCE_MUTE:
3445             if (idx != PA_INVALID_INDEX)
3446                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3447             else
3448                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3449
3450             break;
3451
3452         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3453             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3454             break;
3455
3456         default:
3457             pa_assert_not_reached();
3458     }
3459
3460     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3461
3462     if (sink)
3463         pa_sink_set_mute(sink, mute, TRUE);
3464     else if (source)
3465         pa_source_set_mute(source, mute, TRUE);
3466     else if (si)
3467         pa_sink_input_set_mute(si, mute, TRUE);
3468
3469     pa_pstream_send_simple_ack(c->pstream, tag);
3470 }
3471
3472 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3473     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3474     uint32_t idx;
3475     pa_bool_t b;
3476     playback_stream *s;
3477
3478     pa_native_connection_assert_ref(c);
3479     pa_assert(t);
3480
3481     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3482         pa_tagstruct_get_boolean(t, &b) < 0 ||
3483         !pa_tagstruct_eof(t)) {
3484         protocol_error(c);
3485         return;
3486     }
3487
3488     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3489     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3490     s = pa_idxset_get_by_index(c->output_streams, idx);
3491     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3492     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3493
3494     pa_sink_input_cork(s->sink_input, b);
3495
3496     if (b)
3497         s->is_underrun = TRUE;
3498
3499     pa_pstream_send_simple_ack(c->pstream, tag);
3500 }
3501
3502 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3503     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3504     uint32_t idx;
3505     playback_stream *s;
3506
3507     pa_native_connection_assert_ref(c);
3508     pa_assert(t);
3509
3510     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3511         !pa_tagstruct_eof(t)) {
3512         protocol_error(c);
3513         return;
3514     }
3515
3516     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3517     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3518     s = pa_idxset_get_by_index(c->output_streams, idx);
3519     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3520     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3521
3522     switch (command) {
3523         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3524             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3525             break;
3526
3527         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3528             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3529             break;
3530
3531         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3532             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3533             break;
3534
3535         default:
3536             pa_assert_not_reached();
3537     }
3538
3539     pa_pstream_send_simple_ack(c->pstream, tag);
3540 }
3541
3542 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3543     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3544     uint32_t idx;
3545     record_stream *s;
3546     pa_bool_t b;
3547
3548     pa_native_connection_assert_ref(c);
3549     pa_assert(t);
3550
3551     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3552         pa_tagstruct_get_boolean(t, &b) < 0 ||
3553         !pa_tagstruct_eof(t)) {
3554         protocol_error(c);
3555         return;
3556     }
3557
3558     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3559     s = pa_idxset_get_by_index(c->record_streams, idx);
3560     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3561
3562     pa_source_output_cork(s->source_output, b);
3563     pa_memblockq_prebuf_force(s->memblockq);
3564     pa_pstream_send_simple_ack(c->pstream, tag);
3565 }
3566
3567 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3568     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3569     uint32_t idx;
3570     record_stream *s;
3571
3572     pa_native_connection_assert_ref(c);
3573     pa_assert(t);
3574
3575     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3576         !pa_tagstruct_eof(t)) {
3577         protocol_error(c);
3578         return;
3579     }
3580
3581     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3582     s = pa_idxset_get_by_index(c->record_streams, idx);
3583     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3584
3585     pa_memblockq_flush_read(s->memblockq);
3586     pa_pstream_send_simple_ack(c->pstream, tag);
3587 }
3588
3589 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3590     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3591     uint32_t idx;
3592     pa_buffer_attr a;
3593     pa_tagstruct *reply;
3594
3595     pa_native_connection_assert_ref(c);
3596     pa_assert(t);
3597
3598     memset(&a, 0, sizeof(a));
3599
3600     if (pa_tagstruct_getu32(t, &idx) < 0) {
3601         protocol_error(c);
3602         return;
3603     }
3604
3605     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3606
3607     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3608         playback_stream *s;
3609         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3610
3611         s = pa_idxset_get_by_index(c->output_streams, idx);
3612         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3613         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3614
3615         if (pa_tagstruct_get(
3616                     t,
3617                     PA_TAG_U32, &a.maxlength,
3618                     PA_TAG_U32, &a.tlength,
3619                     PA_TAG_U32, &a.prebuf,
3620                     PA_TAG_U32, &a.minreq,
3621                     PA_TAG_INVALID) < 0 ||
3622             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3623             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3624             !pa_tagstruct_eof(t)) {
3625             protocol_error(c);
3626             return;
3627         }
3628
3629         s->adjust_latency = adjust_latency;
3630         s->early_requests = early_requests;
3631         s->buffer_attr = a;
3632
3633         fix_playback_buffer_attr(s);
3634         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3635
3636         reply = reply_new(tag);
3637         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3638         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3639         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3640         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3641
3642         if (c->version >= 13)
3643             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3644
3645     } else {
3646         record_stream *s;
3647         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3648         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3649
3650         s = pa_idxset_get_by_index(c->record_streams, idx);
3651         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3652
3653         if (pa_tagstruct_get(
3654                     t,
3655                     PA_TAG_U32, &a.maxlength,
3656                     PA_TAG_U32, &a.fragsize,
3657                     PA_TAG_INVALID) < 0 ||
3658             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3659             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3660             !pa_tagstruct_eof(t)) {
3661             protocol_error(c);
3662             return;
3663         }
3664
3665         s->adjust_latency = adjust_latency;
3666         s->early_requests = early_requests;
3667         s->buffer_attr = a;
3668
3669         fix_record_buffer_attr_pre(s);
3670         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3671         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3672         fix_record_buffer_attr_post(s);
3673
3674         reply = reply_new(tag);
3675         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3676         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3677
3678         if (c->version >= 13)
3679             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3680     }
3681
3682     pa_pstream_send_tagstruct(c->pstream, reply);
3683 }
3684
3685 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3686     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3687     uint32_t idx;
3688     uint32_t rate;
3689
3690     pa_native_connection_assert_ref(c);
3691     pa_assert(t);
3692
3693     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3694         pa_tagstruct_getu32(t, &rate) < 0 ||
3695         !pa_tagstruct_eof(t)) {
3696         protocol_error(c);
3697         return;
3698     }
3699
3700     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3701     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3702
3703     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3704         playback_stream *s;
3705
3706         s = pa_idxset_get_by_index(c->output_streams, idx);
3707         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3708         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3709
3710         pa_sink_input_set_rate(s->sink_input, rate);
3711
3712     } else {
3713         record_stream *s;
3714         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3715
3716         s = pa_idxset_get_by_index(c->record_streams, idx);
3717         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3718
3719         pa_source_output_set_rate(s->source_output, rate);
3720     }
3721
3722     pa_pstream_send_simple_ack(c->pstream, tag);
3723 }
3724
3725 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3726     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3727     uint32_t idx;
3728     uint32_t mode;
3729     pa_proplist *p;
3730
3731     pa_native_connection_assert_ref(c);
3732     pa_assert(t);
3733
3734     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3735
3736     p = pa_proplist_new();
3737
3738     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3739
3740         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3741             pa_tagstruct_get_proplist(t, p) < 0 ||
3742             !pa_tagstruct_eof(t)) {
3743             protocol_error(c);
3744             pa_proplist_free(p);
3745             return;
3746         }
3747
3748     } else {
3749
3750         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3751             pa_tagstruct_getu32(t, &mode) < 0 ||
3752             pa_tagstruct_get_proplist(t, p) < 0 ||
3753             !pa_tagstruct_eof(t)) {
3754             protocol_error(c);
3755             pa_proplist_free(p);
3756             return;
3757         }
3758     }
3759
3760     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3761         pa_proplist_free(p);
3762         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3763     }
3764
3765     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3766         playback_stream *s;
3767
3768         s = pa_idxset_get_by_index(c->output_streams, idx);
3769         if (!s || !playback_stream_isinstance(s)) {
3770             pa_proplist_free(p);
3771             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3772         }
3773         pa_sink_input_update_proplist(s->sink_input, mode, p);
3774
3775     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3776         record_stream *s;
3777
3778         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3779             pa_proplist_free(p);
3780             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3781         }
3782         pa_source_output_update_proplist(s->source_output, mode, p);
3783
3784     } else {
3785         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3786
3787         pa_client_update_proplist(c->client, mode, p);
3788     }
3789
3790     pa_pstream_send_simple_ack(c->pstream, tag);
3791     pa_proplist_free(p);
3792 }
3793
3794 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3795     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3796     uint32_t idx;
3797     unsigned changed = 0;
3798     pa_proplist *p;
3799     pa_strlist *l = NULL;
3800
3801     pa_native_connection_assert_ref(c);
3802     pa_assert(t);
3803
3804     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3805
3806     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3807
3808         if (pa_tagstruct_getu32(t, &idx) < 0) {
3809             protocol_error(c);
3810             return;
3811         }
3812     }
3813
3814     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3815         playback_stream *s;
3816
3817         s = pa_idxset_get_by_index(c->output_streams, idx);
3818         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3819         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3820
3821         p = s->sink_input->proplist;
3822
3823     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3824         record_stream *s;
3825
3826         s = pa_idxset_get_by_index(c->record_streams, idx);
3827         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3828
3829         p = s->source_output->proplist;
3830     } else {
3831         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3832
3833         p = c->client->proplist;
3834     }
3835
3836     for (;;) {
3837         const char *k;
3838
3839         if (pa_tagstruct_gets(t, &k) < 0) {
3840             protocol_error(c);
3841             pa_strlist_free(l);
3842             return;
3843         }
3844
3845         if (!k)
3846             break;
3847
3848         l = pa_strlist_prepend(l, k);
3849     }
3850
3851     if (!pa_tagstruct_eof(t)) {
3852         protocol_error(c);
3853         pa_strlist_free(l);
3854         return;
3855     }
3856
3857     for (;;) {
3858         char *z;
3859
3860         l = pa_strlist_pop(l, &z);
3861
3862         if (!z)
3863             break;
3864
3865         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3866         pa_xfree(z);
3867     }
3868
3869     pa_pstream_send_simple_ack(c->pstream, tag);
3870
3871     if (changed) {
3872         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3873             playback_stream *s;
3874
3875             s = pa_idxset_get_by_index(c->output_streams, idx);
3876             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3877
3878         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3879             record_stream *s;
3880
3881             s = pa_idxset_get_by_index(c->record_streams, idx);
3882             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3883
3884         } else {
3885             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3886             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3887         }
3888     }
3889 }
3890
3891 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3892     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3893     const char *s;
3894
3895     pa_native_connection_assert_ref(c);
3896     pa_assert(t);
3897
3898     if (pa_tagstruct_gets(t, &s) < 0 ||
3899         !pa_tagstruct_eof(t)) {
3900         protocol_error(c);
3901         return;
3902     }
3903
3904     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3905     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3906
3907     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3908         pa_source *source;
3909
3910         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3911         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3912
3913         pa_namereg_set_default_source(c->protocol->core, source);
3914     } else {
3915         pa_sink *sink;
3916         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3917
3918         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3919         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3920
3921         pa_namereg_set_default_sink(c->protocol->core, sink);
3922     }
3923
3924     pa_pstream_send_simple_ack(c->pstream, tag);
3925 }
3926
3927 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3928     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3929     uint32_t idx;
3930     const char *name;
3931
3932     pa_native_connection_assert_ref(c);
3933     pa_assert(t);
3934
3935     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3936         pa_tagstruct_gets(t, &name) < 0 ||
3937         !pa_tagstruct_eof(t)) {
3938         protocol_error(c);
3939         return;
3940     }
3941
3942     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3943     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3944
3945     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3946         playback_stream *s;
3947
3948         s = pa_idxset_get_by_index(c->output_streams, idx);
3949         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3950         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3951
3952         pa_sink_input_set_name(s->sink_input, name);
3953
3954     } else {
3955         record_stream *s;
3956         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3957
3958         s = pa_idxset_get_by_index(c->record_streams, idx);
3959         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3960
3961         pa_source_output_set_name(s->source_output, name);
3962     }
3963
3964     pa_pstream_send_simple_ack(c->pstream, tag);
3965 }
3966
3967 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3968     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3969     uint32_t idx;
3970
3971     pa_native_connection_assert_ref(c);
3972     pa_assert(t);
3973
3974     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3975         !pa_tagstruct_eof(t)) {
3976         protocol_error(c);
3977         return;
3978     }
3979
3980     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3981
3982     if (command == PA_COMMAND_KILL_CLIENT) {
3983         pa_client *client;
3984
3985         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3986         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3987
3988         pa_native_connection_ref(c);
3989         pa_client_kill(client);
3990
3991     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3992         pa_sink_input *s;
3993
3994         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3995         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3996
3997         pa_native_connection_ref(c);
3998         pa_sink_input_kill(s);
3999     } else {
4000         pa_source_output *s;
4001
4002         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4003
4004         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4005         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4006
4007         pa_native_connection_ref(c);
4008         pa_source_output_kill(s);
4009     }
4010
4011     pa_pstream_send_simple_ack(c->pstream, tag);
4012     pa_native_connection_unref(c);
4013 }
4014
4015 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4016     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4017     pa_module *m;
4018     const char *name, *argument;
4019     pa_tagstruct *reply;
4020
4021     pa_native_connection_assert_ref(c);
4022     pa_assert(t);
4023
4024     if (pa_tagstruct_gets(t, &name) < 0 ||
4025         pa_tagstruct_gets(t, &argument) < 0 ||
4026         !pa_tagstruct_eof(t)) {
4027         protocol_error(c);
4028         return;
4029     }
4030
4031     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4032     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4033     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4034
4035     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4036         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4037         return;
4038     }
4039
4040     reply = reply_new(tag);
4041     pa_tagstruct_putu32(reply, m->index);
4042     pa_pstream_send_tagstruct(c->pstream, reply);
4043 }
4044
4045 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4046     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4047     uint32_t idx;
4048     pa_module *m;
4049
4050     pa_native_connection_assert_ref(c);
4051     pa_assert(t);
4052
4053     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4054         !pa_tagstruct_eof(t)) {
4055         protocol_error(c);
4056         return;
4057     }
4058
4059     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4060     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4061     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4062
4063     pa_module_unload_request(m, FALSE);
4064     pa_pstream_send_simple_ack(c->pstream, tag);
4065 }
4066
4067 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4068     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4069     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4070     const char *name_device = NULL;
4071
4072     pa_native_connection_assert_ref(c);
4073     pa_assert(t);
4074
4075     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4076         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4077         pa_tagstruct_gets(t, &name_device) < 0 ||
4078         !pa_tagstruct_eof(t)) {
4079         protocol_error(c);
4080         return;
4081     }
4082
4083     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4084     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4085
4086     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4087     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4088     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4089     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4090
4091     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4092         pa_sink_input *si = NULL;
4093         pa_sink *sink = NULL;
4094
4095         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4096
4097         if (idx_device != PA_INVALID_INDEX)
4098             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4099         else
4100             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4101
4102         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4103
4104         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4105             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4106             return;
4107         }
4108     } else {
4109         pa_source_output *so = NULL;
4110         pa_source *source;
4111
4112         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4113
4114         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4115
4116         if (idx_device != PA_INVALID_INDEX)
4117             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4118         else
4119             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4120
4121         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4122
4123         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4124             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4125             return;
4126         }
4127     }
4128
4129     pa_pstream_send_simple_ack(c->pstream, tag);
4130 }
4131
4132 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4133     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4134     uint32_t idx = PA_INVALID_INDEX;
4135     const char *name = NULL;
4136     pa_bool_t b;
4137
4138     pa_native_connection_assert_ref(c);
4139     pa_assert(t);
4140
4141     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4142         pa_tagstruct_gets(t, &name) < 0 ||
4143         pa_tagstruct_get_boolean(t, &b) < 0 ||
4144         !pa_tagstruct_eof(t)) {
4145         protocol_error(c);
4146         return;
4147     }
4148
4149     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4150     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4151     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4152     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4153     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4154
4155     if (command == PA_COMMAND_SUSPEND_SINK) {
4156
4157         if (idx == PA_INVALID_INDEX && name && !*name) {
4158
4159             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4160
4161             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4162                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4163                 return;
4164             }
4165         } else {
4166             pa_sink *sink = NULL;
4167
4168             if (idx != PA_INVALID_INDEX)
4169                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4170             else
4171                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4172
4173             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4174
4175             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4176                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4177                 return;
4178             }
4179         }
4180     } else {
4181
4182         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4183
4184         if (idx == PA_INVALID_INDEX && name && !*name) {
4185
4186             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4187
4188             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4189                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4190                 return;
4191             }
4192
4193         } else {
4194             pa_source *source;
4195
4196             if (idx != PA_INVALID_INDEX)
4197                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4198             else
4199                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4200
4201             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4202
4203             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4204                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4205                 return;
4206             }
4207         }
4208     }
4209
4210     pa_pstream_send_simple_ack(c->pstream, tag);
4211 }
4212
4213 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4214     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4215     uint32_t idx = PA_INVALID_INDEX;
4216     const char *name = NULL;
4217     pa_module *m;
4218     pa_native_protocol_ext_cb_t cb;
4219
4220     pa_native_connection_assert_ref(c);
4221     pa_assert(t);
4222
4223     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4224         pa_tagstruct_gets(t, &name) < 0) {
4225         protocol_error(c);
4226         return;
4227     }
4228
4229     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4230     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4231     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4232     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4233     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4234
4235     if (idx != PA_INVALID_INDEX)
4236         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4237     else {
4238         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4239             if (strcmp(name, m->name) == 0)
4240                 break;
4241     }
4242
4243     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4244     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4245
4246     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4247     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4248
4249     if (cb(c->protocol, m, c, tag, t) < 0)
4250         protocol_error(c);
4251 }
4252
4253 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4254     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4255     uint32_t idx = PA_INVALID_INDEX;
4256     const char *name = NULL, *profile = NULL;
4257     pa_card *card = NULL;
4258     int ret;
4259
4260     pa_native_connection_assert_ref(c);
4261     pa_assert(t);
4262
4263     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4264         pa_tagstruct_gets(t, &name) < 0 ||
4265         pa_tagstruct_gets(t, &profile) < 0 ||
4266         !pa_tagstruct_eof(t)) {
4267         protocol_error(c);
4268         return;
4269     }
4270
4271     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4272     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4273     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4274     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4275     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4276
4277     if (idx != PA_INVALID_INDEX)
4278         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4279     else
4280         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4281
4282     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4283
4284     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4285         pa_pstream_send_error(c->pstream, tag, -ret);
4286         return;
4287     }
4288
4289     pa_pstream_send_simple_ack(c->pstream, tag);
4290 }
4291
4292 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4293     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4294     uint32_t idx = PA_INVALID_INDEX;
4295     const char *name = NULL, *port = NULL;
4296     int ret;
4297
4298     pa_native_connection_assert_ref(c);
4299     pa_assert(t);
4300
4301     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4302         pa_tagstruct_gets(t, &name) < 0 ||
4303         pa_tagstruct_gets(t, &port) < 0 ||
4304         !pa_tagstruct_eof(t)) {
4305         protocol_error(c);
4306         return;
4307     }
4308
4309     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4310     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4311     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4312     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4313     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4314
4315     if (command == PA_COMMAND_SET_SINK_PORT) {
4316         pa_sink *sink;
4317
4318         if (idx != PA_INVALID_INDEX)
4319             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4320         else
4321             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4322
4323         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4324
4325         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4326             pa_pstream_send_error(c->pstream, tag, -ret);
4327             return;
4328         }
4329     } else {
4330         pa_source *source;
4331
4332         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4333
4334         if (idx != PA_INVALID_INDEX)
4335             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4336         else
4337             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4338
4339         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4340
4341         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4342             pa_pstream_send_error(c->pstream, tag, -ret);
4343             return;
4344         }
4345     }
4346
4347     pa_pstream_send_simple_ack(c->pstream, tag);
4348 }
4349
4350 /*** pstream callbacks ***/
4351
4352 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4353     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4354
4355     pa_assert(p);
4356     pa_assert(packet);
4357     pa_native_connection_assert_ref(c);
4358
4359     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4360         pa_log("invalid packet.");
4361         native_connection_unlink(c);
4362     }
4363 }
4364
4365 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4366     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4367     output_stream *stream;
4368
4369     pa_assert(p);
4370     pa_assert(chunk);
4371     pa_native_connection_assert_ref(c);
4372
4373     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4374         pa_log_debug("Client sent block for invalid stream.");
4375         /* Ignoring */
4376         return;
4377     }
4378
4379 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4380
4381     if (playback_stream_isinstance(stream)) {
4382         playback_stream *ps = PLAYBACK_STREAM(stream);
4383
4384         if (chunk->memblock) {
4385             if (seek != PA_SEEK_RELATIVE || offset != 0)
4386                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4387
4388             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4389         } else
4390             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4391
4392     } else {
4393         upload_stream *u = UPLOAD_STREAM(stream);
4394         size_t l;
4395
4396         if (!u->memchunk.memblock) {
4397             if (u->length == chunk->length && chunk->memblock) {
4398                 u->memchunk = *chunk;
4399                 pa_memblock_ref(u->memchunk.memblock);
4400                 u->length = 0;
4401             } else {
4402                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4403                 u->memchunk.index = u->memchunk.length = 0;
4404             }
4405         }
4406
4407         pa_assert(u->memchunk.memblock);
4408
4409         l = u->length;
4410         if (l > chunk->length)
4411             l = chunk->length;
4412
4413         if (l > 0) {
4414             void *dst;
4415             dst = pa_memblock_acquire(u->memchunk.memblock);
4416
4417             if (chunk->memblock) {
4418                 void *src;
4419                 src = pa_memblock_acquire(chunk->memblock);
4420
4421                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4422                        (uint8_t*) src + chunk->index, l);
4423
4424                 pa_memblock_release(chunk->memblock);
4425             } else
4426                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4427
4428             pa_memblock_release(u->memchunk.memblock);
4429
4430             u->memchunk.length += l;
4431             u->length -= l;
4432         }
4433     }
4434 }
4435
4436 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4437     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4438
4439     pa_assert(p);
4440     pa_native_connection_assert_ref(c);
4441
4442     native_connection_unlink(c);
4443     pa_log_info("Connection died.");
4444 }
4445
4446 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4447     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4448
4449     pa_assert(p);
4450     pa_native_connection_assert_ref(c);
4451
4452     native_connection_send_memblock(c);
4453 }
4454
4455 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4456     pa_thread_mq *q;
4457
4458     if (!(q = pa_thread_mq_get()))
4459         pa_pstream_send_revoke(p, block_id);
4460     else
4461         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4462 }
4463
4464 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4465     pa_thread_mq *q;
4466
4467     if (!(q = pa_thread_mq_get()))
4468         pa_pstream_send_release(p, block_id);
4469     else
4470         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4471 }
4472
4473 /*** client callbacks ***/
4474
4475 static void client_kill_cb(pa_client *c) {
4476     pa_assert(c);
4477
4478     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4479     pa_log_info("Connection killed.");
4480 }
4481
4482 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4483     pa_tagstruct *t;
4484     pa_native_connection *c;
4485
4486     pa_assert(client);
4487     c = PA_NATIVE_CONNECTION(client->userdata);
4488     pa_native_connection_assert_ref(c);
4489
4490     if (c->version < 15)
4491       return;
4492
4493     t = pa_tagstruct_new(NULL, 0);
4494     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4495     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4496     pa_tagstruct_puts(t, event);
4497     pa_tagstruct_put_proplist(t, pl);
4498     pa_pstream_send_tagstruct(c->pstream, t);
4499 }
4500
4501 /*** module entry points ***/
4502
4503 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4504     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4505
4506     pa_assert(m);
4507     pa_native_connection_assert_ref(c);
4508     pa_assert(c->auth_timeout_event == e);
4509
4510     if (!c->authorized) {
4511         native_connection_unlink(c);
4512         pa_log_info("Connection terminated due to authentication timeout.");
4513     }
4514 }
4515
4516 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4517     pa_native_connection *c;
4518     char pname[128];
4519     pa_client *client;
4520     pa_client_new_data data;
4521
4522     pa_assert(p);
4523     pa_assert(io);
4524     pa_assert(o);
4525
4526     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4527         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4528         pa_iochannel_free(io);
4529         return;
4530     }
4531
4532     pa_client_new_data_init(&data);
4533     data.module = o->module;
4534     data.driver = __FILE__;
4535     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4536     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4537     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4538     client = pa_client_new(p->core, &data);
4539     pa_client_new_data_done(&data);
4540
4541     if (!client)
4542         return;
4543
4544     c = pa_msgobject_new(pa_native_connection);
4545     c->parent.parent.free = native_connection_free;
4546     c->parent.process_msg = native_connection_process_msg;
4547     c->protocol = p;
4548     c->options = pa_native_options_ref(o);
4549     c->authorized = FALSE;
4550
4551     if (o->auth_anonymous) {
4552         pa_log_info("Client authenticated anonymously.");
4553         c->authorized = TRUE;
4554     }
4555
4556     if (!c->authorized &&
4557         o->auth_ip_acl &&
4558         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4559
4560         pa_log_info("Client authenticated by IP ACL.");
4561         c->authorized = TRUE;
4562     }
4563
4564     if (!c->authorized)
4565         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4566     else
4567         c->auth_timeout_event = NULL;
4568
4569     c->is_local = pa_iochannel_socket_is_local(io);
4570     c->version = 8;
4571
4572     c->client = client;
4573     c->client->kill = client_kill_cb;
4574     c->client->send_event = client_send_event_cb;
4575     c->client->userdata = c;
4576
4577     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4578     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4579     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4580     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4581     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4582     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4583     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4584
4585     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4586
4587     c->record_streams = pa_idxset_new(NULL, NULL);
4588     c->output_streams = pa_idxset_new(NULL, NULL);
4589
4590     c->rrobin_index = PA_IDXSET_INVALID;
4591     c->subscription = NULL;
4592
4593     pa_idxset_put(p->connections, c, NULL);
4594
4595 #ifdef HAVE_CREDS
4596     if (pa_iochannel_creds_supported(io))
4597         pa_iochannel_creds_enable(io);
4598 #endif
4599
4600     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4601 }
4602
4603 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4604     pa_native_connection *c;
4605     void *state = NULL;
4606
4607     pa_assert(p);
4608     pa_assert(m);
4609
4610     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4611         if (c->options->module == m)
4612             native_connection_unlink(c);
4613 }
4614
4615 static pa_native_protocol* native_protocol_new(pa_core *c) {
4616     pa_native_protocol *p;
4617     pa_native_hook_t h;
4618
4619     pa_assert(c);
4620
4621     p = pa_xnew(pa_native_protocol, 1);
4622     PA_REFCNT_INIT(p);
4623     p->core = c;
4624     p->connections = pa_idxset_new(NULL, NULL);
4625
4626     p->servers = NULL;
4627
4628     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4629
4630     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4631         pa_hook_init(&p->hooks[h], p);
4632
4633     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4634
4635     return p;
4636 }
4637
4638 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4639     pa_native_protocol *p;
4640
4641     if ((p = pa_shared_get(c, "native-protocol")))
4642         return pa_native_protocol_ref(p);
4643
4644     return native_protocol_new(c);
4645 }
4646
4647 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4648     pa_assert(p);
4649     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4650
4651     PA_REFCNT_INC(p);
4652
4653     return p;
4654 }
4655
4656 void pa_native_protocol_unref(pa_native_protocol *p) {
4657     pa_native_connection *c;
4658     pa_native_hook_t h;
4659
4660     pa_assert(p);
4661     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4662
4663     if (PA_REFCNT_DEC(p) > 0)
4664         return;
4665
4666     while ((c = pa_idxset_first(p->connections, NULL)))
4667         native_connection_unlink(c);
4668
4669     pa_idxset_free(p->connections, NULL, NULL);
4670
4671     pa_strlist_free(p->servers);
4672
4673     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4674         pa_hook_done(&p->hooks[h]);
4675
4676     pa_hashmap_free(p->extensions, NULL, NULL);
4677
4678     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4679
4680     pa_xfree(p);
4681 }
4682
4683 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4684     pa_assert(p);
4685     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4686     pa_assert(name);
4687
4688     p->servers = pa_strlist_prepend(p->servers, name);
4689
4690     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4691 }
4692
4693 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4694     pa_assert(p);
4695     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4696     pa_assert(name);
4697
4698     p->servers = pa_strlist_remove(p->servers, name);
4699
4700     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4701 }
4702
4703 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4704     pa_assert(p);
4705     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4706
4707     return p->hooks;
4708 }
4709
4710 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4711     pa_assert(p);
4712     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4713
4714     return p->servers;
4715 }
4716
4717 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4718     pa_assert(p);
4719     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4720     pa_assert(m);
4721     pa_assert(cb);
4722     pa_assert(!pa_hashmap_get(p->extensions, m));
4723
4724     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4725     return 0;
4726 }
4727
4728 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4729     pa_assert(p);
4730     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4731     pa_assert(m);
4732
4733     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4734 }
4735
4736 pa_native_options* pa_native_options_new(void) {
4737     pa_native_options *o;
4738
4739     o = pa_xnew0(pa_native_options, 1);
4740     PA_REFCNT_INIT(o);
4741
4742     return o;
4743 }
4744
4745 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4746     pa_assert(o);
4747     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4748
4749     PA_REFCNT_INC(o);
4750
4751     return o;
4752 }
4753
4754 void pa_native_options_unref(pa_native_options *o) {
4755     pa_assert(o);
4756     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4757
4758     if (PA_REFCNT_DEC(o) > 0)
4759         return;
4760
4761     pa_xfree(o->auth_group);
4762
4763     if (o->auth_ip_acl)
4764         pa_ip_acl_free(o->auth_ip_acl);
4765
4766     if (o->auth_cookie)
4767         pa_auth_cookie_unref(o->auth_cookie);
4768
4769     pa_xfree(o);
4770 }
4771
4772 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4773     pa_bool_t enabled;
4774     const char *acl;
4775
4776     pa_assert(o);
4777     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4778     pa_assert(ma);
4779
4780     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4781         pa_log("auth-anonymous= expects a boolean argument.");
4782         return -1;
4783     }
4784
4785     enabled = TRUE;
4786     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4787         pa_log("auth-group-enabled= expects a boolean argument.");
4788         return -1;
4789     }
4790
4791     pa_xfree(o->auth_group);
4792     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4793
4794 #ifndef HAVE_CREDS
4795     if (o->auth_group)
4796         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4797 #endif
4798
4799     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4800         pa_ip_acl *ipa;
4801
4802         if (!(ipa = pa_ip_acl_new(acl))) {
4803             pa_log("Failed to parse IP ACL '%s'", acl);
4804             return -1;
4805         }
4806
4807         if (o->auth_ip_acl)
4808             pa_ip_acl_free(o->auth_ip_acl);
4809
4810         o->auth_ip_acl = ipa;
4811     }
4812
4813     enabled = TRUE;
4814     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4815         pa_log("auth-cookie-enabled= expects a boolean argument.");
4816         return -1;
4817     }
4818
4819     if (o->auth_cookie)
4820         pa_auth_cookie_unref(o->auth_cookie);
4821
4822     if (enabled) {
4823         const char *cn;
4824
4825         /* The new name for this is 'auth-cookie', for compat reasons
4826          * we check the old name too */
4827         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4828             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4829                 cn = PA_NATIVE_COOKIE_FILE;
4830
4831         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4832             return -1;
4833
4834     } else
4835           o->auth_cookie = NULL;
4836
4837     return 0;
4838 }
4839
4840 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4841     pa_native_connection_assert_ref(c);
4842
4843     return c->pstream;
4844 }