Make sure we don't get stuck when prebuf is too high
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* Kick a client if it doesn't authenticate within this time */
64 #define AUTH_TIMEOUT 60
65
66 /* Don't accept more connection than this */
67 #define MAX_CONNECTIONS 64
68
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
73
74 struct pa_native_protocol;
75
76 typedef struct record_stream {
77     pa_msgobject parent;
78
79     pa_native_connection *connection;
80     uint32_t index;
81
82     pa_source_output *source_output;
83     pa_memblockq *memblockq;
84
85     pa_bool_t adjust_latency:1;
86     pa_bool_t early_requests:1;
87
88     pa_buffer_attr buffer_attr;
89
90     pa_atomic_t on_the_fly;
91     pa_usec_t configured_source_latency;
92     size_t drop_initial;
93
94     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
95     size_t on_the_fly_snapshot;
96     pa_usec_t current_monitor_latency;
97     pa_usec_t current_source_latency;
98 } record_stream;
99
100 PA_DECLARE_CLASS(record_stream);
101 #define RECORD_STREAM(o) (record_stream_cast(o))
102 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
103
104 typedef struct output_stream {
105     pa_msgobject parent;
106 } output_stream;
107
108 PA_DECLARE_CLASS(output_stream);
109 #define OUTPUT_STREAM(o) (output_stream_cast(o))
110 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
111
112 typedef struct playback_stream {
113     output_stream parent;
114
115     pa_native_connection *connection;
116     uint32_t index;
117
118     pa_sink_input *sink_input;
119     pa_memblockq *memblockq;
120
121     pa_bool_t adjust_latency:1;
122     pa_bool_t early_requests:1;
123
124     pa_bool_t is_underrun:1;
125     pa_bool_t drain_request:1;
126     uint32_t drain_tag;
127     uint32_t syncid;
128
129     pa_atomic_t missing;
130     pa_usec_t configured_sink_latency;
131     pa_buffer_attr buffer_attr;
132
133     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
134     int64_t read_index, write_index;
135     size_t render_memblockq_length;
136     pa_usec_t current_sink_latency;
137     uint64_t playing_for, underrun_for;
138 } playback_stream;
139
140 PA_DECLARE_CLASS(playback_stream);
141 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
142 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
143
144 typedef struct upload_stream {
145     output_stream parent;
146
147     pa_native_connection *connection;
148     uint32_t index;
149
150     pa_memchunk memchunk;
151     size_t length;
152     char *name;
153     pa_sample_spec sample_spec;
154     pa_channel_map channel_map;
155     pa_proplist *proplist;
156 } upload_stream;
157
158 PA_DECLARE_CLASS(upload_stream);
159 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
160 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
161
162 struct pa_native_connection {
163     pa_msgobject parent;
164     pa_native_protocol *protocol;
165     pa_native_options *options;
166     pa_bool_t authorized:1;
167     pa_bool_t is_local:1;
168     uint32_t version;
169     pa_client *client;
170     pa_pstream *pstream;
171     pa_pdispatch *pdispatch;
172     pa_idxset *record_streams, *output_streams;
173     uint32_t rrobin_index;
174     pa_subscription *subscription;
175     pa_time_event *auth_timeout_event;
176 };
177
178 PA_DECLARE_CLASS(pa_native_connection);
179 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
180 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
181
182 struct pa_native_protocol {
183     PA_REFCNT_DECLARE;
184
185     pa_core *core;
186     pa_idxset *connections;
187
188     pa_strlist *servers;
189     pa_hook hooks[PA_NATIVE_HOOK_MAX];
190
191     pa_hashmap *extensions;
192 };
193
194 enum {
195     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
196 };
197
198 enum {
199     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
200     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
201     SINK_INPUT_MESSAGE_FLUSH,
202     SINK_INPUT_MESSAGE_TRIGGER,
203     SINK_INPUT_MESSAGE_SEEK,
204     SINK_INPUT_MESSAGE_PREBUF_FORCE,
205     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
206     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
207 };
208
209 enum {
210     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
211     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
212     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
213     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
214     PLAYBACK_STREAM_MESSAGE_STARTED,
215     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
216 };
217
218 enum {
219     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
220 };
221
222 enum {
223     CONNECTION_MESSAGE_RELEASE,
224     CONNECTION_MESSAGE_REVOKE
225 };
226
227 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
228 static void sink_input_kill_cb(pa_sink_input *i);
229 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
230 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
231 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
232 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
235
236 static void native_connection_send_memblock(pa_native_connection *c);
237 static void playback_stream_request_bytes(struct playback_stream*s);
238
239 static void source_output_kill_cb(pa_source_output *o);
240 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
241 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
242 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
243 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
244 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
245
246 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
247 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248
249 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287
288 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
289     [PA_COMMAND_ERROR] = NULL,
290     [PA_COMMAND_TIMEOUT] = NULL,
291     [PA_COMMAND_REPLY] = NULL,
292     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
293     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
294     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
295     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
296     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
297     [PA_COMMAND_AUTH] = command_auth,
298     [PA_COMMAND_REQUEST] = NULL,
299     [PA_COMMAND_EXIT] = command_exit,
300     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
301     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
302     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
303     [PA_COMMAND_STAT] = command_stat,
304     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
305     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
306     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
307     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
308     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
309     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
310     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
311     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
312     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
313     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
314     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
315     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
316     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
317     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
318     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
319     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
320     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
321     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
328     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
329
330     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
331     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
332     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
333
334     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
335     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
336     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
337
338     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
339     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
340
341     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
342     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
343     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
344     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345
346     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
347     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
348
349     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
350     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
351     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
352     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
353     [PA_COMMAND_KILL_CLIENT] = command_kill,
354     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
355     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
356     [PA_COMMAND_LOAD_MODULE] = command_load_module,
357     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
358
359     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
360     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
361     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
362     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
363
364     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
365     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
366
367     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
368     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
369
370     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
371     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
372
373     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
374     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
375     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
376
377     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
378     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
379     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
380
381     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
382
383     [PA_COMMAND_EXTENSION] = command_extension
384 };
385
386 /* structure management */
387
388 /* Called from main context */
389 static void upload_stream_unlink(upload_stream *s) {
390     pa_assert(s);
391
392     if (!s->connection)
393         return;
394
395     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
396     s->connection = NULL;
397     upload_stream_unref(s);
398 }
399
400 /* Called from main context */
401 static void upload_stream_free(pa_object *o) {
402     upload_stream *s = UPLOAD_STREAM(o);
403     pa_assert(s);
404
405     upload_stream_unlink(s);
406
407     pa_xfree(s->name);
408
409     if (s->proplist)
410         pa_proplist_free(s->proplist);
411
412     if (s->memchunk.memblock)
413         pa_memblock_unref(s->memchunk.memblock);
414
415     pa_xfree(s);
416 }
417
418 /* Called from main context */
419 static upload_stream* upload_stream_new(
420         pa_native_connection *c,
421         const pa_sample_spec *ss,
422         const pa_channel_map *map,
423         const char *name,
424         size_t length,
425         pa_proplist *p) {
426
427     upload_stream *s;
428
429     pa_assert(c);
430     pa_assert(ss);
431     pa_assert(name);
432     pa_assert(length > 0);
433     pa_assert(p);
434
435     s = pa_msgobject_new(upload_stream);
436     s->parent.parent.parent.free = upload_stream_free;
437     s->connection = c;
438     s->sample_spec = *ss;
439     s->channel_map = *map;
440     s->name = pa_xstrdup(name);
441     pa_memchunk_reset(&s->memchunk);
442     s->length = length;
443     s->proplist = pa_proplist_copy(p);
444     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
445
446     pa_idxset_put(c->output_streams, s, &s->index);
447
448     return s;
449 }
450
451 /* Called from main context */
452 static void record_stream_unlink(record_stream *s) {
453     pa_assert(s);
454
455     if (!s->connection)
456         return;
457
458     if (s->source_output) {
459         pa_source_output_unlink(s->source_output);
460         pa_source_output_unref(s->source_output);
461         s->source_output = NULL;
462     }
463
464     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
465     s->connection = NULL;
466     record_stream_unref(s);
467 }
468
469 /* Called from main context */
470 static void record_stream_free(pa_object *o) {
471     record_stream *s = RECORD_STREAM(o);
472     pa_assert(s);
473
474     record_stream_unlink(s);
475
476     pa_memblockq_free(s->memblockq);
477     pa_xfree(s);
478 }
479
480 /* Called from main context */
481 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
482     record_stream *s = RECORD_STREAM(o);
483     record_stream_assert_ref(s);
484
485     if (!s->connection)
486         return -1;
487
488     switch (code) {
489
490         case RECORD_STREAM_MESSAGE_POST_DATA:
491
492             /* We try to keep up to date with how many bytes are
493              * currently on the fly */
494             pa_atomic_sub(&s->on_the_fly, chunk->length);
495
496             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
497 /*                 pa_log_warn("Failed to push data into output queue."); */
498                 return -1;
499             }
500
501             if (!pa_pstream_is_pending(s->connection->pstream))
502                 native_connection_send_memblock(s->connection);
503
504             break;
505     }
506
507     return 0;
508 }
509
510 /* Called from main context */
511 static void fix_record_buffer_attr_pre(record_stream *s) {
512
513     size_t frame_size;
514     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
515
516     pa_assert(s);
517
518     /* This function will be called from the main thread, before as
519      * well as after the source output has been activated using
520      * pa_source_output_put()! That means it may not touch any
521      * ->thread_info data! */
522
523     frame_size = pa_frame_size(&s->source_output->sample_spec);
524
525     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
526         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
527     if (s->buffer_attr.maxlength <= 0)
528         s->buffer_attr.maxlength = (uint32_t) frame_size;
529
530     if (s->buffer_attr.fragsize == (uint32_t) -1)
531         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
532     if (s->buffer_attr.fragsize <= 0)
533         s->buffer_attr.fragsize = (uint32_t) frame_size;
534
535     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
536
537     if (s->early_requests) {
538
539         /* In early request mode we need to emulate the classic
540          * fragment-based playback model. We do this setting the source
541          * latency to the fragment size. */
542
543         source_usec = fragsize_usec;
544
545     } else if (s->adjust_latency) {
546
547         /* So, the user asked us to adjust the latency according to
548          * what the source can provide. Half the latency will be
549          * spent on the hw buffer, half of it in the async buffer
550          * queue we maintain for each client. */
551
552         source_usec = fragsize_usec/2;
553
554     } else {
555
556         /* Ok, the user didn't ask us to adjust the latency, hence we
557          * don't */
558
559         source_usec = (pa_usec_t) -1;
560     }
561
562     if (source_usec != (pa_usec_t) -1)
563         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
564     else
565         s->configured_source_latency = 0;
566
567     if (s->early_requests) {
568
569         /* Ok, we didn't necessarily get what we were asking for, so
570          * let's tell the user */
571
572         fragsize_usec = s->configured_source_latency;
573
574     } else if (s->adjust_latency) {
575
576         /* Now subtract what we actually got */
577
578         if (fragsize_usec >= s->configured_source_latency*2)
579             fragsize_usec -= s->configured_source_latency;
580         else
581             fragsize_usec = s->configured_source_latency;
582     }
583
584     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
585         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
586
587         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
588
589     if (s->buffer_attr.fragsize <= 0)
590         s->buffer_attr.fragsize = (uint32_t) frame_size;
591 }
592
593 /* Called from main context */
594 static void fix_record_buffer_attr_post(record_stream *s) {
595     size_t base;
596
597     pa_assert(s);
598
599     /* This function will be called from the main thread, before as
600      * well as after the source output has been activated using
601      * pa_source_output_put()! That means it may not touch and
602      * ->thread_info data! */
603
604     base = pa_frame_size(&s->source_output->sample_spec);
605
606     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
607     if (s->buffer_attr.fragsize <= 0)
608         s->buffer_attr.fragsize = base;
609
610     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
611         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
612 }
613
614 /* Called from main context */
615 static record_stream* record_stream_new(
616         pa_native_connection *c,
617         pa_source *source,
618         pa_sample_spec *ss,
619         pa_channel_map *map,
620         pa_bool_t peak_detect,
621         pa_buffer_attr *attr,
622         pa_source_output_flags_t flags,
623         pa_proplist *p,
624         pa_bool_t adjust_latency,
625         pa_sink_input *direct_on_input,
626         pa_bool_t early_requests,
627         int *ret) {
628
629     record_stream *s;
630     pa_source_output *source_output = NULL;
631     size_t base;
632     pa_source_output_new_data data;
633
634     pa_assert(c);
635     pa_assert(ss);
636     pa_assert(p);
637     pa_assert(ret);
638
639     pa_source_output_new_data_init(&data);
640
641     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
642     data.driver = __FILE__;
643     data.module = c->options->module;
644     data.client = c->client;
645     data.source = source;
646     data.direct_on_input = direct_on_input;
647     pa_source_output_new_data_set_sample_spec(&data, ss);
648     pa_source_output_new_data_set_channel_map(&data, map);
649     if (peak_detect)
650         data.resample_method = PA_RESAMPLER_PEAKS;
651
652     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
653
654     pa_source_output_new_data_done(&data);
655
656     if (!source_output)
657         return NULL;
658
659     s = pa_msgobject_new(record_stream);
660     s->parent.parent.free = record_stream_free;
661     s->parent.process_msg = record_stream_process_msg;
662     s->connection = c;
663     s->source_output = source_output;
664     s->buffer_attr = *attr;
665     s->adjust_latency = adjust_latency;
666     s->early_requests = early_requests;
667     pa_atomic_store(&s->on_the_fly, 0);
668
669     s->source_output->parent.process_msg = source_output_process_msg;
670     s->source_output->push = source_output_push_cb;
671     s->source_output->kill = source_output_kill_cb;
672     s->source_output->get_latency = source_output_get_latency_cb;
673     s->source_output->moving = source_output_moving_cb;
674     s->source_output->suspend = source_output_suspend_cb;
675     s->source_output->send_event = source_output_send_event_cb;
676     s->source_output->userdata = s;
677
678     fix_record_buffer_attr_pre(s);
679
680     s->memblockq = pa_memblockq_new(
681             0,
682             s->buffer_attr.maxlength,
683             0,
684             base = pa_frame_size(&source_output->sample_spec),
685             1,
686             0,
687             0,
688             NULL);
689
690     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
691     fix_record_buffer_attr_post(s);
692
693     *ss = s->source_output->sample_spec;
694     *map = s->source_output->channel_map;
695
696     pa_idxset_put(c->record_streams, s, &s->index);
697
698     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
699                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
700                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
701                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
702
703     pa_source_output_put(s->source_output);
704     return s;
705 }
706
707 /* Called from main context */
708 static void record_stream_send_killed(record_stream *r) {
709     pa_tagstruct *t;
710     record_stream_assert_ref(r);
711
712     t = pa_tagstruct_new(NULL, 0);
713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
715     pa_tagstruct_putu32(t, r->index);
716     pa_pstream_send_tagstruct(r->connection->pstream, t);
717 }
718
719 /* Called from main context */
720 static void playback_stream_unlink(playback_stream *s) {
721     pa_assert(s);
722
723     if (!s->connection)
724         return;
725
726     if (s->sink_input) {
727         pa_sink_input_unlink(s->sink_input);
728         pa_sink_input_unref(s->sink_input);
729         s->sink_input = NULL;
730     }
731
732     if (s->drain_request)
733         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
734
735     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
736     s->connection = NULL;
737     playback_stream_unref(s);
738 }
739
740 /* Called from main context */
741 static void playback_stream_free(pa_object* o) {
742     playback_stream *s = PLAYBACK_STREAM(o);
743     pa_assert(s);
744
745     playback_stream_unlink(s);
746
747     pa_memblockq_free(s->memblockq);
748     pa_xfree(s);
749 }
750
751 /* Called from main context */
752 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
753     playback_stream *s = PLAYBACK_STREAM(o);
754     playback_stream_assert_ref(s);
755
756     if (!s->connection)
757         return -1;
758
759     switch (code) {
760         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
761             pa_tagstruct *t;
762             int l = 0;
763
764             for (;;) {
765                 if ((l = pa_atomic_load(&s->missing)) <= 0)
766                     return 0;
767
768                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
769                     break;
770             }
771
772             t = pa_tagstruct_new(NULL, 0);
773             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
774             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
775             pa_tagstruct_putu32(t, s->index);
776             pa_tagstruct_putu32(t, (uint32_t) l);
777             pa_pstream_send_tagstruct(s->connection->pstream, t);
778
779 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
780             break;
781         }
782
783         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
784             pa_tagstruct *t;
785
786 /*             pa_log("signalling underflow"); */
787
788             /* Report that we're empty */
789             t = pa_tagstruct_new(NULL, 0);
790             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
791             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
792             pa_tagstruct_putu32(t, s->index);
793             pa_pstream_send_tagstruct(s->connection->pstream, t);
794             break;
795         }
796
797         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
798             pa_tagstruct *t;
799
800             /* Notify the user we're overflowed*/
801             t = pa_tagstruct_new(NULL, 0);
802             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
803             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
804             pa_tagstruct_putu32(t, s->index);
805             pa_pstream_send_tagstruct(s->connection->pstream, t);
806             break;
807         }
808
809         case PLAYBACK_STREAM_MESSAGE_STARTED:
810
811             if (s->connection->version >= 13) {
812                 pa_tagstruct *t;
813
814                 /* Notify the user we started playback */
815                 t = pa_tagstruct_new(NULL, 0);
816                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
817                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
818                 pa_tagstruct_putu32(t, s->index);
819                 pa_pstream_send_tagstruct(s->connection->pstream, t);
820             }
821
822             break;
823
824         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
825             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
826             break;
827
828         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
829             pa_tagstruct *t;
830
831             s->buffer_attr.tlength = (uint32_t) offset;
832
833             t = pa_tagstruct_new(NULL, 0);
834             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
835             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
836             pa_tagstruct_putu32(t, s->index);
837             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
838             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
839             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
840             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
841             pa_tagstruct_put_usec(t, s->configured_sink_latency);
842             pa_pstream_send_tagstruct(s->connection->pstream, t);
843
844             break;
845         }
846     }
847
848     return 0;
849 }
850
851 /* Called from main context */
852 static void fix_playback_buffer_attr(playback_stream *s) {
853     size_t frame_size, max_prebuf;
854     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
855
856     pa_assert(s);
857
858     /* This function will be called from the main thread, before as
859      * well as after the sink input has been activated using
860      * pa_sink_input_put()! That means it may not touch any
861      * ->thread_info data, such as the memblockq! */
862
863     frame_size = pa_frame_size(&s->sink_input->sample_spec);
864
865     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
866         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
867     if (s->buffer_attr.maxlength <= 0)
868         s->buffer_attr.maxlength = (uint32_t) frame_size;
869
870     if (s->buffer_attr.tlength == (uint32_t) -1)
871         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
872     if (s->buffer_attr.tlength <= 0)
873         s->buffer_attr.tlength = (uint32_t) frame_size;
874
875     if (s->buffer_attr.minreq == (uint32_t) -1)
876         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
877     if (s->buffer_attr.minreq <= 0)
878         s->buffer_attr.minreq = (uint32_t) frame_size;
879
880     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
881         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
882
883     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
884     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
885
886     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
887                 (double) tlength_usec / PA_USEC_PER_MSEC,
888                 (double) minreq_usec / PA_USEC_PER_MSEC);
889
890     if (s->early_requests) {
891
892         /* In early request mode we need to emulate the classic
893          * fragment-based playback model. We do this setting the sink
894          * latency to the fragment size. */
895
896         sink_usec = minreq_usec;
897         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
898
899     } else if (s->adjust_latency) {
900
901         /* So, the user asked us to adjust the latency of the stream
902          * buffer according to the what the sink can provide. The
903          * tlength passed in shall be the overall latency. Roughly
904          * half the latency will be spent on the hw buffer, the other
905          * half of it in the async buffer queue we maintain for each
906          * client. In between we'll have a safety space of size
907          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
908          * empty and needs to be filled, then our buffer must have
909          * enough data to fulfill this request immediatly and thus
910          * have at least the same tlength as the size of the hw
911          * buffer. It additionally needs space for 2 times minreq
912          * because if the buffer ran empty and a partial fillup
913          * happens immediately on the next iteration we need to be
914          * able to fulfill it and give the application also minreq
915          * time to fill it up again for the next request Makes 2 times
916          * minreq in plus.. */
917
918         if (tlength_usec > minreq_usec*2)
919             sink_usec = (tlength_usec - minreq_usec*2)/2;
920         else
921             sink_usec = 0;
922
923         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
924
925     } else {
926
927         /* Ok, the user didn't ask us to adjust the latency, but we
928          * still need to make sure that the parameters from the user
929          * do make sense. */
930
931         if (tlength_usec > minreq_usec*2)
932             sink_usec = (tlength_usec - minreq_usec*2);
933         else
934             sink_usec = 0;
935
936         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
937     }
938
939     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
940
941     if (s->early_requests) {
942
943         /* Ok, we didn't necessarily get what we were asking for, so
944          * let's tell the user */
945
946         minreq_usec = s->configured_sink_latency;
947
948     } else if (s->adjust_latency) {
949
950         /* Ok, we didn't necessarily get what we were asking for, so
951          * let's subtract from what we asked for for the remaining
952          * buffer space */
953
954         if (tlength_usec >= s->configured_sink_latency)
955             tlength_usec -= s->configured_sink_latency;
956     }
957
958     /* FIXME: This is actually larger than necessary, since not all of
959      * the sink latency is actually rewritable. */
960     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
961         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
962
963     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
964         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
965         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
966
967     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
968         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
969         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
970
971     if (s->buffer_attr.minreq <= 0) {
972         s->buffer_attr.minreq = (uint32_t) frame_size;
973         s->buffer_attr.tlength += (uint32_t) frame_size*2;
974     }
975
976     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
977         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
978
979     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
980
981     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
982         s->buffer_attr.prebuf > max_prebuf)
983         s->buffer_attr.prebuf = max_prebuf;
984 }
985
986 /* Called from main context */
987 static playback_stream* playback_stream_new(
988         pa_native_connection *c,
989         pa_sink *sink,
990         pa_sample_spec *ss,
991         pa_channel_map *map,
992         pa_buffer_attr *a,
993         pa_cvolume *volume,
994         pa_bool_t muted,
995         pa_bool_t muted_set,
996         uint32_t syncid,
997         uint32_t *missing,
998         pa_sink_input_flags_t flags,
999         pa_proplist *p,
1000         pa_bool_t adjust_latency,
1001         pa_bool_t early_requests,
1002         int *ret) {
1003
1004     playback_stream *s, *ssync;
1005     pa_sink_input *sink_input = NULL;
1006     pa_memchunk silence;
1007     uint32_t idx;
1008     int64_t start_index;
1009     pa_sink_input_new_data data;
1010
1011     pa_assert(c);
1012     pa_assert(ss);
1013     pa_assert(missing);
1014     pa_assert(p);
1015     pa_assert(ret);
1016
1017     /* Find syncid group */
1018     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1019
1020         if (!playback_stream_isinstance(ssync))
1021             continue;
1022
1023         if (ssync->syncid == syncid)
1024             break;
1025     }
1026
1027     /* Synced streams must connect to the same sink */
1028     if (ssync) {
1029
1030         if (!sink)
1031             sink = ssync->sink_input->sink;
1032         else if (sink != ssync->sink_input->sink) {
1033             *ret = PA_ERR_INVALID;
1034             return NULL;
1035         }
1036     }
1037
1038     pa_sink_input_new_data_init(&data);
1039
1040     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1041     data.driver = __FILE__;
1042     data.module = c->options->module;
1043     data.client = c->client;
1044     data.sink = sink;
1045     pa_sink_input_new_data_set_sample_spec(&data, ss);
1046     pa_sink_input_new_data_set_channel_map(&data, map);
1047     if (volume)
1048         pa_sink_input_new_data_set_volume(&data, volume);
1049     if (muted_set)
1050         pa_sink_input_new_data_set_muted(&data, muted);
1051     data.sync_base = ssync ? ssync->sink_input : NULL;
1052
1053     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1054
1055     pa_sink_input_new_data_done(&data);
1056
1057     if (!sink_input)
1058         return NULL;
1059
1060     s = pa_msgobject_new(playback_stream);
1061     s->parent.parent.parent.free = playback_stream_free;
1062     s->parent.parent.process_msg = playback_stream_process_msg;
1063     s->connection = c;
1064     s->syncid = syncid;
1065     s->sink_input = sink_input;
1066     s->is_underrun = TRUE;
1067     s->drain_request = FALSE;
1068     pa_atomic_store(&s->missing, 0);
1069     s->buffer_attr = *a;
1070     s->adjust_latency = adjust_latency;
1071     s->early_requests = early_requests;
1072
1073     s->sink_input->parent.process_msg = sink_input_process_msg;
1074     s->sink_input->pop = sink_input_pop_cb;
1075     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1076     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1077     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1078     s->sink_input->kill = sink_input_kill_cb;
1079     s->sink_input->moving = sink_input_moving_cb;
1080     s->sink_input->suspend = sink_input_suspend_cb;
1081     s->sink_input->send_event = sink_input_send_event_cb;
1082     s->sink_input->userdata = s;
1083
1084     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1085
1086     fix_playback_buffer_attr(s);
1087
1088     pa_sink_input_get_silence(sink_input, &silence);
1089     s->memblockq = pa_memblockq_new(
1090             start_index,
1091             s->buffer_attr.maxlength,
1092             s->buffer_attr.tlength,
1093             pa_frame_size(&sink_input->sample_spec),
1094             s->buffer_attr.prebuf,
1095             s->buffer_attr.minreq,
1096             0,
1097             &silence);
1098     pa_memblock_unref(silence.memblock);
1099
1100     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1101
1102     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1103
1104     *ss = s->sink_input->sample_spec;
1105     *map = s->sink_input->channel_map;
1106
1107     pa_idxset_put(c->output_streams, s, &s->index);
1108
1109     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1110                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1111                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1112                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1113                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1114
1115     pa_sink_input_put(s->sink_input);
1116     return s;
1117 }
1118
1119 /* Called from IO context */
1120 static void playback_stream_request_bytes(playback_stream *s) {
1121     size_t m, minreq;
1122     int previous_missing;
1123
1124     playback_stream_assert_ref(s);
1125
1126     m = pa_memblockq_pop_missing(s->memblockq);
1127
1128     if (m <= 0)
1129         return;
1130
1131 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1132
1133     previous_missing = pa_atomic_add(&s->missing, (int) m);
1134     minreq = pa_memblockq_get_minreq(s->memblockq);
1135
1136     if (pa_memblockq_prebuf_active(s->memblockq) ||
1137         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1138         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1139 }
1140
1141
1142 /* Called from main context */
1143 static void playback_stream_send_killed(playback_stream *p) {
1144     pa_tagstruct *t;
1145     playback_stream_assert_ref(p);
1146
1147     t = pa_tagstruct_new(NULL, 0);
1148     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1149     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1150     pa_tagstruct_putu32(t, p->index);
1151     pa_pstream_send_tagstruct(p->connection->pstream, t);
1152 }
1153
1154 /* Called from main context */
1155 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1156     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1157     pa_native_connection_assert_ref(c);
1158
1159     if (!c->protocol)
1160         return -1;
1161
1162     switch (code) {
1163
1164         case CONNECTION_MESSAGE_REVOKE:
1165             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1166             break;
1167
1168         case CONNECTION_MESSAGE_RELEASE:
1169             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1170             break;
1171     }
1172
1173     return 0;
1174 }
1175
1176 /* Called from main context */
1177 static void native_connection_unlink(pa_native_connection *c) {
1178     record_stream *r;
1179     output_stream *o;
1180
1181     pa_assert(c);
1182
1183     if (!c->protocol)
1184         return;
1185
1186     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1187
1188     if (c->options)
1189         pa_native_options_unref(c->options);
1190
1191     while ((r = pa_idxset_first(c->record_streams, NULL)))
1192         record_stream_unlink(r);
1193
1194     while ((o = pa_idxset_first(c->output_streams, NULL)))
1195         if (playback_stream_isinstance(o))
1196             playback_stream_unlink(PLAYBACK_STREAM(o));
1197         else
1198             upload_stream_unlink(UPLOAD_STREAM(o));
1199
1200     if (c->subscription)
1201         pa_subscription_free(c->subscription);
1202
1203     if (c->pstream)
1204         pa_pstream_unlink(c->pstream);
1205
1206     if (c->auth_timeout_event) {
1207         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1208         c->auth_timeout_event = NULL;
1209     }
1210
1211     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1212     c->protocol = NULL;
1213     pa_native_connection_unref(c);
1214 }
1215
1216 /* Called from main context */
1217 static void native_connection_free(pa_object *o) {
1218     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1219
1220     pa_assert(c);
1221
1222     native_connection_unlink(c);
1223
1224     pa_idxset_free(c->record_streams, NULL, NULL);
1225     pa_idxset_free(c->output_streams, NULL, NULL);
1226
1227     pa_pdispatch_unref(c->pdispatch);
1228     pa_pstream_unref(c->pstream);
1229     pa_client_free(c->client);
1230
1231     pa_xfree(c);
1232 }
1233
1234 /* Called from main context */
1235 static void native_connection_send_memblock(pa_native_connection *c) {
1236     uint32_t start;
1237     record_stream *r;
1238
1239     start = PA_IDXSET_INVALID;
1240     for (;;) {
1241         pa_memchunk chunk;
1242
1243         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1244             return;
1245
1246         if (start == PA_IDXSET_INVALID)
1247             start = c->rrobin_index;
1248         else if (start == c->rrobin_index)
1249             return;
1250
1251         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1252             pa_memchunk schunk = chunk;
1253
1254             if (schunk.length > r->buffer_attr.fragsize)
1255                 schunk.length = r->buffer_attr.fragsize;
1256
1257             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1258
1259             pa_memblockq_drop(r->memblockq, schunk.length);
1260             pa_memblock_unref(schunk.memblock);
1261
1262             return;
1263         }
1264     }
1265 }
1266
1267 /*** sink input callbacks ***/
1268
1269 /* Called from thread context */
1270 static void handle_seek(playback_stream *s, int64_t indexw) {
1271     playback_stream_assert_ref(s);
1272
1273 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1274
1275     if (s->sink_input->thread_info.underrun_for > 0) {
1276
1277 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1278
1279         if (pa_memblockq_is_readable(s->memblockq)) {
1280
1281             /* We just ended an underrun, let's ask the sink
1282              * for a complete rewind rewrite */
1283
1284             pa_log_debug("Requesting rewind due to end of underrun.");
1285             pa_sink_input_request_rewind(s->sink_input,
1286                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1287                                          FALSE, TRUE, FALSE);
1288         }
1289
1290     } else {
1291         int64_t indexr;
1292
1293         indexr = pa_memblockq_get_read_index(s->memblockq);
1294
1295         if (indexw < indexr) {
1296             /* OK, the sink already asked for this data, so
1297              * let's have it usk us again */
1298
1299             pa_log_debug("Requesting rewind due to rewrite.");
1300             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1301         }
1302     }
1303
1304     playback_stream_request_bytes(s);
1305 }
1306
1307 /* Called from thread context */
1308 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1309     pa_sink_input *i = PA_SINK_INPUT(o);
1310     playback_stream *s;
1311
1312     pa_sink_input_assert_ref(i);
1313     s = PLAYBACK_STREAM(i->userdata);
1314     playback_stream_assert_ref(s);
1315
1316     switch (code) {
1317
1318         case SINK_INPUT_MESSAGE_SEEK: {
1319             int64_t windex;
1320
1321             windex = pa_memblockq_get_write_index(s->memblockq);
1322
1323             /* The client side is incapable of accounting correctly
1324              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1325              * able to deal with that. */
1326
1327             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1328
1329             handle_seek(s, windex);
1330             return 0;
1331         }
1332
1333         case SINK_INPUT_MESSAGE_POST_DATA: {
1334             int64_t windex;
1335
1336             pa_assert(chunk);
1337
1338             windex = pa_memblockq_get_write_index(s->memblockq);
1339
1340 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1341
1342             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1343                 pa_log_warn("Failed to push data into queue");
1344                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1345                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1346             }
1347
1348             handle_seek(s, windex);
1349
1350 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1351
1352             return 0;
1353         }
1354
1355         case SINK_INPUT_MESSAGE_DRAIN:
1356         case SINK_INPUT_MESSAGE_FLUSH:
1357         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1358         case SINK_INPUT_MESSAGE_TRIGGER: {
1359
1360             int64_t windex;
1361             pa_sink_input *isync;
1362             void (*func)(pa_memblockq *bq);
1363
1364             switch  (code) {
1365                 case SINK_INPUT_MESSAGE_FLUSH:
1366                     func = pa_memblockq_flush_write;
1367                     break;
1368
1369                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1370                     func = pa_memblockq_prebuf_force;
1371                     break;
1372
1373                 case SINK_INPUT_MESSAGE_DRAIN:
1374                 case SINK_INPUT_MESSAGE_TRIGGER:
1375                     func = pa_memblockq_prebuf_disable;
1376                     break;
1377
1378                 default:
1379                     pa_assert_not_reached();
1380             }
1381
1382             windex = pa_memblockq_get_write_index(s->memblockq);
1383             func(s->memblockq);
1384             handle_seek(s, windex);
1385
1386             /* Do the same for all other members in the sync group */
1387             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1388                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1389                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1390                 func(ssync->memblockq);
1391                 handle_seek(ssync, windex);
1392             }
1393
1394             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1395                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1396                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1397                 func(ssync->memblockq);
1398                 handle_seek(ssync, windex);
1399             }
1400
1401             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1402                 if (!pa_memblockq_is_readable(s->memblockq))
1403                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1404                 else {
1405                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1406                     s->drain_request = TRUE;
1407                 }
1408             }
1409
1410             return 0;
1411         }
1412
1413         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1414             /* Atomically get a snapshot of all timing parameters... */
1415             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1416             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1417             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1418             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1419             s->underrun_for = s->sink_input->thread_info.underrun_for;
1420             s->playing_for = s->sink_input->thread_info.playing_for;
1421
1422             return 0;
1423
1424         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1425             int64_t windex;
1426
1427             windex = pa_memblockq_get_write_index(s->memblockq);
1428
1429             pa_memblockq_prebuf_force(s->memblockq);
1430
1431             handle_seek(s, windex);
1432
1433             /* Fall through to the default handler */
1434             break;
1435         }
1436
1437         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1438             pa_usec_t *r = userdata;
1439
1440             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1441
1442             /* Fall through, the default handler will add in the extra
1443              * latency added by the resampler */
1444             break;
1445         }
1446
1447         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1448             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1449             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1450             return 0;
1451         }
1452     }
1453
1454     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1455 }
1456
1457 /* Called from thread context */
1458 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1459     playback_stream *s;
1460
1461     pa_sink_input_assert_ref(i);
1462     s = PLAYBACK_STREAM(i->userdata);
1463     playback_stream_assert_ref(s);
1464     pa_assert(chunk);
1465
1466 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1467
1468     if (pa_memblockq_is_readable(s->memblockq))
1469         s->is_underrun = FALSE;
1470     else {
1471         pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1472
1473         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1474             s->drain_request = FALSE;
1475             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1476         } else if (!s->is_underrun)
1477             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1478
1479         s->is_underrun = TRUE;
1480
1481         playback_stream_request_bytes(s);
1482     }
1483
1484     /* This call will not fail with prebuf=0, hence we check for
1485        underrun explicitly above */
1486     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1487         return -1;
1488
1489     chunk->length = PA_MIN(nbytes, chunk->length);
1490
1491     if (i->thread_info.underrun_for > 0)
1492         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1493
1494     pa_memblockq_drop(s->memblockq, chunk->length);
1495     playback_stream_request_bytes(s);
1496
1497     return 0;
1498 }
1499
1500 /* Called from thread context */
1501 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1502     playback_stream *s;
1503
1504     pa_sink_input_assert_ref(i);
1505     s = PLAYBACK_STREAM(i->userdata);
1506     playback_stream_assert_ref(s);
1507
1508     /* If we are in an underrun, then we don't rewind */
1509     if (i->thread_info.underrun_for > 0)
1510         return;
1511
1512     pa_memblockq_rewind(s->memblockq, nbytes);
1513 }
1514
1515 /* Called from thread context */
1516 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1517     playback_stream *s;
1518
1519     pa_sink_input_assert_ref(i);
1520     s = PLAYBACK_STREAM(i->userdata);
1521     playback_stream_assert_ref(s);
1522
1523     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1524 }
1525
1526 /* Called from thread context */
1527 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1528     playback_stream *s;
1529     size_t new_tlength, old_tlength;
1530
1531     pa_sink_input_assert_ref(i);
1532     s = PLAYBACK_STREAM(i->userdata);
1533     playback_stream_assert_ref(s);
1534
1535     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1536     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1537
1538     if (old_tlength < new_tlength) {
1539         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1540         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1541         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1542
1543         if (new_tlength == old_tlength)
1544             pa_log_debug("Failed to increase tlength");
1545         else {
1546             pa_log_debug("Notifying client about increased tlength");
1547             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1548         }
1549     }
1550 }
1551
1552 /* Called from main context */
1553 static void sink_input_kill_cb(pa_sink_input *i) {
1554     playback_stream *s;
1555
1556     pa_sink_input_assert_ref(i);
1557     s = PLAYBACK_STREAM(i->userdata);
1558     playback_stream_assert_ref(s);
1559
1560     playback_stream_send_killed(s);
1561     playback_stream_unlink(s);
1562 }
1563
1564 /* Called from main context */
1565 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1566     playback_stream *s;
1567     pa_tagstruct *t;
1568
1569     pa_sink_input_assert_ref(i);
1570     s = PLAYBACK_STREAM(i->userdata);
1571     playback_stream_assert_ref(s);
1572
1573     if (s->connection->version < 15)
1574       return;
1575
1576     t = pa_tagstruct_new(NULL, 0);
1577     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1578     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1579     pa_tagstruct_putu32(t, s->index);
1580     pa_tagstruct_puts(t, event);
1581     pa_tagstruct_put_proplist(t, pl);
1582     pa_pstream_send_tagstruct(s->connection->pstream, t);
1583 }
1584
1585 /* Called from main context */
1586 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1587     playback_stream *s;
1588     pa_tagstruct *t;
1589
1590     pa_sink_input_assert_ref(i);
1591     s = PLAYBACK_STREAM(i->userdata);
1592     playback_stream_assert_ref(s);
1593
1594     if (s->connection->version < 12)
1595       return;
1596
1597     t = pa_tagstruct_new(NULL, 0);
1598     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1599     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1600     pa_tagstruct_putu32(t, s->index);
1601     pa_tagstruct_put_boolean(t, suspend);
1602     pa_pstream_send_tagstruct(s->connection->pstream, t);
1603 }
1604
1605 /* Called from main context */
1606 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1607     playback_stream *s;
1608     pa_tagstruct *t;
1609
1610     pa_sink_input_assert_ref(i);
1611     s = PLAYBACK_STREAM(i->userdata);
1612     playback_stream_assert_ref(s);
1613
1614     fix_playback_buffer_attr(s);
1615     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1616     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1617
1618     if (s->connection->version < 12)
1619       return;
1620
1621     t = pa_tagstruct_new(NULL, 0);
1622     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1623     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1624     pa_tagstruct_putu32(t, s->index);
1625     pa_tagstruct_putu32(t, dest->index);
1626     pa_tagstruct_puts(t, dest->name);
1627     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1628
1629     if (s->connection->version >= 13) {
1630         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1631         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1632         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1633         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1634         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1635     }
1636
1637     pa_pstream_send_tagstruct(s->connection->pstream, t);
1638 }
1639
1640 /*** source_output callbacks ***/
1641
1642 /* Called from thread context */
1643 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1644     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1645     record_stream *s;
1646
1647     pa_source_output_assert_ref(o);
1648     s = RECORD_STREAM(o->userdata);
1649     record_stream_assert_ref(s);
1650
1651     switch (code) {
1652         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1653             /* Atomically get a snapshot of all timing parameters... */
1654             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1655             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1656             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1657             return 0;
1658     }
1659
1660     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1661 }
1662
1663 /* Called from thread context */
1664 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1665     record_stream *s;
1666
1667     pa_source_output_assert_ref(o);
1668     s = RECORD_STREAM(o->userdata);
1669     record_stream_assert_ref(s);
1670     pa_assert(chunk);
1671
1672     pa_atomic_add(&s->on_the_fly, chunk->length);
1673     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1674 }
1675
1676 static void source_output_kill_cb(pa_source_output *o) {
1677     record_stream *s;
1678
1679     pa_source_output_assert_ref(o);
1680     s = RECORD_STREAM(o->userdata);
1681     record_stream_assert_ref(s);
1682
1683     record_stream_send_killed(s);
1684     record_stream_unlink(s);
1685 }
1686
1687 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1688     record_stream *s;
1689
1690     pa_source_output_assert_ref(o);
1691     s = RECORD_STREAM(o->userdata);
1692     record_stream_assert_ref(s);
1693
1694     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1695
1696     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1697 }
1698
1699 /* Called from main context */
1700 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1701     record_stream *s;
1702     pa_tagstruct *t;
1703
1704     pa_source_output_assert_ref(o);
1705     s = RECORD_STREAM(o->userdata);
1706     record_stream_assert_ref(s);
1707
1708     if (s->connection->version < 15)
1709       return;
1710
1711     t = pa_tagstruct_new(NULL, 0);
1712     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1713     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1714     pa_tagstruct_putu32(t, s->index);
1715     pa_tagstruct_puts(t, event);
1716     pa_tagstruct_put_proplist(t, pl);
1717     pa_pstream_send_tagstruct(s->connection->pstream, t);
1718 }
1719
1720 /* Called from main context */
1721 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1722     record_stream *s;
1723     pa_tagstruct *t;
1724
1725     pa_source_output_assert_ref(o);
1726     s = RECORD_STREAM(o->userdata);
1727     record_stream_assert_ref(s);
1728
1729     if (s->connection->version < 12)
1730       return;
1731
1732     t = pa_tagstruct_new(NULL, 0);
1733     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1734     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1735     pa_tagstruct_putu32(t, s->index);
1736     pa_tagstruct_put_boolean(t, suspend);
1737     pa_pstream_send_tagstruct(s->connection->pstream, t);
1738 }
1739
1740 /* Called from main context */
1741 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1742     record_stream *s;
1743     pa_tagstruct *t;
1744
1745     pa_source_output_assert_ref(o);
1746     s = RECORD_STREAM(o->userdata);
1747     record_stream_assert_ref(s);
1748
1749     fix_record_buffer_attr_pre(s);
1750     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1751     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1752     fix_record_buffer_attr_post(s);
1753
1754     if (s->connection->version < 12)
1755       return;
1756
1757     t = pa_tagstruct_new(NULL, 0);
1758     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1759     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1760     pa_tagstruct_putu32(t, s->index);
1761     pa_tagstruct_putu32(t, dest->index);
1762     pa_tagstruct_puts(t, dest->name);
1763     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1764
1765     if (s->connection->version >= 13) {
1766         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1767         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1768         pa_tagstruct_put_usec(t, s->configured_source_latency);
1769     }
1770
1771     pa_pstream_send_tagstruct(s->connection->pstream, t);
1772 }
1773
1774 /*** pdispatch callbacks ***/
1775
1776 static void protocol_error(pa_native_connection *c) {
1777     pa_log("protocol error, kicking client");
1778     native_connection_unlink(c);
1779 }
1780
1781 #define CHECK_VALIDITY(pstream, expression, tag, error) do { \
1782 if (!(expression)) { \
1783     pa_pstream_send_error((pstream), (tag), (error)); \
1784     return; \
1785 } \
1786 } while(0);
1787
1788 static pa_tagstruct *reply_new(uint32_t tag) {
1789     pa_tagstruct *reply;
1790
1791     reply = pa_tagstruct_new(NULL, 0);
1792     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1793     pa_tagstruct_putu32(reply, tag);
1794     return reply;
1795 }
1796
1797 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1798     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1799     playback_stream *s;
1800     uint32_t sink_index, syncid, missing;
1801     pa_buffer_attr attr;
1802     const char *name = NULL, *sink_name;
1803     pa_sample_spec ss;
1804     pa_channel_map map;
1805     pa_tagstruct *reply;
1806     pa_sink *sink = NULL;
1807     pa_cvolume volume;
1808     pa_bool_t
1809         corked = FALSE,
1810         no_remap = FALSE,
1811         no_remix = FALSE,
1812         fix_format = FALSE,
1813         fix_rate = FALSE,
1814         fix_channels = FALSE,
1815         no_move = FALSE,
1816         variable_rate = FALSE,
1817         muted = FALSE,
1818         adjust_latency = FALSE,
1819         early_requests = FALSE,
1820         dont_inhibit_auto_suspend = FALSE,
1821         muted_set = FALSE,
1822         fail_on_suspend = FALSE;
1823     pa_sink_input_flags_t flags = 0;
1824     pa_proplist *p;
1825     pa_bool_t volume_set = TRUE;
1826     int ret = PA_ERR_INVALID;
1827
1828     pa_native_connection_assert_ref(c);
1829     pa_assert(t);
1830     memset(&attr, 0, sizeof(attr));
1831
1832     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1833         pa_tagstruct_get(
1834                 t,
1835                 PA_TAG_SAMPLE_SPEC, &ss,
1836                 PA_TAG_CHANNEL_MAP, &map,
1837                 PA_TAG_U32, &sink_index,
1838                 PA_TAG_STRING, &sink_name,
1839                 PA_TAG_U32, &attr.maxlength,
1840                 PA_TAG_BOOLEAN, &corked,
1841                 PA_TAG_U32, &attr.tlength,
1842                 PA_TAG_U32, &attr.prebuf,
1843                 PA_TAG_U32, &attr.minreq,
1844                 PA_TAG_U32, &syncid,
1845                 PA_TAG_CVOLUME, &volume,
1846                 PA_TAG_INVALID) < 0) {
1847
1848         protocol_error(c);
1849         return;
1850     }
1851
1852     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1853     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1854     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1855     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1856     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1857     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1858     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1859     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1860
1861     p = pa_proplist_new();
1862
1863     if (name)
1864         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1865
1866     if (c->version >= 12)  {
1867         /* Since 0.9.8 the user can ask for a couple of additional flags */
1868
1869         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1870             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1871             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1872             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1873             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1874             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1875             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1876
1877             protocol_error(c);
1878             pa_proplist_free(p);
1879             return;
1880         }
1881     }
1882
1883     if (c->version >= 13) {
1884
1885         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1886             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1887             pa_tagstruct_get_proplist(t, p) < 0) {
1888             protocol_error(c);
1889             pa_proplist_free(p);
1890             return;
1891         }
1892     }
1893
1894     if (c->version >= 14) {
1895
1896         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1897             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1898             protocol_error(c);
1899             pa_proplist_free(p);
1900             return;
1901         }
1902     }
1903
1904     if (c->version >= 15) {
1905
1906         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1907             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1908             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1909             protocol_error(c);
1910             pa_proplist_free(p);
1911             return;
1912         }
1913     }
1914
1915     if (!pa_tagstruct_eof(t)) {
1916         protocol_error(c);
1917         pa_proplist_free(p);
1918         return;
1919     }
1920
1921     if (sink_index != PA_INVALID_INDEX) {
1922
1923         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1924             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1925             pa_proplist_free(p);
1926             return;
1927         }
1928
1929     } else if (sink_name) {
1930
1931         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1932             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1933             pa_proplist_free(p);
1934             return;
1935         }
1936     }
1937
1938     flags =
1939         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1940         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1941         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1942         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1943         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1944         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1945         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1946         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1947         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1948         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1949
1950     /* Only since protocol version 15 there's a seperate muted_set
1951      * flag. For older versions we synthesize it here */
1952     muted_set = muted_set || muted;
1953
1954     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1955     pa_proplist_free(p);
1956
1957     CHECK_VALIDITY(c->pstream, s, tag, ret);
1958
1959     reply = reply_new(tag);
1960     pa_tagstruct_putu32(reply, s->index);
1961     pa_assert(s->sink_input);
1962     pa_tagstruct_putu32(reply, s->sink_input->index);
1963     pa_tagstruct_putu32(reply, missing);
1964
1965 /*     pa_log("initial request is %u", missing); */
1966
1967     if (c->version >= 9) {
1968         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1969
1970         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1971         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1972         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1973         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1974     }
1975
1976     if (c->version >= 12) {
1977         /* Since 0.9.8 we support sending the chosen sample
1978          * spec/channel map/device/suspend status back to the
1979          * client */
1980
1981         pa_tagstruct_put_sample_spec(reply, &ss);
1982         pa_tagstruct_put_channel_map(reply, &map);
1983
1984         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1985         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1986
1987         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1988     }
1989
1990     if (c->version >= 13)
1991         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
1992
1993     pa_pstream_send_tagstruct(c->pstream, reply);
1994 }
1995
1996 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1997     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1998     uint32_t channel;
1999
2000     pa_native_connection_assert_ref(c);
2001     pa_assert(t);
2002
2003     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2004         !pa_tagstruct_eof(t)) {
2005         protocol_error(c);
2006         return;
2007     }
2008
2009     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2010
2011     switch (command) {
2012
2013         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2014             playback_stream *s;
2015             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2016                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2017                 return;
2018             }
2019
2020             playback_stream_unlink(s);
2021             break;
2022         }
2023
2024         case PA_COMMAND_DELETE_RECORD_STREAM: {
2025             record_stream *s;
2026             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2027                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2028                 return;
2029             }
2030
2031             record_stream_unlink(s);
2032             break;
2033         }
2034
2035         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2036             upload_stream *s;
2037
2038             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2039                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2040                 return;
2041             }
2042
2043             upload_stream_unlink(s);
2044             break;
2045         }
2046
2047         default:
2048             pa_assert_not_reached();
2049     }
2050
2051     pa_pstream_send_simple_ack(c->pstream, tag);
2052 }
2053
2054 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2055     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2056     record_stream *s;
2057     pa_buffer_attr attr;
2058     uint32_t source_index;
2059     const char *name = NULL, *source_name;
2060     pa_sample_spec ss;
2061     pa_channel_map map;
2062     pa_tagstruct *reply;
2063     pa_source *source = NULL;
2064     pa_bool_t
2065         corked = FALSE,
2066         no_remap = FALSE,
2067         no_remix = FALSE,
2068         fix_format = FALSE,
2069         fix_rate = FALSE,
2070         fix_channels = FALSE,
2071         no_move = FALSE,
2072         variable_rate = FALSE,
2073         adjust_latency = FALSE,
2074         peak_detect = FALSE,
2075         early_requests = FALSE,
2076         dont_inhibit_auto_suspend = FALSE,
2077         fail_on_suspend = FALSE;
2078     pa_source_output_flags_t flags = 0;
2079     pa_proplist *p;
2080     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2081     pa_sink_input *direct_on_input = NULL;
2082     int ret = PA_ERR_INVALID;
2083
2084     pa_native_connection_assert_ref(c);
2085     pa_assert(t);
2086
2087     memset(&attr, 0, sizeof(attr));
2088
2089     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2090         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2091         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2092         pa_tagstruct_getu32(t, &source_index) < 0 ||
2093         pa_tagstruct_gets(t, &source_name) < 0 ||
2094         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2095         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2096         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2097         protocol_error(c);
2098         return;
2099     }
2100
2101     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2102     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2103     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2104     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2105     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2106     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2107     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2108
2109     p = pa_proplist_new();
2110
2111     if (name)
2112         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2113
2114     if (c->version >= 12)  {
2115         /* Since 0.9.8 the user can ask for a couple of additional flags */
2116
2117         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2118             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2119             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2120             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2121             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2122             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2123             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2124
2125             protocol_error(c);
2126             pa_proplist_free(p);
2127             return;
2128         }
2129     }
2130
2131     if (c->version >= 13) {
2132
2133         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2134             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2135             pa_tagstruct_get_proplist(t, p) < 0 ||
2136             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2137             protocol_error(c);
2138             pa_proplist_free(p);
2139             return;
2140         }
2141     }
2142
2143     if (c->version >= 14) {
2144
2145         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2146             protocol_error(c);
2147             pa_proplist_free(p);
2148             return;
2149         }
2150     }
2151
2152     if (c->version >= 15) {
2153
2154         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2155             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2156             protocol_error(c);
2157             pa_proplist_free(p);
2158             return;
2159         }
2160     }
2161
2162     if (!pa_tagstruct_eof(t)) {
2163         protocol_error(c);
2164         pa_proplist_free(p);
2165         return;
2166     }
2167
2168     if (source_index != PA_INVALID_INDEX) {
2169
2170         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2171             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2172             pa_proplist_free(p);
2173             return;
2174         }
2175
2176     } else if (source_name) {
2177
2178         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2179             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2180             pa_proplist_free(p);
2181             return;
2182         }
2183     }
2184
2185     if (direct_on_input_idx != PA_INVALID_INDEX) {
2186
2187         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2188             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2189             pa_proplist_free(p);
2190             return;
2191         }
2192     }
2193
2194     flags =
2195         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2196         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2197         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2198         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2199         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2200         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2201         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2202         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2203         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2204         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2205
2206     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2207     pa_proplist_free(p);
2208
2209     CHECK_VALIDITY(c->pstream, s, tag, ret);
2210
2211     reply = reply_new(tag);
2212     pa_tagstruct_putu32(reply, s->index);
2213     pa_assert(s->source_output);
2214     pa_tagstruct_putu32(reply, s->source_output->index);
2215
2216     if (c->version >= 9) {
2217         /* Since 0.9 we support sending the buffer metrics back to the client */
2218
2219         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2220         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2221     }
2222
2223     if (c->version >= 12) {
2224         /* Since 0.9.8 we support sending the chosen sample
2225          * spec/channel map/device/suspend status back to the
2226          * client */
2227
2228         pa_tagstruct_put_sample_spec(reply, &ss);
2229         pa_tagstruct_put_channel_map(reply, &map);
2230
2231         pa_tagstruct_putu32(reply, s->source_output->source->index);
2232         pa_tagstruct_puts(reply, s->source_output->source->name);
2233
2234         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2235     }
2236
2237     if (c->version >= 13)
2238         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2239
2240     pa_pstream_send_tagstruct(c->pstream, reply);
2241 }
2242
2243 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2244     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2245     int ret;
2246
2247     pa_native_connection_assert_ref(c);
2248     pa_assert(t);
2249
2250     if (!pa_tagstruct_eof(t)) {
2251         protocol_error(c);
2252         return;
2253     }
2254
2255     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2256     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2257     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2258
2259     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2260 }
2261
2262 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2263     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2264     const void*cookie;
2265     pa_tagstruct *reply;
2266     pa_bool_t shm_on_remote = FALSE, do_shm;
2267
2268     pa_native_connection_assert_ref(c);
2269     pa_assert(t);
2270
2271     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2272         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2273         !pa_tagstruct_eof(t)) {
2274         protocol_error(c);
2275         return;
2276     }
2277
2278     /* Minimum supported version */
2279     if (c->version < 8) {
2280         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2281         return;
2282     }
2283
2284     /* Starting with protocol version 13 the MSB of the version tag
2285        reflects if shm is available for this pa_native_connection or
2286        not. */
2287     if (c->version >= 13) {
2288         shm_on_remote = !!(c->version & 0x80000000U);
2289         c->version &= 0x7FFFFFFFU;
2290     }
2291
2292     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2293
2294     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2295
2296     if (!c->authorized) {
2297         pa_bool_t success = FALSE;
2298
2299 #ifdef HAVE_CREDS
2300         const pa_creds *creds;
2301
2302         if ((creds = pa_pdispatch_creds(pd))) {
2303             if (creds->uid == getuid())
2304                 success = TRUE;
2305             else if (c->options->auth_group) {
2306                 int r;
2307                 gid_t gid;
2308
2309                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2310                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2311                 else if (gid == creds->gid)
2312                     success = TRUE;
2313
2314                 if (!success) {
2315                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2316                         pa_log_warn("Failed to check group membership.");
2317                     else if (r > 0)
2318                         success = TRUE;
2319                 }
2320             }
2321
2322             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2323                         (unsigned long) creds->uid,
2324                         (unsigned long) creds->gid,
2325                         (int) success);
2326         }
2327 #endif
2328
2329         if (!success && c->options->auth_cookie) {
2330             const uint8_t *ac;
2331
2332             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2333                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2334                     success = TRUE;
2335         }
2336
2337         if (!success) {
2338             pa_log_warn("Denied access to client with invalid authorization data.");
2339             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2340             return;
2341         }
2342
2343         c->authorized = TRUE;
2344         if (c->auth_timeout_event) {
2345             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2346             c->auth_timeout_event = NULL;
2347         }
2348     }
2349
2350     /* Enable shared memory support if possible */
2351     do_shm =
2352         pa_mempool_is_shared(c->protocol->core->mempool) &&
2353         c->is_local;
2354
2355     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2356
2357     if (do_shm)
2358         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2359             do_shm = FALSE;
2360
2361 #ifdef HAVE_CREDS
2362     if (do_shm) {
2363         /* Only enable SHM if both sides are owned by the same
2364          * user. This is a security measure because otherwise data
2365          * private to the user might leak. */
2366
2367         const pa_creds *creds;
2368         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2369             do_shm = FALSE;
2370     }
2371 #endif
2372
2373     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2374     pa_pstream_enable_shm(c->pstream, do_shm);
2375
2376     reply = reply_new(tag);
2377     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2378
2379 #ifdef HAVE_CREDS
2380 {
2381     /* SHM support is only enabled after both sides made sure they are the same user. */
2382
2383     pa_creds ucred;
2384
2385     ucred.uid = getuid();
2386     ucred.gid = getgid();
2387
2388     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2389 }
2390 #else
2391     pa_pstream_send_tagstruct(c->pstream, reply);
2392 #endif
2393 }
2394
2395 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2396     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2397     const char *name = NULL;
2398     pa_proplist *p;
2399     pa_tagstruct *reply;
2400
2401     pa_native_connection_assert_ref(c);
2402     pa_assert(t);
2403
2404     p = pa_proplist_new();
2405
2406     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2407         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2408         !pa_tagstruct_eof(t)) {
2409
2410         protocol_error(c);
2411         pa_proplist_free(p);
2412         return;
2413     }
2414
2415     if (name)
2416         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2417             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2418             pa_proplist_free(p);
2419             return;
2420         }
2421
2422     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2423     pa_proplist_free(p);
2424
2425     reply = reply_new(tag);
2426
2427     if (c->version >= 13)
2428         pa_tagstruct_putu32(reply, c->client->index);
2429
2430     pa_pstream_send_tagstruct(c->pstream, reply);
2431 }
2432
2433 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2434     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2435     const char *name;
2436     uint32_t idx = PA_IDXSET_INVALID;
2437
2438     pa_native_connection_assert_ref(c);
2439     pa_assert(t);
2440
2441     if (pa_tagstruct_gets(t, &name) < 0 ||
2442         !pa_tagstruct_eof(t)) {
2443         protocol_error(c);
2444         return;
2445     }
2446
2447     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2448     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2449
2450     if (command == PA_COMMAND_LOOKUP_SINK) {
2451         pa_sink *sink;
2452         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2453             idx = sink->index;
2454     } else {
2455         pa_source *source;
2456         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2457         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2458             idx = source->index;
2459     }
2460
2461     if (idx == PA_IDXSET_INVALID)
2462         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2463     else {
2464         pa_tagstruct *reply;
2465         reply = reply_new(tag);
2466         pa_tagstruct_putu32(reply, idx);
2467         pa_pstream_send_tagstruct(c->pstream, reply);
2468     }
2469 }
2470
2471 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2472     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2473     uint32_t idx;
2474     playback_stream *s;
2475
2476     pa_native_connection_assert_ref(c);
2477     pa_assert(t);
2478
2479     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2480         !pa_tagstruct_eof(t)) {
2481         protocol_error(c);
2482         return;
2483     }
2484
2485     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2486     s = pa_idxset_get_by_index(c->output_streams, idx);
2487     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2488     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2489
2490     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2491 }
2492
2493 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2494     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2495     pa_tagstruct *reply;
2496     const pa_mempool_stat *stat;
2497
2498     pa_native_connection_assert_ref(c);
2499     pa_assert(t);
2500
2501     if (!pa_tagstruct_eof(t)) {
2502         protocol_error(c);
2503         return;
2504     }
2505
2506     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2507
2508     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2509
2510     reply = reply_new(tag);
2511     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2512     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2513     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2514     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2515     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2516     pa_pstream_send_tagstruct(c->pstream, reply);
2517 }
2518
2519 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2520     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2521     pa_tagstruct *reply;
2522     playback_stream *s;
2523     struct timeval tv, now;
2524     uint32_t idx;
2525
2526     pa_native_connection_assert_ref(c);
2527     pa_assert(t);
2528
2529     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2530         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2531         !pa_tagstruct_eof(t)) {
2532         protocol_error(c);
2533         return;
2534     }
2535
2536     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2537     s = pa_idxset_get_by_index(c->output_streams, idx);
2538     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2539     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2540
2541     /* Get an atomic snapshot of all timing parameters */
2542     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2543
2544     reply = reply_new(tag);
2545     pa_tagstruct_put_usec(reply,
2546                           s->current_sink_latency +
2547                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec));
2548     pa_tagstruct_put_usec(reply, 0);
2549     pa_tagstruct_put_boolean(reply,
2550                              s->playing_for > 0 &&
2551                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2552                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2553     pa_tagstruct_put_timeval(reply, &tv);
2554     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2555     pa_tagstruct_puts64(reply, s->write_index);
2556     pa_tagstruct_puts64(reply, s->read_index);
2557
2558     if (c->version >= 13) {
2559         pa_tagstruct_putu64(reply, s->underrun_for);
2560         pa_tagstruct_putu64(reply, s->playing_for);
2561     }
2562
2563     pa_pstream_send_tagstruct(c->pstream, reply);
2564 }
2565
2566 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2567     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2568     pa_tagstruct *reply;
2569     record_stream *s;
2570     struct timeval tv, now;
2571     uint32_t idx;
2572
2573     pa_native_connection_assert_ref(c);
2574     pa_assert(t);
2575
2576     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2577         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2578         !pa_tagstruct_eof(t)) {
2579         protocol_error(c);
2580         return;
2581     }
2582
2583     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2584     s = pa_idxset_get_by_index(c->record_streams, idx);
2585     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2586
2587     /* Get an atomic snapshot of all timing parameters */
2588     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2589
2590     reply = reply_new(tag);
2591     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2592     pa_tagstruct_put_usec(reply,
2593                           s->current_source_latency +
2594                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2595     pa_tagstruct_put_boolean(reply,
2596                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2597                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2598     pa_tagstruct_put_timeval(reply, &tv);
2599     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2600     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2601     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2602     pa_pstream_send_tagstruct(c->pstream, reply);
2603 }
2604
2605 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2606     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2607     upload_stream *s;
2608     uint32_t length;
2609     const char *name = NULL;
2610     pa_sample_spec ss;
2611     pa_channel_map map;
2612     pa_tagstruct *reply;
2613     pa_proplist *p;
2614
2615     pa_native_connection_assert_ref(c);
2616     pa_assert(t);
2617
2618     if (pa_tagstruct_gets(t, &name) < 0 ||
2619         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2620         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2621         pa_tagstruct_getu32(t, &length) < 0) {
2622         protocol_error(c);
2623         return;
2624     }
2625
2626     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2627     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2628     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2629     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2630     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2631     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2632
2633     p = pa_proplist_new();
2634
2635     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2636         !pa_tagstruct_eof(t)) {
2637
2638         protocol_error(c);
2639         pa_proplist_free(p);
2640         return;
2641     }
2642
2643     if (c->version < 13)
2644         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2645     else if (!name)
2646         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2647             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2648
2649     if (!name || !pa_namereg_is_valid_name(name)) {
2650         pa_proplist_free(p);
2651         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2652     }
2653
2654     s = upload_stream_new(c, &ss, &map, name, length, p);
2655     pa_proplist_free(p);
2656
2657     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2658
2659     reply = reply_new(tag);
2660     pa_tagstruct_putu32(reply, s->index);
2661     pa_tagstruct_putu32(reply, length);
2662     pa_pstream_send_tagstruct(c->pstream, reply);
2663 }
2664
2665 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2666     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2667     uint32_t channel;
2668     upload_stream *s;
2669     uint32_t idx;
2670
2671     pa_native_connection_assert_ref(c);
2672     pa_assert(t);
2673
2674     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2675         !pa_tagstruct_eof(t)) {
2676         protocol_error(c);
2677         return;
2678     }
2679
2680     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2681
2682     s = pa_idxset_get_by_index(c->output_streams, channel);
2683     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2684     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2685
2686     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2687         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2688     else
2689         pa_pstream_send_simple_ack(c->pstream, tag);
2690
2691     upload_stream_unlink(s);
2692 }
2693
2694 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2695     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2696     uint32_t sink_index;
2697     pa_volume_t volume;
2698     pa_sink *sink;
2699     const char *name, *sink_name;
2700     uint32_t idx;
2701     pa_proplist *p;
2702     pa_tagstruct *reply;
2703
2704     pa_native_connection_assert_ref(c);
2705     pa_assert(t);
2706
2707     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2708
2709     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2710         pa_tagstruct_gets(t, &sink_name) < 0 ||
2711         pa_tagstruct_getu32(t, &volume) < 0 ||
2712         pa_tagstruct_gets(t, &name) < 0) {
2713         protocol_error(c);
2714         return;
2715     }
2716
2717     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2718     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2719     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2720     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2721
2722     if (sink_index != PA_INVALID_INDEX)
2723         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2724     else
2725         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2726
2727     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2728
2729     p = pa_proplist_new();
2730
2731     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2732         !pa_tagstruct_eof(t)) {
2733         protocol_error(c);
2734         pa_proplist_free(p);
2735         return;
2736     }
2737
2738     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2739
2740     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2741         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2742         pa_proplist_free(p);
2743         return;
2744     }
2745
2746     pa_proplist_free(p);
2747
2748     reply = reply_new(tag);
2749
2750     if (c->version >= 13)
2751         pa_tagstruct_putu32(reply, idx);
2752
2753     pa_pstream_send_tagstruct(c->pstream, reply);
2754 }
2755
2756 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2757     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2758     const char *name;
2759
2760     pa_native_connection_assert_ref(c);
2761     pa_assert(t);
2762
2763     if (pa_tagstruct_gets(t, &name) < 0 ||
2764         !pa_tagstruct_eof(t)) {
2765         protocol_error(c);
2766         return;
2767     }
2768
2769     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2770     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2771
2772     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2773         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2774         return;
2775     }
2776
2777     pa_pstream_send_simple_ack(c->pstream, tag);
2778 }
2779
2780 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2781     pa_assert(c);
2782     pa_assert(fixed);
2783     pa_assert(original);
2784
2785     *fixed = *original;
2786
2787     if (c->version < 12) {
2788         /* Before protocol version 12 we didn't support S32 samples,
2789          * so we need to lie about this to the client */
2790
2791         if (fixed->format == PA_SAMPLE_S32LE)
2792             fixed->format = PA_SAMPLE_FLOAT32LE;
2793         if (fixed->format == PA_SAMPLE_S32BE)
2794             fixed->format = PA_SAMPLE_FLOAT32BE;
2795     }
2796
2797     if (c->version < 15) {
2798         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2799             fixed->format = PA_SAMPLE_FLOAT32LE;
2800         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2801             fixed->format = PA_SAMPLE_FLOAT32BE;
2802     }
2803 }
2804
2805 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2806     pa_sample_spec fixed_ss;
2807
2808     pa_assert(t);
2809     pa_sink_assert_ref(sink);
2810
2811     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2812
2813     pa_tagstruct_put(
2814         t,
2815         PA_TAG_U32, sink->index,
2816         PA_TAG_STRING, sink->name,
2817         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2818         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2819         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2820         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2821         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2822         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2823         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2824         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2825         PA_TAG_USEC, pa_sink_get_latency(sink),
2826         PA_TAG_STRING, sink->driver,
2827         PA_TAG_U32, sink->flags,
2828         PA_TAG_INVALID);
2829
2830     if (c->version >= 13) {
2831         pa_tagstruct_put_proplist(t, sink->proplist);
2832         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2833     }
2834
2835     if (c->version >= 15) {
2836         pa_tagstruct_put_volume(t, sink->base_volume);
2837         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2838             pa_log_error("Internal sink state is invalid.");
2839         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2840         pa_tagstruct_putu32(t, sink->n_volume_steps);
2841         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2842     }
2843 }
2844
2845 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2846     pa_sample_spec fixed_ss;
2847
2848     pa_assert(t);
2849     pa_source_assert_ref(source);
2850
2851     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2852
2853     pa_tagstruct_put(
2854         t,
2855         PA_TAG_U32, source->index,
2856         PA_TAG_STRING, source->name,
2857         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2858         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2859         PA_TAG_CHANNEL_MAP, &source->channel_map,
2860         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2861         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2862         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2863         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2864         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2865         PA_TAG_USEC, pa_source_get_latency(source),
2866         PA_TAG_STRING, source->driver,
2867         PA_TAG_U32, source->flags,
2868         PA_TAG_INVALID);
2869
2870     if (c->version >= 13) {
2871         pa_tagstruct_put_proplist(t, source->proplist);
2872         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2873     }
2874
2875     if (c->version >= 15) {
2876         pa_tagstruct_put_volume(t, source->base_volume);
2877         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2878             pa_log_error("Internal source state is invalid.");
2879         pa_tagstruct_putu32(t, pa_source_get_state(source));
2880         pa_tagstruct_putu32(t, source->n_volume_steps);
2881         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2882     }
2883 }
2884
2885 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2886     pa_assert(t);
2887     pa_assert(client);
2888
2889     pa_tagstruct_putu32(t, client->index);
2890     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2891     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2892     pa_tagstruct_puts(t, client->driver);
2893
2894     if (c->version >= 13)
2895         pa_tagstruct_put_proplist(t, client->proplist);
2896 }
2897
2898 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2899     void *state = NULL;
2900     pa_card_profile *p;
2901
2902     pa_assert(t);
2903     pa_assert(card);
2904
2905     pa_tagstruct_putu32(t, card->index);
2906     pa_tagstruct_puts(t, card->name);
2907     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2908     pa_tagstruct_puts(t, card->driver);
2909
2910     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2911
2912     if (card->profiles) {
2913         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2914             pa_tagstruct_puts(t, p->name);
2915             pa_tagstruct_puts(t, p->description);
2916             pa_tagstruct_putu32(t, p->n_sinks);
2917             pa_tagstruct_putu32(t, p->n_sources);
2918             pa_tagstruct_putu32(t, p->priority);
2919         }
2920     }
2921
2922     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2923     pa_tagstruct_put_proplist(t, card->proplist);
2924 }
2925
2926 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2927     pa_assert(t);
2928     pa_assert(module);
2929
2930     pa_tagstruct_putu32(t, module->index);
2931     pa_tagstruct_puts(t, module->name);
2932     pa_tagstruct_puts(t, module->argument);
2933     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2934
2935     if (c->version < 15)
2936         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2937
2938     if (c->version >= 15)
2939         pa_tagstruct_put_proplist(t, module->proplist);
2940 }
2941
2942 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2943     pa_sample_spec fixed_ss;
2944     pa_usec_t sink_latency;
2945
2946     pa_assert(t);
2947     pa_sink_input_assert_ref(s);
2948
2949     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2950
2951     pa_tagstruct_putu32(t, s->index);
2952     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2953     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2954     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2955     pa_tagstruct_putu32(t, s->sink->index);
2956     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2957     pa_tagstruct_put_channel_map(t, &s->channel_map);
2958     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s));
2959     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
2960     pa_tagstruct_put_usec(t, sink_latency);
2961     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
2962     pa_tagstruct_puts(t, s->driver);
2963     if (c->version >= 11)
2964         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
2965     if (c->version >= 13)
2966         pa_tagstruct_put_proplist(t, s->proplist);
2967 }
2968
2969 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
2970     pa_sample_spec fixed_ss;
2971     pa_usec_t source_latency;
2972
2973     pa_assert(t);
2974     pa_source_output_assert_ref(s);
2975
2976     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2977
2978     pa_tagstruct_putu32(t, s->index);
2979     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2980     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2981     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2982     pa_tagstruct_putu32(t, s->source->index);
2983     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2984     pa_tagstruct_put_channel_map(t, &s->channel_map);
2985     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
2986     pa_tagstruct_put_usec(t, source_latency);
2987     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
2988     pa_tagstruct_puts(t, s->driver);
2989
2990     if (c->version >= 13)
2991         pa_tagstruct_put_proplist(t, s->proplist);
2992 }
2993
2994 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
2995     pa_sample_spec fixed_ss;
2996     pa_cvolume v;
2997
2998     pa_assert(t);
2999     pa_assert(e);
3000
3001     if (e->memchunk.memblock)
3002         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3003     else
3004         memset(&fixed_ss, 0, sizeof(fixed_ss));
3005
3006     pa_tagstruct_putu32(t, e->index);
3007     pa_tagstruct_puts(t, e->name);
3008
3009     if (e->volume_is_set)
3010         v = e->volume;
3011     else
3012         pa_cvolume_init(&v);
3013
3014     pa_tagstruct_put_cvolume(t, &v);
3015     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3016     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3017     pa_tagstruct_put_channel_map(t, &e->channel_map);
3018     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3019     pa_tagstruct_put_boolean(t, e->lazy);
3020     pa_tagstruct_puts(t, e->filename);
3021
3022     if (c->version >= 13)
3023         pa_tagstruct_put_proplist(t, e->proplist);
3024 }
3025
3026 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3027     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3028     uint32_t idx;
3029     pa_sink *sink = NULL;
3030     pa_source *source = NULL;
3031     pa_client *client = NULL;
3032     pa_card *card = NULL;
3033     pa_module *module = NULL;
3034     pa_sink_input *si = NULL;
3035     pa_source_output *so = NULL;
3036     pa_scache_entry *sce = NULL;
3037     const char *name = NULL;
3038     pa_tagstruct *reply;
3039
3040     pa_native_connection_assert_ref(c);
3041     pa_assert(t);
3042
3043     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3044         (command != PA_COMMAND_GET_CLIENT_INFO &&
3045          command != PA_COMMAND_GET_MODULE_INFO &&
3046          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3047          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3048          pa_tagstruct_gets(t, &name) < 0) ||
3049         !pa_tagstruct_eof(t)) {
3050         protocol_error(c);
3051         return;
3052     }
3053
3054     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3055     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3056     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3057     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3058     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3059
3060     if (command == PA_COMMAND_GET_SINK_INFO) {
3061         if (idx != PA_INVALID_INDEX)
3062             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3063         else
3064             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3065     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3066         if (idx != PA_INVALID_INDEX)
3067             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3068         else
3069             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3070     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3071         if (idx != PA_INVALID_INDEX)
3072             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3073         else
3074             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3075     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3076         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3077     else if (command == PA_COMMAND_GET_MODULE_INFO)
3078         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3079     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3080         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3081     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3082         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3083     else {
3084         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3085         if (idx != PA_INVALID_INDEX)
3086             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3087         else
3088             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3089     }
3090
3091     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3092         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3093         return;
3094     }
3095
3096     reply = reply_new(tag);
3097     if (sink)
3098         sink_fill_tagstruct(c, reply, sink);
3099     else if (source)
3100         source_fill_tagstruct(c, reply, source);
3101     else if (client)
3102         client_fill_tagstruct(c, reply, client);
3103     else if (card)
3104         card_fill_tagstruct(c, reply, card);
3105     else if (module)
3106         module_fill_tagstruct(c, reply, module);
3107     else if (si)
3108         sink_input_fill_tagstruct(c, reply, si);
3109     else if (so)
3110         source_output_fill_tagstruct(c, reply, so);
3111     else
3112         scache_fill_tagstruct(c, reply, sce);
3113     pa_pstream_send_tagstruct(c->pstream, reply);
3114 }
3115
3116 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3117     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3118     pa_idxset *i;
3119     uint32_t idx;
3120     void *p;
3121     pa_tagstruct *reply;
3122
3123     pa_native_connection_assert_ref(c);
3124     pa_assert(t);
3125
3126     if (!pa_tagstruct_eof(t)) {
3127         protocol_error(c);
3128         return;
3129     }
3130
3131     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3132
3133     reply = reply_new(tag);
3134
3135     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3136         i = c->protocol->core->sinks;
3137     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3138         i = c->protocol->core->sources;
3139     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3140         i = c->protocol->core->clients;
3141     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3142         i = c->protocol->core->cards;
3143     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3144         i = c->protocol->core->modules;
3145     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3146         i = c->protocol->core->sink_inputs;
3147     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3148         i = c->protocol->core->source_outputs;
3149     else {
3150         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3151         i = c->protocol->core->scache;
3152     }
3153
3154     if (i) {
3155         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3156             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3157                 sink_fill_tagstruct(c, reply, p);
3158             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3159                 source_fill_tagstruct(c, reply, p);
3160             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3161                 client_fill_tagstruct(c, reply, p);
3162             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3163                 card_fill_tagstruct(c, reply, p);
3164             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3165                 module_fill_tagstruct(c, reply, p);
3166             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3167                 sink_input_fill_tagstruct(c, reply, p);
3168             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3169                 source_output_fill_tagstruct(c, reply, p);
3170             else {
3171                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3172                 scache_fill_tagstruct(c, reply, p);
3173             }
3174         }
3175     }
3176
3177     pa_pstream_send_tagstruct(c->pstream, reply);
3178 }
3179
3180 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3181     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3182     pa_tagstruct *reply;
3183     char txt[256];
3184     pa_sink *def_sink;
3185     pa_source *def_source;
3186     pa_sample_spec fixed_ss;
3187
3188     pa_native_connection_assert_ref(c);
3189     pa_assert(t);
3190
3191     if (!pa_tagstruct_eof(t)) {
3192         protocol_error(c);
3193         return;
3194     }
3195
3196     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3197
3198     reply = reply_new(tag);
3199     pa_tagstruct_puts(reply, PACKAGE_NAME);
3200     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3201     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
3202     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
3203
3204     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3205     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3206
3207     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3208     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3209     def_source = pa_namereg_get_default_source(c->protocol->core);
3210     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3211
3212     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3213
3214     if (c->version >= 15)
3215         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3216
3217     pa_pstream_send_tagstruct(c->pstream, reply);
3218 }
3219
3220 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3221     pa_tagstruct *t;
3222     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3223
3224     pa_native_connection_assert_ref(c);
3225
3226     t = pa_tagstruct_new(NULL, 0);
3227     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3228     pa_tagstruct_putu32(t, (uint32_t) -1);
3229     pa_tagstruct_putu32(t, e);
3230     pa_tagstruct_putu32(t, idx);
3231     pa_pstream_send_tagstruct(c->pstream, t);
3232 }
3233
3234 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3235     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3236     pa_subscription_mask_t m;
3237
3238     pa_native_connection_assert_ref(c);
3239     pa_assert(t);
3240
3241     if (pa_tagstruct_getu32(t, &m) < 0 ||
3242         !pa_tagstruct_eof(t)) {
3243         protocol_error(c);
3244         return;
3245     }
3246
3247     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3248     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3249
3250     if (c->subscription)
3251         pa_subscription_free(c->subscription);
3252
3253     if (m != 0) {
3254         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3255         pa_assert(c->subscription);
3256     } else
3257         c->subscription = NULL;
3258
3259     pa_pstream_send_simple_ack(c->pstream, tag);
3260 }
3261
3262 static void command_set_volume(
3263         pa_pdispatch *pd,
3264         uint32_t command,
3265         uint32_t tag,
3266         pa_tagstruct *t,
3267         void *userdata) {
3268
3269     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3270     uint32_t idx;
3271     pa_cvolume volume;
3272     pa_sink *sink = NULL;
3273     pa_source *source = NULL;
3274     pa_sink_input *si = NULL;
3275     const char *name = NULL;
3276
3277     pa_native_connection_assert_ref(c);
3278     pa_assert(t);
3279
3280     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3281         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3282         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3283         pa_tagstruct_get_cvolume(t, &volume) ||
3284         !pa_tagstruct_eof(t)) {
3285         protocol_error(c);
3286         return;
3287     }
3288
3289     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3290     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3291     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3292     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3293     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3294     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3295
3296     switch (command) {
3297
3298         case PA_COMMAND_SET_SINK_VOLUME:
3299             if (idx != PA_INVALID_INDEX)
3300                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3301             else
3302                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3303             break;
3304
3305         case PA_COMMAND_SET_SOURCE_VOLUME:
3306             if (idx != PA_INVALID_INDEX)
3307                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3308             else
3309                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3310             break;
3311
3312         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3313             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3314             break;
3315
3316         default:
3317             pa_assert_not_reached();
3318     }
3319
3320     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3321
3322     if (sink)
3323         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3324     else if (source)
3325         pa_source_set_volume(source, &volume);
3326     else if (si)
3327         pa_sink_input_set_volume(si, &volume, TRUE);
3328
3329     pa_pstream_send_simple_ack(c->pstream, tag);
3330 }
3331
3332 static void command_set_mute(
3333         pa_pdispatch *pd,
3334         uint32_t command,
3335         uint32_t tag,
3336         pa_tagstruct *t,
3337         void *userdata) {
3338
3339     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3340     uint32_t idx;
3341     pa_bool_t mute;
3342     pa_sink *sink = NULL;
3343     pa_source *source = NULL;
3344     pa_sink_input *si = NULL;
3345     const char *name = NULL;
3346
3347     pa_native_connection_assert_ref(c);
3348     pa_assert(t);
3349
3350     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3351         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3352         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3353         pa_tagstruct_get_boolean(t, &mute) ||
3354         !pa_tagstruct_eof(t)) {
3355         protocol_error(c);
3356         return;
3357     }
3358
3359     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3360     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3361     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3362     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3363     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3364
3365     switch (command) {
3366
3367         case PA_COMMAND_SET_SINK_MUTE:
3368
3369             if (idx != PA_INVALID_INDEX)
3370                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3371             else
3372                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3373
3374             break;
3375
3376         case PA_COMMAND_SET_SOURCE_MUTE:
3377             if (idx != PA_INVALID_INDEX)
3378                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3379             else
3380                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3381
3382             break;
3383
3384         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3385             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3386             break;
3387
3388         default:
3389             pa_assert_not_reached();
3390     }
3391
3392     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3393
3394     if (sink)
3395         pa_sink_set_mute(sink, mute);
3396     else if (source)
3397         pa_source_set_mute(source, mute);
3398     else if (si)
3399         pa_sink_input_set_mute(si, mute, TRUE);
3400
3401     pa_pstream_send_simple_ack(c->pstream, tag);
3402 }
3403
3404 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3405     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3406     uint32_t idx;
3407     pa_bool_t b;
3408     playback_stream *s;
3409
3410     pa_native_connection_assert_ref(c);
3411     pa_assert(t);
3412
3413     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3414         pa_tagstruct_get_boolean(t, &b) < 0 ||
3415         !pa_tagstruct_eof(t)) {
3416         protocol_error(c);
3417         return;
3418     }
3419
3420     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3421     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3422     s = pa_idxset_get_by_index(c->output_streams, idx);
3423     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3424     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3425
3426     pa_sink_input_cork(s->sink_input, b);
3427
3428     if (b)
3429         s->is_underrun = TRUE;
3430
3431     pa_pstream_send_simple_ack(c->pstream, tag);
3432 }
3433
3434 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3435     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3436     uint32_t idx;
3437     playback_stream *s;
3438
3439     pa_native_connection_assert_ref(c);
3440     pa_assert(t);
3441
3442     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3443         !pa_tagstruct_eof(t)) {
3444         protocol_error(c);
3445         return;
3446     }
3447
3448     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3449     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3450     s = pa_idxset_get_by_index(c->output_streams, idx);
3451     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3452     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3453
3454     switch (command) {
3455         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3456             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3457             break;
3458
3459         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3460             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3461             break;
3462
3463         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3464             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3465             break;
3466
3467         default:
3468             pa_assert_not_reached();
3469     }
3470
3471     pa_pstream_send_simple_ack(c->pstream, tag);
3472 }
3473
3474 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3475     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3476     uint32_t idx;
3477     record_stream *s;
3478     pa_bool_t b;
3479
3480     pa_native_connection_assert_ref(c);
3481     pa_assert(t);
3482
3483     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3484         pa_tagstruct_get_boolean(t, &b) < 0 ||
3485         !pa_tagstruct_eof(t)) {
3486         protocol_error(c);
3487         return;
3488     }
3489
3490     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3491     s = pa_idxset_get_by_index(c->record_streams, idx);
3492     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3493
3494     pa_source_output_cork(s->source_output, b);
3495     pa_memblockq_prebuf_force(s->memblockq);
3496     pa_pstream_send_simple_ack(c->pstream, tag);
3497 }
3498
3499 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3500     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3501     uint32_t idx;
3502     record_stream *s;
3503
3504     pa_native_connection_assert_ref(c);
3505     pa_assert(t);
3506
3507     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3508         !pa_tagstruct_eof(t)) {
3509         protocol_error(c);
3510         return;
3511     }
3512
3513     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3514     s = pa_idxset_get_by_index(c->record_streams, idx);
3515     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3516
3517     pa_memblockq_flush_read(s->memblockq);
3518     pa_pstream_send_simple_ack(c->pstream, tag);
3519 }
3520
3521 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3522     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3523     uint32_t idx;
3524     pa_buffer_attr a;
3525     pa_tagstruct *reply;
3526
3527     pa_native_connection_assert_ref(c);
3528     pa_assert(t);
3529
3530     memset(&a, 0, sizeof(a));
3531
3532     if (pa_tagstruct_getu32(t, &idx) < 0) {
3533         protocol_error(c);
3534         return;
3535     }
3536
3537     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3538
3539     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3540         playback_stream *s;
3541         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3542
3543         s = pa_idxset_get_by_index(c->output_streams, idx);
3544         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3545         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3546
3547         if (pa_tagstruct_get(
3548                     t,
3549                     PA_TAG_U32, &a.maxlength,
3550                     PA_TAG_U32, &a.tlength,
3551                     PA_TAG_U32, &a.prebuf,
3552                     PA_TAG_U32, &a.minreq,
3553                     PA_TAG_INVALID) < 0 ||
3554             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3555             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3556             !pa_tagstruct_eof(t)) {
3557             protocol_error(c);
3558             return;
3559         }
3560
3561         s->adjust_latency = adjust_latency;
3562         s->early_requests = early_requests;
3563         s->buffer_attr = a;
3564
3565         fix_playback_buffer_attr(s);
3566         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3567
3568         reply = reply_new(tag);
3569         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3570         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3571         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3572         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3573
3574         if (c->version >= 13)
3575             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3576
3577     } else {
3578         record_stream *s;
3579         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3580         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3581
3582         s = pa_idxset_get_by_index(c->record_streams, idx);
3583         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3584
3585         if (pa_tagstruct_get(
3586                     t,
3587                     PA_TAG_U32, &a.maxlength,
3588                     PA_TAG_U32, &a.fragsize,
3589                     PA_TAG_INVALID) < 0 ||
3590             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3591             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3592             !pa_tagstruct_eof(t)) {
3593             protocol_error(c);
3594             return;
3595         }
3596
3597         s->adjust_latency = adjust_latency;
3598         s->early_requests = early_requests;
3599         s->buffer_attr = a;
3600
3601         fix_record_buffer_attr_pre(s);
3602         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3603         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3604         fix_record_buffer_attr_post(s);
3605
3606         reply = reply_new(tag);
3607         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3608         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3609
3610         if (c->version >= 13)
3611             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3612     }
3613
3614     pa_pstream_send_tagstruct(c->pstream, reply);
3615 }
3616
3617 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3618     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3619     uint32_t idx;
3620     uint32_t rate;
3621
3622     pa_native_connection_assert_ref(c);
3623     pa_assert(t);
3624
3625     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3626         pa_tagstruct_getu32(t, &rate) < 0 ||
3627         !pa_tagstruct_eof(t)) {
3628         protocol_error(c);
3629         return;
3630     }
3631
3632     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3633     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3634
3635     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3636         playback_stream *s;
3637
3638         s = pa_idxset_get_by_index(c->output_streams, idx);
3639         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3640         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3641
3642         pa_sink_input_set_rate(s->sink_input, rate);
3643
3644     } else {
3645         record_stream *s;
3646         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3647
3648         s = pa_idxset_get_by_index(c->record_streams, idx);
3649         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3650
3651         pa_source_output_set_rate(s->source_output, rate);
3652     }
3653
3654     pa_pstream_send_simple_ack(c->pstream, tag);
3655 }
3656
3657 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3658     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3659     uint32_t idx;
3660     uint32_t mode;
3661     pa_proplist *p;
3662
3663     pa_native_connection_assert_ref(c);
3664     pa_assert(t);
3665
3666     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3667
3668     p = pa_proplist_new();
3669
3670     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3671
3672         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3673             pa_tagstruct_get_proplist(t, p) < 0 ||
3674             !pa_tagstruct_eof(t)) {
3675             protocol_error(c);
3676             pa_proplist_free(p);
3677             return;
3678         }
3679
3680     } else {
3681
3682         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3683             pa_tagstruct_getu32(t, &mode) < 0 ||
3684             pa_tagstruct_get_proplist(t, p) < 0 ||
3685             !pa_tagstruct_eof(t)) {
3686             protocol_error(c);
3687             pa_proplist_free(p);
3688             return;
3689         }
3690     }
3691
3692     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3693         pa_proplist_free(p);
3694         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3695     }
3696
3697     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3698         playback_stream *s;
3699
3700         s = pa_idxset_get_by_index(c->output_streams, idx);
3701         if (!s || !playback_stream_isinstance(s)) {
3702             pa_proplist_free(p);
3703             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3704         }
3705         pa_sink_input_update_proplist(s->sink_input, mode, p);
3706
3707     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3708         record_stream *s;
3709
3710         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3711             pa_proplist_free(p);
3712             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3713         }
3714         pa_source_output_update_proplist(s->source_output, mode, p);
3715
3716     } else {
3717         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3718
3719         pa_client_update_proplist(c->client, mode, p);
3720     }
3721
3722     pa_pstream_send_simple_ack(c->pstream, tag);
3723     pa_proplist_free(p);
3724 }
3725
3726 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3727     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3728     uint32_t idx;
3729     unsigned changed = 0;
3730     pa_proplist *p;
3731     pa_strlist *l = NULL;
3732
3733     pa_native_connection_assert_ref(c);
3734     pa_assert(t);
3735
3736     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3737
3738     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3739
3740         if (pa_tagstruct_getu32(t, &idx) < 0) {
3741             protocol_error(c);
3742             return;
3743         }
3744     }
3745
3746     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3747         playback_stream *s;
3748
3749         s = pa_idxset_get_by_index(c->output_streams, idx);
3750         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3751         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3752
3753         p = s->sink_input->proplist;
3754
3755     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3756         record_stream *s;
3757
3758         s = pa_idxset_get_by_index(c->record_streams, idx);
3759         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3760
3761         p = s->source_output->proplist;
3762     } else {
3763         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3764
3765         p = c->client->proplist;
3766     }
3767
3768     for (;;) {
3769         const char *k;
3770
3771         if (pa_tagstruct_gets(t, &k) < 0) {
3772             protocol_error(c);
3773             pa_strlist_free(l);
3774             return;
3775         }
3776
3777         if (!k)
3778             break;
3779
3780         l = pa_strlist_prepend(l, k);
3781     }
3782
3783     if (!pa_tagstruct_eof(t)) {
3784         protocol_error(c);
3785         pa_strlist_free(l);
3786         return;
3787     }
3788
3789     for (;;) {
3790         char *z;
3791
3792         l = pa_strlist_pop(l, &z);
3793
3794         if (!z)
3795             break;
3796
3797         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3798         pa_xfree(z);
3799     }
3800
3801     pa_pstream_send_simple_ack(c->pstream, tag);
3802
3803     if (changed) {
3804         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3805             playback_stream *s;
3806
3807             s = pa_idxset_get_by_index(c->output_streams, idx);
3808             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3809
3810         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3811             record_stream *s;
3812
3813             s = pa_idxset_get_by_index(c->record_streams, idx);
3814             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3815
3816         } else {
3817             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3818             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3819         }
3820     }
3821 }
3822
3823 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3824     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3825     const char *s;
3826
3827     pa_native_connection_assert_ref(c);
3828     pa_assert(t);
3829
3830     if (pa_tagstruct_gets(t, &s) < 0 ||
3831         !pa_tagstruct_eof(t)) {
3832         protocol_error(c);
3833         return;
3834     }
3835
3836     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3837     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3838
3839     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3840         pa_source *source;
3841
3842         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3843         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3844
3845         pa_namereg_set_default_source(c->protocol->core, source);
3846     } else {
3847         pa_sink *sink;
3848         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3849
3850         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3851         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3852
3853         pa_namereg_set_default_sink(c->protocol->core, sink);
3854     }
3855
3856     pa_pstream_send_simple_ack(c->pstream, tag);
3857 }
3858
3859 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3860     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3861     uint32_t idx;
3862     const char *name;
3863
3864     pa_native_connection_assert_ref(c);
3865     pa_assert(t);
3866
3867     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3868         pa_tagstruct_gets(t, &name) < 0 ||
3869         !pa_tagstruct_eof(t)) {
3870         protocol_error(c);
3871         return;
3872     }
3873
3874     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3875     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3876
3877     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3878         playback_stream *s;
3879
3880         s = pa_idxset_get_by_index(c->output_streams, idx);
3881         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3882         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3883
3884         pa_sink_input_set_name(s->sink_input, name);
3885
3886     } else {
3887         record_stream *s;
3888         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3889
3890         s = pa_idxset_get_by_index(c->record_streams, idx);
3891         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3892
3893         pa_source_output_set_name(s->source_output, name);
3894     }
3895
3896     pa_pstream_send_simple_ack(c->pstream, tag);
3897 }
3898
3899 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3900     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3901     uint32_t idx;
3902
3903     pa_native_connection_assert_ref(c);
3904     pa_assert(t);
3905
3906     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3907         !pa_tagstruct_eof(t)) {
3908         protocol_error(c);
3909         return;
3910     }
3911
3912     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3913
3914     if (command == PA_COMMAND_KILL_CLIENT) {
3915         pa_client *client;
3916
3917         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3918         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3919
3920         pa_native_connection_ref(c);
3921         pa_client_kill(client);
3922
3923     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3924         pa_sink_input *s;
3925
3926         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3927         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3928
3929         pa_native_connection_ref(c);
3930         pa_sink_input_kill(s);
3931     } else {
3932         pa_source_output *s;
3933
3934         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3935
3936         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3937         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3938
3939         pa_native_connection_ref(c);
3940         pa_source_output_kill(s);
3941     }
3942
3943     pa_pstream_send_simple_ack(c->pstream, tag);
3944     pa_native_connection_unref(c);
3945 }
3946
3947 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3948     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3949     pa_module *m;
3950     const char *name, *argument;
3951     pa_tagstruct *reply;
3952
3953     pa_native_connection_assert_ref(c);
3954     pa_assert(t);
3955
3956     if (pa_tagstruct_gets(t, &name) < 0 ||
3957         pa_tagstruct_gets(t, &argument) < 0 ||
3958         !pa_tagstruct_eof(t)) {
3959         protocol_error(c);
3960         return;
3961     }
3962
3963     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3964     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
3965     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
3966
3967     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
3968         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
3969         return;
3970     }
3971
3972     reply = reply_new(tag);
3973     pa_tagstruct_putu32(reply, m->index);
3974     pa_pstream_send_tagstruct(c->pstream, reply);
3975 }
3976
3977 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3978     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3979     uint32_t idx;
3980     pa_module *m;
3981
3982     pa_native_connection_assert_ref(c);
3983     pa_assert(t);
3984
3985     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3986         !pa_tagstruct_eof(t)) {
3987         protocol_error(c);
3988         return;
3989     }
3990
3991     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3992     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3993     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
3994
3995     pa_module_unload_request(m, FALSE);
3996     pa_pstream_send_simple_ack(c->pstream, tag);
3997 }
3998
3999 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4000     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4001     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4002     const char *name_device = NULL;
4003
4004     pa_native_connection_assert_ref(c);
4005     pa_assert(t);
4006
4007     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4008         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4009         pa_tagstruct_gets(t, &name_device) < 0 ||
4010         !pa_tagstruct_eof(t)) {
4011         protocol_error(c);
4012         return;
4013     }
4014
4015     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4016     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4017
4018     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4019     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4020     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4021     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4022
4023     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4024         pa_sink_input *si = NULL;
4025         pa_sink *sink = NULL;
4026
4027         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4028
4029         if (idx_device != PA_INVALID_INDEX)
4030             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4031         else
4032             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4033
4034         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4035
4036         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4037             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4038             return;
4039         }
4040     } else {
4041         pa_source_output *so = NULL;
4042         pa_source *source;
4043
4044         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4045
4046         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4047
4048         if (idx_device != PA_INVALID_INDEX)
4049             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4050         else
4051             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4052
4053         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4054
4055         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4056             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4057             return;
4058         }
4059     }
4060
4061     pa_pstream_send_simple_ack(c->pstream, tag);
4062 }
4063
4064 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4065     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4066     uint32_t idx = PA_INVALID_INDEX;
4067     const char *name = NULL;
4068     pa_bool_t b;
4069
4070     pa_native_connection_assert_ref(c);
4071     pa_assert(t);
4072
4073     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4074         pa_tagstruct_gets(t, &name) < 0 ||
4075         pa_tagstruct_get_boolean(t, &b) < 0 ||
4076         !pa_tagstruct_eof(t)) {
4077         protocol_error(c);
4078         return;
4079     }
4080
4081     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4082     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4083     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4084     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4085     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4086
4087     if (command == PA_COMMAND_SUSPEND_SINK) {
4088
4089         if (idx == PA_INVALID_INDEX && name && !*name) {
4090
4091             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4092
4093             if (pa_sink_suspend_all(c->protocol->core, b) < 0) {
4094                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4095                 return;
4096             }
4097         } else {
4098             pa_sink *sink = NULL;
4099
4100             if (idx != PA_INVALID_INDEX)
4101                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4102             else
4103                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4104
4105             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4106
4107             if (pa_sink_suspend(sink, b) < 0) {
4108                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4109                 return;
4110             }
4111         }
4112     } else {
4113
4114         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4115
4116         if (idx == PA_INVALID_INDEX && name && !*name) {
4117
4118             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4119
4120             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
4121                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4122                 return;
4123             }
4124
4125         } else {
4126             pa_source *source;
4127
4128             if (idx != PA_INVALID_INDEX)
4129                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4130             else
4131                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4132
4133             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4134
4135             if (pa_source_suspend(source, b) < 0) {
4136                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4137                 return;
4138             }
4139         }
4140     }
4141
4142     pa_pstream_send_simple_ack(c->pstream, tag);
4143 }
4144
4145 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4146     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4147     uint32_t idx = PA_INVALID_INDEX;
4148     const char *name = NULL;
4149     pa_module *m;
4150     pa_native_protocol_ext_cb_t cb;
4151
4152     pa_native_connection_assert_ref(c);
4153     pa_assert(t);
4154
4155     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4156         pa_tagstruct_gets(t, &name) < 0) {
4157         protocol_error(c);
4158         return;
4159     }
4160
4161     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4162     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4163     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4164     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4165     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4166
4167     if (idx != PA_INVALID_INDEX)
4168         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4169     else {
4170         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4171             if (strcmp(name, m->name) == 0)
4172                 break;
4173     }
4174
4175     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4176     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4177
4178     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4179     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4180
4181     if (cb(c->protocol, m, c, tag, t) < 0)
4182         protocol_error(c);
4183 }
4184
4185 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4186     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4187     uint32_t idx = PA_INVALID_INDEX;
4188     const char *name = NULL, *profile = NULL;
4189     pa_card *card = NULL;
4190
4191     pa_native_connection_assert_ref(c);
4192     pa_assert(t);
4193
4194     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4195         pa_tagstruct_gets(t, &name) < 0 ||
4196         pa_tagstruct_gets(t, &profile) < 0 ||
4197         !pa_tagstruct_eof(t)) {
4198         protocol_error(c);
4199         return;
4200     }
4201
4202     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4203     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4204     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4205     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4206     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4207
4208     if (idx != PA_INVALID_INDEX)
4209         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4210     else
4211         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4212
4213     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4214
4215     if (pa_card_set_profile(card, profile, TRUE) < 0) {
4216         pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4217         return;
4218     }
4219
4220     pa_pstream_send_simple_ack(c->pstream, tag);
4221 }
4222
4223 /*** pstream callbacks ***/
4224
4225 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4226     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4227
4228     pa_assert(p);
4229     pa_assert(packet);
4230     pa_native_connection_assert_ref(c);
4231
4232     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4233         pa_log("invalid packet.");
4234         native_connection_unlink(c);
4235     }
4236 }
4237
4238 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4239     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4240     output_stream *stream;
4241
4242     pa_assert(p);
4243     pa_assert(chunk);
4244     pa_native_connection_assert_ref(c);
4245
4246     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4247         pa_log("client sent block for invalid stream.");
4248         /* Ignoring */
4249         return;
4250     }
4251
4252 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4253
4254     if (playback_stream_isinstance(stream)) {
4255         playback_stream *ps = PLAYBACK_STREAM(stream);
4256
4257         if (chunk->memblock) {
4258             if (seek != PA_SEEK_RELATIVE || offset != 0)
4259                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4260
4261             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4262         } else
4263             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4264
4265     } else {
4266         upload_stream *u = UPLOAD_STREAM(stream);
4267         size_t l;
4268
4269         if (!u->memchunk.memblock) {
4270             if (u->length == chunk->length && chunk->memblock) {
4271                 u->memchunk = *chunk;
4272                 pa_memblock_ref(u->memchunk.memblock);
4273                 u->length = 0;
4274             } else {
4275                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4276                 u->memchunk.index = u->memchunk.length = 0;
4277             }
4278         }
4279
4280         pa_assert(u->memchunk.memblock);
4281
4282         l = u->length;
4283         if (l > chunk->length)
4284             l = chunk->length;
4285
4286         if (l > 0) {
4287             void *dst;
4288             dst = pa_memblock_acquire(u->memchunk.memblock);
4289
4290             if (chunk->memblock) {
4291                 void *src;
4292                 src = pa_memblock_acquire(chunk->memblock);
4293
4294                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4295                        (uint8_t*) src + chunk->index, l);
4296
4297                 pa_memblock_release(chunk->memblock);
4298             } else
4299                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4300
4301             pa_memblock_release(u->memchunk.memblock);
4302
4303             u->memchunk.length += l;
4304             u->length -= l;
4305         }
4306     }
4307 }
4308
4309 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4310     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4311
4312     pa_assert(p);
4313     pa_native_connection_assert_ref(c);
4314
4315     native_connection_unlink(c);
4316     pa_log_info("Connection died.");
4317 }
4318
4319 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4320     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4321
4322     pa_assert(p);
4323     pa_native_connection_assert_ref(c);
4324
4325     native_connection_send_memblock(c);
4326 }
4327
4328 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4329     pa_thread_mq *q;
4330
4331     if (!(q = pa_thread_mq_get()))
4332         pa_pstream_send_revoke(p, block_id);
4333     else
4334         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4335 }
4336
4337 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4338     pa_thread_mq *q;
4339
4340     if (!(q = pa_thread_mq_get()))
4341         pa_pstream_send_release(p, block_id);
4342     else
4343         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4344 }
4345
4346 /*** client callbacks ***/
4347
4348 static void client_kill_cb(pa_client *c) {
4349     pa_assert(c);
4350
4351     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4352     pa_log_info("Connection killed.");
4353 }
4354
4355 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4356     pa_tagstruct *t;
4357     pa_native_connection *c;
4358
4359     pa_assert(client);
4360     c = PA_NATIVE_CONNECTION(client->userdata);
4361     pa_native_connection_assert_ref(c);
4362
4363     if (c->version < 15)
4364       return;
4365
4366     t = pa_tagstruct_new(NULL, 0);
4367     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4368     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4369     pa_tagstruct_puts(t, event);
4370     pa_tagstruct_put_proplist(t, pl);
4371     pa_pstream_send_tagstruct(c->pstream, t);
4372 }
4373
4374 /*** module entry points ***/
4375
4376 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4377     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4378
4379     pa_assert(m);
4380     pa_assert(tv);
4381     pa_native_connection_assert_ref(c);
4382     pa_assert(c->auth_timeout_event == e);
4383
4384     if (!c->authorized) {
4385         native_connection_unlink(c);
4386         pa_log_info("Connection terminated due to authentication timeout.");
4387     }
4388 }
4389
4390 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4391     pa_native_connection *c;
4392     char pname[128];
4393     pa_client *client;
4394     pa_client_new_data data;
4395
4396     pa_assert(p);
4397     pa_assert(io);
4398     pa_assert(o);
4399
4400     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4401         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4402         pa_iochannel_free(io);
4403         return;
4404     }
4405
4406     pa_client_new_data_init(&data);
4407     data.module = o->module;
4408     data.driver = __FILE__;
4409     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4410     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4411     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4412     client = pa_client_new(p->core, &data);
4413     pa_client_new_data_done(&data);
4414
4415     if (!client)
4416         return;
4417
4418     c = pa_msgobject_new(pa_native_connection);
4419     c->parent.parent.free = native_connection_free;
4420     c->parent.process_msg = native_connection_process_msg;
4421     c->protocol = p;
4422     c->options = pa_native_options_ref(o);
4423     c->authorized = FALSE;
4424
4425     if (o->auth_anonymous) {
4426         pa_log_info("Client authenticated anonymously.");
4427         c->authorized = TRUE;
4428     }
4429
4430     if (!c->authorized &&
4431         o->auth_ip_acl &&
4432         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4433
4434         pa_log_info("Client authenticated by IP ACL.");
4435         c->authorized = TRUE;
4436     }
4437
4438     if (!c->authorized) {
4439         struct timeval tv;
4440         pa_gettimeofday(&tv);
4441         tv.tv_sec += AUTH_TIMEOUT;
4442         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4443     } else
4444         c->auth_timeout_event = NULL;
4445
4446     c->is_local = pa_iochannel_socket_is_local(io);
4447     c->version = 8;
4448
4449     c->client = client;
4450     c->client->kill = client_kill_cb;
4451     c->client->send_event = client_send_event_cb;
4452     c->client->userdata = c;
4453
4454     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4455     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4456     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4457     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4458     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4459     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4460     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4461
4462     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4463
4464     c->record_streams = pa_idxset_new(NULL, NULL);
4465     c->output_streams = pa_idxset_new(NULL, NULL);
4466
4467     c->rrobin_index = PA_IDXSET_INVALID;
4468     c->subscription = NULL;
4469
4470     pa_idxset_put(p->connections, c, NULL);
4471
4472 #ifdef HAVE_CREDS
4473     if (pa_iochannel_creds_supported(io))
4474         pa_iochannel_creds_enable(io);
4475 #endif
4476
4477     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4478 }
4479
4480 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4481     pa_native_connection *c;
4482     void *state = NULL;
4483
4484     pa_assert(p);
4485     pa_assert(m);
4486
4487     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4488         if (c->options->module == m)
4489             native_connection_unlink(c);
4490 }
4491
4492 static pa_native_protocol* native_protocol_new(pa_core *c) {
4493     pa_native_protocol *p;
4494     pa_native_hook_t h;
4495
4496     pa_assert(c);
4497
4498     p = pa_xnew(pa_native_protocol, 1);
4499     PA_REFCNT_INIT(p);
4500     p->core = c;
4501     p->connections = pa_idxset_new(NULL, NULL);
4502
4503     p->servers = NULL;
4504
4505     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4506
4507     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4508         pa_hook_init(&p->hooks[h], p);
4509
4510     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4511
4512     return p;
4513 }
4514
4515 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4516     pa_native_protocol *p;
4517
4518     if ((p = pa_shared_get(c, "native-protocol")))
4519         return pa_native_protocol_ref(p);
4520
4521     return native_protocol_new(c);
4522 }
4523
4524 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4525     pa_assert(p);
4526     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4527
4528     PA_REFCNT_INC(p);
4529
4530     return p;
4531 }
4532
4533 void pa_native_protocol_unref(pa_native_protocol *p) {
4534     pa_native_connection *c;
4535     pa_native_hook_t h;
4536
4537     pa_assert(p);
4538     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4539
4540     if (PA_REFCNT_DEC(p) > 0)
4541         return;
4542
4543     while ((c = pa_idxset_first(p->connections, NULL)))
4544         native_connection_unlink(c);
4545
4546     pa_idxset_free(p->connections, NULL, NULL);
4547
4548     pa_strlist_free(p->servers);
4549
4550     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4551         pa_hook_done(&p->hooks[h]);
4552
4553     pa_hashmap_free(p->extensions, NULL, NULL);
4554
4555     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4556
4557     pa_xfree(p);
4558 }
4559
4560 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4561     pa_assert(p);
4562     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4563     pa_assert(name);
4564
4565     p->servers = pa_strlist_prepend(p->servers, name);
4566
4567     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4568 }
4569
4570 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4571     pa_assert(p);
4572     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4573     pa_assert(name);
4574
4575     p->servers = pa_strlist_remove(p->servers, name);
4576
4577     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4578 }
4579
4580 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4581     pa_assert(p);
4582     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4583
4584     return p->hooks;
4585 }
4586
4587 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4588     pa_assert(p);
4589     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4590
4591     return p->servers;
4592 }
4593
4594 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4595     pa_assert(p);
4596     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4597     pa_assert(m);
4598     pa_assert(cb);
4599     pa_assert(!pa_hashmap_get(p->extensions, m));
4600
4601     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4602     return 0;
4603 }
4604
4605 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4606     pa_assert(p);
4607     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4608     pa_assert(m);
4609
4610     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4611 }
4612
4613 pa_native_options* pa_native_options_new(void) {
4614     pa_native_options *o;
4615
4616     o = pa_xnew0(pa_native_options, 1);
4617     PA_REFCNT_INIT(o);
4618
4619     return o;
4620 }
4621
4622 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4623     pa_assert(o);
4624     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4625
4626     PA_REFCNT_INC(o);
4627
4628     return o;
4629 }
4630
4631 void pa_native_options_unref(pa_native_options *o) {
4632     pa_assert(o);
4633     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4634
4635     if (PA_REFCNT_DEC(o) > 0)
4636         return;
4637
4638     pa_xfree(o->auth_group);
4639
4640     if (o->auth_ip_acl)
4641         pa_ip_acl_free(o->auth_ip_acl);
4642
4643     if (o->auth_cookie)
4644         pa_auth_cookie_unref(o->auth_cookie);
4645
4646     pa_xfree(o);
4647 }
4648
4649 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4650     pa_bool_t enabled;
4651     const char *acl;
4652
4653     pa_assert(o);
4654     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4655     pa_assert(ma);
4656
4657     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4658         pa_log("auth-anonymous= expects a boolean argument.");
4659         return -1;
4660     }
4661
4662     enabled = TRUE;
4663     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4664         pa_log("auth-group-enabled= expects a boolean argument.");
4665         return -1;
4666     }
4667
4668     pa_xfree(o->auth_group);
4669     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4670
4671 #ifndef HAVE_CREDS
4672     if (o->auth_group)
4673         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4674 #endif
4675
4676     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4677         pa_ip_acl *ipa;
4678
4679         if (!(ipa = pa_ip_acl_new(acl))) {
4680             pa_log("Failed to parse IP ACL '%s'", acl);
4681             return -1;
4682         }
4683
4684         if (o->auth_ip_acl)
4685             pa_ip_acl_free(o->auth_ip_acl);
4686
4687         o->auth_ip_acl = ipa;
4688     }
4689
4690     enabled = TRUE;
4691     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4692         pa_log("auth-cookie-enabled= expects a boolean argument.");
4693         return -1;
4694     }
4695
4696     if (o->auth_cookie)
4697         pa_auth_cookie_unref(o->auth_cookie);
4698
4699     if (enabled) {
4700         const char *cn;
4701
4702         /* The new name for this is 'auth-cookie', for compat reasons
4703          * we check the old name too */
4704         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4705             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4706                 cn = PA_NATIVE_COOKIE_FILE;
4707
4708         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4709             return -1;
4710
4711     } else
4712           o->auth_cookie = NULL;
4713
4714     return 0;
4715 }
4716
4717 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4718     pa_native_connection_assert_ref(c);
4719
4720     return c->pstream;
4721 }