aecaf71c95cce81f69232cf20395c12ba136414b
[platform/upstream/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* Kick a client if it doesn't authenticate within this time */
64 #define AUTH_TIMEOUT 60
65
66 /* Don't accept more connection than this */
67 #define MAX_CONNECTIONS 64
68
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
73
74 struct pa_native_protocol;
75
76 typedef struct record_stream {
77     pa_msgobject parent;
78
79     pa_native_connection *connection;
80     uint32_t index;
81
82     pa_source_output *source_output;
83     pa_memblockq *memblockq;
84
85     pa_bool_t adjust_latency:1;
86     pa_bool_t early_requests:1;
87
88     pa_buffer_attr buffer_attr;
89
90     pa_atomic_t on_the_fly;
91     pa_usec_t configured_source_latency;
92     size_t drop_initial;
93
94     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
95     size_t on_the_fly_snapshot;
96     pa_usec_t current_monitor_latency;
97     pa_usec_t current_source_latency;
98 } record_stream;
99
100 PA_DECLARE_CLASS(record_stream);
101 #define RECORD_STREAM(o) (record_stream_cast(o))
102 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
103
104 typedef struct output_stream {
105     pa_msgobject parent;
106 } output_stream;
107
108 PA_DECLARE_CLASS(output_stream);
109 #define OUTPUT_STREAM(o) (output_stream_cast(o))
110 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
111
112 typedef struct playback_stream {
113     output_stream parent;
114
115     pa_native_connection *connection;
116     uint32_t index;
117
118     pa_sink_input *sink_input;
119     pa_memblockq *memblockq;
120
121     pa_bool_t adjust_latency:1;
122     pa_bool_t early_requests:1;
123
124     pa_bool_t is_underrun:1;
125     pa_bool_t drain_request:1;
126     uint32_t drain_tag;
127     uint32_t syncid;
128
129     pa_atomic_t missing;
130     pa_usec_t configured_sink_latency;
131     pa_buffer_attr buffer_attr;
132
133     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
134     int64_t read_index, write_index;
135     size_t render_memblockq_length;
136     pa_usec_t current_sink_latency;
137     uint64_t playing_for, underrun_for;
138 } playback_stream;
139
140 PA_DECLARE_CLASS(playback_stream);
141 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
142 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
143
144 typedef struct upload_stream {
145     output_stream parent;
146
147     pa_native_connection *connection;
148     uint32_t index;
149
150     pa_memchunk memchunk;
151     size_t length;
152     char *name;
153     pa_sample_spec sample_spec;
154     pa_channel_map channel_map;
155     pa_proplist *proplist;
156 } upload_stream;
157
158 PA_DECLARE_CLASS(upload_stream);
159 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
160 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
161
162 struct pa_native_connection {
163     pa_msgobject parent;
164     pa_native_protocol *protocol;
165     pa_native_options *options;
166     pa_bool_t authorized:1;
167     pa_bool_t is_local:1;
168     uint32_t version;
169     pa_client *client;
170     pa_pstream *pstream;
171     pa_pdispatch *pdispatch;
172     pa_idxset *record_streams, *output_streams;
173     uint32_t rrobin_index;
174     pa_subscription *subscription;
175     pa_time_event *auth_timeout_event;
176 };
177
178 PA_DECLARE_CLASS(pa_native_connection);
179 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
180 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
181
182 struct pa_native_protocol {
183     PA_REFCNT_DECLARE;
184
185     pa_core *core;
186     pa_idxset *connections;
187
188     pa_strlist *servers;
189     pa_hook hooks[PA_NATIVE_HOOK_MAX];
190
191     pa_hashmap *extensions;
192 };
193
194 enum {
195     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
196 };
197
198 enum {
199     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
200     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
201     SINK_INPUT_MESSAGE_FLUSH,
202     SINK_INPUT_MESSAGE_TRIGGER,
203     SINK_INPUT_MESSAGE_SEEK,
204     SINK_INPUT_MESSAGE_PREBUF_FORCE,
205     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
206     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
207 };
208
209 enum {
210     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
211     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
212     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
213     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
214     PLAYBACK_STREAM_MESSAGE_STARTED,
215     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
216 };
217
218 enum {
219     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
220 };
221
222 enum {
223     CONNECTION_MESSAGE_RELEASE,
224     CONNECTION_MESSAGE_REVOKE
225 };
226
227 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
228 static void sink_input_kill_cb(pa_sink_input *i);
229 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
230 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
231 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
232 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
235
236 static void native_connection_send_memblock(pa_native_connection *c);
237 static void playback_stream_request_bytes(struct playback_stream*s);
238
239 static void source_output_kill_cb(pa_source_output *o);
240 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
241 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
242 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
243 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
244 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
245
246 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
247 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248
249 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287
288 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
289     [PA_COMMAND_ERROR] = NULL,
290     [PA_COMMAND_TIMEOUT] = NULL,
291     [PA_COMMAND_REPLY] = NULL,
292     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
293     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
294     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
295     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
296     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
297     [PA_COMMAND_AUTH] = command_auth,
298     [PA_COMMAND_REQUEST] = NULL,
299     [PA_COMMAND_EXIT] = command_exit,
300     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
301     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
302     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
303     [PA_COMMAND_STAT] = command_stat,
304     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
305     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
306     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
307     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
308     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
309     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
310     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
311     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
312     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
313     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
314     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
315     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
316     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
317     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
318     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
319     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
320     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
321     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
328     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
329
330     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
331     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
332     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
333
334     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
335     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
336     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
337
338     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
339     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
340
341     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
342     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
343     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
344     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345
346     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
347     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
348
349     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
350     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
351     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
352     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
353     [PA_COMMAND_KILL_CLIENT] = command_kill,
354     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
355     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
356     [PA_COMMAND_LOAD_MODULE] = command_load_module,
357     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
358
359     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
360     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
361     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
362     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
363
364     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
365     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
366
367     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
368     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
369
370     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
371     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
372
373     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
374     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
375     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
376
377     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
378     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
379     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
380
381     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
382
383     [PA_COMMAND_EXTENSION] = command_extension
384 };
385
386 /* structure management */
387
388 /* Called from main context */
389 static void upload_stream_unlink(upload_stream *s) {
390     pa_assert(s);
391
392     if (!s->connection)
393         return;
394
395     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
396     s->connection = NULL;
397     upload_stream_unref(s);
398 }
399
400 /* Called from main context */
401 static void upload_stream_free(pa_object *o) {
402     upload_stream *s = UPLOAD_STREAM(o);
403     pa_assert(s);
404
405     upload_stream_unlink(s);
406
407     pa_xfree(s->name);
408
409     if (s->proplist)
410         pa_proplist_free(s->proplist);
411
412     if (s->memchunk.memblock)
413         pa_memblock_unref(s->memchunk.memblock);
414
415     pa_xfree(s);
416 }
417
418 /* Called from main context */
419 static upload_stream* upload_stream_new(
420         pa_native_connection *c,
421         const pa_sample_spec *ss,
422         const pa_channel_map *map,
423         const char *name,
424         size_t length,
425         pa_proplist *p) {
426
427     upload_stream *s;
428
429     pa_assert(c);
430     pa_assert(ss);
431     pa_assert(name);
432     pa_assert(length > 0);
433     pa_assert(p);
434
435     s = pa_msgobject_new(upload_stream);
436     s->parent.parent.parent.free = upload_stream_free;
437     s->connection = c;
438     s->sample_spec = *ss;
439     s->channel_map = *map;
440     s->name = pa_xstrdup(name);
441     pa_memchunk_reset(&s->memchunk);
442     s->length = length;
443     s->proplist = pa_proplist_copy(p);
444     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
445
446     pa_idxset_put(c->output_streams, s, &s->index);
447
448     return s;
449 }
450
451 /* Called from main context */
452 static void record_stream_unlink(record_stream *s) {
453     pa_assert(s);
454
455     if (!s->connection)
456         return;
457
458     if (s->source_output) {
459         pa_source_output_unlink(s->source_output);
460         pa_source_output_unref(s->source_output);
461         s->source_output = NULL;
462     }
463
464     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
465     s->connection = NULL;
466     record_stream_unref(s);
467 }
468
469 /* Called from main context */
470 static void record_stream_free(pa_object *o) {
471     record_stream *s = RECORD_STREAM(o);
472     pa_assert(s);
473
474     record_stream_unlink(s);
475
476     pa_memblockq_free(s->memblockq);
477     pa_xfree(s);
478 }
479
480 /* Called from main context */
481 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
482     record_stream *s = RECORD_STREAM(o);
483     record_stream_assert_ref(s);
484
485     if (!s->connection)
486         return -1;
487
488     switch (code) {
489
490         case RECORD_STREAM_MESSAGE_POST_DATA:
491
492             /* We try to keep up to date with how many bytes are
493              * currently on the fly */
494             pa_atomic_sub(&s->on_the_fly, chunk->length);
495
496             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
497 /*                 pa_log_warn("Failed to push data into output queue."); */
498                 return -1;
499             }
500
501             if (!pa_pstream_is_pending(s->connection->pstream))
502                 native_connection_send_memblock(s->connection);
503
504             break;
505     }
506
507     return 0;
508 }
509
510 /* Called from main context */
511 static void fix_record_buffer_attr_pre(record_stream *s) {
512
513     size_t frame_size;
514     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
515
516     pa_assert(s);
517
518     /* This function will be called from the main thread, before as
519      * well as after the source output has been activated using
520      * pa_source_output_put()! That means it may not touch any
521      * ->thread_info data! */
522
523     frame_size = pa_frame_size(&s->source_output->sample_spec);
524
525     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
526         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
527     if (s->buffer_attr.maxlength <= 0)
528         s->buffer_attr.maxlength = (uint32_t) frame_size;
529
530     if (s->buffer_attr.fragsize == (uint32_t) -1)
531         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
532     if (s->buffer_attr.fragsize <= 0)
533         s->buffer_attr.fragsize = (uint32_t) frame_size;
534
535     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
536
537     if (s->early_requests) {
538
539         /* In early request mode we need to emulate the classic
540          * fragment-based playback model. We do this setting the source
541          * latency to the fragment size. */
542
543         source_usec = fragsize_usec;
544
545     } else if (s->adjust_latency) {
546
547         /* So, the user asked us to adjust the latency according to
548          * what the source can provide. Half the latency will be
549          * spent on the hw buffer, half of it in the async buffer
550          * queue we maintain for each client. */
551
552         source_usec = fragsize_usec/2;
553
554     } else {
555
556         /* Ok, the user didn't ask us to adjust the latency, hence we
557          * don't */
558
559         source_usec = (pa_usec_t) -1;
560     }
561
562     if (source_usec != (pa_usec_t) -1)
563         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
564     else
565         s->configured_source_latency = 0;
566
567     if (s->early_requests) {
568
569         /* Ok, we didn't necessarily get what we were asking for, so
570          * let's tell the user */
571
572         fragsize_usec = s->configured_source_latency;
573
574     } else if (s->adjust_latency) {
575
576         /* Now subtract what we actually got */
577
578         if (fragsize_usec >= s->configured_source_latency*2)
579             fragsize_usec -= s->configured_source_latency;
580         else
581             fragsize_usec = s->configured_source_latency;
582     }
583
584     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
585         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
586
587         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
588
589     if (s->buffer_attr.fragsize <= 0)
590         s->buffer_attr.fragsize = (uint32_t) frame_size;
591 }
592
593 /* Called from main context */
594 static void fix_record_buffer_attr_post(record_stream *s) {
595     size_t base;
596
597     pa_assert(s);
598
599     /* This function will be called from the main thread, before as
600      * well as after the source output has been activated using
601      * pa_source_output_put()! That means it may not touch and
602      * ->thread_info data! */
603
604     base = pa_frame_size(&s->source_output->sample_spec);
605
606     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
607     if (s->buffer_attr.fragsize <= 0)
608         s->buffer_attr.fragsize = base;
609
610     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
611         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
612 }
613
614 /* Called from main context */
615 static record_stream* record_stream_new(
616         pa_native_connection *c,
617         pa_source *source,
618         pa_sample_spec *ss,
619         pa_channel_map *map,
620         pa_bool_t peak_detect,
621         pa_buffer_attr *attr,
622         pa_source_output_flags_t flags,
623         pa_proplist *p,
624         pa_bool_t adjust_latency,
625         pa_sink_input *direct_on_input,
626         pa_bool_t early_requests,
627         int *ret) {
628
629     record_stream *s;
630     pa_source_output *source_output = NULL;
631     size_t base;
632     pa_source_output_new_data data;
633
634     pa_assert(c);
635     pa_assert(ss);
636     pa_assert(p);
637     pa_assert(ret);
638
639     pa_source_output_new_data_init(&data);
640
641     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
642     data.driver = __FILE__;
643     data.module = c->options->module;
644     data.client = c->client;
645     data.source = source;
646     data.direct_on_input = direct_on_input;
647     pa_source_output_new_data_set_sample_spec(&data, ss);
648     pa_source_output_new_data_set_channel_map(&data, map);
649     if (peak_detect)
650         data.resample_method = PA_RESAMPLER_PEAKS;
651
652     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
653
654     pa_source_output_new_data_done(&data);
655
656     if (!source_output)
657         return NULL;
658
659     s = pa_msgobject_new(record_stream);
660     s->parent.parent.free = record_stream_free;
661     s->parent.process_msg = record_stream_process_msg;
662     s->connection = c;
663     s->source_output = source_output;
664     s->buffer_attr = *attr;
665     s->adjust_latency = adjust_latency;
666     s->early_requests = early_requests;
667     pa_atomic_store(&s->on_the_fly, 0);
668
669     s->source_output->parent.process_msg = source_output_process_msg;
670     s->source_output->push = source_output_push_cb;
671     s->source_output->kill = source_output_kill_cb;
672     s->source_output->get_latency = source_output_get_latency_cb;
673     s->source_output->moving = source_output_moving_cb;
674     s->source_output->suspend = source_output_suspend_cb;
675     s->source_output->send_event = source_output_send_event_cb;
676     s->source_output->userdata = s;
677
678     fix_record_buffer_attr_pre(s);
679
680     s->memblockq = pa_memblockq_new(
681             0,
682             s->buffer_attr.maxlength,
683             0,
684             base = pa_frame_size(&source_output->sample_spec),
685             1,
686             0,
687             0,
688             NULL);
689
690     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
691     fix_record_buffer_attr_post(s);
692
693     *ss = s->source_output->sample_spec;
694     *map = s->source_output->channel_map;
695
696     pa_idxset_put(c->record_streams, s, &s->index);
697
698     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
699                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
700                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
701                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
702
703     pa_source_output_put(s->source_output);
704     return s;
705 }
706
707 /* Called from main context */
708 static void record_stream_send_killed(record_stream *r) {
709     pa_tagstruct *t;
710     record_stream_assert_ref(r);
711
712     t = pa_tagstruct_new(NULL, 0);
713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
715     pa_tagstruct_putu32(t, r->index);
716     pa_pstream_send_tagstruct(r->connection->pstream, t);
717 }
718
719 /* Called from main context */
720 static void playback_stream_unlink(playback_stream *s) {
721     pa_assert(s);
722
723     if (!s->connection)
724         return;
725
726     if (s->sink_input) {
727         pa_sink_input_unlink(s->sink_input);
728         pa_sink_input_unref(s->sink_input);
729         s->sink_input = NULL;
730     }
731
732     if (s->drain_request)
733         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
734
735     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
736     s->connection = NULL;
737     playback_stream_unref(s);
738 }
739
740 /* Called from main context */
741 static void playback_stream_free(pa_object* o) {
742     playback_stream *s = PLAYBACK_STREAM(o);
743     pa_assert(s);
744
745     playback_stream_unlink(s);
746
747     pa_memblockq_free(s->memblockq);
748     pa_xfree(s);
749 }
750
751 /* Called from main context */
752 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
753     playback_stream *s = PLAYBACK_STREAM(o);
754     playback_stream_assert_ref(s);
755
756     if (!s->connection)
757         return -1;
758
759     switch (code) {
760         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
761             pa_tagstruct *t;
762             int l = 0;
763
764             for (;;) {
765                 if ((l = pa_atomic_load(&s->missing)) <= 0)
766                     return 0;
767
768                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
769                     break;
770             }
771
772             t = pa_tagstruct_new(NULL, 0);
773             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
774             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
775             pa_tagstruct_putu32(t, s->index);
776             pa_tagstruct_putu32(t, (uint32_t) l);
777             pa_pstream_send_tagstruct(s->connection->pstream, t);
778
779 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
780             break;
781         }
782
783         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
784             pa_tagstruct *t;
785
786 /*             pa_log("signalling underflow"); */
787
788             /* Report that we're empty */
789             t = pa_tagstruct_new(NULL, 0);
790             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
791             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
792             pa_tagstruct_putu32(t, s->index);
793             pa_pstream_send_tagstruct(s->connection->pstream, t);
794             break;
795         }
796
797         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
798             pa_tagstruct *t;
799
800             /* Notify the user we're overflowed*/
801             t = pa_tagstruct_new(NULL, 0);
802             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
803             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
804             pa_tagstruct_putu32(t, s->index);
805             pa_pstream_send_tagstruct(s->connection->pstream, t);
806             break;
807         }
808
809         case PLAYBACK_STREAM_MESSAGE_STARTED:
810
811             if (s->connection->version >= 13) {
812                 pa_tagstruct *t;
813
814                 /* Notify the user we started playback */
815                 t = pa_tagstruct_new(NULL, 0);
816                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
817                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
818                 pa_tagstruct_putu32(t, s->index);
819                 pa_pstream_send_tagstruct(s->connection->pstream, t);
820             }
821
822             break;
823
824         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
825             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
826             break;
827
828         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
829             pa_tagstruct *t;
830
831             s->buffer_attr.tlength = (uint32_t) offset;
832
833             t = pa_tagstruct_new(NULL, 0);
834             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
835             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
836             pa_tagstruct_putu32(t, s->index);
837             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
838             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
839             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
840             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
841             pa_tagstruct_put_usec(t, s->configured_sink_latency);
842             pa_pstream_send_tagstruct(s->connection->pstream, t);
843
844             break;
845         }
846     }
847
848     return 0;
849 }
850
851 /* Called from main context */
852 static void fix_playback_buffer_attr(playback_stream *s) {
853     size_t frame_size, max_prebuf;
854     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
855
856     pa_assert(s);
857
858     /* This function will be called from the main thread, before as
859      * well as after the sink input has been activated using
860      * pa_sink_input_put()! That means it may not touch any
861      * ->thread_info data, such as the memblockq! */
862
863     frame_size = pa_frame_size(&s->sink_input->sample_spec);
864
865     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
866         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
867     if (s->buffer_attr.maxlength <= 0)
868         s->buffer_attr.maxlength = (uint32_t) frame_size;
869
870     if (s->buffer_attr.tlength == (uint32_t) -1)
871         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
872     if (s->buffer_attr.tlength <= 0)
873         s->buffer_attr.tlength = (uint32_t) frame_size;
874
875     if (s->buffer_attr.minreq == (uint32_t) -1)
876         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
877     if (s->buffer_attr.minreq <= 0)
878         s->buffer_attr.minreq = (uint32_t) frame_size;
879
880     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
881         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
882
883     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
884     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
885
886     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
887                 (double) tlength_usec / PA_USEC_PER_MSEC,
888                 (double) minreq_usec / PA_USEC_PER_MSEC);
889
890     if (s->early_requests) {
891
892         /* In early request mode we need to emulate the classic
893          * fragment-based playback model. We do this setting the sink
894          * latency to the fragment size. */
895
896         sink_usec = minreq_usec;
897         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
898
899     } else if (s->adjust_latency) {
900
901         /* So, the user asked us to adjust the latency of the stream
902          * buffer according to the what the sink can provide. The
903          * tlength passed in shall be the overall latency. Roughly
904          * half the latency will be spent on the hw buffer, the other
905          * half of it in the async buffer queue we maintain for each
906          * client. In between we'll have a safety space of size
907          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
908          * empty and needs to be filled, then our buffer must have
909          * enough data to fulfill this request immediatly and thus
910          * have at least the same tlength as the size of the hw
911          * buffer. It additionally needs space for 2 times minreq
912          * because if the buffer ran empty and a partial fillup
913          * happens immediately on the next iteration we need to be
914          * able to fulfill it and give the application also minreq
915          * time to fill it up again for the next request Makes 2 times
916          * minreq in plus.. */
917
918         if (tlength_usec > minreq_usec*2)
919             sink_usec = (tlength_usec - minreq_usec*2)/2;
920         else
921             sink_usec = 0;
922
923         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
924
925     } else {
926
927         /* Ok, the user didn't ask us to adjust the latency, but we
928          * still need to make sure that the parameters from the user
929          * do make sense. */
930
931         if (tlength_usec > minreq_usec*2)
932             sink_usec = (tlength_usec - minreq_usec*2);
933         else
934             sink_usec = 0;
935
936         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
937     }
938
939     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
940
941     if (s->early_requests) {
942
943         /* Ok, we didn't necessarily get what we were asking for, so
944          * let's tell the user */
945
946         minreq_usec = s->configured_sink_latency;
947
948     } else if (s->adjust_latency) {
949
950         /* Ok, we didn't necessarily get what we were asking for, so
951          * let's subtract from what we asked for for the remaining
952          * buffer space */
953
954         if (tlength_usec >= s->configured_sink_latency)
955             tlength_usec -= s->configured_sink_latency;
956     }
957
958     /* FIXME: This is actually larger than necessary, since not all of
959      * the sink latency is actually rewritable. */
960     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
961         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
962
963     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
964         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
965         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
966
967     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
968         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
969         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
970
971     if (s->buffer_attr.minreq <= 0) {
972         s->buffer_attr.minreq = (uint32_t) frame_size;
973         s->buffer_attr.tlength += (uint32_t) frame_size*2;
974     }
975
976     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
977         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
978
979     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
980
981     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
982         s->buffer_attr.prebuf > max_prebuf)
983         s->buffer_attr.prebuf = max_prebuf;
984 }
985
986 /* Called from main context */
987 static playback_stream* playback_stream_new(
988         pa_native_connection *c,
989         pa_sink *sink,
990         pa_sample_spec *ss,
991         pa_channel_map *map,
992         pa_buffer_attr *a,
993         pa_cvolume *volume,
994         pa_bool_t muted,
995         pa_bool_t muted_set,
996         uint32_t syncid,
997         uint32_t *missing,
998         pa_sink_input_flags_t flags,
999         pa_proplist *p,
1000         pa_bool_t adjust_latency,
1001         pa_bool_t early_requests,
1002         int *ret) {
1003
1004     playback_stream *s, *ssync;
1005     pa_sink_input *sink_input = NULL;
1006     pa_memchunk silence;
1007     uint32_t idx;
1008     int64_t start_index;
1009     pa_sink_input_new_data data;
1010
1011     pa_assert(c);
1012     pa_assert(ss);
1013     pa_assert(missing);
1014     pa_assert(p);
1015     pa_assert(ret);
1016
1017     /* Find syncid group */
1018     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1019
1020         if (!playback_stream_isinstance(ssync))
1021             continue;
1022
1023         if (ssync->syncid == syncid)
1024             break;
1025     }
1026
1027     /* Synced streams must connect to the same sink */
1028     if (ssync) {
1029
1030         if (!sink)
1031             sink = ssync->sink_input->sink;
1032         else if (sink != ssync->sink_input->sink) {
1033             *ret = PA_ERR_INVALID;
1034             return NULL;
1035         }
1036     }
1037
1038     pa_sink_input_new_data_init(&data);
1039
1040     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1041     data.driver = __FILE__;
1042     data.module = c->options->module;
1043     data.client = c->client;
1044     data.sink = sink;
1045     pa_sink_input_new_data_set_sample_spec(&data, ss);
1046     pa_sink_input_new_data_set_channel_map(&data, map);
1047     if (volume)
1048         pa_sink_input_new_data_set_volume(&data, volume);
1049     if (muted_set)
1050         pa_sink_input_new_data_set_muted(&data, muted);
1051     data.sync_base = ssync ? ssync->sink_input : NULL;
1052
1053     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1054
1055     pa_sink_input_new_data_done(&data);
1056
1057     if (!sink_input)
1058         return NULL;
1059
1060     s = pa_msgobject_new(playback_stream);
1061     s->parent.parent.parent.free = playback_stream_free;
1062     s->parent.parent.process_msg = playback_stream_process_msg;
1063     s->connection = c;
1064     s->syncid = syncid;
1065     s->sink_input = sink_input;
1066     s->is_underrun = TRUE;
1067     s->drain_request = FALSE;
1068     pa_atomic_store(&s->missing, 0);
1069     s->buffer_attr = *a;
1070     s->adjust_latency = adjust_latency;
1071     s->early_requests = early_requests;
1072
1073     s->sink_input->parent.process_msg = sink_input_process_msg;
1074     s->sink_input->pop = sink_input_pop_cb;
1075     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1076     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1077     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1078     s->sink_input->kill = sink_input_kill_cb;
1079     s->sink_input->moving = sink_input_moving_cb;
1080     s->sink_input->suspend = sink_input_suspend_cb;
1081     s->sink_input->send_event = sink_input_send_event_cb;
1082     s->sink_input->userdata = s;
1083
1084     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1085
1086     fix_playback_buffer_attr(s);
1087
1088     pa_sink_input_get_silence(sink_input, &silence);
1089     s->memblockq = pa_memblockq_new(
1090             start_index,
1091             s->buffer_attr.maxlength,
1092             s->buffer_attr.tlength,
1093             pa_frame_size(&sink_input->sample_spec),
1094             s->buffer_attr.prebuf,
1095             s->buffer_attr.minreq,
1096             0,
1097             &silence);
1098     pa_memblock_unref(silence.memblock);
1099
1100     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1101
1102     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1103
1104     *ss = s->sink_input->sample_spec;
1105     *map = s->sink_input->channel_map;
1106
1107     pa_idxset_put(c->output_streams, s, &s->index);
1108
1109     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1110                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1111                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1112                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1113                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1114
1115     pa_sink_input_put(s->sink_input);
1116     return s;
1117 }
1118
1119 /* Called from IO context */
1120 static void playback_stream_request_bytes(playback_stream *s) {
1121     size_t m, minreq;
1122     int previous_missing;
1123
1124     playback_stream_assert_ref(s);
1125
1126     m = pa_memblockq_pop_missing(s->memblockq);
1127
1128     if (m <= 0)
1129         return;
1130
1131 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1132
1133     previous_missing = pa_atomic_add(&s->missing, (int) m);
1134     minreq = pa_memblockq_get_minreq(s->memblockq);
1135
1136     if (pa_memblockq_prebuf_active(s->memblockq) ||
1137         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1138         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1139 }
1140
1141
1142 /* Called from main context */
1143 static void playback_stream_send_killed(playback_stream *p) {
1144     pa_tagstruct *t;
1145     playback_stream_assert_ref(p);
1146
1147     t = pa_tagstruct_new(NULL, 0);
1148     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1149     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1150     pa_tagstruct_putu32(t, p->index);
1151     pa_pstream_send_tagstruct(p->connection->pstream, t);
1152 }
1153
1154 /* Called from main context */
1155 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1156     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1157     pa_native_connection_assert_ref(c);
1158
1159     if (!c->protocol)
1160         return -1;
1161
1162     switch (code) {
1163
1164         case CONNECTION_MESSAGE_REVOKE:
1165             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1166             break;
1167
1168         case CONNECTION_MESSAGE_RELEASE:
1169             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1170             break;
1171     }
1172
1173     return 0;
1174 }
1175
1176 /* Called from main context */
1177 static void native_connection_unlink(pa_native_connection *c) {
1178     record_stream *r;
1179     output_stream *o;
1180
1181     pa_assert(c);
1182
1183     if (!c->protocol)
1184         return;
1185
1186     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1187
1188     if (c->options)
1189         pa_native_options_unref(c->options);
1190
1191     while ((r = pa_idxset_first(c->record_streams, NULL)))
1192         record_stream_unlink(r);
1193
1194     while ((o = pa_idxset_first(c->output_streams, NULL)))
1195         if (playback_stream_isinstance(o))
1196             playback_stream_unlink(PLAYBACK_STREAM(o));
1197         else
1198             upload_stream_unlink(UPLOAD_STREAM(o));
1199
1200     if (c->subscription)
1201         pa_subscription_free(c->subscription);
1202
1203     if (c->pstream)
1204         pa_pstream_unlink(c->pstream);
1205
1206     if (c->auth_timeout_event) {
1207         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1208         c->auth_timeout_event = NULL;
1209     }
1210
1211     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1212     c->protocol = NULL;
1213     pa_native_connection_unref(c);
1214 }
1215
1216 /* Called from main context */
1217 static void native_connection_free(pa_object *o) {
1218     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1219
1220     pa_assert(c);
1221
1222     native_connection_unlink(c);
1223
1224     pa_idxset_free(c->record_streams, NULL, NULL);
1225     pa_idxset_free(c->output_streams, NULL, NULL);
1226
1227     pa_pdispatch_unref(c->pdispatch);
1228     pa_pstream_unref(c->pstream);
1229     pa_client_free(c->client);
1230
1231     pa_xfree(c);
1232 }
1233
1234 /* Called from main context */
1235 static void native_connection_send_memblock(pa_native_connection *c) {
1236     uint32_t start;
1237     record_stream *r;
1238
1239     start = PA_IDXSET_INVALID;
1240     for (;;) {
1241         pa_memchunk chunk;
1242
1243         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1244             return;
1245
1246         if (start == PA_IDXSET_INVALID)
1247             start = c->rrobin_index;
1248         else if (start == c->rrobin_index)
1249             return;
1250
1251         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1252             pa_memchunk schunk = chunk;
1253
1254             if (schunk.length > r->buffer_attr.fragsize)
1255                 schunk.length = r->buffer_attr.fragsize;
1256
1257             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1258
1259             pa_memblockq_drop(r->memblockq, schunk.length);
1260             pa_memblock_unref(schunk.memblock);
1261
1262             return;
1263         }
1264     }
1265 }
1266
1267 /*** sink input callbacks ***/
1268
1269 /* Called from thread context */
1270 static void handle_seek(playback_stream *s, int64_t indexw) {
1271     playback_stream_assert_ref(s);
1272
1273 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1274
1275     if (s->sink_input->thread_info.underrun_for > 0) {
1276
1277 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1278
1279         if (pa_memblockq_is_readable(s->memblockq)) {
1280
1281             /* We just ended an underrun, let's ask the sink
1282              * for a complete rewind rewrite */
1283
1284             pa_log_debug("Requesting rewind due to end of underrun.");
1285             pa_sink_input_request_rewind(s->sink_input,
1286                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1287                                          FALSE, TRUE, FALSE);
1288         }
1289
1290     } else {
1291         int64_t indexr;
1292
1293         indexr = pa_memblockq_get_read_index(s->memblockq);
1294
1295         if (indexw < indexr) {
1296             /* OK, the sink already asked for this data, so
1297              * let's have it usk us again */
1298
1299             pa_log_debug("Requesting rewind due to rewrite.");
1300             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1301         }
1302     }
1303
1304     playback_stream_request_bytes(s);
1305 }
1306
1307 /* Called from thread context */
1308 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1309     pa_sink_input *i = PA_SINK_INPUT(o);
1310     playback_stream *s;
1311
1312     pa_sink_input_assert_ref(i);
1313     s = PLAYBACK_STREAM(i->userdata);
1314     playback_stream_assert_ref(s);
1315
1316     switch (code) {
1317
1318         case SINK_INPUT_MESSAGE_SEEK: {
1319             int64_t windex;
1320
1321             windex = pa_memblockq_get_write_index(s->memblockq);
1322
1323             /* The client side is incapable of accounting correctly
1324              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1325              * able to deal with that. */
1326
1327             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1328
1329             handle_seek(s, windex);
1330             return 0;
1331         }
1332
1333         case SINK_INPUT_MESSAGE_POST_DATA: {
1334             int64_t windex;
1335
1336             pa_assert(chunk);
1337
1338             windex = pa_memblockq_get_write_index(s->memblockq);
1339
1340 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1341
1342             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1343                 pa_log_warn("Failed to push data into queue");
1344                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1345                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1346             }
1347
1348             handle_seek(s, windex);
1349
1350 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1351
1352             return 0;
1353         }
1354
1355         case SINK_INPUT_MESSAGE_DRAIN:
1356         case SINK_INPUT_MESSAGE_FLUSH:
1357         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1358         case SINK_INPUT_MESSAGE_TRIGGER: {
1359
1360             int64_t windex;
1361             pa_sink_input *isync;
1362             void (*func)(pa_memblockq *bq);
1363
1364             switch  (code) {
1365                 case SINK_INPUT_MESSAGE_FLUSH:
1366                     func = pa_memblockq_flush_write;
1367                     break;
1368
1369                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1370                     func = pa_memblockq_prebuf_force;
1371                     break;
1372
1373                 case SINK_INPUT_MESSAGE_DRAIN:
1374                 case SINK_INPUT_MESSAGE_TRIGGER:
1375                     func = pa_memblockq_prebuf_disable;
1376                     break;
1377
1378                 default:
1379                     pa_assert_not_reached();
1380             }
1381
1382             windex = pa_memblockq_get_write_index(s->memblockq);
1383             func(s->memblockq);
1384             handle_seek(s, windex);
1385
1386             /* Do the same for all other members in the sync group */
1387             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1388                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1389                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1390                 func(ssync->memblockq);
1391                 handle_seek(ssync, windex);
1392             }
1393
1394             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1395                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1396                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1397                 func(ssync->memblockq);
1398                 handle_seek(ssync, windex);
1399             }
1400
1401             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1402                 if (!pa_memblockq_is_readable(s->memblockq))
1403                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1404                 else {
1405                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1406                     s->drain_request = TRUE;
1407                 }
1408             }
1409
1410             return 0;
1411         }
1412
1413         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1414             /* Atomically get a snapshot of all timing parameters... */
1415             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1416             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1417             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1418             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1419             s->underrun_for = s->sink_input->thread_info.underrun_for;
1420             s->playing_for = s->sink_input->thread_info.playing_for;
1421
1422             return 0;
1423
1424         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1425             int64_t windex;
1426
1427             windex = pa_memblockq_get_write_index(s->memblockq);
1428
1429             pa_memblockq_prebuf_force(s->memblockq);
1430
1431             handle_seek(s, windex);
1432
1433             /* Fall through to the default handler */
1434             break;
1435         }
1436
1437         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1438             pa_usec_t *r = userdata;
1439
1440             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1441
1442             /* Fall through, the default handler will add in the extra
1443              * latency added by the resampler */
1444             break;
1445         }
1446
1447         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1448             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1449             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1450             return 0;
1451         }
1452     }
1453
1454     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1455 }
1456
1457 /* Called from thread context */
1458 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1459     playback_stream *s;
1460
1461     pa_sink_input_assert_ref(i);
1462     s = PLAYBACK_STREAM(i->userdata);
1463     playback_stream_assert_ref(s);
1464     pa_assert(chunk);
1465
1466 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1467
1468     if (pa_memblockq_is_readable(s->memblockq))
1469         s->is_underrun = FALSE;
1470     else {
1471         if (!s->is_underrun)
1472             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1473
1474         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1475             s->drain_request = FALSE;
1476             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1477         } else if (!s->is_underrun)
1478             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1479
1480         s->is_underrun = TRUE;
1481
1482         playback_stream_request_bytes(s);
1483     }
1484
1485     /* This call will not fail with prebuf=0, hence we check for
1486        underrun explicitly above */
1487     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1488         return -1;
1489
1490     chunk->length = PA_MIN(nbytes, chunk->length);
1491
1492     if (i->thread_info.underrun_for > 0)
1493         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1494
1495     pa_memblockq_drop(s->memblockq, chunk->length);
1496     playback_stream_request_bytes(s);
1497
1498     return 0;
1499 }
1500
1501 /* Called from thread context */
1502 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1503     playback_stream *s;
1504
1505     pa_sink_input_assert_ref(i);
1506     s = PLAYBACK_STREAM(i->userdata);
1507     playback_stream_assert_ref(s);
1508
1509     /* If we are in an underrun, then we don't rewind */
1510     if (i->thread_info.underrun_for > 0)
1511         return;
1512
1513     pa_memblockq_rewind(s->memblockq, nbytes);
1514 }
1515
1516 /* Called from thread context */
1517 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1518     playback_stream *s;
1519
1520     pa_sink_input_assert_ref(i);
1521     s = PLAYBACK_STREAM(i->userdata);
1522     playback_stream_assert_ref(s);
1523
1524     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1525 }
1526
1527 /* Called from thread context */
1528 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1529     playback_stream *s;
1530     size_t new_tlength, old_tlength;
1531
1532     pa_sink_input_assert_ref(i);
1533     s = PLAYBACK_STREAM(i->userdata);
1534     playback_stream_assert_ref(s);
1535
1536     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1537     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1538
1539     if (old_tlength < new_tlength) {
1540         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1541         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1542         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1543
1544         if (new_tlength == old_tlength)
1545             pa_log_debug("Failed to increase tlength");
1546         else {
1547             pa_log_debug("Notifying client about increased tlength");
1548             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1549         }
1550     }
1551 }
1552
1553 /* Called from main context */
1554 static void sink_input_kill_cb(pa_sink_input *i) {
1555     playback_stream *s;
1556
1557     pa_sink_input_assert_ref(i);
1558     s = PLAYBACK_STREAM(i->userdata);
1559     playback_stream_assert_ref(s);
1560
1561     playback_stream_send_killed(s);
1562     playback_stream_unlink(s);
1563 }
1564
1565 /* Called from main context */
1566 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1567     playback_stream *s;
1568     pa_tagstruct *t;
1569
1570     pa_sink_input_assert_ref(i);
1571     s = PLAYBACK_STREAM(i->userdata);
1572     playback_stream_assert_ref(s);
1573
1574     if (s->connection->version < 15)
1575       return;
1576
1577     t = pa_tagstruct_new(NULL, 0);
1578     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1579     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1580     pa_tagstruct_putu32(t, s->index);
1581     pa_tagstruct_puts(t, event);
1582     pa_tagstruct_put_proplist(t, pl);
1583     pa_pstream_send_tagstruct(s->connection->pstream, t);
1584 }
1585
1586 /* Called from main context */
1587 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1588     playback_stream *s;
1589     pa_tagstruct *t;
1590
1591     pa_sink_input_assert_ref(i);
1592     s = PLAYBACK_STREAM(i->userdata);
1593     playback_stream_assert_ref(s);
1594
1595     if (s->connection->version < 12)
1596       return;
1597
1598     t = pa_tagstruct_new(NULL, 0);
1599     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1600     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1601     pa_tagstruct_putu32(t, s->index);
1602     pa_tagstruct_put_boolean(t, suspend);
1603     pa_pstream_send_tagstruct(s->connection->pstream, t);
1604 }
1605
1606 /* Called from main context */
1607 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1608     playback_stream *s;
1609     pa_tagstruct *t;
1610
1611     pa_sink_input_assert_ref(i);
1612     s = PLAYBACK_STREAM(i->userdata);
1613     playback_stream_assert_ref(s);
1614
1615     fix_playback_buffer_attr(s);
1616     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1617     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1618
1619     if (s->connection->version < 12)
1620       return;
1621
1622     t = pa_tagstruct_new(NULL, 0);
1623     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1624     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1625     pa_tagstruct_putu32(t, s->index);
1626     pa_tagstruct_putu32(t, dest->index);
1627     pa_tagstruct_puts(t, dest->name);
1628     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1629
1630     if (s->connection->version >= 13) {
1631         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1632         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1633         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1634         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1635         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1636     }
1637
1638     pa_pstream_send_tagstruct(s->connection->pstream, t);
1639 }
1640
1641 /*** source_output callbacks ***/
1642
1643 /* Called from thread context */
1644 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1645     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1646     record_stream *s;
1647
1648     pa_source_output_assert_ref(o);
1649     s = RECORD_STREAM(o->userdata);
1650     record_stream_assert_ref(s);
1651
1652     switch (code) {
1653         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1654             /* Atomically get a snapshot of all timing parameters... */
1655             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1656             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1657             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1658             return 0;
1659     }
1660
1661     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1662 }
1663
1664 /* Called from thread context */
1665 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1666     record_stream *s;
1667
1668     pa_source_output_assert_ref(o);
1669     s = RECORD_STREAM(o->userdata);
1670     record_stream_assert_ref(s);
1671     pa_assert(chunk);
1672
1673     pa_atomic_add(&s->on_the_fly, chunk->length);
1674     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1675 }
1676
1677 static void source_output_kill_cb(pa_source_output *o) {
1678     record_stream *s;
1679
1680     pa_source_output_assert_ref(o);
1681     s = RECORD_STREAM(o->userdata);
1682     record_stream_assert_ref(s);
1683
1684     record_stream_send_killed(s);
1685     record_stream_unlink(s);
1686 }
1687
1688 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1689     record_stream *s;
1690
1691     pa_source_output_assert_ref(o);
1692     s = RECORD_STREAM(o->userdata);
1693     record_stream_assert_ref(s);
1694
1695     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1696
1697     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1698 }
1699
1700 /* Called from main context */
1701 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1702     record_stream *s;
1703     pa_tagstruct *t;
1704
1705     pa_source_output_assert_ref(o);
1706     s = RECORD_STREAM(o->userdata);
1707     record_stream_assert_ref(s);
1708
1709     if (s->connection->version < 15)
1710       return;
1711
1712     t = pa_tagstruct_new(NULL, 0);
1713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1715     pa_tagstruct_putu32(t, s->index);
1716     pa_tagstruct_puts(t, event);
1717     pa_tagstruct_put_proplist(t, pl);
1718     pa_pstream_send_tagstruct(s->connection->pstream, t);
1719 }
1720
1721 /* Called from main context */
1722 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1723     record_stream *s;
1724     pa_tagstruct *t;
1725
1726     pa_source_output_assert_ref(o);
1727     s = RECORD_STREAM(o->userdata);
1728     record_stream_assert_ref(s);
1729
1730     if (s->connection->version < 12)
1731       return;
1732
1733     t = pa_tagstruct_new(NULL, 0);
1734     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1735     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1736     pa_tagstruct_putu32(t, s->index);
1737     pa_tagstruct_put_boolean(t, suspend);
1738     pa_pstream_send_tagstruct(s->connection->pstream, t);
1739 }
1740
1741 /* Called from main context */
1742 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1743     record_stream *s;
1744     pa_tagstruct *t;
1745
1746     pa_source_output_assert_ref(o);
1747     s = RECORD_STREAM(o->userdata);
1748     record_stream_assert_ref(s);
1749
1750     fix_record_buffer_attr_pre(s);
1751     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1752     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1753     fix_record_buffer_attr_post(s);
1754
1755     if (s->connection->version < 12)
1756       return;
1757
1758     t = pa_tagstruct_new(NULL, 0);
1759     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1760     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1761     pa_tagstruct_putu32(t, s->index);
1762     pa_tagstruct_putu32(t, dest->index);
1763     pa_tagstruct_puts(t, dest->name);
1764     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1765
1766     if (s->connection->version >= 13) {
1767         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1768         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1769         pa_tagstruct_put_usec(t, s->configured_source_latency);
1770     }
1771
1772     pa_pstream_send_tagstruct(s->connection->pstream, t);
1773 }
1774
1775 /*** pdispatch callbacks ***/
1776
1777 static void protocol_error(pa_native_connection *c) {
1778     pa_log("protocol error, kicking client");
1779     native_connection_unlink(c);
1780 }
1781
1782 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1783 if (!(expression)) { \
1784     pa_pstream_send_error((pstream), (tag), (error)); \
1785     return; \
1786 } \
1787 } while(0);
1788
1789 static pa_tagstruct *reply_new(uint32_t tag) {
1790     pa_tagstruct *reply;
1791
1792     reply = pa_tagstruct_new(NULL, 0);
1793     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1794     pa_tagstruct_putu32(reply, tag);
1795     return reply;
1796 }
1797
1798 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1799     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1800     playback_stream *s;
1801     uint32_t sink_index, syncid, missing;
1802     pa_buffer_attr attr;
1803     const char *name = NULL, *sink_name;
1804     pa_sample_spec ss;
1805     pa_channel_map map;
1806     pa_tagstruct *reply;
1807     pa_sink *sink = NULL;
1808     pa_cvolume volume;
1809     pa_bool_t
1810         corked = FALSE,
1811         no_remap = FALSE,
1812         no_remix = FALSE,
1813         fix_format = FALSE,
1814         fix_rate = FALSE,
1815         fix_channels = FALSE,
1816         no_move = FALSE,
1817         variable_rate = FALSE,
1818         muted = FALSE,
1819         adjust_latency = FALSE,
1820         early_requests = FALSE,
1821         dont_inhibit_auto_suspend = FALSE,
1822         muted_set = FALSE,
1823         fail_on_suspend = FALSE;
1824     pa_sink_input_flags_t flags = 0;
1825     pa_proplist *p;
1826     pa_bool_t volume_set = TRUE;
1827     int ret = PA_ERR_INVALID;
1828
1829     pa_native_connection_assert_ref(c);
1830     pa_assert(t);
1831     memset(&attr, 0, sizeof(attr));
1832
1833     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1834         pa_tagstruct_get(
1835                 t,
1836                 PA_TAG_SAMPLE_SPEC, &ss,
1837                 PA_TAG_CHANNEL_MAP, &map,
1838                 PA_TAG_U32, &sink_index,
1839                 PA_TAG_STRING, &sink_name,
1840                 PA_TAG_U32, &attr.maxlength,
1841                 PA_TAG_BOOLEAN, &corked,
1842                 PA_TAG_U32, &attr.tlength,
1843                 PA_TAG_U32, &attr.prebuf,
1844                 PA_TAG_U32, &attr.minreq,
1845                 PA_TAG_U32, &syncid,
1846                 PA_TAG_CVOLUME, &volume,
1847                 PA_TAG_INVALID) < 0) {
1848
1849         protocol_error(c);
1850         return;
1851     }
1852
1853     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1854     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1855     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1856     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1857     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1858     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1859     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1860     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1861
1862     p = pa_proplist_new();
1863
1864     if (name)
1865         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1866
1867     if (c->version >= 12)  {
1868         /* Since 0.9.8 the user can ask for a couple of additional flags */
1869
1870         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1871             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1872             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1873             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1874             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1875             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1876             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1877
1878             protocol_error(c);
1879             pa_proplist_free(p);
1880             return;
1881         }
1882     }
1883
1884     if (c->version >= 13) {
1885
1886         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1887             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1888             pa_tagstruct_get_proplist(t, p) < 0) {
1889             protocol_error(c);
1890             pa_proplist_free(p);
1891             return;
1892         }
1893     }
1894
1895     if (c->version >= 14) {
1896
1897         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1898             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1899             protocol_error(c);
1900             pa_proplist_free(p);
1901             return;
1902         }
1903     }
1904
1905     if (c->version >= 15) {
1906
1907         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1908             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1909             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1910             protocol_error(c);
1911             pa_proplist_free(p);
1912             return;
1913         }
1914     }
1915
1916     if (!pa_tagstruct_eof(t)) {
1917         protocol_error(c);
1918         pa_proplist_free(p);
1919         return;
1920     }
1921
1922     if (sink_index != PA_INVALID_INDEX) {
1923
1924         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1925             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1926             pa_proplist_free(p);
1927             return;
1928         }
1929
1930     } else if (sink_name) {
1931
1932         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1933             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1934             pa_proplist_free(p);
1935             return;
1936         }
1937     }
1938
1939     flags =
1940         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1941         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1942         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1943         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1944         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1945         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1946         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1947         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1948         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1949         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1950
1951     /* Only since protocol version 15 there's a seperate muted_set
1952      * flag. For older versions we synthesize it here */
1953     muted_set = muted_set || muted;
1954
1955     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1956     pa_proplist_free(p);
1957
1958     CHECK_VALIDITY(c->pstream, s, tag, ret);
1959
1960     reply = reply_new(tag);
1961     pa_tagstruct_putu32(reply, s->index);
1962     pa_assert(s->sink_input);
1963     pa_tagstruct_putu32(reply, s->sink_input->index);
1964     pa_tagstruct_putu32(reply, missing);
1965
1966 /*     pa_log("initial request is %u", missing); */
1967
1968     if (c->version >= 9) {
1969         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1970
1971         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1972         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1973         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1974         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1975     }
1976
1977     if (c->version >= 12) {
1978         /* Since 0.9.8 we support sending the chosen sample
1979          * spec/channel map/device/suspend status back to the
1980          * client */
1981
1982         pa_tagstruct_put_sample_spec(reply, &ss);
1983         pa_tagstruct_put_channel_map(reply, &map);
1984
1985         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1986         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1987
1988         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1989     }
1990
1991     if (c->version >= 13)
1992         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
1993
1994     pa_pstream_send_tagstruct(c->pstream, reply);
1995 }
1996
1997 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1998     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1999     uint32_t channel;
2000
2001     pa_native_connection_assert_ref(c);
2002     pa_assert(t);
2003
2004     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2005         !pa_tagstruct_eof(t)) {
2006         protocol_error(c);
2007         return;
2008     }
2009
2010     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2011
2012     switch (command) {
2013
2014         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2015             playback_stream *s;
2016             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2017                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2018                 return;
2019             }
2020
2021             playback_stream_unlink(s);
2022             break;
2023         }
2024
2025         case PA_COMMAND_DELETE_RECORD_STREAM: {
2026             record_stream *s;
2027             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2028                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2029                 return;
2030             }
2031
2032             record_stream_unlink(s);
2033             break;
2034         }
2035
2036         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2037             upload_stream *s;
2038
2039             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2040                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2041                 return;
2042             }
2043
2044             upload_stream_unlink(s);
2045             break;
2046         }
2047
2048         default:
2049             pa_assert_not_reached();
2050     }
2051
2052     pa_pstream_send_simple_ack(c->pstream, tag);
2053 }
2054
2055 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2056     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2057     record_stream *s;
2058     pa_buffer_attr attr;
2059     uint32_t source_index;
2060     const char *name = NULL, *source_name;
2061     pa_sample_spec ss;
2062     pa_channel_map map;
2063     pa_tagstruct *reply;
2064     pa_source *source = NULL;
2065     pa_bool_t
2066         corked = FALSE,
2067         no_remap = FALSE,
2068         no_remix = FALSE,
2069         fix_format = FALSE,
2070         fix_rate = FALSE,
2071         fix_channels = FALSE,
2072         no_move = FALSE,
2073         variable_rate = FALSE,
2074         adjust_latency = FALSE,
2075         peak_detect = FALSE,
2076         early_requests = FALSE,
2077         dont_inhibit_auto_suspend = FALSE,
2078         fail_on_suspend = FALSE;
2079     pa_source_output_flags_t flags = 0;
2080     pa_proplist *p;
2081     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2082     pa_sink_input *direct_on_input = NULL;
2083     int ret = PA_ERR_INVALID;
2084
2085     pa_native_connection_assert_ref(c);
2086     pa_assert(t);
2087
2088     memset(&attr, 0, sizeof(attr));
2089
2090     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2091         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2092         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2093         pa_tagstruct_getu32(t, &source_index) < 0 ||
2094         pa_tagstruct_gets(t, &source_name) < 0 ||
2095         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2096         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2097         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2098         protocol_error(c);
2099         return;
2100     }
2101
2102     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2103     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2104     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2105     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2106     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2107     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2108     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2109
2110     p = pa_proplist_new();
2111
2112     if (name)
2113         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2114
2115     if (c->version >= 12)  {
2116         /* Since 0.9.8 the user can ask for a couple of additional flags */
2117
2118         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2119             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2120             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2121             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2122             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2123             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2124             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2125
2126             protocol_error(c);
2127             pa_proplist_free(p);
2128             return;
2129         }
2130     }
2131
2132     if (c->version >= 13) {
2133
2134         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2135             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2136             pa_tagstruct_get_proplist(t, p) < 0 ||
2137             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2138             protocol_error(c);
2139             pa_proplist_free(p);
2140             return;
2141         }
2142     }
2143
2144     if (c->version >= 14) {
2145
2146         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2147             protocol_error(c);
2148             pa_proplist_free(p);
2149             return;
2150         }
2151     }
2152
2153     if (c->version >= 15) {
2154
2155         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2156             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2157             protocol_error(c);
2158             pa_proplist_free(p);
2159             return;
2160         }
2161     }
2162
2163     if (!pa_tagstruct_eof(t)) {
2164         protocol_error(c);
2165         pa_proplist_free(p);
2166         return;
2167     }
2168
2169     if (source_index != PA_INVALID_INDEX) {
2170
2171         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2172             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2173             pa_proplist_free(p);
2174             return;
2175         }
2176
2177     } else if (source_name) {
2178
2179         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2180             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2181             pa_proplist_free(p);
2182             return;
2183         }
2184     }
2185
2186     if (direct_on_input_idx != PA_INVALID_INDEX) {
2187
2188         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2189             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2190             pa_proplist_free(p);
2191             return;
2192         }
2193     }
2194
2195     flags =
2196         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2197         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2198         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2199         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2200         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2201         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2202         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2203         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2204         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2205         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2206
2207     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2208     pa_proplist_free(p);
2209
2210     CHECK_VALIDITY(c->pstream, s, tag, ret);
2211
2212     reply = reply_new(tag);
2213     pa_tagstruct_putu32(reply, s->index);
2214     pa_assert(s->source_output);
2215     pa_tagstruct_putu32(reply, s->source_output->index);
2216
2217     if (c->version >= 9) {
2218         /* Since 0.9 we support sending the buffer metrics back to the client */
2219
2220         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2221         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2222     }
2223
2224     if (c->version >= 12) {
2225         /* Since 0.9.8 we support sending the chosen sample
2226          * spec/channel map/device/suspend status back to the
2227          * client */
2228
2229         pa_tagstruct_put_sample_spec(reply, &ss);
2230         pa_tagstruct_put_channel_map(reply, &map);
2231
2232         pa_tagstruct_putu32(reply, s->source_output->source->index);
2233         pa_tagstruct_puts(reply, s->source_output->source->name);
2234
2235         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2236     }
2237
2238     if (c->version >= 13)
2239         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2240
2241     pa_pstream_send_tagstruct(c->pstream, reply);
2242 }
2243
2244 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2245     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2246     int ret;
2247
2248     pa_native_connection_assert_ref(c);
2249     pa_assert(t);
2250
2251     if (!pa_tagstruct_eof(t)) {
2252         protocol_error(c);
2253         return;
2254     }
2255
2256     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2257     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2258     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2259
2260     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2261 }
2262
2263 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2264     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2265     const void*cookie;
2266     pa_tagstruct *reply;
2267     pa_bool_t shm_on_remote = FALSE, do_shm;
2268
2269     pa_native_connection_assert_ref(c);
2270     pa_assert(t);
2271
2272     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2273         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2274         !pa_tagstruct_eof(t)) {
2275         protocol_error(c);
2276         return;
2277     }
2278
2279     /* Minimum supported version */
2280     if (c->version < 8) {
2281         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2282         return;
2283     }
2284
2285     /* Starting with protocol version 13 the MSB of the version tag
2286        reflects if shm is available for this pa_native_connection or
2287        not. */
2288     if (c->version >= 13) {
2289         shm_on_remote = !!(c->version & 0x80000000U);
2290         c->version &= 0x7FFFFFFFU;
2291     }
2292
2293     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2294
2295     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2296
2297     if (!c->authorized) {
2298         pa_bool_t success = FALSE;
2299
2300 #ifdef HAVE_CREDS
2301         const pa_creds *creds;
2302
2303         if ((creds = pa_pdispatch_creds(pd))) {
2304             if (creds->uid == getuid())
2305                 success = TRUE;
2306             else if (c->options->auth_group) {
2307                 int r;
2308                 gid_t gid;
2309
2310                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2311                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2312                 else if (gid == creds->gid)
2313                     success = TRUE;
2314
2315                 if (!success) {
2316                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2317                         pa_log_warn("Failed to check group membership.");
2318                     else if (r > 0)
2319                         success = TRUE;
2320                 }
2321             }
2322
2323             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2324                         (unsigned long) creds->uid,
2325                         (unsigned long) creds->gid,
2326                         (int) success);
2327         }
2328 #endif
2329
2330         if (!success && c->options->auth_cookie) {
2331             const uint8_t *ac;
2332
2333             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2334                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2335                     success = TRUE;
2336         }
2337
2338         if (!success) {
2339             pa_log_warn("Denied access to client with invalid authorization data.");
2340             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2341             return;
2342         }
2343
2344         c->authorized = TRUE;
2345         if (c->auth_timeout_event) {
2346             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2347             c->auth_timeout_event = NULL;
2348         }
2349     }
2350
2351     /* Enable shared memory support if possible */
2352     do_shm =
2353         pa_mempool_is_shared(c->protocol->core->mempool) &&
2354         c->is_local;
2355
2356     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2357
2358     if (do_shm)
2359         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2360             do_shm = FALSE;
2361
2362 #ifdef HAVE_CREDS
2363     if (do_shm) {
2364         /* Only enable SHM if both sides are owned by the same
2365          * user. This is a security measure because otherwise data
2366          * private to the user might leak. */
2367
2368         const pa_creds *creds;
2369         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2370             do_shm = FALSE;
2371     }
2372 #endif
2373
2374     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2375     pa_pstream_enable_shm(c->pstream, do_shm);
2376
2377     reply = reply_new(tag);
2378     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2379
2380 #ifdef HAVE_CREDS
2381 {
2382     /* SHM support is only enabled after both sides made sure they are the same user. */
2383
2384     pa_creds ucred;
2385
2386     ucred.uid = getuid();
2387     ucred.gid = getgid();
2388
2389     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2390 }
2391 #else
2392     pa_pstream_send_tagstruct(c->pstream, reply);
2393 #endif
2394 }
2395
2396 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2397     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2398     const char *name = NULL;
2399     pa_proplist *p;
2400     pa_tagstruct *reply;
2401
2402     pa_native_connection_assert_ref(c);
2403     pa_assert(t);
2404
2405     p = pa_proplist_new();
2406
2407     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2408         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2409         !pa_tagstruct_eof(t)) {
2410
2411         protocol_error(c);
2412         pa_proplist_free(p);
2413         return;
2414     }
2415
2416     if (name)
2417         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2418             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2419             pa_proplist_free(p);
2420             return;
2421         }
2422
2423     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2424     pa_proplist_free(p);
2425
2426     reply = reply_new(tag);
2427
2428     if (c->version >= 13)
2429         pa_tagstruct_putu32(reply, c->client->index);
2430
2431     pa_pstream_send_tagstruct(c->pstream, reply);
2432 }
2433
2434 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2435     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2436     const char *name;
2437     uint32_t idx = PA_IDXSET_INVALID;
2438
2439     pa_native_connection_assert_ref(c);
2440     pa_assert(t);
2441
2442     if (pa_tagstruct_gets(t, &name) < 0 ||
2443         !pa_tagstruct_eof(t)) {
2444         protocol_error(c);
2445         return;
2446     }
2447
2448     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2449     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2450
2451     if (command == PA_COMMAND_LOOKUP_SINK) {
2452         pa_sink *sink;
2453         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2454             idx = sink->index;
2455     } else {
2456         pa_source *source;
2457         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2458         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2459             idx = source->index;
2460     }
2461
2462     if (idx == PA_IDXSET_INVALID)
2463         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2464     else {
2465         pa_tagstruct *reply;
2466         reply = reply_new(tag);
2467         pa_tagstruct_putu32(reply, idx);
2468         pa_pstream_send_tagstruct(c->pstream, reply);
2469     }
2470 }
2471
2472 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2473     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2474     uint32_t idx;
2475     playback_stream *s;
2476
2477     pa_native_connection_assert_ref(c);
2478     pa_assert(t);
2479
2480     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2481         !pa_tagstruct_eof(t)) {
2482         protocol_error(c);
2483         return;
2484     }
2485
2486     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2487     s = pa_idxset_get_by_index(c->output_streams, idx);
2488     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2489     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2490
2491     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2492 }
2493
2494 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2495     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2496     pa_tagstruct *reply;
2497     const pa_mempool_stat *stat;
2498
2499     pa_native_connection_assert_ref(c);
2500     pa_assert(t);
2501
2502     if (!pa_tagstruct_eof(t)) {
2503         protocol_error(c);
2504         return;
2505     }
2506
2507     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2508
2509     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2510
2511     reply = reply_new(tag);
2512     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2513     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2514     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2515     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2516     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2517     pa_pstream_send_tagstruct(c->pstream, reply);
2518 }
2519
2520 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2521     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2522     pa_tagstruct *reply;
2523     playback_stream *s;
2524     struct timeval tv, now;
2525     uint32_t idx;
2526
2527     pa_native_connection_assert_ref(c);
2528     pa_assert(t);
2529
2530     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2531         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2532         !pa_tagstruct_eof(t)) {
2533         protocol_error(c);
2534         return;
2535     }
2536
2537     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2538     s = pa_idxset_get_by_index(c->output_streams, idx);
2539     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2540     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2541
2542     /* Get an atomic snapshot of all timing parameters */
2543     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2544
2545     reply = reply_new(tag);
2546     pa_tagstruct_put_usec(reply,
2547                           s->current_sink_latency +
2548                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec));
2549     pa_tagstruct_put_usec(reply, 0);
2550     pa_tagstruct_put_boolean(reply,
2551                              s->playing_for > 0 &&
2552                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2553                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2554     pa_tagstruct_put_timeval(reply, &tv);
2555     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2556     pa_tagstruct_puts64(reply, s->write_index);
2557     pa_tagstruct_puts64(reply, s->read_index);
2558
2559     if (c->version >= 13) {
2560         pa_tagstruct_putu64(reply, s->underrun_for);
2561         pa_tagstruct_putu64(reply, s->playing_for);
2562     }
2563
2564     pa_pstream_send_tagstruct(c->pstream, reply);
2565 }
2566
2567 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2568     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2569     pa_tagstruct *reply;
2570     record_stream *s;
2571     struct timeval tv, now;
2572     uint32_t idx;
2573
2574     pa_native_connection_assert_ref(c);
2575     pa_assert(t);
2576
2577     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2578         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2579         !pa_tagstruct_eof(t)) {
2580         protocol_error(c);
2581         return;
2582     }
2583
2584     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2585     s = pa_idxset_get_by_index(c->record_streams, idx);
2586     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2587
2588     /* Get an atomic snapshot of all timing parameters */
2589     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2590
2591     reply = reply_new(tag);
2592     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2593     pa_tagstruct_put_usec(reply,
2594                           s->current_source_latency +
2595                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2596     pa_tagstruct_put_boolean(reply,
2597                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2598                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2599     pa_tagstruct_put_timeval(reply, &tv);
2600     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2601     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2602     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2603     pa_pstream_send_tagstruct(c->pstream, reply);
2604 }
2605
2606 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2607     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2608     upload_stream *s;
2609     uint32_t length;
2610     const char *name = NULL;
2611     pa_sample_spec ss;
2612     pa_channel_map map;
2613     pa_tagstruct *reply;
2614     pa_proplist *p;
2615
2616     pa_native_connection_assert_ref(c);
2617     pa_assert(t);
2618
2619     if (pa_tagstruct_gets(t, &name) < 0 ||
2620         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2621         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2622         pa_tagstruct_getu32(t, &length) < 0) {
2623         protocol_error(c);
2624         return;
2625     }
2626
2627     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2628     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2629     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2630     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2631     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2632     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2633
2634     p = pa_proplist_new();
2635
2636     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2637         !pa_tagstruct_eof(t)) {
2638
2639         protocol_error(c);
2640         pa_proplist_free(p);
2641         return;
2642     }
2643
2644     if (c->version < 13)
2645         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2646     else if (!name)
2647         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2648             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2649
2650     if (!name || !pa_namereg_is_valid_name(name)) {
2651         pa_proplist_free(p);
2652         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2653     }
2654
2655     s = upload_stream_new(c, &ss, &map, name, length, p);
2656     pa_proplist_free(p);
2657
2658     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2659
2660     reply = reply_new(tag);
2661     pa_tagstruct_putu32(reply, s->index);
2662     pa_tagstruct_putu32(reply, length);
2663     pa_pstream_send_tagstruct(c->pstream, reply);
2664 }
2665
2666 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2667     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2668     uint32_t channel;
2669     upload_stream *s;
2670     uint32_t idx;
2671
2672     pa_native_connection_assert_ref(c);
2673     pa_assert(t);
2674
2675     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2676         !pa_tagstruct_eof(t)) {
2677         protocol_error(c);
2678         return;
2679     }
2680
2681     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2682
2683     s = pa_idxset_get_by_index(c->output_streams, channel);
2684     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2685     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2686
2687     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2688         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2689     else
2690         pa_pstream_send_simple_ack(c->pstream, tag);
2691
2692     upload_stream_unlink(s);
2693 }
2694
2695 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2696     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2697     uint32_t sink_index;
2698     pa_volume_t volume;
2699     pa_sink *sink;
2700     const char *name, *sink_name;
2701     uint32_t idx;
2702     pa_proplist *p;
2703     pa_tagstruct *reply;
2704
2705     pa_native_connection_assert_ref(c);
2706     pa_assert(t);
2707
2708     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2709
2710     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2711         pa_tagstruct_gets(t, &sink_name) < 0 ||
2712         pa_tagstruct_getu32(t, &volume) < 0 ||
2713         pa_tagstruct_gets(t, &name) < 0) {
2714         protocol_error(c);
2715         return;
2716     }
2717
2718     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2719     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2720     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2721     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2722
2723     if (sink_index != PA_INVALID_INDEX)
2724         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2725     else
2726         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2727
2728     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2729
2730     p = pa_proplist_new();
2731
2732     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2733         !pa_tagstruct_eof(t)) {
2734         protocol_error(c);
2735         pa_proplist_free(p);
2736         return;
2737     }
2738
2739     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2740
2741     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2742         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2743         pa_proplist_free(p);
2744         return;
2745     }
2746
2747     pa_proplist_free(p);
2748
2749     reply = reply_new(tag);
2750
2751     if (c->version >= 13)
2752         pa_tagstruct_putu32(reply, idx);
2753
2754     pa_pstream_send_tagstruct(c->pstream, reply);
2755 }
2756
2757 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2758     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2759     const char *name;
2760
2761     pa_native_connection_assert_ref(c);
2762     pa_assert(t);
2763
2764     if (pa_tagstruct_gets(t, &name) < 0 ||
2765         !pa_tagstruct_eof(t)) {
2766         protocol_error(c);
2767         return;
2768     }
2769
2770     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2771     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2772
2773     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2774         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2775         return;
2776     }
2777
2778     pa_pstream_send_simple_ack(c->pstream, tag);
2779 }
2780
2781 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2782     pa_assert(c);
2783     pa_assert(fixed);
2784     pa_assert(original);
2785
2786     *fixed = *original;
2787
2788     if (c->version < 12) {
2789         /* Before protocol version 12 we didn't support S32 samples,
2790          * so we need to lie about this to the client */
2791
2792         if (fixed->format == PA_SAMPLE_S32LE)
2793             fixed->format = PA_SAMPLE_FLOAT32LE;
2794         if (fixed->format == PA_SAMPLE_S32BE)
2795             fixed->format = PA_SAMPLE_FLOAT32BE;
2796     }
2797
2798     if (c->version < 15) {
2799         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2800             fixed->format = PA_SAMPLE_FLOAT32LE;
2801         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2802             fixed->format = PA_SAMPLE_FLOAT32BE;
2803     }
2804 }
2805
2806 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2807     pa_sample_spec fixed_ss;
2808
2809     pa_assert(t);
2810     pa_sink_assert_ref(sink);
2811
2812     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2813
2814     pa_tagstruct_put(
2815         t,
2816         PA_TAG_U32, sink->index,
2817         PA_TAG_STRING, sink->name,
2818         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2819         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2820         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2821         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2822         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE, FALSE),
2823         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2824         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2825         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2826         PA_TAG_USEC, pa_sink_get_latency(sink),
2827         PA_TAG_STRING, sink->driver,
2828         PA_TAG_U32, sink->flags,
2829         PA_TAG_INVALID);
2830
2831     if (c->version >= 13) {
2832         pa_tagstruct_put_proplist(t, sink->proplist);
2833         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2834     }
2835
2836     if (c->version >= 15) {
2837         pa_tagstruct_put_volume(t, sink->base_volume);
2838         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2839             pa_log_error("Internal sink state is invalid.");
2840         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2841         pa_tagstruct_putu32(t, sink->n_volume_steps);
2842         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2843     }
2844 }
2845
2846 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2847     pa_sample_spec fixed_ss;
2848
2849     pa_assert(t);
2850     pa_source_assert_ref(source);
2851
2852     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2853
2854     pa_tagstruct_put(
2855         t,
2856         PA_TAG_U32, source->index,
2857         PA_TAG_STRING, source->name,
2858         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2859         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2860         PA_TAG_CHANNEL_MAP, &source->channel_map,
2861         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2862         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2863         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2864         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2865         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2866         PA_TAG_USEC, pa_source_get_latency(source),
2867         PA_TAG_STRING, source->driver,
2868         PA_TAG_U32, source->flags,
2869         PA_TAG_INVALID);
2870
2871     if (c->version >= 13) {
2872         pa_tagstruct_put_proplist(t, source->proplist);
2873         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2874     }
2875
2876     if (c->version >= 15) {
2877         pa_tagstruct_put_volume(t, source->base_volume);
2878         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2879             pa_log_error("Internal source state is invalid.");
2880         pa_tagstruct_putu32(t, pa_source_get_state(source));
2881         pa_tagstruct_putu32(t, source->n_volume_steps);
2882         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2883     }
2884 }
2885
2886 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2887     pa_assert(t);
2888     pa_assert(client);
2889
2890     pa_tagstruct_putu32(t, client->index);
2891     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2892     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2893     pa_tagstruct_puts(t, client->driver);
2894
2895     if (c->version >= 13)
2896         pa_tagstruct_put_proplist(t, client->proplist);
2897 }
2898
2899 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2900     void *state = NULL;
2901     pa_card_profile *p;
2902
2903     pa_assert(t);
2904     pa_assert(card);
2905
2906     pa_tagstruct_putu32(t, card->index);
2907     pa_tagstruct_puts(t, card->name);
2908     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2909     pa_tagstruct_puts(t, card->driver);
2910
2911     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2912
2913     if (card->profiles) {
2914         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2915             pa_tagstruct_puts(t, p->name);
2916             pa_tagstruct_puts(t, p->description);
2917             pa_tagstruct_putu32(t, p->n_sinks);
2918             pa_tagstruct_putu32(t, p->n_sources);
2919             pa_tagstruct_putu32(t, p->priority);
2920         }
2921     }
2922
2923     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2924     pa_tagstruct_put_proplist(t, card->proplist);
2925 }
2926
2927 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2928     pa_assert(t);
2929     pa_assert(module);
2930
2931     pa_tagstruct_putu32(t, module->index);
2932     pa_tagstruct_puts(t, module->name);
2933     pa_tagstruct_puts(t, module->argument);
2934     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2935
2936     if (c->version < 15)
2937         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2938
2939     if (c->version >= 15)
2940         pa_tagstruct_put_proplist(t, module->proplist);
2941 }
2942
2943 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2944     pa_sample_spec fixed_ss;
2945     pa_usec_t sink_latency;
2946     pa_cvolume v;
2947
2948     pa_assert(t);
2949     pa_sink_input_assert_ref(s);
2950
2951     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2952
2953     pa_tagstruct_putu32(t, s->index);
2954     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2955     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2956     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2957     pa_tagstruct_putu32(t, s->sink->index);
2958     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2959     pa_tagstruct_put_channel_map(t, &s->channel_map);
2960     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
2961     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
2962     pa_tagstruct_put_usec(t, sink_latency);
2963     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
2964     pa_tagstruct_puts(t, s->driver);
2965     if (c->version >= 11)
2966         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
2967     if (c->version >= 13)
2968         pa_tagstruct_put_proplist(t, s->proplist);
2969 }
2970
2971 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
2972     pa_sample_spec fixed_ss;
2973     pa_usec_t source_latency;
2974
2975     pa_assert(t);
2976     pa_source_output_assert_ref(s);
2977
2978     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2979
2980     pa_tagstruct_putu32(t, s->index);
2981     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2982     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2983     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2984     pa_tagstruct_putu32(t, s->source->index);
2985     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2986     pa_tagstruct_put_channel_map(t, &s->channel_map);
2987     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
2988     pa_tagstruct_put_usec(t, source_latency);
2989     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
2990     pa_tagstruct_puts(t, s->driver);
2991
2992     if (c->version >= 13)
2993         pa_tagstruct_put_proplist(t, s->proplist);
2994 }
2995
2996 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
2997     pa_sample_spec fixed_ss;
2998     pa_cvolume v;
2999
3000     pa_assert(t);
3001     pa_assert(e);
3002
3003     if (e->memchunk.memblock)
3004         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3005     else
3006         memset(&fixed_ss, 0, sizeof(fixed_ss));
3007
3008     pa_tagstruct_putu32(t, e->index);
3009     pa_tagstruct_puts(t, e->name);
3010
3011     if (e->volume_is_set)
3012         v = e->volume;
3013     else
3014         pa_cvolume_init(&v);
3015
3016     pa_tagstruct_put_cvolume(t, &v);
3017     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3018     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3019     pa_tagstruct_put_channel_map(t, &e->channel_map);
3020     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3021     pa_tagstruct_put_boolean(t, e->lazy);
3022     pa_tagstruct_puts(t, e->filename);
3023
3024     if (c->version >= 13)
3025         pa_tagstruct_put_proplist(t, e->proplist);
3026 }
3027
3028 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3029     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3030     uint32_t idx;
3031     pa_sink *sink = NULL;
3032     pa_source *source = NULL;
3033     pa_client *client = NULL;
3034     pa_card *card = NULL;
3035     pa_module *module = NULL;
3036     pa_sink_input *si = NULL;
3037     pa_source_output *so = NULL;
3038     pa_scache_entry *sce = NULL;
3039     const char *name = NULL;
3040     pa_tagstruct *reply;
3041
3042     pa_native_connection_assert_ref(c);
3043     pa_assert(t);
3044
3045     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3046         (command != PA_COMMAND_GET_CLIENT_INFO &&
3047          command != PA_COMMAND_GET_MODULE_INFO &&
3048          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3049          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3050          pa_tagstruct_gets(t, &name) < 0) ||
3051         !pa_tagstruct_eof(t)) {
3052         protocol_error(c);
3053         return;
3054     }
3055
3056     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3057     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3058     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3059     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3060     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3061
3062     if (command == PA_COMMAND_GET_SINK_INFO) {
3063         if (idx != PA_INVALID_INDEX)
3064             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3065         else
3066             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3067     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3068         if (idx != PA_INVALID_INDEX)
3069             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3070         else
3071             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3072     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3073         if (idx != PA_INVALID_INDEX)
3074             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3075         else
3076             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3077     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3078         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3079     else if (command == PA_COMMAND_GET_MODULE_INFO)
3080         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3081     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3082         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3083     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3084         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3085     else {
3086         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3087         if (idx != PA_INVALID_INDEX)
3088             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3089         else
3090             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3091     }
3092
3093     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3094         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3095         return;
3096     }
3097
3098     reply = reply_new(tag);
3099     if (sink)
3100         sink_fill_tagstruct(c, reply, sink);
3101     else if (source)
3102         source_fill_tagstruct(c, reply, source);
3103     else if (client)
3104         client_fill_tagstruct(c, reply, client);
3105     else if (card)
3106         card_fill_tagstruct(c, reply, card);
3107     else if (module)
3108         module_fill_tagstruct(c, reply, module);
3109     else if (si)
3110         sink_input_fill_tagstruct(c, reply, si);
3111     else if (so)
3112         source_output_fill_tagstruct(c, reply, so);
3113     else
3114         scache_fill_tagstruct(c, reply, sce);
3115     pa_pstream_send_tagstruct(c->pstream, reply);
3116 }
3117
3118 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3119     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3120     pa_idxset *i;
3121     uint32_t idx;
3122     void *p;
3123     pa_tagstruct *reply;
3124
3125     pa_native_connection_assert_ref(c);
3126     pa_assert(t);
3127
3128     if (!pa_tagstruct_eof(t)) {
3129         protocol_error(c);
3130         return;
3131     }
3132
3133     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3134
3135     reply = reply_new(tag);
3136
3137     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3138         i = c->protocol->core->sinks;
3139     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3140         i = c->protocol->core->sources;
3141     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3142         i = c->protocol->core->clients;
3143     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3144         i = c->protocol->core->cards;
3145     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3146         i = c->protocol->core->modules;
3147     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3148         i = c->protocol->core->sink_inputs;
3149     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3150         i = c->protocol->core->source_outputs;
3151     else {
3152         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3153         i = c->protocol->core->scache;
3154     }
3155
3156     if (i) {
3157         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3158             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3159                 sink_fill_tagstruct(c, reply, p);
3160             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3161                 source_fill_tagstruct(c, reply, p);
3162             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3163                 client_fill_tagstruct(c, reply, p);
3164             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3165                 card_fill_tagstruct(c, reply, p);
3166             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3167                 module_fill_tagstruct(c, reply, p);
3168             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3169                 sink_input_fill_tagstruct(c, reply, p);
3170             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3171                 source_output_fill_tagstruct(c, reply, p);
3172             else {
3173                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3174                 scache_fill_tagstruct(c, reply, p);
3175             }
3176         }
3177     }
3178
3179     pa_pstream_send_tagstruct(c->pstream, reply);
3180 }
3181
3182 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3183     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3184     pa_tagstruct *reply;
3185     char txt[256];
3186     pa_sink *def_sink;
3187     pa_source *def_source;
3188     pa_sample_spec fixed_ss;
3189
3190     pa_native_connection_assert_ref(c);
3191     pa_assert(t);
3192
3193     if (!pa_tagstruct_eof(t)) {
3194         protocol_error(c);
3195         return;
3196     }
3197
3198     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3199
3200     reply = reply_new(tag);
3201     pa_tagstruct_puts(reply, PACKAGE_NAME);
3202     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3203     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
3204     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
3205
3206     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3207     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3208
3209     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3210     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3211     def_source = pa_namereg_get_default_source(c->protocol->core);
3212     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3213
3214     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3215
3216     if (c->version >= 15)
3217         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3218
3219     pa_pstream_send_tagstruct(c->pstream, reply);
3220 }
3221
3222 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3223     pa_tagstruct *t;
3224     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3225
3226     pa_native_connection_assert_ref(c);
3227
3228     t = pa_tagstruct_new(NULL, 0);
3229     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3230     pa_tagstruct_putu32(t, (uint32_t) -1);
3231     pa_tagstruct_putu32(t, e);
3232     pa_tagstruct_putu32(t, idx);
3233     pa_pstream_send_tagstruct(c->pstream, t);
3234 }
3235
3236 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3237     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3238     pa_subscription_mask_t m;
3239
3240     pa_native_connection_assert_ref(c);
3241     pa_assert(t);
3242
3243     if (pa_tagstruct_getu32(t, &m) < 0 ||
3244         !pa_tagstruct_eof(t)) {
3245         protocol_error(c);
3246         return;
3247     }
3248
3249     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3250     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3251
3252     if (c->subscription)
3253         pa_subscription_free(c->subscription);
3254
3255     if (m != 0) {
3256         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3257         pa_assert(c->subscription);
3258     } else
3259         c->subscription = NULL;
3260
3261     pa_pstream_send_simple_ack(c->pstream, tag);
3262 }
3263
3264 static void command_set_volume(
3265         pa_pdispatch *pd,
3266         uint32_t command,
3267         uint32_t tag,
3268         pa_tagstruct *t,
3269         void *userdata) {
3270
3271     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3272     uint32_t idx;
3273     pa_cvolume volume;
3274     pa_sink *sink = NULL;
3275     pa_source *source = NULL;
3276     pa_sink_input *si = NULL;
3277     const char *name = NULL;
3278
3279     pa_native_connection_assert_ref(c);
3280     pa_assert(t);
3281
3282     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3283         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3284         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3285         pa_tagstruct_get_cvolume(t, &volume) ||
3286         !pa_tagstruct_eof(t)) {
3287         protocol_error(c);
3288         return;
3289     }
3290
3291     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3292     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3293     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3294     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3295     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3296     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3297
3298     switch (command) {
3299
3300         case PA_COMMAND_SET_SINK_VOLUME:
3301             if (idx != PA_INVALID_INDEX)
3302                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3303             else
3304                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3305             break;
3306
3307         case PA_COMMAND_SET_SOURCE_VOLUME:
3308             if (idx != PA_INVALID_INDEX)
3309                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3310             else
3311                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3312             break;
3313
3314         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3315             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3316             break;
3317
3318         default:
3319             pa_assert_not_reached();
3320     }
3321
3322     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3323
3324     if (sink)
3325         pa_sink_set_volume(sink, &volume, TRUE, TRUE, TRUE);
3326     else if (source)
3327         pa_source_set_volume(source, &volume);
3328     else if (si)
3329         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3330
3331     pa_pstream_send_simple_ack(c->pstream, tag);
3332 }
3333
3334 static void command_set_mute(
3335         pa_pdispatch *pd,
3336         uint32_t command,
3337         uint32_t tag,
3338         pa_tagstruct *t,
3339         void *userdata) {
3340
3341     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3342     uint32_t idx;
3343     pa_bool_t mute;
3344     pa_sink *sink = NULL;
3345     pa_source *source = NULL;
3346     pa_sink_input *si = NULL;
3347     const char *name = NULL;
3348
3349     pa_native_connection_assert_ref(c);
3350     pa_assert(t);
3351
3352     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3353         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3354         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3355         pa_tagstruct_get_boolean(t, &mute) ||
3356         !pa_tagstruct_eof(t)) {
3357         protocol_error(c);
3358         return;
3359     }
3360
3361     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3362     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3363     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3364     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3365     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3366
3367     switch (command) {
3368
3369         case PA_COMMAND_SET_SINK_MUTE:
3370
3371             if (idx != PA_INVALID_INDEX)
3372                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3373             else
3374                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3375
3376             break;
3377
3378         case PA_COMMAND_SET_SOURCE_MUTE:
3379             if (idx != PA_INVALID_INDEX)
3380                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3381             else
3382                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3383
3384             break;
3385
3386         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3387             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3388             break;
3389
3390         default:
3391             pa_assert_not_reached();
3392     }
3393
3394     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3395
3396     if (sink)
3397         pa_sink_set_mute(sink, mute);
3398     else if (source)
3399         pa_source_set_mute(source, mute);
3400     else if (si)
3401         pa_sink_input_set_mute(si, mute, TRUE);
3402
3403     pa_pstream_send_simple_ack(c->pstream, tag);
3404 }
3405
3406 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3407     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3408     uint32_t idx;
3409     pa_bool_t b;
3410     playback_stream *s;
3411
3412     pa_native_connection_assert_ref(c);
3413     pa_assert(t);
3414
3415     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3416         pa_tagstruct_get_boolean(t, &b) < 0 ||
3417         !pa_tagstruct_eof(t)) {
3418         protocol_error(c);
3419         return;
3420     }
3421
3422     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3423     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3424     s = pa_idxset_get_by_index(c->output_streams, idx);
3425     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3426     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3427
3428     pa_sink_input_cork(s->sink_input, b);
3429
3430     if (b)
3431         s->is_underrun = TRUE;
3432
3433     pa_pstream_send_simple_ack(c->pstream, tag);
3434 }
3435
3436 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3437     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3438     uint32_t idx;
3439     playback_stream *s;
3440
3441     pa_native_connection_assert_ref(c);
3442     pa_assert(t);
3443
3444     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3445         !pa_tagstruct_eof(t)) {
3446         protocol_error(c);
3447         return;
3448     }
3449
3450     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3451     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3452     s = pa_idxset_get_by_index(c->output_streams, idx);
3453     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3454     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3455
3456     switch (command) {
3457         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3458             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3459             break;
3460
3461         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3462             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3463             break;
3464
3465         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3466             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3467             break;
3468
3469         default:
3470             pa_assert_not_reached();
3471     }
3472
3473     pa_pstream_send_simple_ack(c->pstream, tag);
3474 }
3475
3476 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3477     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3478     uint32_t idx;
3479     record_stream *s;
3480     pa_bool_t b;
3481
3482     pa_native_connection_assert_ref(c);
3483     pa_assert(t);
3484
3485     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3486         pa_tagstruct_get_boolean(t, &b) < 0 ||
3487         !pa_tagstruct_eof(t)) {
3488         protocol_error(c);
3489         return;
3490     }
3491
3492     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3493     s = pa_idxset_get_by_index(c->record_streams, idx);
3494     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3495
3496     pa_source_output_cork(s->source_output, b);
3497     pa_memblockq_prebuf_force(s->memblockq);
3498     pa_pstream_send_simple_ack(c->pstream, tag);
3499 }
3500
3501 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3502     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3503     uint32_t idx;
3504     record_stream *s;
3505
3506     pa_native_connection_assert_ref(c);
3507     pa_assert(t);
3508
3509     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3510         !pa_tagstruct_eof(t)) {
3511         protocol_error(c);
3512         return;
3513     }
3514
3515     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3516     s = pa_idxset_get_by_index(c->record_streams, idx);
3517     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3518
3519     pa_memblockq_flush_read(s->memblockq);
3520     pa_pstream_send_simple_ack(c->pstream, tag);
3521 }
3522
3523 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3524     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3525     uint32_t idx;
3526     pa_buffer_attr a;
3527     pa_tagstruct *reply;
3528
3529     pa_native_connection_assert_ref(c);
3530     pa_assert(t);
3531
3532     memset(&a, 0, sizeof(a));
3533
3534     if (pa_tagstruct_getu32(t, &idx) < 0) {
3535         protocol_error(c);
3536         return;
3537     }
3538
3539     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3540
3541     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3542         playback_stream *s;
3543         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3544
3545         s = pa_idxset_get_by_index(c->output_streams, idx);
3546         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3547         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3548
3549         if (pa_tagstruct_get(
3550                     t,
3551                     PA_TAG_U32, &a.maxlength,
3552                     PA_TAG_U32, &a.tlength,
3553                     PA_TAG_U32, &a.prebuf,
3554                     PA_TAG_U32, &a.minreq,
3555                     PA_TAG_INVALID) < 0 ||
3556             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3557             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3558             !pa_tagstruct_eof(t)) {
3559             protocol_error(c);
3560             return;
3561         }
3562
3563         s->adjust_latency = adjust_latency;
3564         s->early_requests = early_requests;
3565         s->buffer_attr = a;
3566
3567         fix_playback_buffer_attr(s);
3568         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3569
3570         reply = reply_new(tag);
3571         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3572         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3573         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3574         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3575
3576         if (c->version >= 13)
3577             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3578
3579     } else {
3580         record_stream *s;
3581         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3582         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3583
3584         s = pa_idxset_get_by_index(c->record_streams, idx);
3585         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3586
3587         if (pa_tagstruct_get(
3588                     t,
3589                     PA_TAG_U32, &a.maxlength,
3590                     PA_TAG_U32, &a.fragsize,
3591                     PA_TAG_INVALID) < 0 ||
3592             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3593             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3594             !pa_tagstruct_eof(t)) {
3595             protocol_error(c);
3596             return;
3597         }
3598
3599         s->adjust_latency = adjust_latency;
3600         s->early_requests = early_requests;
3601         s->buffer_attr = a;
3602
3603         fix_record_buffer_attr_pre(s);
3604         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3605         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3606         fix_record_buffer_attr_post(s);
3607
3608         reply = reply_new(tag);
3609         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3610         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3611
3612         if (c->version >= 13)
3613             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3614     }
3615
3616     pa_pstream_send_tagstruct(c->pstream, reply);
3617 }
3618
3619 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3620     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3621     uint32_t idx;
3622     uint32_t rate;
3623
3624     pa_native_connection_assert_ref(c);
3625     pa_assert(t);
3626
3627     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3628         pa_tagstruct_getu32(t, &rate) < 0 ||
3629         !pa_tagstruct_eof(t)) {
3630         protocol_error(c);
3631         return;
3632     }
3633
3634     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3635     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3636
3637     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3638         playback_stream *s;
3639
3640         s = pa_idxset_get_by_index(c->output_streams, idx);
3641         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3642         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3643
3644         pa_sink_input_set_rate(s->sink_input, rate);
3645
3646     } else {
3647         record_stream *s;
3648         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3649
3650         s = pa_idxset_get_by_index(c->record_streams, idx);
3651         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3652
3653         pa_source_output_set_rate(s->source_output, rate);
3654     }
3655
3656     pa_pstream_send_simple_ack(c->pstream, tag);
3657 }
3658
3659 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3660     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3661     uint32_t idx;
3662     uint32_t mode;
3663     pa_proplist *p;
3664
3665     pa_native_connection_assert_ref(c);
3666     pa_assert(t);
3667
3668     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3669
3670     p = pa_proplist_new();
3671
3672     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3673
3674         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3675             pa_tagstruct_get_proplist(t, p) < 0 ||
3676             !pa_tagstruct_eof(t)) {
3677             protocol_error(c);
3678             pa_proplist_free(p);
3679             return;
3680         }
3681
3682     } else {
3683
3684         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3685             pa_tagstruct_getu32(t, &mode) < 0 ||
3686             pa_tagstruct_get_proplist(t, p) < 0 ||
3687             !pa_tagstruct_eof(t)) {
3688             protocol_error(c);
3689             pa_proplist_free(p);
3690             return;
3691         }
3692     }
3693
3694     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3695         pa_proplist_free(p);
3696         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3697     }
3698
3699     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3700         playback_stream *s;
3701
3702         s = pa_idxset_get_by_index(c->output_streams, idx);
3703         if (!s || !playback_stream_isinstance(s)) {
3704             pa_proplist_free(p);
3705             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3706         }
3707         pa_sink_input_update_proplist(s->sink_input, mode, p);
3708
3709     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3710         record_stream *s;
3711
3712         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3713             pa_proplist_free(p);
3714             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3715         }
3716         pa_source_output_update_proplist(s->source_output, mode, p);
3717
3718     } else {
3719         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3720
3721         pa_client_update_proplist(c->client, mode, p);
3722     }
3723
3724     pa_pstream_send_simple_ack(c->pstream, tag);
3725     pa_proplist_free(p);
3726 }
3727
3728 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3729     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3730     uint32_t idx;
3731     unsigned changed = 0;
3732     pa_proplist *p;
3733     pa_strlist *l = NULL;
3734
3735     pa_native_connection_assert_ref(c);
3736     pa_assert(t);
3737
3738     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3739
3740     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3741
3742         if (pa_tagstruct_getu32(t, &idx) < 0) {
3743             protocol_error(c);
3744             return;
3745         }
3746     }
3747
3748     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3749         playback_stream *s;
3750
3751         s = pa_idxset_get_by_index(c->output_streams, idx);
3752         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3753         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3754
3755         p = s->sink_input->proplist;
3756
3757     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3758         record_stream *s;
3759
3760         s = pa_idxset_get_by_index(c->record_streams, idx);
3761         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3762
3763         p = s->source_output->proplist;
3764     } else {
3765         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3766
3767         p = c->client->proplist;
3768     }
3769
3770     for (;;) {
3771         const char *k;
3772
3773         if (pa_tagstruct_gets(t, &k) < 0) {
3774             protocol_error(c);
3775             pa_strlist_free(l);
3776             return;
3777         }
3778
3779         if (!k)
3780             break;
3781
3782         l = pa_strlist_prepend(l, k);
3783     }
3784
3785     if (!pa_tagstruct_eof(t)) {
3786         protocol_error(c);
3787         pa_strlist_free(l);
3788         return;
3789     }
3790
3791     for (;;) {
3792         char *z;
3793
3794         l = pa_strlist_pop(l, &z);
3795
3796         if (!z)
3797             break;
3798
3799         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3800         pa_xfree(z);
3801     }
3802
3803     pa_pstream_send_simple_ack(c->pstream, tag);
3804
3805     if (changed) {
3806         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3807             playback_stream *s;
3808
3809             s = pa_idxset_get_by_index(c->output_streams, idx);
3810             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3811
3812         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3813             record_stream *s;
3814
3815             s = pa_idxset_get_by_index(c->record_streams, idx);
3816             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3817
3818         } else {
3819             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3820             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3821         }
3822     }
3823 }
3824
3825 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3826     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3827     const char *s;
3828
3829     pa_native_connection_assert_ref(c);
3830     pa_assert(t);
3831
3832     if (pa_tagstruct_gets(t, &s) < 0 ||
3833         !pa_tagstruct_eof(t)) {
3834         protocol_error(c);
3835         return;
3836     }
3837
3838     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3839     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3840
3841     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3842         pa_source *source;
3843
3844         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3845         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3846
3847         pa_namereg_set_default_source(c->protocol->core, source);
3848     } else {
3849         pa_sink *sink;
3850         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3851
3852         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3853         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3854
3855         pa_namereg_set_default_sink(c->protocol->core, sink);
3856     }
3857
3858     pa_pstream_send_simple_ack(c->pstream, tag);
3859 }
3860
3861 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3862     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3863     uint32_t idx;
3864     const char *name;
3865
3866     pa_native_connection_assert_ref(c);
3867     pa_assert(t);
3868
3869     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3870         pa_tagstruct_gets(t, &name) < 0 ||
3871         !pa_tagstruct_eof(t)) {
3872         protocol_error(c);
3873         return;
3874     }
3875
3876     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3877     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3878
3879     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3880         playback_stream *s;
3881
3882         s = pa_idxset_get_by_index(c->output_streams, idx);
3883         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3884         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3885
3886         pa_sink_input_set_name(s->sink_input, name);
3887
3888     } else {
3889         record_stream *s;
3890         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3891
3892         s = pa_idxset_get_by_index(c->record_streams, idx);
3893         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3894
3895         pa_source_output_set_name(s->source_output, name);
3896     }
3897
3898     pa_pstream_send_simple_ack(c->pstream, tag);
3899 }
3900
3901 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3902     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3903     uint32_t idx;
3904
3905     pa_native_connection_assert_ref(c);
3906     pa_assert(t);
3907
3908     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3909         !pa_tagstruct_eof(t)) {
3910         protocol_error(c);
3911         return;
3912     }
3913
3914     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3915
3916     if (command == PA_COMMAND_KILL_CLIENT) {
3917         pa_client *client;
3918
3919         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3920         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3921
3922         pa_native_connection_ref(c);
3923         pa_client_kill(client);
3924
3925     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3926         pa_sink_input *s;
3927
3928         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3929         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3930
3931         pa_native_connection_ref(c);
3932         pa_sink_input_kill(s);
3933     } else {
3934         pa_source_output *s;
3935
3936         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3937
3938         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3939         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3940
3941         pa_native_connection_ref(c);
3942         pa_source_output_kill(s);
3943     }
3944
3945     pa_pstream_send_simple_ack(c->pstream, tag);
3946     pa_native_connection_unref(c);
3947 }
3948
3949 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3950     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3951     pa_module *m;
3952     const char *name, *argument;
3953     pa_tagstruct *reply;
3954
3955     pa_native_connection_assert_ref(c);
3956     pa_assert(t);
3957
3958     if (pa_tagstruct_gets(t, &name) < 0 ||
3959         pa_tagstruct_gets(t, &argument) < 0 ||
3960         !pa_tagstruct_eof(t)) {
3961         protocol_error(c);
3962         return;
3963     }
3964
3965     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3966     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
3967     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
3968
3969     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
3970         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
3971         return;
3972     }
3973
3974     reply = reply_new(tag);
3975     pa_tagstruct_putu32(reply, m->index);
3976     pa_pstream_send_tagstruct(c->pstream, reply);
3977 }
3978
3979 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3980     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3981     uint32_t idx;
3982     pa_module *m;
3983
3984     pa_native_connection_assert_ref(c);
3985     pa_assert(t);
3986
3987     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3988         !pa_tagstruct_eof(t)) {
3989         protocol_error(c);
3990         return;
3991     }
3992
3993     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3994     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3995     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
3996
3997     pa_module_unload_request(m, FALSE);
3998     pa_pstream_send_simple_ack(c->pstream, tag);
3999 }
4000
4001 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4002     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4003     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4004     const char *name_device = NULL;
4005
4006     pa_native_connection_assert_ref(c);
4007     pa_assert(t);
4008
4009     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4010         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4011         pa_tagstruct_gets(t, &name_device) < 0 ||
4012         !pa_tagstruct_eof(t)) {
4013         protocol_error(c);
4014         return;
4015     }
4016
4017     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4018     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4019
4020     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4021     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4022     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4023     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4024
4025     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4026         pa_sink_input *si = NULL;
4027         pa_sink *sink = NULL;
4028
4029         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4030
4031         if (idx_device != PA_INVALID_INDEX)
4032             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4033         else
4034             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4035
4036         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4037
4038         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4039             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4040             return;
4041         }
4042     } else {
4043         pa_source_output *so = NULL;
4044         pa_source *source;
4045
4046         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4047
4048         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4049
4050         if (idx_device != PA_INVALID_INDEX)
4051             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4052         else
4053             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4054
4055         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4056
4057         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4058             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4059             return;
4060         }
4061     }
4062
4063     pa_pstream_send_simple_ack(c->pstream, tag);
4064 }
4065
4066 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4067     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4068     uint32_t idx = PA_INVALID_INDEX;
4069     const char *name = NULL;
4070     pa_bool_t b;
4071
4072     pa_native_connection_assert_ref(c);
4073     pa_assert(t);
4074
4075     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4076         pa_tagstruct_gets(t, &name) < 0 ||
4077         pa_tagstruct_get_boolean(t, &b) < 0 ||
4078         !pa_tagstruct_eof(t)) {
4079         protocol_error(c);
4080         return;
4081     }
4082
4083     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4084     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4085     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4086     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4087     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4088
4089     if (command == PA_COMMAND_SUSPEND_SINK) {
4090
4091         if (idx == PA_INVALID_INDEX && name && !*name) {
4092
4093             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4094
4095             if (pa_sink_suspend_all(c->protocol->core, b) < 0) {
4096                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4097                 return;
4098             }
4099         } else {
4100             pa_sink *sink = NULL;
4101
4102             if (idx != PA_INVALID_INDEX)
4103                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4104             else
4105                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4106
4107             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4108
4109             if (pa_sink_suspend(sink, b) < 0) {
4110                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4111                 return;
4112             }
4113         }
4114     } else {
4115
4116         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4117
4118         if (idx == PA_INVALID_INDEX && name && !*name) {
4119
4120             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4121
4122             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
4123                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4124                 return;
4125             }
4126
4127         } else {
4128             pa_source *source;
4129
4130             if (idx != PA_INVALID_INDEX)
4131                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4132             else
4133                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4134
4135             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4136
4137             if (pa_source_suspend(source, b) < 0) {
4138                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4139                 return;
4140             }
4141         }
4142     }
4143
4144     pa_pstream_send_simple_ack(c->pstream, tag);
4145 }
4146
4147 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4148     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4149     uint32_t idx = PA_INVALID_INDEX;
4150     const char *name = NULL;
4151     pa_module *m;
4152     pa_native_protocol_ext_cb_t cb;
4153
4154     pa_native_connection_assert_ref(c);
4155     pa_assert(t);
4156
4157     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4158         pa_tagstruct_gets(t, &name) < 0) {
4159         protocol_error(c);
4160         return;
4161     }
4162
4163     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4164     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4165     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4166     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4167     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4168
4169     if (idx != PA_INVALID_INDEX)
4170         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4171     else {
4172         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4173             if (strcmp(name, m->name) == 0)
4174                 break;
4175     }
4176
4177     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4178     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4179
4180     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4181     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4182
4183     if (cb(c->protocol, m, c, tag, t) < 0)
4184         protocol_error(c);
4185 }
4186
4187 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4188     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4189     uint32_t idx = PA_INVALID_INDEX;
4190     const char *name = NULL, *profile = NULL;
4191     pa_card *card = NULL;
4192
4193     pa_native_connection_assert_ref(c);
4194     pa_assert(t);
4195
4196     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4197         pa_tagstruct_gets(t, &name) < 0 ||
4198         pa_tagstruct_gets(t, &profile) < 0 ||
4199         !pa_tagstruct_eof(t)) {
4200         protocol_error(c);
4201         return;
4202     }
4203
4204     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4205     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4206     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4207     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4208     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4209
4210     if (idx != PA_INVALID_INDEX)
4211         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4212     else
4213         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4214
4215     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4216
4217     if (pa_card_set_profile(card, profile, TRUE) < 0) {
4218         pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4219         return;
4220     }
4221
4222     pa_pstream_send_simple_ack(c->pstream, tag);
4223 }
4224
4225 /*** pstream callbacks ***/
4226
4227 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4228     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4229
4230     pa_assert(p);
4231     pa_assert(packet);
4232     pa_native_connection_assert_ref(c);
4233
4234     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4235         pa_log("invalid packet.");
4236         native_connection_unlink(c);
4237     }
4238 }
4239
4240 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4241     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4242     output_stream *stream;
4243
4244     pa_assert(p);
4245     pa_assert(chunk);
4246     pa_native_connection_assert_ref(c);
4247
4248     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4249         pa_log_debug("Client sent block for invalid stream.");
4250         /* Ignoring */
4251         return;
4252     }
4253
4254 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4255
4256     if (playback_stream_isinstance(stream)) {
4257         playback_stream *ps = PLAYBACK_STREAM(stream);
4258
4259         if (chunk->memblock) {
4260             if (seek != PA_SEEK_RELATIVE || offset != 0)
4261                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4262
4263             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4264         } else
4265             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4266
4267     } else {
4268         upload_stream *u = UPLOAD_STREAM(stream);
4269         size_t l;
4270
4271         if (!u->memchunk.memblock) {
4272             if (u->length == chunk->length && chunk->memblock) {
4273                 u->memchunk = *chunk;
4274                 pa_memblock_ref(u->memchunk.memblock);
4275                 u->length = 0;
4276             } else {
4277                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4278                 u->memchunk.index = u->memchunk.length = 0;
4279             }
4280         }
4281
4282         pa_assert(u->memchunk.memblock);
4283
4284         l = u->length;
4285         if (l > chunk->length)
4286             l = chunk->length;
4287
4288         if (l > 0) {
4289             void *dst;
4290             dst = pa_memblock_acquire(u->memchunk.memblock);
4291
4292             if (chunk->memblock) {
4293                 void *src;
4294                 src = pa_memblock_acquire(chunk->memblock);
4295
4296                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4297                        (uint8_t*) src + chunk->index, l);
4298
4299                 pa_memblock_release(chunk->memblock);
4300             } else
4301                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4302
4303             pa_memblock_release(u->memchunk.memblock);
4304
4305             u->memchunk.length += l;
4306             u->length -= l;
4307         }
4308     }
4309 }
4310
4311 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4312     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4313
4314     pa_assert(p);
4315     pa_native_connection_assert_ref(c);
4316
4317     native_connection_unlink(c);
4318     pa_log_info("Connection died.");
4319 }
4320
4321 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4322     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4323
4324     pa_assert(p);
4325     pa_native_connection_assert_ref(c);
4326
4327     native_connection_send_memblock(c);
4328 }
4329
4330 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4331     pa_thread_mq *q;
4332
4333     if (!(q = pa_thread_mq_get()))
4334         pa_pstream_send_revoke(p, block_id);
4335     else
4336         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4337 }
4338
4339 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4340     pa_thread_mq *q;
4341
4342     if (!(q = pa_thread_mq_get()))
4343         pa_pstream_send_release(p, block_id);
4344     else
4345         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4346 }
4347
4348 /*** client callbacks ***/
4349
4350 static void client_kill_cb(pa_client *c) {
4351     pa_assert(c);
4352
4353     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4354     pa_log_info("Connection killed.");
4355 }
4356
4357 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4358     pa_tagstruct *t;
4359     pa_native_connection *c;
4360
4361     pa_assert(client);
4362     c = PA_NATIVE_CONNECTION(client->userdata);
4363     pa_native_connection_assert_ref(c);
4364
4365     if (c->version < 15)
4366       return;
4367
4368     t = pa_tagstruct_new(NULL, 0);
4369     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4370     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4371     pa_tagstruct_puts(t, event);
4372     pa_tagstruct_put_proplist(t, pl);
4373     pa_pstream_send_tagstruct(c->pstream, t);
4374 }
4375
4376 /*** module entry points ***/
4377
4378 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4379     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4380
4381     pa_assert(m);
4382     pa_assert(tv);
4383     pa_native_connection_assert_ref(c);
4384     pa_assert(c->auth_timeout_event == e);
4385
4386     if (!c->authorized) {
4387         native_connection_unlink(c);
4388         pa_log_info("Connection terminated due to authentication timeout.");
4389     }
4390 }
4391
4392 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4393     pa_native_connection *c;
4394     char pname[128];
4395     pa_client *client;
4396     pa_client_new_data data;
4397
4398     pa_assert(p);
4399     pa_assert(io);
4400     pa_assert(o);
4401
4402     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4403         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4404         pa_iochannel_free(io);
4405         return;
4406     }
4407
4408     pa_client_new_data_init(&data);
4409     data.module = o->module;
4410     data.driver = __FILE__;
4411     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4412     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4413     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4414     client = pa_client_new(p->core, &data);
4415     pa_client_new_data_done(&data);
4416
4417     if (!client)
4418         return;
4419
4420     c = pa_msgobject_new(pa_native_connection);
4421     c->parent.parent.free = native_connection_free;
4422     c->parent.process_msg = native_connection_process_msg;
4423     c->protocol = p;
4424     c->options = pa_native_options_ref(o);
4425     c->authorized = FALSE;
4426
4427     if (o->auth_anonymous) {
4428         pa_log_info("Client authenticated anonymously.");
4429         c->authorized = TRUE;
4430     }
4431
4432     if (!c->authorized &&
4433         o->auth_ip_acl &&
4434         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4435
4436         pa_log_info("Client authenticated by IP ACL.");
4437         c->authorized = TRUE;
4438     }
4439
4440     if (!c->authorized) {
4441         struct timeval tv;
4442         pa_gettimeofday(&tv);
4443         tv.tv_sec += AUTH_TIMEOUT;
4444         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4445     } else
4446         c->auth_timeout_event = NULL;
4447
4448     c->is_local = pa_iochannel_socket_is_local(io);
4449     c->version = 8;
4450
4451     c->client = client;
4452     c->client->kill = client_kill_cb;
4453     c->client->send_event = client_send_event_cb;
4454     c->client->userdata = c;
4455
4456     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4457     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4458     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4459     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4460     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4461     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4462     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4463
4464     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4465
4466     c->record_streams = pa_idxset_new(NULL, NULL);
4467     c->output_streams = pa_idxset_new(NULL, NULL);
4468
4469     c->rrobin_index = PA_IDXSET_INVALID;
4470     c->subscription = NULL;
4471
4472     pa_idxset_put(p->connections, c, NULL);
4473
4474 #ifdef HAVE_CREDS
4475     if (pa_iochannel_creds_supported(io))
4476         pa_iochannel_creds_enable(io);
4477 #endif
4478
4479     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4480 }
4481
4482 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4483     pa_native_connection *c;
4484     void *state = NULL;
4485
4486     pa_assert(p);
4487     pa_assert(m);
4488
4489     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4490         if (c->options->module == m)
4491             native_connection_unlink(c);
4492 }
4493
4494 static pa_native_protocol* native_protocol_new(pa_core *c) {
4495     pa_native_protocol *p;
4496     pa_native_hook_t h;
4497
4498     pa_assert(c);
4499
4500     p = pa_xnew(pa_native_protocol, 1);
4501     PA_REFCNT_INIT(p);
4502     p->core = c;
4503     p->connections = pa_idxset_new(NULL, NULL);
4504
4505     p->servers = NULL;
4506
4507     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4508
4509     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4510         pa_hook_init(&p->hooks[h], p);
4511
4512     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4513
4514     return p;
4515 }
4516
4517 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4518     pa_native_protocol *p;
4519
4520     if ((p = pa_shared_get(c, "native-protocol")))
4521         return pa_native_protocol_ref(p);
4522
4523     return native_protocol_new(c);
4524 }
4525
4526 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4527     pa_assert(p);
4528     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4529
4530     PA_REFCNT_INC(p);
4531
4532     return p;
4533 }
4534
4535 void pa_native_protocol_unref(pa_native_protocol *p) {
4536     pa_native_connection *c;
4537     pa_native_hook_t h;
4538
4539     pa_assert(p);
4540     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4541
4542     if (PA_REFCNT_DEC(p) > 0)
4543         return;
4544
4545     while ((c = pa_idxset_first(p->connections, NULL)))
4546         native_connection_unlink(c);
4547
4548     pa_idxset_free(p->connections, NULL, NULL);
4549
4550     pa_strlist_free(p->servers);
4551
4552     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4553         pa_hook_done(&p->hooks[h]);
4554
4555     pa_hashmap_free(p->extensions, NULL, NULL);
4556
4557     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4558
4559     pa_xfree(p);
4560 }
4561
4562 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4563     pa_assert(p);
4564     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4565     pa_assert(name);
4566
4567     p->servers = pa_strlist_prepend(p->servers, name);
4568
4569     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4570 }
4571
4572 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4573     pa_assert(p);
4574     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4575     pa_assert(name);
4576
4577     p->servers = pa_strlist_remove(p->servers, name);
4578
4579     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4580 }
4581
4582 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4583     pa_assert(p);
4584     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4585
4586     return p->hooks;
4587 }
4588
4589 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4590     pa_assert(p);
4591     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4592
4593     return p->servers;
4594 }
4595
4596 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4597     pa_assert(p);
4598     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4599     pa_assert(m);
4600     pa_assert(cb);
4601     pa_assert(!pa_hashmap_get(p->extensions, m));
4602
4603     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4604     return 0;
4605 }
4606
4607 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4608     pa_assert(p);
4609     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4610     pa_assert(m);
4611
4612     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4613 }
4614
4615 pa_native_options* pa_native_options_new(void) {
4616     pa_native_options *o;
4617
4618     o = pa_xnew0(pa_native_options, 1);
4619     PA_REFCNT_INIT(o);
4620
4621     return o;
4622 }
4623
4624 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4625     pa_assert(o);
4626     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4627
4628     PA_REFCNT_INC(o);
4629
4630     return o;
4631 }
4632
4633 void pa_native_options_unref(pa_native_options *o) {
4634     pa_assert(o);
4635     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4636
4637     if (PA_REFCNT_DEC(o) > 0)
4638         return;
4639
4640     pa_xfree(o->auth_group);
4641
4642     if (o->auth_ip_acl)
4643         pa_ip_acl_free(o->auth_ip_acl);
4644
4645     if (o->auth_cookie)
4646         pa_auth_cookie_unref(o->auth_cookie);
4647
4648     pa_xfree(o);
4649 }
4650
4651 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4652     pa_bool_t enabled;
4653     const char *acl;
4654
4655     pa_assert(o);
4656     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4657     pa_assert(ma);
4658
4659     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4660         pa_log("auth-anonymous= expects a boolean argument.");
4661         return -1;
4662     }
4663
4664     enabled = TRUE;
4665     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4666         pa_log("auth-group-enabled= expects a boolean argument.");
4667         return -1;
4668     }
4669
4670     pa_xfree(o->auth_group);
4671     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4672
4673 #ifndef HAVE_CREDS
4674     if (o->auth_group)
4675         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4676 #endif
4677
4678     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4679         pa_ip_acl *ipa;
4680
4681         if (!(ipa = pa_ip_acl_new(acl))) {
4682             pa_log("Failed to parse IP ACL '%s'", acl);
4683             return -1;
4684         }
4685
4686         if (o->auth_ip_acl)
4687             pa_ip_acl_free(o->auth_ip_acl);
4688
4689         o->auth_ip_acl = ipa;
4690     }
4691
4692     enabled = TRUE;
4693     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4694         pa_log("auth-cookie-enabled= expects a boolean argument.");
4695         return -1;
4696     }
4697
4698     if (o->auth_cookie)
4699         pa_auth_cookie_unref(o->auth_cookie);
4700
4701     if (enabled) {
4702         const char *cn;
4703
4704         /* The new name for this is 'auth-cookie', for compat reasons
4705          * we check the old name too */
4706         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4707             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4708                 cn = PA_NATIVE_COOKIE_FILE;
4709
4710         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4711             return -1;
4712
4713     } else
4714           o->auth_cookie = NULL;
4715
4716     return 0;
4717 }
4718
4719 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4720     pa_native_connection_assert_ref(c);
4721
4722     return c->pstream;
4723 }