core: Fix return of pa_cpu_init_arm()
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38 #include <pulse/internal.h>
39
40 #include <pulsecore/native-common.h>
41 #include <pulsecore/packet.h>
42 #include <pulsecore/client.h>
43 #include <pulsecore/source-output.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pstream.h>
46 #include <pulsecore/tagstruct.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* #define PROTOCOL_NATIVE_DEBUG */
64
/* Kick a client if it doesn't authenticate within this time
 * (60 times PA_USEC_PER_SEC) */
#define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)

/* Don't accept more connections than this */
#define MAX_CONNECTIONS 64

/* Upper bound applied to a stream's requested maxlength buffer attribute */
#define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
#define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
#define DEFAULT_PROCESS_MSEC 20   /* 20ms */
/* Fragment size used for record streams when the client passes (uint32_t) -1;
 * deliberately the same duration as the default target length */
#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC

/* Forward declaration; the full definition follows further down in this file */
struct pa_native_protocol;
77
/* Per-stream state of a recording stream of a native protocol client.
 * Instances are created by record_stream_new() and stored in the
 * record_streams idxset of the owning connection. */
typedef struct record_stream {
    pa_msgobject parent;

    /* Owning connection; reset to NULL by record_stream_unlink() */
    pa_native_connection *connection;
    /* Index assigned by pa_idxset_put() on connection->record_streams */
    uint32_t index;

    pa_source_output *source_output;
    /* Queue that RECORD_STREAM_MESSAGE_POST_DATA pushes captured chunks into */
    pa_memblockq *memblockq;

    /* Latency modes selected at creation; they steer
     * fix_record_buffer_attr_pre() */
    pa_bool_t adjust_latency:1;
    pa_bool_t early_requests:1;

    /* Requested buffer attributes */
    pa_buffer_attr buffer_attr_req;
    /* Fixed-up and adjusted buffer attributes */
    pa_buffer_attr buffer_attr;

    /* Bytes currently "on the fly"; decremented when posted data arrives
     * in the main context */
    pa_atomic_t on_the_fly;
    /* Value returned by pa_source_output_set_requested_latency(), or 0 if
     * no latency was requested */
    pa_usec_t configured_source_latency;
    size_t drop_initial;

    /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
    size_t on_the_fly_snapshot;
    pa_usec_t current_monitor_latency;
    pa_usec_t current_source_latency;
} record_stream;

/* Checked cast from a generic object pointer to record_stream */
#define RECORD_STREAM(o) (record_stream_cast(o))
PA_DEFINE_PRIVATE_CLASS(record_stream, pa_msgobject);
107
/* Common base type embedded as "parent" in both playback_stream and
 * upload_stream, the two kinds of stream kept in a connection's
 * output_streams idxset. */
typedef struct output_stream {
    pa_msgobject parent;
} output_stream;

/* Checked cast from a generic object pointer to output_stream */
#define OUTPUT_STREAM(o) (output_stream_cast(o))
PA_DEFINE_PRIVATE_CLASS(output_stream, pa_msgobject);
114
/* Per-stream state of a playback stream of a native protocol client.
 * Stored in the output_streams idxset of the owning connection. */
typedef struct playback_stream {
    output_stream parent;

    /* Owning connection; reset to NULL by playback_stream_unlink() */
    pa_native_connection *connection;
    uint32_t index;

    pa_sink_input *sink_input;
    pa_memblockq *memblockq;

    pa_bool_t adjust_latency:1;
    pa_bool_t early_requests:1;

    pa_bool_t is_underrun:1;
    /* While drain_request is set, playback_stream_unlink() answers
     * drain_tag with PA_ERR_NOENTITY */
    pa_bool_t drain_request:1;
    uint32_t drain_tag;
    uint32_t syncid;

    /* Optimization to avoid too many rewinds with a lot of small blocks */
    pa_atomic_t seek_or_post_in_queue;
    int64_t seek_windex;

    /* Byte count that PLAYBACK_STREAM_MESSAGE_REQUEST_DATA atomically takes
     * (resetting it to 0) and sends to the client as PA_COMMAND_REQUEST */
    pa_atomic_t missing;
    pa_usec_t configured_sink_latency;
    /* Requested buffer attributes */
    pa_buffer_attr buffer_attr_req;
    /* Fixed-up and adjusted buffer attributes */
    pa_buffer_attr buffer_attr;

    /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
    int64_t read_index, write_index;
    size_t render_memblockq_length;
    pa_usec_t current_sink_latency;
    uint64_t playing_for, underrun_for;
} playback_stream;

/* Checked cast from a generic object pointer to playback_stream */
#define PLAYBACK_STREAM(o) (playback_stream_cast(o))
PA_DEFINE_PRIVATE_CLASS(playback_stream, output_stream);
152
/* State of a sample upload in progress. Created by upload_stream_new()
 * and stored, like playback streams, in the connection's output_streams
 * idxset. */
typedef struct upload_stream {
    output_stream parent;

    /* Owning connection; reset to NULL by upload_stream_unlink() */
    pa_native_connection *connection;
    uint32_t index;

    /* Reset on creation; its memblock (if any) is unreferenced on free */
    pa_memchunk memchunk;
    /* Length passed to upload_stream_new(), asserted to be > 0 there */
    size_t length;
    /* Private copy of the name (pa_xstrdup), released in upload_stream_free() */
    char *name;
    pa_sample_spec sample_spec;
    pa_channel_map channel_map;
    /* Copy of the caller's proplist, updated with the client's proplist */
    pa_proplist *proplist;
} upload_stream;

/* Checked cast from a generic object pointer to upload_stream */
#define UPLOAD_STREAM(o) (upload_stream_cast(o))
PA_DEFINE_PRIVATE_CLASS(upload_stream, output_stream);
169
/* State of one client connection speaking the native protocol. */
struct pa_native_connection {
    pa_msgobject parent;
    pa_native_protocol *protocol;
    pa_native_options *options;
    pa_bool_t authorized:1;
    pa_bool_t is_local:1;
    uint32_t version;
    pa_client *client;
    /* Stream used to send tagstructs, errors and memblocks to the client */
    pa_pstream *pstream;
    pa_pdispatch *pdispatch;
    /* record_streams holds record_stream objects; output_streams holds both
     * playback_stream and upload_stream objects */
    pa_idxset *record_streams, *output_streams;
    uint32_t rrobin_index;
    pa_subscription *subscription;
    /* NOTE(review): presumably the timer enforcing AUTH_TIMEOUT; its setup
     * is not visible in this part of the file */
    pa_time_event *auth_timeout_event;
};

/* Checked cast from a generic object pointer to pa_native_connection */
#define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
PA_DEFINE_PRIVATE_CLASS(pa_native_connection, pa_msgobject);
188
/* Reference-counted state of the native protocol as a whole: the core it
 * belongs to, its connections, and its server list, hooks and extensions. */
struct pa_native_protocol {
    PA_REFCNT_DECLARE;

    pa_core *core;
    pa_idxset *connections;

    pa_strlist *servers;
    /* One hook per PA_NATIVE_HOOK_* slot */
    pa_hook hooks[PA_NATIVE_HOOK_MAX];

    pa_hashmap *extensions;
};
200
/* Additional message codes for source outputs, numbered from
 * PA_SOURCE_OUTPUT_MESSAGE_MAX so they do not collide with the generic ones */
enum {
    SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
};

/* Additional message codes for sink inputs, numbered from
 * PA_SINK_INPUT_MESSAGE_MAX */
enum {
    SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
    SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
    SINK_INPUT_MESSAGE_FLUSH,
    SINK_INPUT_MESSAGE_TRIGGER,
    SINK_INPUT_MESSAGE_SEEK,
    SINK_INPUT_MESSAGE_PREBUF_FORCE,
    SINK_INPUT_MESSAGE_UPDATE_LATENCY,
    SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
};

/* Message codes handled by playback_stream_process_msg() in the main context */
enum {
    PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
    PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
    PLAYBACK_STREAM_MESSAGE_OVERFLOW,
    PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
    PLAYBACK_STREAM_MESSAGE_STARTED,
    PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
};

/* Message codes handled by record_stream_process_msg() in the main context */
enum {
    RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
};

/* Message codes addressed to a pa_native_connection object.
 * NOTE(review): the handler is outside this part of the file */
enum {
    CONNECTION_MESSAGE_RELEASE,
    CONNECTION_MESSAGE_REVOKE
};
233
/* Callbacks for the sink input of a playback stream */
static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
static void sink_input_kill_cb(pa_sink_input *i);
static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);

static void native_connection_send_memblock(pa_native_connection *c);
static void playback_stream_request_bytes(struct playback_stream*s);

/* Callbacks installed on the source output of a record stream by
 * record_stream_new() */
static void source_output_kill_cb(pa_source_output *o);
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);

/* Message handlers for the sink input / source output objects */
static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);

/* Protocol command handlers; all share the pa_pdispatch_cb_t signature and
 * are referenced from command_table[] */
