Merge commit 'coling/master'
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
/* Kick a client if it doesn't authenticate within this time */
#define AUTH_TIMEOUT 60

/* Don't accept more connections than this */
#define MAX_CONNECTIONS 64

/* Upper bound applied to a record stream's buffer_attr.maxlength in
 * fix_record_buffer_attr_pre() */
#define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
#define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
#define DEFAULT_PROCESS_MSEC 20   /* 20ms */
/* Fragment size used for record streams when the client passes
 * (uint32_t) -1 as fragsize */
#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC

/* Forward declaration; the structure itself is defined further down */
struct pa_native_protocol;
75
/* State kept for one record stream of a native protocol connection */
typedef struct record_stream {
    pa_msgobject parent;

    /* Owning connection; reset to NULL by record_stream_unlink() */
    pa_native_connection *connection;
    /* Index assigned when the stream is put into connection->record_streams */
    uint32_t index;

    /* Unlinked, unreffed and reset to NULL by record_stream_unlink() */
    pa_source_output *source_output;
    /* Queue that RECORD_STREAM_MESSAGE_POST_DATA pushes chunks into */
    pa_memblockq *memblockq;

    /* Select the latency strategy in fix_record_buffer_attr_pre();
     * early_requests takes precedence over adjust_latency there */
    pa_bool_t adjust_latency:1;
    pa_bool_t early_requests:1;

    pa_buffer_attr buffer_attr;
    /* Value returned by pa_source_output_set_requested_latency(), or 0
     * when no latency was requested */
    pa_usec_t source_latency;
} record_stream;

PA_DECLARE_CLASS(record_stream);
#define RECORD_STREAM(o) (record_stream_cast(o))
static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
95
/* Common base type embedded as the first member of both
 * playback_stream and upload_stream, the two stream kinds stored in a
 * connection's output_streams idxset */
typedef struct output_stream {
    pa_msgobject parent;
} output_stream;

PA_DECLARE_CLASS(output_stream);
#define OUTPUT_STREAM(o) (output_stream_cast(o))
static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
103
/* State kept for one playback stream of a native protocol connection */
typedef struct playback_stream {
    output_stream parent;

    /* Owning connection; reset to NULL by playback_stream_unlink() */
    pa_native_connection *connection;
    /* Stream index as reported to the client in stream notifications */
    uint32_t index;

    /* Unlinked, unreffed and reset to NULL by playback_stream_unlink() */
    pa_sink_input *sink_input;
    pa_memblockq *memblockq;

    pa_bool_t adjust_latency:1;
    pa_bool_t early_requests:1;

    pa_bool_t is_underrun:1;
    /* If drain_request is set when the stream is unlinked, the client
     * gets a PA_ERR_NOENTITY error for drain_tag */
    pa_bool_t drain_request:1;
    uint32_t drain_tag;
    uint32_t syncid;

    /* Counter that PLAYBACK_STREAM_MESSAGE_REQUEST_DATA atomically
     * reads and resets to 0 before sending PA_COMMAND_REQUEST */
    pa_atomic_t missing;
    /* Reported to the client in PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED */
    pa_usec_t sink_latency;
    pa_buffer_attr buffer_attr;

    /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
    int64_t read_index, write_index;
    size_t render_memblockq_length;
} playback_stream;

PA_DECLARE_CLASS(playback_stream);
#define PLAYBACK_STREAM(o) (playback_stream_cast(o))
static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
133
/* State kept for one sample upload stream of a native protocol
 * connection */
typedef struct upload_stream {
    output_stream parent;

    /* Owning connection; reset to NULL by upload_stream_unlink() */
    pa_native_connection *connection;
    /* Index assigned when the stream is put into connection->output_streams */
    uint32_t index;

    /* Reset in upload_stream_new(); its memblock (if any) is unreffed
     * in upload_stream_free() */
    pa_memchunk memchunk;
    /* Length passed to upload_stream_new(); asserted to be > 0 there */
    size_t length;
    /* Private copy of the name, freed in upload_stream_free() */
    char *name;
    pa_sample_spec sample_spec;
    pa_channel_map channel_map;
    /* Copy of the caller's proplist with the client's proplist merged
     * in; freed in upload_stream_free() */
    pa_proplist *proplist;
} upload_stream;

PA_DECLARE_CLASS(upload_stream);
#define UPLOAD_STREAM(o) (upload_stream_cast(o))
static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
151
/* State kept for one client connection speaking the native protocol */
struct pa_native_connection {
    pa_msgobject parent;
    /* Protocol instance; its ->core is used when creating source outputs */
    pa_native_protocol *protocol;
    /* Its ->module is recorded as the module of new source outputs */
    pa_native_options *options;
    pa_bool_t authorized:1;
    pa_bool_t is_local:1;
    /* Compared against 13 before PA_COMMAND_STARTED is sent to the client */
    uint32_t version;
    /* Its proplist is merged into the proplist of new upload streams */
    pa_client *client;
    /* Stream all replies and notifications to the client are sent on */
    pa_pstream *pstream;
    pa_pdispatch *pdispatch;
    /* record_streams holds record_stream objects; output_streams holds
     * both playback_stream and upload_stream objects */
    pa_idxset *record_streams, *output_streams;
    uint32_t rrobin_index;
    pa_subscription *subscription;
    /* NOTE(review): presumably the timer enforcing AUTH_TIMEOUT --
     * confirm against the connection setup code */
    pa_time_event *auth_timeout_event;
};

PA_DECLARE_CLASS(pa_native_connection);
#define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
171
/* Reference counted state of the native protocol implementation */
struct pa_native_protocol {
    PA_REFCNT_DECLARE;

    /* Core that new source outputs are created on */
    pa_core *core;
    /* NOTE(review): presumably the set of pa_native_connection objects,
     * limited by MAX_CONNECTIONS -- confirm against the accept path */
    pa_idxset *connections;

    pa_strlist *servers;
    /* One hook per PA_NATIVE_HOOK_* slot */
    pa_hook hooks[PA_NATIVE_HOOK_MAX];

    pa_hashmap *extensions;
};
183
/* Private sink input message codes, numbered from
 * PA_SINK_INPUT_MESSAGE_MAX upwards */
enum {
    SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
    SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
    SINK_INPUT_MESSAGE_FLUSH,
    SINK_INPUT_MESSAGE_TRIGGER,
    SINK_INPUT_MESSAGE_SEEK,
    SINK_INPUT_MESSAGE_PREBUF_FORCE,
    SINK_INPUT_MESSAGE_UPDATE_LATENCY,
    SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
};

/* Message codes handled by playback_stream_process_msg() */
enum {
    PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
    PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
    PLAYBACK_STREAM_MESSAGE_OVERFLOW,
    PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
    PLAYBACK_STREAM_MESSAGE_STARTED,
    PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
};

/* Message codes handled by record_stream_process_msg() */
enum {
    RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
};

/* Message codes addressed to a pa_native_connection object */
enum {
    CONNECTION_MESSAGE_RELEASE,
    CONNECTION_MESSAGE_REVOKE
};
212
/* Sink input callbacks, defined later in this file */
static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
static void sink_input_kill_cb(pa_sink_input *i);
static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
static void sink_input_moving_cb(pa_sink_input *i);
static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);

static void native_connection_send_memblock(pa_native_connection *c);
static void playback_stream_request_bytes(struct playback_stream*s);

/* Source output callbacks, installed on the source output by
 * record_stream_new() */
static void source_output_kill_cb(pa_source_output *o);
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
static void source_output_moving_cb(pa_source_output *o);
static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);

static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);

/* Command handlers referenced from command_table below; all share the
 * pa_pdispatch_cb_t signature */
static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272
/* Dispatch table indexed by native protocol command code. Several
 * related commands share one handler, which receives the command code
 * as an argument. Entries that are explicitly NULL have no handler in
 * this table. */
