Merge commit 'origin/master-tx'
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* Kick a client if it doesn't authenticate within this time */
64 #define AUTH_TIMEOUT 60
65
66 /* Don't accept more connection than this */
67 #define MAX_CONNECTIONS 64
68
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
73
74 struct pa_native_protocol;
75
76 typedef struct record_stream {
77     pa_msgobject parent;
78
79     pa_native_connection *connection;
80     uint32_t index;
81
82     pa_source_output *source_output;
83     pa_memblockq *memblockq;
84
85     pa_bool_t adjust_latency:1;
86     pa_bool_t early_requests:1;
87
88     pa_buffer_attr buffer_attr;
89
90     pa_atomic_t on_the_fly;
91     pa_usec_t configured_source_latency;
92     size_t drop_initial;
93
94     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
95     size_t on_the_fly_snapshot;
96     pa_usec_t current_monitor_latency;
97     pa_usec_t current_source_latency;
98 } record_stream;
99
100 PA_DECLARE_CLASS(record_stream);
101 #define RECORD_STREAM(o) (record_stream_cast(o))
102 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
103
104 typedef struct output_stream {
105     pa_msgobject parent;
106 } output_stream;
107
108 PA_DECLARE_CLASS(output_stream);
109 #define OUTPUT_STREAM(o) (output_stream_cast(o))
110 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
111
112 typedef struct playback_stream {
113     output_stream parent;
114
115     pa_native_connection *connection;
116     uint32_t index;
117
118     pa_sink_input *sink_input;
119     pa_memblockq *memblockq;
120
121     pa_bool_t adjust_latency:1;
122     pa_bool_t early_requests:1;
123
124     pa_bool_t is_underrun:1;
125     pa_bool_t drain_request:1;
126     uint32_t drain_tag;
127     uint32_t syncid;
128
129     pa_atomic_t missing;
130     pa_usec_t configured_sink_latency;
131     pa_buffer_attr buffer_attr;
132
133     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
134     int64_t read_index, write_index;
135     size_t render_memblockq_length;
136     pa_usec_t current_sink_latency;
137     uint64_t playing_for, underrun_for;
138 } playback_stream;
139
140 PA_DECLARE_CLASS(playback_stream);
141 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
142 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
143
144 typedef struct upload_stream {
145     output_stream parent;
146
147     pa_native_connection *connection;
148     uint32_t index;
149
150     pa_memchunk memchunk;
151     size_t length;
152     char *name;
153     pa_sample_spec sample_spec;
154     pa_channel_map channel_map;
155     pa_proplist *proplist;
156 } upload_stream;
157
158 PA_DECLARE_CLASS(upload_stream);
159 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
160 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
161
162 struct pa_native_connection {
163     pa_msgobject parent;
164     pa_native_protocol *protocol;
165     pa_native_options *options;
166     pa_bool_t authorized:1;
167     pa_bool_t is_local:1;
168     uint32_t version;
169     pa_client *client;
170     pa_pstream *pstream;
171     pa_pdispatch *pdispatch;
172     pa_idxset *record_streams, *output_streams;
173     uint32_t rrobin_index;
174     pa_subscription *subscription;
175     pa_time_event *auth_timeout_event;
176 };
177
178 PA_DECLARE_CLASS(pa_native_connection);
179 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
180 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
181
182 struct pa_native_protocol {
183     PA_REFCNT_DECLARE;
184
185     pa_core *core;
186     pa_idxset *connections;
187
188     pa_strlist *servers;
189     pa_hook hooks[PA_NATIVE_HOOK_MAX];
190
191     pa_hashmap *extensions;
192 };
193
194 enum {
195     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
196 };
197
198 enum {
199     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
200     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
201     SINK_INPUT_MESSAGE_FLUSH,
202     SINK_INPUT_MESSAGE_TRIGGER,
203     SINK_INPUT_MESSAGE_SEEK,
204     SINK_INPUT_MESSAGE_PREBUF_FORCE,
205     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
206     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
207 };
208
209 enum {
210     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
211     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
212     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
213     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
214     PLAYBACK_STREAM_MESSAGE_STARTED,
215     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
216 };
217
218 enum {
219     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
220 };
221
222 enum {
223     CONNECTION_MESSAGE_RELEASE,
224     CONNECTION_MESSAGE_REVOKE
225 };
226
227 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
228 static void sink_input_kill_cb(pa_sink_input *i);
229 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
230 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
231 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
232 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
235
236 static void native_connection_send_memblock(pa_native_connection *c);
237 static void playback_stream_request_bytes(struct playback_stream*s);
238
239 static void source_output_kill_cb(pa_source_output *o);
240 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
241 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
242 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
243 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
244 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
245
246 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
247 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248
249 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288
289 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
290     [PA_COMMAND_ERROR] = NULL,
291     [PA_COMMAND_TIMEOUT] = NULL,
292     [PA_COMMAND_REPLY] = NULL,
293     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
294     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
295     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
296     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
297     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
298     [PA_COMMAND_AUTH] = command_auth,
299     [PA_COMMAND_REQUEST] = NULL,
300     [PA_COMMAND_EXIT] = command_exit,
301     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
302     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
303     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
304     [PA_COMMAND_STAT] = command_stat,
305     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
306     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
307     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
308     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
309     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
310     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
311     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
312     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
313     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
314     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
315     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
316     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
317     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
318     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
319     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
320     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
321     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
329     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
330
331     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
332     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
333     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
334
335     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
336     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
337     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
338
339     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
340     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
341
342     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
343     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
344     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
346
347     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
348     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
349
350     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
351     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
352     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
353     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
354     [PA_COMMAND_KILL_CLIENT] = command_kill,
355     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
356     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
357     [PA_COMMAND_LOAD_MODULE] = command_load_module,
358     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
359
360     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
361     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
362     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
363     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
364
365     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
366     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
367
368     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
369     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
370
371     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
372     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
373
374     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
375     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
376     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
377
378     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
379     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
380     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
381
382     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
383
384     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
385     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
386
387     [PA_COMMAND_EXTENSION] = command_extension
388 };
389
390 /* structure management */
391
392 /* Called from main context */
393 static void upload_stream_unlink(upload_stream *s) {
394     pa_assert(s);
395
396     if (!s->connection)
397         return;
398
399     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
400     s->connection = NULL;
401     upload_stream_unref(s);
402 }
403
404 /* Called from main context */
405 static void upload_stream_free(pa_object *o) {
406     upload_stream *s = UPLOAD_STREAM(o);
407     pa_assert(s);
408
409     upload_stream_unlink(s);
410
411     pa_xfree(s->name);
412
413     if (s->proplist)
414         pa_proplist_free(s->proplist);
415
416     if (s->memchunk.memblock)
417         pa_memblock_unref(s->memchunk.memblock);
418
419     pa_xfree(s);
420 }
421
422 /* Called from main context */
423 static upload_stream* upload_stream_new(
424         pa_native_connection *c,
425         const pa_sample_spec *ss,
426         const pa_channel_map *map,
427         const char *name,
428         size_t length,
429         pa_proplist *p) {
430
431     upload_stream *s;
432
433     pa_assert(c);
434     pa_assert(ss);
435     pa_assert(name);
436     pa_assert(length > 0);
437     pa_assert(p);
438
439     s = pa_msgobject_new(upload_stream);
440     s->parent.parent.parent.free = upload_stream_free;
441     s->connection = c;
442     s->sample_spec = *ss;
443     s->channel_map = *map;
444     s->name = pa_xstrdup(name);
445     pa_memchunk_reset(&s->memchunk);
446     s->length = length;
447     s->proplist = pa_proplist_copy(p);
448     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
449
450     pa_idxset_put(c->output_streams, s, &s->index);
451
452     return s;
453 }
454
455 /* Called from main context */
456 static void record_stream_unlink(record_stream *s) {
457     pa_assert(s);
458
459     if (!s->connection)
460         return;
461
462     if (s->source_output) {
463         pa_source_output_unlink(s->source_output);
464         pa_source_output_unref(s->source_output);
465         s->source_output = NULL;
466     }
467
468     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
469     s->connection = NULL;
470     record_stream_unref(s);
471 }
472
473 /* Called from main context */
474 static void record_stream_free(pa_object *o) {
475     record_stream *s = RECORD_STREAM(o);
476     pa_assert(s);
477
478     record_stream_unlink(s);
479
480     pa_memblockq_free(s->memblockq);
481     pa_xfree(s);
482 }
483
484 /* Called from main context */
485 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
486     record_stream *s = RECORD_STREAM(o);
487     record_stream_assert_ref(s);
488
489     if (!s->connection)
490         return -1;
491
492     switch (code) {
493
494         case RECORD_STREAM_MESSAGE_POST_DATA:
495
496             /* We try to keep up to date with how many bytes are
497              * currently on the fly */
498             pa_atomic_sub(&s->on_the_fly, chunk->length);
499
500             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
501 /*                 pa_log_warn("Failed to push data into output queue."); */
502                 return -1;
503             }
504
505             if (!pa_pstream_is_pending(s->connection->pstream))
506                 native_connection_send_memblock(s->connection);
507
508             break;
509     }
510
511     return 0;
512 }
513
514 /* Called from main context */
515 static void fix_record_buffer_attr_pre(record_stream *s) {
516
517     size_t frame_size;
518     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
519
520     pa_assert(s);
521
522     /* This function will be called from the main thread, before as
523      * well as after the source output has been activated using
524      * pa_source_output_put()! That means it may not touch any
525      * ->thread_info data! */
526
527     frame_size = pa_frame_size(&s->source_output->sample_spec);
528
529     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
530         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
531     if (s->buffer_attr.maxlength <= 0)
532         s->buffer_attr.maxlength = (uint32_t) frame_size;
533
534     if (s->buffer_attr.fragsize == (uint32_t) -1)
535         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
536     if (s->buffer_attr.fragsize <= 0)
537         s->buffer_attr.fragsize = (uint32_t) frame_size;
538
539     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
540
541     if (s->early_requests) {
542
543         /* In early request mode we need to emulate the classic
544          * fragment-based playback model. We do this setting the source
545          * latency to the fragment size. */
546
547         source_usec = fragsize_usec;
548
549     } else if (s->adjust_latency) {
550
551         /* So, the user asked us to adjust the latency according to
552          * what the source can provide. Half the latency will be
553          * spent on the hw buffer, half of it in the async buffer
554          * queue we maintain for each client. */
555
556         source_usec = fragsize_usec/2;
557
558     } else {
559
560         /* Ok, the user didn't ask us to adjust the latency, hence we
561          * don't */
562
563         source_usec = (pa_usec_t) -1;
564     }
565
566     if (source_usec != (pa_usec_t) -1)
567         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
568     else
569         s->configured_source_latency = 0;
570
571     if (s->early_requests) {
572
573         /* Ok, we didn't necessarily get what we were asking for, so
574          * let's tell the user */
575
576         fragsize_usec = s->configured_source_latency;
577
578     } else if (s->adjust_latency) {
579
580         /* Now subtract what we actually got */
581
582         if (fragsize_usec >= s->configured_source_latency*2)
583             fragsize_usec -= s->configured_source_latency;
584         else
585             fragsize_usec = s->configured_source_latency;
586     }
587
588     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
589         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
590
591         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
592
593     if (s->buffer_attr.fragsize <= 0)
594         s->buffer_attr.fragsize = (uint32_t) frame_size;
595 }
596
597 /* Called from main context */
598 static void fix_record_buffer_attr_post(record_stream *s) {
599     size_t base;
600
601     pa_assert(s);
602
603     /* This function will be called from the main thread, before as
604      * well as after the source output has been activated using
605      * pa_source_output_put()! That means it may not touch and
606      * ->thread_info data! */
607
608     base = pa_frame_size(&s->source_output->sample_spec);
609
610     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
611     if (s->buffer_attr.fragsize <= 0)
612         s->buffer_attr.fragsize = base;
613
614     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
615         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
616 }
617
618 /* Called from main context */
619 static record_stream* record_stream_new(
620         pa_native_connection *c,
621         pa_source *source,
622         pa_sample_spec *ss,
623         pa_channel_map *map,
624         pa_bool_t peak_detect,
625         pa_buffer_attr *attr,
626         pa_source_output_flags_t flags,
627         pa_proplist *p,
628         pa_bool_t adjust_latency,
629         pa_sink_input *direct_on_input,
630         pa_bool_t early_requests,
631         int *ret) {
632
633     record_stream *s;
634     pa_source_output *source_output = NULL;
635     size_t base;
636     pa_source_output_new_data data;
637
638     pa_assert(c);
639     pa_assert(ss);
640     pa_assert(p);
641     pa_assert(ret);
642
643     pa_source_output_new_data_init(&data);
644
645     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
646     data.driver = __FILE__;
647     data.module = c->options->module;
648     data.client = c->client;
649     data.source = source;
650     data.direct_on_input = direct_on_input;
651     pa_source_output_new_data_set_sample_spec(&data, ss);
652     pa_source_output_new_data_set_channel_map(&data, map);
653     if (peak_detect)
654         data.resample_method = PA_RESAMPLER_PEAKS;
655
656     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
657
658     pa_source_output_new_data_done(&data);
659
660     if (!source_output)
661         return NULL;
662
663     s = pa_msgobject_new(record_stream);
664     s->parent.parent.free = record_stream_free;
665     s->parent.process_msg = record_stream_process_msg;
666     s->connection = c;
667     s->source_output = source_output;
668     s->buffer_attr = *attr;
669     s->adjust_latency = adjust_latency;
670     s->early_requests = early_requests;
671     pa_atomic_store(&s->on_the_fly, 0);
672
673     s->source_output->parent.process_msg = source_output_process_msg;
674     s->source_output->push = source_output_push_cb;
675     s->source_output->kill = source_output_kill_cb;
676     s->source_output->get_latency = source_output_get_latency_cb;
677     s->source_output->moving = source_output_moving_cb;
678     s->source_output->suspend = source_output_suspend_cb;
679     s->source_output->send_event = source_output_send_event_cb;
680     s->source_output->userdata = s;
681
682     fix_record_buffer_attr_pre(s);
683
684     s->memblockq = pa_memblockq_new(
685             0,
686             s->buffer_attr.maxlength,
687             0,
688             base = pa_frame_size(&source_output->sample_spec),
689             1,
690             0,
691             0,
692             NULL);
693
694     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
695     fix_record_buffer_attr_post(s);
696
697     *ss = s->source_output->sample_spec;
698     *map = s->source_output->channel_map;
699
700     pa_idxset_put(c->record_streams, s, &s->index);
701
702     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
703                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
704                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
705                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
706
707     pa_source_output_put(s->source_output);
708     return s;
709 }
710
711 /* Called from main context */
712 static void record_stream_send_killed(record_stream *r) {
713     pa_tagstruct *t;
714     record_stream_assert_ref(r);
715
716     t = pa_tagstruct_new(NULL, 0);
717     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
718     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
719     pa_tagstruct_putu32(t, r->index);
720     pa_pstream_send_tagstruct(r->connection->pstream, t);
721 }
722
723 /* Called from main context */
724 static void playback_stream_unlink(playback_stream *s) {
725     pa_assert(s);
726
727     if (!s->connection)
728         return;
729
730     if (s->sink_input) {
731         pa_sink_input_unlink(s->sink_input);
732         pa_sink_input_unref(s->sink_input);
733         s->sink_input = NULL;
734     }
735
736     if (s->drain_request)
737         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
738
739     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
740     s->connection = NULL;
741     playback_stream_unref(s);
742 }
743
744 /* Called from main context */
745 static void playback_stream_free(pa_object* o) {
746     playback_stream *s = PLAYBACK_STREAM(o);
747     pa_assert(s);
748
749     playback_stream_unlink(s);
750
751     pa_memblockq_free(s->memblockq);
752     pa_xfree(s);
753 }
754
755 /* Called from main context */
756 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
757     playback_stream *s = PLAYBACK_STREAM(o);
758     playback_stream_assert_ref(s);
759
760     if (!s->connection)
761         return -1;
762
763     switch (code) {
764         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
765             pa_tagstruct *t;
766             int l = 0;
767
768             for (;;) {
769                 if ((l = pa_atomic_load(&s->missing)) <= 0)
770                     return 0;
771
772                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
773                     break;
774             }
775
776             t = pa_tagstruct_new(NULL, 0);
777             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
778             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
779             pa_tagstruct_putu32(t, s->index);
780             pa_tagstruct_putu32(t, (uint32_t) l);
781             pa_pstream_send_tagstruct(s->connection->pstream, t);
782
783 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
784             break;
785         }
786
787         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
788             pa_tagstruct *t;
789
790 /*             pa_log("signalling underflow"); */
791
792             /* Report that we're empty */
793             t = pa_tagstruct_new(NULL, 0);
794             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
795             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
796             pa_tagstruct_putu32(t, s->index);
797             pa_pstream_send_tagstruct(s->connection->pstream, t);
798             break;
799         }
800
801         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
802             pa_tagstruct *t;
803
804             /* Notify the user we're overflowed*/
805             t = pa_tagstruct_new(NULL, 0);
806             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
807             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
808             pa_tagstruct_putu32(t, s->index);
809             pa_pstream_send_tagstruct(s->connection->pstream, t);
810             break;
811         }
812
813         case PLAYBACK_STREAM_MESSAGE_STARTED:
814
815             if (s->connection->version >= 13) {
816                 pa_tagstruct *t;
817
818                 /* Notify the user we started playback */
819                 t = pa_tagstruct_new(NULL, 0);
820                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
821                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
822                 pa_tagstruct_putu32(t, s->index);
823                 pa_pstream_send_tagstruct(s->connection->pstream, t);
824             }
825
826             break;
827
828         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
829             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
830             break;
831
832         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
833             pa_tagstruct *t;
834
835             s->buffer_attr.tlength = (uint32_t) offset;
836
837             t = pa_tagstruct_new(NULL, 0);
838             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
839             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
840             pa_tagstruct_putu32(t, s->index);
841             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
842             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
843             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
844             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
845             pa_tagstruct_put_usec(t, s->configured_sink_latency);
846             pa_pstream_send_tagstruct(s->connection->pstream, t);
847
848             break;
849         }
850     }
851
852     return 0;
853 }
854
855 /* Called from main context */
856 static void fix_playback_buffer_attr(playback_stream *s) {
857     size_t frame_size, max_prebuf;
858     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
859
860     pa_assert(s);
861
862     /* This function will be called from the main thread, before as
863      * well as after the sink input has been activated using
864      * pa_sink_input_put()! That means it may not touch any
865      * ->thread_info data, such as the memblockq! */
866
867     frame_size = pa_frame_size(&s->sink_input->sample_spec);
868
869     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
870         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
871     if (s->buffer_attr.maxlength <= 0)
872         s->buffer_attr.maxlength = (uint32_t) frame_size;
873
874     if (s->buffer_attr.tlength == (uint32_t) -1)
875         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
876     if (s->buffer_attr.tlength <= 0)
877         s->buffer_attr.tlength = (uint32_t) frame_size;
878
879     if (s->buffer_attr.minreq == (uint32_t) -1)
880         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
881     if (s->buffer_attr.minreq <= 0)
882         s->buffer_attr.minreq = (uint32_t) frame_size;
883
884     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
885         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
886
887     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
888     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
889
890     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
891                 (double) tlength_usec / PA_USEC_PER_MSEC,
892                 (double) minreq_usec / PA_USEC_PER_MSEC);
893
894     if (s->early_requests) {
895
896         /* In early request mode we need to emulate the classic
897          * fragment-based playback model. We do this setting the sink
898          * latency to the fragment size. */
899
900         sink_usec = minreq_usec;
901         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
902
903     } else if (s->adjust_latency) {
904
905         /* So, the user asked us to adjust the latency of the stream
906          * buffer according to the what the sink can provide. The
907          * tlength passed in shall be the overall latency. Roughly
908          * half the latency will be spent on the hw buffer, the other
909          * half of it in the async buffer queue we maintain for each
910          * client. In between we'll have a safety space of size
911          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
912          * empty and needs to be filled, then our buffer must have
913          * enough data to fulfill this request immediatly and thus
914          * have at least the same tlength as the size of the hw
915          * buffer. It additionally needs space for 2 times minreq
916          * because if the buffer ran empty and a partial fillup
917          * happens immediately on the next iteration we need to be
918          * able to fulfill it and give the application also minreq
919          * time to fill it up again for the next request Makes 2 times
920          * minreq in plus.. */
921
922         if (tlength_usec > minreq_usec*2)
923             sink_usec = (tlength_usec - minreq_usec*2)/2;
924         else
925             sink_usec = 0;
926
927         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
928
929     } else {
930
931         /* Ok, the user didn't ask us to adjust the latency, but we
932          * still need to make sure that the parameters from the user
933          * do make sense. */
934
935         if (tlength_usec > minreq_usec*2)
936             sink_usec = (tlength_usec - minreq_usec*2);
937         else
938             sink_usec = 0;
939
940         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
941     }
942
943     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
944
945     if (s->early_requests) {
946
947         /* Ok, we didn't necessarily get what we were asking for, so
948          * let's tell the user */
949
950         minreq_usec = s->configured_sink_latency;
951
952     } else if (s->adjust_latency) {
953
954         /* Ok, we didn't necessarily get what we were asking for, so
955          * let's subtract from what we asked for for the remaining
956          * buffer space */
957
958         if (tlength_usec >= s->configured_sink_latency)
959             tlength_usec -= s->configured_sink_latency;
960     }
961
962     /* FIXME: This is actually larger than necessary, since not all of
963      * the sink latency is actually rewritable. */
964     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
965         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
966
967     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
968         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
969         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
970
971     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
972         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
973         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
974
975     if (s->buffer_attr.minreq <= 0) {
976         s->buffer_attr.minreq = (uint32_t) frame_size;
977         s->buffer_attr.tlength += (uint32_t) frame_size*2;
978     }
979
980     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
981         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
982
983     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
984
985     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
986         s->buffer_attr.prebuf > max_prebuf)
987         s->buffer_attr.prebuf = max_prebuf;
988 }
989
990 /* Called from main context */
991 static playback_stream* playback_stream_new(
992         pa_native_connection *c,
993         pa_sink *sink,
994         pa_sample_spec *ss,
995         pa_channel_map *map,
996         pa_buffer_attr *a,
997         pa_cvolume *volume,
998         pa_bool_t muted,
999         pa_bool_t muted_set,
1000         uint32_t syncid,
1001         uint32_t *missing,
1002         pa_sink_input_flags_t flags,
1003         pa_proplist *p,
1004         pa_bool_t adjust_latency,
1005         pa_bool_t early_requests,
1006         int *ret) {
1007
1008     playback_stream *s, *ssync;
1009     pa_sink_input *sink_input = NULL;
1010     pa_memchunk silence;
1011     uint32_t idx;
1012     int64_t start_index;
1013     pa_sink_input_new_data data;
1014
1015     pa_assert(c);
1016     pa_assert(ss);
1017     pa_assert(missing);
1018     pa_assert(p);
1019     pa_assert(ret);
1020
1021     /* Find syncid group */
1022     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1023
1024         if (!playback_stream_isinstance(ssync))
1025             continue;
1026
1027         if (ssync->syncid == syncid)
1028             break;
1029     }
1030
1031     /* Synced streams must connect to the same sink */
1032     if (ssync) {
1033
1034         if (!sink)
1035             sink = ssync->sink_input->sink;
1036         else if (sink != ssync->sink_input->sink) {
1037             *ret = PA_ERR_INVALID;
1038             return NULL;
1039         }
1040     }
1041
1042     pa_sink_input_new_data_init(&data);
1043
1044     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1045     data.driver = __FILE__;
1046     data.module = c->options->module;
1047     data.client = c->client;
1048     data.sink = sink;
1049     pa_sink_input_new_data_set_sample_spec(&data, ss);
1050     pa_sink_input_new_data_set_channel_map(&data, map);
1051     if (volume)
1052         pa_sink_input_new_data_set_volume(&data, volume);
1053     if (muted_set)
1054         pa_sink_input_new_data_set_muted(&data, muted);
1055     data.sync_base = ssync ? ssync->sink_input : NULL;
1056
1057     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1058
1059     pa_sink_input_new_data_done(&data);
1060
1061     if (!sink_input)
1062         return NULL;
1063
1064     s = pa_msgobject_new(playback_stream);
1065     s->parent.parent.parent.free = playback_stream_free;
1066     s->parent.parent.process_msg = playback_stream_process_msg;
1067     s->connection = c;
1068     s->syncid = syncid;
1069     s->sink_input = sink_input;
1070     s->is_underrun = TRUE;
1071     s->drain_request = FALSE;
1072     pa_atomic_store(&s->missing, 0);
1073     s->buffer_attr = *a;
1074     s->adjust_latency = adjust_latency;
1075     s->early_requests = early_requests;
1076
1077     s->sink_input->parent.process_msg = sink_input_process_msg;
1078     s->sink_input->pop = sink_input_pop_cb;
1079     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1080     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1081     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1082     s->sink_input->kill = sink_input_kill_cb;
1083     s->sink_input->moving = sink_input_moving_cb;
1084     s->sink_input->suspend = sink_input_suspend_cb;
1085     s->sink_input->send_event = sink_input_send_event_cb;
1086     s->sink_input->userdata = s;
1087
1088     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1089
1090     fix_playback_buffer_attr(s);
1091
1092     pa_sink_input_get_silence(sink_input, &silence);
1093     s->memblockq = pa_memblockq_new(
1094             start_index,
1095             s->buffer_attr.maxlength,
1096             s->buffer_attr.tlength,
1097             pa_frame_size(&sink_input->sample_spec),
1098             s->buffer_attr.prebuf,
1099             s->buffer_attr.minreq,
1100             0,
1101             &silence);
1102     pa_memblock_unref(silence.memblock);
1103
1104     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1105
1106     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1107
1108     *ss = s->sink_input->sample_spec;
1109     *map = s->sink_input->channel_map;
1110
1111     pa_idxset_put(c->output_streams, s, &s->index);
1112
1113     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1114                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1115                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1116                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1117                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1118
1119     pa_sink_input_put(s->sink_input);
1120     return s;
1121 }
1122
1123 /* Called from IO context */
1124 static void playback_stream_request_bytes(playback_stream *s) {
1125     size_t m, minreq;
1126     int previous_missing;
1127
1128     playback_stream_assert_ref(s);
1129
1130     m = pa_memblockq_pop_missing(s->memblockq);
1131
1132     if (m <= 0)
1133         return;
1134
1135 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1136
1137     previous_missing = pa_atomic_add(&s->missing, (int) m);
1138     minreq = pa_memblockq_get_minreq(s->memblockq);
1139
1140     if (pa_memblockq_prebuf_active(s->memblockq) ||
1141         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1142         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1143 }
1144
1145
1146 /* Called from main context */
1147 static void playback_stream_send_killed(playback_stream *p) {
1148     pa_tagstruct *t;
1149     playback_stream_assert_ref(p);
1150
1151     t = pa_tagstruct_new(NULL, 0);
1152     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1153     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1154     pa_tagstruct_putu32(t, p->index);
1155     pa_pstream_send_tagstruct(p->connection->pstream, t);
1156 }
1157
1158 /* Called from main context */
1159 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1160     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1161     pa_native_connection_assert_ref(c);
1162
1163     if (!c->protocol)
1164         return -1;
1165
1166     switch (code) {
1167
1168         case CONNECTION_MESSAGE_REVOKE:
1169             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1170             break;
1171
1172         case CONNECTION_MESSAGE_RELEASE:
1173             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1174             break;
1175     }
1176
1177     return 0;
1178 }
1179
1180 /* Called from main context */
1181 static void native_connection_unlink(pa_native_connection *c) {
1182     record_stream *r;
1183     output_stream *o;
1184
1185     pa_assert(c);
1186
1187     if (!c->protocol)
1188         return;
1189
1190     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1191
1192     if (c->options)
1193         pa_native_options_unref(c->options);
1194
1195     while ((r = pa_idxset_first(c->record_streams, NULL)))
1196         record_stream_unlink(r);
1197
1198     while ((o = pa_idxset_first(c->output_streams, NULL)))
1199         if (playback_stream_isinstance(o))
1200             playback_stream_unlink(PLAYBACK_STREAM(o));
1201         else
1202             upload_stream_unlink(UPLOAD_STREAM(o));
1203
1204     if (c->subscription)
1205         pa_subscription_free(c->subscription);
1206
1207     if (c->pstream)
1208         pa_pstream_unlink(c->pstream);
1209
1210     if (c->auth_timeout_event) {
1211         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1212         c->auth_timeout_event = NULL;
1213     }
1214
1215     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1216     c->protocol = NULL;
1217     pa_native_connection_unref(c);
1218 }
1219
1220 /* Called from main context */
1221 static void native_connection_free(pa_object *o) {
1222     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1223
1224     pa_assert(c);
1225
1226     native_connection_unlink(c);
1227
1228     pa_idxset_free(c->record_streams, NULL, NULL);
1229     pa_idxset_free(c->output_streams, NULL, NULL);
1230
1231     pa_pdispatch_unref(c->pdispatch);
1232     pa_pstream_unref(c->pstream);
1233     pa_client_free(c->client);
1234
1235     pa_xfree(c);
1236 }
1237
1238 /* Called from main context */
1239 static void native_connection_send_memblock(pa_native_connection *c) {
1240     uint32_t start;
1241     record_stream *r;
1242
1243     start = PA_IDXSET_INVALID;
1244     for (;;) {
1245         pa_memchunk chunk;
1246
1247         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1248             return;
1249
1250         if (start == PA_IDXSET_INVALID)
1251             start = c->rrobin_index;
1252         else if (start == c->rrobin_index)
1253             return;
1254
1255         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1256             pa_memchunk schunk = chunk;
1257
1258             if (schunk.length > r->buffer_attr.fragsize)
1259                 schunk.length = r->buffer_attr.fragsize;
1260
1261             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1262
1263             pa_memblockq_drop(r->memblockq, schunk.length);
1264             pa_memblock_unref(schunk.memblock);
1265
1266             return;
1267         }
1268     }
1269 }
1270
1271 /*** sink input callbacks ***/
1272
1273 /* Called from thread context */
1274 static void handle_seek(playback_stream *s, int64_t indexw) {
1275     playback_stream_assert_ref(s);
1276
1277 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1278
1279     if (s->sink_input->thread_info.underrun_for > 0) {
1280
1281 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1282
1283         if (pa_memblockq_is_readable(s->memblockq)) {
1284
1285             /* We just ended an underrun, let's ask the sink
1286              * for a complete rewind rewrite */
1287
1288             pa_log_debug("Requesting rewind due to end of underrun.");
1289             pa_sink_input_request_rewind(s->sink_input,
1290                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1291                                          FALSE, TRUE, FALSE);
1292         }
1293
1294     } else {
1295         int64_t indexr;
1296
1297         indexr = pa_memblockq_get_read_index(s->memblockq);
1298
1299         if (indexw < indexr) {
1300             /* OK, the sink already asked for this data, so
1301              * let's have it usk us again */
1302
1303             pa_log_debug("Requesting rewind due to rewrite.");
1304             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1305         }
1306     }
1307
1308     playback_stream_request_bytes(s);
1309 }
1310
1311 /* Called from thread context */
1312 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1313     pa_sink_input *i = PA_SINK_INPUT(o);
1314     playback_stream *s;
1315
1316     pa_sink_input_assert_ref(i);
1317     s = PLAYBACK_STREAM(i->userdata);
1318     playback_stream_assert_ref(s);
1319
1320     switch (code) {
1321
1322         case SINK_INPUT_MESSAGE_SEEK: {
1323             int64_t windex;
1324
1325             windex = pa_memblockq_get_write_index(s->memblockq);
1326
1327             /* The client side is incapable of accounting correctly
1328              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1329              * able to deal with that. */
1330
1331             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1332
1333             handle_seek(s, windex);
1334             return 0;
1335         }
1336
1337         case SINK_INPUT_MESSAGE_POST_DATA: {
1338             int64_t windex;
1339
1340             pa_assert(chunk);
1341
1342             windex = pa_memblockq_get_write_index(s->memblockq);
1343
1344 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1345
1346             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1347                 pa_log_warn("Failed to push data into queue");
1348                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1349                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1350             }
1351
1352             handle_seek(s, windex);
1353
1354 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1355
1356             return 0;
1357         }
1358
1359         case SINK_INPUT_MESSAGE_DRAIN:
1360         case SINK_INPUT_MESSAGE_FLUSH:
1361         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1362         case SINK_INPUT_MESSAGE_TRIGGER: {
1363
1364             int64_t windex;
1365             pa_sink_input *isync;
1366             void (*func)(pa_memblockq *bq);
1367
1368             switch  (code) {
1369                 case SINK_INPUT_MESSAGE_FLUSH:
1370                     func = pa_memblockq_flush_write;
1371                     break;
1372
1373                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1374                     func = pa_memblockq_prebuf_force;
1375                     break;
1376
1377                 case SINK_INPUT_MESSAGE_DRAIN:
1378                 case SINK_INPUT_MESSAGE_TRIGGER:
1379                     func = pa_memblockq_prebuf_disable;
1380                     break;
1381
1382                 default:
1383                     pa_assert_not_reached();
1384             }
1385
1386             windex = pa_memblockq_get_write_index(s->memblockq);
1387             func(s->memblockq);
1388             handle_seek(s, windex);
1389
1390             /* Do the same for all other members in the sync group */
1391             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1392                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1393                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1394                 func(ssync->memblockq);
1395                 handle_seek(ssync, windex);
1396             }
1397
1398             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1399                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1400                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1401                 func(ssync->memblockq);
1402                 handle_seek(ssync, windex);
1403             }
1404
1405             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1406                 if (!pa_memblockq_is_readable(s->memblockq))
1407                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1408                 else {
1409                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1410                     s->drain_request = TRUE;
1411                 }
1412             }
1413
1414             return 0;
1415         }
1416
1417         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1418             /* Atomically get a snapshot of all timing parameters... */
1419             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1420             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1421             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1422             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1423             s->underrun_for = s->sink_input->thread_info.underrun_for;
1424             s->playing_for = s->sink_input->thread_info.playing_for;
1425
1426             return 0;
1427
1428         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1429             int64_t windex;
1430
1431             windex = pa_memblockq_get_write_index(s->memblockq);
1432
1433             pa_memblockq_prebuf_force(s->memblockq);
1434
1435             handle_seek(s, windex);
1436
1437             /* Fall through to the default handler */
1438             break;
1439         }
1440
1441         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1442             pa_usec_t *r = userdata;
1443
1444             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1445
1446             /* Fall through, the default handler will add in the extra
1447              * latency added by the resampler */
1448             break;
1449         }
1450
1451         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1452             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1453             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1454             return 0;
1455         }
1456     }
1457
1458     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1459 }
1460
1461 /* Called from thread context */
1462 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1463     playback_stream *s;
1464
1465     pa_sink_input_assert_ref(i);
1466     s = PLAYBACK_STREAM(i->userdata);
1467     playback_stream_assert_ref(s);
1468     pa_assert(chunk);
1469
1470 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1471
1472     if (pa_memblockq_is_readable(s->memblockq))
1473         s->is_underrun = FALSE;
1474     else {
1475         if (!s->is_underrun)
1476             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1477
1478         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1479             s->drain_request = FALSE;
1480             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1481         } else if (!s->is_underrun)
1482             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1483
1484         s->is_underrun = TRUE;
1485
1486         playback_stream_request_bytes(s);
1487     }
1488
1489     /* This call will not fail with prebuf=0, hence we check for
1490        underrun explicitly above */
1491     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1492         return -1;
1493
1494     chunk->length = PA_MIN(nbytes, chunk->length);
1495
1496     if (i->thread_info.underrun_for > 0)
1497         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1498
1499     pa_memblockq_drop(s->memblockq, chunk->length);
1500     playback_stream_request_bytes(s);
1501
1502     return 0;
1503 }
1504
1505 /* Called from thread context */
1506 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1507     playback_stream *s;
1508
1509     pa_sink_input_assert_ref(i);
1510     s = PLAYBACK_STREAM(i->userdata);
1511     playback_stream_assert_ref(s);
1512
1513     /* If we are in an underrun, then we don't rewind */
1514     if (i->thread_info.underrun_for > 0)
1515         return;
1516
1517     pa_memblockq_rewind(s->memblockq, nbytes);
1518 }
1519
1520 /* Called from thread context */
1521 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1522     playback_stream *s;
1523
1524     pa_sink_input_assert_ref(i);
1525     s = PLAYBACK_STREAM(i->userdata);
1526     playback_stream_assert_ref(s);
1527
1528     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1529 }
1530
1531 /* Called from thread context */
1532 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1533     playback_stream *s;
1534     size_t new_tlength, old_tlength;
1535
1536     pa_sink_input_assert_ref(i);
1537     s = PLAYBACK_STREAM(i->userdata);
1538     playback_stream_assert_ref(s);
1539
1540     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1541     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1542
1543     if (old_tlength < new_tlength) {
1544         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1545         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1546         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1547
1548         if (new_tlength == old_tlength)
1549             pa_log_debug("Failed to increase tlength");
1550         else {
1551             pa_log_debug("Notifying client about increased tlength");
1552             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1553         }
1554     }
1555 }
1556
1557 /* Called from main context */
1558 static void sink_input_kill_cb(pa_sink_input *i) {
1559     playback_stream *s;
1560
1561     pa_sink_input_assert_ref(i);
1562     s = PLAYBACK_STREAM(i->userdata);
1563     playback_stream_assert_ref(s);
1564
1565     playback_stream_send_killed(s);
1566     playback_stream_unlink(s);
1567 }
1568
1569 /* Called from main context */
1570 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1571     playback_stream *s;
1572     pa_tagstruct *t;
1573
1574     pa_sink_input_assert_ref(i);
1575     s = PLAYBACK_STREAM(i->userdata);
1576     playback_stream_assert_ref(s);
1577
1578     if (s->connection->version < 15)
1579       return;
1580
1581     t = pa_tagstruct_new(NULL, 0);
1582     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1583     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1584     pa_tagstruct_putu32(t, s->index);
1585     pa_tagstruct_puts(t, event);
1586     pa_tagstruct_put_proplist(t, pl);
1587     pa_pstream_send_tagstruct(s->connection->pstream, t);
1588 }
1589
1590 /* Called from main context */
1591 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1592     playback_stream *s;
1593     pa_tagstruct *t;
1594
1595     pa_sink_input_assert_ref(i);
1596     s = PLAYBACK_STREAM(i->userdata);
1597     playback_stream_assert_ref(s);
1598
1599     if (s->connection->version < 12)
1600       return;
1601
1602     t = pa_tagstruct_new(NULL, 0);
1603     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1604     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1605     pa_tagstruct_putu32(t, s->index);
1606     pa_tagstruct_put_boolean(t, suspend);
1607     pa_pstream_send_tagstruct(s->connection->pstream, t);
1608 }
1609
1610 /* Called from main context */
1611 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1612     playback_stream *s;
1613     pa_tagstruct *t;
1614
1615     pa_sink_input_assert_ref(i);
1616     s = PLAYBACK_STREAM(i->userdata);
1617     playback_stream_assert_ref(s);
1618
1619     fix_playback_buffer_attr(s);
1620     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1621     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1622
1623     if (s->connection->version < 12)
1624       return;
1625
1626     t = pa_tagstruct_new(NULL, 0);
1627     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1628     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1629     pa_tagstruct_putu32(t, s->index);
1630     pa_tagstruct_putu32(t, dest->index);
1631     pa_tagstruct_puts(t, dest->name);
1632     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1633
1634     if (s->connection->version >= 13) {
1635         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1636         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1637         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1638         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1639         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1640     }
1641
1642     pa_pstream_send_tagstruct(s->connection->pstream, t);
1643 }
1644
1645 /*** source_output callbacks ***/
1646
1647 /* Called from thread context */
1648 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1649     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1650     record_stream *s;
1651
1652     pa_source_output_assert_ref(o);
1653     s = RECORD_STREAM(o->userdata);
1654     record_stream_assert_ref(s);
1655
1656     switch (code) {
1657         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1658             /* Atomically get a snapshot of all timing parameters... */
1659             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1660             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1661             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1662             return 0;
1663     }
1664
1665     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1666 }
1667
1668 /* Called from thread context */
1669 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1670     record_stream *s;
1671
1672     pa_source_output_assert_ref(o);
1673     s = RECORD_STREAM(o->userdata);
1674     record_stream_assert_ref(s);
1675     pa_assert(chunk);
1676
1677     pa_atomic_add(&s->on_the_fly, chunk->length);
1678     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1679 }
1680
1681 static void source_output_kill_cb(pa_source_output *o) {
1682     record_stream *s;
1683
1684     pa_source_output_assert_ref(o);
1685     s = RECORD_STREAM(o->userdata);
1686     record_stream_assert_ref(s);
1687
1688     record_stream_send_killed(s);
1689     record_stream_unlink(s);
1690 }
1691
1692 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1693     record_stream *s;
1694
1695     pa_source_output_assert_ref(o);
1696     s = RECORD_STREAM(o->userdata);
1697     record_stream_assert_ref(s);
1698
1699     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1700
1701     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1702 }
1703
1704 /* Called from main context */
1705 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1706     record_stream *s;
1707     pa_tagstruct *t;
1708
1709     pa_source_output_assert_ref(o);
1710     s = RECORD_STREAM(o->userdata);
1711     record_stream_assert_ref(s);
1712
1713     if (s->connection->version < 15)
1714       return;
1715
1716     t = pa_tagstruct_new(NULL, 0);
1717     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1718     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1719     pa_tagstruct_putu32(t, s->index);
1720     pa_tagstruct_puts(t, event);
1721     pa_tagstruct_put_proplist(t, pl);
1722     pa_pstream_send_tagstruct(s->connection->pstream, t);
1723 }
1724
1725 /* Called from main context */
1726 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1727     record_stream *s;
1728     pa_tagstruct *t;
1729
1730     pa_source_output_assert_ref(o);
1731     s = RECORD_STREAM(o->userdata);
1732     record_stream_assert_ref(s);
1733
1734     if (s->connection->version < 12)
1735       return;
1736
1737     t = pa_tagstruct_new(NULL, 0);
1738     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1739     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1740     pa_tagstruct_putu32(t, s->index);
1741     pa_tagstruct_put_boolean(t, suspend);
1742     pa_pstream_send_tagstruct(s->connection->pstream, t);
1743 }
1744
1745 /* Called from main context */
1746 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1747     record_stream *s;
1748     pa_tagstruct *t;
1749
1750     pa_source_output_assert_ref(o);
1751     s = RECORD_STREAM(o->userdata);
1752     record_stream_assert_ref(s);
1753
1754     fix_record_buffer_attr_pre(s);
1755     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1756     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1757     fix_record_buffer_attr_post(s);
1758
1759     if (s->connection->version < 12)
1760       return;
1761
1762     t = pa_tagstruct_new(NULL, 0);
1763     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1764     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1765     pa_tagstruct_putu32(t, s->index);
1766     pa_tagstruct_putu32(t, dest->index);
1767     pa_tagstruct_puts(t, dest->name);
1768     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1769
1770     if (s->connection->version >= 13) {
1771         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1772         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1773         pa_tagstruct_put_usec(t, s->configured_source_latency);
1774     }
1775
1776     pa_pstream_send_tagstruct(s->connection->pstream, t);
1777 }
1778
1779 /*** pdispatch callbacks ***/
1780
1781 static void protocol_error(pa_native_connection *c) {
1782     pa_log("protocol error, kicking client");
1783     native_connection_unlink(c);
1784 }
1785
1786 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1787 if (!(expression)) { \
1788     pa_pstream_send_error((pstream), (tag), (error)); \
1789     return; \
1790 } \
1791 } while(0);
1792
1793 static pa_tagstruct *reply_new(uint32_t tag) {
1794     pa_tagstruct *reply;
1795
1796     reply = pa_tagstruct_new(NULL, 0);
1797     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1798     pa_tagstruct_putu32(reply, tag);
1799     return reply;
1800 }
1801
1802 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1803     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1804     playback_stream *s;
1805     uint32_t sink_index, syncid, missing;
1806     pa_buffer_attr attr;
1807     const char *name = NULL, *sink_name;
1808     pa_sample_spec ss;
1809     pa_channel_map map;
1810     pa_tagstruct *reply;
1811     pa_sink *sink = NULL;
1812     pa_cvolume volume;
1813     pa_bool_t
1814         corked = FALSE,
1815         no_remap = FALSE,
1816         no_remix = FALSE,
1817         fix_format = FALSE,
1818         fix_rate = FALSE,
1819         fix_channels = FALSE,
1820         no_move = FALSE,
1821         variable_rate = FALSE,
1822         muted = FALSE,
1823         adjust_latency = FALSE,
1824         early_requests = FALSE,
1825         dont_inhibit_auto_suspend = FALSE,
1826         muted_set = FALSE,
1827         fail_on_suspend = FALSE;
1828     pa_sink_input_flags_t flags = 0;
1829     pa_proplist *p;
1830     pa_bool_t volume_set = TRUE;
1831     int ret = PA_ERR_INVALID;
1832
1833     pa_native_connection_assert_ref(c);
1834     pa_assert(t);
1835     memset(&attr, 0, sizeof(attr));
1836
1837     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1838         pa_tagstruct_get(
1839                 t,
1840                 PA_TAG_SAMPLE_SPEC, &ss,
1841                 PA_TAG_CHANNEL_MAP, &map,
1842                 PA_TAG_U32, &sink_index,
1843                 PA_TAG_STRING, &sink_name,
1844                 PA_TAG_U32, &attr.maxlength,
1845                 PA_TAG_BOOLEAN, &corked,
1846                 PA_TAG_U32, &attr.tlength,
1847                 PA_TAG_U32, &attr.prebuf,
1848                 PA_TAG_U32, &attr.minreq,
1849                 PA_TAG_U32, &syncid,
1850                 PA_TAG_CVOLUME, &volume,
1851                 PA_TAG_INVALID) < 0) {
1852
1853         protocol_error(c);
1854         return;
1855     }
1856
1857     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1858     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1859     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1860     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1861     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1862     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1863     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1864     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1865
1866     p = pa_proplist_new();
1867
1868     if (name)
1869         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1870
1871     if (c->version >= 12)  {
1872         /* Since 0.9.8 the user can ask for a couple of additional flags */
1873
1874         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1875             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1876             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1877             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1878             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1879             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1880             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1881
1882             protocol_error(c);
1883             pa_proplist_free(p);
1884             return;
1885         }
1886     }
1887
1888     if (c->version >= 13) {
1889
1890         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1891             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1892             pa_tagstruct_get_proplist(t, p) < 0) {
1893             protocol_error(c);
1894             pa_proplist_free(p);
1895             return;
1896         }
1897     }
1898
1899     if (c->version >= 14) {
1900
1901         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1902             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1903             protocol_error(c);
1904             pa_proplist_free(p);
1905             return;
1906         }
1907     }
1908
1909     if (c->version >= 15) {
1910
1911         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1912             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1913             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1914             protocol_error(c);
1915             pa_proplist_free(p);
1916             return;
1917         }
1918     }
1919
1920     if (!pa_tagstruct_eof(t)) {
1921         protocol_error(c);
1922         pa_proplist_free(p);
1923         return;
1924     }
1925
1926     if (sink_index != PA_INVALID_INDEX) {
1927
1928         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1929             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1930             pa_proplist_free(p);
1931             return;
1932         }
1933
1934     } else if (sink_name) {
1935
1936         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1937             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1938             pa_proplist_free(p);
1939             return;
1940         }
1941     }
1942
1943     flags =
1944         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1945         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1946         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1947         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1948         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1949         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1950         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1951         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1952         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1953         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1954
1955     /* Only since protocol version 15 there's a seperate muted_set
1956      * flag. For older versions we synthesize it here */
1957     muted_set = muted_set || muted;
1958
1959     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1960     pa_proplist_free(p);
1961
1962     CHECK_VALIDITY(c->pstream, s, tag, ret);
1963
1964     reply = reply_new(tag);
1965     pa_tagstruct_putu32(reply, s->index);
1966     pa_assert(s->sink_input);
1967     pa_tagstruct_putu32(reply, s->sink_input->index);
1968     pa_tagstruct_putu32(reply, missing);
1969
1970 /*     pa_log("initial request is %u", missing); */
1971
1972     if (c->version >= 9) {
1973         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1974
1975         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1976         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1977         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1978         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1979     }
1980
1981     if (c->version >= 12) {
1982         /* Since 0.9.8 we support sending the chosen sample
1983          * spec/channel map/device/suspend status back to the
1984          * client */
1985
1986         pa_tagstruct_put_sample_spec(reply, &ss);
1987         pa_tagstruct_put_channel_map(reply, &map);
1988
1989         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1990         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1991
1992         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1993     }
1994
1995     if (c->version >= 13)
1996         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
1997
1998     pa_pstream_send_tagstruct(c->pstream, reply);
1999 }
2000
2001 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2002     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2003     uint32_t channel;
2004
2005     pa_native_connection_assert_ref(c);
2006     pa_assert(t);
2007
2008     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2009         !pa_tagstruct_eof(t)) {
2010         protocol_error(c);
2011         return;
2012     }
2013
2014     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2015
2016     switch (command) {
2017
2018         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2019             playback_stream *s;
2020             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2021                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2022                 return;
2023             }
2024
2025             playback_stream_unlink(s);
2026             break;
2027         }
2028
2029         case PA_COMMAND_DELETE_RECORD_STREAM: {
2030             record_stream *s;
2031             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2032                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2033                 return;
2034             }
2035
2036             record_stream_unlink(s);
2037             break;
2038         }
2039
2040         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2041             upload_stream *s;
2042
2043             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2044                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2045                 return;
2046             }
2047
2048             upload_stream_unlink(s);
2049             break;
2050         }
2051
2052         default:
2053             pa_assert_not_reached();
2054     }
2055
2056     pa_pstream_send_simple_ack(c->pstream, tag);
2057 }
2058
2059 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2060     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2061     record_stream *s;
2062     pa_buffer_attr attr;
2063     uint32_t source_index;
2064     const char *name = NULL, *source_name;
2065     pa_sample_spec ss;
2066     pa_channel_map map;
2067     pa_tagstruct *reply;
2068     pa_source *source = NULL;
2069     pa_bool_t
2070         corked = FALSE,
2071         no_remap = FALSE,
2072         no_remix = FALSE,
2073         fix_format = FALSE,
2074         fix_rate = FALSE,
2075         fix_channels = FALSE,
2076         no_move = FALSE,
2077         variable_rate = FALSE,
2078         adjust_latency = FALSE,
2079         peak_detect = FALSE,
2080         early_requests = FALSE,
2081         dont_inhibit_auto_suspend = FALSE,
2082         fail_on_suspend = FALSE;
2083     pa_source_output_flags_t flags = 0;
2084     pa_proplist *p;
2085     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2086     pa_sink_input *direct_on_input = NULL;
2087     int ret = PA_ERR_INVALID;
2088
2089     pa_native_connection_assert_ref(c);
2090     pa_assert(t);
2091
2092     memset(&attr, 0, sizeof(attr));
2093
2094     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2095         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2096         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2097         pa_tagstruct_getu32(t, &source_index) < 0 ||
2098         pa_tagstruct_gets(t, &source_name) < 0 ||
2099         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2100         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2101         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2102         protocol_error(c);
2103         return;
2104     }
2105
2106     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2107     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2108     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2109     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2110     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2111     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2112     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2113
2114     p = pa_proplist_new();
2115
2116     if (name)
2117         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2118
2119     if (c->version >= 12)  {
2120         /* Since 0.9.8 the user can ask for a couple of additional flags */
2121
2122         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2123             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2124             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2125             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2126             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2127             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2128             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2129
2130             protocol_error(c);
2131             pa_proplist_free(p);
2132             return;
2133         }
2134     }
2135
2136     if (c->version >= 13) {
2137
2138         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2139             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2140             pa_tagstruct_get_proplist(t, p) < 0 ||
2141             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2142             protocol_error(c);
2143             pa_proplist_free(p);
2144             return;
2145         }
2146     }
2147
2148     if (c->version >= 14) {
2149
2150         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2151             protocol_error(c);
2152             pa_proplist_free(p);
2153             return;
2154         }
2155     }
2156
2157     if (c->version >= 15) {
2158
2159         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2160             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2161             protocol_error(c);
2162             pa_proplist_free(p);
2163             return;
2164         }
2165     }
2166
2167     if (!pa_tagstruct_eof(t)) {
2168         protocol_error(c);
2169         pa_proplist_free(p);
2170         return;
2171     }
2172
2173     if (source_index != PA_INVALID_INDEX) {
2174
2175         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2176             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2177             pa_proplist_free(p);
2178             return;
2179         }
2180
2181     } else if (source_name) {
2182
2183         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2184             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2185             pa_proplist_free(p);
2186             return;
2187         }
2188     }
2189
2190     if (direct_on_input_idx != PA_INVALID_INDEX) {
2191
2192         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2193             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2194             pa_proplist_free(p);
2195             return;
2196         }
2197     }
2198
2199     flags =
2200         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2201         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2202         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2203         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2204         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2205         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2206         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2207         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2208         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2209         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2210
2211     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2212     pa_proplist_free(p);
2213
2214     CHECK_VALIDITY(c->pstream, s, tag, ret);
2215
2216     reply = reply_new(tag);
2217     pa_tagstruct_putu32(reply, s->index);
2218     pa_assert(s->source_output);
2219     pa_tagstruct_putu32(reply, s->source_output->index);
2220
2221     if (c->version >= 9) {
2222         /* Since 0.9 we support sending the buffer metrics back to the client */
2223
2224         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2225         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2226     }
2227
2228     if (c->version >= 12) {
2229         /* Since 0.9.8 we support sending the chosen sample
2230          * spec/channel map/device/suspend status back to the
2231          * client */
2232
2233         pa_tagstruct_put_sample_spec(reply, &ss);
2234         pa_tagstruct_put_channel_map(reply, &map);
2235
2236         pa_tagstruct_putu32(reply, s->source_output->source->index);
2237         pa_tagstruct_puts(reply, s->source_output->source->name);
2238
2239         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2240     }
2241
2242     if (c->version >= 13)
2243         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2244
2245     pa_pstream_send_tagstruct(c->pstream, reply);
2246 }
2247
2248 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2249     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2250     int ret;
2251
2252     pa_native_connection_assert_ref(c);
2253     pa_assert(t);
2254
2255     if (!pa_tagstruct_eof(t)) {
2256         protocol_error(c);
2257         return;
2258     }
2259
2260     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2261     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2262     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2263
2264     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2265 }
2266
2267 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2268     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2269     const void*cookie;
2270     pa_tagstruct *reply;
2271     pa_bool_t shm_on_remote = FALSE, do_shm;
2272
2273     pa_native_connection_assert_ref(c);
2274     pa_assert(t);
2275
2276     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2277         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2278         !pa_tagstruct_eof(t)) {
2279         protocol_error(c);
2280         return;
2281     }
2282
2283     /* Minimum supported version */
2284     if (c->version < 8) {
2285         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2286         return;
2287     }
2288
2289     /* Starting with protocol version 13 the MSB of the version tag
2290        reflects if shm is available for this pa_native_connection or
2291        not. */
2292     if (c->version >= 13) {
2293         shm_on_remote = !!(c->version & 0x80000000U);
2294         c->version &= 0x7FFFFFFFU;
2295     }
2296
2297     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2298
2299     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2300
2301     if (!c->authorized) {
2302         pa_bool_t success = FALSE;
2303
2304 #ifdef HAVE_CREDS
2305         const pa_creds *creds;
2306
2307         if ((creds = pa_pdispatch_creds(pd))) {
2308             if (creds->uid == getuid())
2309                 success = TRUE;
2310             else if (c->options->auth_group) {
2311                 int r;
2312                 gid_t gid;
2313
2314                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2315                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2316                 else if (gid == creds->gid)
2317                     success = TRUE;
2318
2319                 if (!success) {
2320                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2321                         pa_log_warn("Failed to check group membership.");
2322                     else if (r > 0)
2323                         success = TRUE;
2324                 }
2325             }
2326
2327             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2328                         (unsigned long) creds->uid,
2329                         (unsigned long) creds->gid,
2330                         (int) success);
2331         }
2332 #endif
2333
2334         if (!success && c->options->auth_cookie) {
2335             const uint8_t *ac;
2336
2337             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2338                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2339                     success = TRUE;
2340         }
2341
2342         if (!success) {
2343             pa_log_warn("Denied access to client with invalid authorization data.");
2344             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2345             return;
2346         }
2347
2348         c->authorized = TRUE;
2349         if (c->auth_timeout_event) {
2350             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2351             c->auth_timeout_event = NULL;
2352         }
2353     }
2354
2355     /* Enable shared memory support if possible */
2356     do_shm =
2357         pa_mempool_is_shared(c->protocol->core->mempool) &&
2358         c->is_local;
2359
2360     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2361
2362     if (do_shm)
2363         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2364             do_shm = FALSE;
2365
2366 #ifdef HAVE_CREDS
2367     if (do_shm) {
2368         /* Only enable SHM if both sides are owned by the same
2369          * user. This is a security measure because otherwise data
2370          * private to the user might leak. */
2371
2372         const pa_creds *creds;
2373         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2374             do_shm = FALSE;
2375     }
2376 #endif
2377
2378     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2379     pa_pstream_enable_shm(c->pstream, do_shm);
2380
2381     reply = reply_new(tag);
2382     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2383
2384 #ifdef HAVE_CREDS
2385 {
2386     /* SHM support is only enabled after both sides made sure they are the same user. */
2387
2388     pa_creds ucred;
2389
2390     ucred.uid = getuid();
2391     ucred.gid = getgid();
2392
2393     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2394 }
2395 #else
2396     pa_pstream_send_tagstruct(c->pstream, reply);
2397 #endif
2398 }
2399
2400 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2401     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2402     const char *name = NULL;
2403     pa_proplist *p;
2404     pa_tagstruct *reply;
2405
2406     pa_native_connection_assert_ref(c);
2407     pa_assert(t);
2408
2409     p = pa_proplist_new();
2410
2411     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2412         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2413         !pa_tagstruct_eof(t)) {
2414
2415         protocol_error(c);
2416         pa_proplist_free(p);
2417         return;
2418     }
2419
2420     if (name)
2421         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2422             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2423             pa_proplist_free(p);
2424             return;
2425         }
2426
2427     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2428     pa_proplist_free(p);
2429
2430     reply = reply_new(tag);
2431
2432     if (c->version >= 13)
2433         pa_tagstruct_putu32(reply, c->client->index);
2434
2435     pa_pstream_send_tagstruct(c->pstream, reply);
2436 }
2437
2438 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2439     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2440     const char *name;
2441     uint32_t idx = PA_IDXSET_INVALID;
2442
2443     pa_native_connection_assert_ref(c);
2444     pa_assert(t);
2445
2446     if (pa_tagstruct_gets(t, &name) < 0 ||
2447         !pa_tagstruct_eof(t)) {
2448         protocol_error(c);
2449         return;
2450     }
2451
2452     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2453     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2454
2455     if (command == PA_COMMAND_LOOKUP_SINK) {
2456         pa_sink *sink;
2457         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2458             idx = sink->index;
2459     } else {
2460         pa_source *source;
2461         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2462         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2463             idx = source->index;
2464     }
2465
2466     if (idx == PA_IDXSET_INVALID)
2467         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2468     else {
2469         pa_tagstruct *reply;
2470         reply = reply_new(tag);
2471         pa_tagstruct_putu32(reply, idx);
2472         pa_pstream_send_tagstruct(c->pstream, reply);
2473     }
2474 }
2475
2476 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2477     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2478     uint32_t idx;
2479     playback_stream *s;
2480
2481     pa_native_connection_assert_ref(c);
2482     pa_assert(t);
2483
2484     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2485         !pa_tagstruct_eof(t)) {
2486         protocol_error(c);
2487         return;
2488     }
2489
2490     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2491     s = pa_idxset_get_by_index(c->output_streams, idx);
2492     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2493     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2494
2495     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2496 }
2497
2498 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2499     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2500     pa_tagstruct *reply;
2501     const pa_mempool_stat *stat;
2502
2503     pa_native_connection_assert_ref(c);
2504     pa_assert(t);
2505
2506     if (!pa_tagstruct_eof(t)) {
2507         protocol_error(c);
2508         return;
2509     }
2510
2511     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2512
2513     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2514
2515     reply = reply_new(tag);
2516     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2517     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2518     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2519     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2520     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2521     pa_pstream_send_tagstruct(c->pstream, reply);
2522 }
2523
2524 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2525     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2526     pa_tagstruct *reply;
2527     playback_stream *s;
2528     struct timeval tv, now;
2529     uint32_t idx;
2530
2531     pa_native_connection_assert_ref(c);
2532     pa_assert(t);
2533
2534     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2535         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2536         !pa_tagstruct_eof(t)) {
2537         protocol_error(c);
2538         return;
2539     }
2540
2541     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2542     s = pa_idxset_get_by_index(c->output_streams, idx);
2543     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2544     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2545
2546     /* Get an atomic snapshot of all timing parameters */
2547     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2548
2549     reply = reply_new(tag);
2550     pa_tagstruct_put_usec(reply,
2551                           s->current_sink_latency +
2552                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec));
2553     pa_tagstruct_put_usec(reply, 0);
2554     pa_tagstruct_put_boolean(reply,
2555                              s->playing_for > 0 &&
2556                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2557                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2558     pa_tagstruct_put_timeval(reply, &tv);
2559     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2560     pa_tagstruct_puts64(reply, s->write_index);
2561     pa_tagstruct_puts64(reply, s->read_index);
2562
2563     if (c->version >= 13) {
2564         pa_tagstruct_putu64(reply, s->underrun_for);
2565         pa_tagstruct_putu64(reply, s->playing_for);
2566     }
2567
2568     pa_pstream_send_tagstruct(c->pstream, reply);
2569 }
2570
2571 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2572     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2573     pa_tagstruct *reply;
2574     record_stream *s;
2575     struct timeval tv, now;
2576     uint32_t idx;
2577
2578     pa_native_connection_assert_ref(c);
2579     pa_assert(t);
2580
2581     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2582         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2583         !pa_tagstruct_eof(t)) {
2584         protocol_error(c);
2585         return;
2586     }
2587
2588     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2589     s = pa_idxset_get_by_index(c->record_streams, idx);
2590     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2591
2592     /* Get an atomic snapshot of all timing parameters */
2593     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2594
2595     reply = reply_new(tag);
2596     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2597     pa_tagstruct_put_usec(reply,
2598                           s->current_source_latency +
2599                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2600     pa_tagstruct_put_boolean(reply,
2601                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2602                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2603     pa_tagstruct_put_timeval(reply, &tv);
2604     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2605     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2606     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2607     pa_pstream_send_tagstruct(c->pstream, reply);
2608 }
2609
2610 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2611     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2612     upload_stream *s;
2613     uint32_t length;
2614     const char *name = NULL;
2615     pa_sample_spec ss;
2616     pa_channel_map map;
2617     pa_tagstruct *reply;
2618     pa_proplist *p;
2619
2620     pa_native_connection_assert_ref(c);
2621     pa_assert(t);
2622
2623     if (pa_tagstruct_gets(t, &name) < 0 ||
2624         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2625         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2626         pa_tagstruct_getu32(t, &length) < 0) {
2627         protocol_error(c);
2628         return;
2629     }
2630
2631     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2632     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2633     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2634     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2635     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2636     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2637
2638     p = pa_proplist_new();
2639
2640     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2641         !pa_tagstruct_eof(t)) {
2642
2643         protocol_error(c);
2644         pa_proplist_free(p);
2645         return;
2646     }
2647
2648     if (c->version < 13)
2649         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2650     else if (!name)
2651         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2652             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2653
2654     if (!name || !pa_namereg_is_valid_name(name)) {
2655         pa_proplist_free(p);
2656         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2657     }
2658
2659     s = upload_stream_new(c, &ss, &map, name, length, p);
2660     pa_proplist_free(p);
2661
2662     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2663
2664     reply = reply_new(tag);
2665     pa_tagstruct_putu32(reply, s->index);
2666     pa_tagstruct_putu32(reply, length);
2667     pa_pstream_send_tagstruct(c->pstream, reply);
2668 }
2669
2670 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2671     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2672     uint32_t channel;
2673     upload_stream *s;
2674     uint32_t idx;
2675
2676     pa_native_connection_assert_ref(c);
2677     pa_assert(t);
2678
2679     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2680         !pa_tagstruct_eof(t)) {
2681         protocol_error(c);
2682         return;
2683     }
2684
2685     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2686
2687     s = pa_idxset_get_by_index(c->output_streams, channel);
2688     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2689     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2690
2691     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2692         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2693     else
2694         pa_pstream_send_simple_ack(c->pstream, tag);
2695
2696     upload_stream_unlink(s);
2697 }
2698
2699 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2700     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2701     uint32_t sink_index;
2702     pa_volume_t volume;
2703     pa_sink *sink;
2704     const char *name, *sink_name;
2705     uint32_t idx;
2706     pa_proplist *p;
2707     pa_tagstruct *reply;
2708
2709     pa_native_connection_assert_ref(c);
2710     pa_assert(t);
2711
2712     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2713
2714     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2715         pa_tagstruct_gets(t, &sink_name) < 0 ||
2716         pa_tagstruct_getu32(t, &volume) < 0 ||
2717         pa_tagstruct_gets(t, &name) < 0) {
2718         protocol_error(c);
2719         return;
2720     }
2721
2722     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2723     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2724     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2725     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2726
2727     if (sink_index != PA_INVALID_INDEX)
2728         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2729     else
2730         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2731
2732     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2733
2734     p = pa_proplist_new();
2735
2736     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2737         !pa_tagstruct_eof(t)) {
2738         protocol_error(c);
2739         pa_proplist_free(p);
2740         return;
2741     }
2742
2743     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2744
2745     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2746         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2747         pa_proplist_free(p);
2748         return;
2749     }
2750
2751     pa_proplist_free(p);
2752
2753     reply = reply_new(tag);
2754
2755     if (c->version >= 13)
2756         pa_tagstruct_putu32(reply, idx);
2757
2758     pa_pstream_send_tagstruct(c->pstream, reply);
2759 }
2760
2761 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2762     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2763     const char *name;
2764
2765     pa_native_connection_assert_ref(c);
2766     pa_assert(t);
2767
2768     if (pa_tagstruct_gets(t, &name) < 0 ||
2769         !pa_tagstruct_eof(t)) {
2770         protocol_error(c);
2771         return;
2772     }
2773
2774     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2775     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2776
2777     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2778         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2779         return;
2780     }
2781
2782     pa_pstream_send_simple_ack(c->pstream, tag);
2783 }
2784
2785 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2786     pa_assert(c);
2787     pa_assert(fixed);
2788     pa_assert(original);
2789
2790     *fixed = *original;
2791
2792     if (c->version < 12) {
2793         /* Before protocol version 12 we didn't support S32 samples,
2794          * so we need to lie about this to the client */
2795
2796         if (fixed->format == PA_SAMPLE_S32LE)
2797             fixed->format = PA_SAMPLE_FLOAT32LE;
2798         if (fixed->format == PA_SAMPLE_S32BE)
2799             fixed->format = PA_SAMPLE_FLOAT32BE;
2800     }
2801
2802     if (c->version < 15) {
2803         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2804             fixed->format = PA_SAMPLE_FLOAT32LE;
2805         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2806             fixed->format = PA_SAMPLE_FLOAT32BE;
2807     }
2808 }
2809
2810 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2811     pa_sample_spec fixed_ss;
2812
2813     pa_assert(t);
2814     pa_sink_assert_ref(sink);
2815
2816     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2817
2818     pa_tagstruct_put(
2819         t,
2820         PA_TAG_U32, sink->index,
2821         PA_TAG_STRING, sink->name,
2822         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2823         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2824         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2825         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2826         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE, FALSE),
2827         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2828         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2829         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2830         PA_TAG_USEC, pa_sink_get_latency(sink),
2831         PA_TAG_STRING, sink->driver,
2832         PA_TAG_U32, sink->flags,
2833         PA_TAG_INVALID);
2834
2835     if (c->version >= 13) {
2836         pa_tagstruct_put_proplist(t, sink->proplist);
2837         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2838     }
2839
2840     if (c->version >= 15) {
2841         pa_tagstruct_put_volume(t, sink->base_volume);
2842         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2843             pa_log_error("Internal sink state is invalid.");
2844         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2845         pa_tagstruct_putu32(t, sink->n_volume_steps);
2846         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2847     }
2848
2849     if (c->version >= 16) {
2850         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2851
2852         if (sink->ports) {
2853             void *state;
2854             pa_device_port *p;
2855
2856             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2857                 pa_tagstruct_puts(t, p->name);
2858                 pa_tagstruct_puts(t, p->description);
2859                 pa_tagstruct_putu32(t, p->priority);
2860             }
2861         }
2862
2863         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2864     }
2865 }
2866
2867 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2868     pa_sample_spec fixed_ss;
2869
2870     pa_assert(t);
2871     pa_source_assert_ref(source);
2872
2873     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2874
2875     pa_tagstruct_put(
2876         t,
2877         PA_TAG_U32, source->index,
2878         PA_TAG_STRING, source->name,
2879         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2880         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2881         PA_TAG_CHANNEL_MAP, &source->channel_map,
2882         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2883         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2884         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2885         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2886         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2887         PA_TAG_USEC, pa_source_get_latency(source),
2888         PA_TAG_STRING, source->driver,
2889         PA_TAG_U32, source->flags,
2890         PA_TAG_INVALID);
2891
2892     if (c->version >= 13) {
2893         pa_tagstruct_put_proplist(t, source->proplist);
2894         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2895     }
2896
2897     if (c->version >= 15) {
2898         pa_tagstruct_put_volume(t, source->base_volume);
2899         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2900             pa_log_error("Internal source state is invalid.");
2901         pa_tagstruct_putu32(t, pa_source_get_state(source));
2902         pa_tagstruct_putu32(t, source->n_volume_steps);
2903         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2904     }
2905
2906     if (c->version >= 16) {
2907
2908         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
2909
2910         if (source->ports) {
2911             void *state;
2912             pa_device_port *p;
2913
2914             PA_HASHMAP_FOREACH(p, source->ports, state) {
2915                 pa_tagstruct_puts(t, p->name);
2916                 pa_tagstruct_puts(t, p->description);
2917                 pa_tagstruct_putu32(t, p->priority);
2918             }
2919         }
2920
2921         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
2922     }
2923 }
2924
2925 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2926     pa_assert(t);
2927     pa_assert(client);
2928
2929     pa_tagstruct_putu32(t, client->index);
2930     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2931     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2932     pa_tagstruct_puts(t, client->driver);
2933
2934     if (c->version >= 13)
2935         pa_tagstruct_put_proplist(t, client->proplist);
2936 }
2937
2938 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2939     void *state = NULL;
2940     pa_card_profile *p;
2941
2942     pa_assert(t);
2943     pa_assert(card);
2944
2945     pa_tagstruct_putu32(t, card->index);
2946     pa_tagstruct_puts(t, card->name);
2947     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2948     pa_tagstruct_puts(t, card->driver);
2949
2950     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2951
2952     if (card->profiles) {
2953         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2954             pa_tagstruct_puts(t, p->name);
2955             pa_tagstruct_puts(t, p->description);
2956             pa_tagstruct_putu32(t, p->n_sinks);
2957             pa_tagstruct_putu32(t, p->n_sources);
2958             pa_tagstruct_putu32(t, p->priority);
2959         }
2960     }
2961
2962     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2963     pa_tagstruct_put_proplist(t, card->proplist);
2964 }
2965
2966 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2967     pa_assert(t);
2968     pa_assert(module);
2969
2970     pa_tagstruct_putu32(t, module->index);
2971     pa_tagstruct_puts(t, module->name);
2972     pa_tagstruct_puts(t, module->argument);
2973     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2974
2975     if (c->version < 15)
2976         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2977
2978     if (c->version >= 15)
2979         pa_tagstruct_put_proplist(t, module->proplist);
2980 }
2981
2982 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2983     pa_sample_spec fixed_ss;
2984     pa_usec_t sink_latency;
2985     pa_cvolume v;
2986
2987     pa_assert(t);
2988     pa_sink_input_assert_ref(s);
2989
2990     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2991
2992     pa_tagstruct_putu32(t, s->index);
2993     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2994     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2995     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2996     pa_tagstruct_putu32(t, s->sink->index);
2997     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2998     pa_tagstruct_put_channel_map(t, &s->channel_map);
2999     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
3000     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3001     pa_tagstruct_put_usec(t, sink_latency);
3002     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3003     pa_tagstruct_puts(t, s->driver);
3004     if (c->version >= 11)
3005         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3006     if (c->version >= 13)
3007         pa_tagstruct_put_proplist(t, s->proplist);
3008 }
3009
3010 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3011     pa_sample_spec fixed_ss;
3012     pa_usec_t source_latency;
3013
3014     pa_assert(t);
3015     pa_source_output_assert_ref(s);
3016
3017     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3018
3019     pa_tagstruct_putu32(t, s->index);
3020     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3021     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3022     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3023     pa_tagstruct_putu32(t, s->source->index);
3024     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3025     pa_tagstruct_put_channel_map(t, &s->channel_map);
3026     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3027     pa_tagstruct_put_usec(t, source_latency);
3028     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3029     pa_tagstruct_puts(t, s->driver);
3030
3031     if (c->version >= 13)
3032         pa_tagstruct_put_proplist(t, s->proplist);
3033 }
3034
3035 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3036     pa_sample_spec fixed_ss;
3037     pa_cvolume v;
3038
3039     pa_assert(t);
3040     pa_assert(e);
3041
3042     if (e->memchunk.memblock)
3043         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3044     else
3045         memset(&fixed_ss, 0, sizeof(fixed_ss));
3046
3047     pa_tagstruct_putu32(t, e->index);
3048     pa_tagstruct_puts(t, e->name);
3049
3050     if (e->volume_is_set)
3051         v = e->volume;
3052     else
3053         pa_cvolume_init(&v);
3054
3055     pa_tagstruct_put_cvolume(t, &v);
3056     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3057     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3058     pa_tagstruct_put_channel_map(t, &e->channel_map);
3059     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3060     pa_tagstruct_put_boolean(t, e->lazy);
3061     pa_tagstruct_puts(t, e->filename);
3062
3063     if (c->version >= 13)
3064         pa_tagstruct_put_proplist(t, e->proplist);
3065 }
3066
3067 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3068     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3069     uint32_t idx;
3070     pa_sink *sink = NULL;
3071     pa_source *source = NULL;
3072     pa_client *client = NULL;
3073     pa_card *card = NULL;
3074     pa_module *module = NULL;
3075     pa_sink_input *si = NULL;
3076     pa_source_output *so = NULL;
3077     pa_scache_entry *sce = NULL;
3078     const char *name = NULL;
3079     pa_tagstruct *reply;
3080
3081     pa_native_connection_assert_ref(c);
3082     pa_assert(t);
3083
3084     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3085         (command != PA_COMMAND_GET_CLIENT_INFO &&
3086          command != PA_COMMAND_GET_MODULE_INFO &&
3087          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3088          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3089          pa_tagstruct_gets(t, &name) < 0) ||
3090         !pa_tagstruct_eof(t)) {
3091         protocol_error(c);
3092         return;
3093     }
3094
3095     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3096     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3097     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3098     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3099     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3100
3101     if (command == PA_COMMAND_GET_SINK_INFO) {
3102         if (idx != PA_INVALID_INDEX)
3103             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3104         else
3105             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3106     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3107         if (idx != PA_INVALID_INDEX)
3108             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3109         else
3110             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3111     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3112         if (idx != PA_INVALID_INDEX)
3113             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3114         else
3115             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3116     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3117         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3118     else if (command == PA_COMMAND_GET_MODULE_INFO)
3119         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3120     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3121         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3122     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3123         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3124     else {
3125         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3126         if (idx != PA_INVALID_INDEX)
3127             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3128         else
3129             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3130     }
3131
3132     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3133         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3134         return;
3135     }
3136
3137     reply = reply_new(tag);
3138     if (sink)
3139         sink_fill_tagstruct(c, reply, sink);
3140     else if (source)
3141         source_fill_tagstruct(c, reply, source);
3142     else if (client)
3143         client_fill_tagstruct(c, reply, client);
3144     else if (card)
3145         card_fill_tagstruct(c, reply, card);
3146     else if (module)
3147         module_fill_tagstruct(c, reply, module);
3148     else if (si)
3149         sink_input_fill_tagstruct(c, reply, si);
3150     else if (so)
3151         source_output_fill_tagstruct(c, reply, so);
3152     else
3153         scache_fill_tagstruct(c, reply, sce);
3154     pa_pstream_send_tagstruct(c->pstream, reply);
3155 }
3156
3157 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3158     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3159     pa_idxset *i;
3160     uint32_t idx;
3161     void *p;
3162     pa_tagstruct *reply;
3163
3164     pa_native_connection_assert_ref(c);
3165     pa_assert(t);
3166
3167     if (!pa_tagstruct_eof(t)) {
3168         protocol_error(c);
3169         return;
3170     }
3171
3172     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3173
3174     reply = reply_new(tag);
3175
3176     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3177         i = c->protocol->core->sinks;
3178     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3179         i = c->protocol->core->sources;
3180     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3181         i = c->protocol->core->clients;
3182     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3183         i = c->protocol->core->cards;
3184     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3185         i = c->protocol->core->modules;
3186     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3187         i = c->protocol->core->sink_inputs;
3188     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3189         i = c->protocol->core->source_outputs;
3190     else {
3191         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3192         i = c->protocol->core->scache;
3193     }
3194
3195     if (i) {
3196         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3197             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3198                 sink_fill_tagstruct(c, reply, p);
3199             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3200                 source_fill_tagstruct(c, reply, p);
3201             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3202                 client_fill_tagstruct(c, reply, p);
3203             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3204                 card_fill_tagstruct(c, reply, p);
3205             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3206                 module_fill_tagstruct(c, reply, p);
3207             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3208                 sink_input_fill_tagstruct(c, reply, p);
3209             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3210                 source_output_fill_tagstruct(c, reply, p);
3211             else {
3212                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3213                 scache_fill_tagstruct(c, reply, p);
3214             }
3215         }
3216     }
3217
3218     pa_pstream_send_tagstruct(c->pstream, reply);
3219 }
3220
3221 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3222     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3223     pa_tagstruct *reply;
3224     pa_sink *def_sink;
3225     pa_source *def_source;
3226     pa_sample_spec fixed_ss;
3227     char *h, *u;
3228
3229     pa_native_connection_assert_ref(c);
3230     pa_assert(t);
3231
3232     if (!pa_tagstruct_eof(t)) {
3233         protocol_error(c);
3234         return;
3235     }
3236
3237     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3238
3239     reply = reply_new(tag);
3240     pa_tagstruct_puts(reply, PACKAGE_NAME);
3241     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3242
3243     u = pa_get_user_name_malloc();
3244     pa_tagstruct_puts(reply, u);
3245     pa_xfree(u);
3246
3247     h = pa_get_host_name_malloc();
3248     pa_tagstruct_puts(reply, h);
3249     pa_xfree(h);
3250
3251     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3252     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3253
3254     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3255     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3256     def_source = pa_namereg_get_default_source(c->protocol->core);
3257     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3258
3259     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3260
3261     if (c->version >= 15)
3262         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3263
3264     pa_pstream_send_tagstruct(c->pstream, reply);
3265 }
3266
3267 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3268     pa_tagstruct *t;
3269     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3270
3271     pa_native_connection_assert_ref(c);
3272
3273     t = pa_tagstruct_new(NULL, 0);
3274     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3275     pa_tagstruct_putu32(t, (uint32_t) -1);
3276     pa_tagstruct_putu32(t, e);
3277     pa_tagstruct_putu32(t, idx);
3278     pa_pstream_send_tagstruct(c->pstream, t);
3279 }
3280
3281 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3282     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3283     pa_subscription_mask_t m;
3284
3285     pa_native_connection_assert_ref(c);
3286     pa_assert(t);
3287
3288     if (pa_tagstruct_getu32(t, &m) < 0 ||
3289         !pa_tagstruct_eof(t)) {
3290         protocol_error(c);
3291         return;
3292     }
3293
3294     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3295     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3296
3297     if (c->subscription)
3298         pa_subscription_free(c->subscription);
3299
3300     if (m != 0) {
3301         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3302         pa_assert(c->subscription);
3303     } else
3304         c->subscription = NULL;
3305
3306     pa_pstream_send_simple_ack(c->pstream, tag);
3307 }
3308
3309 static void command_set_volume(
3310         pa_pdispatch *pd,
3311         uint32_t command,
3312         uint32_t tag,
3313         pa_tagstruct *t,
3314         void *userdata) {
3315
3316     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3317     uint32_t idx;
3318     pa_cvolume volume;
3319     pa_sink *sink = NULL;
3320     pa_source *source = NULL;
3321     pa_sink_input *si = NULL;
3322     const char *name = NULL;
3323
3324     pa_native_connection_assert_ref(c);
3325     pa_assert(t);
3326
3327     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3328         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3329         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3330         pa_tagstruct_get_cvolume(t, &volume) ||
3331         !pa_tagstruct_eof(t)) {
3332         protocol_error(c);
3333         return;
3334     }
3335
3336     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3337     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3338     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3339     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3340     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3341     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3342
3343     switch (command) {
3344
3345         case PA_COMMAND_SET_SINK_VOLUME:
3346             if (idx != PA_INVALID_INDEX)
3347                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3348             else
3349                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3350             break;
3351
3352         case PA_COMMAND_SET_SOURCE_VOLUME:
3353             if (idx != PA_INVALID_INDEX)
3354                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3355             else
3356                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3357             break;
3358
3359         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3360             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3361             break;
3362
3363         default:
3364             pa_assert_not_reached();
3365     }
3366
3367     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3368
3369     if (sink)
3370         pa_sink_set_volume(sink, &volume, TRUE, TRUE, TRUE, TRUE);
3371     else if (source)
3372         pa_source_set_volume(source, &volume, TRUE);
3373     else if (si)
3374         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3375
3376     pa_pstream_send_simple_ack(c->pstream, tag);
3377 }
3378
3379 static void command_set_mute(
3380         pa_pdispatch *pd,
3381         uint32_t command,
3382         uint32_t tag,
3383         pa_tagstruct *t,
3384         void *userdata) {
3385
3386     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3387     uint32_t idx;
3388     pa_bool_t mute;
3389     pa_sink *sink = NULL;
3390     pa_source *source = NULL;
3391     pa_sink_input *si = NULL;
3392     const char *name = NULL;
3393
3394     pa_native_connection_assert_ref(c);
3395     pa_assert(t);
3396
3397     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3398         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3399         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3400         pa_tagstruct_get_boolean(t, &mute) ||
3401         !pa_tagstruct_eof(t)) {
3402         protocol_error(c);
3403         return;
3404     }
3405
3406     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3407     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3408     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3409     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3410     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3411
3412     switch (command) {
3413
3414         case PA_COMMAND_SET_SINK_MUTE:
3415
3416             if (idx != PA_INVALID_INDEX)
3417                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3418             else
3419                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3420
3421             break;
3422
3423         case PA_COMMAND_SET_SOURCE_MUTE:
3424             if (idx != PA_INVALID_INDEX)
3425                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3426             else
3427                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3428
3429             break;
3430
3431         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3432             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3433             break;
3434
3435         default:
3436             pa_assert_not_reached();
3437     }
3438
3439     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3440
3441     if (sink)
3442         pa_sink_set_mute(sink, mute, TRUE);
3443     else if (source)
3444         pa_source_set_mute(source, mute, TRUE);
3445     else if (si)
3446         pa_sink_input_set_mute(si, mute, TRUE);
3447
3448     pa_pstream_send_simple_ack(c->pstream, tag);
3449 }
3450
3451 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3452     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3453     uint32_t idx;
3454     pa_bool_t b;
3455     playback_stream *s;
3456
3457     pa_native_connection_assert_ref(c);
3458     pa_assert(t);
3459
3460     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3461         pa_tagstruct_get_boolean(t, &b) < 0 ||
3462         !pa_tagstruct_eof(t)) {
3463         protocol_error(c);
3464         return;
3465     }
3466
3467     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3468     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3469     s = pa_idxset_get_by_index(c->output_streams, idx);
3470     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3471     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3472
3473     pa_sink_input_cork(s->sink_input, b);
3474
3475     if (b)
3476         s->is_underrun = TRUE;
3477
3478     pa_pstream_send_simple_ack(c->pstream, tag);
3479 }
3480
3481 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3482     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3483     uint32_t idx;
3484     playback_stream *s;
3485
3486     pa_native_connection_assert_ref(c);
3487     pa_assert(t);
3488
3489     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3490         !pa_tagstruct_eof(t)) {
3491         protocol_error(c);
3492         return;
3493     }
3494
3495     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3496     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3497     s = pa_idxset_get_by_index(c->output_streams, idx);
3498     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3499     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3500
3501     switch (command) {
3502         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3503             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3504             break;
3505
3506         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3507             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3508             break;
3509
3510         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3511             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3512             break;
3513
3514         default:
3515             pa_assert_not_reached();
3516     }
3517
3518     pa_pstream_send_simple_ack(c->pstream, tag);
3519 }
3520
3521 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3522     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3523     uint32_t idx;
3524     record_stream *s;
3525     pa_bool_t b;
3526
3527     pa_native_connection_assert_ref(c);
3528     pa_assert(t);
3529
3530     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3531         pa_tagstruct_get_boolean(t, &b) < 0 ||
3532         !pa_tagstruct_eof(t)) {
3533         protocol_error(c);
3534         return;
3535     }
3536
3537     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3538     s = pa_idxset_get_by_index(c->record_streams, idx);
3539     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3540
3541     pa_source_output_cork(s->source_output, b);
3542     pa_memblockq_prebuf_force(s->memblockq);
3543     pa_pstream_send_simple_ack(c->pstream, tag);
3544 }
3545
3546 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3547     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3548     uint32_t idx;
3549     record_stream *s;
3550
3551     pa_native_connection_assert_ref(c);
3552     pa_assert(t);
3553
3554     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3555         !pa_tagstruct_eof(t)) {
3556         protocol_error(c);
3557         return;
3558     }
3559
3560     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3561     s = pa_idxset_get_by_index(c->record_streams, idx);
3562     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3563
3564     pa_memblockq_flush_read(s->memblockq);
3565     pa_pstream_send_simple_ack(c->pstream, tag);
3566 }
3567
3568 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3569     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3570     uint32_t idx;
3571     pa_buffer_attr a;
3572     pa_tagstruct *reply;
3573
3574     pa_native_connection_assert_ref(c);
3575     pa_assert(t);
3576
3577     memset(&a, 0, sizeof(a));
3578
3579     if (pa_tagstruct_getu32(t, &idx) < 0) {
3580         protocol_error(c);
3581         return;
3582     }
3583
3584     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3585
3586     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3587         playback_stream *s;
3588         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3589
3590         s = pa_idxset_get_by_index(c->output_streams, idx);
3591         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3592         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3593
3594         if (pa_tagstruct_get(
3595                     t,
3596                     PA_TAG_U32, &a.maxlength,
3597                     PA_TAG_U32, &a.tlength,
3598                     PA_TAG_U32, &a.prebuf,
3599                     PA_TAG_U32, &a.minreq,
3600                     PA_TAG_INVALID) < 0 ||
3601             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3602             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3603             !pa_tagstruct_eof(t)) {
3604             protocol_error(c);
3605             return;
3606         }
3607
3608         s->adjust_latency = adjust_latency;
3609         s->early_requests = early_requests;
3610         s->buffer_attr = a;
3611
3612         fix_playback_buffer_attr(s);
3613         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3614
3615         reply = reply_new(tag);
3616         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3617         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3618         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3619         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3620
3621         if (c->version >= 13)
3622             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3623
3624     } else {
3625         record_stream *s;
3626         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3627         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3628
3629         s = pa_idxset_get_by_index(c->record_streams, idx);
3630         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3631
3632         if (pa_tagstruct_get(
3633                     t,
3634                     PA_TAG_U32, &a.maxlength,
3635                     PA_TAG_U32, &a.fragsize,
3636                     PA_TAG_INVALID) < 0 ||
3637             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3638             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3639             !pa_tagstruct_eof(t)) {
3640             protocol_error(c);
3641             return;
3642         }
3643
3644         s->adjust_latency = adjust_latency;
3645         s->early_requests = early_requests;
3646         s->buffer_attr = a;
3647
3648         fix_record_buffer_attr_pre(s);
3649         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3650         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3651         fix_record_buffer_attr_post(s);
3652
3653         reply = reply_new(tag);
3654         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3655         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3656
3657         if (c->version >= 13)
3658             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3659     }
3660
3661     pa_pstream_send_tagstruct(c->pstream, reply);
3662 }
3663
3664 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3665     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3666     uint32_t idx;
3667     uint32_t rate;
3668
3669     pa_native_connection_assert_ref(c);
3670     pa_assert(t);
3671
3672     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3673         pa_tagstruct_getu32(t, &rate) < 0 ||
3674         !pa_tagstruct_eof(t)) {
3675         protocol_error(c);
3676         return;
3677     }
3678
3679     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3680     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3681
3682     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3683         playback_stream *s;
3684
3685         s = pa_idxset_get_by_index(c->output_streams, idx);
3686         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3687         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3688
3689         pa_sink_input_set_rate(s->sink_input, rate);
3690
3691     } else {
3692         record_stream *s;
3693         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3694
3695         s = pa_idxset_get_by_index(c->record_streams, idx);
3696         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3697
3698         pa_source_output_set_rate(s->source_output, rate);
3699     }
3700
3701     pa_pstream_send_simple_ack(c->pstream, tag);
3702 }
3703
3704 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3705     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3706     uint32_t idx;
3707     uint32_t mode;
3708     pa_proplist *p;
3709
3710     pa_native_connection_assert_ref(c);
3711     pa_assert(t);
3712
3713     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3714
3715     p = pa_proplist_new();
3716
3717     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3718
3719         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3720             pa_tagstruct_get_proplist(t, p) < 0 ||
3721             !pa_tagstruct_eof(t)) {
3722             protocol_error(c);
3723             pa_proplist_free(p);
3724             return;
3725         }
3726
3727     } else {
3728
3729         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3730             pa_tagstruct_getu32(t, &mode) < 0 ||
3731             pa_tagstruct_get_proplist(t, p) < 0 ||
3732             !pa_tagstruct_eof(t)) {
3733             protocol_error(c);
3734             pa_proplist_free(p);
3735             return;
3736         }
3737     }
3738
3739     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3740         pa_proplist_free(p);
3741         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3742     }
3743
3744     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3745         playback_stream *s;
3746
3747         s = pa_idxset_get_by_index(c->output_streams, idx);
3748         if (!s || !playback_stream_isinstance(s)) {
3749             pa_proplist_free(p);
3750             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3751         }
3752         pa_sink_input_update_proplist(s->sink_input, mode, p);
3753
3754     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3755         record_stream *s;
3756
3757         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3758             pa_proplist_free(p);
3759             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3760         }
3761         pa_source_output_update_proplist(s->source_output, mode, p);
3762
3763     } else {
3764         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3765
3766         pa_client_update_proplist(c->client, mode, p);
3767     }
3768
3769     pa_pstream_send_simple_ack(c->pstream, tag);
3770     pa_proplist_free(p);
3771 }
3772
3773 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3774     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3775     uint32_t idx;
3776     unsigned changed = 0;
3777     pa_proplist *p;
3778     pa_strlist *l = NULL;
3779
3780     pa_native_connection_assert_ref(c);
3781     pa_assert(t);
3782
3783     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3784
3785     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3786
3787         if (pa_tagstruct_getu32(t, &idx) < 0) {
3788             protocol_error(c);
3789             return;
3790         }
3791     }
3792
3793     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3794         playback_stream *s;
3795
3796         s = pa_idxset_get_by_index(c->output_streams, idx);
3797         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3798         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3799
3800         p = s->sink_input->proplist;
3801
3802     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3803         record_stream *s;
3804
3805         s = pa_idxset_get_by_index(c->record_streams, idx);
3806         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3807
3808         p = s->source_output->proplist;
3809     } else {
3810         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3811
3812         p = c->client->proplist;
3813     }
3814
3815     for (;;) {
3816         const char *k;
3817
3818         if (pa_tagstruct_gets(t, &k) < 0) {
3819             protocol_error(c);
3820             pa_strlist_free(l);
3821             return;
3822         }
3823
3824         if (!k)
3825             break;
3826
3827         l = pa_strlist_prepend(l, k);
3828     }
3829
3830     if (!pa_tagstruct_eof(t)) {
3831         protocol_error(c);
3832         pa_strlist_free(l);
3833         return;
3834     }
3835
3836     for (;;) {
3837         char *z;
3838
3839         l = pa_strlist_pop(l, &z);
3840
3841         if (!z)
3842             break;
3843
3844         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3845         pa_xfree(z);
3846     }
3847
3848     pa_pstream_send_simple_ack(c->pstream, tag);
3849
3850     if (changed) {
3851         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3852             playback_stream *s;
3853
3854             s = pa_idxset_get_by_index(c->output_streams, idx);
3855             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3856
3857         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3858             record_stream *s;
3859
3860             s = pa_idxset_get_by_index(c->record_streams, idx);
3861             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3862
3863         } else {
3864             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3865             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3866         }
3867     }
3868 }
3869
3870 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3871     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3872     const char *s;
3873
3874     pa_native_connection_assert_ref(c);
3875     pa_assert(t);
3876
3877     if (pa_tagstruct_gets(t, &s) < 0 ||
3878         !pa_tagstruct_eof(t)) {
3879         protocol_error(c);
3880         return;
3881     }
3882
3883     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3884     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3885
3886     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3887         pa_source *source;
3888
3889         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3890         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3891
3892         pa_namereg_set_default_source(c->protocol->core, source);
3893     } else {
3894         pa_sink *sink;
3895         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3896
3897         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3898         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3899
3900         pa_namereg_set_default_sink(c->protocol->core, sink);
3901     }
3902
3903     pa_pstream_send_simple_ack(c->pstream, tag);
3904 }
3905
3906 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3907     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3908     uint32_t idx;
3909     const char *name;
3910
3911     pa_native_connection_assert_ref(c);
3912     pa_assert(t);
3913
3914     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3915         pa_tagstruct_gets(t, &name) < 0 ||
3916         !pa_tagstruct_eof(t)) {
3917         protocol_error(c);
3918         return;
3919     }
3920
3921     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3922     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3923
3924     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3925         playback_stream *s;
3926
3927         s = pa_idxset_get_by_index(c->output_streams, idx);
3928         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3929         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3930
3931         pa_sink_input_set_name(s->sink_input, name);
3932
3933     } else {
3934         record_stream *s;
3935         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3936
3937         s = pa_idxset_get_by_index(c->record_streams, idx);
3938         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3939
3940         pa_source_output_set_name(s->source_output, name);
3941     }
3942
3943     pa_pstream_send_simple_ack(c->pstream, tag);
3944 }
3945
3946 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3947     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3948     uint32_t idx;
3949
3950     pa_native_connection_assert_ref(c);
3951     pa_assert(t);
3952
3953     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3954         !pa_tagstruct_eof(t)) {
3955         protocol_error(c);
3956         return;
3957     }
3958
3959     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3960
3961     if (command == PA_COMMAND_KILL_CLIENT) {
3962         pa_client *client;
3963
3964         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3965         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3966
3967         pa_native_connection_ref(c);
3968         pa_client_kill(client);
3969
3970     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3971         pa_sink_input *s;
3972
3973         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3974         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3975
3976         pa_native_connection_ref(c);
3977         pa_sink_input_kill(s);
3978     } else {
3979         pa_source_output *s;
3980
3981         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3982
3983         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3984         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3985
3986         pa_native_connection_ref(c);
3987         pa_source_output_kill(s);
3988     }
3989
3990     pa_pstream_send_simple_ack(c->pstream, tag);
3991     pa_native_connection_unref(c);
3992 }
3993
3994 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3995     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3996     pa_module *m;
3997     const char *name, *argument;
3998     pa_tagstruct *reply;
3999
4000     pa_native_connection_assert_ref(c);
4001     pa_assert(t);
4002
4003     if (pa_tagstruct_gets(t, &name) < 0 ||
4004         pa_tagstruct_gets(t, &argument) < 0 ||
4005         !pa_tagstruct_eof(t)) {
4006         protocol_error(c);
4007         return;
4008     }
4009
4010     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4011     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4012     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4013
4014     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4015         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4016         return;
4017     }
4018
4019     reply = reply_new(tag);
4020     pa_tagstruct_putu32(reply, m->index);
4021     pa_pstream_send_tagstruct(c->pstream, reply);
4022 }
4023
4024 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4025     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4026     uint32_t idx;
4027     pa_module *m;
4028
4029     pa_native_connection_assert_ref(c);
4030     pa_assert(t);
4031
4032     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4033         !pa_tagstruct_eof(t)) {
4034         protocol_error(c);
4035         return;
4036     }
4037
4038     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4039     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4040     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4041
4042     pa_module_unload_request(m, FALSE);
4043     pa_pstream_send_simple_ack(c->pstream, tag);
4044 }
4045
4046 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4047     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4048     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4049     const char *name_device = NULL;
4050
4051     pa_native_connection_assert_ref(c);
4052     pa_assert(t);
4053
4054     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4055         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4056         pa_tagstruct_gets(t, &name_device) < 0 ||
4057         !pa_tagstruct_eof(t)) {
4058         protocol_error(c);
4059         return;
4060     }
4061
4062     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4063     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4064
4065     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4066     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4067     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4068     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4069
4070     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4071         pa_sink_input *si = NULL;
4072         pa_sink *sink = NULL;
4073
4074         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4075
4076         if (idx_device != PA_INVALID_INDEX)
4077             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4078         else
4079             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4080
4081         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4082
4083         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4084             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4085             return;
4086         }
4087     } else {
4088         pa_source_output *so = NULL;
4089         pa_source *source;
4090
4091         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4092
4093         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4094
4095         if (idx_device != PA_INVALID_INDEX)
4096             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4097         else
4098             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4099
4100         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4101
4102         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4103             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4104             return;
4105         }
4106     }
4107
4108     pa_pstream_send_simple_ack(c->pstream, tag);
4109 }
4110
4111 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4112     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4113     uint32_t idx = PA_INVALID_INDEX;
4114     const char *name = NULL;
4115     pa_bool_t b;
4116
4117     pa_native_connection_assert_ref(c);
4118     pa_assert(t);
4119
4120     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4121         pa_tagstruct_gets(t, &name) < 0 ||
4122         pa_tagstruct_get_boolean(t, &b) < 0 ||
4123         !pa_tagstruct_eof(t)) {
4124         protocol_error(c);
4125         return;
4126     }
4127
4128     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4129     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4130     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4131     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4132     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4133
4134     if (command == PA_COMMAND_SUSPEND_SINK) {
4135
4136         if (idx == PA_INVALID_INDEX && name && !*name) {
4137
4138             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4139
4140             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4141                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4142                 return;
4143             }
4144         } else {
4145             pa_sink *sink = NULL;
4146
4147             if (idx != PA_INVALID_INDEX)
4148                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4149             else
4150                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4151
4152             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4153
4154             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4155                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4156                 return;
4157             }
4158         }
4159     } else {
4160
4161         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4162
4163         if (idx == PA_INVALID_INDEX && name && !*name) {
4164
4165             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4166
4167             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4168                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4169                 return;
4170             }
4171
4172         } else {
4173             pa_source *source;
4174
4175             if (idx != PA_INVALID_INDEX)
4176                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4177             else
4178                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4179
4180             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4181
4182             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4183                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4184                 return;
4185             }
4186         }
4187     }
4188
4189     pa_pstream_send_simple_ack(c->pstream, tag);
4190 }
4191
4192 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4193     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4194     uint32_t idx = PA_INVALID_INDEX;
4195     const char *name = NULL;
4196     pa_module *m;
4197     pa_native_protocol_ext_cb_t cb;
4198
4199     pa_native_connection_assert_ref(c);
4200     pa_assert(t);
4201
4202     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4203         pa_tagstruct_gets(t, &name) < 0) {
4204         protocol_error(c);
4205         return;
4206     }
4207
4208     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4209     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4210     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4211     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4212     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4213
4214     if (idx != PA_INVALID_INDEX)
4215         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4216     else {
4217         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4218             if (strcmp(name, m->name) == 0)
4219                 break;
4220     }
4221
4222     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4223     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4224
4225     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4226     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4227
4228     if (cb(c->protocol, m, c, tag, t) < 0)
4229         protocol_error(c);
4230 }
4231
4232 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4233     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4234     uint32_t idx = PA_INVALID_INDEX;
4235     const char *name = NULL, *profile = NULL;
4236     pa_card *card = NULL;
4237     int ret;
4238
4239     pa_native_connection_assert_ref(c);
4240     pa_assert(t);
4241
4242     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4243         pa_tagstruct_gets(t, &name) < 0 ||
4244         pa_tagstruct_gets(t, &profile) < 0 ||
4245         !pa_tagstruct_eof(t)) {
4246         protocol_error(c);
4247         return;
4248     }
4249
4250     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4251     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4252     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4253     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4254     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4255
4256     if (idx != PA_INVALID_INDEX)
4257         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4258     else
4259         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4260
4261     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4262
4263     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4264         pa_pstream_send_error(c->pstream, tag, -ret);
4265         return;
4266     }
4267
4268     pa_pstream_send_simple_ack(c->pstream, tag);
4269 }
4270
4271 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4272     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4273     uint32_t idx = PA_INVALID_INDEX;
4274     const char *name = NULL, *port = NULL;
4275     int ret;
4276
4277     pa_native_connection_assert_ref(c);
4278     pa_assert(t);
4279
4280     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4281         pa_tagstruct_gets(t, &name) < 0 ||
4282         pa_tagstruct_gets(t, &port) < 0 ||
4283         !pa_tagstruct_eof(t)) {
4284         protocol_error(c);
4285         return;
4286     }
4287
4288     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4289     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4290     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4291     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4292     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4293
4294     if (command == PA_COMMAND_SET_SINK_PORT) {
4295         pa_sink *sink;
4296
4297         if (idx != PA_INVALID_INDEX)
4298             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4299         else
4300             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4301
4302         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4303
4304         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4305             pa_pstream_send_error(c->pstream, tag, -ret);
4306             return;
4307         }
4308     } else {
4309         pa_source *source;
4310
4311         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4312
4313         if (idx != PA_INVALID_INDEX)
4314             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4315         else
4316             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4317
4318         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4319
4320         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4321             pa_pstream_send_error(c->pstream, tag, -ret);
4322             return;
4323         }
4324     }
4325
4326     pa_pstream_send_simple_ack(c->pstream, tag);
4327 }
4328
4329 /*** pstream callbacks ***/
4330
4331 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4332     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4333
4334     pa_assert(p);
4335     pa_assert(packet);
4336     pa_native_connection_assert_ref(c);
4337
4338     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4339         pa_log("invalid packet.");
4340         native_connection_unlink(c);
4341     }
4342 }
4343
4344 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4345     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4346     output_stream *stream;
4347
4348     pa_assert(p);
4349     pa_assert(chunk);
4350     pa_native_connection_assert_ref(c);
4351
4352     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4353         pa_log_debug("Client sent block for invalid stream.");
4354         /* Ignoring */
4355         return;
4356     }
4357
4358 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4359
4360     if (playback_stream_isinstance(stream)) {
4361         playback_stream *ps = PLAYBACK_STREAM(stream);
4362
4363         if (chunk->memblock) {
4364             if (seek != PA_SEEK_RELATIVE || offset != 0)
4365                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4366
4367             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4368         } else
4369             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4370
4371     } else {
4372         upload_stream *u = UPLOAD_STREAM(stream);
4373         size_t l;
4374
4375         if (!u->memchunk.memblock) {
4376             if (u->length == chunk->length && chunk->memblock) {
4377                 u->memchunk = *chunk;
4378                 pa_memblock_ref(u->memchunk.memblock);
4379                 u->length = 0;
4380             } else {
4381                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4382                 u->memchunk.index = u->memchunk.length = 0;
4383             }
4384         }
4385
4386         pa_assert(u->memchunk.memblock);
4387
4388         l = u->length;
4389         if (l > chunk->length)
4390             l = chunk->length;
4391
4392         if (l > 0) {
4393             void *dst;
4394             dst = pa_memblock_acquire(u->memchunk.memblock);
4395
4396             if (chunk->memblock) {
4397                 void *src;
4398                 src = pa_memblock_acquire(chunk->memblock);
4399
4400                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4401                        (uint8_t*) src + chunk->index, l);
4402
4403                 pa_memblock_release(chunk->memblock);
4404             } else
4405                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4406
4407             pa_memblock_release(u->memchunk.memblock);
4408
4409             u->memchunk.length += l;
4410             u->length -= l;
4411         }
4412     }
4413 }
4414
4415 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4416     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4417
4418     pa_assert(p);
4419     pa_native_connection_assert_ref(c);
4420
4421     native_connection_unlink(c);
4422     pa_log_info("Connection died.");
4423 }
4424
4425 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4426     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4427
4428     pa_assert(p);
4429     pa_native_connection_assert_ref(c);
4430
4431     native_connection_send_memblock(c);
4432 }
4433
4434 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4435     pa_thread_mq *q;
4436
4437     if (!(q = pa_thread_mq_get()))
4438         pa_pstream_send_revoke(p, block_id);
4439     else
4440         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4441 }
4442
4443 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4444     pa_thread_mq *q;
4445
4446     if (!(q = pa_thread_mq_get()))
4447         pa_pstream_send_release(p, block_id);
4448     else
4449         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4450 }
4451
4452 /*** client callbacks ***/
4453
4454 static void client_kill_cb(pa_client *c) {
4455     pa_assert(c);
4456
4457     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4458     pa_log_info("Connection killed.");
4459 }
4460
4461 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4462     pa_tagstruct *t;
4463     pa_native_connection *c;
4464
4465     pa_assert(client);
4466     c = PA_NATIVE_CONNECTION(client->userdata);
4467     pa_native_connection_assert_ref(c);
4468
4469     if (c->version < 15)
4470       return;
4471
4472     t = pa_tagstruct_new(NULL, 0);
4473     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4474     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4475     pa_tagstruct_puts(t, event);
4476     pa_tagstruct_put_proplist(t, pl);
4477     pa_pstream_send_tagstruct(c->pstream, t);
4478 }
4479
4480 /*** module entry points ***/
4481
4482 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4483     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4484
4485     pa_assert(m);
4486     pa_assert(tv);
4487     pa_native_connection_assert_ref(c);
4488     pa_assert(c->auth_timeout_event == e);
4489
4490     if (!c->authorized) {
4491         native_connection_unlink(c);
4492         pa_log_info("Connection terminated due to authentication timeout.");
4493     }
4494 }
4495
4496 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4497     pa_native_connection *c;
4498     char pname[128];
4499     pa_client *client;
4500     pa_client_new_data data;
4501
4502     pa_assert(p);
4503     pa_assert(io);
4504     pa_assert(o);
4505
4506     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4507         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4508         pa_iochannel_free(io);
4509         return;
4510     }
4511
4512     pa_client_new_data_init(&data);
4513     data.module = o->module;
4514     data.driver = __FILE__;
4515     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4516     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4517     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4518     client = pa_client_new(p->core, &data);
4519     pa_client_new_data_done(&data);
4520
4521     if (!client)
4522         return;
4523
4524     c = pa_msgobject_new(pa_native_connection);
4525     c->parent.parent.free = native_connection_free;
4526     c->parent.process_msg = native_connection_process_msg;
4527     c->protocol = p;
4528     c->options = pa_native_options_ref(o);
4529     c->authorized = FALSE;
4530
4531     if (o->auth_anonymous) {
4532         pa_log_info("Client authenticated anonymously.");
4533         c->authorized = TRUE;
4534     }
4535
4536     if (!c->authorized &&
4537         o->auth_ip_acl &&
4538         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4539
4540         pa_log_info("Client authenticated by IP ACL.");
4541         c->authorized = TRUE;
4542     }
4543
4544     if (!c->authorized) {
4545         struct timeval tv;
4546         pa_gettimeofday(&tv);
4547         tv.tv_sec += AUTH_TIMEOUT;
4548         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4549     } else
4550         c->auth_timeout_event = NULL;
4551
4552     c->is_local = pa_iochannel_socket_is_local(io);
4553     c->version = 8;
4554
4555     c->client = client;
4556     c->client->kill = client_kill_cb;
4557     c->client->send_event = client_send_event_cb;
4558     c->client->userdata = c;
4559
4560     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4561     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4562     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4563     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4564     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4565     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4566     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4567
4568     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4569
4570     c->record_streams = pa_idxset_new(NULL, NULL);
4571     c->output_streams = pa_idxset_new(NULL, NULL);
4572
4573     c->rrobin_index = PA_IDXSET_INVALID;
4574     c->subscription = NULL;
4575
4576     pa_idxset_put(p->connections, c, NULL);
4577
4578 #ifdef HAVE_CREDS
4579     if (pa_iochannel_creds_supported(io))
4580         pa_iochannel_creds_enable(io);
4581 #endif
4582
4583     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4584 }
4585
4586 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4587     pa_native_connection *c;
4588     void *state = NULL;
4589
4590     pa_assert(p);
4591     pa_assert(m);
4592
4593     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4594         if (c->options->module == m)
4595             native_connection_unlink(c);
4596 }
4597
4598 static pa_native_protocol* native_protocol_new(pa_core *c) {
4599     pa_native_protocol *p;
4600     pa_native_hook_t h;
4601
4602     pa_assert(c);
4603
4604     p = pa_xnew(pa_native_protocol, 1);
4605     PA_REFCNT_INIT(p);
4606     p->core = c;
4607     p->connections = pa_idxset_new(NULL, NULL);
4608
4609     p->servers = NULL;
4610
4611     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4612
4613     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4614         pa_hook_init(&p->hooks[h], p);
4615
4616     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4617
4618     return p;
4619 }
4620
4621 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4622     pa_native_protocol *p;
4623
4624     if ((p = pa_shared_get(c, "native-protocol")))
4625         return pa_native_protocol_ref(p);
4626
4627     return native_protocol_new(c);
4628 }
4629
4630 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4631     pa_assert(p);
4632     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4633
4634     PA_REFCNT_INC(p);
4635
4636     return p;
4637 }
4638
4639 void pa_native_protocol_unref(pa_native_protocol *p) {
4640     pa_native_connection *c;
4641     pa_native_hook_t h;
4642
4643     pa_assert(p);
4644     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4645
4646     if (PA_REFCNT_DEC(p) > 0)
4647         return;
4648
4649     while ((c = pa_idxset_first(p->connections, NULL)))
4650         native_connection_unlink(c);
4651
4652     pa_idxset_free(p->connections, NULL, NULL);
4653
4654     pa_strlist_free(p->servers);
4655
4656     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4657         pa_hook_done(&p->hooks[h]);
4658
4659     pa_hashmap_free(p->extensions, NULL, NULL);
4660
4661     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4662
4663     pa_xfree(p);
4664 }
4665
4666 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4667     pa_assert(p);
4668     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4669     pa_assert(name);
4670
4671     p->servers = pa_strlist_prepend(p->servers, name);
4672
4673     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4674 }
4675
4676 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4677     pa_assert(p);
4678     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4679     pa_assert(name);
4680
4681     p->servers = pa_strlist_remove(p->servers, name);
4682
4683     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4684 }
4685
4686 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4687     pa_assert(p);
4688     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4689
4690     return p->hooks;
4691 }
4692
4693 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4694     pa_assert(p);
4695     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4696
4697     return p->servers;
4698 }
4699
4700 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4701     pa_assert(p);
4702     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4703     pa_assert(m);
4704     pa_assert(cb);
4705     pa_assert(!pa_hashmap_get(p->extensions, m));
4706
4707     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4708     return 0;
4709 }
4710
4711 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4712     pa_assert(p);
4713     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4714     pa_assert(m);
4715
4716     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4717 }
4718
4719 pa_native_options* pa_native_options_new(void) {
4720     pa_native_options *o;
4721
4722     o = pa_xnew0(pa_native_options, 1);
4723     PA_REFCNT_INIT(o);
4724
4725     return o;
4726 }
4727
4728 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4729     pa_assert(o);
4730     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4731
4732     PA_REFCNT_INC(o);
4733
4734     return o;
4735 }
4736
4737 void pa_native_options_unref(pa_native_options *o) {
4738     pa_assert(o);
4739     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4740
4741     if (PA_REFCNT_DEC(o) > 0)
4742         return;
4743
4744     pa_xfree(o->auth_group);
4745
4746     if (o->auth_ip_acl)
4747         pa_ip_acl_free(o->auth_ip_acl);
4748
4749     if (o->auth_cookie)
4750         pa_auth_cookie_unref(o->auth_cookie);
4751
4752     pa_xfree(o);
4753 }
4754
4755 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4756     pa_bool_t enabled;
4757     const char *acl;
4758
4759     pa_assert(o);
4760     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4761     pa_assert(ma);
4762
4763     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4764         pa_log("auth-anonymous= expects a boolean argument.");
4765         return -1;
4766     }
4767
4768     enabled = TRUE;
4769     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4770         pa_log("auth-group-enabled= expects a boolean argument.");
4771         return -1;
4772     }
4773
4774     pa_xfree(o->auth_group);
4775     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4776
4777 #ifndef HAVE_CREDS
4778     if (o->auth_group)
4779         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4780 #endif
4781
4782     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4783         pa_ip_acl *ipa;
4784
4785         if (!(ipa = pa_ip_acl_new(acl))) {
4786             pa_log("Failed to parse IP ACL '%s'", acl);
4787             return -1;
4788         }
4789
4790         if (o->auth_ip_acl)
4791             pa_ip_acl_free(o->auth_ip_acl);
4792
4793         o->auth_ip_acl = ipa;
4794     }
4795
4796     enabled = TRUE;
4797     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4798         pa_log("auth-cookie-enabled= expects a boolean argument.");
4799         return -1;
4800     }
4801
4802     if (o->auth_cookie)
4803         pa_auth_cookie_unref(o->auth_cookie);
4804
4805     if (enabled) {
4806         const char *cn;
4807
4808         /* The new name for this is 'auth-cookie', for compat reasons
4809          * we check the old name too */
4810         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4811             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4812                 cn = PA_NATIVE_COOKIE_FILE;
4813
4814         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4815             return -1;
4816
4817     } else
4818           o->auth_cookie = NULL;
4819
4820     return 0;
4821 }
4822
4823 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4824     pa_native_connection_assert_ref(c);
4825
4826     return c->pstream;
4827 }