protocol-native: Allow clients to know at what index underrun occurred
[platform/upstream/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38 #include <pulse/internal.h>
39
40 #include <pulsecore/native-common.h>
41 #include <pulsecore/packet.h>
42 #include <pulsecore/client.h>
43 #include <pulsecore/source-output.h>
44 #include <pulsecore/sink-input.h>
45 #include <pulsecore/pstream.h>
46 #include <pulsecore/tagstruct.h>
47 #include <pulsecore/pdispatch.h>
48 #include <pulsecore/pstream-util.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* Kick a client if it doesn't authenticate within this time */
64 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
65
66 /* Don't accept more connection than this */
67 #define MAX_CONNECTIONS 64
68
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
73
74 struct pa_native_protocol;
75
76 typedef struct record_stream {
77     pa_msgobject parent;
78
79     pa_native_connection *connection;
80     uint32_t index;
81
82     pa_source_output *source_output;
83     pa_memblockq *memblockq;
84
85     pa_bool_t adjust_latency:1;
86     pa_bool_t early_requests:1;
87
88     /* Requested buffer attributes */
89     pa_buffer_attr buffer_attr_req;
90     /* Fixed-up and adjusted buffer attributes */
91     pa_buffer_attr buffer_attr;
92
93     pa_atomic_t on_the_fly;
94     pa_usec_t configured_source_latency;
95     size_t drop_initial;
96
97     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
98     size_t on_the_fly_snapshot;
99     pa_usec_t current_monitor_latency;
100     pa_usec_t current_source_latency;
101 } record_stream;
102
103 #define RECORD_STREAM(o) (record_stream_cast(o))
104 PA_DEFINE_PRIVATE_CLASS(record_stream, pa_msgobject);
105
106 typedef struct output_stream {
107     pa_msgobject parent;
108 } output_stream;
109
110 #define OUTPUT_STREAM(o) (output_stream_cast(o))
111 PA_DEFINE_PRIVATE_CLASS(output_stream, pa_msgobject);
112
113 typedef struct playback_stream {
114     output_stream parent;
115
116     pa_native_connection *connection;
117     uint32_t index;
118
119     pa_sink_input *sink_input;
120     pa_memblockq *memblockq;
121
122     pa_bool_t adjust_latency:1;
123     pa_bool_t early_requests:1;
124
125     pa_bool_t is_underrun:1;
126     pa_bool_t drain_request:1;
127     uint32_t drain_tag;
128     uint32_t syncid;
129
130     /* Optimization to avoid too many rewinds with a lot of small blocks */
131     pa_atomic_t seek_or_post_in_queue;
132     int64_t seek_windex;
133
134     pa_atomic_t missing;
135     pa_usec_t configured_sink_latency;
136     /* Requested buffer attributes */
137     pa_buffer_attr buffer_attr_req;
138     /* Fixed-up and adjusted buffer attributes */
139     pa_buffer_attr buffer_attr;
140
141     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
142     int64_t read_index, write_index;
143     size_t render_memblockq_length;
144     pa_usec_t current_sink_latency;
145     uint64_t playing_for, underrun_for;
146 } playback_stream;
147
148 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
149 PA_DEFINE_PRIVATE_CLASS(playback_stream, output_stream);
150
151 typedef struct upload_stream {
152     output_stream parent;
153
154     pa_native_connection *connection;
155     uint32_t index;
156
157     pa_memchunk memchunk;
158     size_t length;
159     char *name;
160     pa_sample_spec sample_spec;
161     pa_channel_map channel_map;
162     pa_proplist *proplist;
163 } upload_stream;
164
165 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
166 PA_DEFINE_PRIVATE_CLASS(upload_stream, output_stream);
167
168 struct pa_native_connection {
169     pa_msgobject parent;
170     pa_native_protocol *protocol;
171     pa_native_options *options;
172     pa_bool_t authorized:1;
173     pa_bool_t is_local:1;
174     uint32_t version;
175     pa_client *client;
176     pa_pstream *pstream;
177     pa_pdispatch *pdispatch;
178     pa_idxset *record_streams, *output_streams;
179     uint32_t rrobin_index;
180     pa_subscription *subscription;
181     pa_time_event *auth_timeout_event;
182 };
183
184 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
185 PA_DEFINE_PRIVATE_CLASS(pa_native_connection, pa_msgobject);
186
187 struct pa_native_protocol {
188     PA_REFCNT_DECLARE;
189
190     pa_core *core;
191     pa_idxset *connections;
192
193     pa_strlist *servers;
194     pa_hook hooks[PA_NATIVE_HOOK_MAX];
195
196     pa_hashmap *extensions;
197 };
198
199 enum {
200     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
201 };
202
203 enum {
204     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
205     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
206     SINK_INPUT_MESSAGE_FLUSH,
207     SINK_INPUT_MESSAGE_TRIGGER,
208     SINK_INPUT_MESSAGE_SEEK,
209     SINK_INPUT_MESSAGE_PREBUF_FORCE,
210     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
211     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
212 };
213
214 enum {
215     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
216     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
217     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
218     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
219     PLAYBACK_STREAM_MESSAGE_STARTED,
220     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
221 };
222
223 enum {
224     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
225 };
226
227 enum {
228     CONNECTION_MESSAGE_RELEASE,
229     CONNECTION_MESSAGE_REVOKE
230 };
231
232 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
233 static void sink_input_kill_cb(pa_sink_input *i);
234 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
235 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
236 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
237 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
238 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
239 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
240
241 static void native_connection_send_memblock(pa_native_connection *c);
242 static void playback_stream_request_bytes(struct playback_stream*s);
243
244 static void source_output_kill_cb(pa_source_output *o);
245 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
246 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
247 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
248 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
249 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
250
251 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
252 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
253
254 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
290 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
291 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
292 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
293
294 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
295     [PA_COMMAND_ERROR] = NULL,
296     [PA_COMMAND_TIMEOUT] = NULL,
297     [PA_COMMAND_REPLY] = NULL,
298     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
299     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
300     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
301     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
302     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
303     [PA_COMMAND_AUTH] = command_auth,
304     [PA_COMMAND_REQUEST] = NULL,
305     [PA_COMMAND_EXIT] = command_exit,
306     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
307     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
308     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
309     [PA_COMMAND_STAT] = command_stat,
310     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
311     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
312     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
313     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
314     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
315     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
316     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
317     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
318     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
319     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
320     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
321     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
322     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
323     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
324     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
325     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
329     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
330     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
331     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
332     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
333     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
334     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
335
336     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
337     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
338     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
339     [PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME] = command_set_volume,
340
341     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
342     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
343     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
344     [PA_COMMAND_SET_SOURCE_OUTPUT_MUTE] = command_set_mute,
345
346     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
347     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
348
349     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
350     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
351     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
352     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
353
354     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
355     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
356
357     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
358     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
359     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
360     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
361     [PA_COMMAND_KILL_CLIENT] = command_kill,
362     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
363     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
364     [PA_COMMAND_LOAD_MODULE] = command_load_module,
365     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
366
367     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
368     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
369     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
370     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
371
372     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
373     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
374
375     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
376     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
377
378     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
379     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
380
381     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
382     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
383     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
384
385     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
386     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
387     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
388
389     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
390
391     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
392     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
393
394     [PA_COMMAND_EXTENSION] = command_extension
395 };
396
397 /* structure management */
398
399 /* Called from main context */
400 static void upload_stream_unlink(upload_stream *s) {
401     pa_assert(s);
402
403     if (!s->connection)
404         return;
405
406     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
407     s->connection = NULL;
408     upload_stream_unref(s);
409 }
410
411 /* Called from main context */
412 static void upload_stream_free(pa_object *o) {
413     upload_stream *s = UPLOAD_STREAM(o);
414     pa_assert(s);
415
416     upload_stream_unlink(s);
417
418     pa_xfree(s->name);
419
420     if (s->proplist)
421         pa_proplist_free(s->proplist);
422
423     if (s->memchunk.memblock)
424         pa_memblock_unref(s->memchunk.memblock);
425
426     pa_xfree(s);
427 }
428
429 /* Called from main context */
430 static upload_stream* upload_stream_new(
431         pa_native_connection *c,
432         const pa_sample_spec *ss,
433         const pa_channel_map *map,
434         const char *name,
435         size_t length,
436         pa_proplist *p) {
437
438     upload_stream *s;
439
440     pa_assert(c);
441     pa_assert(ss);
442     pa_assert(name);
443     pa_assert(length > 0);
444     pa_assert(p);
445
446     s = pa_msgobject_new(upload_stream);
447     s->parent.parent.parent.free = upload_stream_free;
448     s->connection = c;
449     s->sample_spec = *ss;
450     s->channel_map = *map;
451     s->name = pa_xstrdup(name);
452     pa_memchunk_reset(&s->memchunk);
453     s->length = length;
454     s->proplist = pa_proplist_copy(p);
455     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
456
457     pa_idxset_put(c->output_streams, s, &s->index);
458
459     return s;
460 }
461
462 /* Called from main context */
463 static void record_stream_unlink(record_stream *s) {
464     pa_assert(s);
465
466     if (!s->connection)
467         return;
468
469     if (s->source_output) {
470         pa_source_output_unlink(s->source_output);
471         pa_source_output_unref(s->source_output);
472         s->source_output = NULL;
473     }
474
475     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
476     s->connection = NULL;
477     record_stream_unref(s);
478 }
479
480 /* Called from main context */
481 static void record_stream_free(pa_object *o) {
482     record_stream *s = RECORD_STREAM(o);
483     pa_assert(s);
484
485     record_stream_unlink(s);
486
487     pa_memblockq_free(s->memblockq);
488     pa_xfree(s);
489 }
490
491 /* Called from main context */
492 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
493     record_stream *s = RECORD_STREAM(o);
494     record_stream_assert_ref(s);
495
496     if (!s->connection)
497         return -1;
498
499     switch (code) {
500
501         case RECORD_STREAM_MESSAGE_POST_DATA:
502
503             /* We try to keep up to date with how many bytes are
504              * currently on the fly */
505             pa_atomic_sub(&s->on_the_fly, chunk->length);
506
507             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
508 /*                 pa_log_warn("Failed to push data into output queue."); */
509                 return -1;
510             }
511
512             if (!pa_pstream_is_pending(s->connection->pstream))
513                 native_connection_send_memblock(s->connection);
514
515             break;
516     }
517
518     return 0;
519 }
520
521 /* Called from main context */
522 static void fix_record_buffer_attr_pre(record_stream *s) {
523
524     size_t frame_size;
525     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
526
527     pa_assert(s);
528
529     /* This function will be called from the main thread, before as
530      * well as after the source output has been activated using
531      * pa_source_output_put()! That means it may not touch any
532      * ->thread_info data! */
533
534     frame_size = pa_frame_size(&s->source_output->sample_spec);
535     s->buffer_attr = s->buffer_attr_req;
536
537     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
538         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
539     if (s->buffer_attr.maxlength <= 0)
540         s->buffer_attr.maxlength = (uint32_t) frame_size;
541
542     if (s->buffer_attr.fragsize == (uint32_t) -1)
543         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
544     if (s->buffer_attr.fragsize <= 0)
545         s->buffer_attr.fragsize = (uint32_t) frame_size;
546
547     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
548
549     if (s->early_requests) {
550
551         /* In early request mode we need to emulate the classic
552          * fragment-based playback model. We do this setting the source
553          * latency to the fragment size. */
554
555         source_usec = fragsize_usec;
556
557     } else if (s->adjust_latency) {
558
559         /* So, the user asked us to adjust the latency according to
560          * what the source can provide. Half the latency will be
561          * spent on the hw buffer, half of it in the async buffer
562          * queue we maintain for each client. */
563
564         source_usec = fragsize_usec/2;
565
566     } else {
567
568         /* Ok, the user didn't ask us to adjust the latency, hence we
569          * don't */
570
571         source_usec = (pa_usec_t) -1;
572     }
573
574     if (source_usec != (pa_usec_t) -1)
575         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
576     else
577         s->configured_source_latency = 0;
578
579     if (s->early_requests) {
580
581         /* Ok, we didn't necessarily get what we were asking for, so
582          * let's tell the user */
583
584         fragsize_usec = s->configured_source_latency;
585
586     } else if (s->adjust_latency) {
587
588         /* Now subtract what we actually got */
589
590         if (fragsize_usec >= s->configured_source_latency*2)
591             fragsize_usec -= s->configured_source_latency;
592         else
593             fragsize_usec = s->configured_source_latency;
594     }
595
596     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
597         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
598
599         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
600
601     if (s->buffer_attr.fragsize <= 0)
602         s->buffer_attr.fragsize = (uint32_t) frame_size;
603 }
604
605 /* Called from main context */
606 static void fix_record_buffer_attr_post(record_stream *s) {
607     size_t base;
608
609     pa_assert(s);
610
611     /* This function will be called from the main thread, before as
612      * well as after the source output has been activated using
613      * pa_source_output_put()! That means it may not touch and
614      * ->thread_info data! */
615
616     base = pa_frame_size(&s->source_output->sample_spec);
617
618     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
619     if (s->buffer_attr.fragsize <= 0)
620         s->buffer_attr.fragsize = base;
621
622     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
623         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
624 }
625
626 /* Called from main context */
627 static record_stream* record_stream_new(
628         pa_native_connection *c,
629         pa_source *source,
630         pa_sample_spec *ss,
631         pa_channel_map *map,
632         pa_idxset *formats,
633         pa_buffer_attr *attr,
634         pa_cvolume *volume,
635         pa_bool_t muted,
636         pa_bool_t muted_set,
637         pa_source_output_flags_t flags,
638         pa_proplist *p,
639         pa_bool_t adjust_latency,
640         pa_bool_t early_requests,
641         pa_bool_t relative_volume,
642         pa_bool_t peak_detect,
643         pa_sink_input *direct_on_input,
644         int *ret) {
645
646     record_stream *s;
647     pa_source_output *source_output = NULL;
648     pa_source_output_new_data data;
649
650     pa_assert(c);
651     pa_assert(ss);
652     pa_assert(p);
653     pa_assert(ret);
654
655     pa_source_output_new_data_init(&data);
656
657     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
658     data.driver = __FILE__;
659     data.module = c->options->module;
660     data.client = c->client;
661     if (source)
662         pa_source_output_new_data_set_source(&data, source, TRUE);
663     if (pa_sample_spec_valid(ss))
664         pa_source_output_new_data_set_sample_spec(&data, ss);
665     if (pa_channel_map_valid(map))
666         pa_source_output_new_data_set_channel_map(&data, map);
667     if (formats)
668         pa_source_output_new_data_set_formats(&data, formats);
669     data.direct_on_input = direct_on_input;
670     if (volume) {
671         pa_source_output_new_data_set_volume(&data, volume);
672         data.volume_is_absolute = !relative_volume;
673         data.save_volume = TRUE;
674     }
675     if (muted_set) {
676         pa_source_output_new_data_set_muted(&data, muted);
677         data.save_muted = TRUE;
678     }
679     if (peak_detect)
680         data.resample_method = PA_RESAMPLER_PEAKS;
681     data.flags = flags;
682
683     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data);
684
685     pa_source_output_new_data_done(&data);
686
687     if (!source_output)
688         return NULL;
689
690     s = pa_msgobject_new(record_stream);
691     s->parent.parent.free = record_stream_free;
692     s->parent.process_msg = record_stream_process_msg;
693     s->connection = c;
694     s->source_output = source_output;
695     s->buffer_attr_req = *attr;
696     s->adjust_latency = adjust_latency;
697     s->early_requests = early_requests;
698     pa_atomic_store(&s->on_the_fly, 0);
699
700     s->source_output->parent.process_msg = source_output_process_msg;
701     s->source_output->push = source_output_push_cb;
702     s->source_output->kill = source_output_kill_cb;
703     s->source_output->get_latency = source_output_get_latency_cb;
704     s->source_output->moving = source_output_moving_cb;
705     s->source_output->suspend = source_output_suspend_cb;
706     s->source_output->send_event = source_output_send_event_cb;
707     s->source_output->userdata = s;
708
709     fix_record_buffer_attr_pre(s);
710
711     s->memblockq = pa_memblockq_new(
712             0,
713             s->buffer_attr.maxlength,
714             0,
715             pa_frame_size(&source_output->sample_spec),
716             1,
717             0,
718             0,
719             NULL);
720
721     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
722     fix_record_buffer_attr_post(s);
723
724     *ss = s->source_output->sample_spec;
725     *map = s->source_output->channel_map;
726
727     pa_idxset_put(c->record_streams, s, &s->index);
728
729     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
730                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
731                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
732                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
733
734     pa_source_output_put(s->source_output);
735     return s;
736 }
737
738 /* Called from main context */
739 static void record_stream_send_killed(record_stream *r) {
740     pa_tagstruct *t;
741     record_stream_assert_ref(r);
742
743     t = pa_tagstruct_new(NULL, 0);
744     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
745     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
746     pa_tagstruct_putu32(t, r->index);
747     pa_pstream_send_tagstruct(r->connection->pstream, t);
748 }
749
750 /* Called from main context */
751 static void playback_stream_unlink(playback_stream *s) {
752     pa_assert(s);
753
754     if (!s->connection)
755         return;
756
757     if (s->sink_input) {
758         pa_sink_input_unlink(s->sink_input);
759         pa_sink_input_unref(s->sink_input);
760         s->sink_input = NULL;
761     }
762
763     if (s->drain_request)
764         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
765
766     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
767     s->connection = NULL;
768     playback_stream_unref(s);
769 }
770
771 /* Called from main context */
772 static void playback_stream_free(pa_object* o) {
773     playback_stream *s = PLAYBACK_STREAM(o);
774     pa_assert(s);
775
776     playback_stream_unlink(s);
777
778     pa_memblockq_free(s->memblockq);
779     pa_xfree(s);
780 }
781
782 /* Called from main context */
783 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
784     playback_stream *s = PLAYBACK_STREAM(o);
785     playback_stream_assert_ref(s);
786
787     if (!s->connection)
788         return -1;
789
790     switch (code) {
791
792         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
793             pa_tagstruct *t;
794             int l = 0;
795
796             for (;;) {
797                 if ((l = pa_atomic_load(&s->missing)) <= 0)
798                     return 0;
799
800                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
801                     break;
802             }
803
804             t = pa_tagstruct_new(NULL, 0);
805             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
806             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
807             pa_tagstruct_putu32(t, s->index);
808             pa_tagstruct_putu32(t, (uint32_t) l);
809             pa_pstream_send_tagstruct(s->connection->pstream, t);
810
811 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
812             break;
813         }
814
815         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
816             pa_tagstruct *t;
817
818 /*             pa_log("signalling underflow"); */
819
820             /* Report that we're empty */
821             t = pa_tagstruct_new(NULL, 0);
822             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
823             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
824             pa_tagstruct_putu32(t, s->index);
825             if (s->connection->version >= 23)
826                 pa_tagstruct_puts64(t, offset);
827             pa_pstream_send_tagstruct(s->connection->pstream, t);
828             break;
829         }
830
831         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
832             pa_tagstruct *t;
833
834             /* Notify the user we're overflowed*/
835             t = pa_tagstruct_new(NULL, 0);
836             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
837             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
838             pa_tagstruct_putu32(t, s->index);
839             pa_pstream_send_tagstruct(s->connection->pstream, t);
840             break;
841         }
842
843         case PLAYBACK_STREAM_MESSAGE_STARTED:
844
845             if (s->connection->version >= 13) {
846                 pa_tagstruct *t;
847
848                 /* Notify the user we started playback */
849                 t = pa_tagstruct_new(NULL, 0);
850                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
851                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
852                 pa_tagstruct_putu32(t, s->index);
853                 pa_pstream_send_tagstruct(s->connection->pstream, t);
854             }
855
856             break;
857
858         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
859             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
860             break;
861
862         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH:
863
864             s->buffer_attr.tlength = (uint32_t) offset;
865
866             if (s->connection->version >= 15) {
867                 pa_tagstruct *t;
868
869                 t = pa_tagstruct_new(NULL, 0);
870                 pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
871                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
872                 pa_tagstruct_putu32(t, s->index);
873                 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
874                 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
875                 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
876                 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
877                 pa_tagstruct_put_usec(t, s->configured_sink_latency);
878                 pa_pstream_send_tagstruct(s->connection->pstream, t);
879             }
880
881             break;
882     }
883
884     return 0;
885 }
886
887 /* Called from main context */
888 static void fix_playback_buffer_attr(playback_stream *s) {
889     size_t frame_size, max_prebuf;
890     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
891
892     pa_assert(s);
893
894     /* pa_log("Client requested: maxlength=%li bytes tlength=%li bytes minreq=%li bytes prebuf=%li bytes", */
895     /*        (long) s->buffer_attr.maxlength, */
896     /*        (long) s->buffer_attr.tlength, */
897     /*        (long) s->buffer_attr.minreq, */
898     /*        (long) s->buffer_attr.prebuf); */
899
900     /* pa_log("Client requested: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms", */
901     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
902     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
903     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
904     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC)); */
905
906     /* This function will be called from the main thread, before as
907      * well as after the sink input has been activated using
908      * pa_sink_input_put()! That means it may not touch any
909      * ->thread_info data, such as the memblockq! */
910
911     frame_size = pa_frame_size(&s->sink_input->sample_spec);
912     s->buffer_attr = s->buffer_attr_req;
913
914     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
915         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
916     if (s->buffer_attr.maxlength <= 0)
917         s->buffer_attr.maxlength = (uint32_t) frame_size;
918
919     if (s->buffer_attr.tlength == (uint32_t) -1)
920         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
921     if (s->buffer_attr.tlength <= 0)
922         s->buffer_attr.tlength = (uint32_t) frame_size;
923
924     if (s->buffer_attr.minreq == (uint32_t) -1)
925         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
926     if (s->buffer_attr.minreq <= 0)
927         s->buffer_attr.minreq = (uint32_t) frame_size;
928
929     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
930         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
931
932     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
933     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
934
935     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
936                 (double) tlength_usec / PA_USEC_PER_MSEC,
937                 (double) minreq_usec / PA_USEC_PER_MSEC);
938
939     if (s->early_requests) {
940
941         /* In early request mode we need to emulate the classic
942          * fragment-based playback model. We do this setting the sink
943          * latency to the fragment size. */
944
945         sink_usec = minreq_usec;
946         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
947
948     } else if (s->adjust_latency) {
949
950         /* So, the user asked us to adjust the latency of the stream
951          * buffer according to the what the sink can provide. The
952          * tlength passed in shall be the overall latency. Roughly
953          * half the latency will be spent on the hw buffer, the other
954          * half of it in the async buffer queue we maintain for each
955          * client. In between we'll have a safety space of size
956          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
957          * empty and needs to be filled, then our buffer must have
958          * enough data to fulfill this request immediatly and thus
959          * have at least the same tlength as the size of the hw
960          * buffer. It additionally needs space for 2 times minreq
961          * because if the buffer ran empty and a partial fillup
962          * happens immediately on the next iteration we need to be
963          * able to fulfill it and give the application also minreq
964          * time to fill it up again for the next request Makes 2 times
965          * minreq in plus.. */
966
967         if (tlength_usec > minreq_usec*2)
968             sink_usec = (tlength_usec - minreq_usec*2)/2;
969         else
970             sink_usec = 0;
971
972         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
973
974     } else {
975
976         /* Ok, the user didn't ask us to adjust the latency, but we
977          * still need to make sure that the parameters from the user
978          * do make sense. */
979
980         if (tlength_usec > minreq_usec*2)
981             sink_usec = (tlength_usec - minreq_usec*2);
982         else
983             sink_usec = 0;
984
985         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
986     }
987
988     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
989
990     if (s->early_requests) {
991
992         /* Ok, we didn't necessarily get what we were asking for, so
993          * let's tell the user */
994
995         minreq_usec = s->configured_sink_latency;
996
997     } else if (s->adjust_latency) {
998
999         /* Ok, we didn't necessarily get what we were asking for, so
1000          * let's subtract from what we asked for for the remaining
1001          * buffer space */
1002
1003         if (tlength_usec >= s->configured_sink_latency)
1004             tlength_usec -= s->configured_sink_latency;
1005     }
1006
1007     /* FIXME: This is actually larger than necessary, since not all of
1008      * the sink latency is actually rewritable. */
1009     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
1010         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
1011
1012     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
1013         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
1014         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
1015
1016     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
1017         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
1018         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
1019
1020     if (s->buffer_attr.minreq <= 0) {
1021         s->buffer_attr.minreq = (uint32_t) frame_size;
1022         s->buffer_attr.tlength += (uint32_t) frame_size*2;
1023     }
1024
1025     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
1026         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
1027
1028     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
1029
1030     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
1031         s->buffer_attr.prebuf > max_prebuf)
1032         s->buffer_attr.prebuf = max_prebuf;
1033
1034     /* pa_log("Client accepted: maxlength=%lu ms tlength=%lu ms minreq=%lu ms prebuf=%lu ms", */
1035     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.maxlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
1036     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
1037     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC), */
1038     /*        (unsigned long) (pa_bytes_to_usec(s->buffer_attr.prebuf, &s->sink_input->sample_spec) / PA_USEC_PER_MSEC)); */
1039 }
1040
1041 /* Called from main context */
1042 static playback_stream* playback_stream_new(
1043         pa_native_connection *c,
1044         pa_sink *sink,
1045         pa_sample_spec *ss,
1046         pa_channel_map *map,
1047         pa_idxset *formats,
1048         pa_buffer_attr *a,
1049         pa_cvolume *volume,
1050         pa_bool_t muted,
1051         pa_bool_t muted_set,
1052         pa_sink_input_flags_t flags,
1053         pa_proplist *p,
1054         pa_bool_t adjust_latency,
1055         pa_bool_t early_requests,
1056         pa_bool_t relative_volume,
1057         uint32_t syncid,
1058         uint32_t *missing,
1059         int *ret) {
1060
1061     /* Note: This function takes ownership of the 'formats' param, so we need
1062      * to take extra care to not leak it */
1063
1064     playback_stream *s, *ssync;
1065     pa_sink_input *sink_input = NULL;
1066     pa_memchunk silence;
1067     uint32_t idx;
1068     int64_t start_index;
1069     pa_sink_input_new_data data;
1070
1071     pa_assert(c);
1072     pa_assert(ss);
1073     pa_assert(missing);
1074     pa_assert(p);
1075     pa_assert(ret);
1076
1077     /* Find syncid group */
1078     PA_IDXSET_FOREACH(ssync, c->output_streams, idx) {
1079
1080         if (!playback_stream_isinstance(ssync))
1081             continue;
1082
1083         if (ssync->syncid == syncid)
1084             break;
1085     }
1086
1087     /* Synced streams must connect to the same sink */
1088     if (ssync) {
1089
1090         if (!sink)
1091             sink = ssync->sink_input->sink;
1092         else if (sink != ssync->sink_input->sink) {
1093             *ret = PA_ERR_INVALID;
1094             goto out;
1095         }
1096     }
1097
1098     pa_sink_input_new_data_init(&data);
1099
1100     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1101     data.driver = __FILE__;
1102     data.module = c->options->module;
1103     data.client = c->client;
1104     if (sink)
1105         pa_sink_input_new_data_set_sink(&data, sink, TRUE);
1106     if (pa_sample_spec_valid(ss))
1107         pa_sink_input_new_data_set_sample_spec(&data, ss);
1108     if (pa_channel_map_valid(map))
1109         pa_sink_input_new_data_set_channel_map(&data, map);
1110     if (formats) {
1111         pa_sink_input_new_data_set_formats(&data, formats);
1112         /* Ownership transferred to new_data, so we don't free it ourseleves */
1113         formats = NULL;
1114     }
1115     if (volume) {
1116         pa_sink_input_new_data_set_volume(&data, volume);
1117         data.volume_is_absolute = !relative_volume;
1118         data.save_volume = TRUE;
1119     }
1120     if (muted_set) {
1121         pa_sink_input_new_data_set_muted(&data, muted);
1122         data.save_muted = TRUE;
1123     }
1124     data.sync_base = ssync ? ssync->sink_input : NULL;
1125     data.flags = flags;
1126
1127     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data);
1128
1129     pa_sink_input_new_data_done(&data);
1130
1131     if (!sink_input)
1132         goto out;
1133
1134     s = pa_msgobject_new(playback_stream);
1135     s->parent.parent.parent.free = playback_stream_free;
1136     s->parent.parent.process_msg = playback_stream_process_msg;
1137     s->connection = c;
1138     s->syncid = syncid;
1139     s->sink_input = sink_input;
1140     s->is_underrun = TRUE;
1141     s->drain_request = FALSE;
1142     pa_atomic_store(&s->missing, 0);
1143     s->buffer_attr_req = *a;
1144     s->adjust_latency = adjust_latency;
1145     s->early_requests = early_requests;
1146     pa_atomic_store(&s->seek_or_post_in_queue, 0);
1147     s->seek_windex = -1;
1148
1149     s->sink_input->parent.process_msg = sink_input_process_msg;
1150     s->sink_input->pop = sink_input_pop_cb;
1151     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1152     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1153     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1154     s->sink_input->kill = sink_input_kill_cb;
1155     s->sink_input->moving = sink_input_moving_cb;
1156     s->sink_input->suspend = sink_input_suspend_cb;
1157     s->sink_input->send_event = sink_input_send_event_cb;
1158     s->sink_input->userdata = s;
1159
1160     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1161
1162     fix_playback_buffer_attr(s);
1163
1164     pa_sink_input_get_silence(sink_input, &silence);
1165     s->memblockq = pa_memblockq_new(
1166             start_index,
1167             s->buffer_attr.maxlength,
1168             s->buffer_attr.tlength,
1169             pa_frame_size(&sink_input->sample_spec),
1170             s->buffer_attr.prebuf,
1171             s->buffer_attr.minreq,
1172             0,
1173             &silence);
1174     pa_memblock_unref(silence.memblock);
1175
1176     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1177
1178     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1179
1180     /* pa_log("missing original: %li", (long int) *missing); */
1181
1182     *ss = s->sink_input->sample_spec;
1183     *map = s->sink_input->channel_map;
1184
1185     pa_idxset_put(c->output_streams, s, &s->index);
1186
1187     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1188                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1189                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1190                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1191                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1192
1193     pa_sink_input_put(s->sink_input);
1194
1195 out:
1196     if (formats)
1197         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
1198
1199     return s;
1200 }
1201
1202 /* Called from IO context */
1203 static void playback_stream_request_bytes(playback_stream *s) {
1204     size_t m, minreq;
1205     int previous_missing;
1206
1207     playback_stream_assert_ref(s);
1208
1209     m = pa_memblockq_pop_missing(s->memblockq);
1210
1211     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu really missing=%lli)", */
1212     /*        (unsigned long) m, */
1213     /*        pa_memblockq_get_tlength(s->memblockq), */
1214     /*        pa_memblockq_get_minreq(s->memblockq), */
1215     /*        pa_memblockq_get_length(s->memblockq), */
1216     /*        (long long) pa_memblockq_get_tlength(s->memblockq) - (long long) pa_memblockq_get_length(s->memblockq)); */
1217
1218     if (m <= 0)
1219         return;
1220
1221 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1222
1223     previous_missing = pa_atomic_add(&s->missing, (int) m);
1224     minreq = pa_memblockq_get_minreq(s->memblockq);
1225
1226     if (pa_memblockq_prebuf_active(s->memblockq) ||
1227         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1228         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1229 }
1230
1231 /* Called from main context */
1232 static void playback_stream_send_killed(playback_stream *p) {
1233     pa_tagstruct *t;
1234     playback_stream_assert_ref(p);
1235
1236     t = pa_tagstruct_new(NULL, 0);
1237     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1238     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1239     pa_tagstruct_putu32(t, p->index);
1240     pa_pstream_send_tagstruct(p->connection->pstream, t);
1241 }
1242
1243 /* Called from main context */
1244 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1245     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1246     pa_native_connection_assert_ref(c);
1247
1248     if (!c->protocol)
1249         return -1;
1250
1251     switch (code) {
1252
1253         case CONNECTION_MESSAGE_REVOKE:
1254             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1255             break;
1256
1257         case CONNECTION_MESSAGE_RELEASE:
1258             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1259             break;
1260     }
1261
1262     return 0;
1263 }
1264
1265 /* Called from main context */
1266 static void native_connection_unlink(pa_native_connection *c) {
1267     record_stream *r;
1268     output_stream *o;
1269
1270     pa_assert(c);
1271
1272     if (!c->protocol)
1273         return;
1274
1275     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1276
1277     if (c->options)
1278         pa_native_options_unref(c->options);
1279
1280     while ((r = pa_idxset_first(c->record_streams, NULL)))
1281         record_stream_unlink(r);
1282
1283     while ((o = pa_idxset_first(c->output_streams, NULL)))
1284         if (playback_stream_isinstance(o))
1285             playback_stream_unlink(PLAYBACK_STREAM(o));
1286         else
1287             upload_stream_unlink(UPLOAD_STREAM(o));
1288
1289     if (c->subscription)
1290         pa_subscription_free(c->subscription);
1291
1292     if (c->pstream)
1293         pa_pstream_unlink(c->pstream);
1294
1295     if (c->auth_timeout_event) {
1296         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1297         c->auth_timeout_event = NULL;
1298     }
1299
1300     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1301     c->protocol = NULL;
1302     pa_native_connection_unref(c);
1303 }
1304
1305 /* Called from main context */
1306 static void native_connection_free(pa_object *o) {
1307     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1308
1309     pa_assert(c);
1310
1311     native_connection_unlink(c);
1312
1313     pa_idxset_free(c->record_streams, NULL, NULL);
1314     pa_idxset_free(c->output_streams, NULL, NULL);
1315
1316     pa_pdispatch_unref(c->pdispatch);
1317     pa_pstream_unref(c->pstream);
1318     pa_client_free(c->client);
1319
1320     pa_xfree(c);
1321 }
1322
1323 /* Called from main context */
1324 static void native_connection_send_memblock(pa_native_connection *c) {
1325     uint32_t start;
1326     record_stream *r;
1327
1328     start = PA_IDXSET_INVALID;
1329     for (;;) {
1330         pa_memchunk chunk;
1331
1332         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1333             return;
1334
1335         if (start == PA_IDXSET_INVALID)
1336             start = c->rrobin_index;
1337         else if (start == c->rrobin_index)
1338             return;
1339
1340         if (pa_memblockq_peek(r->memblockq, &chunk) >= 0) {
1341             pa_memchunk schunk = chunk;
1342
1343             if (schunk.length > r->buffer_attr.fragsize)
1344                 schunk.length = r->buffer_attr.fragsize;
1345
1346             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1347
1348             pa_memblockq_drop(r->memblockq, schunk.length);
1349             pa_memblock_unref(schunk.memblock);
1350
1351             return;
1352         }
1353     }
1354 }
1355
1356 /*** sink input callbacks ***/
1357
1358 /* Called from thread context */
1359 static void handle_seek(playback_stream *s, int64_t indexw) {
1360     playback_stream_assert_ref(s);
1361
1362 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1363
1364     if (s->sink_input->thread_info.underrun_for > 0) {
1365
1366 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1367
1368         if (pa_memblockq_is_readable(s->memblockq)) {
1369
1370             /* We just ended an underrun, let's ask the sink
1371              * for a complete rewind rewrite */
1372
1373             pa_log_debug("Requesting rewind due to end of underrun.");
1374             pa_sink_input_request_rewind(s->sink_input,
1375                                          (size_t) (s->sink_input->thread_info.underrun_for == (uint64_t) -1 ? 0 :
1376                                                    s->sink_input->thread_info.underrun_for),
1377                                          FALSE, TRUE, FALSE);
1378         }
1379
1380     } else {
1381         int64_t indexr;
1382
1383         indexr = pa_memblockq_get_read_index(s->memblockq);
1384
1385         if (indexw < indexr) {
1386             /* OK, the sink already asked for this data, so
1387              * let's have it usk us again */
1388
1389             pa_log_debug("Requesting rewind due to rewrite.");
1390             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1391         }
1392     }
1393
1394     playback_stream_request_bytes(s);
1395 }
1396
1397 static void flush_write_no_account(pa_memblockq *q) {
1398     pa_memblockq_flush_write(q, FALSE);
1399 }
1400
1401 /* Called from thread context */
1402 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1403     pa_sink_input *i = PA_SINK_INPUT(o);
1404     playback_stream *s;
1405
1406     pa_sink_input_assert_ref(i);
1407     s = PLAYBACK_STREAM(i->userdata);
1408     playback_stream_assert_ref(s);
1409
1410     switch (code) {
1411
1412         case SINK_INPUT_MESSAGE_SEEK:
1413         case SINK_INPUT_MESSAGE_POST_DATA: {
1414             int64_t windex = pa_memblockq_get_write_index(s->memblockq);
1415
1416             if (code == SINK_INPUT_MESSAGE_SEEK) {
1417                 /* The client side is incapable of accounting correctly
1418                  * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1419                  * able to deal with that. */
1420
1421                 pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1422                 windex = PA_MIN(windex, pa_memblockq_get_write_index(s->memblockq));
1423             }
1424
1425             if (chunk && pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1426                 if (pa_log_ratelimit(PA_LOG_WARN))
1427                     pa_log_warn("Failed to push data into queue");
1428                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1429                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1430             }
1431
1432             /* If more data is in queue, we rewind later instead. */
1433             if (s->seek_windex != -1)
1434                 windex = PA_MIN(windex, s->seek_windex);
1435             if (pa_atomic_dec(&s->seek_or_post_in_queue) > 1)
1436                 s->seek_windex = windex;
1437             else {
1438                 s->seek_windex = -1;
1439                 handle_seek(s, windex);
1440             }
1441             return 0;
1442         }
1443
1444         case SINK_INPUT_MESSAGE_DRAIN:
1445         case SINK_INPUT_MESSAGE_FLUSH:
1446         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1447         case SINK_INPUT_MESSAGE_TRIGGER: {
1448
1449             int64_t windex;
1450             pa_sink_input *isync;
1451             void (*func)(pa_memblockq *bq);
1452
1453             switch (code) {
1454                 case SINK_INPUT_MESSAGE_FLUSH:
1455                     func = flush_write_no_account;
1456                     break;
1457
1458                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1459                     func = pa_memblockq_prebuf_force;
1460                     break;
1461
1462                 case SINK_INPUT_MESSAGE_DRAIN:
1463                 case SINK_INPUT_MESSAGE_TRIGGER:
1464                     func = pa_memblockq_prebuf_disable;
1465                     break;
1466
1467                 default:
1468                     pa_assert_not_reached();
1469             }
1470
1471             windex = pa_memblockq_get_write_index(s->memblockq);
1472             func(s->memblockq);
1473             handle_seek(s, windex);
1474
1475             /* Do the same for all other members in the sync group */
1476             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1477                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1478                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1479                 func(ssync->memblockq);
1480                 handle_seek(ssync, windex);
1481             }
1482
1483             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1484                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1485                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1486                 func(ssync->memblockq);
1487                 handle_seek(ssync, windex);
1488             }
1489
1490             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1491                 if (!pa_memblockq_is_readable(s->memblockq))
1492                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1493                 else {
1494                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1495                     s->drain_request = TRUE;
1496                 }
1497             }
1498
1499             return 0;
1500         }
1501
1502         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1503             /* Atomically get a snapshot of all timing parameters... */
1504             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1505             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1506             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1507             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1508             s->underrun_for = s->sink_input->thread_info.underrun_for;
1509             s->playing_for = s->sink_input->thread_info.playing_for;
1510
1511             return 0;
1512
1513         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1514             int64_t windex;
1515
1516             windex = pa_memblockq_get_write_index(s->memblockq);
1517
1518             pa_memblockq_prebuf_force(s->memblockq);
1519
1520             handle_seek(s, windex);
1521
1522             /* Fall through to the default handler */
1523             break;
1524         }
1525
1526         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1527             pa_usec_t *r = userdata;
1528
1529             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1530
1531             /* Fall through, the default handler will add in the extra
1532              * latency added by the resampler */
1533             break;
1534         }
1535
1536         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1537             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1538             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1539             return 0;
1540         }
1541     }
1542
1543     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1544 }
1545
1546 /* Called from thread context */
1547 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1548     playback_stream *s;
1549
1550     pa_sink_input_assert_ref(i);
1551     s = PLAYBACK_STREAM(i->userdata);
1552     playback_stream_assert_ref(s);
1553     pa_assert(chunk);
1554
1555 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1556
1557     if (pa_memblockq_is_readable(s->memblockq))
1558         s->is_underrun = FALSE;
1559     else {
1560         if (!s->is_underrun)
1561             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1562
1563         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1564             s->drain_request = FALSE;
1565             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1566         } else if (!s->is_underrun)
1567             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, pa_memblockq_get_read_index(s->memblockq), NULL, NULL);
1568
1569         s->is_underrun = TRUE;
1570
1571         playback_stream_request_bytes(s);
1572     }
1573
1574     /* This call will not fail with prebuf=0, hence we check for
1575        underrun explicitly above */
1576     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1577         return -1;
1578
1579     chunk->length = PA_MIN(nbytes, chunk->length);
1580
1581     if (i->thread_info.underrun_for > 0)
1582         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1583
1584     pa_memblockq_drop(s->memblockq, chunk->length);
1585     playback_stream_request_bytes(s);
1586
1587     return 0;
1588 }
1589
1590 /* Called from thread context */
1591 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1592     playback_stream *s;
1593
1594     pa_sink_input_assert_ref(i);
1595     s = PLAYBACK_STREAM(i->userdata);
1596     playback_stream_assert_ref(s);
1597
1598     /* If we are in an underrun, then we don't rewind */
1599     if (i->thread_info.underrun_for > 0)
1600         return;
1601
1602     pa_memblockq_rewind(s->memblockq, nbytes);
1603 }
1604
1605 /* Called from thread context */
1606 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1607     playback_stream *s;
1608
1609     pa_sink_input_assert_ref(i);
1610     s = PLAYBACK_STREAM(i->userdata);
1611     playback_stream_assert_ref(s);
1612
1613     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1614 }
1615
1616 /* Called from thread context */
1617 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1618     playback_stream *s;
1619     size_t new_tlength, old_tlength;
1620
1621     pa_sink_input_assert_ref(i);
1622     s = PLAYBACK_STREAM(i->userdata);
1623     playback_stream_assert_ref(s);
1624
1625     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1626     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1627
1628     if (old_tlength < new_tlength) {
1629         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1630         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1631         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1632
1633         if (new_tlength == old_tlength)
1634             pa_log_debug("Failed to increase tlength");
1635         else {
1636             pa_log_debug("Notifying client about increased tlength");
1637             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1638         }
1639     }
1640 }
1641
1642 /* Called from main context */
1643 static void sink_input_kill_cb(pa_sink_input *i) {
1644     playback_stream *s;
1645
1646     pa_sink_input_assert_ref(i);
1647     s = PLAYBACK_STREAM(i->userdata);
1648     playback_stream_assert_ref(s);
1649
1650     playback_stream_send_killed(s);
1651     playback_stream_unlink(s);
1652 }
1653
1654 /* Called from main context */
1655 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1656     playback_stream *s;
1657     pa_tagstruct *t;
1658
1659     pa_sink_input_assert_ref(i);
1660     s = PLAYBACK_STREAM(i->userdata);
1661     playback_stream_assert_ref(s);
1662
1663     if (s->connection->version < 15)
1664       return;
1665
1666     t = pa_tagstruct_new(NULL, 0);
1667     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1668     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1669     pa_tagstruct_putu32(t, s->index);
1670     pa_tagstruct_puts(t, event);
1671     pa_tagstruct_put_proplist(t, pl);
1672     pa_pstream_send_tagstruct(s->connection->pstream, t);
1673 }
1674
1675 /* Called from main context */
1676 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1677     playback_stream *s;
1678     pa_tagstruct *t;
1679
1680     pa_sink_input_assert_ref(i);
1681     s = PLAYBACK_STREAM(i->userdata);
1682     playback_stream_assert_ref(s);
1683
1684     if (s->connection->version < 12)
1685       return;
1686
1687     t = pa_tagstruct_new(NULL, 0);
1688     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1689     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1690     pa_tagstruct_putu32(t, s->index);
1691     pa_tagstruct_put_boolean(t, suspend);
1692     pa_pstream_send_tagstruct(s->connection->pstream, t);
1693 }
1694
1695 /* Called from main context */
1696 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1697     playback_stream *s;
1698     pa_tagstruct *t;
1699
1700     pa_sink_input_assert_ref(i);
1701     s = PLAYBACK_STREAM(i->userdata);
1702     playback_stream_assert_ref(s);
1703
1704     if (!dest)
1705         return;
1706
1707     fix_playback_buffer_attr(s);
1708     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1709     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1710
1711     if (s->connection->version < 12)
1712       return;
1713
1714     t = pa_tagstruct_new(NULL, 0);
1715     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1716     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1717     pa_tagstruct_putu32(t, s->index);
1718     pa_tagstruct_putu32(t, dest->index);
1719     pa_tagstruct_puts(t, dest->name);
1720     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1721
1722     if (s->connection->version >= 13) {
1723         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1724         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1725         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1726         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1727         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1728     }
1729
1730     pa_pstream_send_tagstruct(s->connection->pstream, t);
1731 }
1732
1733 /*** source_output callbacks ***/
1734
1735 /* Called from thread context */
1736 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1737     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1738     record_stream *s;
1739
1740     pa_source_output_assert_ref(o);
1741     s = RECORD_STREAM(o->userdata);
1742     record_stream_assert_ref(s);
1743
1744     switch (code) {
1745         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1746             /* Atomically get a snapshot of all timing parameters... */
1747             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1748             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1749             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1750             return 0;
1751     }
1752
1753     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1754 }
1755
1756 /* Called from thread context */
1757 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1758     record_stream *s;
1759
1760     pa_source_output_assert_ref(o);
1761     s = RECORD_STREAM(o->userdata);
1762     record_stream_assert_ref(s);
1763     pa_assert(chunk);
1764
1765     pa_atomic_add(&s->on_the_fly, chunk->length);
1766     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1767 }
1768
1769 static void source_output_kill_cb(pa_source_output *o) {
1770     record_stream *s;
1771
1772     pa_source_output_assert_ref(o);
1773     s = RECORD_STREAM(o->userdata);
1774     record_stream_assert_ref(s);
1775
1776     record_stream_send_killed(s);
1777     record_stream_unlink(s);
1778 }
1779
1780 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1781     record_stream *s;
1782
1783     pa_source_output_assert_ref(o);
1784     s = RECORD_STREAM(o->userdata);
1785     record_stream_assert_ref(s);
1786
1787     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1788
1789     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1790 }
1791
1792 /* Called from main context */
1793 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1794     record_stream *s;
1795     pa_tagstruct *t;
1796
1797     pa_source_output_assert_ref(o);
1798     s = RECORD_STREAM(o->userdata);
1799     record_stream_assert_ref(s);
1800
1801     if (s->connection->version < 15)
1802       return;
1803
1804     t = pa_tagstruct_new(NULL, 0);
1805     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1806     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1807     pa_tagstruct_putu32(t, s->index);
1808     pa_tagstruct_puts(t, event);
1809     pa_tagstruct_put_proplist(t, pl);
1810     pa_pstream_send_tagstruct(s->connection->pstream, t);
1811 }
1812
1813 /* Called from main context */
1814 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1815     record_stream *s;
1816     pa_tagstruct *t;
1817
1818     pa_source_output_assert_ref(o);
1819     s = RECORD_STREAM(o->userdata);
1820     record_stream_assert_ref(s);
1821
1822     if (s->connection->version < 12)
1823       return;
1824
1825     t = pa_tagstruct_new(NULL, 0);
1826     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1827     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1828     pa_tagstruct_putu32(t, s->index);
1829     pa_tagstruct_put_boolean(t, suspend);
1830     pa_pstream_send_tagstruct(s->connection->pstream, t);
1831 }
1832
1833 /* Called from main context */
1834 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1835     record_stream *s;
1836     pa_tagstruct *t;
1837
1838     pa_source_output_assert_ref(o);
1839     s = RECORD_STREAM(o->userdata);
1840     record_stream_assert_ref(s);
1841
1842     if (!dest)
1843         return;
1844
1845     fix_record_buffer_attr_pre(s);
1846     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1847     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1848     fix_record_buffer_attr_post(s);
1849
1850     if (s->connection->version < 12)
1851       return;
1852
1853     t = pa_tagstruct_new(NULL, 0);
1854     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1855     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1856     pa_tagstruct_putu32(t, s->index);
1857     pa_tagstruct_putu32(t, dest->index);
1858     pa_tagstruct_puts(t, dest->name);
1859     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1860
1861     if (s->connection->version >= 13) {
1862         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1863         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1864         pa_tagstruct_put_usec(t, s->configured_source_latency);
1865     }
1866
1867     pa_pstream_send_tagstruct(s->connection->pstream, t);
1868 }
1869
1870 /*** pdispatch callbacks ***/
1871
1872 static void protocol_error(pa_native_connection *c) {
1873     pa_log("protocol error, kicking client");
1874     native_connection_unlink(c);
1875 }
1876
1877 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1878 if (!(expression)) { \
1879     pa_pstream_send_error((pstream), (tag), (error)); \
1880     return; \
1881 } \
1882 } while(0);
1883
1884 #define CHECK_VALIDITY_GOTO(pstream, expression, tag, error, label) do { \
1885 if (!(expression)) { \
1886     pa_pstream_send_error((pstream), (tag), (error)); \
1887     goto label; \
1888 } \
1889 } while(0);
1890
1891 static pa_tagstruct *reply_new(uint32_t tag) {
1892     pa_tagstruct *reply;
1893
1894     reply = pa_tagstruct_new(NULL, 0);
1895     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1896     pa_tagstruct_putu32(reply, tag);
1897     return reply;
1898 }
1899
1900 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1901     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1902     playback_stream *s;
1903     uint32_t sink_index, syncid, missing = 0;
1904     pa_buffer_attr attr;
1905     const char *name = NULL, *sink_name;
1906     pa_sample_spec ss;
1907     pa_channel_map map;
1908     pa_tagstruct *reply;
1909     pa_sink *sink = NULL;
1910     pa_cvolume volume;
1911     pa_bool_t
1912         corked = FALSE,
1913         no_remap = FALSE,
1914         no_remix = FALSE,
1915         fix_format = FALSE,
1916         fix_rate = FALSE,
1917         fix_channels = FALSE,
1918         no_move = FALSE,
1919         variable_rate = FALSE,
1920         muted = FALSE,
1921         adjust_latency = FALSE,
1922         early_requests = FALSE,
1923         dont_inhibit_auto_suspend = FALSE,
1924         volume_set = TRUE,
1925         muted_set = FALSE,
1926         fail_on_suspend = FALSE,
1927         relative_volume = FALSE,
1928         passthrough = FALSE;
1929
1930     pa_sink_input_flags_t flags = 0;
1931     pa_proplist *p = NULL;
1932     int ret = PA_ERR_INVALID;
1933     uint8_t n_formats = 0;
1934     pa_format_info *format;
1935     pa_idxset *formats = NULL;
1936     uint32_t i;
1937
1938     pa_native_connection_assert_ref(c);
1939     pa_assert(t);
1940     memset(&attr, 0, sizeof(attr));
1941
1942     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1943         pa_tagstruct_get(
1944                 t,
1945                 PA_TAG_SAMPLE_SPEC, &ss,
1946                 PA_TAG_CHANNEL_MAP, &map,
1947                 PA_TAG_U32, &sink_index,
1948                 PA_TAG_STRING, &sink_name,
1949                 PA_TAG_U32, &attr.maxlength,
1950                 PA_TAG_BOOLEAN, &corked,
1951                 PA_TAG_U32, &attr.tlength,
1952                 PA_TAG_U32, &attr.prebuf,
1953                 PA_TAG_U32, &attr.minreq,
1954                 PA_TAG_U32, &syncid,
1955                 PA_TAG_CVOLUME, &volume,
1956                 PA_TAG_INVALID) < 0) {
1957
1958         protocol_error(c);
1959         goto finish;
1960     }
1961
1962     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
1963     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID, finish);
1964     CHECK_VALIDITY_GOTO(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID, finish);
1965     CHECK_VALIDITY_GOTO(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
1966     CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
1967
1968     p = pa_proplist_new();
1969
1970     if (name)
1971         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1972
1973     if (c->version >= 12) {
1974         /* Since 0.9.8 the user can ask for a couple of additional flags */
1975
1976         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1977             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1978             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1979             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1980             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1981             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1982             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1983
1984             protocol_error(c);
1985             goto finish;
1986         }
1987     }
1988
1989     if (c->version >= 13) {
1990
1991         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1992             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1993             pa_tagstruct_get_proplist(t, p) < 0) {
1994
1995             protocol_error(c);
1996             goto finish;
1997         }
1998     }
1999
2000     if (c->version >= 14) {
2001
2002         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2003             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2004
2005             protocol_error(c);
2006             goto finish;
2007         }
2008     }
2009
2010     if (c->version >= 15) {
2011
2012         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2013             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2014             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2015
2016             protocol_error(c);
2017             goto finish;
2018         }
2019     }
2020
2021     if (c->version >= 17) {
2022
2023         if (pa_tagstruct_get_boolean(t, &relative_volume) < 0) {
2024
2025             protocol_error(c);
2026             goto finish;
2027         }
2028     }
2029
2030     if (c->version >= 18) {
2031
2032         if (pa_tagstruct_get_boolean(t, &passthrough) < 0 ) {
2033             protocol_error(c);
2034             goto finish;
2035         }
2036     }
2037
2038     if (c->version >= 21) {
2039
2040         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2041             protocol_error(c);
2042             goto finish;
2043         }
2044
2045         if (n_formats)
2046             formats = pa_idxset_new(NULL, NULL);
2047
2048         for (i = 0; i < n_formats; i++) {
2049             format = pa_format_info_new();
2050             if (pa_tagstruct_get_format_info(t, format) < 0) {
2051                 protocol_error(c);
2052                 goto finish;
2053             }
2054             pa_idxset_put(formats, format, NULL);
2055         }
2056     }
2057
2058     if (n_formats == 0) {
2059         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2060         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2061         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2062     } else {
2063         PA_IDXSET_FOREACH(format, formats, i) {
2064             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2065         }
2066     }
2067
2068     if (!pa_tagstruct_eof(t)) {
2069         protocol_error(c);
2070         goto finish;
2071     }
2072
2073     if (sink_index != PA_INVALID_INDEX) {
2074
2075         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
2076             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2077             goto finish;
2078         }
2079
2080     } else if (sink_name) {
2081
2082         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
2083             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2084             goto finish;
2085         }
2086     }
2087
2088     flags =
2089         (corked ? PA_SINK_INPUT_START_CORKED : 0) |
2090         (no_remap ? PA_SINK_INPUT_NO_REMAP : 0) |
2091         (no_remix ? PA_SINK_INPUT_NO_REMIX : 0) |
2092         (fix_format ? PA_SINK_INPUT_FIX_FORMAT : 0) |
2093         (fix_rate ? PA_SINK_INPUT_FIX_RATE : 0) |
2094         (fix_channels ? PA_SINK_INPUT_FIX_CHANNELS : 0) |
2095         (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) |
2096         (variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0) |
2097         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2098         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0) |
2099         (passthrough ? PA_SINK_INPUT_PASSTHROUGH : 0);
2100
2101     /* Only since protocol version 15 there's a seperate muted_set
2102      * flag. For older versions we synthesize it here */
2103     muted_set = muted_set || muted;
2104
2105     s = playback_stream_new(c, sink, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, syncid, &missing, &ret);
2106     /* We no longer own the formats idxset */
2107     formats = NULL;
2108
2109     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2110
2111     reply = reply_new(tag);
2112     pa_tagstruct_putu32(reply, s->index);
2113     pa_assert(s->sink_input);
2114     pa_tagstruct_putu32(reply, s->sink_input->index);
2115     pa_tagstruct_putu32(reply, missing);
2116
2117 /*     pa_log("initial request is %u", missing); */
2118
2119     if (c->version >= 9) {
2120         /* Since 0.9.0 we support sending the buffer metrics back to the client */
2121
2122         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2123         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
2124         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
2125         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
2126     }
2127
2128     if (c->version >= 12) {
2129         /* Since 0.9.8 we support sending the chosen sample
2130          * spec/channel map/device/suspend status back to the
2131          * client */
2132
2133         pa_tagstruct_put_sample_spec(reply, &ss);
2134         pa_tagstruct_put_channel_map(reply, &map);
2135
2136         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2137         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2138
2139         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2140     }
2141
2142     if (c->version >= 13)
2143         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2144
2145     if (c->version >= 21) {
2146         /* Send back the format we negotiated */
2147         if (s->sink_input->format)
2148             pa_tagstruct_put_format_info(reply, s->sink_input->format);
2149         else {
2150             pa_format_info *f = pa_format_info_new();
2151             pa_tagstruct_put_format_info(reply, f);
2152             pa_format_info_free(f);
2153         }
2154     }
2155
2156     pa_pstream_send_tagstruct(c->pstream, reply);
2157
2158 finish:
2159     if (p)
2160         pa_proplist_free(p);
2161     if (formats)
2162         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
2163 }
2164
2165 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2166     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2167     uint32_t channel;
2168
2169     pa_native_connection_assert_ref(c);
2170     pa_assert(t);
2171
2172     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2173         !pa_tagstruct_eof(t)) {
2174         protocol_error(c);
2175         return;
2176     }
2177
2178     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2179
2180     switch (command) {
2181
2182         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2183             playback_stream *s;
2184             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2185                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2186                 return;
2187             }
2188
2189             playback_stream_unlink(s);
2190             break;
2191         }
2192
2193         case PA_COMMAND_DELETE_RECORD_STREAM: {
2194             record_stream *s;
2195             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2196                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2197                 return;
2198             }
2199
2200             record_stream_unlink(s);
2201             break;
2202         }
2203
2204         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2205             upload_stream *s;
2206
2207             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2208                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2209                 return;
2210             }
2211
2212             upload_stream_unlink(s);
2213             break;
2214         }
2215
2216         default:
2217             pa_assert_not_reached();
2218     }
2219
2220     pa_pstream_send_simple_ack(c->pstream, tag);
2221 }
2222
2223 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2224     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2225     record_stream *s;
2226     pa_buffer_attr attr;
2227     uint32_t source_index;
2228     const char *name = NULL, *source_name;
2229     pa_sample_spec ss;
2230     pa_channel_map map;
2231     pa_tagstruct *reply;
2232     pa_source *source = NULL;
2233     pa_cvolume volume;
2234     pa_bool_t
2235         corked = FALSE,
2236         no_remap = FALSE,
2237         no_remix = FALSE,
2238         fix_format = FALSE,
2239         fix_rate = FALSE,
2240         fix_channels = FALSE,
2241         no_move = FALSE,
2242         variable_rate = FALSE,
2243         muted = FALSE,
2244         adjust_latency = FALSE,
2245         peak_detect = FALSE,
2246         early_requests = FALSE,
2247         dont_inhibit_auto_suspend = FALSE,
2248         volume_set = FALSE,
2249         muted_set = FALSE,
2250         fail_on_suspend = FALSE,
2251         relative_volume = FALSE,
2252         passthrough = FALSE;
2253
2254     pa_source_output_flags_t flags = 0;
2255     pa_proplist *p = NULL;
2256     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2257     pa_sink_input *direct_on_input = NULL;
2258     int ret = PA_ERR_INVALID;
2259     uint8_t n_formats = 0;
2260     pa_format_info *format;
2261     pa_idxset *formats = NULL;
2262     uint32_t i;
2263
2264     pa_native_connection_assert_ref(c);
2265     pa_assert(t);
2266
2267     memset(&attr, 0, sizeof(attr));
2268
2269     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2270         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2271         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2272         pa_tagstruct_getu32(t, &source_index) < 0 ||
2273         pa_tagstruct_gets(t, &source_name) < 0 ||
2274         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2275         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2276         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2277
2278         protocol_error(c);
2279         goto finish;
2280     }
2281
2282     CHECK_VALIDITY_GOTO(c->pstream, c->authorized, tag, PA_ERR_ACCESS, finish);
2283     CHECK_VALIDITY_GOTO(c->pstream, !source_name || pa_namereg_is_valid_name_or_wildcard(source_name, PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID, finish);
2284     CHECK_VALIDITY_GOTO(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID, finish);
2285     CHECK_VALIDITY_GOTO(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID, finish);
2286
2287     p = pa_proplist_new();
2288
2289     if (name)
2290         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2291
2292     if (c->version >= 12) {
2293         /* Since 0.9.8 the user can ask for a couple of additional flags */
2294
2295         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2296             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2297             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2298             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2299             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2300             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2301             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2302
2303             protocol_error(c);
2304             goto finish;
2305         }
2306     }
2307
2308     if (c->version >= 13) {
2309
2310         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2311             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2312             pa_tagstruct_get_proplist(t, p) < 0 ||
2313             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2314
2315             protocol_error(c);
2316             goto finish;
2317         }
2318     }
2319
2320     if (c->version >= 14) {
2321
2322         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2323             protocol_error(c);
2324             goto finish;
2325         }
2326     }
2327
2328     if (c->version >= 15) {
2329
2330         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2331             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2332
2333             protocol_error(c);
2334             goto finish;
2335         }
2336     }
2337
2338     if (c->version >= 22) {
2339         /* For newer client versions (with per-source-output volumes), we try
2340          * to make the behaviour for playback and record streams the same. */
2341         volume_set = TRUE;
2342
2343         if (pa_tagstruct_getu8(t, &n_formats) < 0) {
2344             protocol_error(c);
2345             goto finish;
2346         }
2347
2348         if (n_formats)
2349             formats = pa_idxset_new(NULL, NULL);
2350
2351         for (i = 0; i < n_formats; i++) {
2352             format = pa_format_info_new();
2353             if (pa_tagstruct_get_format_info(t, format) < 0) {
2354                 protocol_error(c);
2355                 goto finish;
2356             }
2357             pa_idxset_put(formats, format, NULL);
2358         }
2359
2360         if (pa_tagstruct_get_cvolume(t, &volume) < 0 ||
2361             pa_tagstruct_get_boolean(t, &muted) < 0 ||
2362             pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
2363             pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
2364             pa_tagstruct_get_boolean(t, &relative_volume) < 0 ||
2365             pa_tagstruct_get_boolean(t, &passthrough) < 0) {
2366
2367             protocol_error(c);
2368             goto finish;
2369         }
2370
2371         CHECK_VALIDITY_GOTO(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID, finish);
2372     }
2373
2374     if (n_formats == 0) {
2375         CHECK_VALIDITY_GOTO(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID, finish);
2376         CHECK_VALIDITY_GOTO(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID, finish);
2377         CHECK_VALIDITY_GOTO(c->pstream, c->version < 22 || (volume.channels == ss.channels), tag, PA_ERR_INVALID, finish);
2378         CHECK_VALIDITY_GOTO(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID, finish);
2379     } else {
2380         PA_IDXSET_FOREACH(format, formats, i) {
2381             CHECK_VALIDITY_GOTO(c->pstream, pa_format_info_valid(format), tag, PA_ERR_INVALID, finish);
2382         }
2383     }
2384
2385
2386     if (!pa_tagstruct_eof(t)) {
2387         protocol_error(c);
2388         goto finish;
2389     }
2390
2391     if (source_index != PA_INVALID_INDEX) {
2392
2393         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2394             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2395             goto finish;
2396         }
2397
2398     } else if (source_name) {
2399
2400         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2401             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2402             goto finish;
2403         }
2404     }
2405
2406     if (direct_on_input_idx != PA_INVALID_INDEX) {
2407
2408         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2409             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2410             goto finish;
2411         }
2412     }
2413
2414     flags =
2415         (corked ? PA_SOURCE_OUTPUT_START_CORKED : 0) |
2416         (no_remap ? PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2417         (no_remix ? PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2418         (fix_format ? PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2419         (fix_rate ? PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2420         (fix_channels ? PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2421         (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2422         (variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2423         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2424         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0) |
2425         (passthrough ? PA_SOURCE_OUTPUT_PASSTHROUGH : 0);
2426
2427     s = record_stream_new(c, source, &ss, &map, formats, &attr, volume_set ? &volume : NULL, muted, muted_set, flags, p, adjust_latency, early_requests, relative_volume, peak_detect, direct_on_input, &ret);
2428
2429     CHECK_VALIDITY_GOTO(c->pstream, s, tag, ret, finish);
2430
2431     reply = reply_new(tag);
2432     pa_tagstruct_putu32(reply, s->index);
2433     pa_assert(s->source_output);
2434     pa_tagstruct_putu32(reply, s->source_output->index);
2435
2436     if (c->version >= 9) {
2437         /* Since 0.9 we support sending the buffer metrics back to the client */
2438
2439         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2440         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2441     }
2442
2443     if (c->version >= 12) {
2444         /* Since 0.9.8 we support sending the chosen sample
2445          * spec/channel map/device/suspend status back to the
2446          * client */
2447
2448         pa_tagstruct_put_sample_spec(reply, &ss);
2449         pa_tagstruct_put_channel_map(reply, &map);
2450
2451         pa_tagstruct_putu32(reply, s->source_output->source->index);
2452         pa_tagstruct_puts(reply, s->source_output->source->name);
2453
2454         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2455     }
2456
2457     if (c->version >= 13)
2458         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2459
2460     if (c->version >= 22) {
2461         /* Send back the format we negotiated */
2462         if (s->source_output->format)
2463             pa_tagstruct_put_format_info(reply, s->source_output->format);
2464         else {
2465             pa_format_info *f = pa_format_info_new();
2466             pa_tagstruct_put_format_info(reply, f);
2467             pa_format_info_free(f);
2468         }
2469     }
2470
2471     pa_pstream_send_tagstruct(c->pstream, reply);
2472
2473 finish:
2474     if (p)
2475         pa_proplist_free(p);
2476     if (formats)
2477         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
2478 }
2479
2480 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2481     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2482     int ret;
2483
2484     pa_native_connection_assert_ref(c);
2485     pa_assert(t);
2486
2487     if (!pa_tagstruct_eof(t)) {
2488         protocol_error(c);
2489         return;
2490     }
2491
2492     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2493     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2494     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2495
2496     pa_log_debug("Client %s asks us to terminate.", pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY)));
2497
2498     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2499 }
2500
2501 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2502     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2503     const void*cookie;
2504     pa_tagstruct *reply;
2505     pa_bool_t shm_on_remote = FALSE, do_shm;
2506
2507     pa_native_connection_assert_ref(c);
2508     pa_assert(t);
2509
2510     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2511         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2512         !pa_tagstruct_eof(t)) {
2513         protocol_error(c);
2514         return;
2515     }
2516
2517     /* Minimum supported version */
2518     if (c->version < 8) {
2519         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2520         return;
2521     }
2522
2523     /* Starting with protocol version 13 the MSB of the version tag
2524        reflects if shm is available for this pa_native_connection or
2525        not. */
2526     if (c->version >= 13) {
2527         shm_on_remote = !!(c->version & 0x80000000U);
2528         c->version &= 0x7FFFFFFFU;
2529     }
2530
2531     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2532
2533     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2534
2535     if (!c->authorized) {
2536         pa_bool_t success = FALSE;
2537
2538 #ifdef HAVE_CREDS
2539         const pa_creds *creds;
2540
2541         if ((creds = pa_pdispatch_creds(pd))) {
2542             if (creds->uid == getuid())
2543                 success = TRUE;
2544             else if (c->options->auth_group) {
2545                 int r;
2546                 gid_t gid;
2547
2548                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2549                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2550                 else if (gid == creds->gid)
2551                     success = TRUE;
2552
2553                 if (!success) {
2554                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2555                         pa_log_warn("Failed to check group membership.");
2556                     else if (r > 0)
2557                         success = TRUE;
2558                 }
2559             }
2560
2561             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2562                         (unsigned long) creds->uid,
2563                         (unsigned long) creds->gid,
2564                         (int) success);
2565         }
2566 #endif
2567
2568         if (!success && c->options->auth_cookie) {
2569             const uint8_t *ac;
2570
2571             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2572                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2573                     success = TRUE;
2574         }
2575
2576         if (!success) {
2577             pa_log_warn("Denied access to client with invalid authorization data.");
2578             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2579             return;
2580         }
2581
2582         c->authorized = TRUE;
2583         if (c->auth_timeout_event) {
2584             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2585             c->auth_timeout_event = NULL;
2586         }
2587     }
2588
2589     /* Enable shared memory support if possible */
2590     do_shm =
2591         pa_mempool_is_shared(c->protocol->core->mempool) &&
2592         c->is_local;
2593
2594     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2595
2596     if (do_shm)
2597         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2598             do_shm = FALSE;
2599
2600 #ifdef HAVE_CREDS
2601     if (do_shm) {
2602         /* Only enable SHM if both sides are owned by the same
2603          * user. This is a security measure because otherwise data
2604          * private to the user might leak. */
2605
2606         const pa_creds *creds;
2607         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2608             do_shm = FALSE;
2609     }
2610 #endif
2611
2612     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2613     pa_pstream_enable_shm(c->pstream, do_shm);
2614
2615     reply = reply_new(tag);
2616     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2617
2618 #ifdef HAVE_CREDS
2619 {
2620     /* SHM support is only enabled after both sides made sure they are the same user. */
2621
2622     pa_creds ucred;
2623
2624     ucred.uid = getuid();
2625     ucred.gid = getgid();
2626
2627     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2628 }
2629 #else
2630     pa_pstream_send_tagstruct(c->pstream, reply);
2631 #endif
2632 }
2633
2634 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2635     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2636     const char *name = NULL;
2637     pa_proplist *p;
2638     pa_tagstruct *reply;
2639
2640     pa_native_connection_assert_ref(c);
2641     pa_assert(t);
2642
2643     p = pa_proplist_new();
2644
2645     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2646         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2647         !pa_tagstruct_eof(t)) {
2648
2649         protocol_error(c);
2650         pa_proplist_free(p);
2651         return;
2652     }
2653
2654     if (name)
2655         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2656             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2657             pa_proplist_free(p);
2658             return;
2659         }
2660
2661     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2662     pa_proplist_free(p);
2663
2664     reply = reply_new(tag);
2665
2666     if (c->version >= 13)
2667         pa_tagstruct_putu32(reply, c->client->index);
2668
2669     pa_pstream_send_tagstruct(c->pstream, reply);
2670 }
2671
2672 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2673     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2674     const char *name;
2675     uint32_t idx = PA_IDXSET_INVALID;
2676
2677     pa_native_connection_assert_ref(c);
2678     pa_assert(t);
2679
2680     if (pa_tagstruct_gets(t, &name) < 0 ||
2681         !pa_tagstruct_eof(t)) {
2682         protocol_error(c);
2683         return;
2684     }
2685
2686     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2687     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_LOOKUP_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
2688
2689     if (command == PA_COMMAND_LOOKUP_SINK) {
2690         pa_sink *sink;
2691         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2692             idx = sink->index;
2693     } else {
2694         pa_source *source;
2695         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2696         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2697             idx = source->index;
2698     }
2699
2700     if (idx == PA_IDXSET_INVALID)
2701         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2702     else {
2703         pa_tagstruct *reply;
2704         reply = reply_new(tag);
2705         pa_tagstruct_putu32(reply, idx);
2706         pa_pstream_send_tagstruct(c->pstream, reply);
2707     }
2708 }
2709
2710 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2711     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2712     uint32_t idx;
2713     playback_stream *s;
2714
2715     pa_native_connection_assert_ref(c);
2716     pa_assert(t);
2717
2718     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2719         !pa_tagstruct_eof(t)) {
2720         protocol_error(c);
2721         return;
2722     }
2723
2724     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2725     s = pa_idxset_get_by_index(c->output_streams, idx);
2726     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2727     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2728
2729     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2730 }
2731
2732 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2733     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2734     pa_tagstruct *reply;
2735     const pa_mempool_stat *stat;
2736
2737     pa_native_connection_assert_ref(c);
2738     pa_assert(t);
2739
2740     if (!pa_tagstruct_eof(t)) {
2741         protocol_error(c);
2742         return;
2743     }
2744
2745     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2746
2747     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2748
2749     reply = reply_new(tag);
2750     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2751     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2752     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2753     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2754     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2755     pa_pstream_send_tagstruct(c->pstream, reply);
2756 }
2757
2758 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2759     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2760     pa_tagstruct *reply;
2761     playback_stream *s;
2762     struct timeval tv, now;
2763     uint32_t idx;
2764
2765     pa_native_connection_assert_ref(c);
2766     pa_assert(t);
2767
2768     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2769         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2770         !pa_tagstruct_eof(t)) {
2771         protocol_error(c);
2772         return;
2773     }
2774
2775     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2776     s = pa_idxset_get_by_index(c->output_streams, idx);
2777     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2778     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2779
2780     /* Get an atomic snapshot of all timing parameters */
2781     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2782
2783     reply = reply_new(tag);
2784     pa_tagstruct_put_usec(reply,
2785                           s->current_sink_latency +
2786                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2787     pa_tagstruct_put_usec(reply, 0);
2788     pa_tagstruct_put_boolean(reply,
2789                              s->playing_for > 0 &&
2790                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2791                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2792     pa_tagstruct_put_timeval(reply, &tv);
2793     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2794     pa_tagstruct_puts64(reply, s->write_index);
2795     pa_tagstruct_puts64(reply, s->read_index);
2796
2797     if (c->version >= 13) {
2798         pa_tagstruct_putu64(reply, s->underrun_for);
2799         pa_tagstruct_putu64(reply, s->playing_for);
2800     }
2801
2802     pa_pstream_send_tagstruct(c->pstream, reply);
2803 }
2804
2805 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2806     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2807     pa_tagstruct *reply;
2808     record_stream *s;
2809     struct timeval tv, now;
2810     uint32_t idx;
2811
2812     pa_native_connection_assert_ref(c);
2813     pa_assert(t);
2814
2815     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2816         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2817         !pa_tagstruct_eof(t)) {
2818         protocol_error(c);
2819         return;
2820     }
2821
2822     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2823     s = pa_idxset_get_by_index(c->record_streams, idx);
2824     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2825
2826     /* Get an atomic snapshot of all timing parameters */
2827     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2828
2829     reply = reply_new(tag);
2830     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2831     pa_tagstruct_put_usec(reply,
2832                           s->current_source_latency +
2833                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->source->sample_spec));
2834     pa_tagstruct_put_boolean(reply,
2835                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2836                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2837     pa_tagstruct_put_timeval(reply, &tv);
2838     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2839     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2840     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2841     pa_pstream_send_tagstruct(c->pstream, reply);
2842 }
2843
2844 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2845     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2846     upload_stream *s;
2847     uint32_t length;
2848     const char *name = NULL;
2849     pa_sample_spec ss;
2850     pa_channel_map map;
2851     pa_tagstruct *reply;
2852     pa_proplist *p;
2853
2854     pa_native_connection_assert_ref(c);
2855     pa_assert(t);
2856
2857     if (pa_tagstruct_gets(t, &name) < 0 ||
2858         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2859         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2860         pa_tagstruct_getu32(t, &length) < 0) {
2861         protocol_error(c);
2862         return;
2863     }
2864
2865     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2866     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2867     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2868     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2869     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2870     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2871
2872     p = pa_proplist_new();
2873
2874     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2875         !pa_tagstruct_eof(t)) {
2876
2877         protocol_error(c);
2878         pa_proplist_free(p);
2879         return;
2880     }
2881
2882     if (c->version < 13)
2883         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2884     else if (!name)
2885         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2886             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2887
2888     if (!name || !pa_namereg_is_valid_name(name)) {
2889         pa_proplist_free(p);
2890         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2891     }
2892
2893     s = upload_stream_new(c, &ss, &map, name, length, p);
2894     pa_proplist_free(p);
2895
2896     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2897
2898     reply = reply_new(tag);
2899     pa_tagstruct_putu32(reply, s->index);
2900     pa_tagstruct_putu32(reply, length);
2901     pa_pstream_send_tagstruct(c->pstream, reply);
2902 }
2903
2904 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2905     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2906     uint32_t channel;
2907     upload_stream *s;
2908     uint32_t idx;
2909
2910     pa_native_connection_assert_ref(c);
2911     pa_assert(t);
2912
2913     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2914         !pa_tagstruct_eof(t)) {
2915         protocol_error(c);
2916         return;
2917     }
2918
2919     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2920
2921     s = pa_idxset_get_by_index(c->output_streams, channel);
2922     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2923     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2924
2925     if (!s->memchunk.memblock)
2926         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2927     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2928         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2929     else
2930         pa_pstream_send_simple_ack(c->pstream, tag);
2931
2932     upload_stream_unlink(s);
2933 }
2934
2935 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2936     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2937     uint32_t sink_index;
2938     pa_volume_t volume;
2939     pa_sink *sink;
2940     const char *name, *sink_name;
2941     uint32_t idx;
2942     pa_proplist *p;
2943     pa_tagstruct *reply;
2944
2945     pa_native_connection_assert_ref(c);
2946     pa_assert(t);
2947
2948     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2949
2950     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2951         pa_tagstruct_gets(t, &sink_name) < 0 ||
2952         pa_tagstruct_getu32(t, &volume) < 0 ||
2953         pa_tagstruct_gets(t, &name) < 0) {
2954         protocol_error(c);
2955         return;
2956     }
2957
2958     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name_or_wildcard(sink_name, PA_NAMEREG_SINK), tag, PA_ERR_INVALID);
2959     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2960     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2961     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2962
2963     if (sink_index != PA_INVALID_INDEX)
2964         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2965     else
2966         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2967
2968     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2969
2970     p = pa_proplist_new();
2971
2972     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2973         !pa_tagstruct_eof(t)) {
2974         protocol_error(c);
2975         pa_proplist_free(p);
2976         return;
2977     }
2978
2979     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2980
2981     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2982         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2983         pa_proplist_free(p);
2984         return;
2985     }
2986
2987     pa_proplist_free(p);
2988
2989     reply = reply_new(tag);
2990
2991     if (c->version >= 13)
2992         pa_tagstruct_putu32(reply, idx);
2993
2994     pa_pstream_send_tagstruct(c->pstream, reply);
2995 }
2996
2997 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2998     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2999     const char *name;
3000
3001     pa_native_connection_assert_ref(c);
3002     pa_assert(t);
3003
3004     if (pa_tagstruct_gets(t, &name) < 0 ||
3005         !pa_tagstruct_eof(t)) {
3006         protocol_error(c);
3007         return;
3008     }
3009
3010     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3011     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3012
3013     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
3014         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3015         return;
3016     }
3017
3018     pa_pstream_send_simple_ack(c->pstream, tag);
3019 }
3020
3021 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
3022     pa_assert(c);
3023     pa_assert(fixed);
3024     pa_assert(original);
3025
3026     *fixed = *original;
3027
3028     if (c->version < 12) {
3029         /* Before protocol version 12 we didn't support S32 samples,
3030          * so we need to lie about this to the client */
3031
3032         if (fixed->format == PA_SAMPLE_S32LE)
3033             fixed->format = PA_SAMPLE_FLOAT32LE;
3034         if (fixed->format == PA_SAMPLE_S32BE)
3035             fixed->format = PA_SAMPLE_FLOAT32BE;
3036     }
3037
3038     if (c->version < 15) {
3039         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
3040             fixed->format = PA_SAMPLE_FLOAT32LE;
3041         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
3042             fixed->format = PA_SAMPLE_FLOAT32BE;
3043     }
3044 }
3045
3046 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
3047     pa_sample_spec fixed_ss;
3048
3049     pa_assert(t);
3050     pa_sink_assert_ref(sink);
3051
3052     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
3053
3054     pa_tagstruct_put(
3055         t,
3056         PA_TAG_U32, sink->index,
3057         PA_TAG_STRING, sink->name,
3058         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3059         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3060         PA_TAG_CHANNEL_MAP, &sink->channel_map,
3061         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
3062         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
3063         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
3064         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
3065         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
3066         PA_TAG_USEC, pa_sink_get_latency(sink),
3067         PA_TAG_STRING, sink->driver,
3068         PA_TAG_U32, sink->flags & ~PA_SINK_SHARE_VOLUME_WITH_MASTER,
3069         PA_TAG_INVALID);
3070
3071     if (c->version >= 13) {
3072         pa_tagstruct_put_proplist(t, sink->proplist);
3073         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
3074     }
3075
3076     if (c->version >= 15) {
3077         pa_tagstruct_put_volume(t, sink->base_volume);
3078         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
3079             pa_log_error("Internal sink state is invalid.");
3080         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
3081         pa_tagstruct_putu32(t, sink->n_volume_steps);
3082         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
3083     }
3084
3085     if (c->version >= 16) {
3086         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
3087
3088         if (sink->ports) {
3089             void *state;
3090             pa_device_port *p;
3091
3092             PA_HASHMAP_FOREACH(p, sink->ports, state) {
3093                 pa_tagstruct_puts(t, p->name);
3094                 pa_tagstruct_puts(t, p->description);
3095                 pa_tagstruct_putu32(t, p->priority);
3096             }
3097         }
3098
3099         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
3100     }
3101
3102     if (c->version >= 21) {
3103         uint32_t i;
3104         pa_format_info *f;
3105         pa_idxset *formats = pa_sink_get_formats(sink);
3106
3107         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3108         PA_IDXSET_FOREACH(f, formats, i) {
3109             pa_tagstruct_put_format_info(t, f);
3110         }
3111
3112         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
3113     }
3114 }
3115
3116 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
3117     pa_sample_spec fixed_ss;
3118
3119     pa_assert(t);
3120     pa_source_assert_ref(source);
3121
3122     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
3123
3124     pa_tagstruct_put(
3125         t,
3126         PA_TAG_U32, source->index,
3127         PA_TAG_STRING, source->name,
3128         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
3129         PA_TAG_SAMPLE_SPEC, &fixed_ss,
3130         PA_TAG_CHANNEL_MAP, &source->channel_map,
3131         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
3132         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
3133         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
3134         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
3135         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
3136         PA_TAG_USEC, pa_source_get_latency(source),
3137         PA_TAG_STRING, source->driver,
3138         PA_TAG_U32, source->flags,
3139         PA_TAG_INVALID);
3140
3141     if (c->version >= 13) {
3142         pa_tagstruct_put_proplist(t, source->proplist);
3143         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
3144     }
3145
3146     if (c->version >= 15) {
3147         pa_tagstruct_put_volume(t, source->base_volume);
3148         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
3149             pa_log_error("Internal source state is invalid.");
3150         pa_tagstruct_putu32(t, pa_source_get_state(source));
3151         pa_tagstruct_putu32(t, source->n_volume_steps);
3152         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
3153     }
3154
3155     if (c->version >= 16) {
3156
3157         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
3158
3159         if (source->ports) {
3160             void *state;
3161             pa_device_port *p;
3162
3163             PA_HASHMAP_FOREACH(p, source->ports, state) {
3164                 pa_tagstruct_puts(t, p->name);
3165                 pa_tagstruct_puts(t, p->description);
3166                 pa_tagstruct_putu32(t, p->priority);
3167             }
3168         }
3169
3170         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
3171     }
3172
3173     if (c->version >= 22) {
3174         uint32_t i;
3175         pa_format_info *f;
3176         pa_idxset *formats = pa_source_get_formats(source);
3177
3178         pa_tagstruct_putu8(t, (uint8_t) pa_idxset_size(formats));
3179         PA_IDXSET_FOREACH(f, formats, i) {
3180             pa_tagstruct_put_format_info(t, f);
3181         }
3182
3183         pa_idxset_free(formats, (pa_free2_cb_t) pa_format_info_free2, NULL);
3184     }
3185 }
3186
3187 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
3188     pa_assert(t);
3189     pa_assert(client);
3190
3191     pa_tagstruct_putu32(t, client->index);
3192     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
3193     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
3194     pa_tagstruct_puts(t, client->driver);
3195
3196     if (c->version >= 13)
3197         pa_tagstruct_put_proplist(t, client->proplist);
3198 }
3199
3200 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
3201     void *state = NULL;
3202     pa_card_profile *p;
3203
3204     pa_assert(t);
3205     pa_assert(card);
3206
3207     pa_tagstruct_putu32(t, card->index);
3208     pa_tagstruct_puts(t, card->name);
3209     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
3210     pa_tagstruct_puts(t, card->driver);
3211
3212     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
3213
3214     if (card->profiles) {
3215         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
3216             pa_tagstruct_puts(t, p->name);
3217             pa_tagstruct_puts(t, p->description);
3218             pa_tagstruct_putu32(t, p->n_sinks);
3219             pa_tagstruct_putu32(t, p->n_sources);
3220             pa_tagstruct_putu32(t, p->priority);
3221         }
3222     }
3223
3224     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
3225     pa_tagstruct_put_proplist(t, card->proplist);
3226 }
3227
3228 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
3229     pa_assert(t);
3230     pa_assert(module);
3231
3232     pa_tagstruct_putu32(t, module->index);
3233     pa_tagstruct_puts(t, module->name);
3234     pa_tagstruct_puts(t, module->argument);
3235     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
3236
3237     if (c->version < 15)
3238         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
3239
3240     if (c->version >= 15)
3241         pa_tagstruct_put_proplist(t, module->proplist);
3242 }
3243
3244 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
3245     pa_sample_spec fixed_ss;
3246     pa_usec_t sink_latency;
3247     pa_cvolume v;
3248     pa_bool_t has_volume = FALSE;
3249
3250     pa_assert(t);
3251     pa_sink_input_assert_ref(s);
3252
3253     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3254
3255     has_volume = pa_sink_input_is_volume_readable(s);
3256     if (has_volume)
3257         pa_sink_input_get_volume(s, &v, TRUE);
3258     else
3259         pa_cvolume_reset(&v, fixed_ss.channels);
3260
3261     pa_tagstruct_putu32(t, s->index);
3262     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3263     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3264     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3265     pa_tagstruct_putu32(t, s->sink->index);
3266     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3267     pa_tagstruct_put_channel_map(t, &s->channel_map);
3268     pa_tagstruct_put_cvolume(t, &v);
3269     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3270     pa_tagstruct_put_usec(t, sink_latency);
3271     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3272     pa_tagstruct_puts(t, s->driver);
3273     if (c->version >= 11)
3274         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3275     if (c->version >= 13)
3276         pa_tagstruct_put_proplist(t, s->proplist);
3277     if (c->version >= 19)
3278         pa_tagstruct_put_boolean(t, (pa_sink_input_get_state(s) == PA_SINK_INPUT_CORKED));
3279     if (c->version >= 20) {
3280         pa_tagstruct_put_boolean(t, has_volume);
3281         pa_tagstruct_put_boolean(t, s->volume_writable);
3282     }
3283     if (c->version >= 21)
3284         pa_tagstruct_put_format_info(t, s->format);
3285 }
3286
3287 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3288     pa_sample_spec fixed_ss;
3289     pa_usec_t source_latency;
3290     pa_cvolume v;
3291     pa_bool_t has_volume = FALSE;
3292
3293     pa_assert(t);
3294     pa_source_output_assert_ref(s);
3295
3296     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3297
3298     has_volume = pa_source_output_is_volume_readable(s);
3299     if (has_volume)
3300         pa_source_output_get_volume(s, &v, TRUE);
3301     else
3302         pa_cvolume_reset(&v, fixed_ss.channels);
3303
3304     pa_tagstruct_putu32(t, s->index);
3305     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3306     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3307     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3308     pa_tagstruct_putu32(t, s->source->index);
3309     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3310     pa_tagstruct_put_channel_map(t, &s->channel_map);
3311     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3312     pa_tagstruct_put_usec(t, source_latency);
3313     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3314     pa_tagstruct_puts(t, s->driver);
3315     if (c->version >= 13)
3316         pa_tagstruct_put_proplist(t, s->proplist);
3317     if (c->version >= 19)
3318         pa_tagstruct_put_boolean(t, (pa_source_output_get_state(s) == PA_SOURCE_OUTPUT_CORKED));
3319     if (c->version >= 22) {
3320         pa_tagstruct_put_cvolume(t, &v);
3321         pa_tagstruct_put_boolean(t, pa_source_output_get_mute(s));
3322         pa_tagstruct_put_boolean(t, has_volume);
3323         pa_tagstruct_put_boolean(t, s->volume_writable);
3324         pa_tagstruct_put_format_info(t, s->format);
3325     }
3326 }
3327
3328 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3329     pa_sample_spec fixed_ss;
3330     pa_cvolume v;
3331
3332     pa_assert(t);
3333     pa_assert(e);
3334
3335     if (e->memchunk.memblock)
3336         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3337     else
3338         memset(&fixed_ss, 0, sizeof(fixed_ss));
3339
3340     pa_tagstruct_putu32(t, e->index);
3341     pa_tagstruct_puts(t, e->name);
3342
3343     if (e->volume_is_set)
3344         v = e->volume;
3345     else
3346         pa_cvolume_init(&v);
3347
3348     pa_tagstruct_put_cvolume(t, &v);
3349     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3350     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3351     pa_tagstruct_put_channel_map(t, &e->channel_map);
3352     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3353     pa_tagstruct_put_boolean(t, e->lazy);
3354     pa_tagstruct_puts(t, e->filename);
3355
3356     if (c->version >= 13)
3357         pa_tagstruct_put_proplist(t, e->proplist);
3358 }
3359
3360 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3361     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3362     uint32_t idx;
3363     pa_sink *sink = NULL;
3364     pa_source *source = NULL;
3365     pa_client *client = NULL;
3366     pa_card *card = NULL;
3367     pa_module *module = NULL;
3368     pa_sink_input *si = NULL;
3369     pa_source_output *so = NULL;
3370     pa_scache_entry *sce = NULL;
3371     const char *name = NULL;
3372     pa_tagstruct *reply;
3373
3374     pa_native_connection_assert_ref(c);
3375     pa_assert(t);
3376
3377     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3378         (command != PA_COMMAND_GET_CLIENT_INFO &&
3379          command != PA_COMMAND_GET_MODULE_INFO &&
3380          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3381          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3382          pa_tagstruct_gets(t, &name) < 0) ||
3383         !pa_tagstruct_eof(t)) {
3384         protocol_error(c);
3385         return;
3386     }
3387
3388     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3389     CHECK_VALIDITY(c->pstream, !name ||
3390                    (command == PA_COMMAND_GET_SINK_INFO &&
3391                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SINK)) ||
3392                    (command == PA_COMMAND_GET_SOURCE_INFO &&
3393                     pa_namereg_is_valid_name_or_wildcard(name, PA_NAMEREG_SOURCE)) ||
3394                    pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3395     CHECK_VALIDITY(c->pstream, command == PA_COMMAND_GET_SINK_INFO ||
3396                    command == PA_COMMAND_GET_SOURCE_INFO ||
3397                    (idx != PA_INVALID_INDEX || name), tag, PA_ERR_INVALID);
3398     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3399     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3400
3401     if (command == PA_COMMAND_GET_SINK_INFO) {
3402         if (idx != PA_INVALID_INDEX)
3403             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3404         else
3405             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3406     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3407         if (idx != PA_INVALID_INDEX)
3408             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3409         else
3410             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3411     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3412         if (idx != PA_INVALID_INDEX)
3413             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3414         else
3415             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3416     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3417         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3418     else if (command == PA_COMMAND_GET_MODULE_INFO)
3419         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3420     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3421         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3422     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3423         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3424     else {
3425         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3426         if (idx != PA_INVALID_INDEX)
3427             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3428         else
3429             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3430     }
3431
3432     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3433         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3434         return;
3435     }
3436
3437     reply = reply_new(tag);
3438     if (sink)
3439         sink_fill_tagstruct(c, reply, sink);
3440     else if (source)
3441         source_fill_tagstruct(c, reply, source);
3442     else if (client)
3443         client_fill_tagstruct(c, reply, client);
3444     else if (card)
3445         card_fill_tagstruct(c, reply, card);
3446     else if (module)
3447         module_fill_tagstruct(c, reply, module);
3448     else if (si)
3449         sink_input_fill_tagstruct(c, reply, si);
3450     else if (so)
3451         source_output_fill_tagstruct(c, reply, so);
3452     else
3453         scache_fill_tagstruct(c, reply, sce);
3454     pa_pstream_send_tagstruct(c->pstream, reply);
3455 }
3456
3457 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3458     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3459     pa_idxset *i;
3460     uint32_t idx;
3461     void *p;
3462     pa_tagstruct *reply;
3463
3464     pa_native_connection_assert_ref(c);
3465     pa_assert(t);
3466
3467     if (!pa_tagstruct_eof(t)) {
3468         protocol_error(c);
3469         return;
3470     }
3471
3472     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3473
3474     reply = reply_new(tag);
3475
3476     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3477         i = c->protocol->core->sinks;
3478     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3479         i = c->protocol->core->sources;
3480     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3481         i = c->protocol->core->clients;
3482     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3483         i = c->protocol->core->cards;
3484     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3485         i = c->protocol->core->modules;
3486     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3487         i = c->protocol->core->sink_inputs;
3488     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3489         i = c->protocol->core->source_outputs;
3490     else {
3491         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3492         i = c->protocol->core->scache;
3493     }
3494
3495     if (i) {
3496         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3497             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3498                 sink_fill_tagstruct(c, reply, p);
3499             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3500                 source_fill_tagstruct(c, reply, p);
3501             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3502                 client_fill_tagstruct(c, reply, p);
3503             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3504                 card_fill_tagstruct(c, reply, p);
3505             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3506                 module_fill_tagstruct(c, reply, p);
3507             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3508                 sink_input_fill_tagstruct(c, reply, p);
3509             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3510                 source_output_fill_tagstruct(c, reply, p);
3511             else {
3512                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3513                 scache_fill_tagstruct(c, reply, p);
3514             }
3515         }
3516     }
3517
3518     pa_pstream_send_tagstruct(c->pstream, reply);
3519 }
3520
3521 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3522     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3523     pa_tagstruct *reply;
3524     pa_sink *def_sink;
3525     pa_source *def_source;
3526     pa_sample_spec fixed_ss;
3527     char *h, *u;
3528
3529     pa_native_connection_assert_ref(c);
3530     pa_assert(t);
3531
3532     if (!pa_tagstruct_eof(t)) {
3533         protocol_error(c);
3534         return;
3535     }
3536
3537     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3538
3539     reply = reply_new(tag);
3540     pa_tagstruct_puts(reply, PACKAGE_NAME);
3541     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3542
3543     u = pa_get_user_name_malloc();
3544     pa_tagstruct_puts(reply, u);
3545     pa_xfree(u);
3546
3547     h = pa_get_host_name_malloc();
3548     pa_tagstruct_puts(reply, h);
3549     pa_xfree(h);
3550
3551     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3552     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3553
3554     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3555     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3556     def_source = pa_namereg_get_default_source(c->protocol->core);
3557     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3558
3559     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3560
3561     if (c->version >= 15)
3562         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3563
3564     pa_pstream_send_tagstruct(c->pstream, reply);
3565 }
3566
3567 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3568     pa_tagstruct *t;
3569     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3570
3571     pa_native_connection_assert_ref(c);
3572
3573     t = pa_tagstruct_new(NULL, 0);
3574     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3575     pa_tagstruct_putu32(t, (uint32_t) -1);
3576     pa_tagstruct_putu32(t, e);
3577     pa_tagstruct_putu32(t, idx);
3578     pa_pstream_send_tagstruct(c->pstream, t);
3579 }
3580
3581 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3582     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3583     pa_subscription_mask_t m;
3584
3585     pa_native_connection_assert_ref(c);
3586     pa_assert(t);
3587
3588     if (pa_tagstruct_getu32(t, &m) < 0 ||
3589         !pa_tagstruct_eof(t)) {
3590         protocol_error(c);
3591         return;
3592     }
3593
3594     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3595     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3596
3597     if (c->subscription)
3598         pa_subscription_free(c->subscription);
3599
3600     if (m != 0) {
3601         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3602         pa_assert(c->subscription);
3603     } else
3604         c->subscription = NULL;
3605
3606     pa_pstream_send_simple_ack(c->pstream, tag);
3607 }
3608
3609 static void command_set_volume(
3610         pa_pdispatch *pd,
3611         uint32_t command,
3612         uint32_t tag,
3613         pa_tagstruct *t,
3614         void *userdata) {
3615
3616     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3617     uint32_t idx;
3618     pa_cvolume volume;
3619     pa_sink *sink = NULL;
3620     pa_source *source = NULL;
3621     pa_sink_input *si = NULL;
3622     pa_source_output *so = NULL;
3623     const char *name = NULL;
3624     const char *client_name;
3625
3626     pa_native_connection_assert_ref(c);
3627     pa_assert(t);
3628
3629     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3630         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3631         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3632         pa_tagstruct_get_cvolume(t, &volume) ||
3633         !pa_tagstruct_eof(t)) {
3634         protocol_error(c);
3635         return;
3636     }
3637
3638     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3639     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_VOLUME ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3640     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3641     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3642     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3643     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3644
3645     switch (command) {
3646
3647         case PA_COMMAND_SET_SINK_VOLUME:
3648             if (idx != PA_INVALID_INDEX)
3649                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3650             else
3651                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3652             break;
3653
3654         case PA_COMMAND_SET_SOURCE_VOLUME:
3655             if (idx != PA_INVALID_INDEX)
3656                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3657             else
3658                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3659             break;
3660
3661         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3662             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3663             break;
3664
3665         case PA_COMMAND_SET_SOURCE_OUTPUT_VOLUME:
3666             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3667             break;
3668
3669         default:
3670             pa_assert_not_reached();
3671     }
3672
3673     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3674
3675     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3676
3677     if (sink) {
3678         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &sink->sample_spec), tag, PA_ERR_INVALID);
3679
3680         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3681         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3682     } else if (source) {
3683         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &source->sample_spec), tag, PA_ERR_INVALID);
3684
3685         pa_log_debug("Client %s changes volume of source %s.", client_name, source->name);
3686         pa_source_set_volume(source, &volume, TRUE, TRUE);
3687     } else if (si) {
3688         CHECK_VALIDITY(c->pstream, si->volume_writable, tag, PA_ERR_BADSTATE);
3689         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &si->sample_spec), tag, PA_ERR_INVALID);
3690
3691         pa_log_debug("Client %s changes volume of sink input %s.",
3692                      client_name,
3693                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3694         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3695     } else if (so) {
3696         CHECK_VALIDITY(c->pstream, volume.channels == 1 || pa_cvolume_compatible(&volume, &so->sample_spec), tag, PA_ERR_INVALID);
3697
3698         pa_log_debug("Client %s changes volume of source output %s.",
3699                      client_name,
3700                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3701         pa_source_output_set_volume(so, &volume, TRUE, TRUE);
3702     }
3703
3704     pa_pstream_send_simple_ack(c->pstream, tag);
3705 }
3706
3707 static void command_set_mute(
3708         pa_pdispatch *pd,
3709         uint32_t command,
3710         uint32_t tag,
3711         pa_tagstruct *t,
3712         void *userdata) {
3713
3714     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3715     uint32_t idx;
3716     pa_bool_t mute;
3717     pa_sink *sink = NULL;
3718     pa_source *source = NULL;
3719     pa_sink_input *si = NULL;
3720     pa_source_output *so = NULL;
3721     const char *name = NULL, *client_name;
3722
3723     pa_native_connection_assert_ref(c);
3724     pa_assert(t);
3725
3726     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3727         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3728         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3729         pa_tagstruct_get_boolean(t, &mute) ||
3730         !pa_tagstruct_eof(t)) {
3731         protocol_error(c);
3732         return;
3733     }
3734
3735     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3736     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_MUTE ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
3737     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3738     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3739     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3740
3741     switch (command) {
3742
3743         case PA_COMMAND_SET_SINK_MUTE:
3744             if (idx != PA_INVALID_INDEX)
3745                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3746             else
3747                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3748
3749             break;
3750
3751         case PA_COMMAND_SET_SOURCE_MUTE:
3752             if (idx != PA_INVALID_INDEX)
3753                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3754             else
3755                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3756
3757             break;
3758
3759         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3760             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3761             break;
3762
3763         case PA_COMMAND_SET_SOURCE_OUTPUT_MUTE:
3764             so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3765             break;
3766
3767         default:
3768             pa_assert_not_reached();
3769     }
3770
3771     CHECK_VALIDITY(c->pstream, si || so || sink || source, tag, PA_ERR_NOENTITY);
3772
3773     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3774
3775     if (sink) {
3776         pa_log_debug("Client %s changes mute of sink %s.", client_name, sink->name);
3777         pa_sink_set_mute(sink, mute, TRUE);
3778     } else if (source) {
3779         pa_log_debug("Client %s changes mute of source %s.", client_name, source->name);
3780         pa_source_set_mute(source, mute, TRUE);
3781     } else if (si) {
3782         pa_log_debug("Client %s changes mute of sink input %s.",
3783                      client_name,
3784                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3785         pa_sink_input_set_mute(si, mute, TRUE);
3786     } else if (so) {
3787         pa_log_debug("Client %s changes mute of source output %s.",
3788                      client_name,
3789                      pa_strnull(pa_proplist_gets(so->proplist, PA_PROP_MEDIA_NAME)));
3790         pa_source_output_set_mute(so, mute, TRUE);
3791     }
3792
3793     pa_pstream_send_simple_ack(c->pstream, tag);
3794 }
3795
3796 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3797     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3798     uint32_t idx;
3799     pa_bool_t b;
3800     playback_stream *s;
3801
3802     pa_native_connection_assert_ref(c);
3803     pa_assert(t);
3804
3805     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3806         pa_tagstruct_get_boolean(t, &b) < 0 ||
3807         !pa_tagstruct_eof(t)) {
3808         protocol_error(c);
3809         return;
3810     }
3811
3812     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3813     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3814     s = pa_idxset_get_by_index(c->output_streams, idx);
3815     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3816     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3817
3818     pa_sink_input_cork(s->sink_input, b);
3819
3820     if (b)
3821         s->is_underrun = TRUE;
3822
3823     pa_pstream_send_simple_ack(c->pstream, tag);
3824 }
3825
3826 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3827     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3828     uint32_t idx;
3829     playback_stream *s;
3830
3831     pa_native_connection_assert_ref(c);
3832     pa_assert(t);
3833
3834     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3835         !pa_tagstruct_eof(t)) {
3836         protocol_error(c);
3837         return;
3838     }
3839
3840     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3841     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3842     s = pa_idxset_get_by_index(c->output_streams, idx);
3843     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3844     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3845
3846     switch (command) {
3847         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3848             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3849             break;
3850
3851         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3852             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3853             break;
3854
3855         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3856             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3857             break;
3858
3859         default:
3860             pa_assert_not_reached();
3861     }
3862
3863     pa_pstream_send_simple_ack(c->pstream, tag);
3864 }
3865
3866 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3867     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3868     uint32_t idx;
3869     record_stream *s;
3870     pa_bool_t b;
3871
3872     pa_native_connection_assert_ref(c);
3873     pa_assert(t);
3874
3875     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3876         pa_tagstruct_get_boolean(t, &b) < 0 ||
3877         !pa_tagstruct_eof(t)) {
3878         protocol_error(c);
3879         return;
3880     }
3881
3882     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3883     s = pa_idxset_get_by_index(c->record_streams, idx);
3884     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3885
3886     pa_source_output_cork(s->source_output, b);
3887     pa_memblockq_prebuf_force(s->memblockq);
3888     pa_pstream_send_simple_ack(c->pstream, tag);
3889 }
3890
3891 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3892     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3893     uint32_t idx;
3894     record_stream *s;
3895
3896     pa_native_connection_assert_ref(c);
3897     pa_assert(t);
3898
3899     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3900         !pa_tagstruct_eof(t)) {
3901         protocol_error(c);
3902         return;
3903     }
3904
3905     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3906     s = pa_idxset_get_by_index(c->record_streams, idx);
3907     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3908
3909     pa_memblockq_flush_read(s->memblockq);
3910     pa_pstream_send_simple_ack(c->pstream, tag);
3911 }
3912
3913 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3914     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3915     uint32_t idx;
3916     pa_buffer_attr a;
3917     pa_tagstruct *reply;
3918
3919     pa_native_connection_assert_ref(c);
3920     pa_assert(t);
3921
3922     memset(&a, 0, sizeof(a));
3923
3924     if (pa_tagstruct_getu32(t, &idx) < 0) {
3925         protocol_error(c);
3926         return;
3927     }
3928
3929     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3930
3931     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3932         playback_stream *s;
3933         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3934
3935         s = pa_idxset_get_by_index(c->output_streams, idx);
3936         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3937         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3938
3939         if (pa_tagstruct_get(
3940                     t,
3941                     PA_TAG_U32, &a.maxlength,
3942                     PA_TAG_U32, &a.tlength,
3943                     PA_TAG_U32, &a.prebuf,
3944                     PA_TAG_U32, &a.minreq,
3945                     PA_TAG_INVALID) < 0 ||
3946             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3947             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3948             !pa_tagstruct_eof(t)) {
3949             protocol_error(c);
3950             return;
3951         }
3952
3953         s->adjust_latency = adjust_latency;
3954         s->early_requests = early_requests;
3955         s->buffer_attr_req = a;
3956
3957         fix_playback_buffer_attr(s);
3958         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3959
3960         reply = reply_new(tag);
3961         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3962         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3963         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3964         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3965
3966         if (c->version >= 13)
3967             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3968
3969     } else {
3970         record_stream *s;
3971         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3972         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3973
3974         s = pa_idxset_get_by_index(c->record_streams, idx);
3975         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3976
3977         if (pa_tagstruct_get(
3978                     t,
3979                     PA_TAG_U32, &a.maxlength,
3980                     PA_TAG_U32, &a.fragsize,
3981                     PA_TAG_INVALID) < 0 ||
3982             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3983             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3984             !pa_tagstruct_eof(t)) {
3985             protocol_error(c);
3986             return;
3987         }
3988
3989         s->adjust_latency = adjust_latency;
3990         s->early_requests = early_requests;
3991         s->buffer_attr_req = a;
3992
3993         fix_record_buffer_attr_pre(s);
3994         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3995         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3996         fix_record_buffer_attr_post(s);
3997
3998         reply = reply_new(tag);
3999         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
4000         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
4001
4002         if (c->version >= 13)
4003             pa_tagstruct_put_usec(reply, s->configured_source_latency);
4004     }
4005
4006     pa_pstream_send_tagstruct(c->pstream, reply);
4007 }
4008
4009 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4010     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4011     uint32_t idx;
4012     uint32_t rate;
4013
4014     pa_native_connection_assert_ref(c);
4015     pa_assert(t);
4016
4017     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4018         pa_tagstruct_getu32(t, &rate) < 0 ||
4019         !pa_tagstruct_eof(t)) {
4020         protocol_error(c);
4021         return;
4022     }
4023
4024     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4025     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
4026
4027     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
4028         playback_stream *s;
4029
4030         s = pa_idxset_get_by_index(c->output_streams, idx);
4031         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4032         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4033
4034         pa_sink_input_set_rate(s->sink_input, rate);
4035
4036     } else {
4037         record_stream *s;
4038         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
4039
4040         s = pa_idxset_get_by_index(c->record_streams, idx);
4041         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4042
4043         pa_source_output_set_rate(s->source_output, rate);
4044     }
4045
4046     pa_pstream_send_simple_ack(c->pstream, tag);
4047 }
4048
4049 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4050     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4051     uint32_t idx;
4052     uint32_t mode;
4053     pa_proplist *p;
4054
4055     pa_native_connection_assert_ref(c);
4056     pa_assert(t);
4057
4058     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4059
4060     p = pa_proplist_new();
4061
4062     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
4063
4064         if (pa_tagstruct_getu32(t, &mode) < 0 ||
4065             pa_tagstruct_get_proplist(t, p) < 0 ||
4066             !pa_tagstruct_eof(t)) {
4067             protocol_error(c);
4068             pa_proplist_free(p);
4069             return;
4070         }
4071
4072     } else {
4073
4074         if (pa_tagstruct_getu32(t, &idx) < 0 ||
4075             pa_tagstruct_getu32(t, &mode) < 0 ||
4076             pa_tagstruct_get_proplist(t, p) < 0 ||
4077             !pa_tagstruct_eof(t)) {
4078             protocol_error(c);
4079             pa_proplist_free(p);
4080             return;
4081         }
4082     }
4083
4084     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
4085         pa_proplist_free(p);
4086         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
4087     }
4088
4089     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
4090         playback_stream *s;
4091
4092         s = pa_idxset_get_by_index(c->output_streams, idx);
4093         if (!s || !playback_stream_isinstance(s)) {
4094             pa_proplist_free(p);
4095             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
4096         }
4097         pa_sink_input_update_proplist(s->sink_input, mode, p);
4098
4099     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
4100         record_stream *s;
4101
4102         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
4103             pa_proplist_free(p);
4104             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
4105         }
4106         pa_source_output_update_proplist(s->source_output, mode, p);
4107
4108     } else {
4109         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
4110
4111         pa_client_update_proplist(c->client, mode, p);
4112     }
4113
4114     pa_pstream_send_simple_ack(c->pstream, tag);
4115     pa_proplist_free(p);
4116 }
4117
4118 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4119     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4120     uint32_t idx;
4121     unsigned changed = 0;
4122     pa_proplist *p;
4123     pa_strlist *l = NULL;
4124
4125     pa_native_connection_assert_ref(c);
4126     pa_assert(t);
4127
4128     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4129
4130     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
4131
4132         if (pa_tagstruct_getu32(t, &idx) < 0) {
4133             protocol_error(c);
4134             return;
4135         }
4136     }
4137
4138     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4139         playback_stream *s;
4140
4141         s = pa_idxset_get_by_index(c->output_streams, idx);
4142         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4143         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4144
4145         p = s->sink_input->proplist;
4146
4147     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4148         record_stream *s;
4149
4150         s = pa_idxset_get_by_index(c->record_streams, idx);
4151         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4152
4153         p = s->source_output->proplist;
4154     } else {
4155         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4156
4157         p = c->client->proplist;
4158     }
4159
4160     for (;;) {
4161         const char *k;
4162
4163         if (pa_tagstruct_gets(t, &k) < 0) {
4164             protocol_error(c);
4165             pa_strlist_free(l);
4166             return;
4167         }
4168
4169         if (!k)
4170             break;
4171
4172         l = pa_strlist_prepend(l, k);
4173     }
4174
4175     if (!pa_tagstruct_eof(t)) {
4176         protocol_error(c);
4177         pa_strlist_free(l);
4178         return;
4179     }
4180
4181     for (;;) {
4182         char *z;
4183
4184         l = pa_strlist_pop(l, &z);
4185
4186         if (!z)
4187             break;
4188
4189         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
4190         pa_xfree(z);
4191     }
4192
4193     pa_pstream_send_simple_ack(c->pstream, tag);
4194
4195     if (changed) {
4196         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
4197             playback_stream *s;
4198
4199             s = pa_idxset_get_by_index(c->output_streams, idx);
4200             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
4201
4202         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
4203             record_stream *s;
4204
4205             s = pa_idxset_get_by_index(c->record_streams, idx);
4206             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
4207
4208         } else {
4209             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
4210             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
4211         }
4212     }
4213 }
4214
4215 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4216     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4217     const char *s;
4218
4219     pa_native_connection_assert_ref(c);
4220     pa_assert(t);
4221
4222     if (pa_tagstruct_gets(t, &s) < 0 ||
4223         !pa_tagstruct_eof(t)) {
4224         protocol_error(c);
4225         return;
4226     }
4227
4228     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4229     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
4230
4231     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
4232         pa_source *source;
4233
4234         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
4235         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4236
4237         pa_namereg_set_default_source(c->protocol->core, source);
4238     } else {
4239         pa_sink *sink;
4240         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
4241
4242         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
4243         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4244
4245         pa_namereg_set_default_sink(c->protocol->core, sink);
4246     }
4247
4248     pa_pstream_send_simple_ack(c->pstream, tag);
4249 }
4250
4251 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4252     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4253     uint32_t idx;
4254     const char *name;
4255
4256     pa_native_connection_assert_ref(c);
4257     pa_assert(t);
4258
4259     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4260         pa_tagstruct_gets(t, &name) < 0 ||
4261         !pa_tagstruct_eof(t)) {
4262         protocol_error(c);
4263         return;
4264     }
4265
4266     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4267     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
4268
4269     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
4270         playback_stream *s;
4271
4272         s = pa_idxset_get_by_index(c->output_streams, idx);
4273         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4274         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
4275
4276         pa_sink_input_set_name(s->sink_input, name);
4277
4278     } else {
4279         record_stream *s;
4280         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
4281
4282         s = pa_idxset_get_by_index(c->record_streams, idx);
4283         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4284
4285         pa_source_output_set_name(s->source_output, name);
4286     }
4287
4288     pa_pstream_send_simple_ack(c->pstream, tag);
4289 }
4290
4291 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4292     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4293     uint32_t idx;
4294
4295     pa_native_connection_assert_ref(c);
4296     pa_assert(t);
4297
4298     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4299         !pa_tagstruct_eof(t)) {
4300         protocol_error(c);
4301         return;
4302     }
4303
4304     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4305
4306     if (command == PA_COMMAND_KILL_CLIENT) {
4307         pa_client *client;
4308
4309         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
4310         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
4311
4312         pa_native_connection_ref(c);
4313         pa_client_kill(client);
4314
4315     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
4316         pa_sink_input *s;
4317
4318         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4319         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4320
4321         pa_native_connection_ref(c);
4322         pa_sink_input_kill(s);
4323     } else {
4324         pa_source_output *s;
4325
4326         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4327
4328         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4329         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4330
4331         pa_native_connection_ref(c);
4332         pa_source_output_kill(s);
4333     }
4334
4335     pa_pstream_send_simple_ack(c->pstream, tag);
4336     pa_native_connection_unref(c);
4337 }
4338
4339 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4340     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4341     pa_module *m;
4342     const char *name, *argument;
4343     pa_tagstruct *reply;
4344
4345     pa_native_connection_assert_ref(c);
4346     pa_assert(t);
4347
4348     if (pa_tagstruct_gets(t, &name) < 0 ||
4349         pa_tagstruct_gets(t, &argument) < 0 ||
4350         !pa_tagstruct_eof(t)) {
4351         protocol_error(c);
4352         return;
4353     }
4354
4355     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4356     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4357     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4358
4359     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4360         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4361         return;
4362     }
4363
4364     reply = reply_new(tag);
4365     pa_tagstruct_putu32(reply, m->index);
4366     pa_pstream_send_tagstruct(c->pstream, reply);
4367 }
4368
4369 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4370     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4371     uint32_t idx;
4372     pa_module *m;
4373
4374     pa_native_connection_assert_ref(c);
4375     pa_assert(t);
4376
4377     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4378         !pa_tagstruct_eof(t)) {
4379         protocol_error(c);
4380         return;
4381     }
4382
4383     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4384     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4385     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4386
4387     pa_module_unload_request(m, FALSE);
4388     pa_pstream_send_simple_ack(c->pstream, tag);
4389 }
4390
4391 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4392     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4393     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4394     const char *name_device = NULL;
4395
4396     pa_native_connection_assert_ref(c);
4397     pa_assert(t);
4398
4399     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4400         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4401         pa_tagstruct_gets(t, &name_device) < 0 ||
4402         !pa_tagstruct_eof(t)) {
4403         protocol_error(c);
4404         return;
4405     }
4406
4407     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4408     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4409
4410     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name_or_wildcard(name_device, command == PA_COMMAND_MOVE_SINK_INPUT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4411     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4412     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4413     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4414
4415     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4416         pa_sink_input *si = NULL;
4417         pa_sink *sink = NULL;
4418
4419         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4420
4421         if (idx_device != PA_INVALID_INDEX)
4422             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4423         else
4424             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4425
4426         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4427
4428         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4429             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4430             return;
4431         }
4432     } else {
4433         pa_source_output *so = NULL;
4434         pa_source *source;
4435
4436         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4437
4438         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4439
4440         if (idx_device != PA_INVALID_INDEX)
4441             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4442         else
4443             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4444
4445         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4446
4447         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4448             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4449             return;
4450         }
4451     }
4452
4453     pa_pstream_send_simple_ack(c->pstream, tag);
4454 }
4455
4456 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4457     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4458     uint32_t idx = PA_INVALID_INDEX;
4459     const char *name = NULL;
4460     pa_bool_t b;
4461
4462     pa_native_connection_assert_ref(c);
4463     pa_assert(t);
4464
4465     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4466         pa_tagstruct_gets(t, &name) < 0 ||
4467         pa_tagstruct_get_boolean(t, &b) < 0 ||
4468         !pa_tagstruct_eof(t)) {
4469         protocol_error(c);
4470         return;
4471     }
4472
4473     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4474     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SUSPEND_SINK ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE) || *name == 0, tag, PA_ERR_INVALID);
4475     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4476     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4477     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4478
4479     if (command == PA_COMMAND_SUSPEND_SINK) {
4480
4481         if (idx == PA_INVALID_INDEX && name && !*name) {
4482
4483             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4484
4485             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4486                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4487                 return;
4488             }
4489         } else {
4490             pa_sink *sink = NULL;
4491
4492             if (idx != PA_INVALID_INDEX)
4493                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4494             else
4495                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4496
4497             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4498
4499             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4500                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4501                 return;
4502             }
4503         }
4504     } else {
4505
4506         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4507
4508         if (idx == PA_INVALID_INDEX && name && !*name) {
4509
4510             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4511
4512             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4513                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4514                 return;
4515             }
4516
4517         } else {
4518             pa_source *source;
4519
4520             if (idx != PA_INVALID_INDEX)
4521                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4522             else
4523                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4524
4525             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4526
4527             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4528                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4529                 return;
4530             }
4531         }
4532     }
4533
4534     pa_pstream_send_simple_ack(c->pstream, tag);
4535 }
4536
4537 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4538     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4539     uint32_t idx = PA_INVALID_INDEX;
4540     const char *name = NULL;
4541     pa_module *m;
4542     pa_native_protocol_ext_cb_t cb;
4543
4544     pa_native_connection_assert_ref(c);
4545     pa_assert(t);
4546
4547     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4548         pa_tagstruct_gets(t, &name) < 0) {
4549         protocol_error(c);
4550         return;
4551     }
4552
4553     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4554     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4555     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4556     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4557     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4558
4559     if (idx != PA_INVALID_INDEX)
4560         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4561     else {
4562         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4563             if (strcmp(name, m->name) == 0)
4564                 break;
4565     }
4566
4567     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4568     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4569
4570     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4571     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4572
4573     if (cb(c->protocol, m, c, tag, t) < 0)
4574         protocol_error(c);
4575 }
4576
4577 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4578     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4579     uint32_t idx = PA_INVALID_INDEX;
4580     const char *name = NULL, *profile = NULL;
4581     pa_card *card = NULL;
4582     int ret;
4583
4584     pa_native_connection_assert_ref(c);
4585     pa_assert(t);
4586
4587     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4588         pa_tagstruct_gets(t, &name) < 0 ||
4589         pa_tagstruct_gets(t, &profile) < 0 ||
4590         !pa_tagstruct_eof(t)) {
4591         protocol_error(c);
4592         return;
4593     }
4594
4595     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4596     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4597     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4598     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4599     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4600
4601     if (idx != PA_INVALID_INDEX)
4602         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4603     else
4604         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4605
4606     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4607
4608     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4609         pa_pstream_send_error(c->pstream, tag, -ret);
4610         return;
4611     }
4612
4613     pa_pstream_send_simple_ack(c->pstream, tag);
4614 }
4615
4616 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4617     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4618     uint32_t idx = PA_INVALID_INDEX;
4619     const char *name = NULL, *port = NULL;
4620     int ret;
4621
4622     pa_native_connection_assert_ref(c);
4623     pa_assert(t);
4624
4625     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4626         pa_tagstruct_gets(t, &name) < 0 ||
4627         pa_tagstruct_gets(t, &port) < 0 ||
4628         !pa_tagstruct_eof(t)) {
4629         protocol_error(c);
4630         return;
4631     }
4632
4633     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4634     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name_or_wildcard(name, command == PA_COMMAND_SET_SINK_PORT ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE), tag, PA_ERR_INVALID);
4635     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4636     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4637     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4638
4639     if (command == PA_COMMAND_SET_SINK_PORT) {
4640         pa_sink *sink;
4641
4642         if (idx != PA_INVALID_INDEX)
4643             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4644         else
4645             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4646
4647         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4648
4649         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4650             pa_pstream_send_error(c->pstream, tag, -ret);
4651             return;
4652         }
4653     } else {
4654         pa_source *source;
4655
4656         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4657
4658         if (idx != PA_INVALID_INDEX)
4659             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4660         else
4661             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4662
4663         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4664
4665         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4666             pa_pstream_send_error(c->pstream, tag, -ret);
4667             return;
4668         }
4669     }
4670
4671     pa_pstream_send_simple_ack(c->pstream, tag);
4672 }
4673
4674 /*** pstream callbacks ***/
4675
4676 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4677     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4678
4679     pa_assert(p);
4680     pa_assert(packet);
4681     pa_native_connection_assert_ref(c);
4682
4683     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4684         pa_log("invalid packet.");
4685         native_connection_unlink(c);
4686     }
4687 }
4688
4689 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4690     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4691     output_stream *stream;
4692
4693     pa_assert(p);
4694     pa_assert(chunk);
4695     pa_native_connection_assert_ref(c);
4696
4697     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4698         pa_log_debug("Client sent block for invalid stream.");
4699         /* Ignoring */
4700         return;
4701     }
4702
4703 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4704
4705     if (playback_stream_isinstance(stream)) {
4706         playback_stream *ps = PLAYBACK_STREAM(stream);
4707
4708         pa_atomic_inc(&ps->seek_or_post_in_queue);
4709         if (chunk->memblock) {
4710             if (seek != PA_SEEK_RELATIVE || offset != 0)
4711                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, chunk, NULL);
4712             else
4713                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4714         } else
4715             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4716
4717     } else {
4718         upload_stream *u = UPLOAD_STREAM(stream);
4719         size_t l;
4720
4721         if (!u->memchunk.memblock) {
4722             if (u->length == chunk->length && chunk->memblock) {
4723                 u->memchunk = *chunk;
4724                 pa_memblock_ref(u->memchunk.memblock);
4725                 u->length = 0;
4726             } else {
4727                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4728                 u->memchunk.index = u->memchunk.length = 0;
4729             }
4730         }
4731
4732         pa_assert(u->memchunk.memblock);
4733
4734         l = u->length;
4735         if (l > chunk->length)
4736             l = chunk->length;
4737
4738         if (l > 0) {
4739             void *dst;
4740             dst = pa_memblock_acquire(u->memchunk.memblock);
4741
4742             if (chunk->memblock) {
4743                 void *src;
4744                 src = pa_memblock_acquire(chunk->memblock);
4745
4746                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4747                        (uint8_t*) src + chunk->index, l);
4748
4749                 pa_memblock_release(chunk->memblock);
4750             } else
4751                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4752
4753             pa_memblock_release(u->memchunk.memblock);
4754
4755             u->memchunk.length += l;
4756             u->length -= l;
4757         }
4758     }
4759 }
4760
4761 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4762     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4763
4764     pa_assert(p);
4765     pa_native_connection_assert_ref(c);
4766
4767     native_connection_unlink(c);
4768     pa_log_info("Connection died.");
4769 }
4770
4771 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4772     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4773
4774     pa_assert(p);
4775     pa_native_connection_assert_ref(c);
4776
4777     native_connection_send_memblock(c);
4778 }
4779
4780 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4781     pa_thread_mq *q;
4782
4783     if (!(q = pa_thread_mq_get()))
4784         pa_pstream_send_revoke(p, block_id);
4785     else
4786         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4787 }
4788
4789 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4790     pa_thread_mq *q;
4791
4792     if (!(q = pa_thread_mq_get()))
4793         pa_pstream_send_release(p, block_id);
4794     else
4795         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4796 }
4797
4798 /*** client callbacks ***/
4799
4800 static void client_kill_cb(pa_client *c) {
4801     pa_assert(c);
4802
4803     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4804     pa_log_info("Connection killed.");
4805 }
4806
4807 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4808     pa_tagstruct *t;
4809     pa_native_connection *c;
4810
4811     pa_assert(client);
4812     c = PA_NATIVE_CONNECTION(client->userdata);
4813     pa_native_connection_assert_ref(c);
4814
4815     if (c->version < 15)
4816       return;
4817
4818     t = pa_tagstruct_new(NULL, 0);
4819     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4820     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4821     pa_tagstruct_puts(t, event);
4822     pa_tagstruct_put_proplist(t, pl);
4823     pa_pstream_send_tagstruct(c->pstream, t);
4824 }
4825
4826 /*** module entry points ***/
4827
4828 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4829     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4830
4831     pa_assert(m);
4832     pa_native_connection_assert_ref(c);
4833     pa_assert(c->auth_timeout_event == e);
4834
4835     if (!c->authorized) {
4836         native_connection_unlink(c);
4837         pa_log_info("Connection terminated due to authentication timeout.");
4838     }
4839 }
4840
4841 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4842     pa_native_connection *c;
4843     char pname[128];
4844     pa_client *client;
4845     pa_client_new_data data;
4846
4847     pa_assert(p);
4848     pa_assert(io);
4849     pa_assert(o);
4850
4851     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4852         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4853         pa_iochannel_free(io);
4854         return;
4855     }
4856
4857     pa_client_new_data_init(&data);
4858     data.module = o->module;
4859     data.driver = __FILE__;
4860     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4861     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4862     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4863     client = pa_client_new(p->core, &data);
4864     pa_client_new_data_done(&data);
4865
4866     if (!client)
4867         return;
4868
4869     c = pa_msgobject_new(pa_native_connection);
4870     c->parent.parent.free = native_connection_free;
4871     c->parent.process_msg = native_connection_process_msg;
4872     c->protocol = p;
4873     c->options = pa_native_options_ref(o);
4874     c->authorized = FALSE;
4875
4876     if (o->auth_anonymous) {
4877         pa_log_info("Client authenticated anonymously.");
4878         c->authorized = TRUE;
4879     }
4880
4881     if (!c->authorized &&
4882         o->auth_ip_acl &&
4883         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4884
4885         pa_log_info("Client authenticated by IP ACL.");
4886         c->authorized = TRUE;
4887     }
4888
4889     if (!c->authorized)
4890         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4891     else
4892         c->auth_timeout_event = NULL;
4893
4894     c->is_local = pa_iochannel_socket_is_local(io);
4895     c->version = 8;
4896
4897     c->client = client;
4898     c->client->kill = client_kill_cb;
4899     c->client->send_event = client_send_event_cb;
4900     c->client->userdata = c;
4901
4902     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4903     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4904     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4905     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4906     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4907     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4908     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4909
4910     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4911
4912     c->record_streams = pa_idxset_new(NULL, NULL);
4913     c->output_streams = pa_idxset_new(NULL, NULL);
4914
4915     c->rrobin_index = PA_IDXSET_INVALID;
4916     c->subscription = NULL;
4917
4918     pa_idxset_put(p->connections, c, NULL);
4919
4920 #ifdef HAVE_CREDS
4921     if (pa_iochannel_creds_supported(io))
4922         pa_iochannel_creds_enable(io);
4923 #endif
4924
4925     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4926 }
4927
4928 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4929     pa_native_connection *c;
4930     void *state = NULL;
4931
4932     pa_assert(p);
4933     pa_assert(m);
4934
4935     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4936         if (c->options->module == m)
4937             native_connection_unlink(c);
4938 }
4939
4940 static pa_native_protocol* native_protocol_new(pa_core *c) {
4941     pa_native_protocol *p;
4942     pa_native_hook_t h;
4943
4944     pa_assert(c);
4945
4946     p = pa_xnew(pa_native_protocol, 1);
4947     PA_REFCNT_INIT(p);
4948     p->core = c;
4949     p->connections = pa_idxset_new(NULL, NULL);
4950
4951     p->servers = NULL;
4952
4953     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4954
4955     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4956         pa_hook_init(&p->hooks[h], p);
4957
4958     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4959
4960     return p;
4961 }
4962
4963 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4964     pa_native_protocol *p;
4965
4966     if ((p = pa_shared_get(c, "native-protocol")))
4967         return pa_native_protocol_ref(p);
4968
4969     return native_protocol_new(c);
4970 }
4971
4972 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4973     pa_assert(p);
4974     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4975
4976     PA_REFCNT_INC(p);
4977
4978     return p;
4979 }
4980
4981 void pa_native_protocol_unref(pa_native_protocol *p) {
4982     pa_native_connection *c;
4983     pa_native_hook_t h;
4984
4985     pa_assert(p);
4986     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4987
4988     if (PA_REFCNT_DEC(p) > 0)
4989         return;
4990
4991     while ((c = pa_idxset_first(p->connections, NULL)))
4992         native_connection_unlink(c);
4993
4994     pa_idxset_free(p->connections, NULL, NULL);
4995
4996     pa_strlist_free(p->servers);
4997
4998     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4999         pa_hook_done(&p->hooks[h]);
5000
5001     pa_hashmap_free(p->extensions, NULL, NULL);
5002
5003     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
5004
5005     pa_xfree(p);
5006 }
5007
5008 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
5009     pa_assert(p);
5010     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5011     pa_assert(name);
5012
5013     p->servers = pa_strlist_prepend(p->servers, name);
5014
5015     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5016 }
5017
5018 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
5019     pa_assert(p);
5020     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5021     pa_assert(name);
5022
5023     p->servers = pa_strlist_remove(p->servers, name);
5024
5025     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
5026 }
5027
5028 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
5029     pa_assert(p);
5030     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5031
5032     return p->hooks;
5033 }
5034
5035 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
5036     pa_assert(p);
5037     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5038
5039     return p->servers;
5040 }
5041
5042 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
5043     pa_assert(p);
5044     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5045     pa_assert(m);
5046     pa_assert(cb);
5047     pa_assert(!pa_hashmap_get(p->extensions, m));
5048
5049     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
5050     return 0;
5051 }
5052
5053 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
5054     pa_assert(p);
5055     pa_assert(PA_REFCNT_VALUE(p) >= 1);
5056     pa_assert(m);
5057
5058     pa_assert_se(pa_hashmap_remove(p->extensions, m));
5059 }
5060
5061 pa_native_options* pa_native_options_new(void) {
5062     pa_native_options *o;
5063
5064     o = pa_xnew0(pa_native_options, 1);
5065     PA_REFCNT_INIT(o);
5066
5067     return o;
5068 }
5069
5070 pa_native_options* pa_native_options_ref(pa_native_options *o) {
5071     pa_assert(o);
5072     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5073
5074     PA_REFCNT_INC(o);
5075
5076     return o;
5077 }
5078
5079 void pa_native_options_unref(pa_native_options *o) {
5080     pa_assert(o);
5081     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5082
5083     if (PA_REFCNT_DEC(o) > 0)
5084         return;
5085
5086     pa_xfree(o->auth_group);
5087
5088     if (o->auth_ip_acl)
5089         pa_ip_acl_free(o->auth_ip_acl);
5090
5091     if (o->auth_cookie)
5092         pa_auth_cookie_unref(o->auth_cookie);
5093
5094     pa_xfree(o);
5095 }
5096
5097 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
5098     pa_bool_t enabled;
5099     const char *acl;
5100
5101     pa_assert(o);
5102     pa_assert(PA_REFCNT_VALUE(o) >= 1);
5103     pa_assert(ma);
5104
5105     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
5106         pa_log("auth-anonymous= expects a boolean argument.");
5107         return -1;
5108     }
5109
5110     enabled = TRUE;
5111     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
5112         pa_log("auth-group-enabled= expects a boolean argument.");
5113         return -1;
5114     }
5115
5116     pa_xfree(o->auth_group);
5117     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
5118
5119 #ifndef HAVE_CREDS
5120     if (o->auth_group)
5121         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
5122 #endif
5123
5124     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
5125         pa_ip_acl *ipa;
5126
5127         if (!(ipa = pa_ip_acl_new(acl))) {
5128             pa_log("Failed to parse IP ACL '%s'", acl);
5129             return -1;
5130         }
5131
5132         if (o->auth_ip_acl)
5133             pa_ip_acl_free(o->auth_ip_acl);
5134
5135         o->auth_ip_acl = ipa;
5136     }
5137
5138     enabled = TRUE;
5139     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
5140         pa_log("auth-cookie-enabled= expects a boolean argument.");
5141         return -1;
5142     }
5143
5144     if (o->auth_cookie)
5145         pa_auth_cookie_unref(o->auth_cookie);
5146
5147     if (enabled) {
5148         const char *cn;
5149
5150         /* The new name for this is 'auth-cookie', for compat reasons
5151          * we check the old name too */
5152         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
5153             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
5154                 cn = PA_NATIVE_COOKIE_FILE;
5155
5156         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
5157             return -1;
5158
5159     } else
5160           o->auth_cookie = NULL;
5161
5162     return 0;
5163 }
5164
5165 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
5166     pa_native_connection_assert_ref(c);
5167
5168     return c->pstream;
5169 }
5170
5171 pa_client* pa_native_connection_get_client(pa_native_connection *c) {
5172     pa_native_connection_assert_ref(c);
5173
5174     return c->client;
5175 }