static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
295
/* Dispatch table indexed by PA_COMMAND_* code. Several commands share one
 * handler, which receives the command code as an argument. Entries set to
 * NULL (error/timeout/reply/request and the obsolete autoload commands)
 * have no handler in this table.
 * NOTE(review): how pa_pdispatch treats NULL entries is not visible here. */
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
    [PA_COMMAND_ERROR] = NULL,
    [PA_COMMAND_TIMEOUT] = NULL,
    [PA_COMMAND_REPLY] = NULL,
    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
    [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
    [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
    [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
    [PA_COMMAND_AUTH] = command_auth,
    [PA_COMMAND_REQUEST] = NULL,
    [PA_COMMAND_EXIT] = command_exit,
    [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
    [PA_COMMAND_LOOKUP_SINK] = command_lookup,
    [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
    [PA_COMMAND_STAT] = command_stat,
    [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
    [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
    [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
    [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
    [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
    [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
    [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
    [PA_COMMAND_GET_SINK_INFO] = command_get_info,
    [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
    [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
    [PA_COMMAND_GET_CARD_INFO] = command_get_info,
    [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
    [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
    [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
    [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
    [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
    [PA_COMMAND_SUBSCRIBE] = command_subscribe,

    [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME] = command_set_volume,

    [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SOURCE_OUTPUT_MUTE] = command_set_mute,

    [PA_COMMAND_SUSPEND_SINK] = command_suspend,
    [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,

    [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
    [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
    [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,

    [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
    [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,

    [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
    [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
    [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
    [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
    [PA_COMMAND_KILL_CLIENT] = command_kill,
    [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
    [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
    [PA_COMMAND_LOAD_MODULE] = command_load_module,
    [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,

    [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
    [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
    [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
    [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,

    [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
    [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,

    [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
    [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,

    [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
    [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,

    [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
    [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
    [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,

    [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
    [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
    [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,

    [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,

    [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
    [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,

    [PA_COMMAND_EXTENSION] = command_extension
};
398
399 /* structure management */
400
401 /* Called from main context */
402 static void upload_stream_unlink(upload_stream *s) {
403     pa_assert(s);
404
405     if (!s->connection)
406         return;
407
408     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
409     s->connection = NULL;
410     upload_stream_unref(s);
411 }
412
413 /* Called from main context */
414 static void upload_stream_free(pa_object *o) {
415     upload_stream *s = UPLOAD_STREAM(o);
416     pa_assert(s);
417
418     upload_stream_unlink(s);
419
420     pa_xfree(s->name);
421
422     if (s->proplist)
423         pa_proplist_free(s->proplist);
424
425     if (s->memchunk.memblock)
426         pa_memblock_unref(s->memchunk.memblock);
427
428     pa_xfree(s);
429 }
430
431 /* Called from main context */
432 static upload_stream* upload_stream_new(
433         pa_native_connection *c,
434         const pa_sample_spec *ss,
435         const pa_channel_map *map,
436         const char *name,
437         size_t length,
438         pa_proplist *p) {
439
440     upload_stream *s;
441
442     pa_assert(c);
443     pa_assert(ss);
444     pa_assert(name);
445     pa_assert(length > 0);
446     pa_assert(p);
447
448     s = pa_msgobject_new(upload_stream);
449     s->parent.parent.parent.free = upload_stream_free;
450     s->connection = c;
451     s->sample_spec = *ss;
452     s->channel_map = *map;
453     s->name = pa_xstrdup(name);
454     pa_memchunk_reset(&s->memchunk);
455     s->length = length;
456     s->proplist = pa_proplist_copy(p);
457     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
458
459     pa_idxset_put(c->output_streams, s, &s->index);
460
461     return s;
462 }
463
464 /* Called from main context */
465 static void record_stream_unlink(record_stream *s) {
466     pa_assert(s);
467
468     if (!s->connection)
469         return;
470
471     if (s->source_output) {
472         pa_source_output_unlink(s->source_output);
473         pa_source_output_unref(s->source_output);
474         s->source_output = NULL;
475     }
476
477     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
478     s->connection = NULL;
479     record_stream_unref(s);
480 }
481
482 /* Called from main context */
483 static void record_stream_free(pa_object *o) {
484     record_stream *s = RECORD_STREAM(o);
485     pa_assert(s);
486
487     record_stream_unlink(s);
488
489     pa_memblockq_free(s->memblockq);
490     pa_xfree(s);
491 }
492
493 /* Called from main context */
494 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
495     record_stream *s = RECORD_STREAM(o);
496     record_stream_assert_ref(s);
497
498     if (!s->connection)
499         return -1;
500
501     switch (code) {
502
503         case RECORD_STREAM_MESSAGE_POST_DATA:
504
505             /* We try to keep up to date with how many bytes are
506              * currently on the fly */
507             pa_atomic_sub(&s->on_the_fly, chunk->length);
508
509             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
510 /*                 pa_log_warn("Failed to push data into output queue."); */
511                 return -1;
512             }
513
514             if (!pa_pstream_is_pending(s->connection->pstream))
515                 native_connection_send_memblock(s->connection);
516
517             break;
518     }
519
520     return 0;
521 }
522
523 /* Called from main context */
524 static void fix_record_buffer_attr_pre(record_stream *s) {
525
526     size_t frame_size;
527     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
528
529     pa_assert(s);
530
531     /* This function will be called from the main thread, before as
532      * well as after the source output has been activated using
533      * pa_source_output_put()! That means it may not touch any
534      * ->thread_info data! */
535
536     frame_size = pa_frame_size(&s->source_output->sample_spec);
537     s->buffer_attr = s->buffer_attr_req;
538
539     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
540         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
541     if (s->buffer_attr.maxlength <= 0)
542         s->buffer_attr.maxlength = (uint32_t) frame_size;
543
544     if (s->buffer_attr.fragsize == (uint32_t) -1)
545         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
546     if (s->buffer_attr.fragsize <= 0)
547         s->buffer_attr.fragsize = (uint32_t) frame_size;
548
549     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
550
551     if (s->early_requests) {
552
553         /* In early request mode we need to emulate the classic
554          * fragment-based playback model. We do this setting the source
555          * latency to the fragment size. */
556
557         source_usec = fragsize_usec;
558
559     } else if (s->adjust_latency) {
560
561         /* So, the user asked us to adjust the latency according to
562          * what the source can provide. Half the latency will be
563          * spent on the hw buffer, half of it in the async buffer
564          * queue we maintain for each client. */
565
566         source_usec = fragsize_usec/2;
567
568     } else {
569
570         /* Ok, the user didn't ask us to adjust the latency, hence we
571          * don't */
572
573         source_usec = (pa_usec_t) -1;
574     }
575
576     if (source_usec != (pa_usec_t) -1)
577         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
578     else
579         s->configured_source_latency = 0;
580
581     if (s->early_requests) {
582
583         /* Ok, we didn't necessarily get what we were asking for, so
584          * let's tell the user */
585
586         fragsize_usec = s->configured_source_latency;
587
588     } else if (s->adjust_latency) {
589
590         /* Now subtract what we actually got */
591
592         if (fragsize_usec >= s->configured_source_latency*2)
593             fragsize_usec -= s->configured_source_latency;
594         else
595             fragsize_usec = s->configured_source_latency;
596     }
597
598     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
599         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
600
601         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
602
603     if (s->buffer_attr.fragsize <= 0)
604         s->buffer_attr.fragsize = (uint32_t) frame_size;
605 }
606
607 /* Called from main context */
608 static void fix_record_buffer_attr_post(record_stream *s) {
609     size_t base;
610
611     pa_assert(s);
612
613     /* This function will be called from the main thread, before as
614      * well as after the source output has been activated using
615      * pa_source_output_put()! That means it may not touch and
616      * ->thread_info data! */
617
618     base = pa_frame_size(&s->source_output->sample_spec);
619
620     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
621     if (s->buffer_attr.fragsize <= 0)
622         s->buffer_attr.fragsize = base;
623
624     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
625         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
626 }
627
/* Called from main context.
 *
 * Creates a source output for connection c and wraps it in a new
 * record_stream, which is registered in c->record_streams.
 *
 * ss and map are in/out parameters: if valid they are used for the
 * request, and on success they are overwritten with the sample spec and
 * channel map of the source output that was created. source, formats and
 * volume are optional (skipped when NULL); muted is only applied when
 * muted_set is true. attr holds the requested buffer attributes.
 *
 * *ret receives the negated return value of pa_source_output_new().
 * Returns the new stream, or NULL if no source output was created. */
static record_stream* record_stream_new(
        pa_native_connection *c,
        pa_source *source,
        pa_sample_spec *ss,
        pa_channel_map *map,
        pa_idxset *formats,
        pa_buffer_attr *attr,
        pa_cvolume *volume,
        pa_bool_t muted,
        pa_bool_t muted_set,
        pa_source_output_flags_t flags,
        pa_proplist *p,
        pa_bool_t adjust_latency,
        pa_bool_t early_requests,
        pa_bool_t relative_volume,
        pa_bool_t peak_detect,
        pa_sink_input *direct_on_input,
        int *ret) {

    record_stream *s;
    pa_source_output *source_output = NULL;
    pa_source_output_new_data data;
    char *memblockq_name;

    pa_assert(c);
    pa_assert(ss);
    pa_assert(p);
    pa_assert(ret);

    /* Describe the source output we want */
    pa_source_output_new_data_init(&data);

    pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
    data.driver = __FILE__;
    data.module = c->options->module;
    data.client = c->client;
    if (source)
        pa_source_output_new_data_set_source(&data, source, TRUE);
    if (pa_sample_spec_valid(ss))
        pa_source_output_new_data_set_sample_spec(&data, ss);
    if (pa_channel_map_valid(map))
        pa_source_output_new_data_set_channel_map(&data, map);
    if (formats)
        pa_source_output_new_data_set_formats(&data, formats);
    data.direct_on_input = direct_on_input;
    if (volume) {
        /* An explicitly passed volume is absolute unless the caller asked
         * for a relative one, and is marked to be saved */
        pa_source_output_new_data_set_volume(&data, volume);
        data.volume_is_absolute = !relative_volume;
        data.save_volume = TRUE;
    }
    if (muted_set) {
        pa_source_output_new_data_set_muted(&data, muted);
        data.save_muted = TRUE;
    }
    if (peak_detect)
        data.resample_method = PA_RESAMPLER_PEAKS;
    data.flags = flags;

    *ret = -pa_source_output_new(&source_output, c->protocol->core, &data);

    /* The request data is released whether or not creation succeeded */
    pa_source_output_new_data_done(&data);

    if (!source_output)
        return NULL;

    s = pa_msgobject_new(record_stream);
    s->parent.parent.free = record_stream_free;
    s->parent.process_msg = record_stream_process_msg;
    s->connection = c;
    s->source_output = source_output;
    s->buffer_attr_req = *attr;
    s->adjust_latency = adjust_latency;
    s->early_requests = early_requests;
    pa_atomic_store(&s->on_the_fly, 0);

    /* Install our callbacks on the source output and point it back at us */
    s->source_output->parent.process_msg = source_output_process_msg;
    s->source_output->push = source_output_push_cb;
    s->source_output->kill = source_output_kill_cb;
    s->source_output->get_latency = source_output_get_latency_cb;
    s->source_output->moving = source_output_moving_cb;
    s->source_output->suspend = source_output_suspend_cb;
    s->source_output->send_event = source_output_send_event_cb;
    s->source_output->userdata = s;

    /* Compute maxlength/fragsize and request the source latency; the
     * resulting maxlength sizes the queue created below */
    fix_record_buffer_attr_pre(s);

    memblockq_name = pa_sprintf_malloc("native protocol record stream memblockq [%u]", s->source_output->index);
    s->memblockq = pa_memblockq_new(
            memblockq_name,
            0,
            s->buffer_attr.maxlength,
            0,
            &source_output->sample_spec,
            1,
            0,
            0,
            NULL);
    pa_xfree(memblockq_name);

    /* NOTE(review): presumably reads back the attributes the queue really
     * uses; afterwards the fragment size gets its final alignment */
    pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
    fix_record_buffer_attr_post(s);

    /* Report the effective format back to the caller */
    *ss = s->source_output->sample_spec;
    *map = s->source_output->channel_map;

    pa_idxset_put(c->record_streams, s, &s->index);

    pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
                ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
                (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
                (double) s->configured_source_latency / PA_USEC_PER_MSEC);

    /* Activate the source output only once the stream is fully set up and
     * registered */
    pa_source_output_put(s->source_output);
    return s;
}
743
744 /* Called from main context */
745 static void record_stream_send_killed(record_stream *r) {
746     pa_tagstruct *t;
747     record_stream_assert_ref(r);
748
749     t = pa_tagstruct_new(NULL, 0);
750     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
751     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
752     pa_tagstruct_putu32(t, r->index);
753     pa_pstream_send_tagstruct(r->connection->pstream, t);
754 }
755
756 /* Called from main context */
757 static void playback_stream_unlink(playback_stream *s) {
758     pa_assert(s);
759
760     if (!s->connection)
761         return;
762
763     if (s->sink_input) {
764         pa_sink_input_unlink(s->sink_input);
765         pa_sink_input_unref(s->sink_input);
766         s->sink_input = NULL;
767     }
768
769     if (s->drain_request)
770         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
771
772     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
773     s->connection = NULL;
774     playback_stream_unref(s);
775 }
776
777 /* Called from main context */
778 static void playback_stream_free(pa_object* o) {
779     playback_stream *s = PLAYBACK_STREAM(o);
780     pa_assert(s);
781
782     playback_stream_unlink(s);
783
784     pa_memblockq_free(s->memblockq);
785     pa_xfree(s);
786 }
787
788 /* Called from main context */
789 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
790     playback_stream *s = PLAYBACK_STREAM(o);
791     playback_stream_assert_ref(s);
792
793     if (!s->connection)
794         return -1;
795
796     switch (code) {
797
798         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
799             pa_tagstruct *t;
800             int l = 0;
801
802             for (;;) {
803                 if ((l = pa_atomic_load(&s->missing)) <= 0)
804                     return 0;
805
806                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
807                     break;
808             }
809
810             t = pa_tagstruct_new(NULL, 0);
811             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
812             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
813             pa_tagstruct_putu32(t, s->index);
814             pa_tagstruct_putu32(t, (uint32_t) l);
815             pa_pstream_send_tagstruct(s->connection->pstream, t);
816
817 #ifdef PROTOCOL_NATIVE_DEBUG
818             pa_log("Requesting %lu bytes", (unsigned long) l);
819 #endif
820             break;
821         }
822
823         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
824             pa_tagstruct *t;
825
826 #ifdef PROTOCOL_NATIVE_DEBUG
827             pa_log("signalling underflow");
828 #endif
829
830             /* Report that we're empty */
831             t = pa_tagstruct_new(NULL, 0);
832             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
833             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
834             pa_tagstruct_putu32(t, s->index);
835             if (s->connection->version >= 23)
836                 pa_tagstruct_puts64(t, offset);
837             pa_pstream_send_tagstruct(s->connection->pstream, t);
838             break;
839         }
840
841         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
842             pa_tagstruct *t;
843
844             /* Notify the user we're overflowed*/
845             t = pa_tagstruct_new(NULL, 0);
846             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
847             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
848             pa_tagstruct_putu32(t, s->index);
849             pa_pstream_send_tagstruct(s->connection->pstream, t);
850             break;
851         }
852
853         case PLAYBACK_STREAM_MESSAGE_STARTED:
854
855             if (s->connection->version >= 13) {
856                 pa_tagstruct *t;
857
858                 /* Notify the user we started playback */
859                 t = pa_tagstruct_new(NULL, 0);
860                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
861                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
862                 pa_tagstruct_putu32(t, s->index);
863                 pa_pstream_send_tagstruct(s->connection->pstream, t);
864             }
865
866             break;
867
868         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
869             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
870             break;
871
872         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH:
873
874             s->buffer_attr.tlength = (uint32_t) offset;
875
876             if (s->connection->version >= 15) {
877                 pa_tagstruct *t;
878
879                 t = pa_tagstruct_new(NULL, 0);
880                 pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
881                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
882                 pa_tagstruct_putu32(t, s->index);
883                 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
884                 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
885                 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
886                 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
887                 pa_tagstruct_put_usec(t, s->configured_sink_latency);
888                 pa_pstream_send_tagstruct(s->connection->pstream, t);
889             }
890
891             break;
892     }
893
894     return 0;
895 }
896
897 /* Called from main context */
898 static void fix_playback_buffer_attr(playback_stream *s) {
899     size_t frame_size, max_prebuf;
900     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
901
902     pa_assert(s);
903
904 #ifdef PROTOCOL_NATIVE_DEBUG
905     pa_log("Client requested: maxlength=%li bytes tlength=%li bytes minreq=%li bytes prebuf=%li bytes",
906            (long) s->buffer_attr.maxlength,
907            (long) s->buffer_attr.tlength,
908            (long) s->buffer_attr.minreq,
909            (long) s->buffer_attr.prebuf);
910
911     pa_log("Client requested: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms",
912            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
913            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
914            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
915            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC));
916 #endif
917
918     /* This function will be called from the main thread, before as
919      * well as after the sink input has been activated using
920      * pa_sink_input_put()! That means it may not touch any
921      * ->thread_info data, such as the memblockq! */
922
923     frame_size = pa_frame_size(&s->sink_input->sample_spec);
924     s->buffer_attr = s->buffer_attr_req;
925
926     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
927         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
928     if (s->buffer_attr.maxlength <= 0)
929         s->buffer_attr.maxlength = (uint32_t) frame_size;
930
931     if (s->buffer_attr.tlength == (uint32_t) -1)
932         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
933     if (s->buffer_attr.tlength <= 0)
934         s->buffer_attr.tlength = (uint32_t) frame_size;
935
936     if (s->buffer_attr.minreq == (uint32_t) -1)
937         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
938     if (s->buffer_attr.minreq <= 0)
939         s->buffer_attr.minreq = (uint32_t) frame_size;
940
941     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
942         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
943
944     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
945     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
946
947     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
948                 (double) tlength_usec / PA_USEC_PER_MSEC,
949                 (double) minreq_usec / PA_USEC_PER_MSEC);
950
951     if (s->early_requests) {
952
953         /* In early request mode we need to emulate the classic
954          * fragment-based playback model. We do this setting the sink
955          * latency to the fragment size. */
956
957         sink_usec = minreq_usec;
958         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
959
960     } else if (s->adjust_latency) {
961
962         /* So, the user asked us to adjust the latency of the stream
963          * buffer according to the what the sink can provide. The
964          * tlength passed in shall be the overall latency. Roughly
965          * half the latency will be spent on the hw buffer, the other
966          * half of it in the async buffer queue we maintain for each
967          * client. In between we'll have a safety space of size
968          * 2*minreq. Why the 2*minreq? When the hw buffer is completely
969          * empty and needs to be filled, then our buffer must have
970          * enough data to fulfill this request immediately and thus
971          * have at least the same tlength as the size of the hw
972          * buffer. It additionally needs space for 2 times minreq
973          * because if the buffer ran empty and a partial fillup
974          * happens immediately on the next iteration we need to be
975          * able to fulfill it and give the application also minreq
976          * time to fill it up again for the next request Makes 2 times
977          * minreq in plus.. */
978
979         if (tlength_usec > minreq_usec*2)
980             sink_usec = (tlength_usec - minreq_usec*2)/2;
981         else
982             sink_usec = 0;
983
984         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
985
986     } else {
987
988         /* Ok, the user didn't ask us to adjust the latency, but we
989          * still need to make sure that the parameters from the user
990          * do make sense. */
991
992         if (tlength_usec > minreq_usec*2)
993             sink_usec = (tlength_usec - minreq_usec*2);
994         else
995             sink_usec = 0;
996
997         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
998     }
999
1000     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
1001
1002     if (s->early_requests) {
1003
1004         /* Ok, we didn't necessarily get what we were asking for, so
1005          * let's tell the user */
1006
1007         minreq_usec = s->configured_sink_latency;
1008
1009     } else if (s->adjust_latency) {
1010
1011         /* Ok, we didn't necessarily get what we were asking for, so
1012          * let's subtract from what we asked for for the remaining
1013          * buffer space */
1014
1015         if (tlength_usec >= s->configured_sink_latency)
1016             tlength_usec -= s->configured_sink_latency;
1017     }
1018
1019     pa_log_debug("Requested latency=%0.2f ms, Received latency=%0.2f ms",
1020                  (double) sink_usec / PA_USEC_PER_MSEC,
1021                  (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1022
1023     /* FIXME: This is actually larger than necessary, since not all of
1024      * the sink latency is actually rewritable. */
1025     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
1026         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
1027
1028     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
1029         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
1030         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
1031
1032     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
1033         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
1034         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
1035
1036     if (s->buffer_attr.minreq <= 0) {
1037         s->buffer_attr.minreq = (uint32_t) frame_size;
1038         s->buffer_attr.tlength += (uint32_t) frame_size*2;
1039     }
1040
1041     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
1042         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
1043
1044     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
1045
1046     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
1047         s->buffer_attr.prebuf > max_prebuf)
1048         s->buffer_attr.prebuf = max_prebuf;
1049
1050 #ifdef PROTOCOL_NATIVE_DEBUG
1051     pa_log("Client accepted: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms",
1052            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1053            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1054            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC),
1055            (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC));
1056 #endif
1057 }
1058
1059 /* Called from main context */
1060 static playback_stream* playback_stream_new(
1061         pa_native_connection *c,
1062         pa_sink *sink,
1063         pa_sample_spec *ss,
1064         pa_channel_map *map,
1065         pa_idxset *formats,
1066         pa_buffer_attr *a,
1067         pa_cvolume *volume,
1068         pa_bool_t muted,
1069         pa_bool_t muted_set,
1070         pa_sink_input_flags_t flags,
1071         pa_proplist *p,
1072         pa_bool_t adjust_latency,
1073         pa_bool_t early_requests,
1074         pa_bool_t relative_volume,
1075         uint32_t syncid,
1076         uint32_t *missing,
1077         int *ret) {
1078
1079     /* Note: This function takes ownership of the 'formats' param, so we need
1080      * to take extra care to not leak it */
1081
1082     playback_stream *ssync;
1083     playback_stream *s = NULL;
1084     pa_sink_input *sink_input = NULL;
1085     pa_memchunk silence;
1086     uint32_t idx;
1087     int64_t start_index;
1088     pa_sink_input_new_data data;
1089     char *memblockq_name;
1090
1091     pa_assert(c);
1092     pa_assert(ss);
1093     pa_assert(missing);
1094     pa_assert(p);
1095     pa_assert(ret);
1096
1097     /* Find syncid group */
1098     PA_IDXSET_FOREACH(ssync, c->output_streams, idx) {
1099
1100         if (!playback_stream_isinstance(ssync))
1101             continue;
1102
1103         if (ssync->syncid == syncid)
1104             break;
1105     }
1106
1107     /* Synced streams must connect to the same sink */
1108     if (ssync) {
1109
1110         if (!sink)
1111             sink = ssync->sink_input->sink;
1112         else if (sink != ssync->sink_input->sink) {
1113             *ret = PA_ERR_INVALID;
1114             goto out;
1115         }
1116     }
1117
1118     pa_sink_input_new_data_init(&data);
1119
1120     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1121     data.driver = __FILE__;
1122     data.module = c->options->module;
1123     data.client = c->client;
1124     if (sink)
1125         pa_sink_input_new_data_set_sink(&data, sink, TRUE);
1126     if (pa_sample_spec_valid(ss))
1127         pa_sink_input_new_data_set_sample_spec(&data, ss);
1128     if (pa_channel_map_valid(map))
1129         pa_sink_input_new_data_set_channel_map(&data, map);
1130     if (formats) {
1131         pa_sink_input_new_data_set_formats(&data, formats);
1132         /* Ownership transferred to new_data, so we don't free it ourselves */
1133         formats = NULL;
1134     }
1135     if (volume) {
1136         pa_sink_input_new_data_set_volume(&data, volume);
1137         data.volume_is_absolute = !relative_volume;
1138         data.save_volume = TRUE;
1139     }
1140     if (muted_set) {
1141         pa_sink_input_new_data_set_muted(&data, muted);
1142         data.save_muted = TRUE;
1143     }
1144     data.sync_base = ssync ? ssync->sink_input : NULL;
1145     data.flags = flags;
1146
1147     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data);
1148
1149     pa_sink_input_new_data_done(&data);
1150
1151     if (!sink_input)
1152         goto out;
1153
1154     s = pa_msgobject_new(playback_stream);
1155     s->parent.parent.parent.free = playback_stream_free;
1156     s->parent.parent.process_msg = playback_stream_process_msg;
1157     s->connection = c;
1158     s->syncid = syncid;
1159     s->sink_input = sink_input;
1160     s->is_underrun = TRUE;
1161     s->drain_request = FALSE;
1162     pa_atomic_store(&s->missing, 0);
1163     s->buffer_attr_req = *a;
1164     s->adjust_latency = adjust_latency;
1165     s->early_requests = early_requests;
1166     pa_atomic_store(&s->seek_or_post_in_queue, 0);
1167     s->seek_windex = -1;
1168
1169     s->sink_input->parent.process_msg = sink_input_process_msg;
1170     s->sink_input->pop = sink_input_pop_cb;
1171     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1172     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1173     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1174     s->sink_input->kill = sink_input_kill_cb;
1175     s->sink_input->moving = sink_input_moving_cb;
1176     s->sink_input->suspend = sink_input_suspend_cb;
1177     s->sink_input->send_event = sink_input_send_event_cb;
1178     s->sink_input->userdata = s;
1179
1180     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1181
1182     fix_playback_buffer_attr(s);
1183
1184     pa_sink_input_get_silence(sink_input, &silence);
1185     memblockq_name = pa_sprintf_malloc("native protocol playback stream memblockq [%u]", s->sink_input->index);
1186     s->memblockq = pa_memblockq_new(
1187             memblockq_name,
1188             start_index,
1189             s->buffer_attr.maxlength,
1190             s->buffer_attr.tlength,
1191             &sink_input->sample_spec,
1192             s->buffer_attr.prebuf,
1193             s->buffer_attr.minreq,
1194             0,
1195             &silence);
1196     pa_xfree(memblockq_name);
1197     pa_memblock_unref(silence.memblock);
1198
1199     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1200
1201     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1202
1203 #ifdef PROTOCOL_NATIVE_DEBUG
1204     pa_log("missing original: %li", (long int) *missing);
1205 #endif
1206
1207     *ss = s->sink_input->sample_spec;
1208     *map = s->sink_input->channel_map;
1209
1210     pa_idxset_put(c->output_streams, s, &s->index);
1211
1212     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1213                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1214                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1215                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1216                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1217
1218     pa_sink_input_put(s->sink_input);
1219
1220 out:
1221     if (formats)
1222         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
1223
1224     return s;
1225 }
1226
1227 /* Called from IO context */
1228 static void playback_stream_request_bytes(playback_stream *s) {
1229     size_t m, minreq;
1230     int previous_missing;
1231
1232     playback_stream_assert_ref(s);
1233
1234     m = pa_memblockq_pop_missing(s->memblockq);
1235
1236     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu really missing=%lli)", */
1237     /*        (unsigned long) m, */
1238     /*        pa_memblockq_get_tlength(s->memblockq), */
1239     /*        pa_memblockq_get_minreq(s->memblockq), */
1240     /*        pa_memblockq_get_length(s->memblockq), */
1241     /*        (long long) pa_memblockq_get_tlength(s->memblockq) - (long long) pa_memblockq_get_length(s->memblockq)); */
1242
1243     if (m <= 0)
1244         return;
1245
1246 #ifdef PROTOCOL_NATIVE_DEBUG
1247     pa_log("request_bytes(%lu)", (unsigned long) m);
1248 #endif
1249
1250     previous_missing = pa_atomic_add(&s->missing, (int) m);
1251     minreq = pa_memblockq_get_minreq(s->memblockq);
1252
1253     if (pa_memblockq_prebuf_active(s->memblockq) ||
1254         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1255         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1256 }
1257
1258 /* Called from main context */
1259 static void playback_stream_send_killed(playback_stream *p) {
1260     pa_tagstruct *t;
1261     playback_stream_assert_ref(p);
1262
1263     t = pa_tagstruct_new(NULL, 0);
1264     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1265     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1266     pa_tagstruct_putu32(t, p->index);
1267     pa_pstream_send_tagstruct(p->connection->pstream, t);
1268 }
1269
1270 /* Called from main context */
1271 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1272     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1273     pa_native_connection_assert_ref(c);
1274
1275     if (!c->protocol)
1276         return -1;
1277
1278     switch (code) {
1279
1280         case CONNECTION_MESSAGE_REVOKE:
1281             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1282             break;
1283
1284         case CONNECTION_MESSAGE_RELEASE:
1285             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1286             break;
1287     }
1288
1289     return 0;
1290 }
1291
1292 /* Called from main context */
1293 static void native_connection_unlink(pa_native_connection *c) {
1294     record_stream *r;
1295     output_stream *o;
1296
1297     pa_assert(c);
1298
1299     if (!c->protocol)
1300         return;
1301
1302     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1303
1304     if (c->options)
1305         pa_native_options_unref(c->options);
1306
1307     while ((r = pa_idxset_first(c->record_streams, NULL)))
1308         record_stream_unlink(r);
1309
1310     while ((o = pa_idxset_first(c->output_streams, NULL)))
1311         if (playback_stream_isinstance(o))
1312             playback_stream_unlink(PLAYBACK_STREAM(o));
1313         else
1314             upload_stream_unlink(UPLOAD_STREAM(o));
1315
1316     if (c->subscription)
1317         pa_subscription_free(c->subscription);
1318
1319     if (c->pstream)
1320         pa_pstream_unlink(c->pstream);
1321
1322     if (c->auth_timeout_event) {
1323         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1324         c->auth_timeout_event = NULL;
1325     }
1326
1327     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1328     c->protocol = NULL;
1329     pa_native_connection_unref(c);
1330 }
1331
1332 /* Called from main context */
1333 static void native_connection_free(pa_object *o) {
1334     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1335
1336     pa_assert(c);
1337
1338     native_connection_unlink(c);
1339
1340     pa_idxset_free(c->record_streams, NULL, NULL);
1341     pa_idxset_free(c->output_streams, NULL, NULL);
1342
1343     pa_pdispatch_unref(c->pdispatch);
1344     pa_pstream_unref(c->pstream);
1345     pa_client_free(c->client);
1346
1347     pa_xfree(c);
1348 }
1349
1350 /* Called from main context */
1351 static void native_connection_send_memblock(pa_native_connection *c) {
1352     uint32_t start;
1353     record_stream *r;
1354
1355     start = PA_IDXSET_INVALID;
1356     for (;;) {
1357         pa_memchunk chunk;
1358
1359         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1360             return;
1361
1362         if (start == PA_IDXSET_INVALID)
1363             start = c->rrobin_index;
1364         else if (start == c->rrobin_index)
1365             return;
1366
1367         if (pa_memblockq_peek(r->memblockq, &chunk) >= 0) {
1368             pa_memchunk schunk = chunk;
1369
1370             if (schunk.length > r->buffer_attr.fragsize)
1371                 schunk.length = r->buffer_attr.fragsize;
1372
1373             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1374
1375             pa_memblockq_drop(r->memblockq, schunk.length);
1376             pa_memblock_unref(schunk.memblock);
1377
1378             return;
1379         }
1380     }
1381 }
1382
1383 /*** sink input callbacks ***/
1384
1385 /* Called from thread context */
1386 static void handle_seek(playback_stream *s, int64_t indexw) {
1387     playback_stream_assert_ref(s);
1388
1389 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1390
1391     if (s->sink_input->thread_info.underrun_for > 0) {
1392
1393 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1394
1395         if (pa_memblockq_is_readable(s->memblockq)) {
1396
1397             /* We just ended an underrun, let's ask the sink
1398              * for a complete rewind rewrite */
1399
1400             pa_log_debug("Requesting rewind due to end of underrun.");
1401             pa_sink_input_request_rewind(s->sink_input,
1402                                          (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 :
1403                                                    s->sink_input->thread_info.underrun_for),
1404                                          FALSE, TRUE, FALSE);
1405         }
1406
1407     } else {
1408         int64_t indexr;
1409
1410         indexr = pa_memblockq_get_read_index(s->memblockq);
1411
1412         if (indexw < indexr) {
1413             /* OK, the sink already asked for this data, so
1414              * let's have it ask us again */
1415
1416             pa_log_debug("Requesting rewind due to rewrite.");
1417             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1418         }
1419     }
1420
1421     playback_stream_request_bytes(s);
1422 }
1423
1424 static void flush_write_no_account(pa_memblockq *q) {
1425     pa_memblockq_flush_write(q, FALSE);
1426 }
1427
1428 /* Called from thread context */
1429 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1430     pa_sink_input *i = PA_SINK_INPUT(o);
1431     playback_stream *s;
1432
1433     pa_sink_input_assert_ref(i);
1434     s = PLAYBACK_STREAM(i->userdata);
1435     playback_stream_assert_ref(s);
1436
1437     switch (code) {
1438
1439         case SINK_INPUT_MESSAGE_SEEK:
1440         case SINK_INPUT_MESSAGE_POST_DATA: {
1441             int64_t windex = pa_memblockq_get_write_index(s->memblockq);
1442
1443             if (code == SINK_INPUT_MESSAGE_SEEK) {
1444                 /* The client side is incapable of accounting correctly
1445                  * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1446                  * able to deal with that. */
1447
1448                 pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1449                 windex = PA_MIN(windex, pa_memblockq_get_write_index(s->memblockq));
1450             }
1451
1452             if (chunk && pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1453                 if (pa_log_ratelimit(PA_LOG_WARN))
1454                     pa_log_warn("Failed to push data into queue");
1455                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1456                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1457             }
1458
1459             /* If more data is in queue, we rewind later instead. */
1460             if (s->seek_windex != -1)
1461                 windex = PA_MIN(windex, s->seek_windex);
1462             if (pa_atomic_dec(&s->seek_or_post_in_queue) > 1)
1463                 s->seek_windex = windex;
1464             else {
1465                 s->seek_windex = -1;
1466                 handle_seek(s, windex);
1467             }
1468             return 0;
1469         }
1470
1471         case SINK_INPUT_MESSAGE_DRAIN:
1472         case SINK_INPUT_MESSAGE_FLUSH:
1473         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1474         case SINK_INPUT_MESSAGE_TRIGGER: {
1475
1476             int64_t windex;
1477             pa_sink_input *isync;
1478             void (*func)(pa_memblockq *bq);
1479
1480             switch (code) {
1481                 case SINK_INPUT_MESSAGE_FLUSH:
1482                     func = flush_write_no_account;
1483                     break;
1484
1485                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1486                     func = pa_memblockq_prebuf_force;
1487                     break;
1488
1489                 case SINK_INPUT_MESSAGE_DRAIN:
1490                 case SINK_INPUT_MESSAGE_TRIGGER:
1491                     func = pa_memblockq_prebuf_disable;
1492                     break;
1493
1494                 default:
1495                     pa_assert_not_reached();
1496             }
1497
1498             windex = pa_memblockq_get_write_index(s->memblockq);
1499             func(s->memblockq);
1500             handle_seek(s, windex);
1501
1502             /* Do the same for all other members in the sync group */
1503             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1504                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1505                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1506                 func(ssync->memblockq);
1507                 handle_seek(ssync, windex);
1508             }
1509
1510             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1511                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1512                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1513                 func(ssync->memblockq);
1514                 handle_seek(ssync, windex);
1515             }
1516
1517             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1518                 if (!pa_memblockq_is_readable(s->memblockq))
1519                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1520                 else {
1521                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1522                     s->drain_request = TRUE;
1523                 }
1524             }
1525
1526             return 0;
1527         }
1528
1529         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1530             /* Atomically get a snapshot of all timing parameters... */
1531             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1532             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1533             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1534             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1535             s->underrun_for = s->sink_input->thread_info.underrun_for;
1536             s->playing_for = s->sink_input->thread_info.playing_for;
1537
1538             return 0;
1539
1540         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1541             int64_t windex;
1542
1543             windex = pa_memblockq_get_write_index(s->memblockq);
1544
1545             pa_memblockq_prebuf_force(s->memblockq);
1546
1547             handle_seek(s, windex);
1548
1549             /* Fall through to the default handler */
1550             break;
1551         }
1552
1553         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1554             pa_usec_t *r = userdata;
1555
1556             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1557
1558             /* Fall through, the default handler will add in the extra
1559              * latency added by the resampler */
1560             break;
1561         }
1562
1563         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1564             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1565             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1566             return 0;
1567         }
1568     }
1569
1570     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1571 }
1572
1573 /* Called from thread context */
1574 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1575     playback_stream *s;
1576
1577     pa_sink_input_assert_ref(i);
1578     s = PLAYBACK_STREAM(i->userdata);
1579     playback_stream_assert_ref(s);
1580     pa_assert(chunk);
1581
1582 #ifdef PROTOCOL_NATIVE_DEBUG
1583     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq));
1584 #endif
1585
1586     if (pa_memblockq_is_readable(s->memblockq))
1587         s->is_underrun = FALSE;
1588     else {
1589         if (!s->is_underrun)
1590             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1591
1592         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1593             s->drain_request = FALSE;
1594             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1595         } else if (!s->is_underrun)
1596             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
1597
1598         s->is_underrun = TRUE;
1599
1600         playback_stream_request_bytes(s);
1601     }
1602
1603     /* This call will not fail with prebuf=0, hence we check for
1604        underrun explicitly above */
1605     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1606         return -1;
1607
1608     chunk->length = PA_MIN(nbytes, chunk->length);
1609
1610     if (i->thread_info.underrun_for > 0)
1611         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1612
1613     pa_memblockq_drop(s->memblockq, chunk->length);
1614     playback_stream_request_bytes(s);
1615
1616     return 0;
1617 }
1618
1619 /* Called from thread context */
1620 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1621     playback_stream *s;
1622
1623     pa_sink_input_assert_ref(i);
1624     s = PLAYBACK_STREAM(i->userdata);
1625     playback_stream_assert_ref(s);
1626
1627     /* If we are in an underrun, then we don't rewind */
1628     if (i->thread_info.underrun_for > 0)
1629         return;
1630
1631     pa_memblockq_rewind(s->memblockq, nbytes);
1632 }
1633
1634 /* Called from thread context */
1635 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1636     playback_stream *s;
1637
1638     pa_sink_input_assert_ref(i);
1639     s = PLAYBACK_STREAM(i->userdata);
1640     playback_stream_assert_ref(s);
1641
1642     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1643 }
1644
1645 /* Called from thread context */
1646 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1647     playback_stream *s;
1648     size_t new_tlength, old_tlength;
1649
1650     pa_sink_input_assert_ref(i);
1651     s = PLAYBACK_STREAM(i->userdata);
1652     playback_stream_assert_ref(s);
1653
1654     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1655     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1656
1657     if (old_tlength < new_tlength) {
1658         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1659         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1660         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1661
1662         if (new_tlength == old_tlength)
1663             pa_log_debug("Failed to increase tlength");
1664         else {
1665             pa_log_debug("Notifying client about increased tlength");
1666             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1667         }
1668     }
1669 }
1670
1671 /* Called from main context */
1672 static void sink_input_kill_cb(pa_sink_input *i) {
1673     playback_stream *s;
1674
1675     pa_sink_input_assert_ref(i);
1676     s = PLAYBACK_STREAM(i->userdata);
1677     playback_stream_assert_ref(s);
1678
1679     playback_stream_send_killed(s);
1680     playback_stream_unlink(s);
1681 }
1682
1683 /* Called from main context */
1684 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1685     playback_stream *s;
1686     pa_tagstruct *t;
1687
1688     pa_sink_input_assert_ref(i);
1689     s = PLAYBACK_STREAM(i->userdata);
1690     playback_stream_assert_ref(s);
1691
1692     if (s->connection->version < 15)
1693       return;
1694
1695     t = pa_tagstruct_new(NULL, 0);
1696     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1697     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1698     pa_tagstruct_putu32(t, s->index);
1699     pa_tagstruct_puts(t, event);
1700     pa_tagstruct_put_proplist(t, pl);
1701     pa_pstream_send_tagstruct(s->connection->pstream, t);
1702 }
1703
1704 /* Called from main context */
1705 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1706     playback_stream *s;
1707     pa_tagstruct *t;
1708
1709     pa_sink_input_assert_ref(i);
1710     s = PLAYBACK_STREAM(i->userdata);
1711     playback_stream_assert_ref(s);
1712
1713     if (s->connection->version < 12)
1714       return;
1715
1716     t = pa_tagstruct_new(NULL, 0);
1717     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1718     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1719     pa_tagstruct_putu32(t, s->index);
1720     pa_tagstruct_put_boolean(t, suspend);
1721     pa_pstream_send_tagstruct(s->connection->pstream, t);
1722 }
1723
1724 /* Called from main context */
1725 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1726     playback_stream *s;
1727     pa_tagstruct *t;
1728
1729     pa_sink_input_assert_ref(i);
1730     s = PLAYBACK_STREAM(i->userdata);
1731     playback_stream_assert_ref(s);
1732
1733     if (!dest)
1734         return;
1735
1736     fix_playback_buffer_attr(s);
1737     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1738     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1739
1740     if (s->connection->version < 12)
1741       return;
1742
1743     t = pa_tagstruct_new(NULL, 0);
1744     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1745     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1746     pa_tagstruct_putu32(t, s->index);
1747     pa_tagstruct_putu32(t, dest->index);
1748     pa_tagstruct_puts(t, dest->name);
1749     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1750
1751     if (s->connection->version >= 13) {
1752         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1753         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1754         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1755         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1756         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1757     }
1758
1759     pa_pstream_send_tagstruct(s->connection->pstream, t);
1760 }
1761
1762 /*** source_output callbacks ***/
1763
1764 /* Called from thread context */
1765 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1766     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1767     record_stream *s;
1768
1769     pa_source_output_assert_ref(o);
1770     s = RECORD_STREAM(o->userdata);
1771     record_stream_assert_ref(s);
1772
1773     switch (code) {
1774         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1775             /* Atomically get a snapshot of all timing parameters... */
1776             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1777             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1778             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1779             return 0;
1780     }
1781
1782     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1783 }
1784
1785 /* Called from thread context */
1786 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1787     record_stream *s;
1788
1789     pa_source_output_assert_ref(o);
1790     s = RECORD_STREAM(o->userdata);
1791     record_stream_assert_ref(s);
1792     pa_assert(chunk);
1793
1794     pa_atomic_add(&s->on_the_fly, chunk->length);
1795     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1796 }
1797
1798 static void source_output_kill_cb(pa_source_output *o) {
1799     record_stream *s;
1800
1801     pa_source_output_assert_ref(o);
1802     s = RECORD_STREAM(o->userdata);
1803     record_stream_assert_ref(s);
1804
1805     record_stream_send_killed(s);
1806     record_stream_unlink(s);
1807 }
1808
1809 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1810     record_stream *s;
1811
1812     pa_source_output_assert_ref(o);
1813     s = RECORD_STREAM(o->userdata);
1814     record_stream_assert_ref(s);
1815
1816     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1817
1818     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1819 }
1820
1821 /* Called from main context */
1822 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1823     record_stream *s;
1824     pa_tagstruct *t;
1825
1826     pa_source_output_assert_ref(o);
1827     s = RECORD_STREAM(o->userdata);
1828     record_stream_assert_ref(s);
1829
1830     if (s->connection->version < 15)
1831       return;
1832
1833     t = pa_tagstruct_new(NULL, 0);
1834     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1835     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1836     pa_tagstruct_putu32(t, s->index);
1837     pa_tagstruct_puts(t, event);
1838     pa_tagstruct_put_proplist(t, pl);
1839     pa_pstream_send_tagstruct(s->connection->pstream, t);
1840 }
1841
1842 /* Called from main context */
1843 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1844     record_stream *s;
1845     pa_tagstruct *t;
1846
1847     pa_source_output_assert_ref(o);
1848     s = RECORD_STREAM(o->userdata);
1849     record_stream_assert_ref(s);
1850
1851     if (s->connection->version < 12)
1852       return;
1853
1854     t = pa_tagstruct_new(NULL, 0);
1855     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1856     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1857     pa_tagstruct_putu32(t, s->index);
1858     pa_tagstruct_put_boolean(t, suspend);
1859     pa_pstream_send_tagstruct(s->connection->pstream, t);
1860 }
1861
1862 /* Called from main context */
1863 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1864     record_stream *s;
1865     pa_tagstruct *t;
1866
1867     pa_source_output_assert_ref(o);
1868     s = RECORD_STREAM(o->userdata);
1869     record_stream_assert_ref(s);
1870
1871     if (!dest)
1872         return;
1873
1874     fix_record_buffer_attr_pre(s);
1875     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1876     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1877     fix_record_buffer_attr_post(s);
1878
1879     if (s->connection->version < 12)
1880       return;
1881
1882     t = pa_tagstruct_new(NULL, 0);
1883     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1884     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1885     pa_tagstruct_putu32(t, s->index);
1886     pa_tagstruct_putu32(t, dest->index);
1887     pa_tagstruct_puts(t, dest->name);
1888     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1889
1890     if (s->connection->version >= 13) {
1891         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1892         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1893         pa_tagstruct_put_usec(t, s->configured_source_latency);
1894     }
1895
1896     pa_pstream_send_tagstruct(s->connection->pstream, t);
1897 }
1898
1899 /*** pdispatch callbacks ***/
1900
/* Reaction to a malformed request: logs the protocol error and unlinks
 * the connection c via native_connection_unlink(). */
static void protocol_error(pa_native_connection *c) {
    pa_log("protocol error, kicking client");
    native_connection_unlink(c);
}
1905
/* If expression is false, send error for tag on pstream and return from
 * the calling (void) function.
 *
 * Expands to exactly one statement without a trailing semicolon, so the
 * caller writes "CHECK_VALIDITY(...);" and the macro is safe as the
 * unbraced body of an if/else. */
#define CHECK_VALIDITY(pstream, expression, tag, error) do { \
if (!(expression)) { \
    pa_pstream_send_error((pstream), (tag), (error)); \
    return; \
} \
} while(0)
1912
/* If expression is false, send error for tag on pstream and jump to
 * label (typically the cleanup section of the calling function).
 *
 * Expands to exactly one statement without a trailing semicolon, so the
 * caller writes "CHECK_VALIDITY_GOTO(...);" and the macro is safe as the
 * unbraced body of an if/else. */
#define CHECK_VALIDITY_GOTO(pstream, expression, tag, error, label) do { \
if (!(expression)) { \
    pa_pstream_send_error((pstream), (tag), (error)); \
    goto label; \
} \
} while(0)
1919
1920 static pa_tagstruct *reply_new(uint32_t tag) {
1921     pa_tagstruct *reply;
1922
1923     reply = pa_tagstruct_new(NULL, 0);
1924     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1925     pa_tagstruct_putu32(reply, tag);
1926     return reply;
1927 }
1928
1929 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1930     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1931     playback_stream *s;
1932     uint32_t sink_index, syncid, missing = 0;
1933     pa_buffer_attr attr;
1934     const char *name = NULL, *sink_name;
1935     pa_sample_spec ss;
1936     pa_channel_map map;
1937     pa_tagstruct *reply;
1938     pa_sink *sink = NULL;
1939     pa_cvolume volume;
1940     pa_bool_t
1941         corked = FALSE,
1942         no_remap = FALSE,
1943         no_remix = FALSE,
1944         fix_format = FALSE,
1945         fix_rate = FALSE,
1946         fix_channels = FALSE,
1947         no_move = FALSE,
1948         variable_rate = FALSE,
1949         muted = FALSE,
1950         adjust_latency = FALSE,
1951         early_requests = FALSE,
1952         dont_inhibit_auto_suspend = FALSE,
1953         volume_set = TRUE,
1954         muted_set = FALSE,
1955         fail_on_suspend = FALSE,
1956         relative_volume = FALSE,
1957         passthrough = FALSE;
1958
1959     pa_sink_input_flags_t flags = 0;
1960     pa_proplist *p = NULL;
1961     int ret = PA_ERR_INVALID;
1962     uint8_t n_formats = 0;
1963     pa_format_info *format;
1964     pa_idxset *formats = NULL;
1965     uint32_t i;
1966
1967     pa_native_connection_assert_ref(c);
1968     pa_assert(t);
1969     memset(&attr, 0, sizeof(attr));
1970
1971     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1972         pa_tagstruct_get(
1973                 t,
1974                 PA_TAG_SAMPLE_SPEC, &ss,
1975                 PA_TAG_CHANNEL_MAP, &map,
1976                 PA_TAG_U32, &sink_index,
1977                 PA_TAG_STRING, &sink_name,
1978                 PA_TAG_U32, &attr.maxlength,
1979                 PA_TAG_BOOLEAN, &corked,
1980                 PA_TAG_U32, &attr.tlength,
1981                 PA_TAG_U32, &attr.prebuf,
1982                 PA_TAG_U32, &attr.minreq,
1983                 PA_TAG_U32, &syncid,
1984                 PA_TAG_CVOLUME, &volume,
1985                 PA_TAG_INVALID) < 0) {
1986
1987         protocol_error(c);
1988         goto finish;
1989     }
1990
1991     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
1992     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID, finish);
1993     CHECK_VALIDITY_GOTO(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID, finish);
1994     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
1995     CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
1996
1997     p = pa_proplist_new();
1998
1999     if (name)
2000         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2001
2002     if (c->version >= 12) {
2003         /* Since 0.9.8 the user can ask for a couple of additional flags */
2004
2005         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2006             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2007             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2008             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2009             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2010             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2011             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2012
2013             protocol_error(c);
2014             goto finish;
2015         }
2016     }
2017
2018     if (c->version >= 13) {
2019
2020         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
2021             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2022             pa_tagstruct_get_proplist(t, p) < 0) {
2023
2024             protocol_error(c);
2025             goto finish;
2026         }
2027     }
2028
2029     if (c->version >= 14) {
2030
2031         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2032             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2033
2034             protocol_error(c);
2035             goto finish;
2036         }
2037     }
2038
2039     if (c->version >= 15) {
2040
2041         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2042             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2043             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2044
2045             protocol_error(c);
2046             goto finish;
2047         }
2048     }
2049
2050     if (c->version >= 17) {
2051
2052         if (pa_tagstruct_get_boolean(t, &relative_volume) < 0) {
2053
2054             protocol_error(c);
2055             goto finish;
2056         }
2057     }
2058
2059     if (c->version >= 18) {
2060
2061         if (pa_tagstruct_get_boolean(t, &passthrough) < 0 ) {
2062             protocol_error(c);
2063             goto finish;
2064         }
2065     }
2066
2067     if (c->version >= 21) {
2068
2069         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2070             protocol_error(c);
2071             goto finish;
2072         }
2073
2074         if (n_formats)
2075             formats = pa_idxset_new(NULL, NULL);
2076
2077         for (i = 0; i < n_formats; i++) {
2078             format = pa_format_info_new();
2079             if (pa_tagstruct_get_format_info(t, format) < 0) {
2080                 protocol_error(c);
2081                 goto finish;
2082             }
2083             pa_idxset_put(formats, format, NULL);
2084         }
2085     }
2086
2087     if (n_formats == 0) {
2088         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2089         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2090         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2091     } else {
2092         PA_IDXSET_FOREACH(format, formats, i) {
2093             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2094         }
2095     }
2096
2097     if (!pa_tagstruct_eof(t)) {
2098         protocol_error(c);
2099         goto finish;
2100     }
2101
2102     if (sink_index != PA_INVALID_INDEX) {
2103
2104         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
2105             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2106             goto finish;
2107         }
2108
2109     } else if (sink_name) {
2110
2111         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
2112             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2113             goto finish;
2114         }
2115     }
2116
2117     flags =
2118         (corked ? PA_SINK_INPUT_START_CORKED : 0) |
2119         (no_remap ? PA_SINK_INPUT_NO_REMAP : 0) |
2120         (no_remix ? PA_SINK_INPUT_NO_REMIX : 0) |
2121         (fix_format ? PA_SINK_INPUT_FIX_FORMAT : 0) |
2122         (fix_rate ? PA_SINK_INPUT_FIX_RATE : 0) |
2123         (fix_channels ? PA_SINK_INPUT_FIX_CHANNELS : 0) |
2124         (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) |
2125         (variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0) |
2126         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2127         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0) |
2128         (passthrough ? PA_SINK_INPUT_PASSTHROUGH : 0);
2129
2130     /* Only since protocol version 15 there's a separate muted_set
2131      * flag. For older versions we synthesize it here */
2132     muted_set = muted_set || muted;
2133
2134     s = playback_stream_new(c, sink, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, syncid, &missing, &ret);
2135     /* We no longer own the formats idxset */
2136     formats = NULL;
2137
2138     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2139
2140     reply = reply_new(tag);
2141     pa_tagstruct_putu32(reply, s->index);
2142     pa_assert(s->sink_input);
2143     pa_tagstruct_putu32(reply, s->sink_input->index);
2144     pa_tagstruct_putu32(reply, missing);
2145
2146 #ifdef PROTOCOL_NATIVE_DEBUG
2147     pa_log("initial request is %u", missing);
2148 #endif
2149
2150     if (c->version >= 9) {
2151         /* Since 0.9.0 we support sending the buffer metrics back to the client */
2152
2153         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2154         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
2155         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
2156         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
2157     }
2158
2159     if (c->version >= 12) {
2160         /* Since 0.9.8 we support sending the chosen sample
2161          * spec/channel map/device/suspend status back to the
2162          * client */
2163
2164         pa_tagstruct_put_sample_spec(reply, &ss);
2165         pa_tagstruct_put_channel_map(reply, &map);
2166
2167         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2168         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2169
2170         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2171     }
2172
2173     if (c->version >= 13)
2174         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2175
2176     if (c->version >= 21) {
2177         /* Send back the format we negotiated */
2178         if (s->sink_input->format)
2179             pa_tagstruct_put_format_info(reply, s->sink_input->format);
2180         else {
2181             pa_format_info *f = pa_format_info_new();
2182             pa_tagstruct_put_format_info(reply, f);
2183             pa_format_info_free(f);
2184         }
2185     }
2186
2187     pa_pstream_send_tagstruct(c->pstream, reply);
2188
2189 finish:
2190     if (p)
2191         pa_proplist_free(p);
2192     if (formats)
2193         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
2194 }
2195
2196 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2197     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2198     uint32_t channel;
2199
2200     pa_native_connection_assert_ref(c);
2201     pa_assert(t);
2202
2203     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2204         !pa_tagstruct_eof(t)) {
2205         protocol_error(c);
2206         return;
2207     }
2208
2209     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2210
2211     switch (command) {
2212
2213         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2214             playback_stream *s;
2215             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2216                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2217                 return;
2218             }
2219
2220             playback_stream_unlink(s);
2221             break;
2222         }
2223
2224         case PA_COMMAND_DELETE_RECORD_STREAM: {
2225             record_stream *s;
2226             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2227                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2228                 return;
2229             }
2230
2231             record_stream_unlink(s);
2232             break;
2233         }
2234
2235         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2236             upload_stream *s;
2237
2238             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2239                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2240                 return;
2241             }
2242
2243             upload_stream_unlink(s);
2244             break;
2245         }
2246
2247         default:
2248             pa_assert_not_reached();
2249     }
2250
2251     pa_pstream_send_simple_ack(c->pstream, tag);
2252 }
2253
2254 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2255     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2256     record_stream *s;
2257     pa_buffer_attr attr;
2258     uint32_t source_index;
2259     const char *name = NULL, *source_name;
2260     pa_sample_spec ss;
2261     pa_channel_map map;
2262     pa_tagstruct *reply;
2263     pa_source *source = NULL;
2264     pa_cvolume volume;
2265     pa_bool_t
2266         corked = FALSE,
2267         no_remap = FALSE,
2268         no_remix = FALSE,
2269         fix_format = FALSE,
2270         fix_rate = FALSE,
2271         fix_channels = FALSE,
2272         no_move = FALSE,
2273         variable_rate = FALSE,
2274         muted = FALSE,
2275         adjust_latency = FALSE,
2276         peak_detect = FALSE,
2277         early_requests = FALSE,
2278         dont_inhibit_auto_suspend = FALSE,
2279         volume_set = FALSE,
2280         muted_set = FALSE,
2281         fail_on_suspend = FALSE,
2282         relative_volume = FALSE,
2283         passthrough = FALSE;
2284
2285     pa_source_output_flags_t flags = 0;
2286     pa_proplist *p = NULL;
2287     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2288     pa_sink_input *direct_on_input = NULL;
2289     int ret = PA_ERR_INVALID;
2290     uint8_t n_formats = 0;
2291     pa_format_info *format;
2292     pa_idxset *formats = NULL;
2293     uint32_t i;
2294
2295     pa_native_connection_assert_ref(c);
2296     pa_assert(t);
2297
2298     memset(&attr, 0, sizeof(attr));
2299
2300     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2301         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2302         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2303         pa_tagstruct_getu32(t, &source_index) < 0 ||
2304         pa_tagstruct_gets(t, &source_name) < 0 ||
2305         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2306         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2307         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2308
2309         protocol_error(c);
2310         goto finish;
2311     }
2312
2313     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
2314     CHECK_VALIDITY_GOTO(c->pstream, !source_name || pa_namereg_is_valid_name_or_wildcard(source_name, PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID, finish);
2315     CHECK_VALIDITY_GOTO(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID, finish);
2316     CHECK_VALIDITY_GOTO(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
2317
2318     p = pa_proplist_new();
2319
2320     if (name)
2321         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2322
2323     if (c->version >= 12) {
2324         /* Since 0.9.8 the user can ask for a couple of additional flags */
2325
2326         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2327             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2328             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2329             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2330             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2331             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2332             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2333
2334             protocol_error(c);
2335             goto finish;
2336         }
2337     }
2338
2339     if (c->version >= 13) {
2340
2341         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2342             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2343             pa_tagstruct_get_proplist(t, p) < 0 ||
2344             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2345
2346             protocol_error(c);
2347             goto finish;
2348         }
2349     }
2350
2351     if (c->version >= 14) {
2352
2353         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2354             protocol_error(c);
2355             goto finish;
2356         }
2357     }
2358
2359     if (c->version >= 15) {
2360
2361         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2362             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2363
2364             protocol_error(c);
2365             goto finish;
2366         }
2367     }
2368
2369     if (c->version >= 22) {
2370         /* For newer client versions (with per-source-output volumes), we try
2371          * to make the behaviour for playback and record streams the same. */
2372         volume_set = TRUE;
2373
2374         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2375             protocol_error(c);
2376             goto finish;
2377         }
2378
2379         if (n_formats)
2380             formats = pa_idxset_new(NULL, NULL);
2381
2382         for (i = 0; i < n_formats; i++) {
2383             format = pa_format_info_new();
2384             if (pa_tagstruct_get_format_info(t, format) < 0) {
2385                 protocol_error(c);
2386                 goto finish;
2387             }
2388             pa_idxset_put(formats, format, NULL);
2389         }
2390
2391         if (pa_tagstruct_get_cvolume(t, &volume) < 0 ||
2392             pa_tagstruct_get_boolean(t, &muted) < 0 ||
2393             pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2394             pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2395             pa_tagstruct_get_boolean(t, &relative_volume) < 0 ||
2396             pa_tagstruct_get_boolean(t, &passthrough) < 0) {
2397
2398             protocol_error(c);
2399             goto finish;
2400         }
2401
2402         CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
2403     }
2404
2405     if (n_formats == 0) {
2406         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2407         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2408         CHECK_VALIDITY_GOTO(c->pstream, c->version < 22 || (volume.channels == ss.channels), tag, PA_ERR_INVALID, finish);
2409         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2410     } else {
2411         PA_IDXSET_FOREACH(format, formats, i) {
2412             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2413         }
2414     }
2415
2416
2417     if (!pa_tagstruct_eof(t)) {
2418         protocol_error(c);
2419         goto finish;
2420     }
2421
2422     if (source_index != PA_INVALID_INDEX) {
2423
2424         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2425             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2426             goto finish;
2427         }
2428
2429     } else if (source_name) {
2430
2431         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2432             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2433             goto finish;
2434         }
2435     }
2436
2437     if (direct_on_input_idx != PA_INVALID_INDEX) {
2438
2439         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2440             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2441             goto finish;
2442         }
2443     }
2444
2445     flags =
2446         (corked ? PA_SOURCE_OUTPUT_START_CORKED : 0) |
2447         (no_remap ? PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2448         (no_remix ? PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2449         (fix_format ? PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2450         (fix_rate ? PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2451         (fix_channels ? PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2452         (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2453         (variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2454         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2455         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0) |
2456         (passthrough ? PA_SOURCE_OUTPUT_PASSTHROUGH : 0);
2457
2458     s = record_stream_new(c, source, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, peak_detect, direct_on_input, &ret);
2459
2460     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2461
2462     reply = reply_new(tag);
2463     pa_tagstruct_putu32(reply, s->index);
2464     pa_assert(s->source_output);
2465     pa_tagstruct_putu32(reply, s->source_output->index);
2466
2467     if (c->version >= 9) {
2468         /* Since 0.9 we support sending the buffer metrics back to the client */
2469
2470         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2471         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2472     }
2473
2474     if (c->version >= 12) {
2475         /* Since 0.9.8 we support sending the chosen sample
2476          * spec/channel map/device/suspend status back to the
2477          * client */
2478
2479         pa_tagstruct_put_sample_spec(reply, &ss);
2480         pa_tagstruct_put_channel_map(reply, &map);
2481
2482         pa_tagstruct_putu32(reply, s->source_output->source->index);
2483         pa_tagstruct_puts(reply, s->source_output->source->name);
2484
2485         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2486     }
2487
2488     if (c->version >= 13)
2489         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2490
2491     if (c->version >= 22) {
2492         /* Send back the format we negotiated */
2493         if (s->source_output->format)
2494             pa_tagstruct_put_format_info(reply, s->source_output->format);
2495         else {
2496             pa_format_info *f = pa_format_info_new();
2497             pa_tagstruct_put_format_info(reply, f);
2498             pa_format_info_free(f);
2499         }
2500     }
2501
2502     pa_pstream_send_tagstruct(c->pstream, reply);
2503
2504 finish:
2505     if (p)
2506         pa_proplist_free(p);
2507     if (formats)
2508         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
2509 }
2510
2511 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2512     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2513     int ret;
2514
2515     pa_native_connection_assert_ref(c);
2516     pa_assert(t);
2517
2518     if (!pa_tagstruct_eof(t)) {
2519         protocol_error(c);
2520         return;
2521     }
2522
2523     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2524     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2525     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2526
2527     pa_log_debug("Client %s asks us to terminate.", pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY)));
2528
2529     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2530 }
2531
2532 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2533     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2534     const void*cookie;
2535     pa_tagstruct *reply;
2536     pa_bool_t shm_on_remote = FALSE, do_shm;
2537
2538     pa_native_connection_assert_ref(c);
2539     pa_assert(t);
2540
2541     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2542         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2543         !pa_tagstruct_eof(t)) {
2544         protocol_error(c);
2545         return;
2546     }
2547
2548     /* Minimum supported version */
2549     if (c->version < 8) {
2550         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2551         return;
2552     }
2553
2554     /* Starting with protocol version 13 the MSB of the version tag
2555        reflects if shm is available for this pa_native_connection or
2556        not. */
2557     if (c->version >= 13) {
2558         shm_on_remote = !!(c->version & 0x80000000U);
2559         c->version &= 0x7FFFFFFFU;
2560     }
2561
2562     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2563
2564     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2565
2566     if (!c->authorized) {
2567         pa_bool_t success = FALSE;
2568
2569 #ifdef HAVE_CREDS
2570         const pa_creds *creds;
2571
2572         if ((creds = pa_pdispatch_creds(pd))) {
2573             if (creds->uid == getuid())
2574                 success = TRUE;
2575             else if (c->options->auth_group) {
2576                 int r;
2577                 gid_t gid;
2578
2579                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2580                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2581                 else if (gid == creds->gid)
2582                     success = TRUE;
2583
2584                 if (!success) {
2585                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2586                         pa_log_warn("Failed to check group membership.");
2587                     else if (r > 0)
2588                         success = TRUE;
2589                 }
2590             }
2591
2592             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2593                         (unsigned long) creds->uid,
2594                         (unsigned long) creds->gid,
2595                         (int) success);
2596         }
2597 #endif
2598
2599         if (!success && c->options->auth_cookie) {
2600             const uint8_t *ac;
2601
2602             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2603                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2604                     success = TRUE;
2605         }
2606
2607         if (!success) {
2608             pa_log_warn("Denied access to client with invalid authorization data.");
2609             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2610             return;
2611         }
2612
2613         c->authorized = TRUE;
2614         if (c->auth_timeout_event) {
2615             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2616             c->auth_timeout_event = NULL;
2617         }
2618     }
2619
2620     /* Enable shared memory support if possible */
2621     do_shm =
2622         pa_mempool_is_shared(c->protocol->core->mempool) &&
2623         c->is_local;
2624
2625     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2626
2627     if (do_shm)
2628         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2629             do_shm = FALSE;
2630
2631 #ifdef HAVE_CREDS
2632     if (do_shm) {
2633         /* Only enable SHM if both sides are owned by the same
2634          * user. This is a security measure because otherwise data
2635          * private to the user might leak. */
2636
2637         const pa_creds *creds;
2638         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2639             do_shm = FALSE;
2640     }
2641 #endif
2642
2643     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2644     pa_pstream_enable_shm(c->pstream, do_shm);
2645
2646     reply = reply_new(tag);
2647     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2648
2649 #ifdef HAVE_CREDS
2650 {
2651     /* SHM support is only enabled after both sides made sure they are the same user. */
2652
2653     pa_creds ucred;
2654
2655     ucred.uid = getuid();
2656     ucred.gid = getgid();
2657
2658     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2659 }
2660 #else
2661     pa_pstream_send_tagstruct(c->pstream, reply);
2662 #endif
2663 }
2664
2665 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2666     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2667     const char *name = NULL;
2668     pa_proplist *p;
2669     pa_tagstruct *reply;
2670
2671     pa_native_connection_assert_ref(c);
2672     pa_assert(t);
2673
2674     p = pa_proplist_new();
2675
2676     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2677         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2678         !pa_tagstruct_eof(t)) {
2679
2680         protocol_error(c);
2681         pa_proplist_free(p);
2682         return;
2683     }
2684
2685     if (name)
2686         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2687             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2688             pa_proplist_free(p);
2689             return;
2690         }
2691
2692     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2693     pa_proplist_free(p);
2694
2695     reply = reply_new(tag);
2696
2697     if (c->version >= 13)
2698         pa_tagstruct_putu32(reply, c->client->index);
2699
2700     pa_pstream_send_tagstruct(c->pstream, reply);
2701 }
2702
2703 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2704     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2705     const char *name;
2706     uint32_t idx = PA_IDXSET_INVALID;
2707
2708     pa_native_connection_assert_ref(c);
2709     pa_assert(t);
2710
2711     if (pa_tagstruct_gets(t, &name) < 0 ||
2712         !pa_tagstruct_eof(t)) {
2713         protocol_error(c);
2714         return;
2715     }
2716
2717     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2718     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_LOOKUP_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
2719
2720     if (command == PA_COMMAND_LOOKUP_SINK) {
2721         pa_sink *sink;
2722         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2723             idx = sink->index;
2724     } else {
2725         pa_source *source;
2726         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2727         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2728             idx = source->index;
2729     }
2730
2731     if (idx == PA_IDXSET_INVALID)
2732         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2733     else {
2734         pa_tagstruct *reply;
2735         reply = reply_new(tag);
2736         pa_tagstruct_putu32(reply, idx);
2737         pa_pstream_send_tagstruct(c->pstream, reply);
2738     }
2739 }
2740
2741 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2742     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2743     uint32_t idx;
2744     playback_stream *s;
2745
2746     pa_native_connection_assert_ref(c);
2747     pa_assert(t);
2748
2749     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2750         !pa_tagstruct_eof(t)) {
2751         protocol_error(c);
2752         return;
2753     }
2754
2755     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2756     s = pa_idxset_get_by_index(c->output_streams, idx);
2757     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2758     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2759
2760     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2761 }
2762
2763 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2764     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2765     pa_tagstruct *reply;
2766     const pa_mempool_stat *stat;
2767
2768     pa_native_connection_assert_ref(c);
2769     pa_assert(t);
2770
2771     if (!pa_tagstruct_eof(t)) {
2772         protocol_error(c);
2773         return;
2774     }
2775
2776     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2777
2778     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2779
2780     reply = reply_new(tag);
2781     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2782     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2783     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2784     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2785     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2786     pa_pstream_send_tagstruct(c->pstream, reply);
2787 }
2788
2789 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2790     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2791     pa_tagstruct *reply;
2792     playback_stream *s;
2793     struct timeval tv, now;
2794     uint32_t idx;
2795
2796     pa_native_connection_assert_ref(c);
2797     pa_assert(t);
2798
2799     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2800         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2801         !pa_tagstruct_eof(t)) {
2802         protocol_error(c);
2803         return;
2804     }
2805
2806     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2807     s = pa_idxset_get_by_index(c->output_streams, idx);
2808     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2809     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2810
2811     /* Get an atomic snapshot of all timing parameters */
2812     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2813
2814     reply = reply_new(tag);
2815     pa_tagstruct_put_usec(reply,
2816                           s->current_sink_latency +
2817                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2818     pa_tagstruct_put_usec(reply, 0);
2819     pa_tagstruct_put_boolean(reply,
2820                              s->playing_for > 0 &&
2821                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2822                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2823     pa_tagstruct_put_timeval(reply, &tv);
2824     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2825     pa_tagstruct_puts64(reply, s->write_index);
2826     pa_tagstruct_puts64(reply, s->read_index);
2827
2828     if (c->version >= 13) {
2829         pa_tagstruct_putu64(reply, s->underrun_for);
2830         pa_tagstruct_putu64(reply, s->playing_for);
2831     }
2832
2833     pa_pstream_send_tagstruct(c->pstream, reply);
2834 }
2835
2836 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2837     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2838     pa_tagstruct *reply;
2839     record_stream *s;
2840     struct timeval tv, now;
2841     uint32_t idx;
2842
2843     pa_native_connection_assert_ref(c);
2844     pa_assert(t);
2845
2846     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2847         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2848         !pa_tagstruct_eof(t)) {
2849         protocol_error(c);
2850         return;
2851     }
2852
2853     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2854     s = pa_idxset_get_by_index(c->record_streams, idx);
2855     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2856
2857     /* Get an atomic snapshot of all timing parameters */
2858     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2859
2860     reply = reply_new(tag);
2861     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2862     pa_tagstruct_put_usec(reply,
2863                           s->current_source_latency +
2864                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->source->sample_spec));
2865     pa_tagstruct_put_boolean(reply,
2866                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2867                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2868     pa_tagstruct_put_timeval(reply, &tv);
2869     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2870     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2871     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2872     pa_pstream_send_tagstruct(c->pstream, reply);
2873 }
2874
2875 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2876     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2877     upload_stream *s;
2878     uint32_t length;
2879     const char *name = NULL;
2880     pa_sample_spec ss;
2881     pa_channel_map map;
2882     pa_tagstruct *reply;
2883     pa_proplist *p;
2884
2885     pa_native_connection_assert_ref(c);
2886     pa_assert(t);
2887
2888     if (pa_tagstruct_gets(t, &name) < 0 ||
2889         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2890         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2891         pa_tagstruct_getu32(t, &length) < 0) {
2892         protocol_error(c);
2893         return;
2894     }
2895
2896     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2897     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2898     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2899     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2900     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2901     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2902
2903     p = pa_proplist_new();
2904
2905     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2906         !pa_tagstruct_eof(t)) {
2907
2908         protocol_error(c);
2909         pa_proplist_free(p);
2910         return;
2911     }
2912
2913     if (c->version < 13)
2914         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2915     else if (!name)
2916         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2917             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2918
2919     if (!name || !pa_namereg_is_valid_name(name)) {
2920         pa_proplist_free(p);
2921         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2922     }
2923
2924     s = upload_stream_new(c, &ss, &map, name, length, p);
2925     pa_proplist_free(p);
2926
2927     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2928
2929     reply = reply_new(tag);
2930     pa_tagstruct_putu32(reply, s->index);
2931     pa_tagstruct_putu32(reply, length);
2932     pa_pstream_send_tagstruct(c->pstream, reply);
2933 }
2934
2935 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2936     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2937     uint32_t channel;
2938     upload_stream *s;
2939     uint32_t idx;
2940
2941     pa_native_connection_assert_ref(c);
2942     pa_assert(t);
2943
2944     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2945         !pa_tagstruct_eof(t)) {
2946         protocol_error(c);
2947         return;
2948     }
2949
2950     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2951
2952     s = pa_idxset_get_by_index(c->output_streams, channel);
2953     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2954     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2955
2956     if (!s->memchunk.memblock)
2957         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2958     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2959         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2960     else
2961         pa_pstream_send_simple_ack(c->pstream, tag);
2962
2963     upload_stream_unlink(s);
2964 }
2965
2966 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2967     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2968     uint32_t sink_index;
2969     pa_volume_t volume;
2970     pa_sink *sink;
2971     const char *name, *sink_name;
2972     uint32_t idx;
2973     pa_proplist *p;
2974     pa_tagstruct *reply;
2975
2976     pa_native_connection_assert_ref(c);
2977     pa_assert(t);
2978
2979     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2980
2981     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2982         pa_tagstruct_gets(t, &sink_name) < 0 ||
2983         pa_tagstruct_getu32(t, &volume) < 0 ||
2984         pa_tagstruct_gets(t, &name) < 0) {
2985         protocol_error(c);
2986         return;
2987     }
2988
2989     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID);
2990     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2991     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2992     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2993
2994     if (sink_index != PA_INVALID_INDEX)
2995         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2996     else
2997         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2998
2999     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3000
3001     p = pa_proplist_new();
3002
3003     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
3004         !pa_tagstruct_eof(t)) {
3005         protocol_error(c);
3006         pa_proplist_free(p);
3007         return;
3008     }
3009
3010     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
3011
3012     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
3013         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3014         pa_proplist_free(p);
3015         return;
3016     }
3017
3018     pa_proplist_free(p);
3019
3020     reply = reply_new(tag);
3021
3022     if (c->version >= 13)
3023         pa_tagstruct_putu32(reply, idx);
3024
3025     pa_pstream_send_tagstruct(c->pstream, reply);
3026 }
3027
3028 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3029     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3030     const char *name;
3031
3032     pa_native_connection_assert_ref(c);
3033     pa_assert(t);
3034
3035     if (pa_tagstruct_gets(t, &name) < 0 ||
3036         !pa_tagstruct_eof(t)) {
3037         protocol_error(c);
3038         return;
3039     }
3040
3041     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3042     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3043
3044     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
3045         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3046         return;
3047     }
3048
3049     pa_pstream_send_simple_ack(c->pstream, tag);
3050 }
3051
3052 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
3053     pa_assert(c);
3054     pa_assert(fixed);
3055     pa_assert(original);
3056
3057     *fixed = *original;
3058
3059     if (c->version < 12) {
3060         /* Before protocol version 12 we didn't support S32 samples,
3061          * so we need to lie about this to the client */
3062
3063         if (fixed->format == PA_SAMPLE_S32LE)
3064             fixed->format = PA_SAMPLE_FLOAT32LE;
3065         if (fixed->format == PA_SAMPLE_S32BE)
3066             fixed->format = PA_SAMPLE_FLOAT32BE;
3067     }
3068
3069     if (c->version < 15) {
3070         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
3071             fixed->format = PA_SAMPLE_FLOAT32LE;
3072         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
3073             fixed->format = PA_SAMPLE_FLOAT32BE;
3074     }
3075 }
3076
3077 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
3078     pa_sample_spec fixed_ss;
3079
3080     pa_assert(t);
3081     pa_sink_assert_ref(sink);
3082
3083     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
3084
3085     pa_tagstruct_put(
3086         t,
3087         PA_TAG_U32, sink->index,
3088         PA_TAG_STRING, sink->name,
3089         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3090         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3091         PA_TAG_CHANNEL_MAP, &sink->channel_map,
3092         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
3093         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
3094         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
3095         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
3096         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
3097         PA_TAG_USEC, pa_sink_get_latency(sink),
3098         PA_TAG_STRING, sink->driver,
3099         PA_TAG_U32, sink->flags & PA_SINK_CLIENT_FLAGS_MASK,
3100         PA_TAG_INVALID);
3101
3102     if (c->version >= 13) {
3103         pa_tagstruct_put_proplist(t, sink->proplist);
3104         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
3105     }
3106
3107     if (c->version >= 15) {
3108         pa_tagstruct_put_volume(t, sink->base_volume);
3109         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
3110             pa_log_error("Internal sink state is invalid.");
3111         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
3112         pa_tagstruct_putu32(t, sink->n_volume_steps);
3113         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
3114     }
3115
3116     if (c->version >= 16) {
3117         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
3118
3119         if (sink->ports) {
3120             void *state;
3121             pa_device_port *p;
3122
3123             PA_HASHMAP_FOREACH(p, sink->ports, state) {
3124                 pa_tagstruct_puts(t, p->name);
3125                 pa_tagstruct_puts(t, p->description);
3126                 pa_tagstruct_putu32(t, p->priority);
3127                 if (c->version >= 24)
3128                     pa_tagstruct_putu32(t, p->available);
3129             }
3130         }
3131
3132         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
3133     }
3134
3135     if (c->version >= 21) {
3136         uint32_t i;
3137         pa_format_info *f;
3138         pa_idxset *formats = pa_sink_get_formats(sink);
3139
3140         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3141         PA_IDXSET_FOREACH(f, formats, i) {
3142             pa_tagstruct_put_format_info(t, f);
3143         }
3144
3145         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
3146     }
3147 }
3148
3149 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
3150     pa_sample_spec fixed_ss;
3151
3152     pa_assert(t);
3153     pa_source_assert_ref(source);
3154
3155     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
3156
3157     pa_tagstruct_put(
3158         t,
3159         PA_TAG_U32, source->index,
3160         PA_TAG_STRING, source->name,
3161         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3162         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3163         PA_TAG_CHANNEL_MAP, &source->channel_map,
3164         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
3165         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
3166         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
3167         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
3168         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
3169         PA_TAG_USEC, pa_source_get_latency(source),
3170         PA_TAG_STRING, source->driver,
3171         PA_TAG_U32, source->flags & PA_SOURCE_CLIENT_FLAGS_MASK,
3172         PA_TAG_INVALID);
3173
3174     if (c->version >= 13) {
3175         pa_tagstruct_put_proplist(t, source->proplist);
3176         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
3177     }
3178
3179     if (c->version >= 15) {
3180         pa_tagstruct_put_volume(t, source->base_volume);
3181         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
3182             pa_log_error("Internal source state is invalid.");
3183         pa_tagstruct_putu32(t, pa_source_get_state(source));
3184         pa_tagstruct_putu32(t, source->n_volume_steps);
3185         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
3186     }
3187
3188     if (c->version >= 16) {
3189
3190         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
3191
3192         if (source->ports) {
3193             void *state;
3194             pa_device_port *p;
3195
3196             PA_HASHMAP_FOREACH(p, source->ports, state) {
3197                 pa_tagstruct_puts(t, p->name);
3198                 pa_tagstruct_puts(t, p->description);
3199                 pa_tagstruct_putu32(t, p->priority);
3200                 if (c->version >= 24)
3201                     pa_tagstruct_putu32(t, p->available);
3202             }
3203         }
3204
3205         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
3206     }
3207
3208     if (c->version >= 22) {
3209         uint32_t i;
3210         pa_format_info *f;
3211         pa_idxset *formats = pa_source_get_formats(source);
3212
3213         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3214         PA_IDXSET_FOREACH(f, formats, i) {
3215             pa_tagstruct_put_format_info(t, f);
3216         }
3217
3218         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
3219     }
3220 }
3221
3222 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
3223     pa_assert(t);
3224     pa_assert(client);
3225
3226     pa_tagstruct_putu32(t, client->index);
3227     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
3228     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
3229     pa_tagstruct_puts(t, client->driver);
3230
3231     if (c->version >= 13)
3232         pa_tagstruct_put_proplist(t, client->proplist);
3233 }
3234
3235 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
3236     void *state = NULL;
3237     pa_card_profile *p;
3238
3239     pa_assert(t);
3240     pa_assert(card);
3241
3242     pa_tagstruct_putu32(t, card->index);
3243     pa_tagstruct_puts(t, card->name);
3244     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
3245     pa_tagstruct_puts(t, card->driver);
3246
3247     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
3248
3249     if (card->profiles) {
3250         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
3251             pa_tagstruct_puts(t, p->name);
3252             pa_tagstruct_puts(t, p->description);
3253             pa_tagstruct_putu32(t, p->n_sinks);
3254             pa_tagstruct_putu32(t, p->n_sources);
3255             pa_tagstruct_putu32(t, p->priority);
3256         }
3257     }
3258
3259     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
3260     pa_tagstruct_put_proplist(t, card->proplist);
3261 }
3262
3263 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
3264     pa_assert(t);
3265     pa_assert(module);
3266
3267     pa_tagstruct_putu32(t, module->index);
3268     pa_tagstruct_puts(t, module->name);
3269     pa_tagstruct_puts(t, module->argument);
3270     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
3271
3272     if (c->version < 15)
3273         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
3274
3275     if (c->version >= 15)
3276         pa_tagstruct_put_proplist(t, module->proplist);
3277 }
3278
3279 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
3280     pa_sample_spec fixed_ss;
3281     pa_usec_t sink_latency;
3282     pa_cvolume v;
3283     pa_bool_t has_volume = FALSE;
3284
3285     pa_assert(t);
3286     pa_sink_input_assert_ref(s);
3287
3288     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3289
3290     has_volume = pa_sink_input_is_volume_readable(s);
3291     if (has_volume)
3292         pa_sink_input_get_volume(s, &v, TRUE);
3293     else
3294         pa_cvolume_reset(&v, fixed_ss.channels);
3295
3296     pa_tagstruct_putu32(t, s->index);
3297     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3298     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3299     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3300     pa_tagstruct_putu32(t, s->sink->index);
3301     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3302     pa_tagstruct_put_channel_map(t, &s->channel_map);
3303     pa_tagstruct_put_cvolume(t, &v);
3304     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3305     pa_tagstruct_put_usec(t, sink_latency);
3306     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3307     pa_tagstruct_puts(t, s->driver);
3308     if (c->version >= 11)
3309         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3310     if (c->version >= 13)
3311         pa_tagstruct_put_proplist(t, s->proplist);
3312     if (c->version >= 19)
3313         pa_tagstruct_put_boolean(t, (pa_sink_input_get_state(s) == PA_SINK_INPUT_CORKED));
3314     if (c->version >= 20) {
3315         pa_tagstruct_put_boolean(t, has_volume);
3316         pa_tagstruct_put_boolean(t, s->volume_writable);
3317     }
3318     if (c->version >= 21)
3319         pa_tagstruct_put_format_info(t, s->format);
3320 }
3321
3322 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3323     pa_sample_spec fixed_ss;
3324     pa_usec_t source_latency;
3325     pa_cvolume v;
3326     pa_bool_t has_volume = FALSE;
3327
3328     pa_assert(t);
3329     pa_source_output_assert_ref(s);
3330
3331     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3332
3333     has_volume = pa_source_output_is_volume_readable(s);
3334     if (has_volume)
3335         pa_source_output_get_volume(s, &v, TRUE);
3336     else
3337         pa_cvolume_reset(&v, fixed_ss.channels);
3338
3339     pa_tagstruct_putu32(t, s->index);
3340     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3341     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3342     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3343     pa_tagstruct_putu32(t, s->source->index);
3344     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3345     pa_tagstruct_put_channel_map(t, &s->channel_map);
3346     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3347     pa_tagstruct_put_usec(t, source_latency);
3348     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3349     pa_tagstruct_puts(t, s->driver);
3350     if (c->version >= 13)
3351         pa_tagstruct_put_proplist(t, s->proplist);
3352     if (c->version >= 19)
3353         pa_tagstruct_put_boolean(t, (pa_source_output_get_state(s) == PA_SOURCE_OUTPUT_CORKED));
3354     if (c->version >= 22) {
3355         pa_tagstruct_put_cvolume(t, &v);
3356         pa_tagstruct_put_boolean(t, pa_source_output_get_mute(s));
3357         pa_tagstruct_put_boolean(t, has_volume);
3358         pa_tagstruct_put_boolean(t, s->volume_writable);
3359         pa_tagstruct_put_format_info(t, s->format);
3360     }
3361 }
3362
3363 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3364     pa_sample_spec fixed_ss;
3365     pa_cvolume v;
3366
3367     pa_assert(t);
3368     pa_assert(e);
3369
3370     if (e->memchunk.memblock)
3371         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3372     else
3373         memset(&fixed_ss, 0, sizeof(fixed_ss));
3374
3375     pa_tagstruct_putu32(t, e->index);
3376     pa_tagstruct_puts(t, e->name);
3377
3378     if (e->volume_is_set)
3379         v = e->volume;
3380     else
3381         pa_cvolume_init(&v);
3382
3383     pa_tagstruct_put_cvolume(t, &v);
3384     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3385     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3386     pa_tagstruct_put_channel_map(t, &e->channel_map);
3387     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3388     pa_tagstruct_put_boolean(t, e->lazy);
3389     pa_tagstruct_puts(t, e->filename);
3390
3391     if (c->version >= 13)
3392         pa_tagstruct_put_proplist(t, e->proplist);
3393 }
3394
3395 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3396     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3397     uint32_t idx;
3398     pa_sink *sink = NULL;
3399     pa_source *source = NULL;
3400     pa_client *client = NULL;
3401     pa_card *card = NULL;
3402     pa_module *module = NULL;
3403     pa_sink_input *si = NULL;
3404     pa_source_output *so = NULL;
3405     pa_scache_entry *sce = NULL;
3406     const char *name = NULL;
3407     pa_tagstruct *reply;
3408
3409     pa_native_connection_assert_ref(c);
3410     pa_assert(t);
3411
3412     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3413         (command != PA_COMMAND_GET_CLIENT_INFO &&
3414          command != PA_COMMAND_GET_MODULE_INFO &&
3415          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3416          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3417          pa_tagstruct_gets(t, &name) < 0) ||
3418         !pa_tagstruct_eof(t)) {
3419         protocol_error(c);
3420         return;
3421     }
3422
3423     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3424     CHECK_VALIDITY(c->pstream, !name ||
3425                    (command == PA_COMMAND_GET_SINK_INFO &&
3426                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SINK)) ||
3427                    (command == PA_COMMAND_GET_SOURCE_INFO &&
3428                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SOURCE)) ||
3429                    pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3430     CHECK_VALIDITY(c->pstream, command == PA_COMMAND_GET_SINK_INFO ||
3431                    command == PA_COMMAND_GET_SOURCE_INFO ||
3432                    (idx != PA_INVALID_INDEX || name), tag, PA_ERR_INVALID);
3433     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3434     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3435
3436     if (command == PA_COMMAND_GET_SINK_INFO) {
3437         if (idx != PA_INVALID_INDEX)
3438             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3439         else
3440             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3441     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3442         if (idx != PA_INVALID_INDEX)
3443             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3444         else
3445             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3446     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3447         if (idx != PA_INVALID_INDEX)
3448             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3449         else
3450             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3451     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3452         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3453     else if (command == PA_COMMAND_GET_MODULE_INFO)
3454         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3455     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3456         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3457     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3458         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3459     else {
3460         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3461         if (idx != PA_INVALID_INDEX)
3462             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3463         else
3464             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3465     }
3466
3467     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3468         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3469         return;
3470     }
3471
3472     reply = reply_new(tag);
3473     if (sink)
3474         sink_fill_tagstruct(c, reply, sink);
3475     else if (source)
3476         source_fill_tagstruct(c, reply, source);
3477     else if (client)
3478         client_fill_tagstruct(c, reply, client);
3479     else if (card)
3480         card_fill_tagstruct(c, reply, card);
3481     else if (module)
3482         module_fill_tagstruct(c, reply, module);
3483     else if (si)
3484         sink_input_fill_tagstruct(c, reply, si);
3485     else if (so)
3486         source_output_fill_tagstruct(c, reply, so);
3487     else
3488         scache_fill_tagstruct(c, reply, sce);
3489     pa_pstream_send_tagstruct(c->pstream, reply);
3490 }
3491
3492 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3493     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3494     pa_idxset *i;
3495     uint32_t idx;
3496     void *p;
3497     pa_tagstruct *reply;
3498
3499     pa_native_connection_assert_ref(c);
3500     pa_assert(t);
3501
3502     if (!pa_tagstruct_eof(t)) {
3503         protocol_error(c);
3504         return;
3505     }
3506
3507     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3508
3509     reply = reply_new(tag);
3510
3511     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3512         i = c->protocol->core->sinks;
3513     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3514         i = c->protocol->core->sources;
3515     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3516         i = c->protocol->core->clients;
3517     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3518         i = c->protocol->core->cards;
3519     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3520         i = c->protocol->core->modules;
3521     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3522         i = c->protocol->core->sink_inputs;
3523     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3524         i = c->protocol->core->source_outputs;
3525     else {
3526         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3527         i = c->protocol->core->scache;
3528     }
3529
3530     if (i) {
3531         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3532             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3533                 sink_fill_tagstruct(c, reply, p);
3534             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3535                 source_fill_tagstruct(c, reply, p);
3536             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3537                 client_fill_tagstruct(c, reply, p);
3538             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3539                 card_fill_tagstruct(c, reply, p);
3540             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3541                 module_fill_tagstruct(c, reply, p);
3542             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3543                 sink_input_fill_tagstruct(c, reply, p);
3544             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3545                 source_output_fill_tagstruct(c, reply, p);
3546             else {
3547                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3548                 scache_fill_tagstruct(c, reply, p);
3549             }
3550         }
3551     }
3552
3553     pa_pstream_send_tagstruct(c->pstream, reply);
3554 }
3555
3556 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3557     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3558     pa_tagstruct *reply;
3559     pa_sink *def_sink;
3560     pa_source *def_source;
3561     pa_sample_spec fixed_ss;
3562     char *h, *u;
3563
3564     pa_native_connection_assert_ref(c);
3565     pa_assert(t);
3566
3567     if (!pa_tagstruct_eof(t)) {
3568         protocol_error(c);
3569         return;
3570     }
3571
3572     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3573
3574     reply = reply_new(tag);
3575     pa_tagstruct_puts(reply, PACKAGE_NAME);
3576     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3577
3578     u = pa_get_user_name_malloc();
3579     pa_tagstruct_puts(reply, u);
3580     pa_xfree(u);
3581
3582     h = pa_get_host_name_malloc();
3583     pa_tagstruct_puts(reply, h);
3584     pa_xfree(h);
3585
3586     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3587     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3588
3589     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3590     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3591     def_source = pa_namereg_get_default_source(c->protocol->core);
3592     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3593
3594     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3595
3596     if (c->version >= 15)
3597         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3598
3599     pa_pstream_send_tagstruct(c->pstream, reply);
3600 }
3601
3602 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3603     pa_tagstruct *t;
3604     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3605
3606     pa_native_connection_assert_ref(c);
3607
3608     t = pa_tagstruct_new(NULL, 0);
3609     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3610     pa_tagstruct_putu32(t, (uint32_t) -1);
3611     pa_tagstruct_putu32(t, e);
3612     pa_tagstruct_putu32(t, idx);
3613     pa_pstream_send_tagstruct(c->pstream, t);
3614 }
3615
3616 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3617     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3618     pa_subscription_mask_t m;
3619
3620     pa_native_connection_assert_ref(c);
3621     pa_assert(t);
3622
3623     if (pa_tagstruct_getu32(t, &m) < 0 ||
3624         !pa_tagstruct_eof(t)) {
3625         protocol_error(c);
3626         return;
3627     }
3628
3629     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3630     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3631
3632     if (c->subscription)
3633         pa_subscription_free(c->subscription);
3634
3635     if (m != 0) {
3636         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3637         pa_assert(c->subscription);
3638     } else
3639         c->subscription = NULL;
3640
3641     pa_pstream_send_simple_ack(c->pstream, tag);
3642 }
3643
3644 static void command_set_volume(
3645         pa_pdispatch *pd,
3646         uint32_t command,
3647         uint32_t tag,
3648         pa_tagstruct *t,
3649         void *userdata) {
3650
3651     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3652     uint32_t idx;
3653     pa_cvolume volume;
3654     pa_sink *sink = NULL;
3655     pa_source *source = NULL;
3656     pa_sink_input *si = NULL;
3657     pa_source_output *so = NULL;
3658     const char *name = NULL;
3659     const char *client_name;
3660
3661     pa_native_connection_assert_ref(c);
3662     pa_assert(t);
3663
3664     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3665         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3666         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3667         pa_tagstruct_get_cvolume(t, &volume) ||
3668         !pa_tagstruct_eof(t)) {
3669         protocol_error(c);
3670         return;
3671     }
3672
3673     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3674     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_VOLUME ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3675     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3676     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3677     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3678     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3679
3680     switch (command) {
3681
3682         case PA_COMMAND_SET_SINK_VOLUME:
3683             if (idx != PA_INVALID_INDEX)
3684                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3685             else
3686                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3687             break;
3688
3689         case PA_COMMAND_SET_SOURCE_VOLUME:
3690             if (idx != PA_INVALID_INDEX)
3691                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3692             else
3693                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3694             break;
3695
3696         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3697             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3698             break;
3699
3700         case PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME:
3701             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3702             break;
3703
3704         default:
3705             pa_assert_not_reached();
3706     }
3707
3708     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3709
3710     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3711
3712     if (sink) {
3713         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &sink->sample_spec), tag, PA_ERR_INVALID);
3714
3715         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3716         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3717     } else if (source) {
3718         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &source->sample_spec), tag, PA_ERR_INVALID);
3719
3720         pa_log_debug("Client %s changes volume of source %s.", client_name, source->name);
3721         pa_source_set_volume(source, &volume, TRUE, TRUE);
3722     } else if (si) {
3723         CHECK_VALIDITY(c->pstream, si->volume_writable, tag, PA_ERR_BADSTATE);
3724         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &si->sample_spec), tag, PA_ERR_INVALID);
3725
3726         pa_log_debug("Client %s changes volume of sink input %s.",
3727                      client_name,
3728                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3729         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3730     } else if (so) {
3731         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &so->sample_spec), tag, PA_ERR_INVALID);
3732
3733         pa_log_debug("Client %s changes volume of source output %s.",
3734                      client_name,
3735                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3736         pa_source_output_set_volume(so, &volume, TRUE, TRUE);
3737     }
3738
3739     pa_pstream_send_simple_ack(c->pstream, tag);
3740 }
3741
3742 static void command_set_mute(
3743         pa_pdispatch *pd,
3744         uint32_t command,
3745         uint32_t tag,
3746         pa_tagstruct *t,
3747         void *userdata) {
3748
3749     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3750     uint32_t idx;
3751     pa_bool_t mute;
3752     pa_sink *sink = NULL;
3753     pa_source *source = NULL;
3754     pa_sink_input *si = NULL;
3755     pa_source_output *so = NULL;
3756     const char *name = NULL, *client_name;
3757
3758     pa_native_connection_assert_ref(c);
3759     pa_assert(t);
3760
3761     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3762         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3763         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3764         pa_tagstruct_get_boolean(t, &mute) ||
3765         !pa_tagstruct_eof(t)) {
3766         protocol_error(c);
3767         return;
3768     }
3769
3770     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3771     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_MUTE ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3772     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3773     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3774     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3775
3776     switch (command) {
3777
3778         case PA_COMMAND_SET_SINK_MUTE:
3779             if (idx != PA_INVALID_INDEX)
3780                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3781             else
3782                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3783
3784             break;
3785
3786         case PA_COMMAND_SET_SOURCE_MUTE:
3787             if (idx != PA_INVALID_INDEX)
3788                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3789             else
3790                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3791
3792             break;
3793
3794         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3795             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3796             break;
3797
3798         case PA_COMMAND_SET_SOURCE_OUTPUT_MUTE:
3799             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3800             break;
3801
3802         default:
3803             pa_assert_not_reached();
3804     }
3805
3806     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3807
3808     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3809
3810     if (sink) {
3811         pa_log_debug("Client %s changes mute of sink %s.", client_name, sink->name);
3812         pa_sink_set_mute(sink, mute, TRUE);
3813     } else if (source) {
3814         pa_log_debug("Client %s changes mute of source %s.", client_name, source->name);
3815         pa_source_set_mute(source, mute, TRUE);
3816     } else if (si) {
3817         pa_log_debug("Client %s changes mute of sink input %s.",
3818                      client_name,
3819                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3820         pa_sink_input_set_mute(si, mute, TRUE);
3821     } else if (so) {
3822         pa_log_debug("Client %s changes mute of source output %s.",
3823                      client_name,
3824                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3825         pa_source_output_set_mute(so, mute, TRUE);
3826     }
3827
3828     pa_pstream_send_simple_ack(c->pstream, tag);
3829 }
3830
3831 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3832     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3833     uint32_t idx;
3834     pa_bool_t b;
3835     playback_stream *s;
3836
3837     pa_native_connection_assert_ref(c);
3838     pa_assert(t);
3839
3840     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3841         pa_tagstruct_get_boolean(t, &b) < 0 ||
3842         !pa_tagstruct_eof(t)) {
3843         protocol_error(c);
3844         return;
3845     }
3846
3847     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3848     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3849     s = pa_idxset_get_by_index(c->output_streams, idx);
3850     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3851     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3852
3853     pa_sink_input_cork(s->sink_input, b);
3854
3855     if (b)
3856         s->is_underrun = TRUE;
3857
3858     pa_pstream_send_simple_ack(c->pstream, tag);
3859 }
3860
3861 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3862     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3863     uint32_t idx;
3864     playback_stream *s;
3865
3866     pa_native_connection_assert_ref(c);
3867     pa_assert(t);
3868
3869     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3870         !pa_tagstruct_eof(t)) {
3871         protocol_error(c);
3872         return;
3873     }
3874
3875     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3876     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3877     s = pa_idxset_get_by_index(c->output_streams, idx);
3878     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3879     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3880
3881     switch (command) {
3882         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3883             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3884             break;
3885
3886         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3887             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3888             break;
3889
3890         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3891             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3892             break;
3893
3894         default:
3895             pa_assert_not_reached();
3896     }
3897
3898     pa_pstream_send_simple_ack(c->pstream, tag);
3899 }
3900
3901 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3902     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3903     uint32_t idx;
3904     record_stream *s;
3905     pa_bool_t b;
3906
3907     pa_native_connection_assert_ref(c);
3908     pa_assert(t);
3909
3910     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3911         pa_tagstruct_get_boolean(t, &b) < 0 ||
3912         !pa_tagstruct_eof(t)) {
3913         protocol_error(c);
3914         return;
3915     }
3916
3917     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3918     s = pa_idxset_get_by_index(c->record_streams, idx);
3919     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3920
3921     pa_source_output_cork(s->source_output, b);
3922     pa_memblockq_prebuf_force(s->memblockq);
3923     pa_pstream_send_simple_ack(c->pstream, tag);
3924 }
3925
3926 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3927     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3928     uint32_t idx;
3929     record_stream *s;
3930
3931     pa_native_connection_assert_ref(c);
3932     pa_assert(t);
3933
3934     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3935         !pa_tagstruct_eof(t)) {
3936         protocol_error(c);
3937         return;
3938     }
3939
3940     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3941     s = pa_idxset_get_by_index(c->record_streams, idx);
3942     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3943
3944     pa_memblockq_flush_read(s->memblockq);
3945     pa_pstream_send_simple_ack(c->pstream, tag);
3946 }
3947
3948 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3949     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3950     uint32_t idx;
3951     pa_buffer_attr a;
3952     pa_tagstruct *reply;
3953
3954     pa_native_connection_assert_ref(c);
3955     pa_assert(t);
3956
3957     memset(&a, 0, sizeof(a));
3958
3959     if (pa_tagstruct_getu32(t, &idx) < 0) {
3960         protocol_error(c);
3961         return;
3962     }
3963
3964     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3965
3966     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3967         playback_stream *s;
3968         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3969
3970         s = pa_idxset_get_by_index(c->output_streams, idx);
3971         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3972         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3973
3974         if (pa_tagstruct_get(
3975                     t,
3976                     PA_TAG_U32, &a.maxlength,
3977                     PA_TAG_U32, &a.tlength,
3978                     PA_TAG_U32, &a.prebuf,
3979                     PA_TAG_U32, &a.minreq,
3980                     PA_TAG_INVALID) < 0 ||
3981             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3982             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3983             !pa_tagstruct_eof(t)) {
3984             protocol_error(c);
3985             return;
3986         }
3987
3988         s->adjust_latency = adjust_latency;
3989         s->early_requests = early_requests;
3990         s->buffer_attr_req = a;
3991
3992         fix_playback_buffer_attr(s);
3993         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3994
3995         reply = reply_new(tag);
3996         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3997         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3998         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3999         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
4000
4001         if (c->version >= 13)
4002             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
4003
4004     } else {
4005         record_stream *s;
4006         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
4007         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
4008
4009         s = pa_idxset_get_by_index(c->record_streams, idx);
4010         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4011
4012         if (pa_tagstruct_get(
4013                     t,
4014                     PA_TAG_U32, &a.maxlength,
4015                     PA_TAG_U32, &a.fragsize,
4016                     PA_TAG_INVALID) < 0 ||
4017             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
4018             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
4019             !pa_tagstruct_eof(t)) {
4020             protocol_error(c);
4021             return;
4022         }
4023
4024         s->adjust_latency = adjust_latency;
4025         s->early_requests = early_requests;
4026         s->buffer_attr_req = a;
4027
4028         fix_record_buffer_attr_pre(s);
4029         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
4030         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
4031         fix_record_buffer_attr_post(s);
4032
4033         reply = reply_new(tag);
4034         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
4035         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
4036
4037         if (c->version >= 13)
4038             pa_tagstruct_put_usec(reply, s->configured_source_latency);
4039     }
4040
4041     pa_pstream_send_tagstruct(c->pstream, reply);
4042 }
4043
4044 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4045     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4046     uint32_t idx;
4047     uint32_t rate;
4048
4049     pa_native_connection_assert_ref(c);
4050     pa_assert(t);
4051
4052     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4053         pa_tagstruct_getu32(t, &rate) < 0 ||
4054         !pa_tagstruct_eof(t)) {
4055         protocol_error(c);
4056         return;
4057     }
4058
4059     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4060     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
4061
4062     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
4063         playback_stream *s;
4064
4065         s = pa_idxset_get_by_index(c->output_streams, idx);
4066         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4067         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4068
4069         pa_sink_input_set_rate(s->sink_input, rate);
4070
4071     } else {
4072         record_stream *s;
4073         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
4074
4075         s = pa_idxset_get_by_index(c->record_streams, idx);
4076         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4077
4078         pa_source_output_set_rate(s->source_output, rate);
4079     }
4080
4081     pa_pstream_send_simple_ack(c->pstream, tag);
4082 }
4083
4084 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4085     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4086     uint32_t idx;
4087     uint32_t mode;
4088     pa_proplist *p;
4089
4090     pa_native_connection_assert_ref(c);
4091     pa_assert(t);
4092
4093     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4094
4095     p = pa_proplist_new();
4096
4097     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
4098
4099         if (pa_tagstruct_getu32(t, &mode) < 0 ||
4100             pa_tagstruct_get_proplist(t, p) < 0 ||
4101             !pa_tagstruct_eof(t)) {
4102             protocol_error(c);
4103             pa_proplist_free(p);
4104             return;
4105         }
4106
4107     } else {
4108
4109         if (pa_tagstruct_getu32(t, &idx) < 0 ||
4110             pa_tagstruct_getu32(t, &mode) < 0 ||
4111             pa_tagstruct_get_proplist(t, p) < 0 ||
4112             !pa_tagstruct_eof(t)) {
4113             protocol_error(c);
4114             pa_proplist_free(p);
4115             return;
4116         }
4117     }
4118
4119     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
4120         pa_proplist_free(p);
4121         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
4122     }
4123
4124     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
4125         playback_stream *s;
4126
4127         s = pa_idxset_get_by_index(c->output_streams, idx);
4128         if (!s || !playback_stream_isinstance(s)) {
4129             pa_proplist_free(p);
4130             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
4131         }
4132         pa_sink_input_update_proplist(s->sink_input, mode, p);
4133
4134     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
4135         record_stream *s;
4136
4137         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
4138             pa_proplist_free(p);
4139             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
4140         }
4141         pa_source_output_update_proplist(s->source_output, mode, p);
4142
4143     } else {
4144         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
4145
4146         pa_client_update_proplist(c->client, mode, p);
4147     }
4148
4149     pa_pstream_send_simple_ack(c->pstream, tag);
4150     pa_proplist_free(p);
4151 }
4152
4153 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4154     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4155     uint32_t idx;
4156     unsigned changed = 0;
4157     pa_proplist *p;
4158     pa_strlist *l = NULL;
4159
4160     pa_native_connection_assert_ref(c);
4161     pa_assert(t);
4162
4163     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4164
4165     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
4166
4167         if (pa_tagstruct_getu32(t, &idx) < 0) {
4168             protocol_error(c);
4169             return;
4170         }
4171     }
4172
4173     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4174         playback_stream *s;
4175
4176         s = pa_idxset_get_by_index(c->output_streams, idx);
4177         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4178         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4179
4180         p = s->sink_input->proplist;
4181
4182     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4183         record_stream *s;
4184
4185         s = pa_idxset_get_by_index(c->record_streams, idx);
4186         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4187
4188         p = s->source_output->proplist;
4189     } else {
4190         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4191
4192         p = c->client->proplist;
4193     }
4194
4195     for (;;) {
4196         const char *k;
4197
4198         if (pa_tagstruct_gets(t, &k) < 0) {
4199             protocol_error(c);
4200             pa_strlist_free(l);
4201             return;
4202         }
4203
4204         if (!k)
4205             break;
4206
4207         l = pa_strlist_prepend(l, k);
4208     }
4209
4210     if (!pa_tagstruct_eof(t)) {
4211         protocol_error(c);
4212         pa_strlist_free(l);
4213         return;
4214     }
4215
4216     for (;;) {
4217         char *z;
4218
4219         l = pa_strlist_pop(l, &z);
4220
4221         if (!z)
4222             break;
4223
4224         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
4225         pa_xfree(z);
4226     }
4227
4228     pa_pstream_send_simple_ack(c->pstream, tag);
4229
4230     if (changed) {
4231         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4232             playback_stream *s;
4233
4234             s = pa_idxset_get_by_index(c->output_streams, idx);
4235             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
4236
4237         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4238             record_stream *s;
4239
4240             s = pa_idxset_get_by_index(c->record_streams, idx);
4241             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
4242
4243         } else {
4244             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4245             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
4246         }
4247     }
4248 }
4249
4250 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4251     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4252     const char *s;
4253
4254     pa_native_connection_assert_ref(c);
4255     pa_assert(t);
4256
4257     if (pa_tagstruct_gets(t, &s) < 0 ||
4258         !pa_tagstruct_eof(t)) {
4259         protocol_error(c);
4260         return;
4261     }
4262
4263     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4264     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
4265
4266     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
4267         pa_source *source;
4268
4269         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
4270         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4271
4272         pa_namereg_set_default_source(c->protocol->core, source);
4273     } else {
4274         pa_sink *sink;
4275         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
4276
4277         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
4278         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4279
4280         pa_namereg_set_default_sink(c->protocol->core, sink);
4281     }
4282
4283     pa_pstream_send_simple_ack(c->pstream, tag);
4284 }
4285
4286 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4287     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4288     uint32_t idx;
4289     const char *name;
4290
4291     pa_native_connection_assert_ref(c);
4292     pa_assert(t);
4293
4294     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4295         pa_tagstruct_gets(t, &name) < 0 ||
4296         !pa_tagstruct_eof(t)) {
4297         protocol_error(c);
4298         return;
4299     }
4300
4301     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4302     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
4303
4304     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
4305         playback_stream *s;
4306
4307         s = pa_idxset_get_by_index(c->output_streams, idx);
4308         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4309         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4310
4311         pa_sink_input_set_name(s->sink_input, name);
4312
4313     } else {
4314         record_stream *s;
4315         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
4316
4317         s = pa_idxset_get_by_index(c->record_streams, idx);
4318         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4319
4320         pa_source_output_set_name(s->source_output, name);
4321     }
4322
4323     pa_pstream_send_simple_ack(c->pstream, tag);
4324 }
4325
4326 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4327     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4328     uint32_t idx;
4329
4330     pa_native_connection_assert_ref(c);
4331     pa_assert(t);
4332
4333     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4334         !pa_tagstruct_eof(t)) {
4335         protocol_error(c);
4336         return;
4337     }
4338
4339     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4340
4341     if (command == PA_COMMAND_KILL_CLIENT) {
4342         pa_client *client;
4343
4344         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
4345         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
4346
4347         pa_native_connection_ref(c);
4348         pa_client_kill(client);
4349
4350     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
4351         pa_sink_input *s;
4352
4353         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4354         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4355
4356         pa_native_connection_ref(c);
4357         pa_sink_input_kill(s);
4358     } else {
4359         pa_source_output *s;
4360
4361         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4362
4363         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4364         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4365
4366         pa_native_connection_ref(c);
4367         pa_source_output_kill(s);
4368     }
4369
4370     pa_pstream_send_simple_ack(c->pstream, tag);
4371     pa_native_connection_unref(c);
4372 }
4373
4374 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4375     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4376     pa_module *m;
4377     const char *name, *argument;
4378     pa_tagstruct *reply;
4379
4380     pa_native_connection_assert_ref(c);
4381     pa_assert(t);
4382
4383     if (pa_tagstruct_gets(t, &name) < 0 ||
4384         pa_tagstruct_gets(t, &argument) < 0 ||
4385         !pa_tagstruct_eof(t)) {
4386         protocol_error(c);
4387         return;
4388     }
4389
4390     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4391     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4392     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4393
4394     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4395         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4396         return;
4397     }
4398
4399     reply = reply_new(tag);
4400     pa_tagstruct_putu32(reply, m->index);
4401     pa_pstream_send_tagstruct(c->pstream, reply);
4402 }
4403
4404 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4405     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4406     uint32_t idx;
4407     pa_module *m;
4408
4409     pa_native_connection_assert_ref(c);
4410     pa_assert(t);
4411
4412     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4413         !pa_tagstruct_eof(t)) {
4414         protocol_error(c);
4415         return;
4416     }
4417
4418     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4419     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4420     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4421
4422     pa_module_unload_request(m, FALSE);
4423     pa_pstream_send_simple_ack(c->pstream, tag);
4424 }
4425
4426 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4427     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4428     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4429     const char *name_device = NULL;
4430
4431     pa_native_connection_assert_ref(c);
4432     pa_assert(t);
4433
4434     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4435         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4436         pa_tagstruct_gets(t, &name_device) < 0 ||
4437         !pa_tagstruct_eof(t)) {
4438         protocol_error(c);
4439         return;
4440     }
4441
4442     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4443     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4444
4445     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name_or_wildcard(name_device, command == PA_COMMAND_MOVE_SINK_INPUT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4446     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4447     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4448     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4449
4450     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4451         pa_sink_input *si = NULL;
4452         pa_sink *sink = NULL;
4453
4454         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4455
4456         if (idx_device != PA_INVALID_INDEX)
4457             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4458         else
4459             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4460
4461         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4462
4463         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4464             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4465             return;
4466         }
4467     } else {
4468         pa_source_output *so = NULL;
4469         pa_source *source;
4470
4471         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4472
4473         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4474
4475         if (idx_device != PA_INVALID_INDEX)
4476             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4477         else
4478             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4479
4480         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4481
4482         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4483             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4484             return;
4485         }
4486     }
4487
4488     pa_pstream_send_simple_ack(c->pstream, tag);
4489 }
4490
4491 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4492     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4493     uint32_t idx = PA_INVALID_INDEX;
4494     const char *name = NULL;
4495     pa_bool_t b;
4496
4497     pa_native_connection_assert_ref(c);
4498     pa_assert(t);
4499
4500     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4501         pa_tagstruct_gets(t, &name) < 0 ||
4502         pa_tagstruct_get_boolean(t, &b) < 0 ||
4503         !pa_tagstruct_eof(t)) {
4504         protocol_error(c);
4505         return;
4506     }
4507
4508     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4509     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SUSPEND_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE) || *name == 0, tag, PA_ERR_INVALID);
4510     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4511     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4512     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4513
4514     if (command == PA_COMMAND_SUSPEND_SINK) {
4515
4516         if (idx == PA_INVALID_INDEX && name && !*name) {
4517
4518             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4519
4520             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4521                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4522                 return;
4523             }
4524         } else {
4525             pa_sink *sink = NULL;
4526
4527             if (idx != PA_INVALID_INDEX)
4528                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4529             else
4530                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4531
4532             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4533
4534             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4535                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4536                 return;
4537             }
4538         }
4539     } else {
4540
4541         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4542
4543         if (idx == PA_INVALID_INDEX && name && !*name) {
4544
4545             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4546
4547             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4548                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4549                 return;
4550             }
4551
4552         } else {
4553             pa_source *source;
4554
4555             if (idx != PA_INVALID_INDEX)
4556                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4557             else
4558                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4559
4560             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4561
4562             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4563                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4564                 return;
4565             }
4566         }
4567     }
4568
4569     pa_pstream_send_simple_ack(c->pstream, tag);
4570 }
4571
4572 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4573     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4574     uint32_t idx = PA_INVALID_INDEX;
4575     const char *name = NULL;
4576     pa_module *m;
4577     pa_native_protocol_ext_cb_t cb;
4578
4579     pa_native_connection_assert_ref(c);
4580     pa_assert(t);
4581
4582     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4583         pa_tagstruct_gets(t, &name) < 0) {
4584         protocol_error(c);
4585         return;
4586     }
4587
4588     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4589     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4590     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4591     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4592     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4593
4594     if (idx != PA_INVALID_INDEX)
4595         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4596     else {
4597         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4598             if (strcmp(name, m->name) == 0)
4599                 break;
4600     }
4601
4602     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4603     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4604
4605     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4606     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4607
4608     if (cb(c->protocol, m, c, tag, t) < 0)
4609         protocol_error(c);
4610 }
4611
4612 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4613     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4614     uint32_t idx = PA_INVALID_INDEX;
4615     const char *name = NULL, *profile = NULL;
4616     pa_card *card = NULL;
4617     int ret;
4618
4619     pa_native_connection_assert_ref(c);
4620     pa_assert(t);
4621
4622     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4623         pa_tagstruct_gets(t, &name) < 0 ||
4624         pa_tagstruct_gets(t, &profile) < 0 ||
4625         !pa_tagstruct_eof(t)) {
4626         protocol_error(c);
4627         return;
4628     }
4629
4630     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4631     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4632     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4633     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4634     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4635
4636     if (idx != PA_INVALID_INDEX)
4637         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4638     else
4639         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4640
4641     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4642
4643     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4644         pa_pstream_send_error(c->pstream, tag, -ret);
4645         return;
4646     }
4647
4648     pa_pstream_send_simple_ack(c->pstream, tag);
4649 }
4650
4651 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4652     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4653     uint32_t idx = PA_INVALID_INDEX;
4654     const char *name = NULL, *port = NULL;
4655     int ret;
4656
4657     pa_native_connection_assert_ref(c);
4658     pa_assert(t);
4659
4660     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4661         pa_tagstruct_gets(t, &name) < 0 ||
4662         pa_tagstruct_gets(t, &port) < 0 ||
4663         !pa_tagstruct_eof(t)) {
4664         protocol_error(c);
4665         return;
4666     }
4667
4668     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4669     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_PORT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4670     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4671     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4672     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4673
4674     if (command == PA_COMMAND_SET_SINK_PORT) {
4675         pa_sink *sink;
4676
4677         if (idx != PA_INVALID_INDEX)
4678             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4679         else
4680             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4681
4682         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4683
4684         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4685             pa_pstream_send_error(c->pstream, tag, -ret);
4686             return;
4687         }
4688     } else {
4689         pa_source *source;
4690
4691         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4692
4693         if (idx != PA_INVALID_INDEX)
4694             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4695         else
4696             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4697
4698         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4699
4700         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4701             pa_pstream_send_error(c->pstream, tag, -ret);
4702             return;
4703         }
4704     }
4705
4706     pa_pstream_send_simple_ack(c->pstream, tag);
4707 }
4708
4709 /*** pstream callbacks ***/
4710
4711 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4712     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4713
4714     pa_assert(p);
4715     pa_assert(packet);
4716     pa_native_connection_assert_ref(c);
4717
4718     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4719         pa_log("invalid packet.");
4720         native_connection_unlink(c);
4721     }
4722 }
4723
4724 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4725     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4726     output_stream *stream;
4727
4728     pa_assert(p);
4729     pa_assert(chunk);
4730     pa_native_connection_assert_ref(c);
4731
4732     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4733         pa_log_debug("Client sent block for invalid stream.");
4734         /* Ignoring */
4735         return;
4736     }
4737
4738 #ifdef PROTOCOL_NATIVE_DEBUG
4739     pa_log("got %lu bytes from client", (unsigned long) chunk->length);
4740 #endif
4741
4742     if (playback_stream_isinstance(stream)) {
4743         playback_stream *ps = PLAYBACK_STREAM(stream);
4744
4745         pa_atomic_inc(&ps->seek_or_post_in_queue);
4746         if (chunk->memblock) {
4747             if (seek != PA_SEEK_RELATIVE || offset != 0)
4748                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, chunk, NULL);
4749             else
4750                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4751         } else
4752             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4753
4754     } else {
4755         upload_stream *u = UPLOAD_STREAM(stream);
4756         size_t l;
4757
4758         if (!u->memchunk.memblock) {
4759             if (u->length == chunk->length && chunk->memblock) {
4760                 u->memchunk = *chunk;
4761                 pa_memblock_ref(u->memchunk.memblock);
4762                 u->length = 0;
4763             } else {
4764                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4765                 u->memchunk.index = u->memchunk.length = 0;
4766             }
4767         }
4768
4769         pa_assert(u->memchunk.memblock);
4770
4771         l = u->length;
4772         if (l > chunk->length)
4773             l = chunk->length;
4774
4775         if (l > 0) {
4776             void *dst;
4777             dst = pa_memblock_acquire(u->memchunk.memblock);
4778
4779             if (chunk->memblock) {
4780                 void *src;
4781                 src = pa_memblock_acquire(chunk->memblock);
4782
4783                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4784                        (uint8_t*) src + chunk->index, l);
4785
4786                 pa_memblock_release(chunk->memblock);
4787             } else
4788                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4789
4790             pa_memblock_release(u->memchunk.memblock);
4791
4792             u->memchunk.length += l;
4793             u->length -= l;
4794         }
4795     }
4796 }
4797
4798 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4799     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4800
4801     pa_assert(p);
4802     pa_native_connection_assert_ref(c);
4803
4804     native_connection_unlink(c);
4805     pa_log_info("Connection died.");
4806 }
4807
4808 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4809     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4810
4811     pa_assert(p);
4812     pa_native_connection_assert_ref(c);
4813
4814     native_connection_send_memblock(c);
4815 }
4816
4817 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4818     pa_thread_mq *q;
4819
4820     if (!(q = pa_thread_mq_get()))
4821         pa_pstream_send_revoke(p, block_id);
4822     else
4823         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4824 }
4825
4826 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4827     pa_thread_mq *q;
4828
4829     if (!(q = pa_thread_mq_get()))
4830         pa_pstream_send_release(p, block_id);
4831     else
4832         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4833 }
4834
4835 /*** client callbacks ***/
4836
4837 static void client_kill_cb(pa_client *c) {
4838     pa_assert(c);
4839
4840     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4841     pa_log_info("Connection killed.");
4842 }
4843
4844 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4845     pa_tagstruct *t;
4846     pa_native_connection *c;
4847
4848     pa_assert(client);
4849     c = PA_NATIVE_CONNECTION(client->userdata);
4850     pa_native_connection_assert_ref(c);
4851
4852     if (c->version < 15)
4853       return;
4854
4855     t = pa_tagstruct_new(NULL, 0);
4856     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4857     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4858     pa_tagstruct_puts(t, event);
4859     pa_tagstruct_put_proplist(t, pl);
4860     pa_pstream_send_tagstruct(c->pstream, t);
4861 }
4862
4863 /*** module entry points ***/
4864
4865 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4866     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4867
4868     pa_assert(m);
4869     pa_native_connection_assert_ref(c);
4870     pa_assert(c->auth_timeout_event == e);
4871
4872     if (!c->authorized) {
4873         native_connection_unlink(c);
4874         pa_log_info("Connection terminated due to authentication timeout.");
4875     }
4876 }
4877
4878 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4879     pa_native_connection *c;
4880     char pname[128];
4881     pa_client *client;
4882     pa_client_new_data data;
4883
4884     pa_assert(p);
4885     pa_assert(io);
4886     pa_assert(o);
4887
4888     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4889         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4890         pa_iochannel_free(io);
4891         return;
4892     }
4893
4894     pa_client_new_data_init(&data);
4895     data.module = o->module;
4896     data.driver = __FILE__;
4897     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4898     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4899     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4900     client = pa_client_new(p->core, &data);
4901     pa_client_new_data_done(&data);
4902
4903     if (!client)
4904         return;
4905
4906     c = pa_msgobject_new(pa_native_connection);
4907     c->parent.parent.free = native_connection_free;
4908     c->parent.process_msg = native_connection_process_msg;
4909     c->protocol = p;
4910     c->options = pa_native_options_ref(o);
4911     c->authorized = FALSE;
4912
4913     if (o->auth_anonymous) {
4914         pa_log_info("Client authenticated anonymously.");
4915         c->authorized = TRUE;
4916     }
4917
4918     if (!c->authorized &&
4919         o->auth_ip_acl &&
4920         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4921
4922         pa_log_info("Client authenticated by IP ACL.");
4923         c->authorized = TRUE;
4924     }
4925
4926     if (!c->authorized)
4927         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4928     else
4929         c->auth_timeout_event = NULL;
4930
4931     c->is_local = pa_iochannel_socket_is_local(io);
4932     c->version = 8;
4933
4934     c->client = client;
4935     c->client->kill = client_kill_cb;
4936     c->client->send_event = client_send_event_cb;
4937     c->client->userdata = c;
4938
4939     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4940     pa_pstream_set_receive_packet_callback(c->pstream, pstream_packet_callback, c);
4941     pa_pstream_set_receive_memblock_callback(c->pstream, pstream_memblock_callback, c);
4942     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4943     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4944     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4945     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4946
4947     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4948
4949     c->record_streams = pa_idxset_new(NULL, NULL);
4950     c->output_streams = pa_idxset_new(NULL, NULL);
4951
4952     c->rrobin_index = PA_IDXSET_INVALID;
4953     c->subscription = NULL;
4954
4955     pa_idxset_put(p->connections, c, NULL);
4956
4957 #ifdef HAVE_CREDS
4958     if (pa_iochannel_creds_supported(io))
4959         pa_iochannel_creds_enable(io);
4960 #endif
4961
4962     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4963 }
4964
4965 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4966     pa_native_connection *c;
4967     void *state = NULL;
4968
4969     pa_assert(p);
4970     pa_assert(m);
4971
4972     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4973         if (c->options->module == m)
4974             native_connection_unlink(c);
4975 }
4976
4977 static pa_native_protocol* native_protocol_new(pa_core *c) {
4978     pa_native_protocol *p;
4979     pa_native_hook_t h;
4980
4981     pa_assert(c);
4982
4983     p = pa_xnew(pa_native_protocol, 1);
4984     PA_REFCNT_INIT(p);
4985     p->core = c;
4986     p->connections = pa_idxset_new(NULL, NULL);
4987
4988     p->servers = NULL;
4989
4990     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4991
4992     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4993         pa_hook_init(&p->hooks[h], p);
4994
4995     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4996
4997     return p;
4998 }
4999
5000 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
5001     pa_native_protocol *p;
5002
5003     if ((p = pa_shared_get(c, "native-protocol")))
5004         return pa_native_protocol_ref(p);
5005
5006     return native_protocol_new(c);
5007 }
5008
5009 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
5010     pa_assert(p);
5011     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5012
5013     PA_REFCNT_INC(p);
5014
5015     return p;
5016 }
5017
5018 void pa_native_protocol_unref(pa_native_protocol *p) {
5019     pa_native_connection *c;
5020     pa_native_hook_t h;
5021
5022     pa_assert(p);
5023     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5024
5025     if (PA_REFCNT_DEC(p) > 0)
5026         return;
5027
5028     while ((c = pa_idxset_first(p->connections, NULL)))
5029         native_connection_unlink(c);
5030
5031     pa_idxset_free(p->connections, NULL, NULL);
5032
5033     pa_strlist_free(p->servers);
5034
5035     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
5036         pa_hook_done(&p->hooks[h]);
5037
5038     pa_hashmap_free(p->extensions, NULL, NULL);
5039
5040     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
5041
5042     pa_xfree(p);
5043 }
5044
5045 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
5046     pa_assert(p);
5047     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5048     pa_assert(name);
5049
5050     p->servers = pa_strlist_prepend(p->servers, name);
5051
5052     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5053 }
5054
5055 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
5056     pa_assert(p);
5057     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5058     pa_assert(name);
5059
5060     p->servers = pa_strlist_remove(p->servers, name);
5061
5062     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5063 }
5064
5065 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
5066     pa_assert(p);
5067     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5068
5069     return p->hooks;
5070 }
5071
5072 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
5073     pa_assert(p);
5074     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5075
5076     return p->servers;
5077 }
5078
5079 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
5080     pa_assert(p);
5081     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5082     pa_assert(m);
5083     pa_assert(cb);
5084     pa_assert(!pa_hashmap_get(p->extensions, m));
5085
5086     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
5087     return 0;
5088 }
5089
5090 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
5091     pa_assert(p);
5092     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5093     pa_assert(m);
5094
5095     pa_assert_se(pa_hashmap_remove(p->extensions, m));
5096 }
5097
5098 pa_native_options* pa_native_options_new(void) {
5099     pa_native_options *o;
5100
5101     o = pa_xnew0(pa_native_options, 1);
5102     PA_REFCNT_INIT(o);
5103
5104     return o;
5105 }
5106
5107 pa_native_options* pa_native_options_ref(pa_native_options *o) {
5108     pa_assert(o);
5109     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5110
5111     PA_REFCNT_INC(o);
5112
5113     return o;
5114 }
5115
5116 void pa_native_options_unref(pa_native_options *o) {
5117     pa_assert(o);
5118     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5119
5120     if (PA_REFCNT_DEC(o) > 0)
5121         return;
5122
5123     pa_xfree(o->auth_group);
5124
5125     if (o->auth_ip_acl)
5126         pa_ip_acl_free(o->auth_ip_acl);
5127
5128     if (o->auth_cookie)
5129         pa_auth_cookie_unref(o->auth_cookie);
5130
5131     pa_xfree(o);
5132 }
5133
5134 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
5135     pa_bool_t enabled;
5136     const char *acl;
5137
5138     pa_assert(o);
5139     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5140     pa_assert(ma);
5141
5142     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
5143         pa_log("auth-anonymous= expects a boolean argument.");
5144         return -1;
5145     }
5146
5147     enabled = TRUE;
5148     if (pa_modargs_get_value_boolean(ma, "auth-group-enable", &enabled) < 0) {
5149         pa_log("auth-group-enable= expects a boolean argument.");
5150         return -1;
5151     }
5152
5153     pa_xfree(o->auth_group);
5154     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
5155
5156 #ifndef HAVE_CREDS
5157     if (o->auth_group)
5158         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
5159 #endif
5160
5161     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
5162         pa_ip_acl *ipa;
5163
5164         if (!(ipa = pa_ip_acl_new(acl))) {
5165             pa_log("Failed to parse IP ACL '%s'", acl);
5166             return -1;
5167         }
5168
5169         if (o->auth_ip_acl)
5170             pa_ip_acl_free(o->auth_ip_acl);
5171
5172         o->auth_ip_acl = ipa;
5173     }
5174
5175     enabled = TRUE;
5176     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
5177         pa_log("auth-cookie-enabled= expects a boolean argument.");
5178         return -1;
5179     }
5180
5181     if (o->auth_cookie)
5182         pa_auth_cookie_unref(o->auth_cookie);
5183
5184     if (enabled) {
5185         const char *cn;
5186
5187         /* The new name for this is 'auth-cookie', for compat reasons
5188          * we check the old name too */
5189         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
5190             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
5191                 cn = PA_NATIVE_COOKIE_FILE;
5192
5193         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
5194             return -1;
5195
5196     } else
5197           o->auth_cookie = NULL;
5198
5199     return 0;
5200 }
5201
5202 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
5203     pa_native_connection_assert_ref(c);
5204
5205     return c->pstream;
5206 }
5207
5208 pa_client* pa_native_connection_get_client(pa_native_connection *c) {
5209     pa_native_connection_assert_ref(c);
5210
5211     return c->client;
5212 }