Merge commit 'origin/master-tx'
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* Kick a client if it doesn't authenticate within this time */
64 #define AUTH_TIMEOUT 60
65
66 /* Don't accept more connection than this */
67 #define MAX_CONNECTIONS 64
68
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
73
74 struct pa_native_protocol;
75
76 typedef struct record_stream {
77     pa_msgobject parent;
78
79     pa_native_connection *connection;
80     uint32_t index;
81
82     pa_source_output *source_output;
83     pa_memblockq *memblockq;
84
85     pa_bool_t adjust_latency:1;
86     pa_bool_t early_requests:1;
87
88     pa_buffer_attr buffer_attr;
89
90     pa_atomic_t on_the_fly;
91     pa_usec_t configured_source_latency;
92     size_t drop_initial;
93
94     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
95     size_t on_the_fly_snapshot;
96     pa_usec_t current_monitor_latency;
97     pa_usec_t current_source_latency;
98 } record_stream;
99
100 PA_DECLARE_CLASS(record_stream);
101 #define RECORD_STREAM(o) (record_stream_cast(o))
102 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
103
104 typedef struct output_stream {
105     pa_msgobject parent;
106 } output_stream;
107
108 PA_DECLARE_CLASS(output_stream);
109 #define OUTPUT_STREAM(o) (output_stream_cast(o))
110 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
111
112 typedef struct playback_stream {
113     output_stream parent;
114
115     pa_native_connection *connection;
116     uint32_t index;
117
118     pa_sink_input *sink_input;
119     pa_memblockq *memblockq;
120
121     pa_bool_t adjust_latency:1;
122     pa_bool_t early_requests:1;
123
124     pa_bool_t is_underrun:1;
125     pa_bool_t drain_request:1;
126     uint32_t drain_tag;
127     uint32_t syncid;
128
129     pa_atomic_t missing;
130     pa_usec_t configured_sink_latency;
131     pa_buffer_attr buffer_attr;
132
133     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
134     int64_t read_index, write_index;
135     size_t render_memblockq_length;
136     pa_usec_t current_sink_latency;
137     uint64_t playing_for, underrun_for;
138 } playback_stream;
139
140 PA_DECLARE_CLASS(playback_stream);
141 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
142 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
143
144 typedef struct upload_stream {
145     output_stream parent;
146
147     pa_native_connection *connection;
148     uint32_t index;
149
150     pa_memchunk memchunk;
151     size_t length;
152     char *name;
153     pa_sample_spec sample_spec;
154     pa_channel_map channel_map;
155     pa_proplist *proplist;
156 } upload_stream;
157
158 PA_DECLARE_CLASS(upload_stream);
159 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
160 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
161
162 struct pa_native_connection {
163     pa_msgobject parent;
164     pa_native_protocol *protocol;
165     pa_native_options *options;
166     pa_bool_t authorized:1;
167     pa_bool_t is_local:1;
168     uint32_t version;
169     pa_client *client;
170     pa_pstream *pstream;
171     pa_pdispatch *pdispatch;
172     pa_idxset *record_streams, *output_streams;
173     uint32_t rrobin_index;
174     pa_subscription *subscription;
175     pa_time_event *auth_timeout_event;
176 };
177
178 PA_DECLARE_CLASS(pa_native_connection);
179 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
180 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
181
182 struct pa_native_protocol {
183     PA_REFCNT_DECLARE;
184
185     pa_core *core;
186     pa_idxset *connections;
187
188     pa_strlist *servers;
189     pa_hook hooks[PA_NATIVE_HOOK_MAX];
190
191     pa_hashmap *extensions;
192 };
193
194 enum {
195     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
196 };
197
198 enum {
199     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
200     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
201     SINK_INPUT_MESSAGE_FLUSH,
202     SINK_INPUT_MESSAGE_TRIGGER,
203     SINK_INPUT_MESSAGE_SEEK,
204     SINK_INPUT_MESSAGE_PREBUF_FORCE,
205     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
206     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
207 };
208
209 enum {
210     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
211     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
212     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
213     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
214     PLAYBACK_STREAM_MESSAGE_STARTED,
215     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
216 };
217
218 enum {
219     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
220 };
221
222 enum {
223     CONNECTION_MESSAGE_RELEASE,
224     CONNECTION_MESSAGE_REVOKE
225 };
226
227 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
228 static void sink_input_kill_cb(pa_sink_input *i);
229 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
230 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
231 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
232 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
235
236 static void native_connection_send_memblock(pa_native_connection *c);
237 static void playback_stream_request_bytes(struct playback_stream*s);
238
239 static void source_output_kill_cb(pa_source_output *o);
240 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
241 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
242 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
243 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
244 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
245
246 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
247 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248
249 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287
288 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
289     [PA_COMMAND_ERROR] = NULL,
290     [PA_COMMAND_TIMEOUT] = NULL,
291     [PA_COMMAND_REPLY] = NULL,
292     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
293     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
294     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
295     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
296     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
297     [PA_COMMAND_AUTH] = command_auth,
298     [PA_COMMAND_REQUEST] = NULL,
299     [PA_COMMAND_EXIT] = command_exit,
300     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
301     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
302     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
303     [PA_COMMAND_STAT] = command_stat,
304     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
305     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
306     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
307     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
308     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
309     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
310     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
311     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
312     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
313     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
314     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
315     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
316     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
317     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
318     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
319     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
320     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
321     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
328     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
329
330     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
331     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
332     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
333
334     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
335     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
336     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
337
338     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
339     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
340
341     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
342     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
343     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
344     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345
346     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
347     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
348
349     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
350     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
351     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
352     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
353     [PA_COMMAND_KILL_CLIENT] = command_kill,
354     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
355     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
356     [PA_COMMAND_LOAD_MODULE] = command_load_module,
357     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
358
359     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
360     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
361     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
362     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
363
364     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
365     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
366
367     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
368     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
369
370     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
371     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
372
373     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
374     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
375     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
376
377     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
378     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
379     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
380
381     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
382
383     [PA_COMMAND_EXTENSION] = command_extension
384 };
385
386 /* structure management */
387
388 /* Called from main context */
389 static void upload_stream_unlink(upload_stream *s) {
390     pa_assert(s);
391
392     if (!s->connection)
393         return;
394
395     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
396     s->connection = NULL;
397     upload_stream_unref(s);
398 }
399
400 /* Called from main context */
401 static void upload_stream_free(pa_object *o) {
402     upload_stream *s = UPLOAD_STREAM(o);
403     pa_assert(s);
404
405     upload_stream_unlink(s);
406
407     pa_xfree(s->name);
408
409     if (s->proplist)
410         pa_proplist_free(s->proplist);
411
412     if (s->memchunk.memblock)
413         pa_memblock_unref(s->memchunk.memblock);
414
415     pa_xfree(s);
416 }
417
418 /* Called from main context */
419 static upload_stream* upload_stream_new(
420         pa_native_connection *c,
421         const pa_sample_spec *ss,
422         const pa_channel_map *map,
423         const char *name,
424         size_t length,
425         pa_proplist *p) {
426
427     upload_stream *s;
428
429     pa_assert(c);
430     pa_assert(ss);
431     pa_assert(name);
432     pa_assert(length > 0);
433     pa_assert(p);
434
435     s = pa_msgobject_new(upload_stream);
436     s->parent.parent.parent.free = upload_stream_free;
437     s->connection = c;
438     s->sample_spec = *ss;
439     s->channel_map = *map;
440     s->name = pa_xstrdup(name);
441     pa_memchunk_reset(&s->memchunk);
442     s->length = length;
443     s->proplist = pa_proplist_copy(p);
444     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
445
446     pa_idxset_put(c->output_streams, s, &s->index);
447
448     return s;
449 }
450
451 /* Called from main context */
452 static void record_stream_unlink(record_stream *s) {
453     pa_assert(s);
454
455     if (!s->connection)
456         return;
457
458     if (s->source_output) {
459         pa_source_output_unlink(s->source_output);
460         pa_source_output_unref(s->source_output);
461         s->source_output = NULL;
462     }
463
464     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
465     s->connection = NULL;
466     record_stream_unref(s);
467 }
468
469 /* Called from main context */
470 static void record_stream_free(pa_object *o) {
471     record_stream *s = RECORD_STREAM(o);
472     pa_assert(s);
473
474     record_stream_unlink(s);
475
476     pa_memblockq_free(s->memblockq);
477     pa_xfree(s);
478 }
479
480 /* Called from main context */
481 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
482     record_stream *s = RECORD_STREAM(o);
483     record_stream_assert_ref(s);
484
485     if (!s->connection)
486         return -1;
487
488     switch (code) {
489
490         case RECORD_STREAM_MESSAGE_POST_DATA:
491
492             /* We try to keep up to date with how many bytes are
493              * currently on the fly */
494             pa_atomic_sub(&s->on_the_fly, chunk->length);
495
496             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
497 /*                 pa_log_warn("Failed to push data into output queue."); */
498                 return -1;
499             }
500
501             if (!pa_pstream_is_pending(s->connection->pstream))
502                 native_connection_send_memblock(s->connection);
503
504             break;
505     }
506
507     return 0;
508 }
509
510 /* Called from main context */
511 static void fix_record_buffer_attr_pre(record_stream *s) {
512
513     size_t frame_size;
514     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
515
516     pa_assert(s);
517
518     /* This function will be called from the main thread, before as
519      * well as after the source output has been activated using
520      * pa_source_output_put()! That means it may not touch any
521      * ->thread_info data! */
522
523     frame_size = pa_frame_size(&s->source_output->sample_spec);
524
525     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
526         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
527     if (s->buffer_attr.maxlength <= 0)
528         s->buffer_attr.maxlength = (uint32_t) frame_size;
529
530     if (s->buffer_attr.fragsize == (uint32_t) -1)
531         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
532     if (s->buffer_attr.fragsize <= 0)
533         s->buffer_attr.fragsize = (uint32_t) frame_size;
534
535     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
536
537     if (s->early_requests) {
538
539         /* In early request mode we need to emulate the classic
540          * fragment-based playback model. We do this setting the source
541          * latency to the fragment size. */
542
543         source_usec = fragsize_usec;
544
545     } else if (s->adjust_latency) {
546
547         /* So, the user asked us to adjust the latency according to
548          * what the source can provide. Half the latency will be
549          * spent on the hw buffer, half of it in the async buffer
550          * queue we maintain for each client. */
551
552         source_usec = fragsize_usec/2;
553
554     } else {
555
556         /* Ok, the user didn't ask us to adjust the latency, hence we
557          * don't */
558
559         source_usec = (pa_usec_t) -1;
560     }
561
562     if (source_usec != (pa_usec_t) -1)
563         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
564     else
565         s->configured_source_latency = 0;
566
567     if (s->early_requests) {
568
569         /* Ok, we didn't necessarily get what we were asking for, so
570          * let's tell the user */
571
572         fragsize_usec = s->configured_source_latency;
573
574     } else if (s->adjust_latency) {
575
576         /* Now subtract what we actually got */
577
578         if (fragsize_usec >= s->configured_source_latency*2)
579             fragsize_usec -= s->configured_source_latency;
580         else
581             fragsize_usec = s->configured_source_latency;
582     }
583
584     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
585         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
586
587         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
588
589     if (s->buffer_attr.fragsize <= 0)
590         s->buffer_attr.fragsize = (uint32_t) frame_size;
591 }
592
593 /* Called from main context */
594 static void fix_record_buffer_attr_post(record_stream *s) {
595     size_t base;
596
597     pa_assert(s);
598
599     /* This function will be called from the main thread, before as
600      * well as after the source output has been activated using
601      * pa_source_output_put()! That means it may not touch and
602      * ->thread_info data! */
603
604     base = pa_frame_size(&s->source_output->sample_spec);
605
606     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
607     if (s->buffer_attr.fragsize <= 0)
608         s->buffer_attr.fragsize = base;
609
610     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
611         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
612 }
613
614 /* Called from main context */
615 static record_stream* record_stream_new(
616         pa_native_connection *c,
617         pa_source *source,
618         pa_sample_spec *ss,
619         pa_channel_map *map,
620         pa_bool_t peak_detect,
621         pa_buffer_attr *attr,
622         pa_source_output_flags_t flags,
623         pa_proplist *p,
624         pa_bool_t adjust_latency,
625         pa_sink_input *direct_on_input,
626         pa_bool_t early_requests,
627         int *ret) {
628
629     record_stream *s;
630     pa_source_output *source_output = NULL;
631     size_t base;
632     pa_source_output_new_data data;
633
634     pa_assert(c);
635     pa_assert(ss);
636     pa_assert(p);
637     pa_assert(ret);
638
639     pa_source_output_new_data_init(&data);
640
641     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
642     data.driver = __FILE__;
643     data.module = c->options->module;
644     data.client = c->client;
645     data.source = source;
646     data.direct_on_input = direct_on_input;
647     pa_source_output_new_data_set_sample_spec(&data, ss);
648     pa_source_output_new_data_set_channel_map(&data, map);
649     if (peak_detect)
650         data.resample_method = PA_RESAMPLER_PEAKS;
651
652     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
653
654     pa_source_output_new_data_done(&data);
655
656     if (!source_output)
657         return NULL;
658
659     s = pa_msgobject_new(record_stream);
660     s->parent.parent.free = record_stream_free;
661     s->parent.process_msg = record_stream_process_msg;
662     s->connection = c;
663     s->source_output = source_output;
664     s->buffer_attr = *attr;
665     s->adjust_latency = adjust_latency;
666     s->early_requests = early_requests;
667     pa_atomic_store(&s->on_the_fly, 0);
668
669     s->source_output->parent.process_msg = source_output_process_msg;
670     s->source_output->push = source_output_push_cb;
671     s->source_output->kill = source_output_kill_cb;
672     s->source_output->get_latency = source_output_get_latency_cb;
673     s->source_output->moving = source_output_moving_cb;
674     s->source_output->suspend = source_output_suspend_cb;
675     s->source_output->send_event = source_output_send_event_cb;
676     s->source_output->userdata = s;
677
678     fix_record_buffer_attr_pre(s);
679
680     s->memblockq = pa_memblockq_new(
681             0,
682             s->buffer_attr.maxlength,
683             0,
684             base = pa_frame_size(&source_output->sample_spec),
685             1,
686             0,
687             0,
688             NULL);
689
690     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
691     fix_record_buffer_attr_post(s);
692
693     *ss = s->source_output->sample_spec;
694     *map = s->source_output->channel_map;
695
696     pa_idxset_put(c->record_streams, s, &s->index);
697
698     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
699                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
700                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
701                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
702
703     pa_source_output_put(s->source_output);
704     return s;
705 }
706
707 /* Called from main context */
708 static void record_stream_send_killed(record_stream *r) {
709     pa_tagstruct *t;
710     record_stream_assert_ref(r);
711
712     t = pa_tagstruct_new(NULL, 0);
713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
715     pa_tagstruct_putu32(t, r->index);
716     pa_pstream_send_tagstruct(r->connection->pstream, t);
717 }
718
719 /* Called from main context */
720 static void playback_stream_unlink(playback_stream *s) {
721     pa_assert(s);
722
723     if (!s->connection)
724         return;
725
726     if (s->sink_input) {
727         pa_sink_input_unlink(s->sink_input);
728         pa_sink_input_unref(s->sink_input);
729         s->sink_input = NULL;
730     }
731
732     if (s->drain_request)
733         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
734
735     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
736     s->connection = NULL;
737     playback_stream_unref(s);
738 }
739
740 /* Called from main context */
741 static void playback_stream_free(pa_object* o) {
742     playback_stream *s = PLAYBACK_STREAM(o);
743     pa_assert(s);
744
745     playback_stream_unlink(s);
746
747     pa_memblockq_free(s->memblockq);
748     pa_xfree(s);
749 }
750
751 /* Called from main context */
752 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
753     playback_stream *s = PLAYBACK_STREAM(o);
754     playback_stream_assert_ref(s);
755
756     if (!s->connection)
757         return -1;
758
759     switch (code) {
760         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
761             pa_tagstruct *t;
762             int l = 0;
763
764             for (;;) {
765                 if ((l = pa_atomic_load(&s->missing)) <= 0)
766                     return 0;
767
768                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
769                     break;
770             }
771
772             t = pa_tagstruct_new(NULL, 0);
773             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
774             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
775             pa_tagstruct_putu32(t, s->index);
776             pa_tagstruct_putu32(t, (uint32_t) l);
777             pa_pstream_send_tagstruct(s->connection->pstream, t);
778
779 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
780             break;
781         }
782
783         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
784             pa_tagstruct *t;
785
786 /*             pa_log("signalling underflow"); */
787
788             /* Report that we're empty */
789             t = pa_tagstruct_new(NULL, 0);
790             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
791             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
792             pa_tagstruct_putu32(t, s->index);
793             pa_pstream_send_tagstruct(s->connection->pstream, t);
794             break;
795         }
796
797         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
798             pa_tagstruct *t;
799
800             /* Notify the user we're overflowed*/
801             t = pa_tagstruct_new(NULL, 0);
802             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
803             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
804             pa_tagstruct_putu32(t, s->index);
805             pa_pstream_send_tagstruct(s->connection->pstream, t);
806             break;
807         }
808
809         case PLAYBACK_STREAM_MESSAGE_STARTED:
810
811             if (s->connection->version >= 13) {
812                 pa_tagstruct *t;
813
814                 /* Notify the user we started playback */
815                 t = pa_tagstruct_new(NULL, 0);
816                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
817                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
818                 pa_tagstruct_putu32(t, s->index);
819                 pa_pstream_send_tagstruct(s->connection->pstream, t);
820             }
821
822             break;
823
824         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
825             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
826             break;
827
828         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
829             pa_tagstruct *t;
830
831             s->buffer_attr.tlength = (uint32_t) offset;
832
833             t = pa_tagstruct_new(NULL, 0);
834             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
835             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
836             pa_tagstruct_putu32(t, s->index);
837             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
838             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
839             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
840             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
841             pa_tagstruct_put_usec(t, s->configured_sink_latency);
842             pa_pstream_send_tagstruct(s->connection->pstream, t);
843
844             break;
845         }
846     }
847
848     return 0;
849 }
850
851 /* Called from main context */
852 static void fix_playback_buffer_attr(playback_stream *s) {
853     size_t frame_size;
854     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
855
856     pa_assert(s);
857
858     /* This function will be called from the main thread, before as
859      * well as after the sink input has been activated using
860      * pa_sink_input_put()! That means it may not touch any
861      * ->thread_info data, such as the memblockq! */
862
863     frame_size = pa_frame_size(&s->sink_input->sample_spec);
864
865     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
866         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
867     if (s->buffer_attr.maxlength <= 0)
868         s->buffer_attr.maxlength = (uint32_t) frame_size;
869
870     if (s->buffer_attr.tlength == (uint32_t) -1)
871         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
872     if (s->buffer_attr.tlength <= 0)
873         s->buffer_attr.tlength = (uint32_t) frame_size;
874
875     if (s->buffer_attr.minreq == (uint32_t) -1)
876         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
877     if (s->buffer_attr.minreq <= 0)
878         s->buffer_attr.minreq = (uint32_t) frame_size;
879
880     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
881         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
882
883     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
884     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
885
886     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
887                 (double) tlength_usec / PA_USEC_PER_MSEC,
888                 (double) minreq_usec / PA_USEC_PER_MSEC);
889
890     if (s->early_requests) {
891
892         /* In early request mode we need to emulate the classic
893          * fragment-based playback model. We do this setting the sink
894          * latency to the fragment size. */
895
896         sink_usec = minreq_usec;
897         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
898
899     } else if (s->adjust_latency) {
900
901         /* So, the user asked us to adjust the latency of the stream
902          * buffer according to the what the sink can provide. The
903          * tlength passed in shall be the overall latency. Roughly
904          * half the latency will be spent on the hw buffer, the other
905          * half of it in the async buffer queue we maintain for each
906          * client. In between we'll have a safety space of size
907          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
908          * empty and needs to be filled, then our buffer must have
909          * enough data to fulfill this request immediatly and thus
910          * have at least the same tlength as the size of the hw
911          * buffer. It additionally needs space for 2 times minreq
912          * because if the buffer ran empty and a partial fillup
913          * happens immediately on the next iteration we need to be
914          * able to fulfill it and give the application also minreq
915          * time to fill it up again for the next request Makes 2 times
916          * minreq in plus.. */
917
918         if (tlength_usec > minreq_usec*2)
919             sink_usec = (tlength_usec - minreq_usec*2)/2;
920         else
921             sink_usec = 0;
922
923         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
924
925     } else {
926
927         /* Ok, the user didn't ask us to adjust the latency, but we
928          * still need to make sure that the parameters from the user
929          * do make sense. */
930
931         if (tlength_usec > minreq_usec*2)
932             sink_usec = (tlength_usec - minreq_usec*2);
933         else
934             sink_usec = 0;
935
936         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
937     }
938
939     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
940
941     if (s->early_requests) {
942
943         /* Ok, we didn't necessarily get what we were asking for, so
944          * let's tell the user */
945
946         minreq_usec = s->configured_sink_latency;
947
948     } else if (s->adjust_latency) {
949
950         /* Ok, we didn't necessarily get what we were asking for, so
951          * let's subtract from what we asked for for the remaining
952          * buffer space */
953
954         if (tlength_usec >= s->configured_sink_latency)
955             tlength_usec -= s->configured_sink_latency;
956     }
957
958     /* FIXME: This is actually larger than necessary, since not all of
959      * the sink latency is actually rewritable. */
960     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
961         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
962
963     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
964         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
965         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
966
967     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
968         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
969         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
970
971     if (s->buffer_attr.minreq <= 0) {
972         s->buffer_attr.minreq = (uint32_t) frame_size;
973         s->buffer_attr.tlength += (uint32_t) frame_size*2;
974     }
975
976     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
977         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
978
979     if (s->buffer_attr.prebuf == (uint32_t) -1 || s->buffer_attr.prebuf > s->buffer_attr.tlength)
980         s->buffer_attr.prebuf = s->buffer_attr.tlength;
981 }
982
983 /* Called from main context */
984 static playback_stream* playback_stream_new(
985         pa_native_connection *c,
986         pa_sink *sink,
987         pa_sample_spec *ss,
988         pa_channel_map *map,
989         pa_buffer_attr *a,
990         pa_cvolume *volume,
991         pa_bool_t muted,
992         pa_bool_t muted_set,
993         uint32_t syncid,
994         uint32_t *missing,
995         pa_sink_input_flags_t flags,
996         pa_proplist *p,
997         pa_bool_t adjust_latency,
998         pa_bool_t early_requests,
999         int *ret) {
1000
1001     playback_stream *s, *ssync;
1002     pa_sink_input *sink_input = NULL;
1003     pa_memchunk silence;
1004     uint32_t idx;
1005     int64_t start_index;
1006     pa_sink_input_new_data data;
1007
1008     pa_assert(c);
1009     pa_assert(ss);
1010     pa_assert(missing);
1011     pa_assert(p);
1012     pa_assert(ret);
1013
1014     /* Find syncid group */
1015     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1016
1017         if (!playback_stream_isinstance(ssync))
1018             continue;
1019
1020         if (ssync->syncid == syncid)
1021             break;
1022     }
1023
1024     /* Synced streams must connect to the same sink */
1025     if (ssync) {
1026
1027         if (!sink)
1028             sink = ssync->sink_input->sink;
1029         else if (sink != ssync->sink_input->sink) {
1030             *ret = PA_ERR_INVALID;
1031             return NULL;
1032         }
1033     }
1034
1035     pa_sink_input_new_data_init(&data);
1036
1037     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1038     data.driver = __FILE__;
1039     data.module = c->options->module;
1040     data.client = c->client;
1041     data.sink = sink;
1042     pa_sink_input_new_data_set_sample_spec(&data, ss);
1043     pa_sink_input_new_data_set_channel_map(&data, map);
1044     if (volume)
1045         pa_sink_input_new_data_set_volume(&data, volume);
1046     if (muted_set)
1047         pa_sink_input_new_data_set_muted(&data, muted);
1048     data.sync_base = ssync ? ssync->sink_input : NULL;
1049
1050     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1051
1052     pa_sink_input_new_data_done(&data);
1053
1054     if (!sink_input)
1055         return NULL;
1056
1057     s = pa_msgobject_new(playback_stream);
1058     s->parent.parent.parent.free = playback_stream_free;
1059     s->parent.parent.process_msg = playback_stream_process_msg;
1060     s->connection = c;
1061     s->syncid = syncid;
1062     s->sink_input = sink_input;
1063     s->is_underrun = TRUE;
1064     s->drain_request = FALSE;
1065     pa_atomic_store(&s->missing, 0);
1066     s->buffer_attr = *a;
1067     s->adjust_latency = adjust_latency;
1068     s->early_requests = early_requests;
1069
1070     s->sink_input->parent.process_msg = sink_input_process_msg;
1071     s->sink_input->pop = sink_input_pop_cb;
1072     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1073     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1074     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1075     s->sink_input->kill = sink_input_kill_cb;
1076     s->sink_input->moving = sink_input_moving_cb;
1077     s->sink_input->suspend = sink_input_suspend_cb;
1078     s->sink_input->send_event = sink_input_send_event_cb;
1079     s->sink_input->userdata = s;
1080
1081     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1082
1083     fix_playback_buffer_attr(s);
1084
1085     pa_sink_input_get_silence(sink_input, &silence);
1086     s->memblockq = pa_memblockq_new(
1087             start_index,
1088             s->buffer_attr.maxlength,
1089             s->buffer_attr.tlength,
1090             pa_frame_size(&sink_input->sample_spec),
1091             s->buffer_attr.prebuf,
1092             s->buffer_attr.minreq,
1093             0,
1094             &silence);
1095     pa_memblock_unref(silence.memblock);
1096
1097     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1098
1099     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1100
1101     *ss = s->sink_input->sample_spec;
1102     *map = s->sink_input->channel_map;
1103
1104     pa_idxset_put(c->output_streams, s, &s->index);
1105
1106     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1107                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1108                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1109                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1110                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1111
1112     pa_sink_input_put(s->sink_input);
1113     return s;
1114 }
1115
1116 /* Called from IO context */
1117 static void playback_stream_request_bytes(playback_stream *s) {
1118     size_t m, minreq;
1119     int previous_missing;
1120
1121     playback_stream_assert_ref(s);
1122
1123     m = pa_memblockq_pop_missing(s->memblockq);
1124
1125     if (m <= 0)
1126         return;
1127
1128 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1129
1130     previous_missing = pa_atomic_add(&s->missing, (int) m);
1131     minreq = pa_memblockq_get_minreq(s->memblockq);
1132
1133     if (pa_memblockq_prebuf_active(s->memblockq) ||
1134         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1135         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1136 }
1137
1138
1139 /* Called from main context */
1140 static void playback_stream_send_killed(playback_stream *p) {
1141     pa_tagstruct *t;
1142     playback_stream_assert_ref(p);
1143
1144     t = pa_tagstruct_new(NULL, 0);
1145     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1146     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1147     pa_tagstruct_putu32(t, p->index);
1148     pa_pstream_send_tagstruct(p->connection->pstream, t);
1149 }
1150
1151 /* Called from main context */
1152 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1153     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1154     pa_native_connection_assert_ref(c);
1155
1156     if (!c->protocol)
1157         return -1;
1158
1159     switch (code) {
1160
1161         case CONNECTION_MESSAGE_REVOKE:
1162             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1163             break;
1164
1165         case CONNECTION_MESSAGE_RELEASE:
1166             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1167             break;
1168     }
1169
1170     return 0;
1171 }
1172
1173 /* Called from main context */
1174 static void native_connection_unlink(pa_native_connection *c) {
1175     record_stream *r;
1176     output_stream *o;
1177
1178     pa_assert(c);
1179
1180     if (!c->protocol)
1181         return;
1182
1183     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1184
1185     if (c->options)
1186         pa_native_options_unref(c->options);
1187
1188     while ((r = pa_idxset_first(c->record_streams, NULL)))
1189         record_stream_unlink(r);
1190
1191     while ((o = pa_idxset_first(c->output_streams, NULL)))
1192         if (playback_stream_isinstance(o))
1193             playback_stream_unlink(PLAYBACK_STREAM(o));
1194         else
1195             upload_stream_unlink(UPLOAD_STREAM(o));
1196
1197     if (c->subscription)
1198         pa_subscription_free(c->subscription);
1199
1200     if (c->pstream)
1201         pa_pstream_unlink(c->pstream);
1202
1203     if (c->auth_timeout_event) {
1204         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1205         c->auth_timeout_event = NULL;
1206     }
1207
1208     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1209     c->protocol = NULL;
1210     pa_native_connection_unref(c);
1211 }
1212
1213 /* Called from main context */
1214 static void native_connection_free(pa_object *o) {
1215     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1216
1217     pa_assert(c);
1218
1219     native_connection_unlink(c);
1220
1221     pa_idxset_free(c->record_streams, NULL, NULL);
1222     pa_idxset_free(c->output_streams, NULL, NULL);
1223
1224     pa_pdispatch_unref(c->pdispatch);
1225     pa_pstream_unref(c->pstream);
1226     pa_client_free(c->client);
1227
1228     pa_xfree(c);
1229 }
1230
1231 /* Called from main context */
1232 static void native_connection_send_memblock(pa_native_connection *c) {
1233     uint32_t start;
1234     record_stream *r;
1235
1236     start = PA_IDXSET_INVALID;
1237     for (;;) {
1238         pa_memchunk chunk;
1239
1240         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1241             return;
1242
1243         if (start == PA_IDXSET_INVALID)
1244             start = c->rrobin_index;
1245         else if (start == c->rrobin_index)
1246             return;
1247
1248         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1249             pa_memchunk schunk = chunk;
1250
1251             if (schunk.length > r->buffer_attr.fragsize)
1252                 schunk.length = r->buffer_attr.fragsize;
1253
1254             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1255
1256             pa_memblockq_drop(r->memblockq, schunk.length);
1257             pa_memblock_unref(schunk.memblock);
1258
1259             return;
1260         }
1261     }
1262 }
1263
1264 /*** sink input callbacks ***/
1265
1266 /* Called from thread context */
1267 static void handle_seek(playback_stream *s, int64_t indexw) {
1268     playback_stream_assert_ref(s);
1269
1270 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1271
1272     if (s->sink_input->thread_info.underrun_for > 0) {
1273
1274 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1275
1276         if (pa_memblockq_is_readable(s->memblockq)) {
1277
1278             /* We just ended an underrun, let's ask the sink
1279              * for a complete rewind rewrite */
1280
1281             pa_log_debug("Requesting rewind due to end of underrun.");
1282             pa_sink_input_request_rewind(s->sink_input,
1283                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1284                                          FALSE, TRUE, FALSE);
1285         }
1286
1287     } else {
1288         int64_t indexr;
1289
1290         indexr = pa_memblockq_get_read_index(s->memblockq);
1291
1292         if (indexw < indexr) {
1293             /* OK, the sink already asked for this data, so
1294              * let's have it usk us again */
1295
1296             pa_log_debug("Requesting rewind due to rewrite.");
1297             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1298         }
1299     }
1300
1301     playback_stream_request_bytes(s);
1302 }
1303
1304 /* Called from thread context */
1305 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1306     pa_sink_input *i = PA_SINK_INPUT(o);
1307     playback_stream *s;
1308
1309     pa_sink_input_assert_ref(i);
1310     s = PLAYBACK_STREAM(i->userdata);
1311     playback_stream_assert_ref(s);
1312
1313     switch (code) {
1314
1315         case SINK_INPUT_MESSAGE_SEEK: {
1316             int64_t windex;
1317
1318             windex = pa_memblockq_get_write_index(s->memblockq);
1319
1320             /* The client side is incapable of accounting correctly
1321              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1322              * able to deal with that. */
1323
1324             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1325
1326             handle_seek(s, windex);
1327             return 0;
1328         }
1329
1330         case SINK_INPUT_MESSAGE_POST_DATA: {
1331             int64_t windex;
1332
1333             pa_assert(chunk);
1334
1335             windex = pa_memblockq_get_write_index(s->memblockq);
1336
1337 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1338
1339             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1340                 pa_log_warn("Failed to push data into queue");
1341                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1342                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1343             }
1344
1345             handle_seek(s, windex);
1346
1347 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1348
1349             return 0;
1350         }
1351
1352         case SINK_INPUT_MESSAGE_DRAIN:
1353         case SINK_INPUT_MESSAGE_FLUSH:
1354         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1355         case SINK_INPUT_MESSAGE_TRIGGER: {
1356
1357             int64_t windex;
1358             pa_sink_input *isync;
1359             void (*func)(pa_memblockq *bq);
1360
1361             switch  (code) {
1362                 case SINK_INPUT_MESSAGE_FLUSH:
1363                     func = pa_memblockq_flush_write;
1364                     break;
1365
1366                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1367                     func = pa_memblockq_prebuf_force;
1368                     break;
1369
1370                 case SINK_INPUT_MESSAGE_DRAIN:
1371                 case SINK_INPUT_MESSAGE_TRIGGER:
1372                     func = pa_memblockq_prebuf_disable;
1373                     break;
1374
1375                 default:
1376                     pa_assert_not_reached();
1377             }
1378
1379             windex = pa_memblockq_get_write_index(s->memblockq);
1380             func(s->memblockq);
1381             handle_seek(s, windex);
1382
1383             /* Do the same for all other members in the sync group */
1384             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1385                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1386                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1387                 func(ssync->memblockq);
1388                 handle_seek(ssync, windex);
1389             }
1390
1391             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1392                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1393                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1394                 func(ssync->memblockq);
1395                 handle_seek(ssync, windex);
1396             }
1397
1398             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1399                 if (!pa_memblockq_is_readable(s->memblockq))
1400                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1401                 else {
1402                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1403                     s->drain_request = TRUE;
1404                 }
1405             }
1406
1407             return 0;
1408         }
1409
1410         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1411             /* Atomically get a snapshot of all timing parameters... */
1412             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1413             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1414             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1415             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1416             s->underrun_for = s->sink_input->thread_info.underrun_for;
1417             s->playing_for = s->sink_input->thread_info.playing_for;
1418
1419             return 0;
1420
1421         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1422             int64_t windex;
1423
1424             windex = pa_memblockq_get_write_index(s->memblockq);
1425
1426             pa_memblockq_prebuf_force(s->memblockq);
1427
1428             handle_seek(s, windex);
1429
1430             /* Fall through to the default handler */
1431             break;
1432         }
1433
1434         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1435             pa_usec_t *r = userdata;
1436
1437             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1438
1439             /* Fall through, the default handler will add in the extra
1440              * latency added by the resampler */
1441             break;
1442         }
1443
1444         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1445             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1446             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1447             return 0;
1448         }
1449     }
1450
1451     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1452 }
1453
1454 /* Called from thread context */
1455 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1456     playback_stream *s;
1457
1458     pa_sink_input_assert_ref(i);
1459     s = PLAYBACK_STREAM(i->userdata);
1460     playback_stream_assert_ref(s);
1461     pa_assert(chunk);
1462
1463 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1464
1465     if (pa_memblockq_is_readable(s->memblockq))
1466         s->is_underrun = FALSE;
1467     else {
1468         pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1469
1470         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1471             s->drain_request = FALSE;
1472             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1473         } else if (!s->is_underrun)
1474             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1475
1476         s->is_underrun = TRUE;
1477
1478         playback_stream_request_bytes(s);
1479     }
1480
1481     /* This call will not fail with prebuf=0, hence we check for
1482        underrun explicitly above */
1483     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1484         return -1;
1485
1486     chunk->length = PA_MIN(nbytes, chunk->length);
1487
1488     if (i->thread_info.underrun_for > 0)
1489         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1490
1491     pa_memblockq_drop(s->memblockq, chunk->length);
1492     playback_stream_request_bytes(s);
1493
1494     return 0;
1495 }
1496
1497 /* Called from thread context */
1498 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1499     playback_stream *s;
1500
1501     pa_sink_input_assert_ref(i);
1502     s = PLAYBACK_STREAM(i->userdata);
1503     playback_stream_assert_ref(s);
1504
1505     /* If we are in an underrun, then we don't rewind */
1506     if (i->thread_info.underrun_for > 0)
1507         return;
1508
1509     pa_memblockq_rewind(s->memblockq, nbytes);
1510 }
1511
1512 /* Called from thread context */
1513 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1514     playback_stream *s;
1515
1516     pa_sink_input_assert_ref(i);
1517     s = PLAYBACK_STREAM(i->userdata);
1518     playback_stream_assert_ref(s);
1519
1520     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1521 }
1522
1523 /* Called from thread context */
1524 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1525     playback_stream *s;
1526     size_t new_tlength, old_tlength;
1527
1528     pa_sink_input_assert_ref(i);
1529     s = PLAYBACK_STREAM(i->userdata);
1530     playback_stream_assert_ref(s);
1531
1532     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1533     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1534
1535     if (old_tlength < new_tlength) {
1536         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1537         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1538         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1539
1540         if (new_tlength == old_tlength)
1541             pa_log_debug("Failed to increase tlength");
1542         else {
1543             pa_log_debug("Notifying client about increased tlength");
1544             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1545         }
1546     }
1547 }
1548
1549 /* Called from main context */
1550 static void sink_input_kill_cb(pa_sink_input *i) {
1551     playback_stream *s;
1552
1553     pa_sink_input_assert_ref(i);
1554     s = PLAYBACK_STREAM(i->userdata);
1555     playback_stream_assert_ref(s);
1556
1557     playback_stream_send_killed(s);
1558     playback_stream_unlink(s);
1559 }
1560
1561 /* Called from main context */
1562 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1563     playback_stream *s;
1564     pa_tagstruct *t;
1565
1566     pa_sink_input_assert_ref(i);
1567     s = PLAYBACK_STREAM(i->userdata);
1568     playback_stream_assert_ref(s);
1569
1570     if (s->connection->version < 15)
1571       return;
1572
1573     t = pa_tagstruct_new(NULL, 0);
1574     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1575     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1576     pa_tagstruct_putu32(t, s->index);
1577     pa_tagstruct_puts(t, event);
1578     pa_tagstruct_put_proplist(t, pl);
1579     pa_pstream_send_tagstruct(s->connection->pstream, t);
1580 }
1581
1582 /* Called from main context */
1583 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1584     playback_stream *s;
1585     pa_tagstruct *t;
1586
1587     pa_sink_input_assert_ref(i);
1588     s = PLAYBACK_STREAM(i->userdata);
1589     playback_stream_assert_ref(s);
1590
1591     if (s->connection->version < 12)
1592       return;
1593
1594     t = pa_tagstruct_new(NULL, 0);
1595     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1596     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1597     pa_tagstruct_putu32(t, s->index);
1598     pa_tagstruct_put_boolean(t, suspend);
1599     pa_pstream_send_tagstruct(s->connection->pstream, t);
1600 }
1601
1602 /* Called from main context */
1603 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1604     playback_stream *s;
1605     pa_tagstruct *t;
1606
1607     pa_sink_input_assert_ref(i);
1608     s = PLAYBACK_STREAM(i->userdata);
1609     playback_stream_assert_ref(s);
1610
1611     fix_playback_buffer_attr(s);
1612     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1613     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1614
1615     if (s->connection->version < 12)
1616       return;
1617
1618     t = pa_tagstruct_new(NULL, 0);
1619     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1620     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1621     pa_tagstruct_putu32(t, s->index);
1622     pa_tagstruct_putu32(t, dest->index);
1623     pa_tagstruct_puts(t, dest->name);
1624     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1625
1626     if (s->connection->version >= 13) {
1627         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1628         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1629         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1630         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1631         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1632     }
1633
1634     pa_pstream_send_tagstruct(s->connection->pstream, t);
1635 }
1636
1637 /*** source_output callbacks ***/
1638
1639 /* Called from thread context */
1640 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1641     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1642     record_stream *s;
1643
1644     pa_source_output_assert_ref(o);
1645     s = RECORD_STREAM(o->userdata);
1646     record_stream_assert_ref(s);
1647
1648     switch (code) {
1649         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1650             /* Atomically get a snapshot of all timing parameters... */
1651             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1652             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1653             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1654             return 0;
1655     }
1656
1657     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1658 }
1659
1660 /* Called from thread context */
1661 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1662     record_stream *s;
1663
1664     pa_source_output_assert_ref(o);
1665     s = RECORD_STREAM(o->userdata);
1666     record_stream_assert_ref(s);
1667     pa_assert(chunk);
1668
1669     pa_atomic_add(&s->on_the_fly, chunk->length);
1670     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1671 }
1672
1673 static void source_output_kill_cb(pa_source_output *o) {
1674     record_stream *s;
1675
1676     pa_source_output_assert_ref(o);
1677     s = RECORD_STREAM(o->userdata);
1678     record_stream_assert_ref(s);
1679
1680     record_stream_send_killed(s);
1681     record_stream_unlink(s);
1682 }
1683
1684 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1685     record_stream *s;
1686
1687     pa_source_output_assert_ref(o);
1688     s = RECORD_STREAM(o->userdata);
1689     record_stream_assert_ref(s);
1690
1691     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1692
1693     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1694 }
1695
1696 /* Called from main context */
1697 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1698     record_stream *s;
1699     pa_tagstruct *t;
1700
1701     pa_source_output_assert_ref(o);
1702     s = RECORD_STREAM(o->userdata);
1703     record_stream_assert_ref(s);
1704
1705     if (s->connection->version < 15)
1706       return;
1707
1708     t = pa_tagstruct_new(NULL, 0);
1709     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1710     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1711     pa_tagstruct_putu32(t, s->index);
1712     pa_tagstruct_puts(t, event);
1713     pa_tagstruct_put_proplist(t, pl);
1714     pa_pstream_send_tagstruct(s->connection->pstream, t);
1715 }
1716
1717 /* Called from main context */
1718 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1719     record_stream *s;
1720     pa_tagstruct *t;
1721
1722     pa_source_output_assert_ref(o);
1723     s = RECORD_STREAM(o->userdata);
1724     record_stream_assert_ref(s);
1725
1726     if (s->connection->version < 12)
1727       return;
1728
1729     t = pa_tagstruct_new(NULL, 0);
1730     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1731     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1732     pa_tagstruct_putu32(t, s->index);
1733     pa_tagstruct_put_boolean(t, suspend);
1734     pa_pstream_send_tagstruct(s->connection->pstream, t);
1735 }
1736
1737 /* Called from main context */
1738 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1739     record_stream *s;
1740     pa_tagstruct *t;
1741
1742     pa_source_output_assert_ref(o);
1743     s = RECORD_STREAM(o->userdata);
1744     record_stream_assert_ref(s);
1745
1746     fix_record_buffer_attr_pre(s);
1747     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1748     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1749     fix_record_buffer_attr_post(s);
1750
1751     if (s->connection->version < 12)
1752       return;
1753
1754     t = pa_tagstruct_new(NULL, 0);
1755     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1756     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1757     pa_tagstruct_putu32(t, s->index);
1758     pa_tagstruct_putu32(t, dest->index);
1759     pa_tagstruct_puts(t, dest->name);
1760     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1761
1762     if (s->connection->version >= 13) {
1763         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1764         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1765         pa_tagstruct_put_usec(t, s->configured_source_latency);
1766     }
1767
1768     pa_pstream_send_tagstruct(s->connection->pstream, t);
1769 }
1770
1771 /*** pdispatch callbacks ***/
1772
1773 static void protocol_error(pa_native_connection *c) {
1774     pa_log("protocol error, kicking client");
1775     native_connection_unlink(c);
1776 }
1777
1778 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1779 if (!(expression)) { \
1780     pa_pstream_send_error((pstream), (tag), (error)); \
1781     return; \
1782 } \
1783 } while(0);
1784
1785 static pa_tagstruct *reply_new(uint32_t tag) {
1786     pa_tagstruct *reply;
1787
1788     reply = pa_tagstruct_new(NULL, 0);
1789     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1790     pa_tagstruct_putu32(reply, tag);
1791     return reply;
1792 }
1793
1794 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1795     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1796     playback_stream *s;
1797     uint32_t sink_index, syncid, missing;
1798     pa_buffer_attr attr;
1799     const char *name = NULL, *sink_name;
1800     pa_sample_spec ss;
1801     pa_channel_map map;
1802     pa_tagstruct *reply;
1803     pa_sink *sink = NULL;
1804     pa_cvolume volume;
1805     pa_bool_t
1806         corked = FALSE,
1807         no_remap = FALSE,
1808         no_remix = FALSE,
1809         fix_format = FALSE,
1810         fix_rate = FALSE,
1811         fix_channels = FALSE,
1812         no_move = FALSE,
1813         variable_rate = FALSE,
1814         muted = FALSE,
1815         adjust_latency = FALSE,
1816         early_requests = FALSE,
1817         dont_inhibit_auto_suspend = FALSE,
1818         muted_set = FALSE,
1819         fail_on_suspend = FALSE;
1820     pa_sink_input_flags_t flags = 0;
1821     pa_proplist *p;
1822     pa_bool_t volume_set = TRUE;
1823     int ret = PA_ERR_INVALID;
1824
1825     pa_native_connection_assert_ref(c);
1826     pa_assert(t);
1827     memset(&attr, 0, sizeof(attr));
1828
1829     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1830         pa_tagstruct_get(
1831                 t,
1832                 PA_TAG_SAMPLE_SPEC, &ss,
1833                 PA_TAG_CHANNEL_MAP, &map,
1834                 PA_TAG_U32, &sink_index,
1835                 PA_TAG_STRING, &sink_name,
1836                 PA_TAG_U32, &attr.maxlength,
1837                 PA_TAG_BOOLEAN, &corked,
1838                 PA_TAG_U32, &attr.tlength,
1839                 PA_TAG_U32, &attr.prebuf,
1840                 PA_TAG_U32, &attr.minreq,
1841                 PA_TAG_U32, &syncid,
1842                 PA_TAG_CVOLUME, &volume,
1843                 PA_TAG_INVALID) < 0) {
1844
1845         protocol_error(c);
1846         return;
1847     }
1848
1849     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1850     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1851     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1852     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1853     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1854     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1855     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1856     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1857
1858     p = pa_proplist_new();
1859
1860     if (name)
1861         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1862
1863     if (c->version >= 12)  {
1864         /* Since 0.9.8 the user can ask for a couple of additional flags */
1865
1866         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1867             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1868             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1869             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1870             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1871             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1872             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1873
1874             protocol_error(c);
1875             pa_proplist_free(p);
1876             return;
1877         }
1878     }
1879
1880     if (c->version >= 13) {
1881
1882         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1883             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1884             pa_tagstruct_get_proplist(t, p) < 0) {
1885             protocol_error(c);
1886             pa_proplist_free(p);
1887             return;
1888         }
1889     }
1890
1891     if (c->version >= 14) {
1892
1893         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1894             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1895             protocol_error(c);
1896             pa_proplist_free(p);
1897             return;
1898         }
1899     }
1900
1901     if (c->version >= 15) {
1902
1903         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1904             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1905             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1906             protocol_error(c);
1907             pa_proplist_free(p);
1908             return;
1909         }
1910     }
1911
1912     if (!pa_tagstruct_eof(t)) {
1913         protocol_error(c);
1914         pa_proplist_free(p);
1915         return;
1916     }
1917
1918     if (sink_index != PA_INVALID_INDEX) {
1919
1920         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1921             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1922             pa_proplist_free(p);
1923             return;
1924         }
1925
1926     } else if (sink_name) {
1927
1928         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1929             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1930             pa_proplist_free(p);
1931             return;
1932         }
1933     }
1934
1935     flags =
1936         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1937         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1938         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1939         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1940         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1941         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1942         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1943         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1944         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1945         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1946
1947     /* Only since protocol version 15 there's a seperate muted_set
1948      * flag. For older versions we synthesize it here */
1949     muted_set = muted_set || muted;
1950
1951     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1952     pa_proplist_free(p);
1953
1954     CHECK_VALIDITY(c->pstream, s, tag, ret);
1955
1956     reply = reply_new(tag);
1957     pa_tagstruct_putu32(reply, s->index);
1958     pa_assert(s->sink_input);
1959     pa_tagstruct_putu32(reply, s->sink_input->index);
1960     pa_tagstruct_putu32(reply, missing);
1961
1962 /*     pa_log("initial request is %u", missing); */
1963
1964     if (c->version >= 9) {
1965         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1966
1967         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1968         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1969         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1970         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1971     }
1972
1973     if (c->version >= 12) {
1974         /* Since 0.9.8 we support sending the chosen sample
1975          * spec/channel map/device/suspend status back to the
1976          * client */
1977
1978         pa_tagstruct_put_sample_spec(reply, &ss);
1979         pa_tagstruct_put_channel_map(reply, &map);
1980
1981         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1982         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1983
1984         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1985     }
1986
1987     if (c->version >= 13)
1988         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
1989
1990     pa_pstream_send_tagstruct(c->pstream, reply);
1991 }
1992
1993 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1994     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1995     uint32_t channel;
1996
1997     pa_native_connection_assert_ref(c);
1998     pa_assert(t);
1999
2000     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2001         !pa_tagstruct_eof(t)) {
2002         protocol_error(c);
2003         return;
2004     }
2005
2006     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2007
2008     switch (command) {
2009
2010         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2011             playback_stream *s;
2012             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2013                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2014                 return;
2015             }
2016
2017             playback_stream_unlink(s);
2018             break;
2019         }
2020
2021         case PA_COMMAND_DELETE_RECORD_STREAM: {
2022             record_stream *s;
2023             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2024                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2025                 return;
2026             }
2027
2028             record_stream_unlink(s);
2029             break;
2030         }
2031
2032         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2033             upload_stream *s;
2034
2035             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2036                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2037                 return;
2038             }
2039
2040             upload_stream_unlink(s);
2041             break;
2042         }
2043
2044         default:
2045             pa_assert_not_reached();
2046     }
2047
2048     pa_pstream_send_simple_ack(c->pstream, tag);
2049 }
2050
2051 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2052     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2053     record_stream *s;
2054     pa_buffer_attr attr;
2055     uint32_t source_index;
2056     const char *name = NULL, *source_name;
2057     pa_sample_spec ss;
2058     pa_channel_map map;
2059     pa_tagstruct *reply;
2060     pa_source *source = NULL;
2061     pa_bool_t
2062         corked = FALSE,
2063         no_remap = FALSE,
2064         no_remix = FALSE,
2065         fix_format = FALSE,
2066         fix_rate = FALSE,
2067         fix_channels = FALSE,
2068         no_move = FALSE,
2069         variable_rate = FALSE,
2070         adjust_latency = FALSE,
2071         peak_detect = FALSE,
2072         early_requests = FALSE,
2073         dont_inhibit_auto_suspend = FALSE,
2074         fail_on_suspend = FALSE;
2075     pa_source_output_flags_t flags = 0;
2076     pa_proplist *p;
2077     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2078     pa_sink_input *direct_on_input = NULL;
2079     int ret = PA_ERR_INVALID;
2080
2081     pa_native_connection_assert_ref(c);
2082     pa_assert(t);
2083
2084     memset(&attr, 0, sizeof(attr));
2085
2086     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2087         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2088         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2089         pa_tagstruct_getu32(t, &source_index) < 0 ||
2090         pa_tagstruct_gets(t, &source_name) < 0 ||
2091         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2092         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2093         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2094         protocol_error(c);
2095         return;
2096     }
2097
2098     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2099     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2100     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2101     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2102     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2103     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2104     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2105
2106     p = pa_proplist_new();
2107
2108     if (name)
2109         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2110
2111     if (c->version >= 12)  {
2112         /* Since 0.9.8 the user can ask for a couple of additional flags */
2113
2114         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2115             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2116             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2117             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2118             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2119             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2120             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2121
2122             protocol_error(c);
2123             pa_proplist_free(p);
2124             return;
2125         }
2126     }
2127
2128     if (c->version >= 13) {
2129
2130         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2131             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2132             pa_tagstruct_get_proplist(t, p) < 0 ||
2133             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2134             protocol_error(c);
2135             pa_proplist_free(p);
2136             return;
2137         }
2138     }
2139
2140     if (c->version >= 14) {
2141
2142         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2143             protocol_error(c);
2144             pa_proplist_free(p);
2145             return;
2146         }
2147     }
2148
2149     if (c->version >= 15) {
2150
2151         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2152             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2153             protocol_error(c);
2154             pa_proplist_free(p);
2155             return;
2156         }
2157     }
2158
2159     if (!pa_tagstruct_eof(t)) {
2160         protocol_error(c);
2161         pa_proplist_free(p);
2162         return;
2163     }
2164
2165     if (source_index != PA_INVALID_INDEX) {
2166
2167         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2168             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2169             pa_proplist_free(p);
2170             return;
2171         }
2172
2173     } else if (source_name) {
2174
2175         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2176             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2177             pa_proplist_free(p);
2178             return;
2179         }
2180     }
2181
2182     if (direct_on_input_idx != PA_INVALID_INDEX) {
2183
2184         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2185             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2186             pa_proplist_free(p);
2187             return;
2188         }
2189     }
2190
2191     flags =
2192         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2193         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2194         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2195         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2196         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2197         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2198         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2199         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2200         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2201         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2202
2203     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2204     pa_proplist_free(p);
2205
2206     CHECK_VALIDITY(c->pstream, s, tag, ret);
2207
2208     reply = reply_new(tag);
2209     pa_tagstruct_putu32(reply, s->index);
2210     pa_assert(s->source_output);
2211     pa_tagstruct_putu32(reply, s->source_output->index);
2212
2213     if (c->version >= 9) {
2214         /* Since 0.9 we support sending the buffer metrics back to the client */
2215
2216         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2217         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2218     }
2219
2220     if (c->version >= 12) {
2221         /* Since 0.9.8 we support sending the chosen sample
2222          * spec/channel map/device/suspend status back to the
2223          * client */
2224
2225         pa_tagstruct_put_sample_spec(reply, &ss);
2226         pa_tagstruct_put_channel_map(reply, &map);
2227
2228         pa_tagstruct_putu32(reply, s->source_output->source->index);
2229         pa_tagstruct_puts(reply, s->source_output->source->name);
2230
2231         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2232     }
2233
2234     if (c->version >= 13)
2235         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2236
2237     pa_pstream_send_tagstruct(c->pstream, reply);
2238 }
2239
2240 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2241     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2242     int ret;
2243
2244     pa_native_connection_assert_ref(c);
2245     pa_assert(t);
2246
2247     if (!pa_tagstruct_eof(t)) {
2248         protocol_error(c);
2249         return;
2250     }
2251
2252     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2253     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2254     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2255
2256     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2257 }
2258
2259 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2260     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2261     const void*cookie;
2262     pa_tagstruct *reply;
2263     pa_bool_t shm_on_remote = FALSE, do_shm;
2264
2265     pa_native_connection_assert_ref(c);
2266     pa_assert(t);
2267
2268     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2269         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2270         !pa_tagstruct_eof(t)) {
2271         protocol_error(c);
2272         return;
2273     }
2274
2275     /* Minimum supported version */
2276     if (c->version < 8) {
2277         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2278         return;
2279     }
2280
2281     /* Starting with protocol version 13 the MSB of the version tag
2282        reflects if shm is available for this pa_native_connection or
2283        not. */
2284     if (c->version >= 13) {
2285         shm_on_remote = !!(c->version & 0x80000000U);
2286         c->version &= 0x7FFFFFFFU;
2287     }
2288
2289     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2290
2291     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2292
2293     if (!c->authorized) {
2294         pa_bool_t success = FALSE;
2295
2296 #ifdef HAVE_CREDS
2297         const pa_creds *creds;
2298
2299         if ((creds = pa_pdispatch_creds(pd))) {
2300             if (creds->uid == getuid())
2301                 success = TRUE;
2302             else if (c->options->auth_group) {
2303                 int r;
2304                 gid_t gid;
2305
2306                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2307                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2308                 else if (gid == creds->gid)
2309                     success = TRUE;
2310
2311                 if (!success) {
2312                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2313                         pa_log_warn("Failed to check group membership.");
2314                     else if (r > 0)
2315                         success = TRUE;
2316                 }
2317             }
2318
2319             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2320                         (unsigned long) creds->uid,
2321                         (unsigned long) creds->gid,
2322                         (int) success);
2323         }
2324 #endif
2325
2326         if (!success && c->options->auth_cookie) {
2327             const uint8_t *ac;
2328
2329             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2330                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2331                     success = TRUE;
2332         }
2333
2334         if (!success) {
2335             pa_log_warn("Denied access to client with invalid authorization data.");
2336             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2337             return;
2338         }
2339
2340         c->authorized = TRUE;
2341         if (c->auth_timeout_event) {
2342             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2343             c->auth_timeout_event = NULL;
2344         }
2345     }
2346
2347     /* Enable shared memory support if possible */
2348     do_shm =
2349         pa_mempool_is_shared(c->protocol->core->mempool) &&
2350         c->is_local;
2351
2352     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2353
2354     if (do_shm)
2355         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2356             do_shm = FALSE;
2357
2358 #ifdef HAVE_CREDS
2359     if (do_shm) {
2360         /* Only enable SHM if both sides are owned by the same
2361          * user. This is a security measure because otherwise data
2362          * private to the user might leak. */
2363
2364         const pa_creds *creds;
2365         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2366             do_shm = FALSE;
2367     }
2368 #endif
2369
2370     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2371     pa_pstream_enable_shm(c->pstream, do_shm);
2372
2373     reply = reply_new(tag);
2374     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2375
2376 #ifdef HAVE_CREDS
2377 {
2378     /* SHM support is only enabled after both sides made sure they are the same user. */
2379
2380     pa_creds ucred;
2381
2382     ucred.uid = getuid();
2383     ucred.gid = getgid();
2384
2385     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2386 }
2387 #else
2388     pa_pstream_send_tagstruct(c->pstream, reply);
2389 #endif
2390 }
2391
2392 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2393     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2394     const char *name = NULL;
2395     pa_proplist *p;
2396     pa_tagstruct *reply;
2397
2398     pa_native_connection_assert_ref(c);
2399     pa_assert(t);
2400
2401     p = pa_proplist_new();
2402
2403     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2404         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2405         !pa_tagstruct_eof(t)) {
2406
2407         protocol_error(c);
2408         pa_proplist_free(p);
2409         return;
2410     }
2411
2412     if (name)
2413         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2414             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2415             pa_proplist_free(p);
2416             return;
2417         }
2418
2419     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2420     pa_proplist_free(p);
2421
2422     reply = reply_new(tag);
2423
2424     if (c->version >= 13)
2425         pa_tagstruct_putu32(reply, c->client->index);
2426
2427     pa_pstream_send_tagstruct(c->pstream, reply);
2428 }
2429
2430 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2431     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2432     const char *name;
2433     uint32_t idx = PA_IDXSET_INVALID;
2434
2435     pa_native_connection_assert_ref(c);
2436     pa_assert(t);
2437
2438     if (pa_tagstruct_gets(t, &name) < 0 ||
2439         !pa_tagstruct_eof(t)) {
2440         protocol_error(c);
2441         return;
2442     }
2443
2444     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2445     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2446
2447     if (command == PA_COMMAND_LOOKUP_SINK) {
2448         pa_sink *sink;
2449         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2450             idx = sink->index;
2451     } else {
2452         pa_source *source;
2453         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2454         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2455             idx = source->index;
2456     }
2457
2458     if (idx == PA_IDXSET_INVALID)
2459         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2460     else {
2461         pa_tagstruct *reply;
2462         reply = reply_new(tag);
2463         pa_tagstruct_putu32(reply, idx);
2464         pa_pstream_send_tagstruct(c->pstream, reply);
2465     }
2466 }
2467
2468 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2469     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2470     uint32_t idx;
2471     playback_stream *s;
2472
2473     pa_native_connection_assert_ref(c);
2474     pa_assert(t);
2475
2476     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2477         !pa_tagstruct_eof(t)) {
2478         protocol_error(c);
2479         return;
2480     }
2481
2482     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2483     s = pa_idxset_get_by_index(c->output_streams, idx);
2484     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2485     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2486
2487     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2488 }
2489
2490 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2491     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2492     pa_tagstruct *reply;
2493     const pa_mempool_stat *stat;
2494
2495     pa_native_connection_assert_ref(c);
2496     pa_assert(t);
2497
2498     if (!pa_tagstruct_eof(t)) {
2499         protocol_error(c);
2500         return;
2501     }
2502
2503     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2504
2505     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2506
2507     reply = reply_new(tag);
2508     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2509     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2510     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2511     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2512     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2513     pa_pstream_send_tagstruct(c->pstream, reply);
2514 }
2515
2516 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2517     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2518     pa_tagstruct *reply;
2519     playback_stream *s;
2520     struct timeval tv, now;
2521     uint32_t idx;
2522
2523     pa_native_connection_assert_ref(c);
2524     pa_assert(t);
2525
2526     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2527         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2528         !pa_tagstruct_eof(t)) {
2529         protocol_error(c);
2530         return;
2531     }
2532
2533     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2534     s = pa_idxset_get_by_index(c->output_streams, idx);
2535     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2536     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2537
2538     /* Get an atomic snapshot of all timing parameters */
2539     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2540
2541     reply = reply_new(tag);
2542     pa_tagstruct_put_usec(reply,
2543                           s->current_sink_latency +
2544                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec));
2545     pa_tagstruct_put_usec(reply, 0);
2546     pa_tagstruct_put_boolean(reply,
2547                              s->playing_for > 0 &&
2548                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2549                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2550     pa_tagstruct_put_timeval(reply, &tv);
2551     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2552     pa_tagstruct_puts64(reply, s->write_index);
2553     pa_tagstruct_puts64(reply, s->read_index);
2554
2555     if (c->version >= 13) {
2556         pa_tagstruct_putu64(reply, s->underrun_for);
2557         pa_tagstruct_putu64(reply, s->playing_for);
2558     }
2559
2560     pa_pstream_send_tagstruct(c->pstream, reply);
2561 }
2562
2563 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2564     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2565     pa_tagstruct *reply;
2566     record_stream *s;
2567     struct timeval tv, now;
2568     uint32_t idx;
2569
2570     pa_native_connection_assert_ref(c);
2571     pa_assert(t);
2572
2573     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2574         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2575         !pa_tagstruct_eof(t)) {
2576         protocol_error(c);
2577         return;
2578     }
2579
2580     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2581     s = pa_idxset_get_by_index(c->record_streams, idx);
2582     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2583
2584     /* Get an atomic snapshot of all timing parameters */
2585     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2586
2587     reply = reply_new(tag);
2588     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2589     pa_tagstruct_put_usec(reply,
2590                           s->current_source_latency +
2591                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2592     pa_tagstruct_put_boolean(reply,
2593                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2594                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2595     pa_tagstruct_put_timeval(reply, &tv);
2596     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2597     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2598     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2599     pa_pstream_send_tagstruct(c->pstream, reply);
2600 }
2601
2602 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2603     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2604     upload_stream *s;
2605     uint32_t length;
2606     const char *name = NULL;
2607     pa_sample_spec ss;
2608     pa_channel_map map;
2609     pa_tagstruct *reply;
2610     pa_proplist *p;
2611
2612     pa_native_connection_assert_ref(c);
2613     pa_assert(t);
2614
2615     if (pa_tagstruct_gets(t, &name) < 0 ||
2616         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2617         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2618         pa_tagstruct_getu32(t, &length) < 0) {
2619         protocol_error(c);
2620         return;
2621     }
2622
2623     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2624     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2625     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2626     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2627     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2628     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2629
2630     p = pa_proplist_new();
2631
2632     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2633         !pa_tagstruct_eof(t)) {
2634
2635         protocol_error(c);
2636         pa_proplist_free(p);
2637         return;
2638     }
2639
2640     if (c->version < 13)
2641         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2642     else if (!name)
2643         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2644             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2645
2646     if (!name || !pa_namereg_is_valid_name(name)) {
2647         pa_proplist_free(p);
2648         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2649     }
2650
2651     s = upload_stream_new(c, &ss, &map, name, length, p);
2652     pa_proplist_free(p);
2653
2654     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2655
2656     reply = reply_new(tag);
2657     pa_tagstruct_putu32(reply, s->index);
2658     pa_tagstruct_putu32(reply, length);
2659     pa_pstream_send_tagstruct(c->pstream, reply);
2660 }
2661
2662 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2663     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2664     uint32_t channel;
2665     upload_stream *s;
2666     uint32_t idx;
2667
2668     pa_native_connection_assert_ref(c);
2669     pa_assert(t);
2670
2671     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2672         !pa_tagstruct_eof(t)) {
2673         protocol_error(c);
2674         return;
2675     }
2676
2677     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2678
2679     s = pa_idxset_get_by_index(c->output_streams, channel);
2680     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2681     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2682
2683     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2684         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2685     else
2686         pa_pstream_send_simple_ack(c->pstream, tag);
2687
2688     upload_stream_unlink(s);
2689 }
2690
2691 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2692     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2693     uint32_t sink_index;
2694     pa_volume_t volume;
2695     pa_sink *sink;
2696     const char *name, *sink_name;
2697     uint32_t idx;
2698     pa_proplist *p;
2699     pa_tagstruct *reply;
2700
2701     pa_native_connection_assert_ref(c);
2702     pa_assert(t);
2703
2704     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2705
2706     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2707         pa_tagstruct_gets(t, &sink_name) < 0 ||
2708         pa_tagstruct_getu32(t, &volume) < 0 ||
2709         pa_tagstruct_gets(t, &name) < 0) {
2710         protocol_error(c);
2711         return;
2712     }
2713
2714     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2715     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2716     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2717     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2718
2719     if (sink_index != PA_INVALID_INDEX)
2720         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2721     else
2722         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2723
2724     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2725
2726     p = pa_proplist_new();
2727
2728     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2729         !pa_tagstruct_eof(t)) {
2730         protocol_error(c);
2731         pa_proplist_free(p);
2732         return;
2733     }
2734
2735     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2736
2737     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2738         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2739         pa_proplist_free(p);
2740         return;
2741     }
2742
2743     pa_proplist_free(p);
2744
2745     reply = reply_new(tag);
2746
2747     if (c->version >= 13)
2748         pa_tagstruct_putu32(reply, idx);
2749
2750     pa_pstream_send_tagstruct(c->pstream, reply);
2751 }
2752
2753 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2754     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2755     const char *name;
2756
2757     pa_native_connection_assert_ref(c);
2758     pa_assert(t);
2759
2760     if (pa_tagstruct_gets(t, &name) < 0 ||
2761         !pa_tagstruct_eof(t)) {
2762         protocol_error(c);
2763         return;
2764     }
2765
2766     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2767     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2768
2769     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2770         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2771         return;
2772     }
2773
2774     pa_pstream_send_simple_ack(c->pstream, tag);
2775 }
2776
2777 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2778     pa_assert(c);
2779     pa_assert(fixed);
2780     pa_assert(original);
2781
2782     *fixed = *original;
2783
2784     if (c->version < 12) {
2785         /* Before protocol version 12 we didn't support S32 samples,
2786          * so we need to lie about this to the client */
2787
2788         if (fixed->format == PA_SAMPLE_S32LE)
2789             fixed->format = PA_SAMPLE_FLOAT32LE;
2790         if (fixed->format == PA_SAMPLE_S32BE)
2791             fixed->format = PA_SAMPLE_FLOAT32BE;
2792     }
2793
2794     if (c->version < 15) {
2795         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2796             fixed->format = PA_SAMPLE_FLOAT32LE;
2797         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2798             fixed->format = PA_SAMPLE_FLOAT32BE;
2799     }
2800 }
2801
2802 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2803     pa_sample_spec fixed_ss;
2804
2805     pa_assert(t);
2806     pa_sink_assert_ref(sink);
2807
2808     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2809
2810     pa_tagstruct_put(
2811         t,
2812         PA_TAG_U32, sink->index,
2813         PA_TAG_STRING, sink->name,
2814         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2815         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2816         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2817         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2818         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2819         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2820         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2821         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2822         PA_TAG_USEC, pa_sink_get_latency(sink),
2823         PA_TAG_STRING, sink->driver,
2824         PA_TAG_U32, sink->flags,
2825         PA_TAG_INVALID);
2826
2827     if (c->version >= 13) {
2828         pa_tagstruct_put_proplist(t, sink->proplist);
2829         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2830     }
2831
2832     if (c->version >= 15) {
2833         pa_tagstruct_put_volume(t, sink->base_volume);
2834         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2835             pa_log_error("Internal sink state is invalid.");
2836         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2837         pa_tagstruct_putu32(t, sink->n_volume_steps);
2838         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2839     }
2840 }
2841
2842 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2843     pa_sample_spec fixed_ss;
2844
2845     pa_assert(t);
2846     pa_source_assert_ref(source);
2847
2848     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2849
2850     pa_tagstruct_put(
2851         t,
2852         PA_TAG_U32, source->index,
2853         PA_TAG_STRING, source->name,
2854         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2855         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2856         PA_TAG_CHANNEL_MAP, &source->channel_map,
2857         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2858         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2859         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2860         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2861         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2862         PA_TAG_USEC, pa_source_get_latency(source),
2863         PA_TAG_STRING, source->driver,
2864         PA_TAG_U32, source->flags,
2865         PA_TAG_INVALID);
2866
2867     if (c->version >= 13) {
2868         pa_tagstruct_put_proplist(t, source->proplist);
2869         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2870     }
2871
2872     if (c->version >= 15) {
2873         pa_tagstruct_put_volume(t, source->base_volume);
2874         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2875             pa_log_error("Internal source state is invalid.");
2876         pa_tagstruct_putu32(t, pa_source_get_state(source));
2877         pa_tagstruct_putu32(t, source->n_volume_steps);
2878         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2879     }
2880 }
2881
2882 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2883     pa_assert(t);
2884     pa_assert(client);
2885
2886     pa_tagstruct_putu32(t, client->index);
2887     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2888     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2889     pa_tagstruct_puts(t, client->driver);
2890
2891     if (c->version >= 13)
2892         pa_tagstruct_put_proplist(t, client->proplist);
2893 }
2894
2895 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2896     void *state = NULL;
2897     pa_card_profile *p;
2898
2899     pa_assert(t);
2900     pa_assert(card);
2901
2902     pa_tagstruct_putu32(t, card->index);
2903     pa_tagstruct_puts(t, card->name);
2904     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2905     pa_tagstruct_puts(t, card->driver);
2906
2907     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2908
2909     if (card->profiles) {
2910         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2911             pa_tagstruct_puts(t, p->name);
2912             pa_tagstruct_puts(t, p->description);
2913             pa_tagstruct_putu32(t, p->n_sinks);
2914             pa_tagstruct_putu32(t, p->n_sources);
2915             pa_tagstruct_putu32(t, p->priority);
2916         }
2917     }
2918
2919     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2920     pa_tagstruct_put_proplist(t, card->proplist);
2921 }
2922
2923 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2924     pa_assert(t);
2925     pa_assert(module);
2926
2927     pa_tagstruct_putu32(t, module->index);
2928     pa_tagstruct_puts(t, module->name);
2929     pa_tagstruct_puts(t, module->argument);
2930     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2931
2932     if (c->version < 15)
2933         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2934
2935     if (c->version >= 15)
2936         pa_tagstruct_put_proplist(t, module->proplist);
2937 }
2938
2939 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2940     pa_sample_spec fixed_ss;
2941     pa_usec_t sink_latency;
2942
2943     pa_assert(t);
2944     pa_sink_input_assert_ref(s);
2945
2946     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2947
2948     pa_tagstruct_putu32(t, s->index);
2949     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2950     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2951     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2952     pa_tagstruct_putu32(t, s->sink->index);
2953     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2954     pa_tagstruct_put_channel_map(t, &s->channel_map);
2955     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s));
2956     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
2957     pa_tagstruct_put_usec(t, sink_latency);
2958     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
2959     pa_tagstruct_puts(t, s->driver);
2960     if (c->version >= 11)
2961         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
2962     if (c->version >= 13)
2963         pa_tagstruct_put_proplist(t, s->proplist);
2964 }
2965
2966 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
2967     pa_sample_spec fixed_ss;
2968     pa_usec_t source_latency;
2969
2970     pa_assert(t);
2971     pa_source_output_assert_ref(s);
2972
2973     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2974
2975     pa_tagstruct_putu32(t, s->index);
2976     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2977     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2978     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2979     pa_tagstruct_putu32(t, s->source->index);
2980     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2981     pa_tagstruct_put_channel_map(t, &s->channel_map);
2982     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
2983     pa_tagstruct_put_usec(t, source_latency);
2984     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
2985     pa_tagstruct_puts(t, s->driver);
2986
2987     if (c->version >= 13)
2988         pa_tagstruct_put_proplist(t, s->proplist);
2989 }
2990
2991 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
2992     pa_sample_spec fixed_ss;
2993     pa_cvolume v;
2994
2995     pa_assert(t);
2996     pa_assert(e);
2997
2998     if (e->memchunk.memblock)
2999         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3000     else
3001         memset(&fixed_ss, 0, sizeof(fixed_ss));
3002
3003     pa_tagstruct_putu32(t, e->index);
3004     pa_tagstruct_puts(t, e->name);
3005
3006     if (e->volume_is_set)
3007         v = e->volume;
3008     else
3009         pa_cvolume_init(&v);
3010
3011     pa_tagstruct_put_cvolume(t, &v);
3012     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3013     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3014     pa_tagstruct_put_channel_map(t, &e->channel_map);
3015     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3016     pa_tagstruct_put_boolean(t, e->lazy);
3017     pa_tagstruct_puts(t, e->filename);
3018
3019     if (c->version >= 13)
3020         pa_tagstruct_put_proplist(t, e->proplist);
3021 }
3022
3023 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3024     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3025     uint32_t idx;
3026     pa_sink *sink = NULL;
3027     pa_source *source = NULL;
3028     pa_client *client = NULL;
3029     pa_card *card = NULL;
3030     pa_module *module = NULL;
3031     pa_sink_input *si = NULL;
3032     pa_source_output *so = NULL;
3033     pa_scache_entry *sce = NULL;
3034     const char *name = NULL;
3035     pa_tagstruct *reply;
3036
3037     pa_native_connection_assert_ref(c);
3038     pa_assert(t);
3039
3040     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3041         (command != PA_COMMAND_GET_CLIENT_INFO &&
3042          command != PA_COMMAND_GET_MODULE_INFO &&
3043          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3044          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3045          pa_tagstruct_gets(t, &name) < 0) ||
3046         !pa_tagstruct_eof(t)) {
3047         protocol_error(c);
3048         return;
3049     }
3050
3051     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3052     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3053     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3054     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3055     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3056
3057     if (command == PA_COMMAND_GET_SINK_INFO) {
3058         if (idx != PA_INVALID_INDEX)
3059             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3060         else
3061             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3062     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3063         if (idx != PA_INVALID_INDEX)
3064             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3065         else
3066             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3067     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3068         if (idx != PA_INVALID_INDEX)
3069             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3070         else
3071             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3072     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3073         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3074     else if (command == PA_COMMAND_GET_MODULE_INFO)
3075         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3076     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3077         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3078     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3079         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3080     else {
3081         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3082         if (idx != PA_INVALID_INDEX)
3083             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3084         else
3085             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3086     }
3087
3088     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3089         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3090         return;
3091     }
3092
3093     reply = reply_new(tag);
3094     if (sink)
3095         sink_fill_tagstruct(c, reply, sink);
3096     else if (source)
3097         source_fill_tagstruct(c, reply, source);
3098     else if (client)
3099         client_fill_tagstruct(c, reply, client);
3100     else if (card)
3101         card_fill_tagstruct(c, reply, card);
3102     else if (module)
3103         module_fill_tagstruct(c, reply, module);
3104     else if (si)
3105         sink_input_fill_tagstruct(c, reply, si);
3106     else if (so)
3107         source_output_fill_tagstruct(c, reply, so);
3108     else
3109         scache_fill_tagstruct(c, reply, sce);
3110     pa_pstream_send_tagstruct(c->pstream, reply);
3111 }
3112
3113 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3114     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3115     pa_idxset *i;
3116     uint32_t idx;
3117     void *p;
3118     pa_tagstruct *reply;
3119
3120     pa_native_connection_assert_ref(c);
3121     pa_assert(t);
3122
3123     if (!pa_tagstruct_eof(t)) {
3124         protocol_error(c);
3125         return;
3126     }
3127
3128     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3129
3130     reply = reply_new(tag);
3131
3132     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3133         i = c->protocol->core->sinks;
3134     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3135         i = c->protocol->core->sources;
3136     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3137         i = c->protocol->core->clients;
3138     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3139         i = c->protocol->core->cards;
3140     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3141         i = c->protocol->core->modules;
3142     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3143         i = c->protocol->core->sink_inputs;
3144     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3145         i = c->protocol->core->source_outputs;
3146     else {
3147         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3148         i = c->protocol->core->scache;
3149     }
3150
3151     if (i) {
3152         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3153             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3154                 sink_fill_tagstruct(c, reply, p);
3155             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3156                 source_fill_tagstruct(c, reply, p);
3157             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3158                 client_fill_tagstruct(c, reply, p);
3159             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3160                 card_fill_tagstruct(c, reply, p);
3161             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3162                 module_fill_tagstruct(c, reply, p);
3163             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3164                 sink_input_fill_tagstruct(c, reply, p);
3165             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3166                 source_output_fill_tagstruct(c, reply, p);
3167             else {
3168                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3169                 scache_fill_tagstruct(c, reply, p);
3170             }
3171         }
3172     }
3173
3174     pa_pstream_send_tagstruct(c->pstream, reply);
3175 }
3176
3177 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3178     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3179     pa_tagstruct *reply;
3180     char txt[256];
3181     pa_sink *def_sink;
3182     pa_source *def_source;
3183     pa_sample_spec fixed_ss;
3184
3185     pa_native_connection_assert_ref(c);
3186     pa_assert(t);
3187
3188     if (!pa_tagstruct_eof(t)) {
3189         protocol_error(c);
3190         return;
3191     }
3192
3193     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3194
3195     reply = reply_new(tag);
3196     pa_tagstruct_puts(reply, PACKAGE_NAME);
3197     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3198     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
3199     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
3200
3201     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3202     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3203
3204     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3205     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3206     def_source = pa_namereg_get_default_source(c->protocol->core);
3207     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3208
3209     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3210
3211     if (c->version >= 15)
3212         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3213
3214     pa_pstream_send_tagstruct(c->pstream, reply);
3215 }
3216
3217 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3218     pa_tagstruct *t;
3219     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3220
3221     pa_native_connection_assert_ref(c);
3222
3223     t = pa_tagstruct_new(NULL, 0);
3224     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3225     pa_tagstruct_putu32(t, (uint32_t) -1);
3226     pa_tagstruct_putu32(t, e);
3227     pa_tagstruct_putu32(t, idx);
3228     pa_pstream_send_tagstruct(c->pstream, t);
3229 }
3230
3231 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3232     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3233     pa_subscription_mask_t m;
3234
3235     pa_native_connection_assert_ref(c);
3236     pa_assert(t);
3237
3238     if (pa_tagstruct_getu32(t, &m) < 0 ||
3239         !pa_tagstruct_eof(t)) {
3240         protocol_error(c);
3241         return;
3242     }
3243
3244     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3245     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3246
3247     if (c->subscription)
3248         pa_subscription_free(c->subscription);
3249
3250     if (m != 0) {
3251         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3252         pa_assert(c->subscription);
3253     } else
3254         c->subscription = NULL;
3255
3256     pa_pstream_send_simple_ack(c->pstream, tag);
3257 }
3258
3259 static void command_set_volume(
3260         pa_pdispatch *pd,
3261         uint32_t command,
3262         uint32_t tag,
3263         pa_tagstruct *t,
3264         void *userdata) {
3265
3266     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3267     uint32_t idx;
3268     pa_cvolume volume;
3269     pa_sink *sink = NULL;
3270     pa_source *source = NULL;
3271     pa_sink_input *si = NULL;
3272     const char *name = NULL;
3273
3274     pa_native_connection_assert_ref(c);
3275     pa_assert(t);
3276
3277     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3278         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3279         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3280         pa_tagstruct_get_cvolume(t, &volume) ||
3281         !pa_tagstruct_eof(t)) {
3282         protocol_error(c);
3283         return;
3284     }
3285
3286     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3287     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3288     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3289     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3290     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3291     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3292
3293     switch (command) {
3294
3295         case PA_COMMAND_SET_SINK_VOLUME:
3296             if (idx != PA_INVALID_INDEX)
3297                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3298             else
3299                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3300             break;
3301
3302         case PA_COMMAND_SET_SOURCE_VOLUME:
3303             if (idx != PA_INVALID_INDEX)
3304                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3305             else
3306                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3307             break;
3308
3309         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3310             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3311             break;
3312
3313         default:
3314             pa_assert_not_reached();
3315     }
3316
3317     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3318
3319     if (sink)
3320         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3321     else if (source)
3322         pa_source_set_volume(source, &volume);
3323     else if (si)
3324         pa_sink_input_set_volume(si, &volume, TRUE);
3325
3326     pa_pstream_send_simple_ack(c->pstream, tag);
3327 }
3328
3329 static void command_set_mute(
3330         pa_pdispatch *pd,
3331         uint32_t command,
3332         uint32_t tag,
3333         pa_tagstruct *t,
3334         void *userdata) {
3335
3336     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3337     uint32_t idx;
3338     pa_bool_t mute;
3339     pa_sink *sink = NULL;
3340     pa_source *source = NULL;
3341     pa_sink_input *si = NULL;
3342     const char *name = NULL;
3343
3344     pa_native_connection_assert_ref(c);
3345     pa_assert(t);
3346
3347     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3348         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3349         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3350         pa_tagstruct_get_boolean(t, &mute) ||
3351         !pa_tagstruct_eof(t)) {
3352         protocol_error(c);
3353         return;
3354     }
3355
3356     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3357     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3358     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3359     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3360     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3361
3362     switch (command) {
3363
3364         case PA_COMMAND_SET_SINK_MUTE:
3365
3366             if (idx != PA_INVALID_INDEX)
3367                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3368             else
3369                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3370
3371             break;
3372
3373         case PA_COMMAND_SET_SOURCE_MUTE:
3374             if (idx != PA_INVALID_INDEX)
3375                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3376             else
3377                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3378
3379             break;
3380
3381         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3382             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3383             break;
3384
3385         default:
3386             pa_assert_not_reached();
3387     }
3388
3389     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3390
3391     if (sink)
3392         pa_sink_set_mute(sink, mute);
3393     else if (source)
3394         pa_source_set_mute(source, mute);
3395     else if (si)
3396         pa_sink_input_set_mute(si, mute, TRUE);
3397
3398     pa_pstream_send_simple_ack(c->pstream, tag);
3399 }
3400
3401 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3402     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3403     uint32_t idx;
3404     pa_bool_t b;
3405     playback_stream *s;
3406
3407     pa_native_connection_assert_ref(c);
3408     pa_assert(t);
3409
3410     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3411         pa_tagstruct_get_boolean(t, &b) < 0 ||
3412         !pa_tagstruct_eof(t)) {
3413         protocol_error(c);
3414         return;
3415     }
3416
3417     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3418     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3419     s = pa_idxset_get_by_index(c->output_streams, idx);
3420     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3421     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3422
3423     pa_sink_input_cork(s->sink_input, b);
3424
3425     if (b)
3426         s->is_underrun = TRUE;
3427
3428     pa_pstream_send_simple_ack(c->pstream, tag);
3429 }
3430
3431 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3432     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3433     uint32_t idx;
3434     playback_stream *s;
3435
3436     pa_native_connection_assert_ref(c);
3437     pa_assert(t);
3438
3439     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3440         !pa_tagstruct_eof(t)) {
3441         protocol_error(c);
3442         return;
3443     }
3444
3445     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3446     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3447     s = pa_idxset_get_by_index(c->output_streams, idx);
3448     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3449     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3450
3451     switch (command) {
3452         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3453             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3454             break;
3455
3456         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3457             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3458             break;
3459
3460         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3461             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3462             break;
3463
3464         default:
3465             pa_assert_not_reached();
3466     }
3467
3468     pa_pstream_send_simple_ack(c->pstream, tag);
3469 }
3470
3471 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3472     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3473     uint32_t idx;
3474     record_stream *s;
3475     pa_bool_t b;
3476
3477     pa_native_connection_assert_ref(c);
3478     pa_assert(t);
3479
3480     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3481         pa_tagstruct_get_boolean(t, &b) < 0 ||
3482         !pa_tagstruct_eof(t)) {
3483         protocol_error(c);
3484         return;
3485     }
3486
3487     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3488     s = pa_idxset_get_by_index(c->record_streams, idx);
3489     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3490
3491     pa_source_output_cork(s->source_output, b);
3492     pa_memblockq_prebuf_force(s->memblockq);
3493     pa_pstream_send_simple_ack(c->pstream, tag);
3494 }
3495
3496 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3497     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3498     uint32_t idx;
3499     record_stream *s;
3500
3501     pa_native_connection_assert_ref(c);
3502     pa_assert(t);
3503
3504     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3505         !pa_tagstruct_eof(t)) {
3506         protocol_error(c);
3507         return;
3508     }
3509
3510     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3511     s = pa_idxset_get_by_index(c->record_streams, idx);
3512     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3513
3514     pa_memblockq_flush_read(s->memblockq);
3515     pa_pstream_send_simple_ack(c->pstream, tag);
3516 }
3517
3518 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3519     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3520     uint32_t idx;
3521     pa_buffer_attr a;
3522     pa_tagstruct *reply;
3523
3524     pa_native_connection_assert_ref(c);
3525     pa_assert(t);
3526
3527     memset(&a, 0, sizeof(a));
3528
3529     if (pa_tagstruct_getu32(t, &idx) < 0) {
3530         protocol_error(c);
3531         return;
3532     }
3533
3534     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3535
3536     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3537         playback_stream *s;
3538         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3539
3540         s = pa_idxset_get_by_index(c->output_streams, idx);
3541         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3542         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3543
3544         if (pa_tagstruct_get(
3545                     t,
3546                     PA_TAG_U32, &a.maxlength,
3547                     PA_TAG_U32, &a.tlength,
3548                     PA_TAG_U32, &a.prebuf,
3549                     PA_TAG_U32, &a.minreq,
3550                     PA_TAG_INVALID) < 0 ||
3551             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3552             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3553             !pa_tagstruct_eof(t)) {
3554             protocol_error(c);
3555             return;
3556         }
3557
3558         s->adjust_latency = adjust_latency;
3559         s->early_requests = early_requests;
3560         s->buffer_attr = a;
3561
3562         fix_playback_buffer_attr(s);
3563         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3564
3565         reply = reply_new(tag);
3566         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3567         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3568         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3569         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3570
3571         if (c->version >= 13)
3572             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3573
3574     } else {
3575         record_stream *s;
3576         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3577         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3578
3579         s = pa_idxset_get_by_index(c->record_streams, idx);
3580         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3581
3582         if (pa_tagstruct_get(
3583                     t,
3584                     PA_TAG_U32, &a.maxlength,
3585                     PA_TAG_U32, &a.fragsize,
3586                     PA_TAG_INVALID) < 0 ||
3587             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3588             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3589             !pa_tagstruct_eof(t)) {
3590             protocol_error(c);
3591             return;
3592         }
3593
3594         s->adjust_latency = adjust_latency;
3595         s->early_requests = early_requests;
3596         s->buffer_attr = a;
3597
3598         fix_record_buffer_attr_pre(s);
3599         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3600         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3601         fix_record_buffer_attr_post(s);
3602
3603         reply = reply_new(tag);
3604         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3605         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3606
3607         if (c->version >= 13)
3608             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3609     }
3610
3611     pa_pstream_send_tagstruct(c->pstream, reply);
3612 }
3613
3614 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3615     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3616     uint32_t idx;
3617     uint32_t rate;
3618
3619     pa_native_connection_assert_ref(c);
3620     pa_assert(t);
3621
3622     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3623         pa_tagstruct_getu32(t, &rate) < 0 ||
3624         !pa_tagstruct_eof(t)) {
3625         protocol_error(c);
3626         return;
3627     }
3628
3629     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3630     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3631
3632     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3633         playback_stream *s;
3634
3635         s = pa_idxset_get_by_index(c->output_streams, idx);
3636         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3637         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3638
3639         pa_sink_input_set_rate(s->sink_input, rate);
3640
3641     } else {
3642         record_stream *s;
3643         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3644
3645         s = pa_idxset_get_by_index(c->record_streams, idx);
3646         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3647
3648         pa_source_output_set_rate(s->source_output, rate);
3649     }
3650
3651     pa_pstream_send_simple_ack(c->pstream, tag);
3652 }
3653
3654 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3655     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3656     uint32_t idx;
3657     uint32_t mode;
3658     pa_proplist *p;
3659
3660     pa_native_connection_assert_ref(c);
3661     pa_assert(t);
3662
3663     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3664
3665     p = pa_proplist_new();
3666
3667     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3668
3669         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3670             pa_tagstruct_get_proplist(t, p) < 0 ||
3671             !pa_tagstruct_eof(t)) {
3672             protocol_error(c);
3673             pa_proplist_free(p);
3674             return;
3675         }
3676
3677     } else {
3678
3679         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3680             pa_tagstruct_getu32(t, &mode) < 0 ||
3681             pa_tagstruct_get_proplist(t, p) < 0 ||
3682             !pa_tagstruct_eof(t)) {
3683             protocol_error(c);
3684             pa_proplist_free(p);
3685             return;
3686         }
3687     }
3688
3689     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3690         pa_proplist_free(p);
3691         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3692     }
3693
3694     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3695         playback_stream *s;
3696
3697         s = pa_idxset_get_by_index(c->output_streams, idx);
3698         if (!s || !playback_stream_isinstance(s)) {
3699             pa_proplist_free(p);
3700             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3701         }
3702         pa_sink_input_update_proplist(s->sink_input, mode, p);
3703
3704     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3705         record_stream *s;
3706
3707         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3708             pa_proplist_free(p);
3709             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3710         }
3711         pa_source_output_update_proplist(s->source_output, mode, p);
3712
3713     } else {
3714         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3715
3716         pa_client_update_proplist(c->client, mode, p);
3717     }
3718
3719     pa_pstream_send_simple_ack(c->pstream, tag);
3720     pa_proplist_free(p);
3721 }
3722
3723 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3724     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3725     uint32_t idx;
3726     unsigned changed = 0;
3727     pa_proplist *p;
3728     pa_strlist *l = NULL;
3729
3730     pa_native_connection_assert_ref(c);
3731     pa_assert(t);
3732
3733     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3734
3735     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3736
3737         if (pa_tagstruct_getu32(t, &idx) < 0) {
3738             protocol_error(c);
3739             return;
3740         }
3741     }
3742
3743     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3744         playback_stream *s;
3745
3746         s = pa_idxset_get_by_index(c->output_streams, idx);
3747         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3748         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3749
3750         p = s->sink_input->proplist;
3751
3752     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3753         record_stream *s;
3754
3755         s = pa_idxset_get_by_index(c->record_streams, idx);
3756         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3757
3758         p = s->source_output->proplist;
3759     } else {
3760         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3761
3762         p = c->client->proplist;
3763     }
3764
3765     for (;;) {
3766         const char *k;
3767
3768         if (pa_tagstruct_gets(t, &k) < 0) {
3769             protocol_error(c);
3770             pa_strlist_free(l);
3771             return;
3772         }
3773
3774         if (!k)
3775             break;
3776
3777         l = pa_strlist_prepend(l, k);
3778     }
3779
3780     if (!pa_tagstruct_eof(t)) {
3781         protocol_error(c);
3782         pa_strlist_free(l);
3783         return;
3784     }
3785
3786     for (;;) {
3787         char *z;
3788
3789         l = pa_strlist_pop(l, &z);
3790
3791         if (!z)
3792             break;
3793
3794         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3795         pa_xfree(z);
3796     }
3797
3798     pa_pstream_send_simple_ack(c->pstream, tag);
3799
3800     if (changed) {
3801         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3802             playback_stream *s;
3803
3804             s = pa_idxset_get_by_index(c->output_streams, idx);
3805             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3806
3807         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3808             record_stream *s;
3809
3810             s = pa_idxset_get_by_index(c->record_streams, idx);
3811             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3812
3813         } else {
3814             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3815             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3816         }
3817     }
3818 }
3819
3820 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3821     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3822     const char *s;
3823
3824     pa_native_connection_assert_ref(c);
3825     pa_assert(t);
3826
3827     if (pa_tagstruct_gets(t, &s) < 0 ||
3828         !pa_tagstruct_eof(t)) {
3829         protocol_error(c);
3830         return;
3831     }
3832
3833     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3834     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3835
3836     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3837         pa_source *source;
3838
3839         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3840         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3841
3842         pa_namereg_set_default_source(c->protocol->core, source);
3843     } else {
3844         pa_sink *sink;
3845         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3846
3847         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3848         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3849
3850         pa_namereg_set_default_sink(c->protocol->core, sink);
3851     }
3852
3853     pa_pstream_send_simple_ack(c->pstream, tag);
3854 }
3855
3856 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3857     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3858     uint32_t idx;
3859     const char *name;
3860
3861     pa_native_connection_assert_ref(c);
3862     pa_assert(t);
3863
3864     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3865         pa_tagstruct_gets(t, &name) < 0 ||
3866         !pa_tagstruct_eof(t)) {
3867         protocol_error(c);
3868         return;
3869     }
3870
3871     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3872     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3873
3874     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3875         playback_stream *s;
3876
3877         s = pa_idxset_get_by_index(c->output_streams, idx);
3878         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3879         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3880
3881         pa_sink_input_set_name(s->sink_input, name);
3882
3883     } else {
3884         record_stream *s;
3885         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3886
3887         s = pa_idxset_get_by_index(c->record_streams, idx);
3888         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3889
3890         pa_source_output_set_name(s->source_output, name);
3891     }
3892
3893     pa_pstream_send_simple_ack(c->pstream, tag);
3894 }
3895
3896 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3897     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3898     uint32_t idx;
3899
3900     pa_native_connection_assert_ref(c);
3901     pa_assert(t);
3902
3903     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3904         !pa_tagstruct_eof(t)) {
3905         protocol_error(c);
3906         return;
3907     }
3908
3909     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3910
3911     if (command == PA_COMMAND_KILL_CLIENT) {
3912         pa_client *client;
3913
3914         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3915         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3916
3917         pa_native_connection_ref(c);
3918         pa_client_kill(client);
3919
3920     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3921         pa_sink_input *s;
3922
3923         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3924         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3925
3926         pa_native_connection_ref(c);
3927         pa_sink_input_kill(s);
3928     } else {
3929         pa_source_output *s;
3930
3931         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3932
3933         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3934         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3935
3936         pa_native_connection_ref(c);
3937         pa_source_output_kill(s);
3938     }
3939
3940     pa_pstream_send_simple_ack(c->pstream, tag);
3941     pa_native_connection_unref(c);
3942 }
3943
3944 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3945     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3946     pa_module *m;
3947     const char *name, *argument;
3948     pa_tagstruct *reply;
3949
3950     pa_native_connection_assert_ref(c);
3951     pa_assert(t);
3952
3953     if (pa_tagstruct_gets(t, &name) < 0 ||
3954         pa_tagstruct_gets(t, &argument) < 0 ||
3955         !pa_tagstruct_eof(t)) {
3956         protocol_error(c);
3957         return;
3958     }
3959
3960     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3961     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
3962     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
3963
3964     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
3965         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
3966         return;
3967     }
3968
3969     reply = reply_new(tag);
3970     pa_tagstruct_putu32(reply, m->index);
3971     pa_pstream_send_tagstruct(c->pstream, reply);
3972 }
3973
3974 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3975     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3976     uint32_t idx;
3977     pa_module *m;
3978
3979     pa_native_connection_assert_ref(c);
3980     pa_assert(t);
3981
3982     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3983         !pa_tagstruct_eof(t)) {
3984         protocol_error(c);
3985         return;
3986     }
3987
3988     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3989     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3990     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
3991
3992     pa_module_unload_request(m, FALSE);
3993     pa_pstream_send_simple_ack(c->pstream, tag);
3994 }
3995
3996 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3997     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3998     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
3999     const char *name_device = NULL;
4000
4001     pa_native_connection_assert_ref(c);
4002     pa_assert(t);
4003
4004     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4005         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4006         pa_tagstruct_gets(t, &name_device) < 0 ||
4007         !pa_tagstruct_eof(t)) {
4008         protocol_error(c);
4009         return;
4010     }
4011
4012     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4013     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4014
4015     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4016     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4017     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4018     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4019
4020     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4021         pa_sink_input *si = NULL;
4022         pa_sink *sink = NULL;
4023
4024         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4025
4026         if (idx_device != PA_INVALID_INDEX)
4027             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4028         else
4029             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4030
4031         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4032
4033         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4034             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4035             return;
4036         }
4037     } else {
4038         pa_source_output *so = NULL;
4039         pa_source *source;
4040
4041         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4042
4043         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4044
4045         if (idx_device != PA_INVALID_INDEX)
4046             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4047         else
4048             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4049
4050         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4051
4052         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4053             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4054             return;
4055         }
4056     }
4057
4058     pa_pstream_send_simple_ack(c->pstream, tag);
4059 }
4060
4061 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4062     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4063     uint32_t idx = PA_INVALID_INDEX;
4064     const char *name = NULL;
4065     pa_bool_t b;
4066
4067     pa_native_connection_assert_ref(c);
4068     pa_assert(t);
4069
4070     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4071         pa_tagstruct_gets(t, &name) < 0 ||
4072         pa_tagstruct_get_boolean(t, &b) < 0 ||
4073         !pa_tagstruct_eof(t)) {
4074         protocol_error(c);
4075         return;
4076     }
4077
4078     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4079     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4080     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4081     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4082     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4083
4084     if (command == PA_COMMAND_SUSPEND_SINK) {
4085
4086         if (idx == PA_INVALID_INDEX && name && !*name) {
4087
4088             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4089
4090             if (pa_sink_suspend_all(c->protocol->core, b) < 0) {
4091                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4092                 return;
4093             }
4094         } else {
4095             pa_sink *sink = NULL;
4096
4097             if (idx != PA_INVALID_INDEX)
4098                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4099             else
4100                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4101
4102             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4103
4104             if (pa_sink_suspend(sink, b) < 0) {
4105                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4106                 return;
4107             }
4108         }
4109     } else {
4110
4111         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4112
4113         if (idx == PA_INVALID_INDEX && name && !*name) {
4114
4115             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4116
4117             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
4118                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4119                 return;
4120             }
4121
4122         } else {
4123             pa_source *source;
4124
4125             if (idx != PA_INVALID_INDEX)
4126                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4127             else
4128                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4129
4130             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4131
4132             if (pa_source_suspend(source, b) < 0) {
4133                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4134                 return;
4135             }
4136         }
4137     }
4138
4139     pa_pstream_send_simple_ack(c->pstream, tag);
4140 }
4141
4142 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4143     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4144     uint32_t idx = PA_INVALID_INDEX;
4145     const char *name = NULL;
4146     pa_module *m;
4147     pa_native_protocol_ext_cb_t cb;
4148
4149     pa_native_connection_assert_ref(c);
4150     pa_assert(t);
4151
4152     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4153         pa_tagstruct_gets(t, &name) < 0) {
4154         protocol_error(c);
4155         return;
4156     }
4157
4158     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4159     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4160     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4161     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4162     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4163
4164     if (idx != PA_INVALID_INDEX)
4165         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4166     else {
4167         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4168             if (strcmp(name, m->name) == 0)
4169                 break;
4170     }
4171
4172     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4173     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4174
4175     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4176     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4177
4178     if (cb(c->protocol, m, c, tag, t) < 0)
4179         protocol_error(c);
4180 }
4181
4182 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4183     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4184     uint32_t idx = PA_INVALID_INDEX;
4185     const char *name = NULL, *profile = NULL;
4186     pa_card *card = NULL;
4187
4188     pa_native_connection_assert_ref(c);
4189     pa_assert(t);
4190
4191     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4192         pa_tagstruct_gets(t, &name) < 0 ||
4193         pa_tagstruct_gets(t, &profile) < 0 ||
4194         !pa_tagstruct_eof(t)) {
4195         protocol_error(c);
4196         return;
4197     }
4198
4199     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4200     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4201     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4202     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4203     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4204
4205     if (idx != PA_INVALID_INDEX)
4206         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4207     else
4208         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4209
4210     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4211
4212     if (pa_card_set_profile(card, profile, TRUE) < 0) {
4213         pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4214         return;
4215     }
4216
4217     pa_pstream_send_simple_ack(c->pstream, tag);
4218 }
4219
4220 /*** pstream callbacks ***/
4221
4222 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4223     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4224
4225     pa_assert(p);
4226     pa_assert(packet);
4227     pa_native_connection_assert_ref(c);
4228
4229     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4230         pa_log("invalid packet.");
4231         native_connection_unlink(c);
4232     }
4233 }
4234
4235 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4236     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4237     output_stream *stream;
4238
4239     pa_assert(p);
4240     pa_assert(chunk);
4241     pa_native_connection_assert_ref(c);
4242
4243     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4244         pa_log("client sent block for invalid stream.");
4245         /* Ignoring */
4246         return;
4247     }
4248
4249 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4250
4251     if (playback_stream_isinstance(stream)) {
4252         playback_stream *ps = PLAYBACK_STREAM(stream);
4253
4254         if (chunk->memblock) {
4255             if (seek != PA_SEEK_RELATIVE || offset != 0)
4256                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4257
4258             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4259         } else
4260             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4261
4262     } else {
4263         upload_stream *u = UPLOAD_STREAM(stream);
4264         size_t l;
4265
4266         if (!u->memchunk.memblock) {
4267             if (u->length == chunk->length && chunk->memblock) {
4268                 u->memchunk = *chunk;
4269                 pa_memblock_ref(u->memchunk.memblock);
4270                 u->length = 0;
4271             } else {
4272                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4273                 u->memchunk.index = u->memchunk.length = 0;
4274             }
4275         }
4276
4277         pa_assert(u->memchunk.memblock);
4278
4279         l = u->length;
4280         if (l > chunk->length)
4281             l = chunk->length;
4282
4283         if (l > 0) {
4284             void *dst;
4285             dst = pa_memblock_acquire(u->memchunk.memblock);
4286
4287             if (chunk->memblock) {
4288                 void *src;
4289                 src = pa_memblock_acquire(chunk->memblock);
4290
4291                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4292                        (uint8_t*) src + chunk->index, l);
4293
4294                 pa_memblock_release(chunk->memblock);
4295             } else
4296                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4297
4298             pa_memblock_release(u->memchunk.memblock);
4299
4300             u->memchunk.length += l;
4301             u->length -= l;
4302         }
4303     }
4304 }
4305
4306 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4307     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4308
4309     pa_assert(p);
4310     pa_native_connection_assert_ref(c);
4311
4312     native_connection_unlink(c);
4313     pa_log_info("Connection died.");
4314 }
4315
4316 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4317     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4318
4319     pa_assert(p);
4320     pa_native_connection_assert_ref(c);
4321
4322     native_connection_send_memblock(c);
4323 }
4324
4325 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4326     pa_thread_mq *q;
4327
4328     if (!(q = pa_thread_mq_get()))
4329         pa_pstream_send_revoke(p, block_id);
4330     else
4331         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4332 }
4333
4334 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4335     pa_thread_mq *q;
4336
4337     if (!(q = pa_thread_mq_get()))
4338         pa_pstream_send_release(p, block_id);
4339     else
4340         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4341 }
4342
4343 /*** client callbacks ***/
4344
4345 static void client_kill_cb(pa_client *c) {
4346     pa_assert(c);
4347
4348     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4349     pa_log_info("Connection killed.");
4350 }
4351
4352 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4353     pa_tagstruct *t;
4354     pa_native_connection *c;
4355
4356     pa_assert(client);
4357     c = PA_NATIVE_CONNECTION(client->userdata);
4358     pa_native_connection_assert_ref(c);
4359
4360     if (c->version < 15)
4361       return;
4362
4363     t = pa_tagstruct_new(NULL, 0);
4364     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4365     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4366     pa_tagstruct_puts(t, event);
4367     pa_tagstruct_put_proplist(t, pl);
4368     pa_pstream_send_tagstruct(c->pstream, t);
4369 }
4370
4371 /*** module entry points ***/
4372
4373 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4374     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4375
4376     pa_assert(m);
4377     pa_assert(tv);
4378     pa_native_connection_assert_ref(c);
4379     pa_assert(c->auth_timeout_event == e);
4380
4381     if (!c->authorized) {
4382         native_connection_unlink(c);
4383         pa_log_info("Connection terminated due to authentication timeout.");
4384     }
4385 }
4386
4387 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4388     pa_native_connection *c;
4389     char pname[128];
4390     pa_client *client;
4391     pa_client_new_data data;
4392
4393     pa_assert(p);
4394     pa_assert(io);
4395     pa_assert(o);
4396
4397     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4398         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4399         pa_iochannel_free(io);
4400         return;
4401     }
4402
4403     pa_client_new_data_init(&data);
4404     data.module = o->module;
4405     data.driver = __FILE__;
4406     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4407     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4408     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4409     client = pa_client_new(p->core, &data);
4410     pa_client_new_data_done(&data);
4411
4412     if (!client)
4413         return;
4414
4415     c = pa_msgobject_new(pa_native_connection);
4416     c->parent.parent.free = native_connection_free;
4417     c->parent.process_msg = native_connection_process_msg;
4418     c->protocol = p;
4419     c->options = pa_native_options_ref(o);
4420     c->authorized = FALSE;
4421
4422     if (o->auth_anonymous) {
4423         pa_log_info("Client authenticated anonymously.");
4424         c->authorized = TRUE;
4425     }
4426
4427     if (!c->authorized &&
4428         o->auth_ip_acl &&
4429         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4430
4431         pa_log_info("Client authenticated by IP ACL.");
4432         c->authorized = TRUE;
4433     }
4434
4435     if (!c->authorized) {
4436         struct timeval tv;
4437         pa_gettimeofday(&tv);
4438         tv.tv_sec += AUTH_TIMEOUT;
4439         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4440     } else
4441         c->auth_timeout_event = NULL;
4442
4443     c->is_local = pa_iochannel_socket_is_local(io);
4444     c->version = 8;
4445
4446     c->client = client;
4447     c->client->kill = client_kill_cb;
4448     c->client->send_event = client_send_event_cb;
4449     c->client->userdata = c;
4450
4451     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4452     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4453     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4454     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4455     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4456     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4457     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4458
4459     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4460
4461     c->record_streams = pa_idxset_new(NULL, NULL);
4462     c->output_streams = pa_idxset_new(NULL, NULL);
4463
4464     c->rrobin_index = PA_IDXSET_INVALID;
4465     c->subscription = NULL;
4466
4467     pa_idxset_put(p->connections, c, NULL);
4468
4469 #ifdef HAVE_CREDS
4470     if (pa_iochannel_creds_supported(io))
4471         pa_iochannel_creds_enable(io);
4472 #endif
4473
4474     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4475 }
4476
4477 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4478     pa_native_connection *c;
4479     void *state = NULL;
4480
4481     pa_assert(p);
4482     pa_assert(m);
4483
4484     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4485         if (c->options->module == m)
4486             native_connection_unlink(c);
4487 }
4488
4489 static pa_native_protocol* native_protocol_new(pa_core *c) {
4490     pa_native_protocol *p;
4491     pa_native_hook_t h;
4492
4493     pa_assert(c);
4494
4495     p = pa_xnew(pa_native_protocol, 1);
4496     PA_REFCNT_INIT(p);
4497     p->core = c;
4498     p->connections = pa_idxset_new(NULL, NULL);
4499
4500     p->servers = NULL;
4501
4502     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4503
4504     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4505         pa_hook_init(&p->hooks[h], p);
4506
4507     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4508
4509     return p;
4510 }
4511
4512 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4513     pa_native_protocol *p;
4514
4515     if ((p = pa_shared_get(c, "native-protocol")))
4516         return pa_native_protocol_ref(p);
4517
4518     return native_protocol_new(c);
4519 }
4520
4521 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4522     pa_assert(p);
4523     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4524
4525     PA_REFCNT_INC(p);
4526
4527     return p;
4528 }
4529
4530 void pa_native_protocol_unref(pa_native_protocol *p) {
4531     pa_native_connection *c;
4532     pa_native_hook_t h;
4533
4534     pa_assert(p);
4535     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4536
4537     if (PA_REFCNT_DEC(p) > 0)
4538         return;
4539
4540     while ((c = pa_idxset_first(p->connections, NULL)))
4541         native_connection_unlink(c);
4542
4543     pa_idxset_free(p->connections, NULL, NULL);
4544
4545     pa_strlist_free(p->servers);
4546
4547     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4548         pa_hook_done(&p->hooks[h]);
4549
4550     pa_hashmap_free(p->extensions, NULL, NULL);
4551
4552     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4553
4554     pa_xfree(p);
4555 }
4556
4557 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4558     pa_assert(p);
4559     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4560     pa_assert(name);
4561
4562     p->servers = pa_strlist_prepend(p->servers, name);
4563
4564     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4565 }
4566
4567 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4568     pa_assert(p);
4569     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4570     pa_assert(name);
4571
4572     p->servers = pa_strlist_remove(p->servers, name);
4573
4574     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4575 }
4576
4577 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4578     pa_assert(p);
4579     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4580
4581     return p->hooks;
4582 }
4583
4584 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4585     pa_assert(p);
4586     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4587
4588     return p->servers;
4589 }
4590
4591 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4592     pa_assert(p);
4593     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4594     pa_assert(m);
4595     pa_assert(cb);
4596     pa_assert(!pa_hashmap_get(p->extensions, m));
4597
4598     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4599     return 0;
4600 }
4601
4602 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4603     pa_assert(p);
4604     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4605     pa_assert(m);
4606
4607     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4608 }
4609
4610 pa_native_options* pa_native_options_new(void) {
4611     pa_native_options *o;
4612
4613     o = pa_xnew0(pa_native_options, 1);
4614     PA_REFCNT_INIT(o);
4615
4616     return o;
4617 }
4618
4619 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4620     pa_assert(o);
4621     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4622
4623     PA_REFCNT_INC(o);
4624
4625     return o;
4626 }
4627
4628 void pa_native_options_unref(pa_native_options *o) {
4629     pa_assert(o);
4630     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4631
4632     if (PA_REFCNT_DEC(o) > 0)
4633         return;
4634
4635     pa_xfree(o->auth_group);
4636
4637     if (o->auth_ip_acl)
4638         pa_ip_acl_free(o->auth_ip_acl);
4639
4640     if (o->auth_cookie)
4641         pa_auth_cookie_unref(o->auth_cookie);
4642
4643     pa_xfree(o);
4644 }
4645
4646 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4647     pa_bool_t enabled;
4648     const char *acl;
4649
4650     pa_assert(o);
4651     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4652     pa_assert(ma);
4653
4654     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4655         pa_log("auth-anonymous= expects a boolean argument.");
4656         return -1;
4657     }
4658
4659     enabled = TRUE;
4660     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4661         pa_log("auth-group-enabled= expects a boolean argument.");
4662         return -1;
4663     }
4664
4665     pa_xfree(o->auth_group);
4666     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4667
4668 #ifndef HAVE_CREDS
4669     if (o->auth_group)
4670         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4671 #endif
4672
4673     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4674         pa_ip_acl *ipa;
4675
4676         if (!(ipa = pa_ip_acl_new(acl))) {
4677             pa_log("Failed to parse IP ACL '%s'", acl);
4678             return -1;
4679         }
4680
4681         if (o->auth_ip_acl)
4682             pa_ip_acl_free(o->auth_ip_acl);
4683
4684         o->auth_ip_acl = ipa;
4685     }
4686
4687     enabled = TRUE;
4688     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4689         pa_log("auth-cookie-enabled= expects a boolean argument.");
4690         return -1;
4691     }
4692
4693     if (o->auth_cookie)
4694         pa_auth_cookie_unref(o->auth_cookie);
4695
4696     if (enabled) {
4697         const char *cn;
4698
4699         /* The new name for this is 'auth-cookie', for compat reasons
4700          * we check the old name too */
4701         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4702             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4703                 cn = PA_NATIVE_COOKIE_FILE;
4704
4705         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4706             return -1;
4707
4708     } else
4709           o->auth_cookie = NULL;
4710
4711     return 0;
4712 }
4713
4714 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4715     pa_native_connection_assert_ref(c);
4716
4717     return c->pstream;
4718 }