Merge commit 'origin/master-tx'
[platform/upstream/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/rtclock.h>
33 #include <pulse/timeval.h>
34 #include <pulse/version.h>
35 #include <pulse/utf8.h>
36 #include <pulse/util.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/native-common.h>
40 #include <pulsecore/packet.h>
41 #include <pulsecore/client.h>
42 #include <pulsecore/source-output.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/pstream.h>
45 #include <pulsecore/tagstruct.h>
46 #include <pulsecore/pdispatch.h>
47 #include <pulsecore/pstream-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/core-scache.h>
51 #include <pulsecore/core-subscribe.h>
52 #include <pulsecore/log.h>
53 #include <pulsecore/strlist.h>
54 #include <pulsecore/shared.h>
55 #include <pulsecore/sample-util.h>
56 #include <pulsecore/llist.h>
57 #include <pulsecore/creds.h>
58 #include <pulsecore/core-util.h>
59 #include <pulsecore/ipacl.h>
60 #include <pulsecore/thread-mq.h>
61
62 #include "protocol-native.h"
63
64 /* Kick a client if it doesn't authenticate within this time */
65 #define AUTH_TIMEOUT (60 * PA_USEC_PER_SEC)
66
67 /* Don't accept more connection than this */
68 #define MAX_CONNECTIONS 64
69
70 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
71 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
72 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
73 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
74
75 struct pa_native_protocol;
76
77 typedef struct record_stream {
78     pa_msgobject parent;
79
80     pa_native_connection *connection;
81     uint32_t index;
82
83     pa_source_output *source_output;
84     pa_memblockq *memblockq;
85
86     pa_bool_t adjust_latency:1;
87     pa_bool_t early_requests:1;
88
89     pa_buffer_attr buffer_attr;
90
91     pa_atomic_t on_the_fly;
92     pa_usec_t configured_source_latency;
93     size_t drop_initial;
94
95     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
96     size_t on_the_fly_snapshot;
97     pa_usec_t current_monitor_latency;
98     pa_usec_t current_source_latency;
99 } record_stream;
100
101 PA_DECLARE_CLASS(record_stream);
102 #define RECORD_STREAM(o) (record_stream_cast(o))
103 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
104
105 typedef struct output_stream {
106     pa_msgobject parent;
107 } output_stream;
108
109 PA_DECLARE_CLASS(output_stream);
110 #define OUTPUT_STREAM(o) (output_stream_cast(o))
111 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
112
113 typedef struct playback_stream {
114     output_stream parent;
115
116     pa_native_connection *connection;
117     uint32_t index;
118
119     pa_sink_input *sink_input;
120     pa_memblockq *memblockq;
121
122     pa_bool_t adjust_latency:1;
123     pa_bool_t early_requests:1;
124
125     pa_bool_t is_underrun:1;
126     pa_bool_t drain_request:1;
127     uint32_t drain_tag;
128     uint32_t syncid;
129
130     pa_atomic_t missing;
131     pa_usec_t configured_sink_latency;
132     pa_buffer_attr buffer_attr;
133
134     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
135     int64_t read_index, write_index;
136     size_t render_memblockq_length;
137     pa_usec_t current_sink_latency;
138     uint64_t playing_for, underrun_for;
139 } playback_stream;
140
141 PA_DECLARE_CLASS(playback_stream);
142 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
143 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
144
145 typedef struct upload_stream {
146     output_stream parent;
147
148     pa_native_connection *connection;
149     uint32_t index;
150
151     pa_memchunk memchunk;
152     size_t length;
153     char *name;
154     pa_sample_spec sample_spec;
155     pa_channel_map channel_map;
156     pa_proplist *proplist;
157 } upload_stream;
158
159 PA_DECLARE_CLASS(upload_stream);
160 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
161 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
162
163 struct pa_native_connection {
164     pa_msgobject parent;
165     pa_native_protocol *protocol;
166     pa_native_options *options;
167     pa_bool_t authorized:1;
168     pa_bool_t is_local:1;
169     uint32_t version;
170     pa_client *client;
171     pa_pstream *pstream;
172     pa_pdispatch *pdispatch;
173     pa_idxset *record_streams, *output_streams;
174     uint32_t rrobin_index;
175     pa_subscription *subscription;
176     pa_time_event *auth_timeout_event;
177 };
178
179 PA_DECLARE_CLASS(pa_native_connection);
180 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
181 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
182
183 struct pa_native_protocol {
184     PA_REFCNT_DECLARE;
185
186     pa_core *core;
187     pa_idxset *connections;
188
189     pa_strlist *servers;
190     pa_hook hooks[PA_NATIVE_HOOK_MAX];
191
192     pa_hashmap *extensions;
193 };
194
195 enum {
196     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
197 };
198
199 enum {
200     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
201     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
202     SINK_INPUT_MESSAGE_FLUSH,
203     SINK_INPUT_MESSAGE_TRIGGER,
204     SINK_INPUT_MESSAGE_SEEK,
205     SINK_INPUT_MESSAGE_PREBUF_FORCE,
206     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
207     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
208 };
209
210 enum {
211     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
212     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
213     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
214     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
215     PLAYBACK_STREAM_MESSAGE_STARTED,
216     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
217 };
218
219 enum {
220     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
221 };
222
223 enum {
224     CONNECTION_MESSAGE_RELEASE,
225     CONNECTION_MESSAGE_REVOKE
226 };
227
228 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
229 static void sink_input_kill_cb(pa_sink_input *i);
230 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
231 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
232 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
235 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
236
237 static void native_connection_send_memblock(pa_native_connection *c);
238 static void playback_stream_request_bytes(struct playback_stream*s);
239
240 static void source_output_kill_cb(pa_source_output *o);
241 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
242 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
243 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
244 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
245 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
246
247 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
249
250 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
288 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
289
290 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
291     [PA_COMMAND_ERROR] = NULL,
292     [PA_COMMAND_TIMEOUT] = NULL,
293     [PA_COMMAND_REPLY] = NULL,
294     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
295     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
296     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
297     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
298     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
299     [PA_COMMAND_AUTH] = command_auth,
300     [PA_COMMAND_REQUEST] = NULL,
301     [PA_COMMAND_EXIT] = command_exit,
302     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
303     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
304     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
305     [PA_COMMAND_STAT] = command_stat,
306     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
307     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
308     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
309     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
310     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
311     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
312     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
313     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
314     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
315     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
316     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
317     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
318     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
319     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
320     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
321     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
328     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
329     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
330     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
331
332     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
333     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
334     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
335
336     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
337     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
338     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
339
340     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
341     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
342
343     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
344     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
346     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
347
348     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
349     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
350
351     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
352     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
353     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
354     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
355     [PA_COMMAND_KILL_CLIENT] = command_kill,
356     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
357     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
358     [PA_COMMAND_LOAD_MODULE] = command_load_module,
359     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
360
361     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
362     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
363     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
364     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
365
366     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
367     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
368
369     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
370     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
371
372     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
373     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
374
375     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
376     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
377     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
378
379     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
380     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
381     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
382
383     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
384
385     [PA_COMMAND_SET_SINK_PORT] = command_set_sink_or_source_port,
386     [PA_COMMAND_SET_SOURCE_PORT] = command_set_sink_or_source_port,
387
388     [PA_COMMAND_EXTENSION] = command_extension
389 };
390
391 /* structure management */
392
393 /* Called from main context */
394 static void upload_stream_unlink(upload_stream *s) {
395     pa_assert(s);
396
397     if (!s->connection)
398         return;
399
400     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
401     s->connection = NULL;
402     upload_stream_unref(s);
403 }
404
405 /* Called from main context */
406 static void upload_stream_free(pa_object *o) {
407     upload_stream *s = UPLOAD_STREAM(o);
408     pa_assert(s);
409
410     upload_stream_unlink(s);
411
412     pa_xfree(s->name);
413
414     if (s->proplist)
415         pa_proplist_free(s->proplist);
416
417     if (s->memchunk.memblock)
418         pa_memblock_unref(s->memchunk.memblock);
419
420     pa_xfree(s);
421 }
422
423 /* Called from main context */
424 static upload_stream* upload_stream_new(
425         pa_native_connection *c,
426         const pa_sample_spec *ss,
427         const pa_channel_map *map,
428         const char *name,
429         size_t length,
430         pa_proplist *p) {
431
432     upload_stream *s;
433
434     pa_assert(c);
435     pa_assert(ss);
436     pa_assert(name);
437     pa_assert(length > 0);
438     pa_assert(p);
439
440     s = pa_msgobject_new(upload_stream);
441     s->parent.parent.parent.free = upload_stream_free;
442     s->connection = c;
443     s->sample_spec = *ss;
444     s->channel_map = *map;
445     s->name = pa_xstrdup(name);
446     pa_memchunk_reset(&s->memchunk);
447     s->length = length;
448     s->proplist = pa_proplist_copy(p);
449     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
450
451     pa_idxset_put(c->output_streams, s, &s->index);
452
453     return s;
454 }
455
456 /* Called from main context */
457 static void record_stream_unlink(record_stream *s) {
458     pa_assert(s);
459
460     if (!s->connection)
461         return;
462
463     if (s->source_output) {
464         pa_source_output_unlink(s->source_output);
465         pa_source_output_unref(s->source_output);
466         s->source_output = NULL;
467     }
468
469     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
470     s->connection = NULL;
471     record_stream_unref(s);
472 }
473
474 /* Called from main context */
475 static void record_stream_free(pa_object *o) {
476     record_stream *s = RECORD_STREAM(o);
477     pa_assert(s);
478
479     record_stream_unlink(s);
480
481     pa_memblockq_free(s->memblockq);
482     pa_xfree(s);
483 }
484
485 /* Called from main context */
486 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
487     record_stream *s = RECORD_STREAM(o);
488     record_stream_assert_ref(s);
489
490     if (!s->connection)
491         return -1;
492
493     switch (code) {
494
495         case RECORD_STREAM_MESSAGE_POST_DATA:
496
497             /* We try to keep up to date with how many bytes are
498              * currently on the fly */
499             pa_atomic_sub(&s->on_the_fly, chunk->length);
500
501             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
502 /*                 pa_log_warn("Failed to push data into output queue."); */
503                 return -1;
504             }
505
506             if (!pa_pstream_is_pending(s->connection->pstream))
507                 native_connection_send_memblock(s->connection);
508
509             break;
510     }
511
512     return 0;
513 }
514
515 /* Called from main context */
516 static void fix_record_buffer_attr_pre(record_stream *s) {
517
518     size_t frame_size;
519     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
520
521     pa_assert(s);
522
523     /* This function will be called from the main thread, before as
524      * well as after the source output has been activated using
525      * pa_source_output_put()! That means it may not touch any
526      * ->thread_info data! */
527
528     frame_size = pa_frame_size(&s->source_output->sample_spec);
529
530     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
531         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
532     if (s->buffer_attr.maxlength <= 0)
533         s->buffer_attr.maxlength = (uint32_t) frame_size;
534
535     if (s->buffer_attr.fragsize == (uint32_t) -1)
536         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
537     if (s->buffer_attr.fragsize <= 0)
538         s->buffer_attr.fragsize = (uint32_t) frame_size;
539
540     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
541
542     if (s->early_requests) {
543
544         /* In early request mode we need to emulate the classic
545          * fragment-based playback model. We do this setting the source
546          * latency to the fragment size. */
547
548         source_usec = fragsize_usec;
549
550     } else if (s->adjust_latency) {
551
552         /* So, the user asked us to adjust the latency according to
553          * what the source can provide. Half the latency will be
554          * spent on the hw buffer, half of it in the async buffer
555          * queue we maintain for each client. */
556
557         source_usec = fragsize_usec/2;
558
559     } else {
560
561         /* Ok, the user didn't ask us to adjust the latency, hence we
562          * don't */
563
564         source_usec = (pa_usec_t) -1;
565     }
566
567     if (source_usec != (pa_usec_t) -1)
568         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
569     else
570         s->configured_source_latency = 0;
571
572     if (s->early_requests) {
573
574         /* Ok, we didn't necessarily get what we were asking for, so
575          * let's tell the user */
576
577         fragsize_usec = s->configured_source_latency;
578
579     } else if (s->adjust_latency) {
580
581         /* Now subtract what we actually got */
582
583         if (fragsize_usec >= s->configured_source_latency*2)
584             fragsize_usec -= s->configured_source_latency;
585         else
586             fragsize_usec = s->configured_source_latency;
587     }
588
589     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
590         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
591
592         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
593
594     if (s->buffer_attr.fragsize <= 0)
595         s->buffer_attr.fragsize = (uint32_t) frame_size;
596 }
597
598 /* Called from main context */
599 static void fix_record_buffer_attr_post(record_stream *s) {
600     size_t base;
601
602     pa_assert(s);
603
604     /* This function will be called from the main thread, before as
605      * well as after the source output has been activated using
606      * pa_source_output_put()! That means it may not touch and
607      * ->thread_info data! */
608
609     base = pa_frame_size(&s->source_output->sample_spec);
610
611     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
612     if (s->buffer_attr.fragsize <= 0)
613         s->buffer_attr.fragsize = base;
614
615     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
616         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
617 }
618
619 /* Called from main context */
620 static record_stream* record_stream_new(
621         pa_native_connection *c,
622         pa_source *source,
623         pa_sample_spec *ss,
624         pa_channel_map *map,
625         pa_bool_t peak_detect,
626         pa_buffer_attr *attr,
627         pa_source_output_flags_t flags,
628         pa_proplist *p,
629         pa_bool_t adjust_latency,
630         pa_sink_input *direct_on_input,
631         pa_bool_t early_requests,
632         int *ret) {
633
634     record_stream *s;
635     pa_source_output *source_output = NULL;
636     size_t base;
637     pa_source_output_new_data data;
638
639     pa_assert(c);
640     pa_assert(ss);
641     pa_assert(p);
642     pa_assert(ret);
643
644     pa_source_output_new_data_init(&data);
645
646     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
647     data.driver = __FILE__;
648     data.module = c->options->module;
649     data.client = c->client;
650     data.source = source;
651     data.direct_on_input = direct_on_input;
652     pa_source_output_new_data_set_sample_spec(&data, ss);
653     pa_source_output_new_data_set_channel_map(&data, map);
654     if (peak_detect)
655         data.resample_method = PA_RESAMPLER_PEAKS;
656
657     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
658
659     pa_source_output_new_data_done(&data);
660
661     if (!source_output)
662         return NULL;
663
664     s = pa_msgobject_new(record_stream);
665     s->parent.parent.free = record_stream_free;
666     s->parent.process_msg = record_stream_process_msg;
667     s->connection = c;
668     s->source_output = source_output;
669     s->buffer_attr = *attr;
670     s->adjust_latency = adjust_latency;
671     s->early_requests = early_requests;
672     pa_atomic_store(&s->on_the_fly, 0);
673
674     s->source_output->parent.process_msg = source_output_process_msg;
675     s->source_output->push = source_output_push_cb;
676     s->source_output->kill = source_output_kill_cb;
677     s->source_output->get_latency = source_output_get_latency_cb;
678     s->source_output->moving = source_output_moving_cb;
679     s->source_output->suspend = source_output_suspend_cb;
680     s->source_output->send_event = source_output_send_event_cb;
681     s->source_output->userdata = s;
682
683     fix_record_buffer_attr_pre(s);
684
685     s->memblockq = pa_memblockq_new(
686             0,
687             s->buffer_attr.maxlength,
688             0,
689             base = pa_frame_size(&source_output->sample_spec),
690             1,
691             0,
692             0,
693             NULL);
694
695     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
696     fix_record_buffer_attr_post(s);
697
698     *ss = s->source_output->sample_spec;
699     *map = s->source_output->channel_map;
700
701     pa_idxset_put(c->record_streams, s, &s->index);
702
703     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
704                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
705                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
706                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
707
708     pa_source_output_put(s->source_output);
709     return s;
710 }
711
712 /* Called from main context */
713 static void record_stream_send_killed(record_stream *r) {
714     pa_tagstruct *t;
715     record_stream_assert_ref(r);
716
717     t = pa_tagstruct_new(NULL, 0);
718     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
719     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
720     pa_tagstruct_putu32(t, r->index);
721     pa_pstream_send_tagstruct(r->connection->pstream, t);
722 }
723
724 /* Called from main context */
725 static void playback_stream_unlink(playback_stream *s) {
726     pa_assert(s);
727
728     if (!s->connection)
729         return;
730
731     if (s->sink_input) {
732         pa_sink_input_unlink(s->sink_input);
733         pa_sink_input_unref(s->sink_input);
734         s->sink_input = NULL;
735     }
736
737     if (s->drain_request)
738         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
739
740     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
741     s->connection = NULL;
742     playback_stream_unref(s);
743 }
744
745 /* Called from main context */
746 static void playback_stream_free(pa_object* o) {
747     playback_stream *s = PLAYBACK_STREAM(o);
748     pa_assert(s);
749
750     playback_stream_unlink(s);
751
752     pa_memblockq_free(s->memblockq);
753     pa_xfree(s);
754 }
755
756 /* Called from main context */
757 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
758     playback_stream *s = PLAYBACK_STREAM(o);
759     playback_stream_assert_ref(s);
760
761     if (!s->connection)
762         return -1;
763
764     switch (code) {
765
766         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
767             pa_tagstruct *t;
768             int l = 0;
769
770             for (;;) {
771                 if ((l = pa_atomic_load(&s->missing)) <= 0)
772                     return 0;
773
774                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
775                     break;
776             }
777
778             t = pa_tagstruct_new(NULL, 0);
779             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
780             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
781             pa_tagstruct_putu32(t, s->index);
782             pa_tagstruct_putu32(t, (uint32_t) l);
783             pa_pstream_send_tagstruct(s->connection->pstream, t);
784
785 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
786             break;
787         }
788
789         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
790             pa_tagstruct *t;
791
792 /*             pa_log("signalling underflow"); */
793
794             /* Report that we're empty */
795             t = pa_tagstruct_new(NULL, 0);
796             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
797             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
798             pa_tagstruct_putu32(t, s->index);
799             pa_pstream_send_tagstruct(s->connection->pstream, t);
800             break;
801         }
802
803         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
804             pa_tagstruct *t;
805
806             /* Notify the user we're overflowed*/
807             t = pa_tagstruct_new(NULL, 0);
808             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
809             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
810             pa_tagstruct_putu32(t, s->index);
811             pa_pstream_send_tagstruct(s->connection->pstream, t);
812             break;
813         }
814
815         case PLAYBACK_STREAM_MESSAGE_STARTED:
816
817             if (s->connection->version >= 13) {
818                 pa_tagstruct *t;
819
820                 /* Notify the user we started playback */
821                 t = pa_tagstruct_new(NULL, 0);
822                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
823                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
824                 pa_tagstruct_putu32(t, s->index);
825                 pa_pstream_send_tagstruct(s->connection->pstream, t);
826             }
827
828             break;
829
830         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
831             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
832             break;
833
834         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
835             pa_tagstruct *t;
836
837             s->buffer_attr.tlength = (uint32_t) offset;
838
839             t = pa_tagstruct_new(NULL, 0);
840             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
841             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
842             pa_tagstruct_putu32(t, s->index);
843             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
844             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
845             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
846             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
847             pa_tagstruct_put_usec(t, s->configured_sink_latency);
848             pa_pstream_send_tagstruct(s->connection->pstream, t);
849
850             break;
851         }
852     }
853
854     return 0;
855 }
856
857 /* Called from main context */
858 static void fix_playback_buffer_attr(playback_stream *s) {
859     size_t frame_size, max_prebuf;
860     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
861
862     pa_assert(s);
863
864     /* This function will be called from the main thread, before as
865      * well as after the sink input has been activated using
866      * pa_sink_input_put()! That means it may not touch any
867      * ->thread_info data, such as the memblockq! */
868
869     frame_size = pa_frame_size(&s->sink_input->sample_spec);
870
871     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
872         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
873     if (s->buffer_attr.maxlength <= 0)
874         s->buffer_attr.maxlength = (uint32_t) frame_size;
875
876     if (s->buffer_attr.tlength == (uint32_t) -1)
877         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
878     if (s->buffer_attr.tlength <= 0)
879         s->buffer_attr.tlength = (uint32_t) frame_size;
880
881     if (s->buffer_attr.minreq == (uint32_t) -1)
882         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
883     if (s->buffer_attr.minreq <= 0)
884         s->buffer_attr.minreq = (uint32_t) frame_size;
885
886     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
887         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
888
889     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
890     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
891
892     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
893                 (double) tlength_usec / PA_USEC_PER_MSEC,
894                 (double) minreq_usec / PA_USEC_PER_MSEC);
895
896     if (s->early_requests) {
897
898         /* In early request mode we need to emulate the classic
899          * fragment-based playback model. We do this setting the sink
900          * latency to the fragment size. */
901
902         sink_usec = minreq_usec;
903         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
904
905     } else if (s->adjust_latency) {
906
907         /* So, the user asked us to adjust the latency of the stream
908          * buffer according to the what the sink can provide. The
909          * tlength passed in shall be the overall latency. Roughly
910          * half the latency will be spent on the hw buffer, the other
911          * half of it in the async buffer queue we maintain for each
912          * client. In between we'll have a safety space of size
913          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
914          * empty and needs to be filled, then our buffer must have
915          * enough data to fulfill this request immediatly and thus
916          * have at least the same tlength as the size of the hw
917          * buffer. It additionally needs space for 2 times minreq
918          * because if the buffer ran empty and a partial fillup
919          * happens immediately on the next iteration we need to be
920          * able to fulfill it and give the application also minreq
921          * time to fill it up again for the next request Makes 2 times
922          * minreq in plus.. */
923
924         if (tlength_usec > minreq_usec*2)
925             sink_usec = (tlength_usec - minreq_usec*2)/2;
926         else
927             sink_usec = 0;
928
929         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
930
931     } else {
932
933         /* Ok, the user didn't ask us to adjust the latency, but we
934          * still need to make sure that the parameters from the user
935          * do make sense. */
936
937         if (tlength_usec > minreq_usec*2)
938             sink_usec = (tlength_usec - minreq_usec*2);
939         else
940             sink_usec = 0;
941
942         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
943     }
944
945     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
946
947     if (s->early_requests) {
948
949         /* Ok, we didn't necessarily get what we were asking for, so
950          * let's tell the user */
951
952         minreq_usec = s->configured_sink_latency;
953
954     } else if (s->adjust_latency) {
955
956         /* Ok, we didn't necessarily get what we were asking for, so
957          * let's subtract from what we asked for for the remaining
958          * buffer space */
959
960         if (tlength_usec >= s->configured_sink_latency)
961             tlength_usec -= s->configured_sink_latency;
962     }
963
964     /* FIXME: This is actually larger than necessary, since not all of
965      * the sink latency is actually rewritable. */
966     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
967         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
968
969     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
970         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
971         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
972
973     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
974         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
975         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
976
977     if (s->buffer_attr.minreq <= 0) {
978         s->buffer_attr.minreq = (uint32_t) frame_size;
979         s->buffer_attr.tlength += (uint32_t) frame_size*2;
980     }
981
982     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
983         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
984
985     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
986
987     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
988         s->buffer_attr.prebuf > max_prebuf)
989         s->buffer_attr.prebuf = max_prebuf;
990 }
991
992 /* Called from main context */
993 static playback_stream* playback_stream_new(
994         pa_native_connection *c,
995         pa_sink *sink,
996         pa_sample_spec *ss,
997         pa_channel_map *map,
998         pa_buffer_attr *a,
999         pa_cvolume *volume,
1000         pa_bool_t muted,
1001         pa_bool_t muted_set,
1002         uint32_t syncid,
1003         uint32_t *missing,
1004         pa_sink_input_flags_t flags,
1005         pa_proplist *p,
1006         pa_bool_t adjust_latency,
1007         pa_bool_t early_requests,
1008         int *ret) {
1009
1010     playback_stream *s, *ssync;
1011     pa_sink_input *sink_input = NULL;
1012     pa_memchunk silence;
1013     uint32_t idx;
1014     int64_t start_index;
1015     pa_sink_input_new_data data;
1016
1017     pa_assert(c);
1018     pa_assert(ss);
1019     pa_assert(missing);
1020     pa_assert(p);
1021     pa_assert(ret);
1022
1023     /* Find syncid group */
1024     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1025
1026         if (!playback_stream_isinstance(ssync))
1027             continue;
1028
1029         if (ssync->syncid == syncid)
1030             break;
1031     }
1032
1033     /* Synced streams must connect to the same sink */
1034     if (ssync) {
1035
1036         if (!sink)
1037             sink = ssync->sink_input->sink;
1038         else if (sink != ssync->sink_input->sink) {
1039             *ret = PA_ERR_INVALID;
1040             return NULL;
1041         }
1042     }
1043
1044     pa_sink_input_new_data_init(&data);
1045
1046     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1047     data.driver = __FILE__;
1048     data.module = c->options->module;
1049     data.client = c->client;
1050     data.sink = sink;
1051     pa_sink_input_new_data_set_sample_spec(&data, ss);
1052     pa_sink_input_new_data_set_channel_map(&data, map);
1053     if (volume)
1054         pa_sink_input_new_data_set_volume(&data, volume);
1055     if (muted_set)
1056         pa_sink_input_new_data_set_muted(&data, muted);
1057     data.sync_base = ssync ? ssync->sink_input : NULL;
1058
1059     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1060
1061     pa_sink_input_new_data_done(&data);
1062
1063     if (!sink_input)
1064         return NULL;
1065
1066     s = pa_msgobject_new(playback_stream);
1067     s->parent.parent.parent.free = playback_stream_free;
1068     s->parent.parent.process_msg = playback_stream_process_msg;
1069     s->connection = c;
1070     s->syncid = syncid;
1071     s->sink_input = sink_input;
1072     s->is_underrun = TRUE;
1073     s->drain_request = FALSE;
1074     pa_atomic_store(&s->missing, 0);
1075     s->buffer_attr = *a;
1076     s->adjust_latency = adjust_latency;
1077     s->early_requests = early_requests;
1078
1079     s->sink_input->parent.process_msg = sink_input_process_msg;
1080     s->sink_input->pop = sink_input_pop_cb;
1081     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1082     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1083     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1084     s->sink_input->kill = sink_input_kill_cb;
1085     s->sink_input->moving = sink_input_moving_cb;
1086     s->sink_input->suspend = sink_input_suspend_cb;
1087     s->sink_input->send_event = sink_input_send_event_cb;
1088     s->sink_input->userdata = s;
1089
1090     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1091
1092     fix_playback_buffer_attr(s);
1093
1094     pa_sink_input_get_silence(sink_input, &silence);
1095     s->memblockq = pa_memblockq_new(
1096             start_index,
1097             s->buffer_attr.maxlength,
1098             s->buffer_attr.tlength,
1099             pa_frame_size(&sink_input->sample_spec),
1100             s->buffer_attr.prebuf,
1101             s->buffer_attr.minreq,
1102             0,
1103             &silence);
1104     pa_memblock_unref(silence.memblock);
1105
1106     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1107
1108     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1109
1110     *ss = s->sink_input->sample_spec;
1111     *map = s->sink_input->channel_map;
1112
1113     pa_idxset_put(c->output_streams, s, &s->index);
1114
1115     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1116                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1117                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1118                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1119                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1120
1121     pa_sink_input_put(s->sink_input);
1122     return s;
1123 }
1124
1125 /* Called from IO context */
1126 static void playback_stream_request_bytes(playback_stream *s) {
1127     size_t m, minreq;
1128     int previous_missing;
1129
1130     playback_stream_assert_ref(s);
1131
1132     m = pa_memblockq_pop_missing(s->memblockq);
1133
1134     /* pa_log("request_bytes(%lu) (tlength=%lu minreq=%lu length=%lu)", */
1135     /*        (unsigned long) m, */
1136     /*        pa_memblockq_get_tlength(s->memblockq), */
1137     /*        pa_memblockq_get_minreq(s->memblockq), */
1138     /*        pa_memblockq_get_length(s->memblockq)); */
1139
1140     if (m <= 0)
1141         return;
1142
1143 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1144
1145     previous_missing = pa_atomic_add(&s->missing, (int) m);
1146     minreq = pa_memblockq_get_minreq(s->memblockq);
1147
1148     if (pa_memblockq_prebuf_active(s->memblockq) ||
1149         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1150         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1151 }
1152
1153 /* Called from main context */
1154 static void playback_stream_send_killed(playback_stream *p) {
1155     pa_tagstruct *t;
1156     playback_stream_assert_ref(p);
1157
1158     t = pa_tagstruct_new(NULL, 0);
1159     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1160     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1161     pa_tagstruct_putu32(t, p->index);
1162     pa_pstream_send_tagstruct(p->connection->pstream, t);
1163 }
1164
1165 /* Called from main context */
1166 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1167     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1168     pa_native_connection_assert_ref(c);
1169
1170     if (!c->protocol)
1171         return -1;
1172
1173     switch (code) {
1174
1175         case CONNECTION_MESSAGE_REVOKE:
1176             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1177             break;
1178
1179         case CONNECTION_MESSAGE_RELEASE:
1180             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1181             break;
1182     }
1183
1184     return 0;
1185 }
1186
1187 /* Called from main context */
1188 static void native_connection_unlink(pa_native_connection *c) {
1189     record_stream *r;
1190     output_stream *o;
1191
1192     pa_assert(c);
1193
1194     if (!c->protocol)
1195         return;
1196
1197     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1198
1199     if (c->options)
1200         pa_native_options_unref(c->options);
1201
1202     while ((r = pa_idxset_first(c->record_streams, NULL)))
1203         record_stream_unlink(r);
1204
1205     while ((o = pa_idxset_first(c->output_streams, NULL)))
1206         if (playback_stream_isinstance(o))
1207             playback_stream_unlink(PLAYBACK_STREAM(o));
1208         else
1209             upload_stream_unlink(UPLOAD_STREAM(o));
1210
1211     if (c->subscription)
1212         pa_subscription_free(c->subscription);
1213
1214     if (c->pstream)
1215         pa_pstream_unlink(c->pstream);
1216
1217     if (c->auth_timeout_event) {
1218         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1219         c->auth_timeout_event = NULL;
1220     }
1221
1222     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1223     c->protocol = NULL;
1224     pa_native_connection_unref(c);
1225 }
1226
1227 /* Called from main context */
1228 static void native_connection_free(pa_object *o) {
1229     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1230
1231     pa_assert(c);
1232
1233     native_connection_unlink(c);
1234
1235     pa_idxset_free(c->record_streams, NULL, NULL);
1236     pa_idxset_free(c->output_streams, NULL, NULL);
1237
1238     pa_pdispatch_unref(c->pdispatch);
1239     pa_pstream_unref(c->pstream);
1240     pa_client_free(c->client);
1241
1242     pa_xfree(c);
1243 }
1244
1245 /* Called from main context */
1246 static void native_connection_send_memblock(pa_native_connection *c) {
1247     uint32_t start;
1248     record_stream *r;
1249
1250     start = PA_IDXSET_INVALID;
1251     for (;;) {
1252         pa_memchunk chunk;
1253
1254         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1255             return;
1256
1257         if (start == PA_IDXSET_INVALID)
1258             start = c->rrobin_index;
1259         else if (start == c->rrobin_index)
1260             return;
1261
1262         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1263             pa_memchunk schunk = chunk;
1264
1265             if (schunk.length > r->buffer_attr.fragsize)
1266                 schunk.length = r->buffer_attr.fragsize;
1267
1268             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1269
1270             pa_memblockq_drop(r->memblockq, schunk.length);
1271             pa_memblock_unref(schunk.memblock);
1272
1273             return;
1274         }
1275     }
1276 }
1277
1278 /*** sink input callbacks ***/
1279
1280 /* Called from thread context */
1281 static void handle_seek(playback_stream *s, int64_t indexw) {
1282     playback_stream_assert_ref(s);
1283
1284 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1285
1286     if (s->sink_input->thread_info.underrun_for > 0) {
1287
1288 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1289
1290         if (pa_memblockq_is_readable(s->memblockq)) {
1291
1292             /* We just ended an underrun, let's ask the sink
1293              * for a complete rewind rewrite */
1294
1295             pa_log_debug("Requesting rewind due to end of underrun.");
1296             pa_sink_input_request_rewind(s->sink_input,
1297                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1298                                          FALSE, TRUE, FALSE);
1299         }
1300
1301     } else {
1302         int64_t indexr;
1303
1304         indexr = pa_memblockq_get_read_index(s->memblockq);
1305
1306         if (indexw < indexr) {
1307             /* OK, the sink already asked for this data, so
1308              * let's have it usk us again */
1309
1310             pa_log_debug("Requesting rewind due to rewrite.");
1311             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1312         }
1313     }
1314
1315     playback_stream_request_bytes(s);
1316 }
1317
1318 /* Called from thread context */
1319 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1320     pa_sink_input *i = PA_SINK_INPUT(o);
1321     playback_stream *s;
1322
1323     pa_sink_input_assert_ref(i);
1324     s = PLAYBACK_STREAM(i->userdata);
1325     playback_stream_assert_ref(s);
1326
1327     switch (code) {
1328
1329         case SINK_INPUT_MESSAGE_SEEK: {
1330             int64_t windex;
1331
1332             windex = pa_memblockq_get_write_index(s->memblockq);
1333
1334             /* The client side is incapable of accounting correctly
1335              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1336              * able to deal with that. */
1337
1338             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1339
1340             handle_seek(s, windex);
1341             return 0;
1342         }
1343
1344         case SINK_INPUT_MESSAGE_POST_DATA: {
1345             int64_t windex;
1346
1347             pa_assert(chunk);
1348
1349             windex = pa_memblockq_get_write_index(s->memblockq);
1350
1351 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1352
1353             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1354
1355                 if (pa_log_ratelimit())
1356                     pa_log_warn("Failed to push data into queue");
1357                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1358                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1359             }
1360
1361             handle_seek(s, windex);
1362
1363 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1364
1365             return 0;
1366         }
1367
1368         case SINK_INPUT_MESSAGE_DRAIN:
1369         case SINK_INPUT_MESSAGE_FLUSH:
1370         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1371         case SINK_INPUT_MESSAGE_TRIGGER: {
1372
1373             int64_t windex;
1374             pa_sink_input *isync;
1375             void (*func)(pa_memblockq *bq);
1376
1377             switch  (code) {
1378                 case SINK_INPUT_MESSAGE_FLUSH:
1379                     func = pa_memblockq_flush_write;
1380                     break;
1381
1382                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1383                     func = pa_memblockq_prebuf_force;
1384                     break;
1385
1386                 case SINK_INPUT_MESSAGE_DRAIN:
1387                 case SINK_INPUT_MESSAGE_TRIGGER:
1388                     func = pa_memblockq_prebuf_disable;
1389                     break;
1390
1391                 default:
1392                     pa_assert_not_reached();
1393             }
1394
1395             windex = pa_memblockq_get_write_index(s->memblockq);
1396             func(s->memblockq);
1397             handle_seek(s, windex);
1398
1399             /* Do the same for all other members in the sync group */
1400             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1401                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1402                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1403                 func(ssync->memblockq);
1404                 handle_seek(ssync, windex);
1405             }
1406
1407             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1408                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1409                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1410                 func(ssync->memblockq);
1411                 handle_seek(ssync, windex);
1412             }
1413
1414             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1415                 if (!pa_memblockq_is_readable(s->memblockq))
1416                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1417                 else {
1418                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1419                     s->drain_request = TRUE;
1420                 }
1421             }
1422
1423             return 0;
1424         }
1425
1426         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1427             /* Atomically get a snapshot of all timing parameters... */
1428             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1429             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1430             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1431             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1432             s->underrun_for = s->sink_input->thread_info.underrun_for;
1433             s->playing_for = s->sink_input->thread_info.playing_for;
1434
1435             return 0;
1436
1437         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1438             int64_t windex;
1439
1440             windex = pa_memblockq_get_write_index(s->memblockq);
1441
1442             pa_memblockq_prebuf_force(s->memblockq);
1443
1444             handle_seek(s, windex);
1445
1446             /* Fall through to the default handler */
1447             break;
1448         }
1449
1450         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1451             pa_usec_t *r = userdata;
1452
1453             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1454
1455             /* Fall through, the default handler will add in the extra
1456              * latency added by the resampler */
1457             break;
1458         }
1459
1460         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1461             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1462             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1463             return 0;
1464         }
1465     }
1466
1467     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1468 }
1469
1470 /* Called from thread context */
1471 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1472     playback_stream *s;
1473
1474     pa_sink_input_assert_ref(i);
1475     s = PLAYBACK_STREAM(i->userdata);
1476     playback_stream_assert_ref(s);
1477     pa_assert(chunk);
1478
1479 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1480
1481     if (pa_memblockq_is_readable(s->memblockq))
1482         s->is_underrun = FALSE;
1483     else {
1484         if (!s->is_underrun)
1485             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1486
1487         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1488             s->drain_request = FALSE;
1489             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1490         } else if (!s->is_underrun)
1491             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1492
1493         s->is_underrun = TRUE;
1494
1495         playback_stream_request_bytes(s);
1496     }
1497
1498     /* This call will not fail with prebuf=0, hence we check for
1499        underrun explicitly above */
1500     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1501         return -1;
1502
1503     chunk->length = PA_MIN(nbytes, chunk->length);
1504
1505     if (i->thread_info.underrun_for > 0)
1506         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1507
1508     pa_memblockq_drop(s->memblockq, chunk->length);
1509     playback_stream_request_bytes(s);
1510
1511     return 0;
1512 }
1513
1514 /* Called from thread context */
1515 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1516     playback_stream *s;
1517
1518     pa_sink_input_assert_ref(i);
1519     s = PLAYBACK_STREAM(i->userdata);
1520     playback_stream_assert_ref(s);
1521
1522     /* If we are in an underrun, then we don't rewind */
1523     if (i->thread_info.underrun_for > 0)
1524         return;
1525
1526     pa_memblockq_rewind(s->memblockq, nbytes);
1527 }
1528
1529 /* Called from thread context */
1530 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1531     playback_stream *s;
1532
1533     pa_sink_input_assert_ref(i);
1534     s = PLAYBACK_STREAM(i->userdata);
1535     playback_stream_assert_ref(s);
1536
1537     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1538 }
1539
1540 /* Called from thread context */
1541 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1542     playback_stream *s;
1543     size_t new_tlength, old_tlength;
1544
1545     pa_sink_input_assert_ref(i);
1546     s = PLAYBACK_STREAM(i->userdata);
1547     playback_stream_assert_ref(s);
1548
1549     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1550     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1551
1552     if (old_tlength < new_tlength) {
1553         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1554         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1555         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1556
1557         if (new_tlength == old_tlength)
1558             pa_log_debug("Failed to increase tlength");
1559         else {
1560             pa_log_debug("Notifying client about increased tlength");
1561             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1562         }
1563     }
1564 }
1565
1566 /* Called from main context */
1567 static void sink_input_kill_cb(pa_sink_input *i) {
1568     playback_stream *s;
1569
1570     pa_sink_input_assert_ref(i);
1571     s = PLAYBACK_STREAM(i->userdata);
1572     playback_stream_assert_ref(s);
1573
1574     playback_stream_send_killed(s);
1575     playback_stream_unlink(s);
1576 }
1577
1578 /* Called from main context */
1579 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1580     playback_stream *s;
1581     pa_tagstruct *t;
1582
1583     pa_sink_input_assert_ref(i);
1584     s = PLAYBACK_STREAM(i->userdata);
1585     playback_stream_assert_ref(s);
1586
1587     if (s->connection->version < 15)
1588       return;
1589
1590     t = pa_tagstruct_new(NULL, 0);
1591     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1592     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1593     pa_tagstruct_putu32(t, s->index);
1594     pa_tagstruct_puts(t, event);
1595     pa_tagstruct_put_proplist(t, pl);
1596     pa_pstream_send_tagstruct(s->connection->pstream, t);
1597 }
1598
1599 /* Called from main context */
1600 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1601     playback_stream *s;
1602     pa_tagstruct *t;
1603
1604     pa_sink_input_assert_ref(i);
1605     s = PLAYBACK_STREAM(i->userdata);
1606     playback_stream_assert_ref(s);
1607
1608     if (s->connection->version < 12)
1609       return;
1610
1611     t = pa_tagstruct_new(NULL, 0);
1612     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1613     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1614     pa_tagstruct_putu32(t, s->index);
1615     pa_tagstruct_put_boolean(t, suspend);
1616     pa_pstream_send_tagstruct(s->connection->pstream, t);
1617 }
1618
1619 /* Called from main context */
1620 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1621     playback_stream *s;
1622     pa_tagstruct *t;
1623
1624     pa_sink_input_assert_ref(i);
1625     s = PLAYBACK_STREAM(i->userdata);
1626     playback_stream_assert_ref(s);
1627
1628     if (!dest)
1629         return;
1630
1631     fix_playback_buffer_attr(s);
1632     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1633     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1634
1635     if (s->connection->version < 12)
1636       return;
1637
1638     t = pa_tagstruct_new(NULL, 0);
1639     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1640     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1641     pa_tagstruct_putu32(t, s->index);
1642     pa_tagstruct_putu32(t, dest->index);
1643     pa_tagstruct_puts(t, dest->name);
1644     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1645
1646     if (s->connection->version >= 13) {
1647         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1648         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1649         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1650         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1651         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1652     }
1653
1654     pa_pstream_send_tagstruct(s->connection->pstream, t);
1655 }
1656
1657 /*** source_output callbacks ***/
1658
1659 /* Called from thread context */
1660 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1661     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1662     record_stream *s;
1663
1664     pa_source_output_assert_ref(o);
1665     s = RECORD_STREAM(o->userdata);
1666     record_stream_assert_ref(s);
1667
1668     switch (code) {
1669         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1670             /* Atomically get a snapshot of all timing parameters... */
1671             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1672             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1673             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1674             return 0;
1675     }
1676
1677     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1678 }
1679
1680 /* Called from thread context */
1681 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1682     record_stream *s;
1683
1684     pa_source_output_assert_ref(o);
1685     s = RECORD_STREAM(o->userdata);
1686     record_stream_assert_ref(s);
1687     pa_assert(chunk);
1688
1689     pa_atomic_add(&s->on_the_fly, chunk->length);
1690     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1691 }
1692
1693 static void source_output_kill_cb(pa_source_output *o) {
1694     record_stream *s;
1695
1696     pa_source_output_assert_ref(o);
1697     s = RECORD_STREAM(o->userdata);
1698     record_stream_assert_ref(s);
1699
1700     record_stream_send_killed(s);
1701     record_stream_unlink(s);
1702 }
1703
1704 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1705     record_stream *s;
1706
1707     pa_source_output_assert_ref(o);
1708     s = RECORD_STREAM(o->userdata);
1709     record_stream_assert_ref(s);
1710
1711     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1712
1713     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1714 }
1715
1716 /* Called from main context */
1717 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1718     record_stream *s;
1719     pa_tagstruct *t;
1720
1721     pa_source_output_assert_ref(o);
1722     s = RECORD_STREAM(o->userdata);
1723     record_stream_assert_ref(s);
1724
1725     if (s->connection->version < 15)
1726       return;
1727
1728     t = pa_tagstruct_new(NULL, 0);
1729     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1730     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1731     pa_tagstruct_putu32(t, s->index);
1732     pa_tagstruct_puts(t, event);
1733     pa_tagstruct_put_proplist(t, pl);
1734     pa_pstream_send_tagstruct(s->connection->pstream, t);
1735 }
1736
1737 /* Called from main context */
1738 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1739     record_stream *s;
1740     pa_tagstruct *t;
1741
1742     pa_source_output_assert_ref(o);
1743     s = RECORD_STREAM(o->userdata);
1744     record_stream_assert_ref(s);
1745
1746     if (s->connection->version < 12)
1747       return;
1748
1749     t = pa_tagstruct_new(NULL, 0);
1750     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1751     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1752     pa_tagstruct_putu32(t, s->index);
1753     pa_tagstruct_put_boolean(t, suspend);
1754     pa_pstream_send_tagstruct(s->connection->pstream, t);
1755 }
1756
1757 /* Called from main context */
1758 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1759     record_stream *s;
1760     pa_tagstruct *t;
1761
1762     pa_source_output_assert_ref(o);
1763     s = RECORD_STREAM(o->userdata);
1764     record_stream_assert_ref(s);
1765
1766     if (!dest)
1767         return;
1768
1769     fix_record_buffer_attr_pre(s);
1770     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1771     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1772     fix_record_buffer_attr_post(s);
1773
1774     if (s->connection->version < 12)
1775       return;
1776
1777     t = pa_tagstruct_new(NULL, 0);
1778     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1779     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1780     pa_tagstruct_putu32(t, s->index);
1781     pa_tagstruct_putu32(t, dest->index);
1782     pa_tagstruct_puts(t, dest->name);
1783     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1784
1785     if (s->connection->version >= 13) {
1786         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1787         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1788         pa_tagstruct_put_usec(t, s->configured_source_latency);
1789     }
1790
1791     pa_pstream_send_tagstruct(s->connection->pstream, t);
1792 }
1793
1794 /*** pdispatch callbacks ***/
1795
1796 static void protocol_error(pa_native_connection *c) {
1797     pa_log("protocol error, kicking client");
1798     native_connection_unlink(c);
1799 }
1800
1801 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1802 if (!(expression)) { \
1803     pa_pstream_send_error((pstream), (tag), (error)); \
1804     return; \
1805 } \
1806 } while(0);
1807
1808 static pa_tagstruct *reply_new(uint32_t tag) {
1809     pa_tagstruct *reply;
1810
1811     reply = pa_tagstruct_new(NULL, 0);
1812     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1813     pa_tagstruct_putu32(reply, tag);
1814     return reply;
1815 }
1816
1817 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1818     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1819     playback_stream *s;
1820     uint32_t sink_index, syncid, missing;
1821     pa_buffer_attr attr;
1822     const char *name = NULL, *sink_name;
1823     pa_sample_spec ss;
1824     pa_channel_map map;
1825     pa_tagstruct *reply;
1826     pa_sink *sink = NULL;
1827     pa_cvolume volume;
1828     pa_bool_t
1829         corked = FALSE,
1830         no_remap = FALSE,
1831         no_remix = FALSE,
1832         fix_format = FALSE,
1833         fix_rate = FALSE,
1834         fix_channels = FALSE,
1835         no_move = FALSE,
1836         variable_rate = FALSE,
1837         muted = FALSE,
1838         adjust_latency = FALSE,
1839         early_requests = FALSE,
1840         dont_inhibit_auto_suspend = FALSE,
1841         muted_set = FALSE,
1842         fail_on_suspend = FALSE;
1843     pa_sink_input_flags_t flags = 0;
1844     pa_proplist *p;
1845     pa_bool_t volume_set = TRUE;
1846     int ret = PA_ERR_INVALID;
1847
1848     pa_native_connection_assert_ref(c);
1849     pa_assert(t);
1850     memset(&attr, 0, sizeof(attr));
1851
1852     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1853         pa_tagstruct_get(
1854                 t,
1855                 PA_TAG_SAMPLE_SPEC, &ss,
1856                 PA_TAG_CHANNEL_MAP, &map,
1857                 PA_TAG_U32, &sink_index,
1858                 PA_TAG_STRING, &sink_name,
1859                 PA_TAG_U32, &attr.maxlength,
1860                 PA_TAG_BOOLEAN, &corked,
1861                 PA_TAG_U32, &attr.tlength,
1862                 PA_TAG_U32, &attr.prebuf,
1863                 PA_TAG_U32, &attr.minreq,
1864                 PA_TAG_U32, &syncid,
1865                 PA_TAG_CVOLUME, &volume,
1866                 PA_TAG_INVALID) < 0) {
1867
1868         protocol_error(c);
1869         return;
1870     }
1871
1872     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1873     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1874     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1875     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1876     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1877     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1878     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1879     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1880
1881     p = pa_proplist_new();
1882
1883     if (name)
1884         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1885
1886     if (c->version >= 12)  {
1887         /* Since 0.9.8 the user can ask for a couple of additional flags */
1888
1889         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1890             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1891             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1892             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1893             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1894             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1895             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1896
1897             protocol_error(c);
1898             pa_proplist_free(p);
1899             return;
1900         }
1901     }
1902
1903     if (c->version >= 13) {
1904
1905         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1906             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1907             pa_tagstruct_get_proplist(t, p) < 0) {
1908             protocol_error(c);
1909             pa_proplist_free(p);
1910             return;
1911         }
1912     }
1913
1914     if (c->version >= 14) {
1915
1916         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1917             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1918             protocol_error(c);
1919             pa_proplist_free(p);
1920             return;
1921         }
1922     }
1923
1924     if (c->version >= 15) {
1925
1926         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1927             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1928             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1929             protocol_error(c);
1930             pa_proplist_free(p);
1931             return;
1932         }
1933     }
1934
1935     if (!pa_tagstruct_eof(t)) {
1936         protocol_error(c);
1937         pa_proplist_free(p);
1938         return;
1939     }
1940
1941     if (sink_index != PA_INVALID_INDEX) {
1942
1943         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1944             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1945             pa_proplist_free(p);
1946             return;
1947         }
1948
1949     } else if (sink_name) {
1950
1951         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1952             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1953             pa_proplist_free(p);
1954             return;
1955         }
1956     }
1957
1958     flags =
1959         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1960         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1961         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1962         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1963         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1964         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1965         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1966         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1967         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1968         (fail_on_suspend ? PA_SINK_INPUT_NO_CREATE_ON_SUSPEND|PA_SINK_INPUT_KILL_ON_SUSPEND : 0);
1969
1970     /* Only since protocol version 15 there's a seperate muted_set
1971      * flag. For older versions we synthesize it here */
1972     muted_set = muted_set || muted;
1973
1974     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1975     pa_proplist_free(p);
1976
1977     CHECK_VALIDITY(c->pstream, s, tag, ret);
1978
1979     reply = reply_new(tag);
1980     pa_tagstruct_putu32(reply, s->index);
1981     pa_assert(s->sink_input);
1982     pa_tagstruct_putu32(reply, s->sink_input->index);
1983     pa_tagstruct_putu32(reply, missing);
1984
1985 /*     pa_log("initial request is %u", missing); */
1986
1987     if (c->version >= 9) {
1988         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1989
1990         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1991         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1992         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1993         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1994     }
1995
1996     if (c->version >= 12) {
1997         /* Since 0.9.8 we support sending the chosen sample
1998          * spec/channel map/device/suspend status back to the
1999          * client */
2000
2001         pa_tagstruct_put_sample_spec(reply, &ss);
2002         pa_tagstruct_put_channel_map(reply, &map);
2003
2004         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
2005         pa_tagstruct_puts(reply, s->sink_input->sink->name);
2006
2007         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
2008     }
2009
2010     if (c->version >= 13)
2011         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
2012
2013     pa_pstream_send_tagstruct(c->pstream, reply);
2014 }
2015
2016 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2017     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2018     uint32_t channel;
2019
2020     pa_native_connection_assert_ref(c);
2021     pa_assert(t);
2022
2023     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2024         !pa_tagstruct_eof(t)) {
2025         protocol_error(c);
2026         return;
2027     }
2028
2029     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2030
2031     switch (command) {
2032
2033         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2034             playback_stream *s;
2035             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2036                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2037                 return;
2038             }
2039
2040             playback_stream_unlink(s);
2041             break;
2042         }
2043
2044         case PA_COMMAND_DELETE_RECORD_STREAM: {
2045             record_stream *s;
2046             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2047                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2048                 return;
2049             }
2050
2051             record_stream_unlink(s);
2052             break;
2053         }
2054
2055         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2056             upload_stream *s;
2057
2058             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2059                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2060                 return;
2061             }
2062
2063             upload_stream_unlink(s);
2064             break;
2065         }
2066
2067         default:
2068             pa_assert_not_reached();
2069     }
2070
2071     pa_pstream_send_simple_ack(c->pstream, tag);
2072 }
2073
2074 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2075     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2076     record_stream *s;
2077     pa_buffer_attr attr;
2078     uint32_t source_index;
2079     const char *name = NULL, *source_name;
2080     pa_sample_spec ss;
2081     pa_channel_map map;
2082     pa_tagstruct *reply;
2083     pa_source *source = NULL;
2084     pa_bool_t
2085         corked = FALSE,
2086         no_remap = FALSE,
2087         no_remix = FALSE,
2088         fix_format = FALSE,
2089         fix_rate = FALSE,
2090         fix_channels = FALSE,
2091         no_move = FALSE,
2092         variable_rate = FALSE,
2093         adjust_latency = FALSE,
2094         peak_detect = FALSE,
2095         early_requests = FALSE,
2096         dont_inhibit_auto_suspend = FALSE,
2097         fail_on_suspend = FALSE;
2098     pa_source_output_flags_t flags = 0;
2099     pa_proplist *p;
2100     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2101     pa_sink_input *direct_on_input = NULL;
2102     int ret = PA_ERR_INVALID;
2103
2104     pa_native_connection_assert_ref(c);
2105     pa_assert(t);
2106
2107     memset(&attr, 0, sizeof(attr));
2108
2109     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2110         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2111         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2112         pa_tagstruct_getu32(t, &source_index) < 0 ||
2113         pa_tagstruct_gets(t, &source_name) < 0 ||
2114         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2115         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2116         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2117         protocol_error(c);
2118         return;
2119     }
2120
2121     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2122     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2123     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2124     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2125     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2126     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2127     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2128
2129     p = pa_proplist_new();
2130
2131     if (name)
2132         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2133
2134     if (c->version >= 12)  {
2135         /* Since 0.9.8 the user can ask for a couple of additional flags */
2136
2137         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2138             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2139             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2140             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2141             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2142             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2143             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2144
2145             protocol_error(c);
2146             pa_proplist_free(p);
2147             return;
2148         }
2149     }
2150
2151     if (c->version >= 13) {
2152
2153         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2154             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2155             pa_tagstruct_get_proplist(t, p) < 0 ||
2156             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2157             protocol_error(c);
2158             pa_proplist_free(p);
2159             return;
2160         }
2161     }
2162
2163     if (c->version >= 14) {
2164
2165         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2166             protocol_error(c);
2167             pa_proplist_free(p);
2168             return;
2169         }
2170     }
2171
2172     if (c->version >= 15) {
2173
2174         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2175             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2176             protocol_error(c);
2177             pa_proplist_free(p);
2178             return;
2179         }
2180     }
2181
2182     if (!pa_tagstruct_eof(t)) {
2183         protocol_error(c);
2184         pa_proplist_free(p);
2185         return;
2186     }
2187
2188     if (source_index != PA_INVALID_INDEX) {
2189
2190         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2191             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2192             pa_proplist_free(p);
2193             return;
2194         }
2195
2196     } else if (source_name) {
2197
2198         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2199             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2200             pa_proplist_free(p);
2201             return;
2202         }
2203     }
2204
2205     if (direct_on_input_idx != PA_INVALID_INDEX) {
2206
2207         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2208             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2209             pa_proplist_free(p);
2210             return;
2211         }
2212     }
2213
2214     flags =
2215         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2216         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2217         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2218         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2219         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2220         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2221         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2222         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2223         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2224         (fail_on_suspend ? PA_SOURCE_OUTPUT_NO_CREATE_ON_SUSPEND|PA_SOURCE_OUTPUT_KILL_ON_SUSPEND : 0);
2225
2226     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2227     pa_proplist_free(p);
2228
2229     CHECK_VALIDITY(c->pstream, s, tag, ret);
2230
2231     reply = reply_new(tag);
2232     pa_tagstruct_putu32(reply, s->index);
2233     pa_assert(s->source_output);
2234     pa_tagstruct_putu32(reply, s->source_output->index);
2235
2236     if (c->version >= 9) {
2237         /* Since 0.9 we support sending the buffer metrics back to the client */
2238
2239         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2240         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2241     }
2242
2243     if (c->version >= 12) {
2244         /* Since 0.9.8 we support sending the chosen sample
2245          * spec/channel map/device/suspend status back to the
2246          * client */
2247
2248         pa_tagstruct_put_sample_spec(reply, &ss);
2249         pa_tagstruct_put_channel_map(reply, &map);
2250
2251         pa_tagstruct_putu32(reply, s->source_output->source->index);
2252         pa_tagstruct_puts(reply, s->source_output->source->name);
2253
2254         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2255     }
2256
2257     if (c->version >= 13)
2258         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2259
2260     pa_pstream_send_tagstruct(c->pstream, reply);
2261 }
2262
2263 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2264     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2265     int ret;
2266
2267     pa_native_connection_assert_ref(c);
2268     pa_assert(t);
2269
2270     if (!pa_tagstruct_eof(t)) {
2271         protocol_error(c);
2272         return;
2273     }
2274
2275     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2276     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2277     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2278
2279     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2280 }
2281
2282 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2283     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2284     const void*cookie;
2285     pa_tagstruct *reply;
2286     pa_bool_t shm_on_remote = FALSE, do_shm;
2287
2288     pa_native_connection_assert_ref(c);
2289     pa_assert(t);
2290
2291     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2292         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2293         !pa_tagstruct_eof(t)) {
2294         protocol_error(c);
2295         return;
2296     }
2297
2298     /* Minimum supported version */
2299     if (c->version < 8) {
2300         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2301         return;
2302     }
2303
2304     /* Starting with protocol version 13 the MSB of the version tag
2305        reflects if shm is available for this pa_native_connection or
2306        not. */
2307     if (c->version >= 13) {
2308         shm_on_remote = !!(c->version & 0x80000000U);
2309         c->version &= 0x7FFFFFFFU;
2310     }
2311
2312     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2313
2314     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2315
2316     if (!c->authorized) {
2317         pa_bool_t success = FALSE;
2318
2319 #ifdef HAVE_CREDS
2320         const pa_creds *creds;
2321
2322         if ((creds = pa_pdispatch_creds(pd))) {
2323             if (creds->uid == getuid())
2324                 success = TRUE;
2325             else if (c->options->auth_group) {
2326                 int r;
2327                 gid_t gid;
2328
2329                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2330                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2331                 else if (gid == creds->gid)
2332                     success = TRUE;
2333
2334                 if (!success) {
2335                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2336                         pa_log_warn("Failed to check group membership.");
2337                     else if (r > 0)
2338                         success = TRUE;
2339                 }
2340             }
2341
2342             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2343                         (unsigned long) creds->uid,
2344                         (unsigned long) creds->gid,
2345                         (int) success);
2346         }
2347 #endif
2348
2349         if (!success && c->options->auth_cookie) {
2350             const uint8_t *ac;
2351
2352             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2353                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2354                     success = TRUE;
2355         }
2356
2357         if (!success) {
2358             pa_log_warn("Denied access to client with invalid authorization data.");
2359             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2360             return;
2361         }
2362
2363         c->authorized = TRUE;
2364         if (c->auth_timeout_event) {
2365             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2366             c->auth_timeout_event = NULL;
2367         }
2368     }
2369
2370     /* Enable shared memory support if possible */
2371     do_shm =
2372         pa_mempool_is_shared(c->protocol->core->mempool) &&
2373         c->is_local;
2374
2375     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2376
2377     if (do_shm)
2378         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2379             do_shm = FALSE;
2380
2381 #ifdef HAVE_CREDS
2382     if (do_shm) {
2383         /* Only enable SHM if both sides are owned by the same
2384          * user. This is a security measure because otherwise data
2385          * private to the user might leak. */
2386
2387         const pa_creds *creds;
2388         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2389             do_shm = FALSE;
2390     }
2391 #endif
2392
2393     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2394     pa_pstream_enable_shm(c->pstream, do_shm);
2395
2396     reply = reply_new(tag);
2397     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2398
2399 #ifdef HAVE_CREDS
2400 {
2401     /* SHM support is only enabled after both sides made sure they are the same user. */
2402
2403     pa_creds ucred;
2404
2405     ucred.uid = getuid();
2406     ucred.gid = getgid();
2407
2408     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2409 }
2410 #else
2411     pa_pstream_send_tagstruct(c->pstream, reply);
2412 #endif
2413 }
2414
2415 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2416     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2417     const char *name = NULL;
2418     pa_proplist *p;
2419     pa_tagstruct *reply;
2420
2421     pa_native_connection_assert_ref(c);
2422     pa_assert(t);
2423
2424     p = pa_proplist_new();
2425
2426     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2427         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2428         !pa_tagstruct_eof(t)) {
2429
2430         protocol_error(c);
2431         pa_proplist_free(p);
2432         return;
2433     }
2434
2435     if (name)
2436         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2437             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2438             pa_proplist_free(p);
2439             return;
2440         }
2441
2442     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2443     pa_proplist_free(p);
2444
2445     reply = reply_new(tag);
2446
2447     if (c->version >= 13)
2448         pa_tagstruct_putu32(reply, c->client->index);
2449
2450     pa_pstream_send_tagstruct(c->pstream, reply);
2451 }
2452
2453 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2454     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2455     const char *name;
2456     uint32_t idx = PA_IDXSET_INVALID;
2457
2458     pa_native_connection_assert_ref(c);
2459     pa_assert(t);
2460
2461     if (pa_tagstruct_gets(t, &name) < 0 ||
2462         !pa_tagstruct_eof(t)) {
2463         protocol_error(c);
2464         return;
2465     }
2466
2467     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2468     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2469
2470     if (command == PA_COMMAND_LOOKUP_SINK) {
2471         pa_sink *sink;
2472         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2473             idx = sink->index;
2474     } else {
2475         pa_source *source;
2476         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2477         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2478             idx = source->index;
2479     }
2480
2481     if (idx == PA_IDXSET_INVALID)
2482         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2483     else {
2484         pa_tagstruct *reply;
2485         reply = reply_new(tag);
2486         pa_tagstruct_putu32(reply, idx);
2487         pa_pstream_send_tagstruct(c->pstream, reply);
2488     }
2489 }
2490
2491 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2492     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2493     uint32_t idx;
2494     playback_stream *s;
2495
2496     pa_native_connection_assert_ref(c);
2497     pa_assert(t);
2498
2499     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2500         !pa_tagstruct_eof(t)) {
2501         protocol_error(c);
2502         return;
2503     }
2504
2505     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2506     s = pa_idxset_get_by_index(c->output_streams, idx);
2507     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2508     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2509
2510     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2511 }
2512
2513 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2514     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2515     pa_tagstruct *reply;
2516     const pa_mempool_stat *stat;
2517
2518     pa_native_connection_assert_ref(c);
2519     pa_assert(t);
2520
2521     if (!pa_tagstruct_eof(t)) {
2522         protocol_error(c);
2523         return;
2524     }
2525
2526     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2527
2528     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2529
2530     reply = reply_new(tag);
2531     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2532     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2533     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2534     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2535     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2536     pa_pstream_send_tagstruct(c->pstream, reply);
2537 }
2538
2539 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2540     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2541     pa_tagstruct *reply;
2542     playback_stream *s;
2543     struct timeval tv, now;
2544     uint32_t idx;
2545
2546     pa_native_connection_assert_ref(c);
2547     pa_assert(t);
2548
2549     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2550         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2551         !pa_tagstruct_eof(t)) {
2552         protocol_error(c);
2553         return;
2554     }
2555
2556     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2557     s = pa_idxset_get_by_index(c->output_streams, idx);
2558     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2559     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2560
2561     /* Get an atomic snapshot of all timing parameters */
2562     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2563
2564     reply = reply_new(tag);
2565     pa_tagstruct_put_usec(reply,
2566                           s->current_sink_latency +
2567                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sink->sample_spec));
2568     pa_tagstruct_put_usec(reply, 0);
2569     pa_tagstruct_put_boolean(reply,
2570                              s->playing_for > 0 &&
2571                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2572                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2573     pa_tagstruct_put_timeval(reply, &tv);
2574     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2575     pa_tagstruct_puts64(reply, s->write_index);
2576     pa_tagstruct_puts64(reply, s->read_index);
2577
2578     if (c->version >= 13) {
2579         pa_tagstruct_putu64(reply, s->underrun_for);
2580         pa_tagstruct_putu64(reply, s->playing_for);
2581     }
2582
2583     pa_pstream_send_tagstruct(c->pstream, reply);
2584 }
2585
2586 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2587     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2588     pa_tagstruct *reply;
2589     record_stream *s;
2590     struct timeval tv, now;
2591     uint32_t idx;
2592
2593     pa_native_connection_assert_ref(c);
2594     pa_assert(t);
2595
2596     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2597         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2598         !pa_tagstruct_eof(t)) {
2599         protocol_error(c);
2600         return;
2601     }
2602
2603     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2604     s = pa_idxset_get_by_index(c->record_streams, idx);
2605     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2606
2607     /* Get an atomic snapshot of all timing parameters */
2608     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2609
2610     reply = reply_new(tag);
2611     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2612     pa_tagstruct_put_usec(reply,
2613                           s->current_source_latency +
2614                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2615     pa_tagstruct_put_boolean(reply,
2616                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2617                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2618     pa_tagstruct_put_timeval(reply, &tv);
2619     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2620     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2621     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2622     pa_pstream_send_tagstruct(c->pstream, reply);
2623 }
2624
2625 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2626     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2627     upload_stream *s;
2628     uint32_t length;
2629     const char *name = NULL;
2630     pa_sample_spec ss;
2631     pa_channel_map map;
2632     pa_tagstruct *reply;
2633     pa_proplist *p;
2634
2635     pa_native_connection_assert_ref(c);
2636     pa_assert(t);
2637
2638     if (pa_tagstruct_gets(t, &name) < 0 ||
2639         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2640         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2641         pa_tagstruct_getu32(t, &length) < 0) {
2642         protocol_error(c);
2643         return;
2644     }
2645
2646     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2647     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2648     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2649     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2650     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2651     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2652
2653     p = pa_proplist_new();
2654
2655     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2656         !pa_tagstruct_eof(t)) {
2657
2658         protocol_error(c);
2659         pa_proplist_free(p);
2660         return;
2661     }
2662
2663     if (c->version < 13)
2664         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2665     else if (!name)
2666         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2667             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2668
2669     if (!name || !pa_namereg_is_valid_name(name)) {
2670         pa_proplist_free(p);
2671         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2672     }
2673
2674     s = upload_stream_new(c, &ss, &map, name, length, p);
2675     pa_proplist_free(p);
2676
2677     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2678
2679     reply = reply_new(tag);
2680     pa_tagstruct_putu32(reply, s->index);
2681     pa_tagstruct_putu32(reply, length);
2682     pa_pstream_send_tagstruct(c->pstream, reply);
2683 }
2684
2685 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2686     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2687     uint32_t channel;
2688     upload_stream *s;
2689     uint32_t idx;
2690
2691     pa_native_connection_assert_ref(c);
2692     pa_assert(t);
2693
2694     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2695         !pa_tagstruct_eof(t)) {
2696         protocol_error(c);
2697         return;
2698     }
2699
2700     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2701
2702     s = pa_idxset_get_by_index(c->output_streams, channel);
2703     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2704     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2705
2706     if (!s->memchunk.memblock)
2707         pa_pstream_send_error(c->pstream, tag, PA_ERR_TOOLARGE);
2708     else if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2709         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2710     else
2711         pa_pstream_send_simple_ack(c->pstream, tag);
2712
2713     upload_stream_unlink(s);
2714 }
2715
2716 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2717     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2718     uint32_t sink_index;
2719     pa_volume_t volume;
2720     pa_sink *sink;
2721     const char *name, *sink_name;
2722     uint32_t idx;
2723     pa_proplist *p;
2724     pa_tagstruct *reply;
2725
2726     pa_native_connection_assert_ref(c);
2727     pa_assert(t);
2728
2729     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2730
2731     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2732         pa_tagstruct_gets(t, &sink_name) < 0 ||
2733         pa_tagstruct_getu32(t, &volume) < 0 ||
2734         pa_tagstruct_gets(t, &name) < 0) {
2735         protocol_error(c);
2736         return;
2737     }
2738
2739     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2740     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2741     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2742     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2743
2744     if (sink_index != PA_INVALID_INDEX)
2745         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2746     else
2747         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2748
2749     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2750
2751     p = pa_proplist_new();
2752
2753     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2754         !pa_tagstruct_eof(t)) {
2755         protocol_error(c);
2756         pa_proplist_free(p);
2757         return;
2758     }
2759
2760     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2761
2762     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2763         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2764         pa_proplist_free(p);
2765         return;
2766     }
2767
2768     pa_proplist_free(p);
2769
2770     reply = reply_new(tag);
2771
2772     if (c->version >= 13)
2773         pa_tagstruct_putu32(reply, idx);
2774
2775     pa_pstream_send_tagstruct(c->pstream, reply);
2776 }
2777
2778 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2779     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2780     const char *name;
2781
2782     pa_native_connection_assert_ref(c);
2783     pa_assert(t);
2784
2785     if (pa_tagstruct_gets(t, &name) < 0 ||
2786         !pa_tagstruct_eof(t)) {
2787         protocol_error(c);
2788         return;
2789     }
2790
2791     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2792     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2793
2794     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2795         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2796         return;
2797     }
2798
2799     pa_pstream_send_simple_ack(c->pstream, tag);
2800 }
2801
2802 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2803     pa_assert(c);
2804     pa_assert(fixed);
2805     pa_assert(original);
2806
2807     *fixed = *original;
2808
2809     if (c->version < 12) {
2810         /* Before protocol version 12 we didn't support S32 samples,
2811          * so we need to lie about this to the client */
2812
2813         if (fixed->format == PA_SAMPLE_S32LE)
2814             fixed->format = PA_SAMPLE_FLOAT32LE;
2815         if (fixed->format == PA_SAMPLE_S32BE)
2816             fixed->format = PA_SAMPLE_FLOAT32BE;
2817     }
2818
2819     if (c->version < 15) {
2820         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2821             fixed->format = PA_SAMPLE_FLOAT32LE;
2822         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2823             fixed->format = PA_SAMPLE_FLOAT32BE;
2824     }
2825 }
2826
2827 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2828     pa_sample_spec fixed_ss;
2829
2830     pa_assert(t);
2831     pa_sink_assert_ref(sink);
2832
2833     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2834
2835     pa_tagstruct_put(
2836         t,
2837         PA_TAG_U32, sink->index,
2838         PA_TAG_STRING, sink->name,
2839         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2840         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2841         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2842         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2843         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2844         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2845         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2846         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2847         PA_TAG_USEC, pa_sink_get_latency(sink),
2848         PA_TAG_STRING, sink->driver,
2849         PA_TAG_U32, sink->flags,
2850         PA_TAG_INVALID);
2851
2852     if (c->version >= 13) {
2853         pa_tagstruct_put_proplist(t, sink->proplist);
2854         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2855     }
2856
2857     if (c->version >= 15) {
2858         pa_tagstruct_put_volume(t, sink->base_volume);
2859         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2860             pa_log_error("Internal sink state is invalid.");
2861         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2862         pa_tagstruct_putu32(t, sink->n_volume_steps);
2863         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2864     }
2865
2866     if (c->version >= 16) {
2867         pa_tagstruct_putu32(t, sink->ports ? pa_hashmap_size(sink->ports) : 0);
2868
2869         if (sink->ports) {
2870             void *state;
2871             pa_device_port *p;
2872
2873             PA_HASHMAP_FOREACH(p, sink->ports, state) {
2874                 pa_tagstruct_puts(t, p->name);
2875                 pa_tagstruct_puts(t, p->description);
2876                 pa_tagstruct_putu32(t, p->priority);
2877             }
2878         }
2879
2880         pa_tagstruct_puts(t, sink->active_port ? sink->active_port->name : NULL);
2881     }
2882 }
2883
2884 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2885     pa_sample_spec fixed_ss;
2886
2887     pa_assert(t);
2888     pa_source_assert_ref(source);
2889
2890     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2891
2892     pa_tagstruct_put(
2893         t,
2894         PA_TAG_U32, source->index,
2895         PA_TAG_STRING, source->name,
2896         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2897         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2898         PA_TAG_CHANNEL_MAP, &source->channel_map,
2899         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2900         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2901         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2902         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2903         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2904         PA_TAG_USEC, pa_source_get_latency(source),
2905         PA_TAG_STRING, source->driver,
2906         PA_TAG_U32, source->flags,
2907         PA_TAG_INVALID);
2908
2909     if (c->version >= 13) {
2910         pa_tagstruct_put_proplist(t, source->proplist);
2911         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2912     }
2913
2914     if (c->version >= 15) {
2915         pa_tagstruct_put_volume(t, source->base_volume);
2916         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2917             pa_log_error("Internal source state is invalid.");
2918         pa_tagstruct_putu32(t, pa_source_get_state(source));
2919         pa_tagstruct_putu32(t, source->n_volume_steps);
2920         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2921     }
2922
2923     if (c->version >= 16) {
2924
2925         pa_tagstruct_putu32(t, source->ports ? pa_hashmap_size(source->ports) : 0);
2926
2927         if (source->ports) {
2928             void *state;
2929             pa_device_port *p;
2930
2931             PA_HASHMAP_FOREACH(p, source->ports, state) {
2932                 pa_tagstruct_puts(t, p->name);
2933                 pa_tagstruct_puts(t, p->description);
2934                 pa_tagstruct_putu32(t, p->priority);
2935             }
2936         }
2937
2938         pa_tagstruct_puts(t, source->active_port ? source->active_port->name : NULL);
2939     }
2940 }
2941
2942 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2943     pa_assert(t);
2944     pa_assert(client);
2945
2946     pa_tagstruct_putu32(t, client->index);
2947     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2948     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2949     pa_tagstruct_puts(t, client->driver);
2950
2951     if (c->version >= 13)
2952         pa_tagstruct_put_proplist(t, client->proplist);
2953 }
2954
2955 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2956     void *state = NULL;
2957     pa_card_profile *p;
2958
2959     pa_assert(t);
2960     pa_assert(card);
2961
2962     pa_tagstruct_putu32(t, card->index);
2963     pa_tagstruct_puts(t, card->name);
2964     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2965     pa_tagstruct_puts(t, card->driver);
2966
2967     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2968
2969     if (card->profiles) {
2970         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2971             pa_tagstruct_puts(t, p->name);
2972             pa_tagstruct_puts(t, p->description);
2973             pa_tagstruct_putu32(t, p->n_sinks);
2974             pa_tagstruct_putu32(t, p->n_sources);
2975             pa_tagstruct_putu32(t, p->priority);
2976         }
2977     }
2978
2979     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2980     pa_tagstruct_put_proplist(t, card->proplist);
2981 }
2982
2983 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2984     pa_assert(t);
2985     pa_assert(module);
2986
2987     pa_tagstruct_putu32(t, module->index);
2988     pa_tagstruct_puts(t, module->name);
2989     pa_tagstruct_puts(t, module->argument);
2990     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2991
2992     if (c->version < 15)
2993         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2994
2995     if (c->version >= 15)
2996         pa_tagstruct_put_proplist(t, module->proplist);
2997 }
2998
2999 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
3000     pa_sample_spec fixed_ss;
3001     pa_usec_t sink_latency;
3002     pa_cvolume v;
3003
3004     pa_assert(t);
3005     pa_sink_input_assert_ref(s);
3006
3007     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3008
3009     pa_tagstruct_putu32(t, s->index);
3010     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3011     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3012     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3013     pa_tagstruct_putu32(t, s->sink->index);
3014     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3015     pa_tagstruct_put_channel_map(t, &s->channel_map);
3016     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
3017     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
3018     pa_tagstruct_put_usec(t, sink_latency);
3019     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
3020     pa_tagstruct_puts(t, s->driver);
3021     if (c->version >= 11)
3022         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
3023     if (c->version >= 13)
3024         pa_tagstruct_put_proplist(t, s->proplist);
3025 }
3026
3027 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
3028     pa_sample_spec fixed_ss;
3029     pa_usec_t source_latency;
3030
3031     pa_assert(t);
3032     pa_source_output_assert_ref(s);
3033
3034     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
3035
3036     pa_tagstruct_putu32(t, s->index);
3037     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
3038     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
3039     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
3040     pa_tagstruct_putu32(t, s->source->index);
3041     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3042     pa_tagstruct_put_channel_map(t, &s->channel_map);
3043     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
3044     pa_tagstruct_put_usec(t, source_latency);
3045     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
3046     pa_tagstruct_puts(t, s->driver);
3047
3048     if (c->version >= 13)
3049         pa_tagstruct_put_proplist(t, s->proplist);
3050 }
3051
3052 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
3053     pa_sample_spec fixed_ss;
3054     pa_cvolume v;
3055
3056     pa_assert(t);
3057     pa_assert(e);
3058
3059     if (e->memchunk.memblock)
3060         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3061     else
3062         memset(&fixed_ss, 0, sizeof(fixed_ss));
3063
3064     pa_tagstruct_putu32(t, e->index);
3065     pa_tagstruct_puts(t, e->name);
3066
3067     if (e->volume_is_set)
3068         v = e->volume;
3069     else
3070         pa_cvolume_init(&v);
3071
3072     pa_tagstruct_put_cvolume(t, &v);
3073     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3074     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3075     pa_tagstruct_put_channel_map(t, &e->channel_map);
3076     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3077     pa_tagstruct_put_boolean(t, e->lazy);
3078     pa_tagstruct_puts(t, e->filename);
3079
3080     if (c->version >= 13)
3081         pa_tagstruct_put_proplist(t, e->proplist);
3082 }
3083
3084 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3085     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3086     uint32_t idx;
3087     pa_sink *sink = NULL;
3088     pa_source *source = NULL;
3089     pa_client *client = NULL;
3090     pa_card *card = NULL;
3091     pa_module *module = NULL;
3092     pa_sink_input *si = NULL;
3093     pa_source_output *so = NULL;
3094     pa_scache_entry *sce = NULL;
3095     const char *name = NULL;
3096     pa_tagstruct *reply;
3097
3098     pa_native_connection_assert_ref(c);
3099     pa_assert(t);
3100
3101     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3102         (command != PA_COMMAND_GET_CLIENT_INFO &&
3103          command != PA_COMMAND_GET_MODULE_INFO &&
3104          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3105          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3106          pa_tagstruct_gets(t, &name) < 0) ||
3107         !pa_tagstruct_eof(t)) {
3108         protocol_error(c);
3109         return;
3110     }
3111
3112     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3113     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3114     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3115     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3116     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3117
3118     if (command == PA_COMMAND_GET_SINK_INFO) {
3119         if (idx != PA_INVALID_INDEX)
3120             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3121         else
3122             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3123     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3124         if (idx != PA_INVALID_INDEX)
3125             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3126         else
3127             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3128     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3129         if (idx != PA_INVALID_INDEX)
3130             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3131         else
3132             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3133     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3134         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3135     else if (command == PA_COMMAND_GET_MODULE_INFO)
3136         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3137     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3138         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3139     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3140         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3141     else {
3142         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3143         if (idx != PA_INVALID_INDEX)
3144             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3145         else
3146             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3147     }
3148
3149     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3150         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3151         return;
3152     }
3153
3154     reply = reply_new(tag);
3155     if (sink)
3156         sink_fill_tagstruct(c, reply, sink);
3157     else if (source)
3158         source_fill_tagstruct(c, reply, source);
3159     else if (client)
3160         client_fill_tagstruct(c, reply, client);
3161     else if (card)
3162         card_fill_tagstruct(c, reply, card);
3163     else if (module)
3164         module_fill_tagstruct(c, reply, module);
3165     else if (si)
3166         sink_input_fill_tagstruct(c, reply, si);
3167     else if (so)
3168         source_output_fill_tagstruct(c, reply, so);
3169     else
3170         scache_fill_tagstruct(c, reply, sce);
3171     pa_pstream_send_tagstruct(c->pstream, reply);
3172 }
3173
3174 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3175     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3176     pa_idxset *i;
3177     uint32_t idx;
3178     void *p;
3179     pa_tagstruct *reply;
3180
3181     pa_native_connection_assert_ref(c);
3182     pa_assert(t);
3183
3184     if (!pa_tagstruct_eof(t)) {
3185         protocol_error(c);
3186         return;
3187     }
3188
3189     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3190
3191     reply = reply_new(tag);
3192
3193     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3194         i = c->protocol->core->sinks;
3195     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3196         i = c->protocol->core->sources;
3197     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3198         i = c->protocol->core->clients;
3199     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3200         i = c->protocol->core->cards;
3201     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3202         i = c->protocol->core->modules;
3203     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3204         i = c->protocol->core->sink_inputs;
3205     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3206         i = c->protocol->core->source_outputs;
3207     else {
3208         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3209         i = c->protocol->core->scache;
3210     }
3211
3212     if (i) {
3213         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3214             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3215                 sink_fill_tagstruct(c, reply, p);
3216             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3217                 source_fill_tagstruct(c, reply, p);
3218             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3219                 client_fill_tagstruct(c, reply, p);
3220             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3221                 card_fill_tagstruct(c, reply, p);
3222             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3223                 module_fill_tagstruct(c, reply, p);
3224             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3225                 sink_input_fill_tagstruct(c, reply, p);
3226             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3227                 source_output_fill_tagstruct(c, reply, p);
3228             else {
3229                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3230                 scache_fill_tagstruct(c, reply, p);
3231             }
3232         }
3233     }
3234
3235     pa_pstream_send_tagstruct(c->pstream, reply);
3236 }
3237
3238 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3239     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3240     pa_tagstruct *reply;
3241     pa_sink *def_sink;
3242     pa_source *def_source;
3243     pa_sample_spec fixed_ss;
3244     char *h, *u;
3245
3246     pa_native_connection_assert_ref(c);
3247     pa_assert(t);
3248
3249     if (!pa_tagstruct_eof(t)) {
3250         protocol_error(c);
3251         return;
3252     }
3253
3254     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3255
3256     reply = reply_new(tag);
3257     pa_tagstruct_puts(reply, PACKAGE_NAME);
3258     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3259
3260     u = pa_get_user_name_malloc();
3261     pa_tagstruct_puts(reply, u);
3262     pa_xfree(u);
3263
3264     h = pa_get_host_name_malloc();
3265     pa_tagstruct_puts(reply, h);
3266     pa_xfree(h);
3267
3268     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3269     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3270
3271     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3272     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3273     def_source = pa_namereg_get_default_source(c->protocol->core);
3274     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3275
3276     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3277
3278     if (c->version >= 15)
3279         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3280
3281     pa_pstream_send_tagstruct(c->pstream, reply);
3282 }
3283
3284 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3285     pa_tagstruct *t;
3286     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3287
3288     pa_native_connection_assert_ref(c);
3289
3290     t = pa_tagstruct_new(NULL, 0);
3291     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3292     pa_tagstruct_putu32(t, (uint32_t) -1);
3293     pa_tagstruct_putu32(t, e);
3294     pa_tagstruct_putu32(t, idx);
3295     pa_pstream_send_tagstruct(c->pstream, t);
3296 }
3297
3298 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3299     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3300     pa_subscription_mask_t m;
3301
3302     pa_native_connection_assert_ref(c);
3303     pa_assert(t);
3304
3305     if (pa_tagstruct_getu32(t, &m) < 0 ||
3306         !pa_tagstruct_eof(t)) {
3307         protocol_error(c);
3308         return;
3309     }
3310
3311     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3312     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3313
3314     if (c->subscription)
3315         pa_subscription_free(c->subscription);
3316
3317     if (m != 0) {
3318         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3319         pa_assert(c->subscription);
3320     } else
3321         c->subscription = NULL;
3322
3323     pa_pstream_send_simple_ack(c->pstream, tag);
3324 }
3325
3326 static void command_set_volume(
3327         pa_pdispatch *pd,
3328         uint32_t command,
3329         uint32_t tag,
3330         pa_tagstruct *t,
3331         void *userdata) {
3332
3333     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3334     uint32_t idx;
3335     pa_cvolume volume;
3336     pa_sink *sink = NULL;
3337     pa_source *source = NULL;
3338     pa_sink_input *si = NULL;
3339     const char *name = NULL;
3340     const char *client_name;
3341
3342     pa_native_connection_assert_ref(c);
3343     pa_assert(t);
3344
3345     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3346         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3347         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3348         pa_tagstruct_get_cvolume(t, &volume) ||
3349         !pa_tagstruct_eof(t)) {
3350         protocol_error(c);
3351         return;
3352     }
3353
3354     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3355     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3356     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3357     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3358     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3359     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3360
3361     switch (command) {
3362
3363         case PA_COMMAND_SET_SINK_VOLUME:
3364             if (idx != PA_INVALID_INDEX)
3365                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3366             else
3367                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3368             break;
3369
3370         case PA_COMMAND_SET_SOURCE_VOLUME:
3371             if (idx != PA_INVALID_INDEX)
3372                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3373             else
3374                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3375             break;
3376
3377         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3378             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3379             break;
3380
3381         default:
3382             pa_assert_not_reached();
3383     }
3384
3385     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3386
3387     client_name = pa_strnull(pa_proplist_gets(c->client->proplist, PA_PROP_APPLICATION_PROCESS_BINARY));
3388
3389     if (sink) {
3390         pa_log_debug("Client %s changes volume of sink %s.", client_name, sink->name);
3391         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3392     } else if (source) {
3393         pa_log_debug("Client %s changes volume of sink %s.", client_name, source->name);
3394         pa_source_set_volume(source, &volume, TRUE);
3395     } else if (si) {
3396         pa_log_debug("Client %s changes volume of sink %s.",
3397                      client_name,
3398                      pa_strnull(pa_proplist_gets(si->proplist, PA_PROP_MEDIA_NAME)));
3399         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3400     }
3401
3402     pa_pstream_send_simple_ack(c->pstream, tag);
3403 }
3404
3405 static void command_set_mute(
3406         pa_pdispatch *pd,
3407         uint32_t command,
3408         uint32_t tag,
3409         pa_tagstruct *t,
3410         void *userdata) {
3411
3412     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3413     uint32_t idx;
3414     pa_bool_t mute;
3415     pa_sink *sink = NULL;
3416     pa_source *source = NULL;
3417     pa_sink_input *si = NULL;
3418     const char *name = NULL;
3419
3420     pa_native_connection_assert_ref(c);
3421     pa_assert(t);
3422
3423     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3424         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3425         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3426         pa_tagstruct_get_boolean(t, &mute) ||
3427         !pa_tagstruct_eof(t)) {
3428         protocol_error(c);
3429         return;
3430     }
3431
3432     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3433     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3434     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3435     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3436     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3437
3438     switch (command) {
3439
3440         case PA_COMMAND_SET_SINK_MUTE:
3441
3442             if (idx != PA_INVALID_INDEX)
3443                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3444             else
3445                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3446
3447             break;
3448
3449         case PA_COMMAND_SET_SOURCE_MUTE:
3450             if (idx != PA_INVALID_INDEX)
3451                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3452             else
3453                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3454
3455             break;
3456
3457         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3458             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3459             break;
3460
3461         default:
3462             pa_assert_not_reached();
3463     }
3464
3465     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3466
3467     if (sink)
3468         pa_sink_set_mute(sink, mute, TRUE);
3469     else if (source)
3470         pa_source_set_mute(source, mute, TRUE);
3471     else if (si)
3472         pa_sink_input_set_mute(si, mute, TRUE);
3473
3474     pa_pstream_send_simple_ack(c->pstream, tag);
3475 }
3476
3477 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3478     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3479     uint32_t idx;
3480     pa_bool_t b;
3481     playback_stream *s;
3482
3483     pa_native_connection_assert_ref(c);
3484     pa_assert(t);
3485
3486     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3487         pa_tagstruct_get_boolean(t, &b) < 0 ||
3488         !pa_tagstruct_eof(t)) {
3489         protocol_error(c);
3490         return;
3491     }
3492
3493     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3494     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3495     s = pa_idxset_get_by_index(c->output_streams, idx);
3496     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3497     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3498
3499     pa_sink_input_cork(s->sink_input, b);
3500
3501     if (b)
3502         s->is_underrun = TRUE;
3503
3504     pa_pstream_send_simple_ack(c->pstream, tag);
3505 }
3506
3507 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3508     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3509     uint32_t idx;
3510     playback_stream *s;
3511
3512     pa_native_connection_assert_ref(c);
3513     pa_assert(t);
3514
3515     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3516         !pa_tagstruct_eof(t)) {
3517         protocol_error(c);
3518         return;
3519     }
3520
3521     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3522     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3523     s = pa_idxset_get_by_index(c->output_streams, idx);
3524     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3525     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3526
3527     switch (command) {
3528         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3529             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3530             break;
3531
3532         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3533             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3534             break;
3535
3536         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3537             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3538             break;
3539
3540         default:
3541             pa_assert_not_reached();
3542     }
3543
3544     pa_pstream_send_simple_ack(c->pstream, tag);
3545 }
3546
3547 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3548     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3549     uint32_t idx;
3550     record_stream *s;
3551     pa_bool_t b;
3552
3553     pa_native_connection_assert_ref(c);
3554     pa_assert(t);
3555
3556     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3557         pa_tagstruct_get_boolean(t, &b) < 0 ||
3558         !pa_tagstruct_eof(t)) {
3559         protocol_error(c);
3560         return;
3561     }
3562
3563     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3564     s = pa_idxset_get_by_index(c->record_streams, idx);
3565     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3566
3567     pa_source_output_cork(s->source_output, b);
3568     pa_memblockq_prebuf_force(s->memblockq);
3569     pa_pstream_send_simple_ack(c->pstream, tag);
3570 }
3571
3572 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3573     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3574     uint32_t idx;
3575     record_stream *s;
3576
3577     pa_native_connection_assert_ref(c);
3578     pa_assert(t);
3579
3580     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3581         !pa_tagstruct_eof(t)) {
3582         protocol_error(c);
3583         return;
3584     }
3585
3586     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3587     s = pa_idxset_get_by_index(c->record_streams, idx);
3588     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3589
3590     pa_memblockq_flush_read(s->memblockq);
3591     pa_pstream_send_simple_ack(c->pstream, tag);
3592 }
3593
3594 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3595     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3596     uint32_t idx;
3597     pa_buffer_attr a;
3598     pa_tagstruct *reply;
3599
3600     pa_native_connection_assert_ref(c);
3601     pa_assert(t);
3602
3603     memset(&a, 0, sizeof(a));
3604
3605     if (pa_tagstruct_getu32(t, &idx) < 0) {
3606         protocol_error(c);
3607         return;
3608     }
3609
3610     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3611
3612     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3613         playback_stream *s;
3614         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3615
3616         s = pa_idxset_get_by_index(c->output_streams, idx);
3617         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3618         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3619
3620         if (pa_tagstruct_get(
3621                     t,
3622                     PA_TAG_U32, &a.maxlength,
3623                     PA_TAG_U32, &a.tlength,
3624                     PA_TAG_U32, &a.prebuf,
3625                     PA_TAG_U32, &a.minreq,
3626                     PA_TAG_INVALID) < 0 ||
3627             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3628             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3629             !pa_tagstruct_eof(t)) {
3630             protocol_error(c);
3631             return;
3632         }
3633
3634         s->adjust_latency = adjust_latency;
3635         s->early_requests = early_requests;
3636         s->buffer_attr = a;
3637
3638         fix_playback_buffer_attr(s);
3639         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3640
3641         reply = reply_new(tag);
3642         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3643         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3644         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3645         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3646
3647         if (c->version >= 13)
3648             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3649
3650     } else {
3651         record_stream *s;
3652         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3653         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3654
3655         s = pa_idxset_get_by_index(c->record_streams, idx);
3656         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3657
3658         if (pa_tagstruct_get(
3659                     t,
3660                     PA_TAG_U32, &a.maxlength,
3661                     PA_TAG_U32, &a.fragsize,
3662                     PA_TAG_INVALID) < 0 ||
3663             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3664             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3665             !pa_tagstruct_eof(t)) {
3666             protocol_error(c);
3667             return;
3668         }
3669
3670         s->adjust_latency = adjust_latency;
3671         s->early_requests = early_requests;
3672         s->buffer_attr = a;
3673
3674         fix_record_buffer_attr_pre(s);
3675         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3676         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3677         fix_record_buffer_attr_post(s);
3678
3679         reply = reply_new(tag);
3680         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3681         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3682
3683         if (c->version >= 13)
3684             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3685     }
3686
3687     pa_pstream_send_tagstruct(c->pstream, reply);
3688 }
3689
3690 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3691     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3692     uint32_t idx;
3693     uint32_t rate;
3694
3695     pa_native_connection_assert_ref(c);
3696     pa_assert(t);
3697
3698     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3699         pa_tagstruct_getu32(t, &rate) < 0 ||
3700         !pa_tagstruct_eof(t)) {
3701         protocol_error(c);
3702         return;
3703     }
3704
3705     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3706     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3707
3708     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3709         playback_stream *s;
3710
3711         s = pa_idxset_get_by_index(c->output_streams, idx);
3712         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3713         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3714
3715         pa_sink_input_set_rate(s->sink_input, rate);
3716
3717     } else {
3718         record_stream *s;
3719         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3720
3721         s = pa_idxset_get_by_index(c->record_streams, idx);
3722         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3723
3724         pa_source_output_set_rate(s->source_output, rate);
3725     }
3726
3727     pa_pstream_send_simple_ack(c->pstream, tag);
3728 }
3729
3730 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3731     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3732     uint32_t idx;
3733     uint32_t mode;
3734     pa_proplist *p;
3735
3736     pa_native_connection_assert_ref(c);
3737     pa_assert(t);
3738
3739     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3740
3741     p = pa_proplist_new();
3742
3743     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3744
3745         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3746             pa_tagstruct_get_proplist(t, p) < 0 ||
3747             !pa_tagstruct_eof(t)) {
3748             protocol_error(c);
3749             pa_proplist_free(p);
3750             return;
3751         }
3752
3753     } else {
3754
3755         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3756             pa_tagstruct_getu32(t, &mode) < 0 ||
3757             pa_tagstruct_get_proplist(t, p) < 0 ||
3758             !pa_tagstruct_eof(t)) {
3759             protocol_error(c);
3760             pa_proplist_free(p);
3761             return;
3762         }
3763     }
3764
3765     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3766         pa_proplist_free(p);
3767         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3768     }
3769
3770     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3771         playback_stream *s;
3772
3773         s = pa_idxset_get_by_index(c->output_streams, idx);
3774         if (!s || !playback_stream_isinstance(s)) {
3775             pa_proplist_free(p);
3776             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3777         }
3778         pa_sink_input_update_proplist(s->sink_input, mode, p);
3779
3780     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3781         record_stream *s;
3782
3783         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3784             pa_proplist_free(p);
3785             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3786         }
3787         pa_source_output_update_proplist(s->source_output, mode, p);
3788
3789     } else {
3790         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3791
3792         pa_client_update_proplist(c->client, mode, p);
3793     }
3794
3795     pa_pstream_send_simple_ack(c->pstream, tag);
3796     pa_proplist_free(p);
3797 }
3798
3799 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3800     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3801     uint32_t idx;
3802     unsigned changed = 0;
3803     pa_proplist *p;
3804     pa_strlist *l = NULL;
3805
3806     pa_native_connection_assert_ref(c);
3807     pa_assert(t);
3808
3809     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3810
3811     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3812
3813         if (pa_tagstruct_getu32(t, &idx) < 0) {
3814             protocol_error(c);
3815             return;
3816         }
3817     }
3818
3819     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3820         playback_stream *s;
3821
3822         s = pa_idxset_get_by_index(c->output_streams, idx);
3823         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3824         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3825
3826         p = s->sink_input->proplist;
3827
3828     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3829         record_stream *s;
3830
3831         s = pa_idxset_get_by_index(c->record_streams, idx);
3832         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3833
3834         p = s->source_output->proplist;
3835     } else {
3836         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3837
3838         p = c->client->proplist;
3839     }
3840
3841     for (;;) {
3842         const char *k;
3843
3844         if (pa_tagstruct_gets(t, &k) < 0) {
3845             protocol_error(c);
3846             pa_strlist_free(l);
3847             return;
3848         }
3849
3850         if (!k)
3851             break;
3852
3853         l = pa_strlist_prepend(l, k);
3854     }
3855
3856     if (!pa_tagstruct_eof(t)) {
3857         protocol_error(c);
3858         pa_strlist_free(l);
3859         return;
3860     }
3861
3862     for (;;) {
3863         char *z;
3864
3865         l = pa_strlist_pop(l, &z);
3866
3867         if (!z)
3868             break;
3869
3870         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3871         pa_xfree(z);
3872     }
3873
3874     pa_pstream_send_simple_ack(c->pstream, tag);
3875
3876     if (changed) {
3877         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3878             playback_stream *s;
3879
3880             s = pa_idxset_get_by_index(c->output_streams, idx);
3881             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3882
3883         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3884             record_stream *s;
3885
3886             s = pa_idxset_get_by_index(c->record_streams, idx);
3887             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3888
3889         } else {
3890             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3891             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3892         }
3893     }
3894 }
3895
3896 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3897     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3898     const char *s;
3899
3900     pa_native_connection_assert_ref(c);
3901     pa_assert(t);
3902
3903     if (pa_tagstruct_gets(t, &s) < 0 ||
3904         !pa_tagstruct_eof(t)) {
3905         protocol_error(c);
3906         return;
3907     }
3908
3909     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3910     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3911
3912     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3913         pa_source *source;
3914
3915         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3916         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3917
3918         pa_namereg_set_default_source(c->protocol->core, source);
3919     } else {
3920         pa_sink *sink;
3921         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3922
3923         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3924         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3925
3926         pa_namereg_set_default_sink(c->protocol->core, sink);
3927     }
3928
3929     pa_pstream_send_simple_ack(c->pstream, tag);
3930 }
3931
3932 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3933     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3934     uint32_t idx;
3935     const char *name;
3936
3937     pa_native_connection_assert_ref(c);
3938     pa_assert(t);
3939
3940     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3941         pa_tagstruct_gets(t, &name) < 0 ||
3942         !pa_tagstruct_eof(t)) {
3943         protocol_error(c);
3944         return;
3945     }
3946
3947     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3948     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3949
3950     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3951         playback_stream *s;
3952
3953         s = pa_idxset_get_by_index(c->output_streams, idx);
3954         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3955         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3956
3957         pa_sink_input_set_name(s->sink_input, name);
3958
3959     } else {
3960         record_stream *s;
3961         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3962
3963         s = pa_idxset_get_by_index(c->record_streams, idx);
3964         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3965
3966         pa_source_output_set_name(s->source_output, name);
3967     }
3968
3969     pa_pstream_send_simple_ack(c->pstream, tag);
3970 }
3971
3972 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3973     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3974     uint32_t idx;
3975
3976     pa_native_connection_assert_ref(c);
3977     pa_assert(t);
3978
3979     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3980         !pa_tagstruct_eof(t)) {
3981         protocol_error(c);
3982         return;
3983     }
3984
3985     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3986
3987     if (command == PA_COMMAND_KILL_CLIENT) {
3988         pa_client *client;
3989
3990         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3991         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3992
3993         pa_native_connection_ref(c);
3994         pa_client_kill(client);
3995
3996     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3997         pa_sink_input *s;
3998
3999         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4000         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4001
4002         pa_native_connection_ref(c);
4003         pa_sink_input_kill(s);
4004     } else {
4005         pa_source_output *s;
4006
4007         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
4008
4009         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4010         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
4011
4012         pa_native_connection_ref(c);
4013         pa_source_output_kill(s);
4014     }
4015
4016     pa_pstream_send_simple_ack(c->pstream, tag);
4017     pa_native_connection_unref(c);
4018 }
4019
4020 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4021     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4022     pa_module *m;
4023     const char *name, *argument;
4024     pa_tagstruct *reply;
4025
4026     pa_native_connection_assert_ref(c);
4027     pa_assert(t);
4028
4029     if (pa_tagstruct_gets(t, &name) < 0 ||
4030         pa_tagstruct_gets(t, &argument) < 0 ||
4031         !pa_tagstruct_eof(t)) {
4032         protocol_error(c);
4033         return;
4034     }
4035
4036     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4037     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
4038     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
4039
4040     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
4041         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
4042         return;
4043     }
4044
4045     reply = reply_new(tag);
4046     pa_tagstruct_putu32(reply, m->index);
4047     pa_pstream_send_tagstruct(c->pstream, reply);
4048 }
4049
4050 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4051     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4052     uint32_t idx;
4053     pa_module *m;
4054
4055     pa_native_connection_assert_ref(c);
4056     pa_assert(t);
4057
4058     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4059         !pa_tagstruct_eof(t)) {
4060         protocol_error(c);
4061         return;
4062     }
4063
4064     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4065     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4066     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4067
4068     pa_module_unload_request(m, FALSE);
4069     pa_pstream_send_simple_ack(c->pstream, tag);
4070 }
4071
4072 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4073     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4074     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4075     const char *name_device = NULL;
4076
4077     pa_native_connection_assert_ref(c);
4078     pa_assert(t);
4079
4080     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4081         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4082         pa_tagstruct_gets(t, &name_device) < 0 ||
4083         !pa_tagstruct_eof(t)) {
4084         protocol_error(c);
4085         return;
4086     }
4087
4088     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4089     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4090
4091     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4092     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4093     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4094     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4095
4096     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4097         pa_sink_input *si = NULL;
4098         pa_sink *sink = NULL;
4099
4100         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4101
4102         if (idx_device != PA_INVALID_INDEX)
4103             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4104         else
4105             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4106
4107         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4108
4109         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4110             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4111             return;
4112         }
4113     } else {
4114         pa_source_output *so = NULL;
4115         pa_source *source;
4116
4117         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4118
4119         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4120
4121         if (idx_device != PA_INVALID_INDEX)
4122             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4123         else
4124             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4125
4126         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4127
4128         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4129             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4130             return;
4131         }
4132     }
4133
4134     pa_pstream_send_simple_ack(c->pstream, tag);
4135 }
4136
4137 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4138     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4139     uint32_t idx = PA_INVALID_INDEX;
4140     const char *name = NULL;
4141     pa_bool_t b;
4142
4143     pa_native_connection_assert_ref(c);
4144     pa_assert(t);
4145
4146     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4147         pa_tagstruct_gets(t, &name) < 0 ||
4148         pa_tagstruct_get_boolean(t, &b) < 0 ||
4149         !pa_tagstruct_eof(t)) {
4150         protocol_error(c);
4151         return;
4152     }
4153
4154     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4155     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4156     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4157     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4158     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4159
4160     if (command == PA_COMMAND_SUSPEND_SINK) {
4161
4162         if (idx == PA_INVALID_INDEX && name && !*name) {
4163
4164             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4165
4166             if (pa_sink_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4167                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4168                 return;
4169             }
4170         } else {
4171             pa_sink *sink = NULL;
4172
4173             if (idx != PA_INVALID_INDEX)
4174                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4175             else
4176                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4177
4178             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4179
4180             if (pa_sink_suspend(sink, b, PA_SUSPEND_USER) < 0) {
4181                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4182                 return;
4183             }
4184         }
4185     } else {
4186
4187         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4188
4189         if (idx == PA_INVALID_INDEX && name && !*name) {
4190
4191             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4192
4193             if (pa_source_suspend_all(c->protocol->core, b, PA_SUSPEND_USER) < 0) {
4194                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4195                 return;
4196             }
4197
4198         } else {
4199             pa_source *source;
4200
4201             if (idx != PA_INVALID_INDEX)
4202                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4203             else
4204                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4205
4206             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4207
4208             if (pa_source_suspend(source, b, PA_SUSPEND_USER) < 0) {
4209                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4210                 return;
4211             }
4212         }
4213     }
4214
4215     pa_pstream_send_simple_ack(c->pstream, tag);
4216 }
4217
4218 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4219     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4220     uint32_t idx = PA_INVALID_INDEX;
4221     const char *name = NULL;
4222     pa_module *m;
4223     pa_native_protocol_ext_cb_t cb;
4224
4225     pa_native_connection_assert_ref(c);
4226     pa_assert(t);
4227
4228     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4229         pa_tagstruct_gets(t, &name) < 0) {
4230         protocol_error(c);
4231         return;
4232     }
4233
4234     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4235     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4236     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4237     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4238     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4239
4240     if (idx != PA_INVALID_INDEX)
4241         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4242     else {
4243         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4244             if (strcmp(name, m->name) == 0)
4245                 break;
4246     }
4247
4248     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4249     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4250
4251     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4252     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4253
4254     if (cb(c->protocol, m, c, tag, t) < 0)
4255         protocol_error(c);
4256 }
4257
4258 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4259     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4260     uint32_t idx = PA_INVALID_INDEX;
4261     const char *name = NULL, *profile = NULL;
4262     pa_card *card = NULL;
4263     int ret;
4264
4265     pa_native_connection_assert_ref(c);
4266     pa_assert(t);
4267
4268     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4269         pa_tagstruct_gets(t, &name) < 0 ||
4270         pa_tagstruct_gets(t, &profile) < 0 ||
4271         !pa_tagstruct_eof(t)) {
4272         protocol_error(c);
4273         return;
4274     }
4275
4276     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4277     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4278     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4279     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4280     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4281
4282     if (idx != PA_INVALID_INDEX)
4283         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4284     else
4285         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4286
4287     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4288
4289     if ((ret = pa_card_set_profile(card, profile, TRUE)) < 0) {
4290         pa_pstream_send_error(c->pstream, tag, -ret);
4291         return;
4292     }
4293
4294     pa_pstream_send_simple_ack(c->pstream, tag);
4295 }
4296
4297 static void command_set_sink_or_source_port(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4298     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4299     uint32_t idx = PA_INVALID_INDEX;
4300     const char *name = NULL, *port = NULL;
4301     int ret;
4302
4303     pa_native_connection_assert_ref(c);
4304     pa_assert(t);
4305
4306     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4307         pa_tagstruct_gets(t, &name) < 0 ||
4308         pa_tagstruct_gets(t, &port) < 0 ||
4309         !pa_tagstruct_eof(t)) {
4310         protocol_error(c);
4311         return;
4312     }
4313
4314     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4315     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4316     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4317     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4318     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4319
4320     if (command == PA_COMMAND_SET_SINK_PORT) {
4321         pa_sink *sink;
4322
4323         if (idx != PA_INVALID_INDEX)
4324             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4325         else
4326             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4327
4328         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4329
4330         if ((ret = pa_sink_set_port(sink, port, TRUE)) < 0) {
4331             pa_pstream_send_error(c->pstream, tag, -ret);
4332             return;
4333         }
4334     } else {
4335         pa_source *source;
4336
4337         pa_assert(command = PA_COMMAND_SET_SOURCE_PORT);
4338
4339         if (idx != PA_INVALID_INDEX)
4340             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4341         else
4342             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4343
4344         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4345
4346         if ((ret = pa_source_set_port(source, port, TRUE)) < 0) {
4347             pa_pstream_send_error(c->pstream, tag, -ret);
4348             return;
4349         }
4350     }
4351
4352     pa_pstream_send_simple_ack(c->pstream, tag);
4353 }
4354
4355 /*** pstream callbacks ***/
4356
4357 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4358     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4359
4360     pa_assert(p);
4361     pa_assert(packet);
4362     pa_native_connection_assert_ref(c);
4363
4364     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4365         pa_log("invalid packet.");
4366         native_connection_unlink(c);
4367     }
4368 }
4369
4370 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4371     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4372     output_stream *stream;
4373
4374     pa_assert(p);
4375     pa_assert(chunk);
4376     pa_native_connection_assert_ref(c);
4377
4378     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4379         pa_log_debug("Client sent block for invalid stream.");
4380         /* Ignoring */
4381         return;
4382     }
4383
4384 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4385
4386     if (playback_stream_isinstance(stream)) {
4387         playback_stream *ps = PLAYBACK_STREAM(stream);
4388
4389         if (chunk->memblock) {
4390             if (seek != PA_SEEK_RELATIVE || offset != 0)
4391                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4392
4393             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4394         } else
4395             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4396
4397     } else {
4398         upload_stream *u = UPLOAD_STREAM(stream);
4399         size_t l;
4400
4401         if (!u->memchunk.memblock) {
4402             if (u->length == chunk->length && chunk->memblock) {
4403                 u->memchunk = *chunk;
4404                 pa_memblock_ref(u->memchunk.memblock);
4405                 u->length = 0;
4406             } else {
4407                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4408                 u->memchunk.index = u->memchunk.length = 0;
4409             }
4410         }
4411
4412         pa_assert(u->memchunk.memblock);
4413
4414         l = u->length;
4415         if (l > chunk->length)
4416             l = chunk->length;
4417
4418         if (l > 0) {
4419             void *dst;
4420             dst = pa_memblock_acquire(u->memchunk.memblock);
4421
4422             if (chunk->memblock) {
4423                 void *src;
4424                 src = pa_memblock_acquire(chunk->memblock);
4425
4426                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4427                        (uint8_t*) src + chunk->index, l);
4428
4429                 pa_memblock_release(chunk->memblock);
4430             } else
4431                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4432
4433             pa_memblock_release(u->memchunk.memblock);
4434
4435             u->memchunk.length += l;
4436             u->length -= l;
4437         }
4438     }
4439 }
4440
4441 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4442     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4443
4444     pa_assert(p);
4445     pa_native_connection_assert_ref(c);
4446
4447     native_connection_unlink(c);
4448     pa_log_info("Connection died.");
4449 }
4450
4451 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4452     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4453
4454     pa_assert(p);
4455     pa_native_connection_assert_ref(c);
4456
4457     native_connection_send_memblock(c);
4458 }
4459
4460 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4461     pa_thread_mq *q;
4462
4463     if (!(q = pa_thread_mq_get()))
4464         pa_pstream_send_revoke(p, block_id);
4465     else
4466         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4467 }
4468
4469 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4470     pa_thread_mq *q;
4471
4472     if (!(q = pa_thread_mq_get()))
4473         pa_pstream_send_release(p, block_id);
4474     else
4475         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4476 }
4477
4478 /*** client callbacks ***/
4479
4480 static void client_kill_cb(pa_client *c) {
4481     pa_assert(c);
4482
4483     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4484     pa_log_info("Connection killed.");
4485 }
4486
4487 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4488     pa_tagstruct *t;
4489     pa_native_connection *c;
4490
4491     pa_assert(client);
4492     c = PA_NATIVE_CONNECTION(client->userdata);
4493     pa_native_connection_assert_ref(c);
4494
4495     if (c->version < 15)
4496       return;
4497
4498     t = pa_tagstruct_new(NULL, 0);
4499     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4500     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4501     pa_tagstruct_puts(t, event);
4502     pa_tagstruct_put_proplist(t, pl);
4503     pa_pstream_send_tagstruct(c->pstream, t);
4504 }
4505
4506 /*** module entry points ***/
4507
4508 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *t, void *userdata) {
4509     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4510
4511     pa_assert(m);
4512     pa_native_connection_assert_ref(c);
4513     pa_assert(c->auth_timeout_event == e);
4514
4515     if (!c->authorized) {
4516         native_connection_unlink(c);
4517         pa_log_info("Connection terminated due to authentication timeout.");
4518     }
4519 }
4520
4521 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4522     pa_native_connection *c;
4523     char pname[128];
4524     pa_client *client;
4525     pa_client_new_data data;
4526
4527     pa_assert(p);
4528     pa_assert(io);
4529     pa_assert(o);
4530
4531     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4532         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4533         pa_iochannel_free(io);
4534         return;
4535     }
4536
4537     pa_client_new_data_init(&data);
4538     data.module = o->module;
4539     data.driver = __FILE__;
4540     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4541     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4542     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4543     client = pa_client_new(p->core, &data);
4544     pa_client_new_data_done(&data);
4545
4546     if (!client)
4547         return;
4548
4549     c = pa_msgobject_new(pa_native_connection);
4550     c->parent.parent.free = native_connection_free;
4551     c->parent.process_msg = native_connection_process_msg;
4552     c->protocol = p;
4553     c->options = pa_native_options_ref(o);
4554     c->authorized = FALSE;
4555
4556     if (o->auth_anonymous) {
4557         pa_log_info("Client authenticated anonymously.");
4558         c->authorized = TRUE;
4559     }
4560
4561     if (!c->authorized &&
4562         o->auth_ip_acl &&
4563         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4564
4565         pa_log_info("Client authenticated by IP ACL.");
4566         c->authorized = TRUE;
4567     }
4568
4569     if (!c->authorized)
4570         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
4571     else
4572         c->auth_timeout_event = NULL;
4573
4574     c->is_local = pa_iochannel_socket_is_local(io);
4575     c->version = 8;
4576
4577     c->client = client;
4578     c->client->kill = client_kill_cb;
4579     c->client->send_event = client_send_event_cb;
4580     c->client->userdata = c;
4581
4582     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4583     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4584     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4585     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4586     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4587     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4588     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4589
4590     c->pdispatch = pa_pdispatch_new(p->core->mainloop, TRUE, command_table, PA_COMMAND_MAX);
4591
4592     c->record_streams = pa_idxset_new(NULL, NULL);
4593     c->output_streams = pa_idxset_new(NULL, NULL);
4594
4595     c->rrobin_index = PA_IDXSET_INVALID;
4596     c->subscription = NULL;
4597
4598     pa_idxset_put(p->connections, c, NULL);
4599
4600 #ifdef HAVE_CREDS
4601     if (pa_iochannel_creds_supported(io))
4602         pa_iochannel_creds_enable(io);
4603 #endif
4604
4605     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4606 }
4607
4608 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4609     pa_native_connection *c;
4610     void *state = NULL;
4611
4612     pa_assert(p);
4613     pa_assert(m);
4614
4615     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4616         if (c->options->module == m)
4617             native_connection_unlink(c);
4618 }
4619
4620 static pa_native_protocol* native_protocol_new(pa_core *c) {
4621     pa_native_protocol *p;
4622     pa_native_hook_t h;
4623
4624     pa_assert(c);
4625
4626     p = pa_xnew(pa_native_protocol, 1);
4627     PA_REFCNT_INIT(p);
4628     p->core = c;
4629     p->connections = pa_idxset_new(NULL, NULL);
4630
4631     p->servers = NULL;
4632
4633     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4634
4635     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4636         pa_hook_init(&p->hooks[h], p);
4637
4638     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4639
4640     return p;
4641 }
4642
4643 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4644     pa_native_protocol *p;
4645
4646     if ((p = pa_shared_get(c, "native-protocol")))
4647         return pa_native_protocol_ref(p);
4648
4649     return native_protocol_new(c);
4650 }
4651
4652 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4653     pa_assert(p);
4654     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4655
4656     PA_REFCNT_INC(p);
4657
4658     return p;
4659 }
4660
4661 void pa_native_protocol_unref(pa_native_protocol *p) {
4662     pa_native_connection *c;
4663     pa_native_hook_t h;
4664
4665     pa_assert(p);
4666     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4667
4668     if (PA_REFCNT_DEC(p) > 0)
4669         return;
4670
4671     while ((c = pa_idxset_first(p->connections, NULL)))
4672         native_connection_unlink(c);
4673
4674     pa_idxset_free(p->connections, NULL, NULL);
4675
4676     pa_strlist_free(p->servers);
4677
4678     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4679         pa_hook_done(&p->hooks[h]);
4680
4681     pa_hashmap_free(p->extensions, NULL, NULL);
4682
4683     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4684
4685     pa_xfree(p);
4686 }
4687
4688 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4689     pa_assert(p);
4690     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4691     pa_assert(name);
4692
4693     p->servers = pa_strlist_prepend(p->servers, name);
4694
4695     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4696 }
4697
4698 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4699     pa_assert(p);
4700     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4701     pa_assert(name);
4702
4703     p->servers = pa_strlist_remove(p->servers, name);
4704
4705     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4706 }
4707
4708 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4709     pa_assert(p);
4710     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4711
4712     return p->hooks;
4713 }
4714
4715 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4716     pa_assert(p);
4717     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4718
4719     return p->servers;
4720 }
4721
4722 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4723     pa_assert(p);
4724     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4725     pa_assert(m);
4726     pa_assert(cb);
4727     pa_assert(!pa_hashmap_get(p->extensions, m));
4728
4729     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4730     return 0;
4731 }
4732
4733 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4734     pa_assert(p);
4735     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4736     pa_assert(m);
4737
4738     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4739 }
4740
4741 pa_native_options* pa_native_options_new(void) {
4742     pa_native_options *o;
4743
4744     o = pa_xnew0(pa_native_options, 1);
4745     PA_REFCNT_INIT(o);
4746
4747     return o;
4748 }
4749
4750 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4751     pa_assert(o);
4752     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4753
4754     PA_REFCNT_INC(o);
4755
4756     return o;
4757 }
4758
4759 void pa_native_options_unref(pa_native_options *o) {
4760     pa_assert(o);
4761     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4762
4763     if (PA_REFCNT_DEC(o) > 0)
4764         return;
4765
4766     pa_xfree(o->auth_group);
4767
4768     if (o->auth_ip_acl)
4769         pa_ip_acl_free(o->auth_ip_acl);
4770
4771     if (o->auth_cookie)
4772         pa_auth_cookie_unref(o->auth_cookie);
4773
4774     pa_xfree(o);
4775 }
4776
4777 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4778     pa_bool_t enabled;
4779     const char *acl;
4780
4781     pa_assert(o);
4782     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4783     pa_assert(ma);
4784
4785     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4786         pa_log("auth-anonymous= expects a boolean argument.");
4787         return -1;
4788     }
4789
4790     enabled = TRUE;
4791     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4792         pa_log("auth-group-enabled= expects a boolean argument.");
4793         return -1;
4794     }
4795
4796     pa_xfree(o->auth_group);
4797     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4798
4799 #ifndef HAVE_CREDS
4800     if (o->auth_group)
4801         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4802 #endif
4803
4804     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4805         pa_ip_acl *ipa;
4806
4807         if (!(ipa = pa_ip_acl_new(acl))) {
4808             pa_log("Failed to parse IP ACL '%s'", acl);
4809             return -1;
4810         }
4811
4812         if (o->auth_ip_acl)
4813             pa_ip_acl_free(o->auth_ip_acl);
4814
4815         o->auth_ip_acl = ipa;
4816     }
4817
4818     enabled = TRUE;
4819     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4820         pa_log("auth-cookie-enabled= expects a boolean argument.");
4821         return -1;
4822     }
4823
4824     if (o->auth_cookie)
4825         pa_auth_cookie_unref(o->auth_cookie);
4826
4827     if (enabled) {
4828         const char *cn;
4829
4830         /* The new name for this is 'auth-cookie', for compat reasons
4831          * we check the old name too */
4832         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4833             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4834                 cn = PA_NATIVE_COOKIE_FILE;
4835
4836         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4837             return -1;
4838
4839     } else
4840           o->auth_cookie = NULL;
4841
4842     return 0;
4843 }
4844
4845 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4846     pa_native_connection_assert_ref(c);
4847
4848     return c->pstream;
4849 }