be a bit more verbose about max_request changes
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* Kick a client if it doesn't authenticate within this time */
64 #define AUTH_TIMEOUT 60
65
66 /* Don't accept more connection than this */
67 #define MAX_CONNECTIONS 64
68
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
73
74 struct pa_native_protocol;
75
76 typedef struct record_stream {
77     pa_msgobject parent;
78
79     pa_native_connection *connection;
80     uint32_t index;
81
82     pa_source_output *source_output;
83     pa_memblockq *memblockq;
84
85     pa_bool_t adjust_latency:1;
86     pa_bool_t early_requests:1;
87
88     pa_buffer_attr buffer_attr;
89     pa_usec_t source_latency;
90 } record_stream;
91
92 PA_DECLARE_CLASS(record_stream);
93 #define RECORD_STREAM(o) (record_stream_cast(o))
94 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
95
96 typedef struct output_stream {
97     pa_msgobject parent;
98 } output_stream;
99
100 PA_DECLARE_CLASS(output_stream);
101 #define OUTPUT_STREAM(o) (output_stream_cast(o))
102 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
103
104 typedef struct playback_stream {
105     output_stream parent;
106
107     pa_native_connection *connection;
108     uint32_t index;
109
110     pa_sink_input *sink_input;
111     pa_memblockq *memblockq;
112
113     pa_bool_t adjust_latency:1;
114     pa_bool_t early_requests:1;
115
116     pa_bool_t is_underrun:1;
117     pa_bool_t drain_request:1;
118     uint32_t drain_tag;
119     uint32_t syncid;
120
121     pa_atomic_t missing;
122     pa_usec_t sink_latency;
123     pa_buffer_attr buffer_attr;
124
125     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
126     int64_t read_index, write_index;
127     size_t render_memblockq_length;
128 } playback_stream;
129
130 PA_DECLARE_CLASS(playback_stream);
131 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
132 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
133
134 typedef struct upload_stream {
135     output_stream parent;
136
137     pa_native_connection *connection;
138     uint32_t index;
139
140     pa_memchunk memchunk;
141     size_t length;
142     char *name;
143     pa_sample_spec sample_spec;
144     pa_channel_map channel_map;
145     pa_proplist *proplist;
146 } upload_stream;
147
148 PA_DECLARE_CLASS(upload_stream);
149 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
150 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
151
152 struct pa_native_connection {
153     pa_msgobject parent;
154     pa_native_protocol *protocol;
155     pa_native_options *options;
156     pa_bool_t authorized:1;
157     pa_bool_t is_local:1;
158     uint32_t version;
159     pa_client *client;
160     pa_pstream *pstream;
161     pa_pdispatch *pdispatch;
162     pa_idxset *record_streams, *output_streams;
163     uint32_t rrobin_index;
164     pa_subscription *subscription;
165     pa_time_event *auth_timeout_event;
166 };
167
168 PA_DECLARE_CLASS(pa_native_connection);
169 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
170 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
171
172 struct pa_native_protocol {
173     PA_REFCNT_DECLARE;
174
175     pa_core *core;
176     pa_idxset *connections;
177
178     pa_strlist *servers;
179     pa_hook hooks[PA_NATIVE_HOOK_MAX];
180
181     pa_hashmap *extensions;
182 };
183
184 enum {
185     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
186     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
187     SINK_INPUT_MESSAGE_FLUSH,
188     SINK_INPUT_MESSAGE_TRIGGER,
189     SINK_INPUT_MESSAGE_SEEK,
190     SINK_INPUT_MESSAGE_PREBUF_FORCE,
191     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
192     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
193 };
194
195 enum {
196     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
197     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
198     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
199     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
200     PLAYBACK_STREAM_MESSAGE_STARTED,
201     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
202 };
203
204 enum {
205     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
206 };
207
208 enum {
209     CONNECTION_MESSAGE_RELEASE,
210     CONNECTION_MESSAGE_REVOKE
211 };
212
213 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
214 static void sink_input_kill_cb(pa_sink_input *i);
215 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
216 static void sink_input_moving_cb(pa_sink_input *i);
217 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
218 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
219 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
220 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
221
222 static void native_connection_send_memblock(pa_native_connection *c);
223 static void playback_stream_request_bytes(struct playback_stream*s);
224
225 static void source_output_kill_cb(pa_source_output *o);
226 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
227 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
228 static void source_output_moving_cb(pa_source_output *o);
229 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
230 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
231
232 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
233
234 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
235 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
236 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
237 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
238 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
239 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
240 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
241 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
242 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
243 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
244 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
245 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
246 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
247 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
248 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
249 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272
273 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
274     [PA_COMMAND_ERROR] = NULL,
275     [PA_COMMAND_TIMEOUT] = NULL,
276     [PA_COMMAND_REPLY] = NULL,
277     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
278     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
279     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
280     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
281     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
282     [PA_COMMAND_AUTH] = command_auth,
283     [PA_COMMAND_REQUEST] = NULL,
284     [PA_COMMAND_EXIT] = command_exit,
285     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
286     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
287     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
288     [PA_COMMAND_STAT] = command_stat,
289     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
290     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
291     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
292     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
293     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
294     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
295     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
296     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
297     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
298     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
299     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
300     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
301     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
302     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
303     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
304     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
305     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
306     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
307     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
308     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
309     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
310     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
311     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
312     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
313     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
314
315     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
316     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
317     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
318
319     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
320     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
321     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
322
323     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
324     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
325
326     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
327     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
328     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
329     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
330
331     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
332     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
333
334     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
335     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
336     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
337     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
338     [PA_COMMAND_KILL_CLIENT] = command_kill,
339     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
340     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
341     [PA_COMMAND_LOAD_MODULE] = command_load_module,
342     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
343
344     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
345     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
346     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
347     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
348
349     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
350     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
351
352     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
353     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
354
355     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
356     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
357
358     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
359     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
360     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
361
362     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
363     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
364     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
365
366     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
367
368     [PA_COMMAND_EXTENSION] = command_extension
369 };
370
371 /* structure management */
372
373 /* Called from main context */
374 static void upload_stream_unlink(upload_stream *s) {
375     pa_assert(s);
376
377     if (!s->connection)
378         return;
379
380     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
381     s->connection = NULL;
382     upload_stream_unref(s);
383 }
384
385 /* Called from main context */
386 static void upload_stream_free(pa_object *o) {
387     upload_stream *s = UPLOAD_STREAM(o);
388     pa_assert(s);
389
390     upload_stream_unlink(s);
391
392     pa_xfree(s->name);
393
394     if (s->proplist)
395         pa_proplist_free(s->proplist);
396
397     if (s->memchunk.memblock)
398         pa_memblock_unref(s->memchunk.memblock);
399
400     pa_xfree(s);
401 }
402
403 /* Called from main context */
404 static upload_stream* upload_stream_new(
405         pa_native_connection *c,
406         const pa_sample_spec *ss,
407         const pa_channel_map *map,
408         const char *name,
409         size_t length,
410         pa_proplist *p) {
411
412     upload_stream *s;
413
414     pa_assert(c);
415     pa_assert(ss);
416     pa_assert(name);
417     pa_assert(length > 0);
418     pa_assert(p);
419
420     s = pa_msgobject_new(upload_stream);
421     s->parent.parent.parent.free = upload_stream_free;
422     s->connection = c;
423     s->sample_spec = *ss;
424     s->channel_map = *map;
425     s->name = pa_xstrdup(name);
426     pa_memchunk_reset(&s->memchunk);
427     s->length = length;
428     s->proplist = pa_proplist_copy(p);
429     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
430
431     pa_idxset_put(c->output_streams, s, &s->index);
432
433     return s;
434 }
435
436 /* Called from main context */
437 static void record_stream_unlink(record_stream *s) {
438     pa_assert(s);
439
440     if (!s->connection)
441         return;
442
443     if (s->source_output) {
444         pa_source_output_unlink(s->source_output);
445         pa_source_output_unref(s->source_output);
446         s->source_output = NULL;
447     }
448
449     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
450     s->connection = NULL;
451     record_stream_unref(s);
452 }
453
454 /* Called from main context */
455 static void record_stream_free(pa_object *o) {
456     record_stream *s = RECORD_STREAM(o);
457     pa_assert(s);
458
459     record_stream_unlink(s);
460
461     pa_memblockq_free(s->memblockq);
462     pa_xfree(s);
463 }
464
465 /* Called from main context */
466 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
467     record_stream *s = RECORD_STREAM(o);
468     record_stream_assert_ref(s);
469
470     if (!s->connection)
471         return -1;
472
473     switch (code) {
474
475         case RECORD_STREAM_MESSAGE_POST_DATA:
476
477             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
478 /*                 pa_log_warn("Failed to push data into output queue."); */
479                 return -1;
480             }
481
482             if (!pa_pstream_is_pending(s->connection->pstream))
483                 native_connection_send_memblock(s->connection);
484
485             break;
486     }
487
488     return 0;
489 }
490
491 /* Called from main context */
492 static void fix_record_buffer_attr_pre(record_stream *s) {
493
494     size_t frame_size;
495     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
496
497     pa_assert(s);
498
499     /* This function will be called from the main thread, before as
500      * well as after the source output has been activated using
501      * pa_source_output_put()! That means it may not touch any
502      * ->thread_info data! */
503
504     frame_size = pa_frame_size(&s->source_output->sample_spec);
505
506     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
507         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
508     if (s->buffer_attr.maxlength <= 0)
509         s->buffer_attr.maxlength = (uint32_t) frame_size;
510
511     if (s->buffer_attr.fragsize == (uint32_t) -1)
512         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
513     if (s->buffer_attr.fragsize <= 0)
514         s->buffer_attr.fragsize = (uint32_t) frame_size;
515
516     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
517
518     if (s->early_requests) {
519
520         /* In early request mode we need to emulate the classic
521          * fragment-based playback model. We do this setting the source
522          * latency to the fragment size. */
523
524         source_usec = fragsize_usec;
525
526     } else if (s->adjust_latency) {
527
528         /* So, the user asked us to adjust the latency according to
529          * what the source can provide. Half the latency will be
530          * spent on the hw buffer, half of it in the async buffer
531          * queue we maintain for each client. */
532
533         source_usec = fragsize_usec/2;
534
535     } else {
536
537         /* Ok, the user didn't ask us to adjust the latency, hence we
538          * don't */
539
540         source_usec = 0;
541     }
542
543     if (source_usec > 0)
544         s->source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
545     else
546         s->source_latency = 0;
547
548     if (s->early_requests) {
549
550         /* Ok, we didn't necessarily get what we were asking for, so
551          * let's tell the user */
552
553         fragsize_usec = s->source_latency;
554
555     } else if (s->adjust_latency) {
556
557         /* Now subtract what we actually got */
558
559         if (fragsize_usec >= s->source_latency*2)
560             fragsize_usec -= s->source_latency;
561         else
562             fragsize_usec = s->source_latency;
563     }
564
565     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
566         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
567
568         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
569
570     if (s->buffer_attr.fragsize <= 0)
571         s->buffer_attr.fragsize = (uint32_t) frame_size;
572 }
573
574 /* Called from main context */
575 static void fix_record_buffer_attr_post(record_stream *s) {
576     size_t base;
577
578     pa_assert(s);
579
580     /* This function will be called from the main thread, before as
581      * well as after the source output has been activated using
582      * pa_source_output_put()! That means it may not touch and
583      * ->thread_info data! */
584
585     base = pa_frame_size(&s->source_output->sample_spec);
586
587     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
588     if (s->buffer_attr.fragsize <= 0)
589         s->buffer_attr.fragsize = base;
590
591     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
592         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
593 }
594
595 /* Called from main context */
596 static record_stream* record_stream_new(
597         pa_native_connection *c,
598         pa_source *source,
599         pa_sample_spec *ss,
600         pa_channel_map *map,
601         pa_bool_t peak_detect,
602         pa_buffer_attr *attr,
603         pa_source_output_flags_t flags,
604         pa_proplist *p,
605         pa_bool_t adjust_latency,
606         pa_sink_input *direct_on_input,
607         pa_bool_t early_requests,
608         int *ret) {
609
610     record_stream *s;
611     pa_source_output *source_output = NULL;
612     size_t base;
613     pa_source_output_new_data data;
614
615     pa_assert(c);
616     pa_assert(ss);
617     pa_assert(p);
618     pa_assert(ret);
619
620     pa_source_output_new_data_init(&data);
621
622     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
623     data.driver = __FILE__;
624     data.module = c->options->module;
625     data.client = c->client;
626     data.source = source;
627     data.direct_on_input = direct_on_input;
628     pa_source_output_new_data_set_sample_spec(&data, ss);
629     pa_source_output_new_data_set_channel_map(&data, map);
630     if (peak_detect)
631         data.resample_method = PA_RESAMPLER_PEAKS;
632
633     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
634
635     pa_source_output_new_data_done(&data);
636
637     if (!source_output)
638         return NULL;
639
640     s = pa_msgobject_new(record_stream);
641     s->parent.parent.free = record_stream_free;
642     s->parent.process_msg = record_stream_process_msg;
643     s->connection = c;
644     s->source_output = source_output;
645     s->buffer_attr = *attr;
646     s->adjust_latency = adjust_latency;
647     s->early_requests = early_requests;
648
649     s->source_output->push = source_output_push_cb;
650     s->source_output->kill = source_output_kill_cb;
651     s->source_output->get_latency = source_output_get_latency_cb;
652     s->source_output->moving = source_output_moving_cb;
653     s->source_output->suspend = source_output_suspend_cb;
654     s->source_output->send_event = source_output_send_event_cb;
655     s->source_output->userdata = s;
656
657     fix_record_buffer_attr_pre(s);
658
659     s->memblockq = pa_memblockq_new(
660             0,
661             s->buffer_attr.maxlength,
662             0,
663             base = pa_frame_size(&source_output->sample_spec),
664             1,
665             0,
666             0,
667             NULL);
668
669     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
670     fix_record_buffer_attr_post(s);
671
672     *ss = s->source_output->sample_spec;
673     *map = s->source_output->channel_map;
674
675     pa_idxset_put(c->record_streams, s, &s->index);
676
677     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
678                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->source_latency) / PA_USEC_PER_MSEC,
679                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
680                 (double) s->source_latency / PA_USEC_PER_MSEC);
681
682     pa_source_output_put(s->source_output);
683     return s;
684 }
685
686 /* Called from main context */
687 static void record_stream_send_killed(record_stream *r) {
688     pa_tagstruct *t;
689     record_stream_assert_ref(r);
690
691     t = pa_tagstruct_new(NULL, 0);
692     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
693     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
694     pa_tagstruct_putu32(t, r->index);
695     pa_pstream_send_tagstruct(r->connection->pstream, t);
696 }
697
698 /* Called from main context */
699 static void playback_stream_unlink(playback_stream *s) {
700     pa_assert(s);
701
702     if (!s->connection)
703         return;
704
705     if (s->sink_input) {
706         pa_sink_input_unlink(s->sink_input);
707         pa_sink_input_unref(s->sink_input);
708         s->sink_input = NULL;
709     }
710
711     if (s->drain_request)
712         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
713
714     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
715     s->connection = NULL;
716     playback_stream_unref(s);
717 }
718
719 /* Called from main context */
720 static void playback_stream_free(pa_object* o) {
721     playback_stream *s = PLAYBACK_STREAM(o);
722     pa_assert(s);
723
724     playback_stream_unlink(s);
725
726     pa_memblockq_free(s->memblockq);
727     pa_xfree(s);
728 }
729
730 /* Called from main context */
731 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
732     playback_stream *s = PLAYBACK_STREAM(o);
733     playback_stream_assert_ref(s);
734
735     if (!s->connection)
736         return -1;
737
738     switch (code) {
739         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
740             pa_tagstruct *t;
741             uint32_t l = 0;
742
743             for (;;) {
744                 if ((l = (uint32_t) pa_atomic_load(&s->missing)) <= 0)
745                     break;
746
747                 if (pa_atomic_cmpxchg(&s->missing, (int) l, 0))
748                     break;
749             }
750
751             if (l <= 0)
752                 break;
753
754             t = pa_tagstruct_new(NULL, 0);
755             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
756             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
757             pa_tagstruct_putu32(t, s->index);
758             pa_tagstruct_putu32(t, l);
759             pa_pstream_send_tagstruct(s->connection->pstream, t);
760
761 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
762             break;
763         }
764
765         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
766             pa_tagstruct *t;
767
768 /*             pa_log("signalling underflow"); */
769
770             /* Report that we're empty */
771             t = pa_tagstruct_new(NULL, 0);
772             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
773             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
774             pa_tagstruct_putu32(t, s->index);
775             pa_pstream_send_tagstruct(s->connection->pstream, t);
776             break;
777         }
778
779         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
780             pa_tagstruct *t;
781
782             /* Notify the user we're overflowed*/
783             t = pa_tagstruct_new(NULL, 0);
784             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
785             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
786             pa_tagstruct_putu32(t, s->index);
787             pa_pstream_send_tagstruct(s->connection->pstream, t);
788             break;
789         }
790
791         case PLAYBACK_STREAM_MESSAGE_STARTED:
792
793             if (s->connection->version >= 13) {
794                 pa_tagstruct *t;
795
796                 /* Notify the user we started playback */
797                 t = pa_tagstruct_new(NULL, 0);
798                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
799                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
800                 pa_tagstruct_putu32(t, s->index);
801                 pa_pstream_send_tagstruct(s->connection->pstream, t);
802             }
803
804             break;
805
806         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
807             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
808             break;
809
810         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
811             pa_tagstruct *t;
812
813             s->buffer_attr.tlength = (uint32_t) offset;
814
815             t = pa_tagstruct_new(NULL, 0);
816             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
817             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
818             pa_tagstruct_putu32(t, s->index);
819             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
820             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
821             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
822             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
823             pa_tagstruct_put_usec(t, s->sink_latency);
824             pa_pstream_send_tagstruct(s->connection->pstream, t);
825
826             break;
827         }
828     }
829
830     return 0;
831 }
832
833 /* Called from main context */
834 static void fix_playback_buffer_attr(playback_stream *s) {
835     size_t frame_size;
836     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
837
838     pa_assert(s);
839
840     /* This function will be called from the main thread, before as
841      * well as after the sink input has been activated using
842      * pa_sink_input_put()! That means it may not touch any
843      * ->thread_info data, such as the memblockq! */
844
845     frame_size = pa_frame_size(&s->sink_input->sample_spec);
846
847     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
848         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
849     if (s->buffer_attr.maxlength <= 0)
850         s->buffer_attr.maxlength = (uint32_t) frame_size;
851
852     if (s->buffer_attr.tlength == (uint32_t) -1)
853         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
854     if (s->buffer_attr.tlength <= 0)
855         s->buffer_attr.tlength = (uint32_t) frame_size;
856
857     if (s->buffer_attr.minreq == (uint32_t) -1)
858         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
859     if (s->buffer_attr.minreq <= 0)
860         s->buffer_attr.minreq = (uint32_t) frame_size;
861
862     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
863         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
864
865     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
866     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
867
868     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
869                 (double) tlength_usec / PA_USEC_PER_MSEC,
870                 (double) minreq_usec / PA_USEC_PER_MSEC);
871
872     if (s->early_requests) {
873
874         /* In early request mode we need to emulate the classic
875          * fragment-based playback model. We do this setting the sink
876          * latency to the fragment size. */
877
878         sink_usec = minreq_usec;
879         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
880
881     } else if (s->adjust_latency) {
882
883         /* So, the user asked us to adjust the latency of the stream
884          * buffer according to the what the sink can provide. The
885          * tlength passed in shall be the overall latency. Roughly
886          * half the latency will be spent on the hw buffer, the other
887          * half of it in the async buffer queue we maintain for each
888          * client. In between we'll have a safety space of size
889          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
890          * empty and needs to be filled, then our buffer must have
891          * enough data to fulfill this request immediatly and thus
892          * have at least the same tlength as the size of the hw
893          * buffer. It additionally needs space for 2 times minreq
894          * because if the buffer ran empty and a partial fillup
895          * happens immediately on the next iteration we need to be
896          * able to fulfill it and give the application also minreq
897          * time to fill it up again for the next request Makes 2 times
898          * minreq in plus.. */
899
900         if (tlength_usec > minreq_usec*2)
901             sink_usec = (tlength_usec - minreq_usec*2)/2;
902         else
903             sink_usec = 0;
904
905         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
906
907     } else {
908
909         /* Ok, the user didn't ask us to adjust the latency, but we
910          * still need to make sure that the parameters from the user
911          * do make sense. */
912
913         if (tlength_usec > minreq_usec*2)
914             sink_usec = (tlength_usec - minreq_usec*2);
915         else
916             sink_usec = 0;
917
918         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
919     }
920
921     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
922
923     if (s->early_requests) {
924
925         /* Ok, we didn't necessarily get what we were asking for, so
926          * let's tell the user */
927
928         minreq_usec = s->sink_latency;
929
930     } else if (s->adjust_latency) {
931
932         /* Ok, we didn't necessarily get what we were asking for, so
933          * let's subtract from what we asked for for the remaining
934          * buffer space */
935
936         if (tlength_usec >= s->sink_latency)
937             tlength_usec -= s->sink_latency;
938     }
939
940     /* FIXME: This is actually larger than necessary, since not all of
941      * the sink latency is actually rewritable. */
942     if (tlength_usec < s->sink_latency + 2*minreq_usec)
943         tlength_usec = s->sink_latency + 2*minreq_usec;
944
945     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
946         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
947         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
948
949     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
950         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
951         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
952
953     if (s->buffer_attr.minreq <= 0) {
954         s->buffer_attr.minreq = (uint32_t) frame_size;
955         s->buffer_attr.tlength += (uint32_t) frame_size*2;
956     }
957
958     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
959         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
960
961     if (s->buffer_attr.prebuf == (uint32_t) -1 || s->buffer_attr.prebuf > s->buffer_attr.tlength)
962         s->buffer_attr.prebuf = s->buffer_attr.tlength;
963 }
964
965 /* Called from main context */
966 static playback_stream* playback_stream_new(
967         pa_native_connection *c,
968         pa_sink *sink,
969         pa_sample_spec *ss,
970         pa_channel_map *map,
971         pa_buffer_attr *a,
972         pa_cvolume *volume,
973         pa_bool_t muted,
974         pa_bool_t muted_set,
975         uint32_t syncid,
976         uint32_t *missing,
977         pa_sink_input_flags_t flags,
978         pa_proplist *p,
979         pa_bool_t adjust_latency,
980         pa_bool_t early_requests,
981         int *ret) {
982
983     playback_stream *s, *ssync;
984     pa_sink_input *sink_input = NULL;
985     pa_memchunk silence;
986     uint32_t idx;
987     int64_t start_index;
988     pa_sink_input_new_data data;
989
990     pa_assert(c);
991     pa_assert(ss);
992     pa_assert(missing);
993     pa_assert(p);
994     pa_assert(ret);
995
996     /* Find syncid group */
997     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
998
999         if (!playback_stream_isinstance(ssync))
1000             continue;
1001
1002         if (ssync->syncid == syncid)
1003             break;
1004     }
1005
1006     /* Synced streams must connect to the same sink */
1007     if (ssync) {
1008
1009         if (!sink)
1010             sink = ssync->sink_input->sink;
1011         else if (sink != ssync->sink_input->sink) {
1012             *ret = PA_ERR_INVALID;
1013             return NULL;
1014         }
1015     }
1016
1017     pa_sink_input_new_data_init(&data);
1018
1019     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1020     data.driver = __FILE__;
1021     data.module = c->options->module;
1022     data.client = c->client;
1023     data.sink = sink;
1024     pa_sink_input_new_data_set_sample_spec(&data, ss);
1025     pa_sink_input_new_data_set_channel_map(&data, map);
1026     if (volume)
1027         pa_sink_input_new_data_set_volume(&data, volume);
1028     if (muted_set)
1029         pa_sink_input_new_data_set_muted(&data, muted);
1030     data.sync_base = ssync ? ssync->sink_input : NULL;
1031
1032     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1033
1034     pa_sink_input_new_data_done(&data);
1035
1036     if (!sink_input)
1037         return NULL;
1038
1039     s = pa_msgobject_new(playback_stream);
1040     s->parent.parent.parent.free = playback_stream_free;
1041     s->parent.parent.process_msg = playback_stream_process_msg;
1042     s->connection = c;
1043     s->syncid = syncid;
1044     s->sink_input = sink_input;
1045     s->is_underrun = TRUE;
1046     s->drain_request = FALSE;
1047     pa_atomic_store(&s->missing, 0);
1048     s->buffer_attr = *a;
1049     s->adjust_latency = adjust_latency;
1050     s->early_requests = early_requests;
1051
1052     s->sink_input->parent.process_msg = sink_input_process_msg;
1053     s->sink_input->pop = sink_input_pop_cb;
1054     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1055     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1056     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1057     s->sink_input->kill = sink_input_kill_cb;
1058     s->sink_input->moving = sink_input_moving_cb;
1059     s->sink_input->suspend = sink_input_suspend_cb;
1060     s->sink_input->send_event = sink_input_send_event_cb;
1061     s->sink_input->userdata = s;
1062
1063     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1064
1065     fix_playback_buffer_attr(s);
1066
1067     pa_sink_input_get_silence(sink_input, &silence);
1068     s->memblockq = pa_memblockq_new(
1069             start_index,
1070             s->buffer_attr.maxlength,
1071             s->buffer_attr.tlength,
1072             pa_frame_size(&sink_input->sample_spec),
1073             s->buffer_attr.prebuf,
1074             s->buffer_attr.minreq,
1075             0,
1076             &silence);
1077     pa_memblock_unref(silence.memblock);
1078
1079     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1080
1081     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1082
1083     *ss = s->sink_input->sample_spec;
1084     *map = s->sink_input->channel_map;
1085
1086     pa_idxset_put(c->output_streams, s, &s->index);
1087
1088     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1089                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->sink_latency) / PA_USEC_PER_MSEC,
1090                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1091                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1092                 (double) s->sink_latency / PA_USEC_PER_MSEC);
1093
1094     pa_sink_input_put(s->sink_input);
1095     return s;
1096 }
1097
1098 /* Called from IO context */
1099 static void playback_stream_request_bytes(playback_stream *s) {
1100     size_t m, previous_missing, minreq;
1101
1102     playback_stream_assert_ref(s);
1103
1104     m = pa_memblockq_pop_missing(s->memblockq);
1105
1106     if (m <= 0)
1107         return;
1108
1109 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1110
1111     previous_missing = (size_t) pa_atomic_add(&s->missing, (int) m);
1112     minreq = pa_memblockq_get_minreq(s->memblockq);
1113
1114     if (pa_memblockq_prebuf_active(s->memblockq) ||
1115         (previous_missing < minreq && previous_missing+m >= minreq))
1116         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1117 }
1118
1119
1120 /* Called from main context */
1121 static void playback_stream_send_killed(playback_stream *p) {
1122     pa_tagstruct *t;
1123     playback_stream_assert_ref(p);
1124
1125     t = pa_tagstruct_new(NULL, 0);
1126     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1127     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1128     pa_tagstruct_putu32(t, p->index);
1129     pa_pstream_send_tagstruct(p->connection->pstream, t);
1130 }
1131
1132 /* Called from main context */
1133 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1134     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1135     pa_native_connection_assert_ref(c);
1136
1137     if (!c->protocol)
1138         return -1;
1139
1140     switch (code) {
1141
1142         case CONNECTION_MESSAGE_REVOKE:
1143             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1144             break;
1145
1146         case CONNECTION_MESSAGE_RELEASE:
1147             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1148             break;
1149     }
1150
1151     return 0;
1152 }
1153
1154 /* Called from main context */
1155 static void native_connection_unlink(pa_native_connection *c) {
1156     record_stream *r;
1157     output_stream *o;
1158
1159     pa_assert(c);
1160
1161     if (!c->protocol)
1162         return;
1163
1164     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1165
1166     if (c->options)
1167         pa_native_options_unref(c->options);
1168
1169     while ((r = pa_idxset_first(c->record_streams, NULL)))
1170         record_stream_unlink(r);
1171
1172     while ((o = pa_idxset_first(c->output_streams, NULL)))
1173         if (playback_stream_isinstance(o))
1174             playback_stream_unlink(PLAYBACK_STREAM(o));
1175         else
1176             upload_stream_unlink(UPLOAD_STREAM(o));
1177
1178     if (c->subscription)
1179         pa_subscription_free(c->subscription);
1180
1181     if (c->pstream)
1182         pa_pstream_unlink(c->pstream);
1183
1184     if (c->auth_timeout_event) {
1185         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1186         c->auth_timeout_event = NULL;
1187     }
1188
1189     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1190     c->protocol = NULL;
1191     pa_native_connection_unref(c);
1192 }
1193
1194 /* Called from main context */
1195 static void native_connection_free(pa_object *o) {
1196     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1197
1198     pa_assert(c);
1199
1200     native_connection_unlink(c);
1201
1202     pa_idxset_free(c->record_streams, NULL, NULL);
1203     pa_idxset_free(c->output_streams, NULL, NULL);
1204
1205     pa_pdispatch_unref(c->pdispatch);
1206     pa_pstream_unref(c->pstream);
1207     pa_client_free(c->client);
1208
1209     pa_xfree(c);
1210 }
1211
1212 /* Called from main context */
1213 static void native_connection_send_memblock(pa_native_connection *c) {
1214     uint32_t start;
1215     record_stream *r;
1216
1217     start = PA_IDXSET_INVALID;
1218     for (;;) {
1219         pa_memchunk chunk;
1220
1221         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1222             return;
1223
1224         if (start == PA_IDXSET_INVALID)
1225             start = c->rrobin_index;
1226         else if (start == c->rrobin_index)
1227             return;
1228
1229         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1230             pa_memchunk schunk = chunk;
1231
1232             if (schunk.length > r->buffer_attr.fragsize)
1233                 schunk.length = r->buffer_attr.fragsize;
1234
1235             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1236
1237             pa_memblockq_drop(r->memblockq, schunk.length);
1238             pa_memblock_unref(schunk.memblock);
1239
1240             return;
1241         }
1242     }
1243 }
1244
1245 /*** sink input callbacks ***/
1246
1247 /* Called from thread context */
1248 static void handle_seek(playback_stream *s, int64_t indexw) {
1249     playback_stream_assert_ref(s);
1250
1251 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1252
1253     if (s->sink_input->thread_info.underrun_for > 0) {
1254
1255 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1256
1257         if (pa_memblockq_is_readable(s->memblockq)) {
1258
1259             /* We just ended an underrun, let's ask the sink
1260              * for a complete rewind rewrite */
1261
1262             pa_log_debug("Requesting rewind due to end of underrun.");
1263             pa_sink_input_request_rewind(s->sink_input,
1264                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1265                                          FALSE, TRUE, FALSE);
1266         }
1267
1268     } else {
1269         int64_t indexr;
1270
1271         indexr = pa_memblockq_get_read_index(s->memblockq);
1272
1273         if (indexw < indexr) {
1274             /* OK, the sink already asked for this data, so
1275              * let's have it usk us again */
1276
1277             pa_log_debug("Requesting rewind due to rewrite.");
1278             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1279         }
1280     }
1281
1282     playback_stream_request_bytes(s);
1283 }
1284
1285 /* Called from thread context */
1286 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1287     pa_sink_input *i = PA_SINK_INPUT(o);
1288     playback_stream *s;
1289
1290     pa_sink_input_assert_ref(i);
1291     s = PLAYBACK_STREAM(i->userdata);
1292     playback_stream_assert_ref(s);
1293
1294     switch (code) {
1295
1296         case SINK_INPUT_MESSAGE_SEEK: {
1297             int64_t windex;
1298
1299             windex = pa_memblockq_get_write_index(s->memblockq);
1300             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
1301
1302             handle_seek(s, windex);
1303             return 0;
1304         }
1305
1306         case SINK_INPUT_MESSAGE_POST_DATA: {
1307             int64_t windex;
1308
1309             pa_assert(chunk);
1310
1311             windex = pa_memblockq_get_write_index(s->memblockq);
1312
1313 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1314
1315             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1316                 pa_log_warn("Failed to push data into queue");
1317                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1318                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
1319             }
1320
1321             handle_seek(s, windex);
1322
1323 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1324
1325             return 0;
1326         }
1327
1328         case SINK_INPUT_MESSAGE_DRAIN:
1329         case SINK_INPUT_MESSAGE_FLUSH:
1330         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1331         case SINK_INPUT_MESSAGE_TRIGGER: {
1332
1333             int64_t windex;
1334             pa_sink_input *isync;
1335             void (*func)(pa_memblockq *bq);
1336
1337             switch  (code) {
1338                 case SINK_INPUT_MESSAGE_FLUSH:
1339                     func = pa_memblockq_flush_write;
1340                     break;
1341
1342                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1343                     func = pa_memblockq_prebuf_force;
1344                     break;
1345
1346                 case SINK_INPUT_MESSAGE_DRAIN:
1347                 case SINK_INPUT_MESSAGE_TRIGGER:
1348                     func = pa_memblockq_prebuf_disable;
1349                     break;
1350
1351                 default:
1352                     pa_assert_not_reached();
1353             }
1354
1355             windex = pa_memblockq_get_write_index(s->memblockq);
1356             func(s->memblockq);
1357             handle_seek(s, windex);
1358
1359             /* Do the same for all other members in the sync group */
1360             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1361                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1362                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1363                 func(ssync->memblockq);
1364                 handle_seek(ssync, windex);
1365             }
1366
1367             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1368                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1369                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1370                 func(ssync->memblockq);
1371                 handle_seek(ssync, windex);
1372             }
1373
1374             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1375                 if (!pa_memblockq_is_readable(s->memblockq))
1376                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1377                 else {
1378                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1379                     s->drain_request = TRUE;
1380                 }
1381             }
1382
1383             return 0;
1384         }
1385
1386         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1387
1388             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1389             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1390             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1391             return 0;
1392
1393         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1394             int64_t windex;
1395
1396             windex = pa_memblockq_get_write_index(s->memblockq);
1397
1398             pa_memblockq_prebuf_force(s->memblockq);
1399
1400             handle_seek(s, windex);
1401
1402             /* Fall through to the default handler */
1403             break;
1404         }
1405
1406         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1407             pa_usec_t *r = userdata;
1408
1409             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1410
1411             /* Fall through, the default handler will add in the extra
1412              * latency added by the resampler */
1413             break;
1414         }
1415
1416         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1417             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1418             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1419             return 0;
1420         }
1421     }
1422
1423     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1424 }
1425
1426 /* Called from thread context */
1427 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1428     playback_stream *s;
1429
1430     pa_sink_input_assert_ref(i);
1431     s = PLAYBACK_STREAM(i->userdata);
1432     playback_stream_assert_ref(s);
1433     pa_assert(chunk);
1434
1435 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1436
1437     if (pa_memblockq_is_readable(s->memblockq))
1438         s->is_underrun = FALSE;
1439     else {
1440         pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1441
1442         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1443             s->drain_request = FALSE;
1444             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1445         } else if (!s->is_underrun)
1446             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1447
1448         s->is_underrun = TRUE;
1449
1450         playback_stream_request_bytes(s);
1451     }
1452
1453     /* This call will not fail with prebuf=0, hence we check for
1454        underrun explicitly above */
1455     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1456         return -1;
1457
1458     chunk->length = PA_MIN(nbytes, chunk->length);
1459
1460     if (i->thread_info.underrun_for > 0)
1461         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1462
1463     pa_memblockq_drop(s->memblockq, chunk->length);
1464     playback_stream_request_bytes(s);
1465
1466     return 0;
1467 }
1468
1469 /* Called from thread context */
1470 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1471     playback_stream *s;
1472
1473     pa_sink_input_assert_ref(i);
1474     s = PLAYBACK_STREAM(i->userdata);
1475     playback_stream_assert_ref(s);
1476
1477     /* If we are in an underrun, then we don't rewind */
1478     if (i->thread_info.underrun_for > 0)
1479         return;
1480
1481     pa_memblockq_rewind(s->memblockq, nbytes);
1482 }
1483
1484 /* Called from thread context */
1485 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1486     playback_stream *s;
1487
1488     pa_sink_input_assert_ref(i);
1489     s = PLAYBACK_STREAM(i->userdata);
1490     playback_stream_assert_ref(s);
1491
1492     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1493 }
1494
1495 /* Called from thread context */
1496 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1497     playback_stream *s;
1498     size_t new_tlength, old_tlength;
1499
1500     pa_sink_input_assert_ref(i);
1501     s = PLAYBACK_STREAM(i->userdata);
1502     playback_stream_assert_ref(s);
1503
1504     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1505     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1506
1507     if (old_tlength < new_tlength) {
1508         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1509         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1510         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1511
1512         if (new_tlength == old_tlength)
1513             pa_log_debug("Failed to increase tlength");
1514         else {
1515             pa_log_debug("Notifying client about increased tlength");
1516             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1517         }
1518     }
1519 }
1520
1521 /* Called from main context */
1522 static void sink_input_kill_cb(pa_sink_input *i) {
1523     playback_stream *s;
1524
1525     pa_sink_input_assert_ref(i);
1526     s = PLAYBACK_STREAM(i->userdata);
1527     playback_stream_assert_ref(s);
1528
1529     playback_stream_send_killed(s);
1530     playback_stream_unlink(s);
1531 }
1532
1533 /* Called from main context */
1534 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1535     playback_stream *s;
1536     pa_tagstruct *t;
1537
1538     pa_sink_input_assert_ref(i);
1539     s = PLAYBACK_STREAM(i->userdata);
1540     playback_stream_assert_ref(s);
1541
1542     if (s->connection->version < 15)
1543       return;
1544
1545     t = pa_tagstruct_new(NULL, 0);
1546     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1547     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1548     pa_tagstruct_putu32(t, s->index);
1549     pa_tagstruct_puts(t, event);
1550     pa_tagstruct_put_proplist(t, pl);
1551     pa_pstream_send_tagstruct(s->connection->pstream, t);
1552 }
1553
1554 /* Called from main context */
1555 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1556     playback_stream *s;
1557     pa_tagstruct *t;
1558
1559     pa_sink_input_assert_ref(i);
1560     s = PLAYBACK_STREAM(i->userdata);
1561     playback_stream_assert_ref(s);
1562
1563     if (s->connection->version < 12)
1564       return;
1565
1566     t = pa_tagstruct_new(NULL, 0);
1567     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1568     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1569     pa_tagstruct_putu32(t, s->index);
1570     pa_tagstruct_put_boolean(t, suspend);
1571     pa_pstream_send_tagstruct(s->connection->pstream, t);
1572 }
1573
1574 /* Called from main context */
1575 static void sink_input_moving_cb(pa_sink_input *i) {
1576     playback_stream *s;
1577     pa_tagstruct *t;
1578
1579     pa_sink_input_assert_ref(i);
1580     s = PLAYBACK_STREAM(i->userdata);
1581     playback_stream_assert_ref(s);
1582
1583     fix_playback_buffer_attr(s);
1584     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1585     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1586
1587     if (s->connection->version < 12)
1588       return;
1589
1590     t = pa_tagstruct_new(NULL, 0);
1591     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1592     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1593     pa_tagstruct_putu32(t, s->index);
1594     pa_tagstruct_putu32(t, i->sink->index);
1595     pa_tagstruct_puts(t, i->sink->name);
1596     pa_tagstruct_put_boolean(t, pa_sink_get_state(i->sink) == PA_SINK_SUSPENDED);
1597
1598     if (s->connection->version >= 13) {
1599         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1600         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1601         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1602         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1603         pa_tagstruct_put_usec(t, s->sink_latency);
1604     }
1605
1606     pa_pstream_send_tagstruct(s->connection->pstream, t);
1607 }
1608
1609 /*** source_output callbacks ***/
1610
1611 /* Called from thread context */
1612 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1613     record_stream *s;
1614
1615     pa_source_output_assert_ref(o);
1616     s = RECORD_STREAM(o->userdata);
1617     record_stream_assert_ref(s);
1618     pa_assert(chunk);
1619
1620     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1621 }
1622
1623 static void source_output_kill_cb(pa_source_output *o) {
1624     record_stream *s;
1625
1626     pa_source_output_assert_ref(o);
1627     s = RECORD_STREAM(o->userdata);
1628     record_stream_assert_ref(s);
1629
1630     record_stream_send_killed(s);
1631     record_stream_unlink(s);
1632 }
1633
1634 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1635     record_stream *s;
1636
1637     pa_source_output_assert_ref(o);
1638     s = RECORD_STREAM(o->userdata);
1639     record_stream_assert_ref(s);
1640
1641     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1642
1643     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1644 }
1645
1646 /* Called from main context */
1647 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1648     record_stream *s;
1649     pa_tagstruct *t;
1650
1651     pa_source_output_assert_ref(o);
1652     s = RECORD_STREAM(o->userdata);
1653     record_stream_assert_ref(s);
1654
1655     if (s->connection->version < 15)
1656       return;
1657
1658     t = pa_tagstruct_new(NULL, 0);
1659     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1660     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1661     pa_tagstruct_putu32(t, s->index);
1662     pa_tagstruct_puts(t, event);
1663     pa_tagstruct_put_proplist(t, pl);
1664     pa_pstream_send_tagstruct(s->connection->pstream, t);
1665 }
1666
1667 /* Called from main context */
1668 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1669     record_stream *s;
1670     pa_tagstruct *t;
1671
1672     pa_source_output_assert_ref(o);
1673     s = RECORD_STREAM(o->userdata);
1674     record_stream_assert_ref(s);
1675
1676     if (s->connection->version < 12)
1677       return;
1678
1679     t = pa_tagstruct_new(NULL, 0);
1680     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1681     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1682     pa_tagstruct_putu32(t, s->index);
1683     pa_tagstruct_put_boolean(t, suspend);
1684     pa_pstream_send_tagstruct(s->connection->pstream, t);
1685 }
1686
1687 /* Called from main context */
1688 static void source_output_moving_cb(pa_source_output *o) {
1689     record_stream *s;
1690     pa_tagstruct *t;
1691
1692     pa_source_output_assert_ref(o);
1693     s = RECORD_STREAM(o->userdata);
1694     record_stream_assert_ref(s);
1695
1696     fix_record_buffer_attr_pre(s);
1697     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1698     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1699     fix_record_buffer_attr_post(s);
1700
1701     if (s->connection->version < 12)
1702       return;
1703
1704     t = pa_tagstruct_new(NULL, 0);
1705     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1706     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1707     pa_tagstruct_putu32(t, s->index);
1708     pa_tagstruct_putu32(t, o->source->index);
1709     pa_tagstruct_puts(t, o->source->name);
1710     pa_tagstruct_put_boolean(t, pa_source_get_state(o->source) == PA_SOURCE_SUSPENDED);
1711
1712     if (s->connection->version >= 13) {
1713         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1714         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1715         pa_tagstruct_put_usec(t, s->source_latency);
1716     }
1717
1718     pa_pstream_send_tagstruct(s->connection->pstream, t);
1719 }
1720
1721 /*** pdispatch callbacks ***/
1722
1723 static void protocol_error(pa_native_connection *c) {
1724     pa_log("protocol error, kicking client");
1725     native_connection_unlink(c);
1726 }
1727
1728 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1729 if (!(expression)) { \
1730     pa_pstream_send_error((pstream), (tag), (error)); \
1731     return; \
1732 } \
1733 } while(0);
1734
1735 static pa_tagstruct *reply_new(uint32_t tag) {
1736     pa_tagstruct *reply;
1737
1738     reply = pa_tagstruct_new(NULL, 0);
1739     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1740     pa_tagstruct_putu32(reply, tag);
1741     return reply;
1742 }
1743
1744 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1745     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1746     playback_stream *s;
1747     uint32_t sink_index, syncid, missing;
1748     pa_buffer_attr attr;
1749     const char *name = NULL, *sink_name;
1750     pa_sample_spec ss;
1751     pa_channel_map map;
1752     pa_tagstruct *reply;
1753     pa_sink *sink = NULL;
1754     pa_cvolume volume;
1755     pa_bool_t
1756         corked = FALSE,
1757         no_remap = FALSE,
1758         no_remix = FALSE,
1759         fix_format = FALSE,
1760         fix_rate = FALSE,
1761         fix_channels = FALSE,
1762         no_move = FALSE,
1763         variable_rate = FALSE,
1764         muted = FALSE,
1765         adjust_latency = FALSE,
1766         early_requests = FALSE,
1767         dont_inhibit_auto_suspend = FALSE,
1768         muted_set = FALSE,
1769         fail_on_suspend = FALSE;
1770     pa_sink_input_flags_t flags = 0;
1771     pa_proplist *p;
1772     pa_bool_t volume_set = TRUE;
1773     int ret = PA_ERR_INVALID;
1774
1775     pa_native_connection_assert_ref(c);
1776     pa_assert(t);
1777     memset(&attr, 0, sizeof(attr));
1778
1779     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1780         pa_tagstruct_get(
1781                 t,
1782                 PA_TAG_SAMPLE_SPEC, &ss,
1783                 PA_TAG_CHANNEL_MAP, &map,
1784                 PA_TAG_U32, &sink_index,
1785                 PA_TAG_STRING, &sink_name,
1786                 PA_TAG_U32, &attr.maxlength,
1787                 PA_TAG_BOOLEAN, &corked,
1788                 PA_TAG_U32, &attr.tlength,
1789                 PA_TAG_U32, &attr.prebuf,
1790                 PA_TAG_U32, &attr.minreq,
1791                 PA_TAG_U32, &syncid,
1792                 PA_TAG_CVOLUME, &volume,
1793                 PA_TAG_INVALID) < 0) {
1794
1795         protocol_error(c);
1796         return;
1797     }
1798
1799     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1800     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1801     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1802     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1803     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1804     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1805     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1806     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1807
1808     p = pa_proplist_new();
1809
1810     if (name)
1811         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1812
1813     if (c->version >= 12)  {
1814         /* Since 0.9.8 the user can ask for a couple of additional flags */
1815
1816         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1817             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1818             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1819             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1820             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1821             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1822             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1823
1824             protocol_error(c);
1825             pa_proplist_free(p);
1826             return;
1827         }
1828     }
1829
1830     if (c->version >= 13) {
1831
1832         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1833             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1834             pa_tagstruct_get_proplist(t, p) < 0) {
1835             protocol_error(c);
1836             pa_proplist_free(p);
1837             return;
1838         }
1839     }
1840
1841     if (c->version >= 14) {
1842
1843         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1844             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1845             protocol_error(c);
1846             pa_proplist_free(p);
1847             return;
1848         }
1849     }
1850
1851     if (c->version >= 15) {
1852
1853         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1854             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1855             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1856             protocol_error(c);
1857             pa_proplist_free(p);
1858             return;
1859         }
1860     }
1861
1862     if (!pa_tagstruct_eof(t)) {
1863         protocol_error(c);
1864         pa_proplist_free(p);
1865         return;
1866     }
1867
1868     if (sink_index != PA_INVALID_INDEX) {
1869
1870         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1871             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1872             pa_proplist_free(p);
1873             return;
1874         }
1875
1876     } else if (sink_name) {
1877
1878         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1879             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1880             pa_proplist_free(p);
1881             return;
1882         }
1883     }
1884
1885     flags =
1886         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1887         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1888         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1889         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1890         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1891         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1892         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1893         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1894         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1895         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1896
1897     /* Only since protocol version 15 there's a seperate muted_set
1898      * flag. For older versions we synthesize it here */
1899     muted_set = muted_set || muted;
1900
1901     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1902     pa_proplist_free(p);
1903
1904     CHECK_VALIDITY(c->pstream, s, tag, ret);
1905
1906     reply = reply_new(tag);
1907     pa_tagstruct_putu32(reply, s->index);
1908     pa_assert(s->sink_input);
1909     pa_tagstruct_putu32(reply, s->sink_input->index);
1910     pa_tagstruct_putu32(reply, missing);
1911
1912 /*     pa_log("initial request is %u", missing); */
1913
1914     if (c->version >= 9) {
1915         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1916
1917         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1918         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1919         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1920         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1921     }
1922
1923     if (c->version >= 12) {
1924         /* Since 0.9.8 we support sending the chosen sample
1925          * spec/channel map/device/suspend status back to the
1926          * client */
1927
1928         pa_tagstruct_put_sample_spec(reply, &ss);
1929         pa_tagstruct_put_channel_map(reply, &map);
1930
1931         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1932         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1933
1934         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1935     }
1936
1937     if (c->version >= 13)
1938         pa_tagstruct_put_usec(reply, s->sink_latency);
1939
1940     pa_pstream_send_tagstruct(c->pstream, reply);
1941 }
1942
1943 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1944     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1945     uint32_t channel;
1946
1947     pa_native_connection_assert_ref(c);
1948     pa_assert(t);
1949
1950     if (pa_tagstruct_getu32(t, &channel) < 0 ||
1951         !pa_tagstruct_eof(t)) {
1952         protocol_error(c);
1953         return;
1954     }
1955
1956     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1957
1958     switch (command) {
1959
1960         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
1961             playback_stream *s;
1962             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
1963                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1964                 return;
1965             }
1966
1967             playback_stream_unlink(s);
1968             break;
1969         }
1970
1971         case PA_COMMAND_DELETE_RECORD_STREAM: {
1972             record_stream *s;
1973             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
1974                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1975                 return;
1976             }
1977
1978             record_stream_unlink(s);
1979             break;
1980         }
1981
1982         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
1983             upload_stream *s;
1984
1985             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
1986                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1987                 return;
1988             }
1989
1990             upload_stream_unlink(s);
1991             break;
1992         }
1993
1994         default:
1995             pa_assert_not_reached();
1996     }
1997
1998     pa_pstream_send_simple_ack(c->pstream, tag);
1999 }
2000
2001 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2002     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2003     record_stream *s;
2004     pa_buffer_attr attr;
2005     uint32_t source_index;
2006     const char *name = NULL, *source_name;
2007     pa_sample_spec ss;
2008     pa_channel_map map;
2009     pa_tagstruct *reply;
2010     pa_source *source = NULL;
2011     pa_bool_t
2012         corked = FALSE,
2013         no_remap = FALSE,
2014         no_remix = FALSE,
2015         fix_format = FALSE,
2016         fix_rate = FALSE,
2017         fix_channels = FALSE,
2018         no_move = FALSE,
2019         variable_rate = FALSE,
2020         adjust_latency = FALSE,
2021         peak_detect = FALSE,
2022         early_requests = FALSE,
2023         dont_inhibit_auto_suspend = FALSE,
2024         fail_on_suspend = FALSE;
2025     pa_source_output_flags_t flags = 0;
2026     pa_proplist *p;
2027     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2028     pa_sink_input *direct_on_input = NULL;
2029     int ret = PA_ERR_INVALID;
2030
2031     pa_native_connection_assert_ref(c);
2032     pa_assert(t);
2033
2034     memset(&attr, 0, sizeof(attr));
2035
2036     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2037         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2038         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2039         pa_tagstruct_getu32(t, &source_index) < 0 ||
2040         pa_tagstruct_gets(t, &source_name) < 0 ||
2041         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2042         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2043         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2044         protocol_error(c);
2045         return;
2046     }
2047
2048     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2049     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2050     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2051     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2052     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2053     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2054     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2055
2056     p = pa_proplist_new();
2057
2058     if (name)
2059         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2060
2061     if (c->version >= 12)  {
2062         /* Since 0.9.8 the user can ask for a couple of additional flags */
2063
2064         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2065             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2066             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2067             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2068             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2069             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2070             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2071
2072             protocol_error(c);
2073             pa_proplist_free(p);
2074             return;
2075         }
2076     }
2077
2078     if (c->version >= 13) {
2079
2080         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2081             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2082             pa_tagstruct_get_proplist(t, p) < 0 ||
2083             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2084             protocol_error(c);
2085             pa_proplist_free(p);
2086             return;
2087         }
2088     }
2089
2090     if (c->version >= 14) {
2091
2092         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2093             protocol_error(c);
2094             pa_proplist_free(p);
2095             return;
2096         }
2097     }
2098
2099     if (c->version >= 15) {
2100
2101         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2102             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2103             protocol_error(c);
2104             pa_proplist_free(p);
2105             return;
2106         }
2107     }
2108
2109     if (!pa_tagstruct_eof(t)) {
2110         protocol_error(c);
2111         pa_proplist_free(p);
2112         return;
2113     }
2114
2115     if (source_index != PA_INVALID_INDEX) {
2116
2117         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2118             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2119             pa_proplist_free(p);
2120             return;
2121         }
2122
2123     } else if (source_name) {
2124
2125         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2126             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2127             pa_proplist_free(p);
2128             return;
2129         }
2130     }
2131
2132     if (direct_on_input_idx != PA_INVALID_INDEX) {
2133
2134         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2135             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2136             pa_proplist_free(p);
2137             return;
2138         }
2139     }
2140
2141     flags =
2142         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2143         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2144         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2145         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2146         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2147         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2148         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2149         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2150         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2151         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2152
2153     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2154     pa_proplist_free(p);
2155
2156     CHECK_VALIDITY(c->pstream, s, tag, ret);
2157
2158     reply = reply_new(tag);
2159     pa_tagstruct_putu32(reply, s->index);
2160     pa_assert(s->source_output);
2161     pa_tagstruct_putu32(reply, s->source_output->index);
2162
2163     if (c->version >= 9) {
2164         /* Since 0.9 we support sending the buffer metrics back to the client */
2165
2166         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2167         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2168     }
2169
2170     if (c->version >= 12) {
2171         /* Since 0.9.8 we support sending the chosen sample
2172          * spec/channel map/device/suspend status back to the
2173          * client */
2174
2175         pa_tagstruct_put_sample_spec(reply, &ss);
2176         pa_tagstruct_put_channel_map(reply, &map);
2177
2178         pa_tagstruct_putu32(reply, s->source_output->source->index);
2179         pa_tagstruct_puts(reply, s->source_output->source->name);
2180
2181         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2182     }
2183
2184     if (c->version >= 13)
2185         pa_tagstruct_put_usec(reply, s->source_latency);
2186
2187     pa_pstream_send_tagstruct(c->pstream, reply);
2188 }
2189
2190 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2191     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2192     int ret;
2193
2194     pa_native_connection_assert_ref(c);
2195     pa_assert(t);
2196
2197     if (!pa_tagstruct_eof(t)) {
2198         protocol_error(c);
2199         return;
2200     }
2201
2202     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2203     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2204     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2205
2206     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2207 }
2208
2209 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2210     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2211     const void*cookie;
2212     pa_tagstruct *reply;
2213     pa_bool_t shm_on_remote = FALSE, do_shm;
2214
2215     pa_native_connection_assert_ref(c);
2216     pa_assert(t);
2217
2218     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2219         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2220         !pa_tagstruct_eof(t)) {
2221         protocol_error(c);
2222         return;
2223     }
2224
2225     /* Minimum supported version */
2226     if (c->version < 8) {
2227         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2228         return;
2229     }
2230
2231     /* Starting with protocol version 13 the MSB of the version tag
2232        reflects if shm is available for this pa_native_connection or
2233        not. */
2234     if (c->version >= 13) {
2235         shm_on_remote = !!(c->version & 0x80000000U);
2236         c->version &= 0x7FFFFFFFU;
2237     }
2238
2239     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2240
2241     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2242
2243     if (!c->authorized) {
2244         pa_bool_t success = FALSE;
2245
2246 #ifdef HAVE_CREDS
2247         const pa_creds *creds;
2248
2249         if ((creds = pa_pdispatch_creds(pd))) {
2250             if (creds->uid == getuid())
2251                 success = TRUE;
2252             else if (c->options->auth_group) {
2253                 int r;
2254                 gid_t gid;
2255
2256                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2257                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2258                 else if (gid == creds->gid)
2259                     success = TRUE;
2260
2261                 if (!success) {
2262                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2263                         pa_log_warn("Failed to check group membership.");
2264                     else if (r > 0)
2265                         success = TRUE;
2266                 }
2267             }
2268
2269             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2270                         (unsigned long) creds->uid,
2271                         (unsigned long) creds->gid,
2272                         (int) success);
2273         }
2274 #endif
2275
2276         if (!success && c->options->auth_cookie) {
2277             const uint8_t *ac;
2278
2279             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2280                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2281                     success = TRUE;
2282         }
2283
2284         if (!success) {
2285             pa_log_warn("Denied access to client with invalid authorization data.");
2286             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2287             return;
2288         }
2289
2290         c->authorized = TRUE;
2291         if (c->auth_timeout_event) {
2292             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2293             c->auth_timeout_event = NULL;
2294         }
2295     }
2296
2297     /* Enable shared memory support if possible */
2298     do_shm =
2299         pa_mempool_is_shared(c->protocol->core->mempool) &&
2300         c->is_local;
2301
2302     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2303
2304     if (do_shm)
2305         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2306             do_shm = FALSE;
2307
2308 #ifdef HAVE_CREDS
2309     if (do_shm) {
2310         /* Only enable SHM if both sides are owned by the same
2311          * user. This is a security measure because otherwise data
2312          * private to the user might leak. */
2313
2314         const pa_creds *creds;
2315         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2316             do_shm = FALSE;
2317     }
2318 #endif
2319
2320     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2321     pa_pstream_enable_shm(c->pstream, do_shm);
2322
2323     reply = reply_new(tag);
2324     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2325
2326 #ifdef HAVE_CREDS
2327 {
2328     /* SHM support is only enabled after both sides made sure they are the same user. */
2329
2330     pa_creds ucred;
2331
2332     ucred.uid = getuid();
2333     ucred.gid = getgid();
2334
2335     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2336 }
2337 #else
2338     pa_pstream_send_tagstruct(c->pstream, reply);
2339 #endif
2340 }
2341
2342 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2343     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2344     const char *name = NULL;
2345     pa_proplist *p;
2346     pa_tagstruct *reply;
2347
2348     pa_native_connection_assert_ref(c);
2349     pa_assert(t);
2350
2351     p = pa_proplist_new();
2352
2353     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2354         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2355         !pa_tagstruct_eof(t)) {
2356
2357         protocol_error(c);
2358         pa_proplist_free(p);
2359         return;
2360     }
2361
2362     if (name)
2363         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2364             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2365             pa_proplist_free(p);
2366             return;
2367         }
2368
2369     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2370     pa_proplist_free(p);
2371
2372     reply = reply_new(tag);
2373
2374     if (c->version >= 13)
2375         pa_tagstruct_putu32(reply, c->client->index);
2376
2377     pa_pstream_send_tagstruct(c->pstream, reply);
2378 }
2379
2380 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2381     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2382     const char *name;
2383     uint32_t idx = PA_IDXSET_INVALID;
2384
2385     pa_native_connection_assert_ref(c);
2386     pa_assert(t);
2387
2388     if (pa_tagstruct_gets(t, &name) < 0 ||
2389         !pa_tagstruct_eof(t)) {
2390         protocol_error(c);
2391         return;
2392     }
2393
2394     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2395     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2396
2397     if (command == PA_COMMAND_LOOKUP_SINK) {
2398         pa_sink *sink;
2399         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2400             idx = sink->index;
2401     } else {
2402         pa_source *source;
2403         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2404         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2405             idx = source->index;
2406     }
2407
2408     if (idx == PA_IDXSET_INVALID)
2409         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2410     else {
2411         pa_tagstruct *reply;
2412         reply = reply_new(tag);
2413         pa_tagstruct_putu32(reply, idx);
2414         pa_pstream_send_tagstruct(c->pstream, reply);
2415     }
2416 }
2417
2418 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2419     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2420     uint32_t idx;
2421     playback_stream *s;
2422
2423     pa_native_connection_assert_ref(c);
2424     pa_assert(t);
2425
2426     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2427         !pa_tagstruct_eof(t)) {
2428         protocol_error(c);
2429         return;
2430     }
2431
2432     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2433     s = pa_idxset_get_by_index(c->output_streams, idx);
2434     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2435     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2436
2437     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2438 }
2439
2440 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2441     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2442     pa_tagstruct *reply;
2443     const pa_mempool_stat *stat;
2444
2445     pa_native_connection_assert_ref(c);
2446     pa_assert(t);
2447
2448     if (!pa_tagstruct_eof(t)) {
2449         protocol_error(c);
2450         return;
2451     }
2452
2453     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2454
2455     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2456
2457     reply = reply_new(tag);
2458     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2459     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2460     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2461     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2462     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2463     pa_pstream_send_tagstruct(c->pstream, reply);
2464 }
2465
2466 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2467     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2468     pa_tagstruct *reply;
2469     playback_stream *s;
2470     struct timeval tv, now;
2471     uint32_t idx;
2472     pa_usec_t latency;
2473
2474     pa_native_connection_assert_ref(c);
2475     pa_assert(t);
2476
2477     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2478         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2479         !pa_tagstruct_eof(t)) {
2480         protocol_error(c);
2481         return;
2482     }
2483
2484     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2485     s = pa_idxset_get_by_index(c->output_streams, idx);
2486     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2487     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2488     CHECK_VALIDITY(c->pstream, pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0, tag, PA_ERR_NOENTITY)
2489
2490     reply = reply_new(tag);
2491
2492     latency = pa_sink_get_latency(s->sink_input->sink);
2493     latency += pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec);
2494
2495     pa_tagstruct_put_usec(reply, latency);
2496
2497     pa_tagstruct_put_usec(reply, 0);
2498     pa_tagstruct_put_boolean(reply, s->sink_input->thread_info.playing_for > 0);
2499     pa_tagstruct_put_timeval(reply, &tv);
2500     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2501     pa_tagstruct_puts64(reply, s->write_index);
2502     pa_tagstruct_puts64(reply, s->read_index);
2503
2504     if (c->version >= 13) {
2505         pa_tagstruct_putu64(reply, s->sink_input->thread_info.underrun_for);
2506         pa_tagstruct_putu64(reply, s->sink_input->thread_info.playing_for);
2507     }
2508
2509     pa_pstream_send_tagstruct(c->pstream, reply);
2510 }
2511
2512 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2513     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2514     pa_tagstruct *reply;
2515     record_stream *s;
2516     struct timeval tv, now;
2517     uint32_t idx;
2518
2519     pa_native_connection_assert_ref(c);
2520     pa_assert(t);
2521
2522     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2523         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2524         !pa_tagstruct_eof(t)) {
2525         protocol_error(c);
2526         return;
2527     }
2528
2529     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2530     s = pa_idxset_get_by_index(c->record_streams, idx);
2531     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2532
2533     reply = reply_new(tag);
2534     pa_tagstruct_put_usec(reply, s->source_output->source->monitor_of ? pa_sink_get_latency(s->source_output->source->monitor_of) : 0);
2535     pa_tagstruct_put_usec(reply, pa_source_get_latency(s->source_output->source));
2536     pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING);
2537     pa_tagstruct_put_timeval(reply, &tv);
2538     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2539     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2540     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2541     pa_pstream_send_tagstruct(c->pstream, reply);
2542 }
2543
2544 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2545     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2546     upload_stream *s;
2547     uint32_t length;
2548     const char *name = NULL;
2549     pa_sample_spec ss;
2550     pa_channel_map map;
2551     pa_tagstruct *reply;
2552     pa_proplist *p;
2553
2554     pa_native_connection_assert_ref(c);
2555     pa_assert(t);
2556
2557     if (pa_tagstruct_gets(t, &name) < 0 ||
2558         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2559         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2560         pa_tagstruct_getu32(t, &length) < 0) {
2561         protocol_error(c);
2562         return;
2563     }
2564
2565     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2566     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2567     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2568     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2569     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2570     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2571
2572     p = pa_proplist_new();
2573
2574     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2575         !pa_tagstruct_eof(t)) {
2576
2577         protocol_error(c);
2578         pa_proplist_free(p);
2579         return;
2580     }
2581
2582     if (c->version < 13)
2583         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2584     else if (!name)
2585         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2586             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2587
2588     if (!name || !pa_namereg_is_valid_name(name)) {
2589         pa_proplist_free(p);
2590         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2591     }
2592
2593     s = upload_stream_new(c, &ss, &map, name, length, p);
2594     pa_proplist_free(p);
2595
2596     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2597
2598     reply = reply_new(tag);
2599     pa_tagstruct_putu32(reply, s->index);
2600     pa_tagstruct_putu32(reply, length);
2601     pa_pstream_send_tagstruct(c->pstream, reply);
2602 }
2603
2604 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2605     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2606     uint32_t channel;
2607     upload_stream *s;
2608     uint32_t idx;
2609
2610     pa_native_connection_assert_ref(c);
2611     pa_assert(t);
2612
2613     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2614         !pa_tagstruct_eof(t)) {
2615         protocol_error(c);
2616         return;
2617     }
2618
2619     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2620
2621     s = pa_idxset_get_by_index(c->output_streams, channel);
2622     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2623     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2624
2625     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2626         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2627     else
2628         pa_pstream_send_simple_ack(c->pstream, tag);
2629
2630     upload_stream_unlink(s);
2631 }
2632
2633 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2634     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2635     uint32_t sink_index;
2636     pa_volume_t volume;
2637     pa_sink *sink;
2638     const char *name, *sink_name;
2639     uint32_t idx;
2640     pa_proplist *p;
2641     pa_tagstruct *reply;
2642
2643     pa_native_connection_assert_ref(c);
2644     pa_assert(t);
2645
2646     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2647
2648     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2649         pa_tagstruct_gets(t, &sink_name) < 0 ||
2650         pa_tagstruct_getu32(t, &volume) < 0 ||
2651         pa_tagstruct_gets(t, &name) < 0) {
2652         protocol_error(c);
2653         return;
2654     }
2655
2656     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2657     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2658     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2659     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2660
2661     if (sink_index != PA_INVALID_INDEX)
2662         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2663     else
2664         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2665
2666     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2667
2668     p = pa_proplist_new();
2669
2670     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2671         !pa_tagstruct_eof(t)) {
2672         protocol_error(c);
2673         pa_proplist_free(p);
2674         return;
2675     }
2676
2677     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2678
2679     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2680         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2681         pa_proplist_free(p);
2682         return;
2683     }
2684
2685     pa_proplist_free(p);
2686
2687     reply = reply_new(tag);
2688
2689     if (c->version >= 13)
2690         pa_tagstruct_putu32(reply, idx);
2691
2692     pa_pstream_send_tagstruct(c->pstream, reply);
2693 }
2694
2695 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2696     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2697     const char *name;
2698
2699     pa_native_connection_assert_ref(c);
2700     pa_assert(t);
2701
2702     if (pa_tagstruct_gets(t, &name) < 0 ||
2703         !pa_tagstruct_eof(t)) {
2704         protocol_error(c);
2705         return;
2706     }
2707
2708     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2709     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2710
2711     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2712         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2713         return;
2714     }
2715
2716     pa_pstream_send_simple_ack(c->pstream, tag);
2717 }
2718
2719 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2720     pa_assert(c);
2721     pa_assert(fixed);
2722     pa_assert(original);
2723
2724     *fixed = *original;
2725
2726     if (c->version < 12) {
2727         /* Before protocol version 12 we didn't support S32 samples,
2728          * so we need to lie about this to the client */
2729
2730         if (fixed->format == PA_SAMPLE_S32LE)
2731             fixed->format = PA_SAMPLE_FLOAT32LE;
2732         if (fixed->format == PA_SAMPLE_S32BE)
2733             fixed->format = PA_SAMPLE_FLOAT32BE;
2734     }
2735
2736     if (c->version < 15) {
2737         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2738             fixed->format = PA_SAMPLE_FLOAT32LE;
2739         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2740             fixed->format = PA_SAMPLE_FLOAT32BE;
2741     }
2742 }
2743
2744 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2745     pa_sample_spec fixed_ss;
2746
2747     pa_assert(t);
2748     pa_sink_assert_ref(sink);
2749
2750     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2751
2752     pa_tagstruct_put(
2753         t,
2754         PA_TAG_U32, sink->index,
2755         PA_TAG_STRING, sink->name,
2756         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2757         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2758         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2759         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2760         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2761         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2762         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2763         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2764         PA_TAG_USEC, pa_sink_get_latency(sink),
2765         PA_TAG_STRING, sink->driver,
2766         PA_TAG_U32, sink->flags,
2767         PA_TAG_INVALID);
2768
2769     if (c->version >= 13) {
2770         pa_tagstruct_put_proplist(t, sink->proplist);
2771         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2772     }
2773
2774     if (c->version >= 15) {
2775         pa_tagstruct_put_volume(t, sink->base_volume);
2776         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2777             pa_log_error("Internal sink state is invalid.");
2778         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2779         pa_tagstruct_putu32(t, sink->n_volume_steps);
2780         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2781     }
2782 }
2783
2784 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2785     pa_sample_spec fixed_ss;
2786
2787     pa_assert(t);
2788     pa_source_assert_ref(source);
2789
2790     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2791
2792     pa_tagstruct_put(
2793         t,
2794         PA_TAG_U32, source->index,
2795         PA_TAG_STRING, source->name,
2796         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2797         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2798         PA_TAG_CHANNEL_MAP, &source->channel_map,
2799         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2800         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2801         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2802         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2803         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2804         PA_TAG_USEC, pa_source_get_latency(source),
2805         PA_TAG_STRING, source->driver,
2806         PA_TAG_U32, source->flags,
2807         PA_TAG_INVALID);
2808
2809     if (c->version >= 13) {
2810         pa_tagstruct_put_proplist(t, source->proplist);
2811         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2812     }
2813
2814     if (c->version >= 15) {
2815         pa_tagstruct_put_volume(t, source->base_volume);
2816         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2817             pa_log_error("Internal source state is invalid.");
2818         pa_tagstruct_putu32(t, pa_source_get_state(source));
2819         pa_tagstruct_putu32(t, source->n_volume_steps);
2820         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2821     }
2822 }
2823
2824 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2825     pa_assert(t);
2826     pa_assert(client);
2827
2828     pa_tagstruct_putu32(t, client->index);
2829     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2830     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2831     pa_tagstruct_puts(t, client->driver);
2832
2833     if (c->version >= 13)
2834         pa_tagstruct_put_proplist(t, client->proplist);
2835 }
2836
2837 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2838     void *state = NULL;
2839     pa_card_profile *p;
2840
2841     pa_assert(t);
2842     pa_assert(card);
2843
2844     pa_tagstruct_putu32(t, card->index);
2845     pa_tagstruct_puts(t, card->name);
2846     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2847     pa_tagstruct_puts(t, card->driver);
2848
2849     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2850
2851     if (card->profiles) {
2852         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2853             pa_tagstruct_puts(t, p->name);
2854             pa_tagstruct_puts(t, p->description);
2855             pa_tagstruct_putu32(t, p->n_sinks);
2856             pa_tagstruct_putu32(t, p->n_sources);
2857             pa_tagstruct_putu32(t, p->priority);
2858         }
2859     }
2860
2861     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2862     pa_tagstruct_put_proplist(t, card->proplist);
2863 }
2864
2865 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2866     pa_assert(t);
2867     pa_assert(module);
2868
2869     pa_tagstruct_putu32(t, module->index);
2870     pa_tagstruct_puts(t, module->name);
2871     pa_tagstruct_puts(t, module->argument);
2872     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2873
2874     if (c->version < 15)
2875         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2876
2877     if (c->version >= 15)
2878         pa_tagstruct_put_proplist(t, module->proplist);
2879 }
2880
2881 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2882     pa_sample_spec fixed_ss;
2883     pa_usec_t sink_latency;
2884
2885     pa_assert(t);
2886     pa_sink_input_assert_ref(s);
2887
2888     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2889
2890     pa_tagstruct_putu32(t, s->index);
2891     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2892     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2893     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2894     pa_tagstruct_putu32(t, s->sink->index);
2895     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2896     pa_tagstruct_put_channel_map(t, &s->channel_map);
2897     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s));
2898     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
2899     pa_tagstruct_put_usec(t, sink_latency);
2900     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
2901     pa_tagstruct_puts(t, s->driver);
2902     if (c->version >= 11)
2903         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
2904     if (c->version >= 13)
2905         pa_tagstruct_put_proplist(t, s->proplist);
2906 }
2907
2908 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
2909     pa_sample_spec fixed_ss;
2910     pa_usec_t source_latency;
2911
2912     pa_assert(t);
2913     pa_source_output_assert_ref(s);
2914
2915     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2916
2917     pa_tagstruct_putu32(t, s->index);
2918     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2919     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2920     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2921     pa_tagstruct_putu32(t, s->source->index);
2922     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2923     pa_tagstruct_put_channel_map(t, &s->channel_map);
2924     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
2925     pa_tagstruct_put_usec(t, source_latency);
2926     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
2927     pa_tagstruct_puts(t, s->driver);
2928
2929     if (c->version >= 13)
2930         pa_tagstruct_put_proplist(t, s->proplist);
2931 }
2932
2933 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
2934     pa_sample_spec fixed_ss;
2935     pa_cvolume v;
2936
2937     pa_assert(t);
2938     pa_assert(e);
2939
2940     if (e->memchunk.memblock)
2941         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
2942     else
2943         memset(&fixed_ss, 0, sizeof(fixed_ss));
2944
2945     pa_tagstruct_putu32(t, e->index);
2946     pa_tagstruct_puts(t, e->name);
2947
2948     if (e->volume_is_set)
2949         v = e->volume;
2950     else
2951         pa_cvolume_init(&v);
2952
2953     pa_tagstruct_put_cvolume(t, &v);
2954     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
2955     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2956     pa_tagstruct_put_channel_map(t, &e->channel_map);
2957     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
2958     pa_tagstruct_put_boolean(t, e->lazy);
2959     pa_tagstruct_puts(t, e->filename);
2960
2961     if (c->version >= 13)
2962         pa_tagstruct_put_proplist(t, e->proplist);
2963 }
2964
2965 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2966     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2967     uint32_t idx;
2968     pa_sink *sink = NULL;
2969     pa_source *source = NULL;
2970     pa_client *client = NULL;
2971     pa_card *card = NULL;
2972     pa_module *module = NULL;
2973     pa_sink_input *si = NULL;
2974     pa_source_output *so = NULL;
2975     pa_scache_entry *sce = NULL;
2976     const char *name = NULL;
2977     pa_tagstruct *reply;
2978
2979     pa_native_connection_assert_ref(c);
2980     pa_assert(t);
2981
2982     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2983         (command != PA_COMMAND_GET_CLIENT_INFO &&
2984          command != PA_COMMAND_GET_MODULE_INFO &&
2985          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
2986          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
2987          pa_tagstruct_gets(t, &name) < 0) ||
2988         !pa_tagstruct_eof(t)) {
2989         protocol_error(c);
2990         return;
2991     }
2992
2993     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2994     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2995     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
2996     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
2997     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2998
2999     if (command == PA_COMMAND_GET_SINK_INFO) {
3000         if (idx != PA_INVALID_INDEX)
3001             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3002         else
3003             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3004     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3005         if (idx != PA_INVALID_INDEX)
3006             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3007         else
3008             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3009     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3010         if (idx != PA_INVALID_INDEX)
3011             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3012         else
3013             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3014     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3015         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3016     else if (command == PA_COMMAND_GET_MODULE_INFO)
3017         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3018     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3019         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3020     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3021         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3022     else {
3023         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3024         if (idx != PA_INVALID_INDEX)
3025             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3026         else
3027             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3028     }
3029
3030     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3031         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3032         return;
3033     }
3034
3035     reply = reply_new(tag);
3036     if (sink)
3037         sink_fill_tagstruct(c, reply, sink);
3038     else if (source)
3039         source_fill_tagstruct(c, reply, source);
3040     else if (client)
3041         client_fill_tagstruct(c, reply, client);
3042     else if (card)
3043         card_fill_tagstruct(c, reply, card);
3044     else if (module)
3045         module_fill_tagstruct(c, reply, module);
3046     else if (si)
3047         sink_input_fill_tagstruct(c, reply, si);
3048     else if (so)
3049         source_output_fill_tagstruct(c, reply, so);
3050     else
3051         scache_fill_tagstruct(c, reply, sce);
3052     pa_pstream_send_tagstruct(c->pstream, reply);
3053 }
3054
3055 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3056     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3057     pa_idxset *i;
3058     uint32_t idx;
3059     void *p;
3060     pa_tagstruct *reply;
3061
3062     pa_native_connection_assert_ref(c);
3063     pa_assert(t);
3064
3065     if (!pa_tagstruct_eof(t)) {
3066         protocol_error(c);
3067         return;
3068     }
3069
3070     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3071
3072     reply = reply_new(tag);
3073
3074     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3075         i = c->protocol->core->sinks;
3076     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3077         i = c->protocol->core->sources;
3078     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3079         i = c->protocol->core->clients;
3080     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3081         i = c->protocol->core->cards;
3082     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3083         i = c->protocol->core->modules;
3084     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3085         i = c->protocol->core->sink_inputs;
3086     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3087         i = c->protocol->core->source_outputs;
3088     else {
3089         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3090         i = c->protocol->core->scache;
3091     }
3092
3093     if (i) {
3094         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3095             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3096                 sink_fill_tagstruct(c, reply, p);
3097             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3098                 source_fill_tagstruct(c, reply, p);
3099             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3100                 client_fill_tagstruct(c, reply, p);
3101             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3102                 card_fill_tagstruct(c, reply, p);
3103             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3104                 module_fill_tagstruct(c, reply, p);
3105             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3106                 sink_input_fill_tagstruct(c, reply, p);
3107             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3108                 source_output_fill_tagstruct(c, reply, p);
3109             else {
3110                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3111                 scache_fill_tagstruct(c, reply, p);
3112             }
3113         }
3114     }
3115
3116     pa_pstream_send_tagstruct(c->pstream, reply);
3117 }
3118
3119 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3120     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3121     pa_tagstruct *reply;
3122     char txt[256];
3123     pa_sink *def_sink;
3124     pa_source *def_source;
3125     pa_sample_spec fixed_ss;
3126
3127     pa_native_connection_assert_ref(c);
3128     pa_assert(t);
3129
3130     if (!pa_tagstruct_eof(t)) {
3131         protocol_error(c);
3132         return;
3133     }
3134
3135     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3136
3137     reply = reply_new(tag);
3138     pa_tagstruct_puts(reply, PACKAGE_NAME);
3139     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3140     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
3141     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
3142
3143     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3144     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3145
3146     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3147     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3148     def_source = pa_namereg_get_default_source(c->protocol->core);
3149     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3150
3151     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3152
3153     if (c->version >= 15)
3154         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3155
3156     pa_pstream_send_tagstruct(c->pstream, reply);
3157 }
3158
3159 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3160     pa_tagstruct *t;
3161     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3162
3163     pa_native_connection_assert_ref(c);
3164
3165     t = pa_tagstruct_new(NULL, 0);
3166     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3167     pa_tagstruct_putu32(t, (uint32_t) -1);
3168     pa_tagstruct_putu32(t, e);
3169     pa_tagstruct_putu32(t, idx);
3170     pa_pstream_send_tagstruct(c->pstream, t);
3171 }
3172
3173 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3174     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3175     pa_subscription_mask_t m;
3176
3177     pa_native_connection_assert_ref(c);
3178     pa_assert(t);
3179
3180     if (pa_tagstruct_getu32(t, &m) < 0 ||
3181         !pa_tagstruct_eof(t)) {
3182         protocol_error(c);
3183         return;
3184     }
3185
3186     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3187     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3188
3189     if (c->subscription)
3190         pa_subscription_free(c->subscription);
3191
3192     if (m != 0) {
3193         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3194         pa_assert(c->subscription);
3195     } else
3196         c->subscription = NULL;
3197
3198     pa_pstream_send_simple_ack(c->pstream, tag);
3199 }
3200
3201 static void command_set_volume(
3202         pa_pdispatch *pd,
3203         uint32_t command,
3204         uint32_t tag,
3205         pa_tagstruct *t,
3206         void *userdata) {
3207
3208     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3209     uint32_t idx;
3210     pa_cvolume volume;
3211     pa_sink *sink = NULL;
3212     pa_source *source = NULL;
3213     pa_sink_input *si = NULL;
3214     const char *name = NULL;
3215
3216     pa_native_connection_assert_ref(c);
3217     pa_assert(t);
3218
3219     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3220         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3221         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3222         pa_tagstruct_get_cvolume(t, &volume) ||
3223         !pa_tagstruct_eof(t)) {
3224         protocol_error(c);
3225         return;
3226     }
3227
3228     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3229     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3230     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3231     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3232     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3233     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3234
3235     switch (command) {
3236
3237         case PA_COMMAND_SET_SINK_VOLUME:
3238             if (idx != PA_INVALID_INDEX)
3239                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3240             else
3241                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3242             break;
3243
3244         case PA_COMMAND_SET_SOURCE_VOLUME:
3245             if (idx != PA_INVALID_INDEX)
3246                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3247             else
3248                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3249             break;
3250
3251         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3252             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3253             break;
3254
3255         default:
3256             pa_assert_not_reached();
3257     }
3258
3259     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3260
3261     if (sink)
3262         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3263     else if (source)
3264         pa_source_set_volume(source, &volume);
3265     else if (si)
3266         pa_sink_input_set_volume(si, &volume, TRUE);
3267
3268     pa_pstream_send_simple_ack(c->pstream, tag);
3269 }
3270
3271 static void command_set_mute(
3272         pa_pdispatch *pd,
3273         uint32_t command,
3274         uint32_t tag,
3275         pa_tagstruct *t,
3276         void *userdata) {
3277
3278     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3279     uint32_t idx;
3280     pa_bool_t mute;
3281     pa_sink *sink = NULL;
3282     pa_source *source = NULL;
3283     pa_sink_input *si = NULL;
3284     const char *name = NULL;
3285
3286     pa_native_connection_assert_ref(c);
3287     pa_assert(t);
3288
3289     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3290         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3291         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3292         pa_tagstruct_get_boolean(t, &mute) ||
3293         !pa_tagstruct_eof(t)) {
3294         protocol_error(c);
3295         return;
3296     }
3297
3298     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3299     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3300     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3301     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3302     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3303
3304     switch (command) {
3305
3306         case PA_COMMAND_SET_SINK_MUTE:
3307
3308             if (idx != PA_INVALID_INDEX)
3309                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3310             else
3311                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3312
3313             break;
3314
3315         case PA_COMMAND_SET_SOURCE_MUTE:
3316             if (idx != PA_INVALID_INDEX)
3317                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3318             else
3319                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3320
3321             break;
3322
3323         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3324             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3325             break;
3326
3327         default:
3328             pa_assert_not_reached();
3329     }
3330
3331     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3332
3333     if (sink)
3334         pa_sink_set_mute(sink, mute);
3335     else if (source)
3336         pa_source_set_mute(source, mute);
3337     else if (si)
3338         pa_sink_input_set_mute(si, mute, TRUE);
3339
3340     pa_pstream_send_simple_ack(c->pstream, tag);
3341 }
3342
3343 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3344     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3345     uint32_t idx;
3346     pa_bool_t b;
3347     playback_stream *s;
3348
3349     pa_native_connection_assert_ref(c);
3350     pa_assert(t);
3351
3352     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3353         pa_tagstruct_get_boolean(t, &b) < 0 ||
3354         !pa_tagstruct_eof(t)) {
3355         protocol_error(c);
3356         return;
3357     }
3358
3359     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3360     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3361     s = pa_idxset_get_by_index(c->output_streams, idx);
3362     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3363     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3364
3365     pa_sink_input_cork(s->sink_input, b);
3366
3367     if (b)
3368         s->is_underrun = TRUE;
3369
3370     pa_pstream_send_simple_ack(c->pstream, tag);
3371 }
3372
3373 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3374     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3375     uint32_t idx;
3376     playback_stream *s;
3377
3378     pa_native_connection_assert_ref(c);
3379     pa_assert(t);
3380
3381     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3382         !pa_tagstruct_eof(t)) {
3383         protocol_error(c);
3384         return;
3385     }
3386
3387     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3388     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3389     s = pa_idxset_get_by_index(c->output_streams, idx);
3390     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3391     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3392
3393     switch (command) {
3394         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3395             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3396             break;
3397
3398         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3399             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3400             break;
3401
3402         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3403             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3404             break;
3405
3406         default:
3407             pa_assert_not_reached();
3408     }
3409
3410     pa_pstream_send_simple_ack(c->pstream, tag);
3411 }
3412
3413 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3414     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3415     uint32_t idx;
3416     record_stream *s;
3417     pa_bool_t b;
3418
3419     pa_native_connection_assert_ref(c);
3420     pa_assert(t);
3421
3422     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3423         pa_tagstruct_get_boolean(t, &b) < 0 ||
3424         !pa_tagstruct_eof(t)) {
3425         protocol_error(c);
3426         return;
3427     }
3428
3429     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3430     s = pa_idxset_get_by_index(c->record_streams, idx);
3431     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3432
3433     pa_source_output_cork(s->source_output, b);
3434     pa_memblockq_prebuf_force(s->memblockq);
3435     pa_pstream_send_simple_ack(c->pstream, tag);
3436 }
3437
3438 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3439     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3440     uint32_t idx;
3441     record_stream *s;
3442
3443     pa_native_connection_assert_ref(c);
3444     pa_assert(t);
3445
3446     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3447         !pa_tagstruct_eof(t)) {
3448         protocol_error(c);
3449         return;
3450     }
3451
3452     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3453     s = pa_idxset_get_by_index(c->record_streams, idx);
3454     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3455
3456     pa_memblockq_flush_read(s->memblockq);
3457     pa_pstream_send_simple_ack(c->pstream, tag);
3458 }
3459
3460 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3461     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3462     uint32_t idx;
3463     pa_buffer_attr a;
3464     pa_tagstruct *reply;
3465
3466     pa_native_connection_assert_ref(c);
3467     pa_assert(t);
3468
3469     memset(&a, 0, sizeof(a));
3470
3471     if (pa_tagstruct_getu32(t, &idx) < 0) {
3472         protocol_error(c);
3473         return;
3474     }
3475
3476     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3477
3478     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3479         playback_stream *s;
3480         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3481
3482         s = pa_idxset_get_by_index(c->output_streams, idx);
3483         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3484         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3485
3486         if (pa_tagstruct_get(
3487                     t,
3488                     PA_TAG_U32, &a.maxlength,
3489                     PA_TAG_U32, &a.tlength,
3490                     PA_TAG_U32, &a.prebuf,
3491                     PA_TAG_U32, &a.minreq,
3492                     PA_TAG_INVALID) < 0 ||
3493             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3494             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3495             !pa_tagstruct_eof(t)) {
3496             protocol_error(c);
3497             return;
3498         }
3499
3500         s->adjust_latency = adjust_latency;
3501         s->early_requests = early_requests;
3502         s->buffer_attr = a;
3503
3504         fix_playback_buffer_attr(s);
3505         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3506
3507         reply = reply_new(tag);
3508         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3509         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3510         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3511         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3512
3513         if (c->version >= 13)
3514             pa_tagstruct_put_usec(reply, s->sink_latency);
3515
3516     } else {
3517         record_stream *s;
3518         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3519         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3520
3521         s = pa_idxset_get_by_index(c->record_streams, idx);
3522         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3523
3524         if (pa_tagstruct_get(
3525                     t,
3526                     PA_TAG_U32, &a.maxlength,
3527                     PA_TAG_U32, &a.fragsize,
3528                     PA_TAG_INVALID) < 0 ||
3529             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3530             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3531             !pa_tagstruct_eof(t)) {
3532             protocol_error(c);
3533             return;
3534         }
3535
3536         s->adjust_latency = adjust_latency;
3537         s->early_requests = early_requests;
3538         s->buffer_attr = a;
3539
3540         fix_record_buffer_attr_pre(s);
3541         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3542         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3543         fix_record_buffer_attr_post(s);
3544
3545         reply = reply_new(tag);
3546         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3547         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3548
3549         if (c->version >= 13)
3550             pa_tagstruct_put_usec(reply, s->source_latency);
3551     }
3552
3553     pa_pstream_send_tagstruct(c->pstream, reply);
3554 }
3555
3556 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3557     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3558     uint32_t idx;
3559     uint32_t rate;
3560
3561     pa_native_connection_assert_ref(c);
3562     pa_assert(t);
3563
3564     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3565         pa_tagstruct_getu32(t, &rate) < 0 ||
3566         !pa_tagstruct_eof(t)) {
3567         protocol_error(c);
3568         return;
3569     }
3570
3571     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3572     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3573
3574     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3575         playback_stream *s;
3576
3577         s = pa_idxset_get_by_index(c->output_streams, idx);
3578         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3579         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3580
3581         pa_sink_input_set_rate(s->sink_input, rate);
3582
3583     } else {
3584         record_stream *s;
3585         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3586
3587         s = pa_idxset_get_by_index(c->record_streams, idx);
3588         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3589
3590         pa_source_output_set_rate(s->source_output, rate);
3591     }
3592
3593     pa_pstream_send_simple_ack(c->pstream, tag);
3594 }
3595
3596 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3597     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3598     uint32_t idx;
3599     uint32_t mode;
3600     pa_proplist *p;
3601
3602     pa_native_connection_assert_ref(c);
3603     pa_assert(t);
3604
3605     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3606
3607     p = pa_proplist_new();
3608
3609     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3610
3611         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3612             pa_tagstruct_get_proplist(t, p) < 0 ||
3613             !pa_tagstruct_eof(t)) {
3614             protocol_error(c);
3615             pa_proplist_free(p);
3616             return;
3617         }
3618
3619     } else {
3620
3621         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3622             pa_tagstruct_getu32(t, &mode) < 0 ||
3623             pa_tagstruct_get_proplist(t, p) < 0 ||
3624             !pa_tagstruct_eof(t)) {
3625             protocol_error(c);
3626             pa_proplist_free(p);
3627             return;
3628         }
3629     }
3630
3631     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3632         pa_proplist_free(p);
3633         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3634     }
3635
3636     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3637         playback_stream *s;
3638
3639         s = pa_idxset_get_by_index(c->output_streams, idx);
3640         if (!s || !playback_stream_isinstance(s)) {
3641             pa_proplist_free(p);
3642             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3643         }
3644         pa_sink_input_update_proplist(s->sink_input, mode, p);
3645
3646     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3647         record_stream *s;
3648
3649         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3650             pa_proplist_free(p);
3651             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3652         }
3653         pa_source_output_update_proplist(s->source_output, mode, p);
3654
3655     } else {
3656         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3657
3658         pa_client_update_proplist(c->client, mode, p);
3659     }
3660
3661     pa_pstream_send_simple_ack(c->pstream, tag);
3662     pa_proplist_free(p);
3663 }
3664
3665 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3666     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3667     uint32_t idx;
3668     unsigned changed = 0;
3669     pa_proplist *p;
3670     pa_strlist *l = NULL;
3671
3672     pa_native_connection_assert_ref(c);
3673     pa_assert(t);
3674
3675     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3676
3677     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3678
3679         if (pa_tagstruct_getu32(t, &idx) < 0) {
3680             protocol_error(c);
3681             return;
3682         }
3683     }
3684
3685     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3686         playback_stream *s;
3687
3688         s = pa_idxset_get_by_index(c->output_streams, idx);
3689         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3690         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3691
3692         p = s->sink_input->proplist;
3693
3694     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3695         record_stream *s;
3696
3697         s = pa_idxset_get_by_index(c->record_streams, idx);
3698         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3699
3700         p = s->source_output->proplist;
3701     } else {
3702         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3703
3704         p = c->client->proplist;
3705     }
3706
3707     for (;;) {
3708         const char *k;
3709
3710         if (pa_tagstruct_gets(t, &k) < 0) {
3711             protocol_error(c);
3712             pa_strlist_free(l);
3713             return;
3714         }
3715
3716         if (!k)
3717             break;
3718
3719         l = pa_strlist_prepend(l, k);
3720     }
3721
3722     if (!pa_tagstruct_eof(t)) {
3723         protocol_error(c);
3724         pa_strlist_free(l);
3725         return;
3726     }
3727
3728     for (;;) {
3729         char *z;
3730
3731         l = pa_strlist_pop(l, &z);
3732
3733         if (!z)
3734             break;
3735
3736         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3737         pa_xfree(z);
3738     }
3739
3740     pa_pstream_send_simple_ack(c->pstream, tag);
3741
3742     if (changed) {
3743         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3744             playback_stream *s;
3745
3746             s = pa_idxset_get_by_index(c->output_streams, idx);
3747             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3748
3749         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3750             record_stream *s;
3751
3752             s = pa_idxset_get_by_index(c->record_streams, idx);
3753             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3754
3755         } else {
3756             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3757             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3758         }
3759     }
3760 }
3761
3762 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3763     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3764     const char *s;
3765
3766     pa_native_connection_assert_ref(c);
3767     pa_assert(t);
3768
3769     if (pa_tagstruct_gets(t, &s) < 0 ||
3770         !pa_tagstruct_eof(t)) {
3771         protocol_error(c);
3772         return;
3773     }
3774
3775     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3776     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3777
3778     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3779         pa_source *source;
3780
3781         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3782         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3783
3784         pa_namereg_set_default_source(c->protocol->core, source);
3785     } else {
3786         pa_sink *sink;
3787         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3788
3789         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3790         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3791
3792         pa_namereg_set_default_sink(c->protocol->core, sink);
3793     }
3794
3795     pa_pstream_send_simple_ack(c->pstream, tag);
3796 }
3797
3798 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3799     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3800     uint32_t idx;
3801     const char *name;
3802
3803     pa_native_connection_assert_ref(c);
3804     pa_assert(t);
3805
3806     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3807         pa_tagstruct_gets(t, &name) < 0 ||
3808         !pa_tagstruct_eof(t)) {
3809         protocol_error(c);
3810         return;
3811     }
3812
3813     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3814     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3815
3816     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3817         playback_stream *s;
3818
3819         s = pa_idxset_get_by_index(c->output_streams, idx);
3820         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3821         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3822
3823         pa_sink_input_set_name(s->sink_input, name);
3824
3825     } else {
3826         record_stream *s;
3827         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3828
3829         s = pa_idxset_get_by_index(c->record_streams, idx);
3830         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3831
3832         pa_source_output_set_name(s->source_output, name);
3833     }
3834
3835     pa_pstream_send_simple_ack(c->pstream, tag);
3836 }
3837
3838 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3839     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3840     uint32_t idx;
3841
3842     pa_native_connection_assert_ref(c);
3843     pa_assert(t);
3844
3845     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3846         !pa_tagstruct_eof(t)) {
3847         protocol_error(c);
3848         return;
3849     }
3850
3851     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3852
3853     if (command == PA_COMMAND_KILL_CLIENT) {
3854         pa_client *client;
3855
3856         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3857         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3858
3859         pa_native_connection_ref(c);
3860         pa_client_kill(client);
3861
3862     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3863         pa_sink_input *s;
3864
3865         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3866         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3867
3868         pa_native_connection_ref(c);
3869         pa_sink_input_kill(s);
3870     } else {
3871         pa_source_output *s;
3872
3873         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3874
3875         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3876         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3877
3878         pa_native_connection_ref(c);
3879         pa_source_output_kill(s);
3880     }
3881
3882     pa_pstream_send_simple_ack(c->pstream, tag);
3883     pa_native_connection_unref(c);
3884 }
3885
3886 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3887     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3888     pa_module *m;
3889     const char *name, *argument;
3890     pa_tagstruct *reply;
3891
3892     pa_native_connection_assert_ref(c);
3893     pa_assert(t);
3894
3895     if (pa_tagstruct_gets(t, &name) < 0 ||
3896         pa_tagstruct_gets(t, &argument) < 0 ||
3897         !pa_tagstruct_eof(t)) {
3898         protocol_error(c);
3899         return;
3900     }
3901
3902     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3903     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
3904     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
3905
3906     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
3907         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
3908         return;
3909     }
3910
3911     reply = reply_new(tag);
3912     pa_tagstruct_putu32(reply, m->index);
3913     pa_pstream_send_tagstruct(c->pstream, reply);
3914 }
3915
3916 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3917     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3918     uint32_t idx;
3919     pa_module *m;
3920
3921     pa_native_connection_assert_ref(c);
3922     pa_assert(t);
3923
3924     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3925         !pa_tagstruct_eof(t)) {
3926         protocol_error(c);
3927         return;
3928     }
3929
3930     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3931     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3932     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
3933
3934     pa_module_unload_request(m, FALSE);
3935     pa_pstream_send_simple_ack(c->pstream, tag);
3936 }
3937
3938 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3939     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3940     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
3941     const char *name_device = NULL;
3942
3943     pa_native_connection_assert_ref(c);
3944     pa_assert(t);
3945
3946     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3947         pa_tagstruct_getu32(t, &idx_device) < 0 ||
3948         pa_tagstruct_gets(t, &name_device) < 0 ||
3949         !pa_tagstruct_eof(t)) {
3950         protocol_error(c);
3951         return;
3952     }
3953
3954     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3955     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3956
3957     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
3958     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
3959     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
3960     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3961
3962     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
3963         pa_sink_input *si = NULL;
3964         pa_sink *sink = NULL;
3965
3966         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3967
3968         if (idx_device != PA_INVALID_INDEX)
3969             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
3970         else
3971             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
3972
3973         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
3974
3975         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
3976             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
3977             return;
3978         }
3979     } else {
3980         pa_source_output *so = NULL;
3981         pa_source *source;
3982
3983         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
3984
3985         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3986
3987         if (idx_device != PA_INVALID_INDEX)
3988             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
3989         else
3990             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
3991
3992         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
3993
3994         if (pa_source_output_move_to(so, source, TRUE) < 0) {
3995             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
3996             return;
3997         }
3998     }
3999
4000     pa_pstream_send_simple_ack(c->pstream, tag);
4001 }
4002
4003 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4004     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4005     uint32_t idx = PA_INVALID_INDEX;
4006     const char *name = NULL;
4007     pa_bool_t b;
4008
4009     pa_native_connection_assert_ref(c);
4010     pa_assert(t);
4011
4012     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4013         pa_tagstruct_gets(t, &name) < 0 ||
4014         pa_tagstruct_get_boolean(t, &b) < 0 ||
4015         !pa_tagstruct_eof(t)) {
4016         protocol_error(c);
4017         return;
4018     }
4019
4020     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4021     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4022     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4023     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4024     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4025
4026     if (command == PA_COMMAND_SUSPEND_SINK) {
4027
4028         if (idx == PA_INVALID_INDEX && name && !*name) {
4029
4030             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4031
4032             if (pa_sink_suspend_all(c->protocol->core, b) < 0) {
4033                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4034                 return;
4035             }
4036         } else {
4037             pa_sink *sink = NULL;
4038
4039             if (idx != PA_INVALID_INDEX)
4040                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4041             else
4042                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4043
4044             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4045
4046             if (pa_sink_suspend(sink, b) < 0) {
4047                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4048                 return;
4049             }
4050         }
4051     } else {
4052
4053         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4054
4055         if (idx == PA_INVALID_INDEX && name && !*name) {
4056
4057             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4058
4059             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
4060                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4061                 return;
4062             }
4063
4064         } else {
4065             pa_source *source;
4066
4067             if (idx != PA_INVALID_INDEX)
4068                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4069             else
4070                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4071
4072             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4073
4074             if (pa_source_suspend(source, b) < 0) {
4075                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4076                 return;
4077             }
4078         }
4079     }
4080
4081     pa_pstream_send_simple_ack(c->pstream, tag);
4082 }
4083
4084 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4085     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4086     uint32_t idx = PA_INVALID_INDEX;
4087     const char *name = NULL;
4088     pa_module *m;
4089     pa_native_protocol_ext_cb_t cb;
4090
4091     pa_native_connection_assert_ref(c);
4092     pa_assert(t);
4093
4094     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4095         pa_tagstruct_gets(t, &name) < 0) {
4096         protocol_error(c);
4097         return;
4098     }
4099
4100     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4101     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4102     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4103     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4104     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4105
4106     if (idx != PA_INVALID_INDEX)
4107         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4108     else {
4109         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4110             if (strcmp(name, m->name) == 0)
4111                 break;
4112     }
4113
4114     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4115     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4116
4117     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4118     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4119
4120     if (cb(c->protocol, m, c, tag, t) < 0)
4121         protocol_error(c);
4122 }
4123
4124 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4125     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4126     uint32_t idx = PA_INVALID_INDEX;
4127     const char *name = NULL, *profile = NULL;
4128     pa_card *card = NULL;
4129
4130     pa_native_connection_assert_ref(c);
4131     pa_assert(t);
4132
4133     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4134         pa_tagstruct_gets(t, &name) < 0 ||
4135         pa_tagstruct_gets(t, &profile) < 0 ||
4136         !pa_tagstruct_eof(t)) {
4137         protocol_error(c);
4138         return;
4139     }
4140
4141     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4142     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4143     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4144     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4145     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4146
4147     if (idx != PA_INVALID_INDEX)
4148         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4149     else
4150         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4151
4152     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4153
4154     if (pa_card_set_profile(card, profile, TRUE) < 0) {
4155         pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4156         return;
4157     }
4158
4159     pa_pstream_send_simple_ack(c->pstream, tag);
4160 }
4161
4162 /*** pstream callbacks ***/
4163
4164 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4165     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4166
4167     pa_assert(p);
4168     pa_assert(packet);
4169     pa_native_connection_assert_ref(c);
4170
4171     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4172         pa_log("invalid packet.");
4173         native_connection_unlink(c);
4174     }
4175 }
4176
4177 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4178     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4179     output_stream *stream;
4180
4181     pa_assert(p);
4182     pa_assert(chunk);
4183     pa_native_connection_assert_ref(c);
4184
4185     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4186         pa_log("client sent block for invalid stream.");
4187         /* Ignoring */
4188         return;
4189     }
4190
4191 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4192
4193     if (playback_stream_isinstance(stream)) {
4194         playback_stream *ps = PLAYBACK_STREAM(stream);
4195
4196         if (chunk->memblock) {
4197             if (seek != PA_SEEK_RELATIVE || offset != 0)
4198                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4199
4200             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4201         } else
4202             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4203
4204     } else {
4205         upload_stream *u = UPLOAD_STREAM(stream);
4206         size_t l;
4207
4208         if (!u->memchunk.memblock) {
4209             if (u->length == chunk->length && chunk->memblock) {
4210                 u->memchunk = *chunk;
4211                 pa_memblock_ref(u->memchunk.memblock);
4212                 u->length = 0;
4213             } else {
4214                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4215                 u->memchunk.index = u->memchunk.length = 0;
4216             }
4217         }
4218
4219         pa_assert(u->memchunk.memblock);
4220
4221         l = u->length;
4222         if (l > chunk->length)
4223             l = chunk->length;
4224
4225         if (l > 0) {
4226             void *dst;
4227             dst = pa_memblock_acquire(u->memchunk.memblock);
4228
4229             if (chunk->memblock) {
4230                 void *src;
4231                 src = pa_memblock_acquire(chunk->memblock);
4232
4233                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4234                        (uint8_t*) src + chunk->index, l);
4235
4236                 pa_memblock_release(chunk->memblock);
4237             } else
4238                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4239
4240             pa_memblock_release(u->memchunk.memblock);
4241
4242             u->memchunk.length += l;
4243             u->length -= l;
4244         }
4245     }
4246 }
4247
4248 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4249     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4250
4251     pa_assert(p);
4252     pa_native_connection_assert_ref(c);
4253
4254     native_connection_unlink(c);
4255     pa_log_info("Connection died.");
4256 }
4257
4258 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4259     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4260
4261     pa_assert(p);
4262     pa_native_connection_assert_ref(c);
4263
4264     native_connection_send_memblock(c);
4265 }
4266
4267 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4268     pa_thread_mq *q;
4269
4270     if (!(q = pa_thread_mq_get()))
4271         pa_pstream_send_revoke(p, block_id);
4272     else
4273         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4274 }
4275
4276 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4277     pa_thread_mq *q;
4278
4279     if (!(q = pa_thread_mq_get()))
4280         pa_pstream_send_release(p, block_id);
4281     else
4282         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4283 }
4284
4285 /*** client callbacks ***/
4286
4287 static void client_kill_cb(pa_client *c) {
4288     pa_assert(c);
4289
4290     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4291     pa_log_info("Connection killed.");
4292 }
4293
4294 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4295     pa_tagstruct *t;
4296     pa_native_connection *c;
4297
4298     pa_assert(client);
4299     c = PA_NATIVE_CONNECTION(client->userdata);
4300     pa_native_connection_assert_ref(c);
4301
4302     if (c->version < 15)
4303       return;
4304
4305     t = pa_tagstruct_new(NULL, 0);
4306     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4307     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4308     pa_tagstruct_puts(t, event);
4309     pa_tagstruct_put_proplist(t, pl);
4310     pa_pstream_send_tagstruct(c->pstream, t);
4311 }
4312
4313 /*** module entry points ***/
4314
4315 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4316     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4317
4318     pa_assert(m);
4319     pa_assert(tv);
4320     pa_native_connection_assert_ref(c);
4321     pa_assert(c->auth_timeout_event == e);
4322
4323     if (!c->authorized) {
4324         native_connection_unlink(c);
4325         pa_log_info("Connection terminated due to authentication timeout.");
4326     }
4327 }
4328
4329 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4330     pa_native_connection *c;
4331     char pname[128];
4332     pa_client *client;
4333     pa_client_new_data data;
4334
4335     pa_assert(p);
4336     pa_assert(io);
4337     pa_assert(o);
4338
4339     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4340         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4341         pa_iochannel_free(io);
4342         return;
4343     }
4344
4345     pa_client_new_data_init(&data);
4346     data.module = o->module;
4347     data.driver = __FILE__;
4348     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4349     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4350     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4351     client = pa_client_new(p->core, &data);
4352     pa_client_new_data_done(&data);
4353
4354     if (!client)
4355         return;
4356
4357     c = pa_msgobject_new(pa_native_connection);
4358     c->parent.parent.free = native_connection_free;
4359     c->parent.process_msg = native_connection_process_msg;
4360     c->protocol = p;
4361     c->options = pa_native_options_ref(o);
4362     c->authorized = FALSE;
4363
4364     if (o->auth_anonymous) {
4365         pa_log_info("Client authenticated anonymously.");
4366         c->authorized = TRUE;
4367     }
4368
4369     if (!c->authorized &&
4370         o->auth_ip_acl &&
4371         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4372
4373         pa_log_info("Client authenticated by IP ACL.");
4374         c->authorized = TRUE;
4375     }
4376
4377     if (!c->authorized) {
4378         struct timeval tv;
4379         pa_gettimeofday(&tv);
4380         tv.tv_sec += AUTH_TIMEOUT;
4381         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4382     } else
4383         c->auth_timeout_event = NULL;
4384
4385     c->is_local = pa_iochannel_socket_is_local(io);
4386     c->version = 8;
4387
4388     c->client = client;
4389     c->client->kill = client_kill_cb;
4390     c->client->send_event = client_send_event_cb;
4391     c->client->userdata = c;
4392
4393     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4394     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4395     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4396     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4397     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4398     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4399     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4400
4401     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4402
4403     c->record_streams = pa_idxset_new(NULL, NULL);
4404     c->output_streams = pa_idxset_new(NULL, NULL);
4405
4406     c->rrobin_index = PA_IDXSET_INVALID;
4407     c->subscription = NULL;
4408
4409     pa_idxset_put(p->connections, c, NULL);
4410
4411 #ifdef HAVE_CREDS
4412     if (pa_iochannel_creds_supported(io))
4413         pa_iochannel_creds_enable(io);
4414 #endif
4415
4416     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4417 }
4418
4419 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4420     pa_native_connection *c;
4421     void *state = NULL;
4422
4423     pa_assert(p);
4424     pa_assert(m);
4425
4426     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4427         if (c->options->module == m)
4428             native_connection_unlink(c);
4429 }
4430
4431 static pa_native_protocol* native_protocol_new(pa_core *c) {
4432     pa_native_protocol *p;
4433     pa_native_hook_t h;
4434
4435     pa_assert(c);
4436
4437     p = pa_xnew(pa_native_protocol, 1);
4438     PA_REFCNT_INIT(p);
4439     p->core = c;
4440     p->connections = pa_idxset_new(NULL, NULL);
4441
4442     p->servers = NULL;
4443
4444     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4445
4446     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4447         pa_hook_init(&p->hooks[h], p);
4448
4449     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4450
4451     return p;
4452 }
4453
4454 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4455     pa_native_protocol *p;
4456
4457     if ((p = pa_shared_get(c, "native-protocol")))
4458         return pa_native_protocol_ref(p);
4459
4460     return native_protocol_new(c);
4461 }
4462
4463 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4464     pa_assert(p);
4465     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4466
4467     PA_REFCNT_INC(p);
4468
4469     return p;
4470 }
4471
4472 void pa_native_protocol_unref(pa_native_protocol *p) {
4473     pa_native_connection *c;
4474     pa_native_hook_t h;
4475
4476     pa_assert(p);
4477     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4478
4479     if (PA_REFCNT_DEC(p) > 0)
4480         return;
4481
4482     while ((c = pa_idxset_first(p->connections, NULL)))
4483         native_connection_unlink(c);
4484
4485     pa_idxset_free(p->connections, NULL, NULL);
4486
4487     pa_strlist_free(p->servers);
4488
4489     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4490         pa_hook_done(&p->hooks[h]);
4491
4492     pa_hashmap_free(p->extensions, NULL, NULL);
4493
4494     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4495
4496     pa_xfree(p);
4497 }
4498
4499 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4500     pa_assert(p);
4501     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4502     pa_assert(name);
4503
4504     p->servers = pa_strlist_prepend(p->servers, name);
4505
4506     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4507 }
4508
4509 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4510     pa_assert(p);
4511     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4512     pa_assert(name);
4513
4514     p->servers = pa_strlist_remove(p->servers, name);
4515
4516     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4517 }
4518
4519 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4520     pa_assert(p);
4521     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4522
4523     return p->hooks;
4524 }
4525
4526 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4527     pa_assert(p);
4528     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4529
4530     return p->servers;
4531 }
4532
4533 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4534     pa_assert(p);
4535     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4536     pa_assert(m);
4537     pa_assert(cb);
4538     pa_assert(!pa_hashmap_get(p->extensions, m));
4539
4540     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4541     return 0;
4542 }
4543
4544 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4545     pa_assert(p);
4546     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4547     pa_assert(m);
4548
4549     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4550 }
4551
4552 pa_native_options* pa_native_options_new(void) {
4553     pa_native_options *o;
4554
4555     o = pa_xnew0(pa_native_options, 1);
4556     PA_REFCNT_INIT(o);
4557
4558     return o;
4559 }
4560
4561 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4562     pa_assert(o);
4563     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4564
4565     PA_REFCNT_INC(o);
4566
4567     return o;
4568 }
4569
4570 void pa_native_options_unref(pa_native_options *o) {
4571     pa_assert(o);
4572     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4573
4574     if (PA_REFCNT_DEC(o) > 0)
4575         return;
4576
4577     pa_xfree(o->auth_group);
4578
4579     if (o->auth_ip_acl)
4580         pa_ip_acl_free(o->auth_ip_acl);
4581
4582     if (o->auth_cookie)
4583         pa_auth_cookie_unref(o->auth_cookie);
4584
4585     pa_xfree(o);
4586 }
4587
4588 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4589     pa_bool_t enabled;
4590     const char *acl;
4591
4592     pa_assert(o);
4593     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4594     pa_assert(ma);
4595
4596     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4597         pa_log("auth-anonymous= expects a boolean argument.");
4598         return -1;
4599     }
4600
4601     enabled = TRUE;
4602     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4603         pa_log("auth-group-enabled= expects a boolean argument.");
4604         return -1;
4605     }
4606
4607     pa_xfree(o->auth_group);
4608     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4609
4610 #ifndef HAVE_CREDS
4611     if (o->auth_group)
4612         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4613 #endif
4614
4615     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4616         pa_ip_acl *ipa;
4617
4618         if (!(ipa = pa_ip_acl_new(acl))) {
4619             pa_log("Failed to parse IP ACL '%s'", acl);
4620             return -1;
4621         }
4622
4623         if (o->auth_ip_acl)
4624             pa_ip_acl_free(o->auth_ip_acl);
4625
4626         o->auth_ip_acl = ipa;
4627     }
4628
4629     enabled = TRUE;
4630     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4631         pa_log("auth-cookie-enabled= expects a boolean argument.");
4632         return -1;
4633     }
4634
4635     if (o->auth_cookie)
4636         pa_auth_cookie_unref(o->auth_cookie);
4637
4638     if (enabled) {
4639         const char *cn;
4640
4641         /* The new name for this is 'auth-cookie', for compat reasons
4642          * we check the old name too */
4643         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4644             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4645                 cn = PA_NATIVE_COOKIE_FILE;
4646
4647         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4648             return -1;
4649
4650     } else
4651           o->auth_cookie = NULL;
4652
4653     return 0;
4654 }
4655
4656 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4657     pa_native_connection_assert_ref(c);
4658
4659     return c->pstream;
4660 }