static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
    [PA_COMMAND_ERROR] = NULL,
    [PA_COMMAND_TIMEOUT] = NULL,
    [PA_COMMAND_REPLY] = NULL,
    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
    [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
    [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
    [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
    [PA_COMMAND_AUTH] = command_auth,
    [PA_COMMAND_REQUEST] = NULL,
    [PA_COMMAND_EXIT] = command_exit,
    [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
    [PA_COMMAND_LOOKUP_SINK] = command_lookup,
    [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
    [PA_COMMAND_STAT] = command_stat,
    [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
    [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
    [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
    [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
    [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
    [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
    [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
    [PA_COMMAND_GET_SINK_INFO] = command_get_info,
    [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
    [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
    [PA_COMMAND_GET_CARD_INFO] = command_get_info,
    [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
    [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
    [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
    [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
    [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
    [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
    [PA_COMMAND_SUBSCRIBE] = command_subscribe,

    [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
    [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,

    [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
    [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,

    [PA_COMMAND_SUSPEND_SINK] = command_suspend,
    [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,

    [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
    [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
    [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
    [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,

    [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
    [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,

    [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
    [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
    [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
    [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
    [PA_COMMAND_KILL_CLIENT] = command_kill,
    [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
    [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
    [PA_COMMAND_LOAD_MODULE] = command_load_module,
    [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,

    /* Obsolete autoload commands: kept in the table without a handler */
    [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
    [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
    [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
    [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,

    [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
    [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,

    [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
    [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,

    [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
    [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,

    [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
    [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
    [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,

    [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
    [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
    [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,

    [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,

    [PA_COMMAND_EXTENSION] = command_extension
};
370
371 /* structure management */
372
373 /* Called from main context */
374 static void upload_stream_unlink(upload_stream *s) {
375     pa_assert(s);
376
377     if (!s->connection)
378         return;
379
380     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
381     s->connection = NULL;
382     upload_stream_unref(s);
383 }
384
385 /* Called from main context */
386 static void upload_stream_free(pa_object *o) {
387     upload_stream *s = UPLOAD_STREAM(o);
388     pa_assert(s);
389
390     upload_stream_unlink(s);
391
392     pa_xfree(s->name);
393
394     if (s->proplist)
395         pa_proplist_free(s->proplist);
396
397     if (s->memchunk.memblock)
398         pa_memblock_unref(s->memchunk.memblock);
399
400     pa_xfree(s);
401 }
402
403 /* Called from main context */
404 static upload_stream* upload_stream_new(
405         pa_native_connection *c,
406         const pa_sample_spec *ss,
407         const pa_channel_map *map,
408         const char *name,
409         size_t length,
410         pa_proplist *p) {
411
412     upload_stream *s;
413
414     pa_assert(c);
415     pa_assert(ss);
416     pa_assert(name);
417     pa_assert(length > 0);
418     pa_assert(p);
419
420     s = pa_msgobject_new(upload_stream);
421     s->parent.parent.parent.free = upload_stream_free;
422     s->connection = c;
423     s->sample_spec = *ss;
424     s->channel_map = *map;
425     s->name = pa_xstrdup(name);
426     pa_memchunk_reset(&s->memchunk);
427     s->length = length;
428     s->proplist = pa_proplist_copy(p);
429     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
430
431     pa_idxset_put(c->output_streams, s, &s->index);
432
433     return s;
434 }
435
436 /* Called from main context */
437 static void record_stream_unlink(record_stream *s) {
438     pa_assert(s);
439
440     if (!s->connection)
441         return;
442
443     if (s->source_output) {
444         pa_source_output_unlink(s->source_output);
445         pa_source_output_unref(s->source_output);
446         s->source_output = NULL;
447     }
448
449     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
450     s->connection = NULL;
451     record_stream_unref(s);
452 }
453
454 /* Called from main context */
455 static void record_stream_free(pa_object *o) {
456     record_stream *s = RECORD_STREAM(o);
457     pa_assert(s);
458
459     record_stream_unlink(s);
460
461     pa_memblockq_free(s->memblockq);
462     pa_xfree(s);
463 }
464
465 /* Called from main context */
466 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
467     record_stream *s = RECORD_STREAM(o);
468     record_stream_assert_ref(s);
469
470     if (!s->connection)
471         return -1;
472
473     switch (code) {
474
475         case RECORD_STREAM_MESSAGE_POST_DATA:
476
477             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
478 /*                 pa_log_warn("Failed to push data into output queue."); */
479                 return -1;
480             }
481
482             if (!pa_pstream_is_pending(s->connection->pstream))
483                 native_connection_send_memblock(s->connection);
484
485             break;
486     }
487
488     return 0;
489 }
490
491 /* Called from main context */
492 static void fix_record_buffer_attr_pre(record_stream *s) {
493
494     size_t frame_size;
495     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
496
497     pa_assert(s);
498
499     /* This function will be called from the main thread, before as
500      * well as after the source output has been activated using
501      * pa_source_output_put()! That means it may not touch any
502      * ->thread_info data! */
503
504     frame_size = pa_frame_size(&s->source_output->sample_spec);
505
506     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
507         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
508     if (s->buffer_attr.maxlength <= 0)
509         s->buffer_attr.maxlength = (uint32_t) frame_size;
510
511     if (s->buffer_attr.fragsize == (uint32_t) -1)
512         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
513     if (s->buffer_attr.fragsize <= 0)
514         s->buffer_attr.fragsize = (uint32_t) frame_size;
515
516     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
517
518     if (s->early_requests) {
519
520         /* In early request mode we need to emulate the classic
521          * fragment-based playback model. We do this setting the source
522          * latency to the fragment size. */
523
524         source_usec = fragsize_usec;
525
526     } else if (s->adjust_latency) {
527
528         /* So, the user asked us to adjust the latency according to
529          * what the source can provide. Half the latency will be
530          * spent on the hw buffer, half of it in the async buffer
531          * queue we maintain for each client. */
532
533         source_usec = fragsize_usec/2;
534
535     } else {
536
537         /* Ok, the user didn't ask us to adjust the latency, hence we
538          * don't */
539
540         source_usec = 0;
541     }
542
543     if (source_usec > 0)
544         s->source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
545     else
546         s->source_latency = 0;
547
548     if (s->early_requests) {
549
550         /* Ok, we didn't necessarily get what we were asking for, so
551          * let's tell the user */
552
553         fragsize_usec = s->source_latency;
554
555     } else if (s->adjust_latency) {
556
557         /* Now subtract what we actually got */
558
559         if (fragsize_usec >= s->source_latency*2)
560             fragsize_usec -= s->source_latency;
561         else
562             fragsize_usec = s->source_latency;
563     }
564
565     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
566         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
567
568         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
569
570     if (s->buffer_attr.fragsize <= 0)
571         s->buffer_attr.fragsize = (uint32_t) frame_size;
572 }
573
574 /* Called from main context */
575 static void fix_record_buffer_attr_post(record_stream *s) {
576     size_t base;
577
578     pa_assert(s);
579
580     /* This function will be called from the main thread, before as
581      * well as after the source output has been activated using
582      * pa_source_output_put()! That means it may not touch and
583      * ->thread_info data! */
584
585     base = pa_frame_size(&s->source_output->sample_spec);
586
587     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
588     if (s->buffer_attr.fragsize <= 0)
589         s->buffer_attr.fragsize = base;
590
591     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
592         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
593 }
594
595 /* Called from main context */
596 static record_stream* record_stream_new(
597         pa_native_connection *c,
598         pa_source *source,
599         pa_sample_spec *ss,
600         pa_channel_map *map,
601         pa_bool_t peak_detect,
602         pa_buffer_attr *attr,
603         pa_source_output_flags_t flags,
604         pa_proplist *p,
605         pa_bool_t adjust_latency,
606         pa_sink_input *direct_on_input,
607         pa_bool_t early_requests,
608         int *ret) {
609
610     record_stream *s;
611     pa_source_output *source_output = NULL;
612     size_t base;
613     pa_source_output_new_data data;
614
615     pa_assert(c);
616     pa_assert(ss);
617     pa_assert(p);
618     pa_assert(ret);
619
620     pa_source_output_new_data_init(&data);
621
622     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
623     data.driver = __FILE__;
624     data.module = c->options->module;
625     data.client = c->client;
626     data.source = source;
627     data.direct_on_input = direct_on_input;
628     pa_source_output_new_data_set_sample_spec(&data, ss);
629     pa_source_output_new_data_set_channel_map(&data, map);
630     if (peak_detect)
631         data.resample_method = PA_RESAMPLER_PEAKS;
632
633     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
634
635     pa_source_output_new_data_done(&data);
636
637     if (!source_output)
638         return NULL;
639
640     s = pa_msgobject_new(record_stream);
641     s->parent.parent.free = record_stream_free;
642     s->parent.process_msg = record_stream_process_msg;
643     s->connection = c;
644     s->source_output = source_output;
645     s->buffer_attr = *attr;
646     s->adjust_latency = adjust_latency;
647     s->early_requests = early_requests;
648
649     s->source_output->push = source_output_push_cb;
650     s->source_output->kill = source_output_kill_cb;
651     s->source_output->get_latency = source_output_get_latency_cb;
652     s->source_output->moving = source_output_moving_cb;
653     s->source_output->suspend = source_output_suspend_cb;
654     s->source_output->send_event = source_output_send_event_cb;
655     s->source_output->userdata = s;
656
657     fix_record_buffer_attr_pre(s);
658
659     s->memblockq = pa_memblockq_new(
660             0,
661             s->buffer_attr.maxlength,
662             0,
663             base = pa_frame_size(&source_output->sample_spec),
664             1,
665             0,
666             0,
667             NULL);
668
669     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
670     fix_record_buffer_attr_post(s);
671
672     *ss = s->source_output->sample_spec;
673     *map = s->source_output->channel_map;
674
675     pa_idxset_put(c->record_streams, s, &s->index);
676
677     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
678                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->source_latency) / PA_USEC_PER_MSEC,
679                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
680                 (double) s->source_latency / PA_USEC_PER_MSEC);
681
682     pa_source_output_put(s->source_output);
683     return s;
684 }
685
686 /* Called from main context */
687 static void record_stream_send_killed(record_stream *r) {
688     pa_tagstruct *t;
689     record_stream_assert_ref(r);
690
691     t = pa_tagstruct_new(NULL, 0);
692     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
693     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
694     pa_tagstruct_putu32(t, r->index);
695     pa_pstream_send_tagstruct(r->connection->pstream, t);
696 }
697
698 /* Called from main context */
699 static void playback_stream_unlink(playback_stream *s) {
700     pa_assert(s);
701
702     if (!s->connection)
703         return;
704
705     if (s->sink_input) {
706         pa_sink_input_unlink(s->sink_input);
707         pa_sink_input_unref(s->sink_input);
708         s->sink_input = NULL;
709     }
710
711     if (s->drain_request)
712         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
713
714     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
715     s->connection = NULL;
716     playback_stream_unref(s);
717 }
718
719 /* Called from main context */
720 static void playback_stream_free(pa_object* o) {
721     playback_stream *s = PLAYBACK_STREAM(o);
722     pa_assert(s);
723
724     playback_stream_unlink(s);
725
726     pa_memblockq_free(s->memblockq);
727     pa_xfree(s);
728 }
729
730 /* Called from main context */
731 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
732     playback_stream *s = PLAYBACK_STREAM(o);
733     playback_stream_assert_ref(s);
734
735     if (!s->connection)
736         return -1;
737
738     switch (code) {
739         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
740             pa_tagstruct *t;
741             uint32_t l = 0;
742
743             for (;;) {
744                 if ((l = (uint32_t) pa_atomic_load(&s->missing)) <= 0)
745                     break;
746
747                 if (pa_atomic_cmpxchg(&s->missing, (int) l, 0))
748                     break;
749             }
750
751             if (l <= 0)
752                 break;
753
754             t = pa_tagstruct_new(NULL, 0);
755             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
756             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
757             pa_tagstruct_putu32(t, s->index);
758             pa_tagstruct_putu32(t, l);
759             pa_pstream_send_tagstruct(s->connection->pstream, t);
760
761 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
762             break;
763         }
764
765         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
766             pa_tagstruct *t;
767
768 /*             pa_log("signalling underflow"); */
769
770             /* Report that we're empty */
771             t = pa_tagstruct_new(NULL, 0);
772             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
773             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
774             pa_tagstruct_putu32(t, s->index);
775             pa_pstream_send_tagstruct(s->connection->pstream, t);
776             break;
777         }
778
779         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
780             pa_tagstruct *t;
781
782             /* Notify the user we're overflowed*/
783             t = pa_tagstruct_new(NULL, 0);
784             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
785             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
786             pa_tagstruct_putu32(t, s->index);
787             pa_pstream_send_tagstruct(s->connection->pstream, t);
788             break;
789         }
790
791         case PLAYBACK_STREAM_MESSAGE_STARTED:
792
793             if (s->connection->version >= 13) {
794                 pa_tagstruct *t;
795
796                 /* Notify the user we started playback */
797                 t = pa_tagstruct_new(NULL, 0);
798                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
799                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
800                 pa_tagstruct_putu32(t, s->index);
801                 pa_pstream_send_tagstruct(s->connection->pstream, t);
802             }
803
804             break;
805
806         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
807             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
808             break;
809
810         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
811             pa_tagstruct *t;
812
813             s->buffer_attr.tlength = (uint32_t) offset;
814
815             t = pa_tagstruct_new(NULL, 0);
816             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
817             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
818             pa_tagstruct_putu32(t, s->index);
819             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
820             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
821             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
822             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
823             pa_tagstruct_put_usec(t, s->sink_latency);
824             pa_pstream_send_tagstruct(s->connection->pstream, t);
825
826             break;
827         }
828     }
829
830     return 0;
831 }
832
833 /* Called from main context */
834 static void fix_playback_buffer_attr(playback_stream *s) {
835     size_t frame_size;
836     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
837
838     pa_assert(s);
839
840     /* This function will be called from the main thread, before as
841      * well as after the sink input has been activated using
842      * pa_sink_input_put()! That means it may not touch any
843      * ->thread_info data, such as the memblockq! */
844
845     frame_size = pa_frame_size(&s->sink_input->sample_spec);
846
847     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
848         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
849     if (s->buffer_attr.maxlength <= 0)
850         s->buffer_attr.maxlength = (uint32_t) frame_size;
851
852     if (s->buffer_attr.tlength == (uint32_t) -1)
853         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
854     if (s->buffer_attr.tlength <= 0)
855         s->buffer_attr.tlength = (uint32_t) frame_size;
856
857     if (s->buffer_attr.minreq == (uint32_t) -1)
858         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
859     if (s->buffer_attr.minreq <= 0)
860         s->buffer_attr.minreq = (uint32_t) frame_size;
861
862     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
863         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
864
865     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
866     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
867
868     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
869                 (double) tlength_usec / PA_USEC_PER_MSEC,
870                 (double) minreq_usec / PA_USEC_PER_MSEC);
871
872     if (s->early_requests) {
873
874         /* In early request mode we need to emulate the classic
875          * fragment-based playback model. We do this setting the sink
876          * latency to the fragment size. */
877
878         sink_usec = minreq_usec;
879         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
880
881     } else if (s->adjust_latency) {
882
883         /* So, the user asked us to adjust the latency of the stream
884          * buffer according to the what the sink can provide. The
885          * tlength passed in shall be the overall latency. Roughly
886          * half the latency will be spent on the hw buffer, the other
887          * half of it in the async buffer queue we maintain for each
888          * client. In between we'll have a safety space of size
889          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
890          * empty and needs to be filled, then our buffer must have
891          * enough data to fulfill this request immediatly and thus
892          * have at least the same tlength as the size of the hw
893          * buffer. It additionally needs space for 2 times minreq
894          * because if the buffer ran empty and a partial fillup
895          * happens immediately on the next iteration we need to be
896          * able to fulfill it and give the application also minreq
897          * time to fill it up again for the next request Makes 2 times
898          * minreq in plus.. */
899
900         if (tlength_usec > minreq_usec*2)
901             sink_usec = (tlength_usec - minreq_usec*2)/2;
902         else
903             sink_usec = 0;
904
905         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
906
907     } else {
908
909         /* Ok, the user didn't ask us to adjust the latency, but we
910          * still need to make sure that the parameters from the user
911          * do make sense. */
912
913         if (tlength_usec > minreq_usec*2)
914             sink_usec = (tlength_usec - minreq_usec*2);
915         else
916             sink_usec = 0;
917
918         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
919     }
920
921     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
922
923     if (s->early_requests) {
924
925         /* Ok, we didn't necessarily get what we were asking for, so
926          * let's tell the user */
927
928         minreq_usec = s->sink_latency;
929
930     } else if (s->adjust_latency) {
931
932         /* Ok, we didn't necessarily get what we were asking for, so
933          * let's subtract from what we asked for for the remaining
934          * buffer space */
935
936         if (tlength_usec >= s->sink_latency)
937             tlength_usec -= s->sink_latency;
938     }
939
940     /* FIXME: This is actually larger than necessary, since not all of
941      * the sink latency is actually rewritable. */
942     if (tlength_usec < s->sink_latency + 2*minreq_usec)
943         tlength_usec = s->sink_latency + 2*minreq_usec;
944
945     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
946         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
947         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
948
949     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
950         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
951         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
952
953     if (s->buffer_attr.minreq <= 0) {
954         s->buffer_attr.minreq = (uint32_t) frame_size;
955         s->buffer_attr.tlength += (uint32_t) frame_size*2;
956     }
957
958     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
959         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
960
961     if (s->buffer_attr.prebuf == (uint32_t) -1 || s->buffer_attr.prebuf > s->buffer_attr.tlength)
962         s->buffer_attr.prebuf = s->buffer_attr.tlength;
963 }
964
965 /* Called from main context */
966 static playback_stream* playback_stream_new(
967         pa_native_connection *c,
968         pa_sink *sink,
969         pa_sample_spec *ss,
970         pa_channel_map *map,
971         pa_buffer_attr *a,
972         pa_cvolume *volume,
973         pa_bool_t muted,
974         pa_bool_t muted_set,
975         uint32_t syncid,
976         uint32_t *missing,
977         pa_sink_input_flags_t flags,
978         pa_proplist *p,
979         pa_bool_t adjust_latency,
980         pa_bool_t early_requests,
981         int *ret) {
982
983     playback_stream *s, *ssync;
984     pa_sink_input *sink_input = NULL;
985     pa_memchunk silence;
986     uint32_t idx;
987     int64_t start_index;
988     pa_sink_input_new_data data;
989
990     pa_assert(c);
991     pa_assert(ss);
992     pa_assert(missing);
993     pa_assert(p);
994     pa_assert(ret);
995
996     /* Find syncid group */
997     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
998
999         if (!playback_stream_isinstance(ssync))
1000             continue;
1001
1002         if (ssync->syncid == syncid)
1003             break;
1004     }
1005
1006     /* Synced streams must connect to the same sink */
1007     if (ssync) {
1008
1009         if (!sink)
1010             sink = ssync->sink_input->sink;
1011         else if (sink != ssync->sink_input->sink) {
1012             *ret = PA_ERR_INVALID;
1013             return NULL;
1014         }
1015     }
1016
1017     pa_sink_input_new_data_init(&data);
1018
1019     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1020     data.driver = __FILE__;
1021     data.module = c->options->module;
1022     data.client = c->client;
1023     data.sink = sink;
1024     pa_sink_input_new_data_set_sample_spec(&data, ss);
1025     pa_sink_input_new_data_set_channel_map(&data, map);
1026     if (volume)
1027         pa_sink_input_new_data_set_volume(&data, volume);
1028     if (muted_set)
1029         pa_sink_input_new_data_set_muted(&data, muted);
1030     data.sync_base = ssync ? ssync->sink_input : NULL;
1031
1032     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1033
1034     pa_sink_input_new_data_done(&data);
1035
1036     if (!sink_input)
1037         return NULL;
1038
1039     s = pa_msgobject_new(playback_stream);
1040     s->parent.parent.parent.free = playback_stream_free;
1041     s->parent.parent.process_msg = playback_stream_process_msg;
1042     s->connection = c;
1043     s->syncid = syncid;
1044     s->sink_input = sink_input;
1045     s->is_underrun = TRUE;
1046     s->drain_request = FALSE;
1047     pa_atomic_store(&s->missing, 0);
1048     s->buffer_attr = *a;
1049     s->adjust_latency = adjust_latency;
1050     s->early_requests = early_requests;
1051
1052     s->sink_input->parent.process_msg = sink_input_process_msg;
1053     s->sink_input->pop = sink_input_pop_cb;
1054     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1055     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1056     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1057     s->sink_input->kill = sink_input_kill_cb;
1058     s->sink_input->moving = sink_input_moving_cb;
1059     s->sink_input->suspend = sink_input_suspend_cb;
1060     s->sink_input->send_event = sink_input_send_event_cb;
1061     s->sink_input->userdata = s;
1062
1063     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1064
1065     fix_playback_buffer_attr(s);
1066
1067     pa_sink_input_get_silence(sink_input, &silence);
1068     s->memblockq = pa_memblockq_new(
1069             start_index,
1070             s->buffer_attr.maxlength,
1071             s->buffer_attr.tlength,
1072             pa_frame_size(&sink_input->sample_spec),
1073             s->buffer_attr.prebuf,
1074             s->buffer_attr.minreq,
1075             0,
1076             &silence);
1077     pa_memblock_unref(silence.memblock);
1078
1079     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1080
1081     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1082
1083     *ss = s->sink_input->sample_spec;
1084     *map = s->sink_input->channel_map;
1085
1086     pa_idxset_put(c->output_streams, s, &s->index);
1087
1088     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1089                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->sink_latency) / PA_USEC_PER_MSEC,
1090                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1091                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1092                 (double) s->sink_latency / PA_USEC_PER_MSEC);
1093
1094     pa_sink_input_put(s->sink_input);
1095     return s;
1096 }
1097
1098 /* Called from IO context */
1099 static void playback_stream_request_bytes(playback_stream *s) {
1100     size_t m, previous_missing, minreq;
1101
1102     playback_stream_assert_ref(s);
1103
1104     m = pa_memblockq_pop_missing(s->memblockq);
1105
1106     if (m <= 0)
1107         return;
1108
1109 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1110
1111     previous_missing = (size_t) pa_atomic_add(&s->missing, (int) m);
1112     minreq = pa_memblockq_get_minreq(s->memblockq);
1113
1114     if (pa_memblockq_prebuf_active(s->memblockq) ||
1115         (previous_missing < minreq && previous_missing+m >= minreq))
1116         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1117 }
1118
1119
1120 /* Called from main context */
1121 static void playback_stream_send_killed(playback_stream *p) {
1122     pa_tagstruct *t;
1123     playback_stream_assert_ref(p);
1124
1125     t = pa_tagstruct_new(NULL, 0);
1126     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1127     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1128     pa_tagstruct_putu32(t, p->index);
1129     pa_pstream_send_tagstruct(p->connection->pstream, t);
1130 }
1131
1132 /* Called from main context */
1133 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1134     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1135     pa_native_connection_assert_ref(c);
1136
1137     if (!c->protocol)
1138         return -1;
1139
1140     switch (code) {
1141
1142         case CONNECTION_MESSAGE_REVOKE:
1143             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1144             break;
1145
1146         case CONNECTION_MESSAGE_RELEASE:
1147             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1148             break;
1149     }
1150
1151     return 0;
1152 }
1153
1154 /* Called from main context */
1155 static void native_connection_unlink(pa_native_connection *c) {
1156     record_stream *r;
1157     output_stream *o;
1158
1159     pa_assert(c);
1160
1161     if (!c->protocol)
1162         return;
1163
1164     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1165
1166     if (c->options)
1167         pa_native_options_unref(c->options);
1168
1169     while ((r = pa_idxset_first(c->record_streams, NULL)))
1170         record_stream_unlink(r);
1171
1172     while ((o = pa_idxset_first(c->output_streams, NULL)))
1173         if (playback_stream_isinstance(o))
1174             playback_stream_unlink(PLAYBACK_STREAM(o));
1175         else
1176             upload_stream_unlink(UPLOAD_STREAM(o));
1177
1178     if (c->subscription)
1179         pa_subscription_free(c->subscription);
1180
1181     if (c->pstream)
1182         pa_pstream_unlink(c->pstream);
1183
1184     if (c->auth_timeout_event) {
1185         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1186         c->auth_timeout_event = NULL;
1187     }
1188
1189     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1190     c->protocol = NULL;
1191     pa_native_connection_unref(c);
1192 }
1193
1194 /* Called from main context */
1195 static void native_connection_free(pa_object *o) {
1196     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1197
1198     pa_assert(c);
1199
1200     native_connection_unlink(c);
1201
1202     pa_idxset_free(c->record_streams, NULL, NULL);
1203     pa_idxset_free(c->output_streams, NULL, NULL);
1204
1205     pa_pdispatch_unref(c->pdispatch);
1206     pa_pstream_unref(c->pstream);
1207     pa_client_free(c->client);
1208
1209     pa_xfree(c);
1210 }
1211
1212 /* Called from main context */
1213 static void native_connection_send_memblock(pa_native_connection *c) {
1214     uint32_t start;
1215     record_stream *r;
1216
1217     start = PA_IDXSET_INVALID;
1218     for (;;) {
1219         pa_memchunk chunk;
1220
1221         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1222             return;
1223
1224         if (start == PA_IDXSET_INVALID)
1225             start = c->rrobin_index;
1226         else if (start == c->rrobin_index)
1227             return;
1228
1229         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1230             pa_memchunk schunk = chunk;
1231
1232             if (schunk.length > r->buffer_attr.fragsize)
1233                 schunk.length = r->buffer_attr.fragsize;
1234
1235             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1236
1237             pa_memblockq_drop(r->memblockq, schunk.length);
1238             pa_memblock_unref(schunk.memblock);
1239
1240             return;
1241         }
1242     }
1243 }
1244
1245 /*** sink input callbacks ***/
1246
1247 /* Called from thread context */
1248 static void handle_seek(playback_stream *s, int64_t indexw) {
1249     playback_stream_assert_ref(s);
1250
1251 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1252
1253     if (s->sink_input->thread_info.underrun_for > 0) {
1254
1255 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1256
1257         if (pa_memblockq_is_readable(s->memblockq)) {
1258
1259             /* We just ended an underrun, let's ask the sink
1260              * for a complete rewind rewrite */
1261
1262             pa_log_debug("Requesting rewind due to end of underrun.");
1263             pa_sink_input_request_rewind(s->sink_input,
1264                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1265                                          FALSE, TRUE, FALSE);
1266         }
1267
1268     } else {
1269         int64_t indexr;
1270
1271         indexr = pa_memblockq_get_read_index(s->memblockq);
1272
1273         if (indexw < indexr) {
1274             /* OK, the sink already asked for this data, so
1275              * let's have it usk us again */
1276
1277             pa_log_debug("Requesting rewind due to rewrite.");
1278             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1279         }
1280     }
1281
1282     playback_stream_request_bytes(s);
1283 }
1284
1285 /* Called from thread context */
1286 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1287     pa_sink_input *i = PA_SINK_INPUT(o);
1288     playback_stream *s;
1289
1290     pa_sink_input_assert_ref(i);
1291     s = PLAYBACK_STREAM(i->userdata);
1292     playback_stream_assert_ref(s);
1293
1294     switch (code) {
1295
1296         case SINK_INPUT_MESSAGE_SEEK: {
1297             int64_t windex;
1298
1299             windex = pa_memblockq_get_write_index(s->memblockq);
1300             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
1301
1302             handle_seek(s, windex);
1303             return 0;
1304         }
1305
1306         case SINK_INPUT_MESSAGE_POST_DATA: {
1307             int64_t windex;
1308
1309             pa_assert(chunk);
1310
1311             windex = pa_memblockq_get_write_index(s->memblockq);
1312
1313 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1314
1315             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1316                 pa_log_warn("Failed to push data into queue");
1317                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1318                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
1319             }
1320
1321             handle_seek(s, windex);
1322
1323 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1324
1325             return 0;
1326         }
1327
1328         case SINK_INPUT_MESSAGE_DRAIN:
1329         case SINK_INPUT_MESSAGE_FLUSH:
1330         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1331         case SINK_INPUT_MESSAGE_TRIGGER: {
1332
1333             int64_t windex;
1334             pa_sink_input *isync;
1335             void (*func)(pa_memblockq *bq);
1336
1337             switch  (code) {
1338                 case SINK_INPUT_MESSAGE_FLUSH:
1339                     func = pa_memblockq_flush_write;
1340                     break;
1341
1342                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1343                     func = pa_memblockq_prebuf_force;
1344                     break;
1345
1346                 case SINK_INPUT_MESSAGE_DRAIN:
1347                 case SINK_INPUT_MESSAGE_TRIGGER:
1348                     func = pa_memblockq_prebuf_disable;
1349                     break;
1350
1351                 default:
1352                     pa_assert_not_reached();
1353             }
1354
1355             windex = pa_memblockq_get_write_index(s->memblockq);
1356             func(s->memblockq);
1357             handle_seek(s, windex);
1358
1359             /* Do the same for all other members in the sync group */
1360             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1361                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1362                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1363                 func(ssync->memblockq);
1364                 handle_seek(ssync, windex);
1365             }
1366
1367             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1368                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1369                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1370                 func(ssync->memblockq);
1371                 handle_seek(ssync, windex);
1372             }
1373
1374             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1375                 if (!pa_memblockq_is_readable(s->memblockq))
1376                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1377                 else {
1378                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1379                     s->drain_request = TRUE;
1380                 }
1381             }
1382
1383             return 0;
1384         }
1385
1386         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1387
1388             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1389             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1390             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1391             return 0;
1392
1393         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1394             int64_t windex;
1395
1396             windex = pa_memblockq_get_write_index(s->memblockq);
1397
1398             pa_memblockq_prebuf_force(s->memblockq);
1399
1400             handle_seek(s, windex);
1401
1402             /* Fall through to the default handler */
1403             break;
1404         }
1405
1406         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1407             pa_usec_t *r = userdata;
1408
1409             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1410
1411             /* Fall through, the default handler will add in the extra
1412              * latency added by the resampler */
1413             break;
1414         }
1415
1416         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1417             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1418             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1419             return 0;
1420         }
1421     }
1422
1423     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1424 }
1425
1426 /* Called from thread context */
1427 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1428     playback_stream *s;
1429
1430     pa_sink_input_assert_ref(i);
1431     s = PLAYBACK_STREAM(i->userdata);
1432     playback_stream_assert_ref(s);
1433     pa_assert(chunk);
1434
1435 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1436
1437     if (pa_memblockq_is_readable(s->memblockq))
1438         s->is_underrun = FALSE;
1439     else {
1440 /*         pa_log("%s, UNDERRUN: %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1441
1442         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1443             s->drain_request = FALSE;
1444             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1445         } else if (!s->is_underrun)
1446             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1447
1448         s->is_underrun = TRUE;
1449
1450         playback_stream_request_bytes(s);
1451     }
1452
1453     /* This call will not fail with prebuf=0, hence we check for
1454        underrun explicitly above */
1455     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1456         return -1;
1457
1458     chunk->length = PA_MIN(nbytes, chunk->length);
1459
1460     if (i->thread_info.underrun_for > 0)
1461         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1462
1463     pa_memblockq_drop(s->memblockq, chunk->length);
1464     playback_stream_request_bytes(s);
1465
1466     return 0;
1467 }
1468
1469 /* Called from thread context */
1470 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1471     playback_stream *s;
1472
1473     pa_sink_input_assert_ref(i);
1474     s = PLAYBACK_STREAM(i->userdata);
1475     playback_stream_assert_ref(s);
1476
1477     /* If we are in an underrun, then we don't rewind */
1478     if (i->thread_info.underrun_for > 0)
1479         return;
1480
1481     pa_memblockq_rewind(s->memblockq, nbytes);
1482 }
1483
1484 /* Called from thread context */
1485 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1486     playback_stream *s;
1487
1488     pa_sink_input_assert_ref(i);
1489     s = PLAYBACK_STREAM(i->userdata);
1490     playback_stream_assert_ref(s);
1491
1492     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1493 }
1494
1495 /* Called from thread context */
1496 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1497     playback_stream *s;
1498     size_t tlength;
1499
1500     pa_sink_input_assert_ref(i);
1501     s = PLAYBACK_STREAM(i->userdata);
1502     playback_stream_assert_ref(s);
1503
1504     tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1505
1506     if (pa_memblockq_get_tlength(s->memblockq) < tlength) {
1507         pa_memblockq_set_tlength(s->memblockq, tlength);
1508         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1509     }
1510 }
1511
1512 /* Called from main context */
1513 static void sink_input_kill_cb(pa_sink_input *i) {
1514     playback_stream *s;
1515
1516     pa_sink_input_assert_ref(i);
1517     s = PLAYBACK_STREAM(i->userdata);
1518     playback_stream_assert_ref(s);
1519
1520     playback_stream_send_killed(s);
1521     playback_stream_unlink(s);
1522 }
1523
1524 /* Called from main context */
1525 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1526     playback_stream *s;
1527     pa_tagstruct *t;
1528
1529     pa_sink_input_assert_ref(i);
1530     s = PLAYBACK_STREAM(i->userdata);
1531     playback_stream_assert_ref(s);
1532
1533     if (s->connection->version < 15)
1534       return;
1535
1536     t = pa_tagstruct_new(NULL, 0);
1537     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1538     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1539     pa_tagstruct_putu32(t, s->index);
1540     pa_tagstruct_puts(t, event);
1541     pa_tagstruct_put_proplist(t, pl);
1542     pa_pstream_send_tagstruct(s->connection->pstream, t);
1543 }
1544
1545 /* Called from main context */
1546 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1547     playback_stream *s;
1548     pa_tagstruct *t;
1549
1550     pa_sink_input_assert_ref(i);
1551     s = PLAYBACK_STREAM(i->userdata);
1552     playback_stream_assert_ref(s);
1553
1554     if (s->connection->version < 12)
1555       return;
1556
1557     t = pa_tagstruct_new(NULL, 0);
1558     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1559     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1560     pa_tagstruct_putu32(t, s->index);
1561     pa_tagstruct_put_boolean(t, suspend);
1562     pa_pstream_send_tagstruct(s->connection->pstream, t);
1563 }
1564
1565 /* Called from main context */
1566 static void sink_input_moving_cb(pa_sink_input *i) {
1567     playback_stream *s;
1568     pa_tagstruct *t;
1569
1570     pa_sink_input_assert_ref(i);
1571     s = PLAYBACK_STREAM(i->userdata);
1572     playback_stream_assert_ref(s);
1573
1574     fix_playback_buffer_attr(s);
1575     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1576     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1577
1578     if (s->connection->version < 12)
1579       return;
1580
1581     t = pa_tagstruct_new(NULL, 0);
1582     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1583     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1584     pa_tagstruct_putu32(t, s->index);
1585     pa_tagstruct_putu32(t, i->sink->index);
1586     pa_tagstruct_puts(t, i->sink->name);
1587     pa_tagstruct_put_boolean(t, pa_sink_get_state(i->sink) == PA_SINK_SUSPENDED);
1588
1589     if (s->connection->version >= 13) {
1590         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1591         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1592         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1593         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1594         pa_tagstruct_put_usec(t, s->sink_latency);
1595     }
1596
1597     pa_pstream_send_tagstruct(s->connection->pstream, t);
1598 }
1599
1600 /*** source_output callbacks ***/
1601
1602 /* Called from thread context */
1603 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1604     record_stream *s;
1605
1606     pa_source_output_assert_ref(o);
1607     s = RECORD_STREAM(o->userdata);
1608     record_stream_assert_ref(s);
1609     pa_assert(chunk);
1610
1611     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1612 }
1613
1614 static void source_output_kill_cb(pa_source_output *o) {
1615     record_stream *s;
1616
1617     pa_source_output_assert_ref(o);
1618     s = RECORD_STREAM(o->userdata);
1619     record_stream_assert_ref(s);
1620
1621     record_stream_send_killed(s);
1622     record_stream_unlink(s);
1623 }
1624
1625 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1626     record_stream *s;
1627
1628     pa_source_output_assert_ref(o);
1629     s = RECORD_STREAM(o->userdata);
1630     record_stream_assert_ref(s);
1631
1632     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1633
1634     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1635 }
1636
1637 /* Called from main context */
1638 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1639     record_stream *s;
1640     pa_tagstruct *t;
1641
1642     pa_source_output_assert_ref(o);
1643     s = RECORD_STREAM(o->userdata);
1644     record_stream_assert_ref(s);
1645
1646     if (s->connection->version < 15)
1647       return;
1648
1649     t = pa_tagstruct_new(NULL, 0);
1650     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1651     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1652     pa_tagstruct_putu32(t, s->index);
1653     pa_tagstruct_puts(t, event);
1654     pa_tagstruct_put_proplist(t, pl);
1655     pa_pstream_send_tagstruct(s->connection->pstream, t);
1656 }
1657
1658 /* Called from main context */
1659 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1660     record_stream *s;
1661     pa_tagstruct *t;
1662
1663     pa_source_output_assert_ref(o);
1664     s = RECORD_STREAM(o->userdata);
1665     record_stream_assert_ref(s);
1666
1667     if (s->connection->version < 12)
1668       return;
1669
1670     t = pa_tagstruct_new(NULL, 0);
1671     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1672     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1673     pa_tagstruct_putu32(t, s->index);
1674     pa_tagstruct_put_boolean(t, suspend);
1675     pa_pstream_send_tagstruct(s->connection->pstream, t);
1676 }
1677
1678 /* Called from main context */
1679 static void source_output_moving_cb(pa_source_output *o) {
1680     record_stream *s;
1681     pa_tagstruct *t;
1682
1683     pa_source_output_assert_ref(o);
1684     s = RECORD_STREAM(o->userdata);
1685     record_stream_assert_ref(s);
1686
1687     fix_record_buffer_attr_pre(s);
1688     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1689     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1690     fix_record_buffer_attr_post(s);
1691
1692     if (s->connection->version < 12)
1693       return;
1694
1695     t = pa_tagstruct_new(NULL, 0);
1696     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1697     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1698     pa_tagstruct_putu32(t, s->index);
1699     pa_tagstruct_putu32(t, o->source->index);
1700     pa_tagstruct_puts(t, o->source->name);
1701     pa_tagstruct_put_boolean(t, pa_source_get_state(o->source) == PA_SOURCE_SUSPENDED);
1702
1703     if (s->connection->version >= 13) {
1704         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1705         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1706         pa_tagstruct_put_usec(t, s->source_latency);
1707     }
1708
1709     pa_pstream_send_tagstruct(s->connection->pstream, t);
1710 }
1711
1712 /*** pdispatch callbacks ***/
1713
1714 static void protocol_error(pa_native_connection *c) {
1715     pa_log("protocol error, kicking client");
1716     native_connection_unlink(c);
1717 }
1718
/* Precondition check for void command handlers.
 *
 * When `expression` is false, an error reply carrying `error` is sent for
 * `tag` on `pstream` and the ENCLOSING FUNCTION RETURNS; otherwise the macro
 * does nothing. `expression` is evaluated exactly once.
 *
 * NOTE: the expansion itself ends in ';'. That is kept because call sites
 * may depend on it, but it means the macro must not be used as the unbraced
 * body of an `if` that is followed by `else`. */
#define CHECK_VALIDITY(pstream, expression, tag, error)             \
    do {                                                            \
        if (!(expression)) {                                        \
            pa_pstream_send_error((pstream), (tag), (error));       \
            return;                                                 \
        }                                                           \
    } while (0);
1725
1726 static pa_tagstruct *reply_new(uint32_t tag) {
1727     pa_tagstruct *reply;
1728
1729     reply = pa_tagstruct_new(NULL, 0);
1730     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1731     pa_tagstruct_putu32(reply, tag);
1732     return reply;
1733 }
1734
1735 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1736     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1737     playback_stream *s;
1738     uint32_t sink_index, syncid, missing;
1739     pa_buffer_attr attr;
1740     const char *name = NULL, *sink_name;
1741     pa_sample_spec ss;
1742     pa_channel_map map;
1743     pa_tagstruct *reply;
1744     pa_sink *sink = NULL;
1745     pa_cvolume volume;
1746     pa_bool_t
1747         corked = FALSE,
1748         no_remap = FALSE,
1749         no_remix = FALSE,
1750         fix_format = FALSE,
1751         fix_rate = FALSE,
1752         fix_channels = FALSE,
1753         no_move = FALSE,
1754         variable_rate = FALSE,
1755         muted = FALSE,
1756         adjust_latency = FALSE,
1757         early_requests = FALSE,
1758         dont_inhibit_auto_suspend = FALSE,
1759         muted_set = FALSE,
1760         fail_on_suspend = FALSE;
1761     pa_sink_input_flags_t flags = 0;
1762     pa_proplist *p;
1763     pa_bool_t volume_set = TRUE;
1764     int ret = PA_ERR_INVALID;
1765
1766     pa_native_connection_assert_ref(c);
1767     pa_assert(t);
1768     memset(&attr, 0, sizeof(attr));
1769
1770     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1771         pa_tagstruct_get(
1772                 t,
1773                 PA_TAG_SAMPLE_SPEC, &ss,
1774                 PA_TAG_CHANNEL_MAP, &map,
1775                 PA_TAG_U32, &sink_index,
1776                 PA_TAG_STRING, &sink_name,
1777                 PA_TAG_U32, &attr.maxlength,
1778                 PA_TAG_BOOLEAN, &corked,
1779                 PA_TAG_U32, &attr.tlength,
1780                 PA_TAG_U32, &attr.prebuf,
1781                 PA_TAG_U32, &attr.minreq,
1782                 PA_TAG_U32, &syncid,
1783                 PA_TAG_CVOLUME, &volume,
1784                 PA_TAG_INVALID) < 0) {
1785
1786         protocol_error(c);
1787         return;
1788     }
1789
1790     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1791     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1792     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1793     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1794     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1795     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1796     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1797     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1798
1799     p = pa_proplist_new();
1800
1801     if (name)
1802         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1803
1804     if (c->version >= 12)  {
1805         /* Since 0.9.8 the user can ask for a couple of additional flags */
1806
1807         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1808             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1809             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1810             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1811             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1812             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1813             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1814
1815             protocol_error(c);
1816             pa_proplist_free(p);
1817             return;
1818         }
1819     }
1820
1821     if (c->version >= 13) {
1822
1823         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1824             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1825             pa_tagstruct_get_proplist(t, p) < 0) {
1826             protocol_error(c);
1827             pa_proplist_free(p);
1828             return;
1829         }
1830     }
1831
1832     if (c->version >= 14) {
1833
1834         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1835             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1836             protocol_error(c);
1837             pa_proplist_free(p);
1838             return;
1839         }
1840     }
1841
1842     if (c->version >= 15) {
1843
1844         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1845             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1846             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1847             protocol_error(c);
1848             pa_proplist_free(p);
1849             return;
1850         }
1851     }
1852
1853     if (!pa_tagstruct_eof(t)) {
1854         protocol_error(c);
1855         pa_proplist_free(p);
1856         return;
1857     }
1858
1859     if (sink_index != PA_INVALID_INDEX) {
1860
1861         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1862             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1863             pa_proplist_free(p);
1864             return;
1865         }
1866
1867     } else if (sink_name) {
1868
1869         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1870             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1871             pa_proplist_free(p);
1872             return;
1873         }
1874     }
1875
1876     flags =
1877         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1878         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1879         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1880         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1881         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1882         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1883         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1884         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1885         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1886         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1887
1888     /* Only since protocol version 15 there's a seperate muted_set
1889      * flag. For older versions we synthesize it here */
1890     muted_set = muted_set || muted;
1891
1892     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1893     pa_proplist_free(p);
1894
1895     CHECK_VALIDITY(c->pstream, s, tag, ret);
1896
1897     reply = reply_new(tag);
1898     pa_tagstruct_putu32(reply, s->index);
1899     pa_assert(s->sink_input);
1900     pa_tagstruct_putu32(reply, s->sink_input->index);
1901     pa_tagstruct_putu32(reply, missing);
1902
1903 /*     pa_log("initial request is %u", missing); */
1904
1905     if (c->version >= 9) {
1906         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1907
1908         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1909         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1910         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1911         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1912     }
1913
1914     if (c->version >= 12) {
1915         /* Since 0.9.8 we support sending the chosen sample
1916          * spec/channel map/device/suspend status back to the
1917          * client */
1918
1919         pa_tagstruct_put_sample_spec(reply, &ss);
1920         pa_tagstruct_put_channel_map(reply, &map);
1921
1922         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1923         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1924
1925         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1926     }
1927
1928     if (c->version >= 13)
1929         pa_tagstruct_put_usec(reply, s->sink_latency);
1930
1931     pa_pstream_send_tagstruct(c->pstream, reply);
1932 }
1933
1934 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1935     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1936     uint32_t channel;
1937
1938     pa_native_connection_assert_ref(c);
1939     pa_assert(t);
1940
1941     if (pa_tagstruct_getu32(t, &channel) < 0 ||
1942         !pa_tagstruct_eof(t)) {
1943         protocol_error(c);
1944         return;
1945     }
1946
1947     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1948
1949     switch (command) {
1950
1951         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
1952             playback_stream *s;
1953             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
1954                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1955                 return;
1956             }
1957
1958             playback_stream_unlink(s);
1959             break;
1960         }
1961
1962         case PA_COMMAND_DELETE_RECORD_STREAM: {
1963             record_stream *s;
1964             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
1965                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1966                 return;
1967             }
1968
1969             record_stream_unlink(s);
1970             break;
1971         }
1972
1973         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
1974             upload_stream *s;
1975
1976             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
1977                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1978                 return;
1979             }
1980
1981             upload_stream_unlink(s);
1982             break;
1983         }
1984
1985         default:
1986             pa_assert_not_reached();
1987     }
1988
1989     pa_pstream_send_simple_ack(c->pstream, tag);
1990 }
1991
1992 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1993     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1994     record_stream *s;
1995     pa_buffer_attr attr;
1996     uint32_t source_index;
1997     const char *name = NULL, *source_name;
1998     pa_sample_spec ss;
1999     pa_channel_map map;
2000     pa_tagstruct *reply;
2001     pa_source *source = NULL;
2002     pa_bool_t
2003         corked = FALSE,
2004         no_remap = FALSE,
2005         no_remix = FALSE,
2006         fix_format = FALSE,
2007         fix_rate = FALSE,
2008         fix_channels = FALSE,
2009         no_move = FALSE,
2010         variable_rate = FALSE,
2011         adjust_latency = FALSE,
2012         peak_detect = FALSE,
2013         early_requests = FALSE,
2014         dont_inhibit_auto_suspend = FALSE,
2015         fail_on_suspend = FALSE;
2016     pa_source_output_flags_t flags = 0;
2017     pa_proplist *p;
2018     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2019     pa_sink_input *direct_on_input = NULL;
2020     int ret = PA_ERR_INVALID;
2021
2022     pa_native_connection_assert_ref(c);
2023     pa_assert(t);
2024
2025     memset(&attr, 0, sizeof(attr));
2026
2027     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2028         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2029         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2030         pa_tagstruct_getu32(t, &source_index) < 0 ||
2031         pa_tagstruct_gets(t, &source_name) < 0 ||
2032         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2033         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2034         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2035         protocol_error(c);
2036         return;
2037     }
2038
2039     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2040     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2041     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2042     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2043     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2044     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2045     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2046
2047     p = pa_proplist_new();
2048
2049     if (name)
2050         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2051
2052     if (c->version >= 12)  {
2053         /* Since 0.9.8 the user can ask for a couple of additional flags */
2054
2055         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2056             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2057             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2058             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2059             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2060             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2061             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2062
2063             protocol_error(c);
2064             pa_proplist_free(p);
2065             return;
2066         }
2067     }
2068
2069     if (c->version >= 13) {
2070
2071         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2072             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2073             pa_tagstruct_get_proplist(t, p) < 0 ||
2074             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2075             protocol_error(c);
2076             pa_proplist_free(p);
2077             return;
2078         }
2079     }
2080
2081     if (c->version >= 14) {
2082
2083         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2084             protocol_error(c);
2085             pa_proplist_free(p);
2086             return;
2087         }
2088     }
2089
2090     if (c->version >= 15) {
2091
2092         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2093             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2094             protocol_error(c);
2095             pa_proplist_free(p);
2096             return;
2097         }
2098     }
2099
2100     if (!pa_tagstruct_eof(t)) {
2101         protocol_error(c);
2102         pa_proplist_free(p);
2103         return;
2104     }
2105
2106     if (source_index != PA_INVALID_INDEX) {
2107
2108         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2109             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2110             pa_proplist_free(p);
2111             return;
2112         }
2113
2114     } else if (source_name) {
2115
2116         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2117             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2118             pa_proplist_free(p);
2119             return;
2120         }
2121     }
2122
2123     if (direct_on_input_idx != PA_INVALID_INDEX) {
2124
2125         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2126             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2127             pa_proplist_free(p);
2128             return;
2129         }
2130     }
2131
2132     flags =
2133         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2134         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2135         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2136         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2137         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2138         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2139         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2140         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2141         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2142         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2143
2144     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2145     pa_proplist_free(p);
2146
2147     CHECK_VALIDITY(c->pstream, s, tag, ret);
2148
2149     reply = reply_new(tag);
2150     pa_tagstruct_putu32(reply, s->index);
2151     pa_assert(s->source_output);
2152     pa_tagstruct_putu32(reply, s->source_output->index);
2153
2154     if (c->version >= 9) {
2155         /* Since 0.9 we support sending the buffer metrics back to the client */
2156
2157         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2158         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2159     }
2160
2161     if (c->version >= 12) {
2162         /* Since 0.9.8 we support sending the chosen sample
2163          * spec/channel map/device/suspend status back to the
2164          * client */
2165
2166         pa_tagstruct_put_sample_spec(reply, &ss);
2167         pa_tagstruct_put_channel_map(reply, &map);
2168
2169         pa_tagstruct_putu32(reply, s->source_output->source->index);
2170         pa_tagstruct_puts(reply, s->source_output->source->name);
2171
2172         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2173     }
2174
2175     if (c->version >= 13)
2176         pa_tagstruct_put_usec(reply, s->source_latency);
2177
2178     pa_pstream_send_tagstruct(c->pstream, reply);
2179 }
2180
2181 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2182     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2183     int ret;
2184
2185     pa_native_connection_assert_ref(c);
2186     pa_assert(t);
2187
2188     if (!pa_tagstruct_eof(t)) {
2189         protocol_error(c);
2190         return;
2191     }
2192
2193     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2194     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2195     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2196
2197     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2198 }
2199
2200 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2201     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2202     const void*cookie;
2203     pa_tagstruct *reply;
2204     pa_bool_t shm_on_remote = FALSE, do_shm;
2205
2206     pa_native_connection_assert_ref(c);
2207     pa_assert(t);
2208
2209     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2210         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2211         !pa_tagstruct_eof(t)) {
2212         protocol_error(c);
2213         return;
2214     }
2215
2216     /* Minimum supported version */
2217     if (c->version < 8) {
2218         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2219         return;
2220     }
2221
2222     /* Starting with protocol version 13 the MSB of the version tag
2223        reflects if shm is available for this pa_native_connection or
2224        not. */
2225     if (c->version >= 13) {
2226         shm_on_remote = !!(c->version & 0x80000000U);
2227         c->version &= 0x7FFFFFFFU;
2228     }
2229
2230     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2231
2232     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2233
2234     if (!c->authorized) {
2235         pa_bool_t success = FALSE;
2236
2237 #ifdef HAVE_CREDS
2238         const pa_creds *creds;
2239
2240         if ((creds = pa_pdispatch_creds(pd))) {
2241             if (creds->uid == getuid())
2242                 success = TRUE;
2243             else if (c->options->auth_group) {
2244                 int r;
2245                 gid_t gid;
2246
2247                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2248                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2249                 else if (gid == creds->gid)
2250                     success = TRUE;
2251
2252                 if (!success) {
2253                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2254                         pa_log_warn("Failed to check group membership.");
2255                     else if (r > 0)
2256                         success = TRUE;
2257                 }
2258             }
2259
2260             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2261                         (unsigned long) creds->uid,
2262                         (unsigned long) creds->gid,
2263                         (int) success);
2264         }
2265 #endif
2266
2267         if (!success && c->options->auth_cookie) {
2268             const uint8_t *ac;
2269
2270             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2271                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2272                     success = TRUE;
2273         }
2274
2275         if (!success) {
2276             pa_log_warn("Denied access to client with invalid authorization data.");
2277             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2278             return;
2279         }
2280
2281         c->authorized = TRUE;
2282         if (c->auth_timeout_event) {
2283             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2284             c->auth_timeout_event = NULL;
2285         }
2286     }
2287
2288     /* Enable shared memory support if possible */
2289     do_shm =
2290         pa_mempool_is_shared(c->protocol->core->mempool) &&
2291         c->is_local;
2292
2293     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2294
2295     if (do_shm)
2296         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2297             do_shm = FALSE;
2298
2299 #ifdef HAVE_CREDS
2300     if (do_shm) {
2301         /* Only enable SHM if both sides are owned by the same
2302          * user. This is a security measure because otherwise data
2303          * private to the user might leak. */
2304
2305         const pa_creds *creds;
2306         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2307             do_shm = FALSE;
2308     }
2309 #endif
2310
2311     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2312     pa_pstream_enable_shm(c->pstream, do_shm);
2313
2314     reply = reply_new(tag);
2315     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2316
2317 #ifdef HAVE_CREDS
2318 {
2319     /* SHM support is only enabled after both sides made sure they are the same user. */
2320
2321     pa_creds ucred;
2322
2323     ucred.uid = getuid();
2324     ucred.gid = getgid();
2325
2326     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2327 }
2328 #else
2329     pa_pstream_send_tagstruct(c->pstream, reply);
2330 #endif
2331 }
2332
2333 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2334     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2335     const char *name = NULL;
2336     pa_proplist *p;
2337     pa_tagstruct *reply;
2338
2339     pa_native_connection_assert_ref(c);
2340     pa_assert(t);
2341
2342     p = pa_proplist_new();
2343
2344     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2345         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2346         !pa_tagstruct_eof(t)) {
2347
2348         protocol_error(c);
2349         pa_proplist_free(p);
2350         return;
2351     }
2352
2353     if (name)
2354         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2355             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2356             pa_proplist_free(p);
2357             return;
2358         }
2359
2360     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2361     pa_proplist_free(p);
2362
2363     reply = reply_new(tag);
2364
2365     if (c->version >= 13)
2366         pa_tagstruct_putu32(reply, c->client->index);
2367
2368     pa_pstream_send_tagstruct(c->pstream, reply);
2369 }
2370
2371 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2372     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2373     const char *name;
2374     uint32_t idx = PA_IDXSET_INVALID;
2375
2376     pa_native_connection_assert_ref(c);
2377     pa_assert(t);
2378
2379     if (pa_tagstruct_gets(t, &name) < 0 ||
2380         !pa_tagstruct_eof(t)) {
2381         protocol_error(c);
2382         return;
2383     }
2384
2385     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2386     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2387
2388     if (command == PA_COMMAND_LOOKUP_SINK) {
2389         pa_sink *sink;
2390         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2391             idx = sink->index;
2392     } else {
2393         pa_source *source;
2394         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2395         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2396             idx = source->index;
2397     }
2398
2399     if (idx == PA_IDXSET_INVALID)
2400         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2401     else {
2402         pa_tagstruct *reply;
2403         reply = reply_new(tag);
2404         pa_tagstruct_putu32(reply, idx);
2405         pa_pstream_send_tagstruct(c->pstream, reply);
2406     }
2407 }
2408
2409 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2410     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2411     uint32_t idx;
2412     playback_stream *s;
2413
2414     pa_native_connection_assert_ref(c);
2415     pa_assert(t);
2416
2417     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2418         !pa_tagstruct_eof(t)) {
2419         protocol_error(c);
2420         return;
2421     }
2422
2423     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2424     s = pa_idxset_get_by_index(c->output_streams, idx);
2425     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2426     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2427
2428     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2429 }
2430
2431 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2432     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2433     pa_tagstruct *reply;
2434     const pa_mempool_stat *stat;
2435
2436     pa_native_connection_assert_ref(c);
2437     pa_assert(t);
2438
2439     if (!pa_tagstruct_eof(t)) {
2440         protocol_error(c);
2441         return;
2442     }
2443
2444     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2445
2446     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2447
2448     reply = reply_new(tag);
2449     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2450     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2451     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2452     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2453     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2454     pa_pstream_send_tagstruct(c->pstream, reply);
2455 }
2456
2457 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2458     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2459     pa_tagstruct *reply;
2460     playback_stream *s;
2461     struct timeval tv, now;
2462     uint32_t idx;
2463     pa_usec_t latency;
2464
2465     pa_native_connection_assert_ref(c);
2466     pa_assert(t);
2467
2468     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2469         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2470         !pa_tagstruct_eof(t)) {
2471         protocol_error(c);
2472         return;
2473     }
2474
2475     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2476     s = pa_idxset_get_by_index(c->output_streams, idx);
2477     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2478     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2479     CHECK_VALIDITY(c->pstream, pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0, tag, PA_ERR_NOENTITY)
2480
2481     reply = reply_new(tag);
2482
2483     latency = pa_sink_get_latency(s->sink_input->sink);
2484     latency += pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec);
2485
2486     pa_tagstruct_put_usec(reply, latency);
2487
2488     pa_tagstruct_put_usec(reply, 0);
2489     pa_tagstruct_put_boolean(reply, s->sink_input->thread_info.playing_for > 0);
2490     pa_tagstruct_put_timeval(reply, &tv);
2491     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2492     pa_tagstruct_puts64(reply, s->write_index);
2493     pa_tagstruct_puts64(reply, s->read_index);
2494
2495     if (c->version >= 13) {
2496         pa_tagstruct_putu64(reply, s->sink_input->thread_info.underrun_for);
2497         pa_tagstruct_putu64(reply, s->sink_input->thread_info.playing_for);
2498     }
2499
2500     pa_pstream_send_tagstruct(c->pstream, reply);
2501 }
2502
2503 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2504     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2505     pa_tagstruct *reply;
2506     record_stream *s;
2507     struct timeval tv, now;
2508     uint32_t idx;
2509
2510     pa_native_connection_assert_ref(c);
2511     pa_assert(t);
2512
2513     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2514         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2515         !pa_tagstruct_eof(t)) {
2516         protocol_error(c);
2517         return;
2518     }
2519
2520     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2521     s = pa_idxset_get_by_index(c->record_streams, idx);
2522     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2523
2524     reply = reply_new(tag);
2525     pa_tagstruct_put_usec(reply, s->source_output->source->monitor_of ? pa_sink_get_latency(s->source_output->source->monitor_of) : 0);
2526     pa_tagstruct_put_usec(reply, pa_source_get_latency(s->source_output->source));
2527     pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING);
2528     pa_tagstruct_put_timeval(reply, &tv);
2529     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2530     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2531     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2532     pa_pstream_send_tagstruct(c->pstream, reply);
2533 }
2534
2535 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2536     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2537     upload_stream *s;
2538     uint32_t length;
2539     const char *name = NULL;
2540     pa_sample_spec ss;
2541     pa_channel_map map;
2542     pa_tagstruct *reply;
2543     pa_proplist *p;
2544
2545     pa_native_connection_assert_ref(c);
2546     pa_assert(t);
2547
2548     if (pa_tagstruct_gets(t, &name) < 0 ||
2549         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2550         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2551         pa_tagstruct_getu32(t, &length) < 0) {
2552         protocol_error(c);
2553         return;
2554     }
2555
2556     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2557     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2558     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2559     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2560     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2561     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2562
2563     p = pa_proplist_new();
2564
2565     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2566         !pa_tagstruct_eof(t)) {
2567
2568         protocol_error(c);
2569         pa_proplist_free(p);
2570         return;
2571     }
2572
2573     if (c->version < 13)
2574         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2575     else if (!name)
2576         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2577             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2578
2579     if (!name || !pa_namereg_is_valid_name(name)) {
2580         pa_proplist_free(p);
2581         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2582     }
2583
2584     s = upload_stream_new(c, &ss, &map, name, length, p);
2585     pa_proplist_free(p);
2586
2587     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2588
2589     reply = reply_new(tag);
2590     pa_tagstruct_putu32(reply, s->index);
2591     pa_tagstruct_putu32(reply, length);
2592     pa_pstream_send_tagstruct(c->pstream, reply);
2593 }
2594
2595 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2596     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2597     uint32_t channel;
2598     upload_stream *s;
2599     uint32_t idx;
2600
2601     pa_native_connection_assert_ref(c);
2602     pa_assert(t);
2603
2604     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2605         !pa_tagstruct_eof(t)) {
2606         protocol_error(c);
2607         return;
2608     }
2609
2610     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2611
2612     s = pa_idxset_get_by_index(c->output_streams, channel);
2613     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2614     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2615
2616     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2617         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2618     else
2619         pa_pstream_send_simple_ack(c->pstream, tag);
2620
2621     upload_stream_unlink(s);
2622 }
2623
2624 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2625     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2626     uint32_t sink_index;
2627     pa_volume_t volume;
2628     pa_sink *sink;
2629     const char *name, *sink_name;
2630     uint32_t idx;
2631     pa_proplist *p;
2632     pa_tagstruct *reply;
2633
2634     pa_native_connection_assert_ref(c);
2635     pa_assert(t);
2636
2637     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2638
2639     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2640         pa_tagstruct_gets(t, &sink_name) < 0 ||
2641         pa_tagstruct_getu32(t, &volume) < 0 ||
2642         pa_tagstruct_gets(t, &name) < 0) {
2643         protocol_error(c);
2644         return;
2645     }
2646
2647     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2648     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2649     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2650     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2651
2652     if (sink_index != PA_INVALID_INDEX)
2653         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2654     else
2655         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2656
2657     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2658
2659     p = pa_proplist_new();
2660
2661     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2662         !pa_tagstruct_eof(t)) {
2663         protocol_error(c);
2664         pa_proplist_free(p);
2665         return;
2666     }
2667
2668     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2669
2670     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2671         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2672         pa_proplist_free(p);
2673         return;
2674     }
2675
2676     pa_proplist_free(p);
2677
2678     reply = reply_new(tag);
2679
2680     if (c->version >= 13)
2681         pa_tagstruct_putu32(reply, idx);
2682
2683     pa_pstream_send_tagstruct(c->pstream, reply);
2684 }
2685
2686 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2687     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2688     const char *name;
2689
2690     pa_native_connection_assert_ref(c);
2691     pa_assert(t);
2692
2693     if (pa_tagstruct_gets(t, &name) < 0 ||
2694         !pa_tagstruct_eof(t)) {
2695         protocol_error(c);
2696         return;
2697     }
2698
2699     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2700     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2701
2702     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2703         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2704         return;
2705     }
2706
2707     pa_pstream_send_simple_ack(c->pstream, tag);
2708 }
2709
2710 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2711     pa_assert(c);
2712     pa_assert(fixed);
2713     pa_assert(original);
2714
2715     *fixed = *original;
2716
2717     if (c->version < 12) {
2718         /* Before protocol version 12 we didn't support S32 samples,
2719          * so we need to lie about this to the client */
2720
2721         if (fixed->format == PA_SAMPLE_S32LE)
2722             fixed->format = PA_SAMPLE_FLOAT32LE;
2723         if (fixed->format == PA_SAMPLE_S32BE)
2724             fixed->format = PA_SAMPLE_FLOAT32BE;
2725     }
2726
2727     if (c->version < 15) {
2728         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2729             fixed->format = PA_SAMPLE_FLOAT32LE;
2730         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2731             fixed->format = PA_SAMPLE_FLOAT32BE;
2732     }
2733 }
2734
2735 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2736     pa_sample_spec fixed_ss;
2737
2738     pa_assert(t);
2739     pa_sink_assert_ref(sink);
2740
2741     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2742
2743     pa_tagstruct_put(
2744         t,
2745         PA_TAG_U32, sink->index,
2746         PA_TAG_STRING, sink->name,
2747         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2748         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2749         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2750         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2751         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2752         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2753         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2754         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2755         PA_TAG_USEC, pa_sink_get_latency(sink),
2756         PA_TAG_STRING, sink->driver,
2757         PA_TAG_U32, sink->flags,
2758         PA_TAG_INVALID);
2759
2760     if (c->version >= 13) {
2761         pa_tagstruct_put_proplist(t, sink->proplist);
2762         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2763     }
2764
2765     if (c->version >= 15) {
2766         pa_tagstruct_put_volume(t, sink->base_volume);
2767         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2768             pa_log_error("Internal sink state is invalid.");
2769         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2770         pa_tagstruct_putu32(t, sink->n_volume_steps);
2771         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2772     }
2773 }
2774
2775 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2776     pa_sample_spec fixed_ss;
2777
2778     pa_assert(t);
2779     pa_source_assert_ref(source);
2780
2781     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2782
2783     pa_tagstruct_put(
2784         t,
2785         PA_TAG_U32, source->index,
2786         PA_TAG_STRING, source->name,
2787         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2788         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2789         PA_TAG_CHANNEL_MAP, &source->channel_map,
2790         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2791         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2792         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2793         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2794         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2795         PA_TAG_USEC, pa_source_get_latency(source),
2796         PA_TAG_STRING, source->driver,
2797         PA_TAG_U32, source->flags,
2798         PA_TAG_INVALID);
2799
2800     if (c->version >= 13) {
2801         pa_tagstruct_put_proplist(t, source->proplist);
2802         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2803     }
2804
2805     if (c->version >= 15) {
2806         pa_tagstruct_put_volume(t, source->base_volume);
2807         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2808             pa_log_error("Internal source state is invalid.");
2809         pa_tagstruct_putu32(t, pa_source_get_state(source));
2810         pa_tagstruct_putu32(t, source->n_volume_steps);
2811         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2812     }
2813 }
2814
2815 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2816     pa_assert(t);
2817     pa_assert(client);
2818
2819     pa_tagstruct_putu32(t, client->index);
2820     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2821     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2822     pa_tagstruct_puts(t, client->driver);
2823
2824     if (c->version >= 13)
2825         pa_tagstruct_put_proplist(t, client->proplist);
2826 }
2827
2828 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2829     void *state = NULL;
2830     pa_card_profile *p;
2831
2832     pa_assert(t);
2833     pa_assert(card);
2834
2835     pa_tagstruct_putu32(t, card->index);
2836     pa_tagstruct_puts(t, card->name);
2837     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2838     pa_tagstruct_puts(t, card->driver);
2839
2840     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2841
2842     if (card->profiles) {
2843         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2844             pa_tagstruct_puts(t, p->name);
2845             pa_tagstruct_puts(t, p->description);
2846             pa_tagstruct_putu32(t, p->n_sinks);
2847             pa_tagstruct_putu32(t, p->n_sources);
2848             pa_tagstruct_putu32(t, p->priority);
2849         }
2850     }
2851
2852     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2853     pa_tagstruct_put_proplist(t, card->proplist);
2854 }
2855
2856 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2857     pa_assert(t);
2858     pa_assert(module);
2859
2860     pa_tagstruct_putu32(t, module->index);
2861     pa_tagstruct_puts(t, module->name);
2862     pa_tagstruct_puts(t, module->argument);
2863     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2864
2865     if (c->version < 15)
2866         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2867
2868     if (c->version >= 15)
2869         pa_tagstruct_put_proplist(t, module->proplist);
2870 }
2871
2872 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2873     pa_sample_spec fixed_ss;
2874     pa_usec_t sink_latency;
2875
2876     pa_assert(t);
2877     pa_sink_input_assert_ref(s);
2878
2879     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2880
2881     pa_tagstruct_putu32(t, s->index);
2882     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2883     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2884     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2885     pa_tagstruct_putu32(t, s->sink->index);
2886     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2887     pa_tagstruct_put_channel_map(t, &s->channel_map);
2888     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s));
2889     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
2890     pa_tagstruct_put_usec(t, sink_latency);
2891     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
2892     pa_tagstruct_puts(t, s->driver);
2893     if (c->version >= 11)
2894         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
2895     if (c->version >= 13)
2896         pa_tagstruct_put_proplist(t, s->proplist);
2897 }
2898
2899 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
2900     pa_sample_spec fixed_ss;
2901     pa_usec_t source_latency;
2902
2903     pa_assert(t);
2904     pa_source_output_assert_ref(s);
2905
2906     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2907
2908     pa_tagstruct_putu32(t, s->index);
2909     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2910     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2911     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2912     pa_tagstruct_putu32(t, s->source->index);
2913     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2914     pa_tagstruct_put_channel_map(t, &s->channel_map);
2915     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
2916     pa_tagstruct_put_usec(t, source_latency);
2917     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
2918     pa_tagstruct_puts(t, s->driver);
2919
2920     if (c->version >= 13)
2921         pa_tagstruct_put_proplist(t, s->proplist);
2922 }
2923
2924 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
2925     pa_sample_spec fixed_ss;
2926     pa_cvolume v;
2927
2928     pa_assert(t);
2929     pa_assert(e);
2930
2931     if (e->memchunk.memblock)
2932         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
2933     else
2934         memset(&fixed_ss, 0, sizeof(fixed_ss));
2935
2936     pa_tagstruct_putu32(t, e->index);
2937     pa_tagstruct_puts(t, e->name);
2938
2939     if (e->volume_is_set)
2940         v = e->volume;
2941     else
2942         pa_cvolume_init(&v);
2943
2944     pa_tagstruct_put_cvolume(t, &v);
2945     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
2946     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2947     pa_tagstruct_put_channel_map(t, &e->channel_map);
2948     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
2949     pa_tagstruct_put_boolean(t, e->lazy);
2950     pa_tagstruct_puts(t, e->filename);
2951
2952     if (c->version >= 13)
2953         pa_tagstruct_put_proplist(t, e->proplist);
2954 }
2955
2956 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2957     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2958     uint32_t idx;
2959     pa_sink *sink = NULL;
2960     pa_source *source = NULL;
2961     pa_client *client = NULL;
2962     pa_card *card = NULL;
2963     pa_module *module = NULL;
2964     pa_sink_input *si = NULL;
2965     pa_source_output *so = NULL;
2966     pa_scache_entry *sce = NULL;
2967     const char *name = NULL;
2968     pa_tagstruct *reply;
2969
2970     pa_native_connection_assert_ref(c);
2971     pa_assert(t);
2972
2973     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2974         (command != PA_COMMAND_GET_CLIENT_INFO &&
2975          command != PA_COMMAND_GET_MODULE_INFO &&
2976          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
2977          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
2978          pa_tagstruct_gets(t, &name) < 0) ||
2979         !pa_tagstruct_eof(t)) {
2980         protocol_error(c);
2981         return;
2982     }
2983
2984     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2985     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2986     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
2987     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
2988     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2989
2990     if (command == PA_COMMAND_GET_SINK_INFO) {
2991         if (idx != PA_INVALID_INDEX)
2992             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
2993         else
2994             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
2995     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
2996         if (idx != PA_INVALID_INDEX)
2997             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
2998         else
2999             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3000     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3001         if (idx != PA_INVALID_INDEX)
3002             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3003         else
3004             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3005     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3006         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3007     else if (command == PA_COMMAND_GET_MODULE_INFO)
3008         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3009     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3010         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3011     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3012         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3013     else {
3014         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3015         if (idx != PA_INVALID_INDEX)
3016             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3017         else
3018             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3019     }
3020
3021     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3022         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3023         return;
3024     }
3025
3026     reply = reply_new(tag);
3027     if (sink)
3028         sink_fill_tagstruct(c, reply, sink);
3029     else if (source)
3030         source_fill_tagstruct(c, reply, source);
3031     else if (client)
3032         client_fill_tagstruct(c, reply, client);
3033     else if (card)
3034         card_fill_tagstruct(c, reply, card);
3035     else if (module)
3036         module_fill_tagstruct(c, reply, module);
3037     else if (si)
3038         sink_input_fill_tagstruct(c, reply, si);
3039     else if (so)
3040         source_output_fill_tagstruct(c, reply, so);
3041     else
3042         scache_fill_tagstruct(c, reply, sce);
3043     pa_pstream_send_tagstruct(c->pstream, reply);
3044 }
3045
3046 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3047     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3048     pa_idxset *i;
3049     uint32_t idx;
3050     void *p;
3051     pa_tagstruct *reply;
3052
3053     pa_native_connection_assert_ref(c);
3054     pa_assert(t);
3055
3056     if (!pa_tagstruct_eof(t)) {
3057         protocol_error(c);
3058         return;
3059     }
3060
3061     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3062
3063     reply = reply_new(tag);
3064
3065     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3066         i = c->protocol->core->sinks;
3067     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3068         i = c->protocol->core->sources;
3069     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3070         i = c->protocol->core->clients;
3071     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3072         i = c->protocol->core->cards;
3073     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3074         i = c->protocol->core->modules;
3075     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3076         i = c->protocol->core->sink_inputs;
3077     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3078         i = c->protocol->core->source_outputs;
3079     else {
3080         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3081         i = c->protocol->core->scache;
3082     }
3083
3084     if (i) {
3085         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3086             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3087                 sink_fill_tagstruct(c, reply, p);
3088             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3089                 source_fill_tagstruct(c, reply, p);
3090             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3091                 client_fill_tagstruct(c, reply, p);
3092             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3093                 card_fill_tagstruct(c, reply, p);
3094             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3095                 module_fill_tagstruct(c, reply, p);
3096             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3097                 sink_input_fill_tagstruct(c, reply, p);
3098             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3099                 source_output_fill_tagstruct(c, reply, p);
3100             else {
3101                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3102                 scache_fill_tagstruct(c, reply, p);
3103             }
3104         }
3105     }
3106
3107     pa_pstream_send_tagstruct(c->pstream, reply);
3108 }
3109
3110 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3111     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3112     pa_tagstruct *reply;
3113     char txt[256];
3114     pa_sink *def_sink;
3115     pa_source *def_source;
3116     pa_sample_spec fixed_ss;
3117
3118     pa_native_connection_assert_ref(c);
3119     pa_assert(t);
3120
3121     if (!pa_tagstruct_eof(t)) {
3122         protocol_error(c);
3123         return;
3124     }
3125
3126     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3127
3128     reply = reply_new(tag);
3129     pa_tagstruct_puts(reply, PACKAGE_NAME);
3130     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3131     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
3132     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
3133
3134     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3135     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3136
3137     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3138     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3139     def_source = pa_namereg_get_default_source(c->protocol->core);
3140     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3141
3142     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3143
3144     if (c->version >= 15)
3145         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3146
3147     pa_pstream_send_tagstruct(c->pstream, reply);
3148 }
3149
3150 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3151     pa_tagstruct *t;
3152     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3153
3154     pa_native_connection_assert_ref(c);
3155
3156     t = pa_tagstruct_new(NULL, 0);
3157     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3158     pa_tagstruct_putu32(t, (uint32_t) -1);
3159     pa_tagstruct_putu32(t, e);
3160     pa_tagstruct_putu32(t, idx);
3161     pa_pstream_send_tagstruct(c->pstream, t);
3162 }
3163
3164 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3165     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3166     pa_subscription_mask_t m;
3167
3168     pa_native_connection_assert_ref(c);
3169     pa_assert(t);
3170
3171     if (pa_tagstruct_getu32(t, &m) < 0 ||
3172         !pa_tagstruct_eof(t)) {
3173         protocol_error(c);
3174         return;
3175     }
3176
3177     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3178     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3179
3180     if (c->subscription)
3181         pa_subscription_free(c->subscription);
3182
3183     if (m != 0) {
3184         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3185         pa_assert(c->subscription);
3186     } else
3187         c->subscription = NULL;
3188
3189     pa_pstream_send_simple_ack(c->pstream, tag);
3190 }
3191
3192 static void command_set_volume(
3193         pa_pdispatch *pd,
3194         uint32_t command,
3195         uint32_t tag,
3196         pa_tagstruct *t,
3197         void *userdata) {
3198
3199     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3200     uint32_t idx;
3201     pa_cvolume volume;
3202     pa_sink *sink = NULL;
3203     pa_source *source = NULL;
3204     pa_sink_input *si = NULL;
3205     const char *name = NULL;
3206
3207     pa_native_connection_assert_ref(c);
3208     pa_assert(t);
3209
3210     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3211         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3212         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3213         pa_tagstruct_get_cvolume(t, &volume) ||
3214         !pa_tagstruct_eof(t)) {
3215         protocol_error(c);
3216         return;
3217     }
3218
3219     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3220     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3221     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3222     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3223     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3224     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3225
3226     switch (command) {
3227
3228         case PA_COMMAND_SET_SINK_VOLUME:
3229             if (idx != PA_INVALID_INDEX)
3230                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3231             else
3232                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3233             break;
3234
3235         case PA_COMMAND_SET_SOURCE_VOLUME:
3236             if (idx != PA_INVALID_INDEX)
3237                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3238             else
3239                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3240             break;
3241
3242         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3243             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3244             break;
3245
3246         default:
3247             pa_assert_not_reached();
3248     }
3249
3250     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3251
3252     if (sink)
3253         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3254     else if (source)
3255         pa_source_set_volume(source, &volume);
3256     else if (si)
3257         pa_sink_input_set_volume(si, &volume, TRUE);
3258
3259     pa_pstream_send_simple_ack(c->pstream, tag);
3260 }
3261
3262 static void command_set_mute(
3263         pa_pdispatch *pd,
3264         uint32_t command,
3265         uint32_t tag,
3266         pa_tagstruct *t,
3267         void *userdata) {
3268
3269     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3270     uint32_t idx;
3271     pa_bool_t mute;
3272     pa_sink *sink = NULL;
3273     pa_source *source = NULL;
3274     pa_sink_input *si = NULL;
3275     const char *name = NULL;
3276
3277     pa_native_connection_assert_ref(c);
3278     pa_assert(t);
3279
3280     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3281         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3282         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3283         pa_tagstruct_get_boolean(t, &mute) ||
3284         !pa_tagstruct_eof(t)) {
3285         protocol_error(c);
3286         return;
3287     }
3288
3289     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3290     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3291     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3292     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3293     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3294
3295     switch (command) {
3296
3297         case PA_COMMAND_SET_SINK_MUTE:
3298
3299             if (idx != PA_INVALID_INDEX)
3300                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3301             else
3302                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3303
3304             break;
3305
3306         case PA_COMMAND_SET_SOURCE_MUTE:
3307             if (idx != PA_INVALID_INDEX)
3308                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3309             else
3310                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3311
3312             break;
3313
3314         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3315             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3316             break;
3317
3318         default:
3319             pa_assert_not_reached();
3320     }
3321
3322     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3323
3324     if (sink)
3325         pa_sink_set_mute(sink, mute);
3326     else if (source)
3327         pa_source_set_mute(source, mute);
3328     else if (si)
3329         pa_sink_input_set_mute(si, mute, TRUE);
3330
3331     pa_pstream_send_simple_ack(c->pstream, tag);
3332 }
3333
3334 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3335     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3336     uint32_t idx;
3337     pa_bool_t b;
3338     playback_stream *s;
3339
3340     pa_native_connection_assert_ref(c);
3341     pa_assert(t);
3342
3343     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3344         pa_tagstruct_get_boolean(t, &b) < 0 ||
3345         !pa_tagstruct_eof(t)) {
3346         protocol_error(c);
3347         return;
3348     }
3349
3350     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3351     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3352     s = pa_idxset_get_by_index(c->output_streams, idx);
3353     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3354     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3355
3356     pa_sink_input_cork(s->sink_input, b);
3357
3358     if (b)
3359         s->is_underrun = TRUE;
3360
3361     pa_pstream_send_simple_ack(c->pstream, tag);
3362 }
3363
3364 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3365     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3366     uint32_t idx;
3367     playback_stream *s;
3368
3369     pa_native_connection_assert_ref(c);
3370     pa_assert(t);
3371
3372     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3373         !pa_tagstruct_eof(t)) {
3374         protocol_error(c);
3375         return;
3376     }
3377
3378     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3379     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3380     s = pa_idxset_get_by_index(c->output_streams, idx);
3381     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3382     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3383
3384     switch (command) {
3385         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3386             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3387             break;
3388
3389         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3390             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3391             break;
3392
3393         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3394             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3395             break;
3396
3397         default:
3398             pa_assert_not_reached();
3399     }
3400
3401     pa_pstream_send_simple_ack(c->pstream, tag);
3402 }
3403
3404 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3405     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3406     uint32_t idx;
3407     record_stream *s;
3408     pa_bool_t b;
3409
3410     pa_native_connection_assert_ref(c);
3411     pa_assert(t);
3412
3413     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3414         pa_tagstruct_get_boolean(t, &b) < 0 ||
3415         !pa_tagstruct_eof(t)) {
3416         protocol_error(c);
3417         return;
3418     }
3419
3420     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3421     s = pa_idxset_get_by_index(c->record_streams, idx);
3422     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3423
3424     pa_source_output_cork(s->source_output, b);
3425     pa_memblockq_prebuf_force(s->memblockq);
3426     pa_pstream_send_simple_ack(c->pstream, tag);
3427 }
3428
3429 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3430     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3431     uint32_t idx;
3432     record_stream *s;
3433
3434     pa_native_connection_assert_ref(c);
3435     pa_assert(t);
3436
3437     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3438         !pa_tagstruct_eof(t)) {
3439         protocol_error(c);
3440         return;
3441     }
3442
3443     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3444     s = pa_idxset_get_by_index(c->record_streams, idx);
3445     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3446
3447     pa_memblockq_flush_read(s->memblockq);
3448     pa_pstream_send_simple_ack(c->pstream, tag);
3449 }
3450
3451 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3452     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3453     uint32_t idx;
3454     pa_buffer_attr a;
3455     pa_tagstruct *reply;
3456
3457     pa_native_connection_assert_ref(c);
3458     pa_assert(t);
3459
3460     memset(&a, 0, sizeof(a));
3461
3462     if (pa_tagstruct_getu32(t, &idx) < 0) {
3463         protocol_error(c);
3464         return;
3465     }
3466
3467     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3468
3469     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3470         playback_stream *s;
3471         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3472
3473         s = pa_idxset_get_by_index(c->output_streams, idx);
3474         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3475         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3476
3477         if (pa_tagstruct_get(
3478                     t,
3479                     PA_TAG_U32, &a.maxlength,
3480                     PA_TAG_U32, &a.tlength,
3481                     PA_TAG_U32, &a.prebuf,
3482                     PA_TAG_U32, &a.minreq,
3483                     PA_TAG_INVALID) < 0 ||
3484             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3485             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3486             !pa_tagstruct_eof(t)) {
3487             protocol_error(c);
3488             return;
3489         }
3490
3491         s->adjust_latency = adjust_latency;
3492         s->early_requests = early_requests;
3493         s->buffer_attr = a;
3494
3495         fix_playback_buffer_attr(s);
3496         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3497
3498         reply = reply_new(tag);
3499         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3500         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3501         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3502         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3503
3504         if (c->version >= 13)
3505             pa_tagstruct_put_usec(reply, s->sink_latency);
3506
3507     } else {
3508         record_stream *s;
3509         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3510         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3511
3512         s = pa_idxset_get_by_index(c->record_streams, idx);
3513         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3514
3515         if (pa_tagstruct_get(
3516                     t,
3517                     PA_TAG_U32, &a.maxlength,
3518                     PA_TAG_U32, &a.fragsize,
3519                     PA_TAG_INVALID) < 0 ||
3520             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3521             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3522             !pa_tagstruct_eof(t)) {
3523             protocol_error(c);
3524             return;
3525         }
3526
3527         s->adjust_latency = adjust_latency;
3528         s->early_requests = early_requests;
3529         s->buffer_attr = a;
3530
3531         fix_record_buffer_attr_pre(s);
3532         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3533         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3534         fix_record_buffer_attr_post(s);
3535
3536         reply = reply_new(tag);
3537         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3538         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3539
3540         if (c->version >= 13)
3541             pa_tagstruct_put_usec(reply, s->source_latency);
3542     }
3543
3544     pa_pstream_send_tagstruct(c->pstream, reply);
3545 }
3546
3547 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3548     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3549     uint32_t idx;
3550     uint32_t rate;
3551
3552     pa_native_connection_assert_ref(c);
3553     pa_assert(t);
3554
3555     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3556         pa_tagstruct_getu32(t, &rate) < 0 ||
3557         !pa_tagstruct_eof(t)) {
3558         protocol_error(c);
3559         return;
3560     }
3561
3562     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3563     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3564
3565     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3566         playback_stream *s;
3567
3568         s = pa_idxset_get_by_index(c->output_streams, idx);
3569         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3570         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3571
3572         pa_sink_input_set_rate(s->sink_input, rate);
3573
3574     } else {
3575         record_stream *s;
3576         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3577
3578         s = pa_idxset_get_by_index(c->record_streams, idx);
3579         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3580
3581         pa_source_output_set_rate(s->source_output, rate);
3582     }
3583
3584     pa_pstream_send_simple_ack(c->pstream, tag);
3585 }
3586
3587 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3588     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3589     uint32_t idx;
3590     uint32_t mode;
3591     pa_proplist *p;
3592
3593     pa_native_connection_assert_ref(c);
3594     pa_assert(t);
3595
3596     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3597
3598     p = pa_proplist_new();
3599
3600     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3601
3602         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3603             pa_tagstruct_get_proplist(t, p) < 0 ||
3604             !pa_tagstruct_eof(t)) {
3605             protocol_error(c);
3606             pa_proplist_free(p);
3607             return;
3608         }
3609
3610     } else {
3611
3612         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3613             pa_tagstruct_getu32(t, &mode) < 0 ||
3614             pa_tagstruct_get_proplist(t, p) < 0 ||
3615             !pa_tagstruct_eof(t)) {
3616             protocol_error(c);
3617             pa_proplist_free(p);
3618             return;
3619         }
3620     }
3621
3622     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3623         pa_proplist_free(p);
3624         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3625     }
3626
3627     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3628         playback_stream *s;
3629
3630         s = pa_idxset_get_by_index(c->output_streams, idx);
3631         if (!s || !playback_stream_isinstance(s)) {
3632             pa_proplist_free(p);
3633             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3634         }
3635         pa_sink_input_update_proplist(s->sink_input, mode, p);
3636
3637     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3638         record_stream *s;
3639
3640         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3641             pa_proplist_free(p);
3642             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3643         }
3644         pa_source_output_update_proplist(s->source_output, mode, p);
3645
3646     } else {
3647         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3648
3649         pa_client_update_proplist(c->client, mode, p);
3650     }
3651
3652     pa_pstream_send_simple_ack(c->pstream, tag);
3653     pa_proplist_free(p);
3654 }
3655
3656 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3657     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3658     uint32_t idx;
3659     unsigned changed = 0;
3660     pa_proplist *p;
3661     pa_strlist *l = NULL;
3662
3663     pa_native_connection_assert_ref(c);
3664     pa_assert(t);
3665
3666     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3667
3668     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3669
3670         if (pa_tagstruct_getu32(t, &idx) < 0) {
3671             protocol_error(c);
3672             return;
3673         }
3674     }
3675
3676     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3677         playback_stream *s;
3678
3679         s = pa_idxset_get_by_index(c->output_streams, idx);
3680         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3681         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3682
3683         p = s->sink_input->proplist;
3684
3685     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3686         record_stream *s;
3687
3688         s = pa_idxset_get_by_index(c->record_streams, idx);
3689         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3690
3691         p = s->source_output->proplist;
3692     } else {
3693         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3694
3695         p = c->client->proplist;
3696     }
3697
3698     for (;;) {
3699         const char *k;
3700
3701         if (pa_tagstruct_gets(t, &k) < 0) {
3702             protocol_error(c);
3703             pa_strlist_free(l);
3704             return;
3705         }
3706
3707         if (!k)
3708             break;
3709
3710         l = pa_strlist_prepend(l, k);
3711     }
3712
3713     if (!pa_tagstruct_eof(t)) {
3714         protocol_error(c);
3715         pa_strlist_free(l);
3716         return;
3717     }
3718
3719     for (;;) {
3720         char *z;
3721
3722         l = pa_strlist_pop(l, &z);
3723
3724         if (!z)
3725             break;
3726
3727         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3728         pa_xfree(z);
3729     }
3730
3731     pa_pstream_send_simple_ack(c->pstream, tag);
3732
3733     if (changed) {
3734         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3735             playback_stream *s;
3736
3737             s = pa_idxset_get_by_index(c->output_streams, idx);
3738             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3739
3740         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3741             record_stream *s;
3742
3743             s = pa_idxset_get_by_index(c->record_streams, idx);
3744             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3745
3746         } else {
3747             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3748             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3749         }
3750     }
3751 }
3752
3753 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3754     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3755     const char *s;
3756
3757     pa_native_connection_assert_ref(c);
3758     pa_assert(t);
3759
3760     if (pa_tagstruct_gets(t, &s) < 0 ||
3761         !pa_tagstruct_eof(t)) {
3762         protocol_error(c);
3763         return;
3764     }
3765
3766     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3767     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3768
3769     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3770         pa_source *source;
3771
3772         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3773         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3774
3775         pa_namereg_set_default_source(c->protocol->core, source);
3776     } else {
3777         pa_sink *sink;
3778         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3779
3780         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3781         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3782
3783         pa_namereg_set_default_sink(c->protocol->core, sink);
3784     }
3785
3786     pa_pstream_send_simple_ack(c->pstream, tag);
3787 }
3788
3789 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3790     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3791     uint32_t idx;
3792     const char *name;
3793
3794     pa_native_connection_assert_ref(c);
3795     pa_assert(t);
3796
3797     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3798         pa_tagstruct_gets(t, &name) < 0 ||
3799         !pa_tagstruct_eof(t)) {
3800         protocol_error(c);
3801         return;
3802     }
3803
3804     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3805     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3806
3807     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3808         playback_stream *s;
3809
3810         s = pa_idxset_get_by_index(c->output_streams, idx);
3811         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3812         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3813
3814         pa_sink_input_set_name(s->sink_input, name);
3815
3816     } else {
3817         record_stream *s;
3818         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3819
3820         s = pa_idxset_get_by_index(c->record_streams, idx);
3821         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3822
3823         pa_source_output_set_name(s->source_output, name);
3824     }
3825
3826     pa_pstream_send_simple_ack(c->pstream, tag);
3827 }
3828
3829 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3830     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3831     uint32_t idx;
3832
3833     pa_native_connection_assert_ref(c);
3834     pa_assert(t);
3835
3836     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3837         !pa_tagstruct_eof(t)) {
3838         protocol_error(c);
3839         return;
3840     }
3841
3842     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3843
3844     if (command == PA_COMMAND_KILL_CLIENT) {
3845         pa_client *client;
3846
3847         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3848         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3849
3850         pa_native_connection_ref(c);
3851         pa_client_kill(client);
3852
3853     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3854         pa_sink_input *s;
3855
3856         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3857         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3858
3859         pa_native_connection_ref(c);
3860         pa_sink_input_kill(s);
3861     } else {
3862         pa_source_output *s;
3863
3864         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3865
3866         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3867         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3868
3869         pa_native_connection_ref(c);
3870         pa_source_output_kill(s);
3871     }
3872
3873     pa_pstream_send_simple_ack(c->pstream, tag);
3874     pa_native_connection_unref(c);
3875 }
3876
3877 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3878     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3879     pa_module *m;
3880     const char *name, *argument;
3881     pa_tagstruct *reply;
3882
3883     pa_native_connection_assert_ref(c);
3884     pa_assert(t);
3885
3886     if (pa_tagstruct_gets(t, &name) < 0 ||
3887         pa_tagstruct_gets(t, &argument) < 0 ||
3888         !pa_tagstruct_eof(t)) {
3889         protocol_error(c);
3890         return;
3891     }
3892
3893     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3894     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
3895     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
3896
3897     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
3898         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
3899         return;
3900     }
3901
3902     reply = reply_new(tag);
3903     pa_tagstruct_putu32(reply, m->index);
3904     pa_pstream_send_tagstruct(c->pstream, reply);
3905 }
3906
3907 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3908     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3909     uint32_t idx;
3910     pa_module *m;
3911
3912     pa_native_connection_assert_ref(c);
3913     pa_assert(t);
3914
3915     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3916         !pa_tagstruct_eof(t)) {
3917         protocol_error(c);
3918         return;
3919     }
3920
3921     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3922     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3923     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
3924
3925     pa_module_unload_request(m, FALSE);
3926     pa_pstream_send_simple_ack(c->pstream, tag);
3927 }
3928
3929 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3930     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3931     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
3932     const char *name_device = NULL;
3933
3934     pa_native_connection_assert_ref(c);
3935     pa_assert(t);
3936
3937     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3938         pa_tagstruct_getu32(t, &idx_device) < 0 ||
3939         pa_tagstruct_gets(t, &name_device) < 0 ||
3940         !pa_tagstruct_eof(t)) {
3941         protocol_error(c);
3942         return;
3943     }
3944
3945     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3946     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3947
3948     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
3949     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
3950     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
3951     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3952
3953     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
3954         pa_sink_input *si = NULL;
3955         pa_sink *sink = NULL;
3956
3957         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3958
3959         if (idx_device != PA_INVALID_INDEX)
3960             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
3961         else
3962             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
3963
3964         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
3965
3966         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
3967             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
3968             return;
3969         }
3970     } else {
3971         pa_source_output *so = NULL;
3972         pa_source *source;
3973
3974         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
3975
3976         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3977
3978         if (idx_device != PA_INVALID_INDEX)
3979             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
3980         else
3981             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
3982
3983         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
3984
3985         if (pa_source_output_move_to(so, source, TRUE) < 0) {
3986             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
3987             return;
3988         }
3989     }
3990
3991     pa_pstream_send_simple_ack(c->pstream, tag);
3992 }
3993
3994 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3995     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3996     uint32_t idx = PA_INVALID_INDEX;
3997     const char *name = NULL;
3998     pa_bool_t b;
3999
4000     pa_native_connection_assert_ref(c);
4001     pa_assert(t);
4002
4003     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4004         pa_tagstruct_gets(t, &name) < 0 ||
4005         pa_tagstruct_get_boolean(t, &b) < 0 ||
4006         !pa_tagstruct_eof(t)) {
4007         protocol_error(c);
4008         return;
4009     }
4010
4011     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4012     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4013     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4014     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4015     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4016
4017     if (command == PA_COMMAND_SUSPEND_SINK) {
4018
4019         if (idx == PA_INVALID_INDEX && name && !*name) {
4020
4021             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4022
4023             if (pa_sink_suspend_all(c->protocol->core, b) < 0) {
4024                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4025                 return;
4026             }
4027         } else {
4028             pa_sink *sink = NULL;
4029
4030             if (idx != PA_INVALID_INDEX)
4031                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4032             else
4033                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4034
4035             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4036
4037             if (pa_sink_suspend(sink, b) < 0) {
4038                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4039                 return;
4040             }
4041         }
4042     } else {
4043
4044         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4045
4046         if (idx == PA_INVALID_INDEX && name && !*name) {
4047
4048             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4049
4050             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
4051                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4052                 return;
4053             }
4054
4055         } else {
4056             pa_source *source;
4057
4058             if (idx != PA_INVALID_INDEX)
4059                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4060             else
4061                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4062
4063             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4064
4065             if (pa_source_suspend(source, b) < 0) {
4066                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4067                 return;
4068             }
4069         }
4070     }
4071
4072     pa_pstream_send_simple_ack(c->pstream, tag);
4073 }
4074
4075 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4076     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4077     uint32_t idx = PA_INVALID_INDEX;
4078     const char *name = NULL;
4079     pa_module *m;
4080     pa_native_protocol_ext_cb_t cb;
4081
4082     pa_native_connection_assert_ref(c);
4083     pa_assert(t);
4084
4085     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4086         pa_tagstruct_gets(t, &name) < 0) {
4087         protocol_error(c);
4088         return;
4089     }
4090
4091     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4092     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4093     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4094     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4095     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4096
4097     if (idx != PA_INVALID_INDEX)
4098         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4099     else {
4100         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4101             if (strcmp(name, m->name) == 0)
4102                 break;
4103     }
4104
4105     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4106     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4107
4108     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4109     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4110
4111     if (cb(c->protocol, m, c, tag, t) < 0)
4112         protocol_error(c);
4113 }
4114
4115 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4116     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4117     uint32_t idx = PA_INVALID_INDEX;
4118     const char *name = NULL, *profile = NULL;
4119     pa_card *card = NULL;
4120
4121     pa_native_connection_assert_ref(c);
4122     pa_assert(t);
4123
4124     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4125         pa_tagstruct_gets(t, &name) < 0 ||
4126         pa_tagstruct_gets(t, &profile) < 0 ||
4127         !pa_tagstruct_eof(t)) {
4128         protocol_error(c);
4129         return;
4130     }
4131
4132     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4133     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4134     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4135     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4136     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4137
4138     if (idx != PA_INVALID_INDEX)
4139         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4140     else
4141         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4142
4143     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4144
4145     if (pa_card_set_profile(card, profile, TRUE) < 0) {
4146         pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4147         return;
4148     }
4149
4150     pa_pstream_send_simple_ack(c->pstream, tag);
4151 }
4152
4153 /*** pstream callbacks ***/
4154
4155 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4156     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4157
4158     pa_assert(p);
4159     pa_assert(packet);
4160     pa_native_connection_assert_ref(c);
4161
4162     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4163         pa_log("invalid packet.");
4164         native_connection_unlink(c);
4165     }
4166 }
4167
4168 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4169     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4170     output_stream *stream;
4171
4172     pa_assert(p);
4173     pa_assert(chunk);
4174     pa_native_connection_assert_ref(c);
4175
4176     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4177         pa_log("client sent block for invalid stream.");
4178         /* Ignoring */
4179         return;
4180     }
4181
4182 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4183
4184     if (playback_stream_isinstance(stream)) {
4185         playback_stream *ps = PLAYBACK_STREAM(stream);
4186
4187         if (chunk->memblock) {
4188             if (seek != PA_SEEK_RELATIVE || offset != 0)
4189                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4190
4191             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4192         } else
4193             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4194
4195     } else {
4196         upload_stream *u = UPLOAD_STREAM(stream);
4197         size_t l;
4198
4199         if (!u->memchunk.memblock) {
4200             if (u->length == chunk->length && chunk->memblock) {
4201                 u->memchunk = *chunk;
4202                 pa_memblock_ref(u->memchunk.memblock);
4203                 u->length = 0;
4204             } else {
4205                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4206                 u->memchunk.index = u->memchunk.length = 0;
4207             }
4208         }
4209
4210         pa_assert(u->memchunk.memblock);
4211
4212         l = u->length;
4213         if (l > chunk->length)
4214             l = chunk->length;
4215
4216         if (l > 0) {
4217             void *dst;
4218             dst = pa_memblock_acquire(u->memchunk.memblock);
4219
4220             if (chunk->memblock) {
4221                 void *src;
4222                 src = pa_memblock_acquire(chunk->memblock);
4223
4224                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4225                        (uint8_t*) src + chunk->index, l);
4226
4227                 pa_memblock_release(chunk->memblock);
4228             } else
4229                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4230
4231             pa_memblock_release(u->memchunk.memblock);
4232
4233             u->memchunk.length += l;
4234             u->length -= l;
4235         }
4236     }
4237 }
4238
4239 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4240     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4241
4242     pa_assert(p);
4243     pa_native_connection_assert_ref(c);
4244
4245     native_connection_unlink(c);
4246     pa_log_info("Connection died.");
4247 }
4248
4249 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4250     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4251
4252     pa_assert(p);
4253     pa_native_connection_assert_ref(c);
4254
4255     native_connection_send_memblock(c);
4256 }
4257
4258 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4259     pa_thread_mq *q;
4260
4261     if (!(q = pa_thread_mq_get()))
4262         pa_pstream_send_revoke(p, block_id);
4263     else
4264         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4265 }
4266
4267 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4268     pa_thread_mq *q;
4269
4270     if (!(q = pa_thread_mq_get()))
4271         pa_pstream_send_release(p, block_id);
4272     else
4273         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4274 }
4275
4276 /*** client callbacks ***/
4277
4278 static void client_kill_cb(pa_client *c) {
4279     pa_assert(c);
4280
4281     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4282     pa_log_info("Connection killed.");
4283 }
4284
4285 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4286     pa_tagstruct *t;
4287     pa_native_connection *c;
4288
4289     pa_assert(client);
4290     c = PA_NATIVE_CONNECTION(client->userdata);
4291     pa_native_connection_assert_ref(c);
4292
4293     if (c->version < 15)
4294       return;
4295
4296     t = pa_tagstruct_new(NULL, 0);
4297     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4298     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4299     pa_tagstruct_puts(t, event);
4300     pa_tagstruct_put_proplist(t, pl);
4301     pa_pstream_send_tagstruct(c->pstream, t);
4302 }
4303
4304 /*** module entry points ***/
4305
4306 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4307     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4308
4309     pa_assert(m);
4310     pa_assert(tv);
4311     pa_native_connection_assert_ref(c);
4312     pa_assert(c->auth_timeout_event == e);
4313
4314     if (!c->authorized) {
4315         native_connection_unlink(c);
4316         pa_log_info("Connection terminated due to authentication timeout.");
4317     }
4318 }
4319
4320 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4321     pa_native_connection *c;
4322     char pname[128];
4323     pa_client *client;
4324     pa_client_new_data data;
4325
4326     pa_assert(p);
4327     pa_assert(io);
4328     pa_assert(o);
4329
4330     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4331         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4332         pa_iochannel_free(io);
4333         return;
4334     }
4335
4336     pa_client_new_data_init(&data);
4337     data.module = o->module;
4338     data.driver = __FILE__;
4339     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4340     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4341     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4342     client = pa_client_new(p->core, &data);
4343     pa_client_new_data_done(&data);
4344
4345     if (!client)
4346         return;
4347
4348     c = pa_msgobject_new(pa_native_connection);
4349     c->parent.parent.free = native_connection_free;
4350     c->parent.process_msg = native_connection_process_msg;
4351     c->protocol = p;
4352     c->options = pa_native_options_ref(o);
4353     c->authorized = FALSE;
4354
4355     if (o->auth_anonymous) {
4356         pa_log_info("Client authenticated anonymously.");
4357         c->authorized = TRUE;
4358     }
4359
4360     if (!c->authorized &&
4361         o->auth_ip_acl &&
4362         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4363
4364         pa_log_info("Client authenticated by IP ACL.");
4365         c->authorized = TRUE;
4366     }
4367
4368     if (!c->authorized) {
4369         struct timeval tv;
4370         pa_gettimeofday(&tv);
4371         tv.tv_sec += AUTH_TIMEOUT;
4372         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4373     } else
4374         c->auth_timeout_event = NULL;
4375
4376     c->is_local = pa_iochannel_socket_is_local(io);
4377     c->version = 8;
4378
4379     c->client = client;
4380     c->client->kill = client_kill_cb;
4381     c->client->send_event = client_send_event_cb;
4382     c->client->userdata = c;
4383
4384     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4385     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4386     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4387     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4388     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4389     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4390     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4391
4392     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4393
4394     c->record_streams = pa_idxset_new(NULL, NULL);
4395     c->output_streams = pa_idxset_new(NULL, NULL);
4396
4397     c->rrobin_index = PA_IDXSET_INVALID;
4398     c->subscription = NULL;
4399
4400     pa_idxset_put(p->connections, c, NULL);
4401
4402 #ifdef HAVE_CREDS
4403     if (pa_iochannel_creds_supported(io))
4404         pa_iochannel_creds_enable(io);
4405 #endif
4406
4407     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4408 }
4409
4410 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4411     pa_native_connection *c;
4412     void *state = NULL;
4413
4414     pa_assert(p);
4415     pa_assert(m);
4416
4417     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4418         if (c->options->module == m)
4419             native_connection_unlink(c);
4420 }
4421
4422 static pa_native_protocol* native_protocol_new(pa_core *c) {
4423     pa_native_protocol *p;
4424     pa_native_hook_t h;
4425
4426     pa_assert(c);
4427
4428     p = pa_xnew(pa_native_protocol, 1);
4429     PA_REFCNT_INIT(p);
4430     p->core = c;
4431     p->connections = pa_idxset_new(NULL, NULL);
4432
4433     p->servers = NULL;
4434
4435     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4436
4437     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4438         pa_hook_init(&p->hooks[h], p);
4439
4440     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4441
4442     return p;
4443 }
4444
4445 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4446     pa_native_protocol *p;
4447
4448     if ((p = pa_shared_get(c, "native-protocol")))
4449         return pa_native_protocol_ref(p);
4450
4451     return native_protocol_new(c);
4452 }
4453
4454 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4455     pa_assert(p);
4456     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4457
4458     PA_REFCNT_INC(p);
4459
4460     return p;
4461 }
4462
4463 void pa_native_protocol_unref(pa_native_protocol *p) {
4464     pa_native_connection *c;
4465     pa_native_hook_t h;
4466
4467     pa_assert(p);
4468     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4469
4470     if (PA_REFCNT_DEC(p) > 0)
4471         return;
4472
4473     while ((c = pa_idxset_first(p->connections, NULL)))
4474         native_connection_unlink(c);
4475
4476     pa_idxset_free(p->connections, NULL, NULL);
4477
4478     pa_strlist_free(p->servers);
4479
4480     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4481         pa_hook_done(&p->hooks[h]);
4482
4483     pa_hashmap_free(p->extensions, NULL, NULL);
4484
4485     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4486
4487     pa_xfree(p);
4488 }
4489
4490 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4491     pa_assert(p);
4492     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4493     pa_assert(name);
4494
4495     p->servers = pa_strlist_prepend(p->servers, name);
4496
4497     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4498 }
4499
4500 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4501     pa_assert(p);
4502     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4503     pa_assert(name);
4504
4505     p->servers = pa_strlist_remove(p->servers, name);
4506
4507     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4508 }
4509
4510 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4511     pa_assert(p);
4512     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4513
4514     return p->hooks;
4515 }
4516
4517 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4518     pa_assert(p);
4519     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4520
4521     return p->servers;
4522 }
4523
4524 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4525     pa_assert(p);
4526     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4527     pa_assert(m);
4528     pa_assert(cb);
4529     pa_assert(!pa_hashmap_get(p->extensions, m));
4530
4531     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4532     return 0;
4533 }
4534
4535 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4536     pa_assert(p);
4537     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4538     pa_assert(m);
4539
4540     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4541 }
4542
4543 pa_native_options* pa_native_options_new(void) {
4544     pa_native_options *o;
4545
4546     o = pa_xnew0(pa_native_options, 1);
4547     PA_REFCNT_INIT(o);
4548
4549     return o;
4550 }
4551
4552 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4553     pa_assert(o);
4554     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4555
4556     PA_REFCNT_INC(o);
4557
4558     return o;
4559 }
4560
4561 void pa_native_options_unref(pa_native_options *o) {
4562     pa_assert(o);
4563     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4564
4565     if (PA_REFCNT_DEC(o) > 0)
4566         return;
4567
4568     pa_xfree(o->auth_group);
4569
4570     if (o->auth_ip_acl)
4571         pa_ip_acl_free(o->auth_ip_acl);
4572
4573     if (o->auth_cookie)
4574         pa_auth_cookie_unref(o->auth_cookie);
4575
4576     pa_xfree(o);
4577 }
4578
4579 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4580     pa_bool_t enabled;
4581     const char *acl;
4582
4583     pa_assert(o);
4584     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4585     pa_assert(ma);
4586
4587     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4588         pa_log("auth-anonymous= expects a boolean argument.");
4589         return -1;
4590     }
4591
4592     enabled = TRUE;
4593     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4594         pa_log("auth-group-enabled= expects a boolean argument.");
4595         return -1;
4596     }
4597
4598     pa_xfree(o->auth_group);
4599     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4600
4601 #ifndef HAVE_CREDS
4602     if (o->auth_group)
4603         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4604 #endif
4605
4606     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4607         pa_ip_acl *ipa;
4608
4609         if (!(ipa = pa_ip_acl_new(acl))) {
4610             pa_log("Failed to parse IP ACL '%s'", acl);
4611             return -1;
4612         }
4613
4614         if (o->auth_ip_acl)
4615             pa_ip_acl_free(o->auth_ip_acl);
4616
4617         o->auth_ip_acl = ipa;
4618     }
4619
4620     enabled = TRUE;
4621     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4622         pa_log("auth-cookie-enabled= expects a boolean argument.");
4623         return -1;
4624     }
4625
4626     if (o->auth_cookie)
4627         pa_auth_cookie_unref(o->auth_cookie);
4628
4629     if (enabled) {
4630         const char *cn;
4631
4632         /* The new name for this is 'auth-cookie', for compat reasons
4633          * we check the old name too */
4634         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4635             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4636                 cn = PA_NATIVE_COOKIE_FILE;
4637
4638         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4639             return -1;
4640
4641     } else
4642           o->auth_cookie = NULL;
4643
4644     return 0;
4645 }
4646
4647 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4648     pa_native_connection_assert_ref(c);
4649
4650     return c->pstream;
4651 }