protocol-native,proplist-util: port to pa_get_{user|host}_name_malloc()
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
/* Connection limits and buffer sizing defaults for the native protocol. */
63 /* Kick a client if it doesn't authenticate within this time */
64 #define AUTH_TIMEOUT 60
65
66 /* Don't accept more connections than this */
67 #define MAX_CONNECTIONS 64
68
/* MAX_MEMBLOCKQ_LENGTH caps buffer_attr.maxlength and DEFAULT_FRAGSIZE_MSEC
 * supplies the fragment size when the client passes (uint32_t) -1; see
 * fix_record_buffer_attr_pre(). */
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
73
74 struct pa_native_protocol;
75
/* State of one record stream of a native protocol client. Instances are
 * created by record_stream_new() and released by record_stream_free(). */
76 typedef struct record_stream {
77     pa_msgobject parent;
78
79     pa_native_connection *connection; /* owning connection; reset to NULL by record_stream_unlink() */
80     uint32_t index; /* index within connection->record_streams */
81
82     pa_source_output *source_output; /* reference is dropped in record_stream_unlink() */
83     pa_memblockq *memblockq; /* receives the chunks posted via RECORD_STREAM_MESSAGE_POST_DATA */
84
85     pa_bool_t adjust_latency:1; /* evaluated in fix_record_buffer_attr_pre() */
86     pa_bool_t early_requests:1; /* evaluated in fix_record_buffer_attr_pre(); takes precedence over adjust_latency there */
87
88     pa_buffer_attr buffer_attr;
89
90     pa_atomic_t on_the_fly; /* reduced by the chunk length whenever a posted chunk is processed */
91     pa_usec_t configured_source_latency; /* set by fix_record_buffer_attr_pre(); 0 when no latency was requested */
92     size_t drop_initial;
93
94     /* Only updated after SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY */
95     size_t on_the_fly_snapshot;
96     pa_usec_t current_monitor_latency;
97     pa_usec_t current_source_latency;
98 } record_stream;
99
100 PA_DECLARE_CLASS(record_stream);
101 #define RECORD_STREAM(o) (record_stream_cast(o))
102 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
103
/* Common base type of playback_stream and upload_stream; both embed it as
 * their first member ("parent"). */
104 typedef struct output_stream {
105     pa_msgobject parent;
106 } output_stream;
107
108 PA_DECLARE_CLASS(output_stream);
109 #define OUTPUT_STREAM(o) (output_stream_cast(o))
110 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
111
/* State of one playback stream of a native protocol client. It is kept in
 * the connection's output_streams set (see playback_stream_unlink()). */
112 typedef struct playback_stream {
113     output_stream parent;
114
115     pa_native_connection *connection; /* owning connection; reset to NULL by playback_stream_unlink() */
116     uint32_t index; /* stream identifier put into the notifications sent to the client */
117
118     pa_sink_input *sink_input; /* reference is dropped in playback_stream_unlink() */
119     pa_memblockq *memblockq; /* freed in playback_stream_free() */
120
121     pa_bool_t adjust_latency:1;
122     pa_bool_t early_requests:1;
123
124     pa_bool_t is_underrun:1;
125     pa_bool_t drain_request:1; /* if set at unlink time, drain_tag is answered with PA_ERR_NOENTITY */
126     uint32_t drain_tag;
127     uint32_t syncid;
128
129     pa_atomic_t missing; /* read and reset to 0 by PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, which requests that many bytes */
130     pa_usec_t configured_sink_latency;
131     pa_buffer_attr buffer_attr; /* tlength is overwritten by PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH */
132
133     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
134     int64_t read_index, write_index;
135     size_t render_memblockq_length;
136     pa_usec_t current_sink_latency;
137     uint64_t playing_for, underrun_for;
138 } playback_stream;
139
140 PA_DECLARE_CLASS(playback_stream);
141 #define PLAYBACK_STREAM(o) (playback_stream_cast(o))
142 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
143
/* State of one sample upload of a native protocol client. Instances are
 * created by upload_stream_new() and released by upload_stream_free(). */
144 typedef struct upload_stream {
145     output_stream parent;
146
147     pa_native_connection *connection; /* owning connection; reset to NULL by upload_stream_unlink() */
148     uint32_t index; /* index within connection->output_streams */
149
150     pa_memchunk memchunk; /* reset at creation; its memblock, if any, is unreferenced on free */
151     size_t length; /* length passed to upload_stream_new(); always > 0 */
152     char *name; /* private copy of the name passed at creation */
153     pa_sample_spec sample_spec;
154     pa_channel_map channel_map;
155     pa_proplist *proplist; /* copy of the caller's proplist, merged with the client's proplist */
156 } upload_stream;
157
158 PA_DECLARE_CLASS(upload_stream);
159 #define UPLOAD_STREAM(o) (upload_stream_cast(o))
160 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
161
/* One client connection speaking the native protocol. */
162 struct pa_native_connection {
163     pa_msgobject parent;
164     pa_native_protocol *protocol; /* gives access to the core (protocol->core) */
165     pa_native_options *options; /* options->module is recorded as the module of new source outputs */
166     pa_bool_t authorized:1;
167     pa_bool_t is_local:1;
168     uint32_t version; /* compared against protocol versions, e.g. >= 13 before sending PA_COMMAND_STARTED */
169     pa_client *client;
170     pa_pstream *pstream; /* all tagstructs, errors and acks to the client are sent through this */
171     pa_pdispatch *pdispatch;
172     pa_idxset *record_streams, *output_streams; /* record_stream objects / playback_stream and upload_stream objects */
173     uint32_t rrobin_index;
174     pa_subscription *subscription;
175     pa_time_event *auth_timeout_event; /* NOTE(review): presumably armed with AUTH_TIMEOUT - confirm */
176 };
177
178 PA_DECLARE_CLASS(pa_native_connection);
179 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o))
180 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
181
/* Reference-counted protocol instance shared by the connections that point
 * to it through pa_native_connection.protocol. */
182 struct pa_native_protocol {
183     PA_REFCNT_DECLARE;
184
185     pa_core *core; /* core that new source outputs are created on */
186     pa_idxset *connections; /* NOTE(review): presumably the live pa_native_connection objects - confirm */
187
188     pa_strlist *servers;
189     pa_hook hooks[PA_NATIVE_HOOK_MAX];
190
191     pa_hashmap *extensions;
192 };
193
/* Additional source output message code, numbered starting at
 * PA_SOURCE_OUTPUT_MESSAGE_MAX. NOTE(review): presumably handled by
 * source_output_process_msg() - confirm. */
194 enum {
195     SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY = PA_SOURCE_OUTPUT_MESSAGE_MAX
196 };
197
/* Additional sink input message codes, numbered starting at
 * PA_SINK_INPUT_MESSAGE_MAX. NOTE(review): presumably handled by
 * sink_input_process_msg() - confirm. */
198 enum {
199     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
200     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
201     SINK_INPUT_MESSAGE_FLUSH,
202     SINK_INPUT_MESSAGE_TRIGGER,
203     SINK_INPUT_MESSAGE_SEEK,
204     SINK_INPUT_MESSAGE_PREBUF_FORCE,
205     SINK_INPUT_MESSAGE_UPDATE_LATENCY,
206     SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR
207 };
208
/* Message codes handled by playback_stream_process_msg(); each of them
 * results in a notification or reply being sent to the client. */
209 enum {
210     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
211     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,
212     PLAYBACK_STREAM_MESSAGE_OVERFLOW,
213     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,
214     PLAYBACK_STREAM_MESSAGE_STARTED,
215     PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH
216 };
217
/* Message codes handled by record_stream_process_msg(). */
218 enum {
219     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
220 };
221
/* Message codes addressed to a pa_native_connection.
 * NOTE(review): the handler is outside this part of the file - confirm semantics there. */
222 enum {
223     CONNECTION_MESSAGE_RELEASE,
224     CONNECTION_MESSAGE_REVOKE
225 };
226
/* Forward declarations of the stream callbacks and helpers defined later in
 * this file. */

/* Sink input callbacks. NOTE(review): presumably installed when a playback stream is created - confirm. */
227 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
228 static void sink_input_kill_cb(pa_sink_input *i);
229 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
230 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest);
231 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
232 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
233 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
234 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
235
236 static void native_connection_send_memblock(pa_native_connection *c);
237 static void playback_stream_request_bytes(struct playback_stream*s);
238
/* Source output callbacks; record_stream_new() installs these on the source output it creates. */
239 static void source_output_kill_cb(pa_source_output *o);
240 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
241 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
242 static void source_output_moving_cb(pa_source_output *o, pa_source *dest);
243 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
244 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
245
/* Message handlers for the sink input and source output objects. */
246 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
247 static int source_output_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
248
/* Forward declarations of the command handlers referenced by command_table.
 * All of them share the pa_pdispatch_cb_t signature. */
249 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
263 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
264 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
265 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
266 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
267 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
268 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
269 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
270 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
271 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
272 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
273 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
274 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
275 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
276 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
277 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
278 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
279 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
280 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
281 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
282 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
283 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
284 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
285 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
286 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
287
/* Handler table indexed by PA_COMMAND_* code. Several codes share one
 * handler, which receives the command code as its "command" argument.
 * Entries set to NULL have no handler here. NOTE(review): presumably this
 * table is handed to the connection's pa_pdispatch - confirm at the
 * creation site. */
288 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
289     [PA_COMMAND_ERROR] = NULL,
290     [PA_COMMAND_TIMEOUT] = NULL,
291     [PA_COMMAND_REPLY] = NULL,
292     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
293     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
294     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
295     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
296     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
297     [PA_COMMAND_AUTH] = command_auth,
298     [PA_COMMAND_REQUEST] = NULL,
299     [PA_COMMAND_EXIT] = command_exit,
300     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
301     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
302     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
303     [PA_COMMAND_STAT] = command_stat,
304     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
305     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
306     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
307     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
308     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
309     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
310     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
311     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
312     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
313     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
314     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
315     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
316     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
317     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
318     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
319     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
320     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
321     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
322     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
323     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
324     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
325     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
326     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
327     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
328     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
329
330     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
331     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
332     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
333
334     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
335     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
336     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
337
338     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
339     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
340
341     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
342     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
343     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
344     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
345
346     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
347     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
348
349     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
350     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
351     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
352     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
353     [PA_COMMAND_KILL_CLIENT] = command_kill,
354     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
355     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
356     [PA_COMMAND_LOAD_MODULE] = command_load_module,
357     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
358
359     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
360     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
361     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
362     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
363
364     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
365     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
366
367     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
368     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
369
370     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
371     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
372
373     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
374     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
375     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
376
377     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
378     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
379     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
380
381     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
382
383     [PA_COMMAND_EXTENSION] = command_extension
384 };
385
386 /* structure management */
387
388 /* Called from main context */
389 static void upload_stream_unlink(upload_stream *s) {
390     pa_assert(s);
391
392     if (!s->connection)
393         return;
394
395     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
396     s->connection = NULL;
397     upload_stream_unref(s);
398 }
399
400 /* Called from main context */
401 static void upload_stream_free(pa_object *o) {
402     upload_stream *s = UPLOAD_STREAM(o);
403     pa_assert(s);
404
405     upload_stream_unlink(s);
406
407     pa_xfree(s->name);
408
409     if (s->proplist)
410         pa_proplist_free(s->proplist);
411
412     if (s->memchunk.memblock)
413         pa_memblock_unref(s->memchunk.memblock);
414
415     pa_xfree(s);
416 }
417
418 /* Called from main context */
419 static upload_stream* upload_stream_new(
420         pa_native_connection *c,
421         const pa_sample_spec *ss,
422         const pa_channel_map *map,
423         const char *name,
424         size_t length,
425         pa_proplist *p) {
426
427     upload_stream *s;
428
429     pa_assert(c);
430     pa_assert(ss);
431     pa_assert(name);
432     pa_assert(length > 0);
433     pa_assert(p);
434
435     s = pa_msgobject_new(upload_stream);
436     s->parent.parent.parent.free = upload_stream_free;
437     s->connection = c;
438     s->sample_spec = *ss;
439     s->channel_map = *map;
440     s->name = pa_xstrdup(name);
441     pa_memchunk_reset(&s->memchunk);
442     s->length = length;
443     s->proplist = pa_proplist_copy(p);
444     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
445
446     pa_idxset_put(c->output_streams, s, &s->index);
447
448     return s;
449 }
450
451 /* Called from main context */
452 static void record_stream_unlink(record_stream *s) {
453     pa_assert(s);
454
455     if (!s->connection)
456         return;
457
458     if (s->source_output) {
459         pa_source_output_unlink(s->source_output);
460         pa_source_output_unref(s->source_output);
461         s->source_output = NULL;
462     }
463
464     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
465     s->connection = NULL;
466     record_stream_unref(s);
467 }
468
469 /* Called from main context */
470 static void record_stream_free(pa_object *o) {
471     record_stream *s = RECORD_STREAM(o);
472     pa_assert(s);
473
474     record_stream_unlink(s);
475
476     pa_memblockq_free(s->memblockq);
477     pa_xfree(s);
478 }
479
480 /* Called from main context */
481 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
482     record_stream *s = RECORD_STREAM(o);
483     record_stream_assert_ref(s);
484
485     if (!s->connection)
486         return -1;
487
488     switch (code) {
489
490         case RECORD_STREAM_MESSAGE_POST_DATA:
491
492             /* We try to keep up to date with how many bytes are
493              * currently on the fly */
494             pa_atomic_sub(&s->on_the_fly, chunk->length);
495
496             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
497 /*                 pa_log_warn("Failed to push data into output queue."); */
498                 return -1;
499             }
500
501             if (!pa_pstream_is_pending(s->connection->pstream))
502                 native_connection_send_memblock(s->connection);
503
504             break;
505     }
506
507     return 0;
508 }
509
510 /* Called from main context */
511 static void fix_record_buffer_attr_pre(record_stream *s) {
512
513     size_t frame_size;
514     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
515
516     pa_assert(s);
517
518     /* This function will be called from the main thread, before as
519      * well as after the source output has been activated using
520      * pa_source_output_put()! That means it may not touch any
521      * ->thread_info data! */
522
523     frame_size = pa_frame_size(&s->source_output->sample_spec);
524
525     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
526         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
527     if (s->buffer_attr.maxlength <= 0)
528         s->buffer_attr.maxlength = (uint32_t) frame_size;
529
530     if (s->buffer_attr.fragsize == (uint32_t) -1)
531         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
532     if (s->buffer_attr.fragsize <= 0)
533         s->buffer_attr.fragsize = (uint32_t) frame_size;
534
535     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(s->buffer_attr.fragsize, &s->source_output->sample_spec);
536
537     if (s->early_requests) {
538
539         /* In early request mode we need to emulate the classic
540          * fragment-based playback model. We do this setting the source
541          * latency to the fragment size. */
542
543         source_usec = fragsize_usec;
544
545     } else if (s->adjust_latency) {
546
547         /* So, the user asked us to adjust the latency according to
548          * what the source can provide. Half the latency will be
549          * spent on the hw buffer, half of it in the async buffer
550          * queue we maintain for each client. */
551
552         source_usec = fragsize_usec/2;
553
554     } else {
555
556         /* Ok, the user didn't ask us to adjust the latency, hence we
557          * don't */
558
559         source_usec = (pa_usec_t) -1;
560     }
561
562     if (source_usec != (pa_usec_t) -1)
563         s->configured_source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
564     else
565         s->configured_source_latency = 0;
566
567     if (s->early_requests) {
568
569         /* Ok, we didn't necessarily get what we were asking for, so
570          * let's tell the user */
571
572         fragsize_usec = s->configured_source_latency;
573
574     } else if (s->adjust_latency) {
575
576         /* Now subtract what we actually got */
577
578         if (fragsize_usec >= s->configured_source_latency*2)
579             fragsize_usec -= s->configured_source_latency;
580         else
581             fragsize_usec = s->configured_source_latency;
582     }
583
584     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
585         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
586
587         s->buffer_attr.fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
588
589     if (s->buffer_attr.fragsize <= 0)
590         s->buffer_attr.fragsize = (uint32_t) frame_size;
591 }
592
593 /* Called from main context */
594 static void fix_record_buffer_attr_post(record_stream *s) {
595     size_t base;
596
597     pa_assert(s);
598
599     /* This function will be called from the main thread, before as
600      * well as after the source output has been activated using
601      * pa_source_output_put()! That means it may not touch and
602      * ->thread_info data! */
603
604     base = pa_frame_size(&s->source_output->sample_spec);
605
606     s->buffer_attr.fragsize = (s->buffer_attr.fragsize/base)*base;
607     if (s->buffer_attr.fragsize <= 0)
608         s->buffer_attr.fragsize = base;
609
610     if (s->buffer_attr.fragsize > s->buffer_attr.maxlength)
611         s->buffer_attr.fragsize = s->buffer_attr.maxlength;
612 }
613
614 /* Called from main context */
615 static record_stream* record_stream_new(
616         pa_native_connection *c,
617         pa_source *source,
618         pa_sample_spec *ss,
619         pa_channel_map *map,
620         pa_bool_t peak_detect,
621         pa_buffer_attr *attr,
622         pa_source_output_flags_t flags,
623         pa_proplist *p,
624         pa_bool_t adjust_latency,
625         pa_sink_input *direct_on_input,
626         pa_bool_t early_requests,
627         int *ret) {
628
629     record_stream *s;
630     pa_source_output *source_output = NULL;
631     size_t base;
632     pa_source_output_new_data data;
633
634     pa_assert(c);
635     pa_assert(ss);
636     pa_assert(p);
637     pa_assert(ret);
638
639     pa_source_output_new_data_init(&data);
640
641     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
642     data.driver = __FILE__;
643     data.module = c->options->module;
644     data.client = c->client;
645     data.source = source;
646     data.direct_on_input = direct_on_input;
647     pa_source_output_new_data_set_sample_spec(&data, ss);
648     pa_source_output_new_data_set_channel_map(&data, map);
649     if (peak_detect)
650         data.resample_method = PA_RESAMPLER_PEAKS;
651
652     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
653
654     pa_source_output_new_data_done(&data);
655
656     if (!source_output)
657         return NULL;
658
659     s = pa_msgobject_new(record_stream);
660     s->parent.parent.free = record_stream_free;
661     s->parent.process_msg = record_stream_process_msg;
662     s->connection = c;
663     s->source_output = source_output;
664     s->buffer_attr = *attr;
665     s->adjust_latency = adjust_latency;
666     s->early_requests = early_requests;
667     pa_atomic_store(&s->on_the_fly, 0);
668
669     s->source_output->parent.process_msg = source_output_process_msg;
670     s->source_output->push = source_output_push_cb;
671     s->source_output->kill = source_output_kill_cb;
672     s->source_output->get_latency = source_output_get_latency_cb;
673     s->source_output->moving = source_output_moving_cb;
674     s->source_output->suspend = source_output_suspend_cb;
675     s->source_output->send_event = source_output_send_event_cb;
676     s->source_output->userdata = s;
677
678     fix_record_buffer_attr_pre(s);
679
680     s->memblockq = pa_memblockq_new(
681             0,
682             s->buffer_attr.maxlength,
683             0,
684             base = pa_frame_size(&source_output->sample_spec),
685             1,
686             0,
687             0,
688             NULL);
689
690     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
691     fix_record_buffer_attr_post(s);
692
693     *ss = s->source_output->sample_spec;
694     *map = s->source_output->channel_map;
695
696     pa_idxset_put(c->record_streams, s, &s->index);
697
698     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
699                 ((double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) + (double) s->configured_source_latency) / PA_USEC_PER_MSEC,
700                 (double) pa_bytes_to_usec(s->buffer_attr.fragsize, &source_output->sample_spec) / PA_USEC_PER_MSEC,
701                 (double) s->configured_source_latency / PA_USEC_PER_MSEC);
702
703     pa_source_output_put(s->source_output);
704     return s;
705 }
706
707 /* Called from main context */
708 static void record_stream_send_killed(record_stream *r) {
709     pa_tagstruct *t;
710     record_stream_assert_ref(r);
711
712     t = pa_tagstruct_new(NULL, 0);
713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
715     pa_tagstruct_putu32(t, r->index);
716     pa_pstream_send_tagstruct(r->connection->pstream, t);
717 }
718
719 /* Called from main context */
720 static void playback_stream_unlink(playback_stream *s) {
721     pa_assert(s);
722
723     if (!s->connection)
724         return;
725
726     if (s->sink_input) {
727         pa_sink_input_unlink(s->sink_input);
728         pa_sink_input_unref(s->sink_input);
729         s->sink_input = NULL;
730     }
731
732     if (s->drain_request)
733         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
734
735     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
736     s->connection = NULL;
737     playback_stream_unref(s);
738 }
739
740 /* Called from main context */
741 static void playback_stream_free(pa_object* o) {
742     playback_stream *s = PLAYBACK_STREAM(o);
743     pa_assert(s);
744
745     playback_stream_unlink(s);
746
747     pa_memblockq_free(s->memblockq);
748     pa_xfree(s);
749 }
750
751 /* Called from main context */
752 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
753     playback_stream *s = PLAYBACK_STREAM(o);
754     playback_stream_assert_ref(s);
755
756     if (!s->connection)
757         return -1;
758
759     switch (code) {
760         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
761             pa_tagstruct *t;
762             int l = 0;
763
764             for (;;) {
765                 if ((l = pa_atomic_load(&s->missing)) <= 0)
766                     return 0;
767
768                 if (pa_atomic_cmpxchg(&s->missing, l, 0))
769                     break;
770             }
771
772             t = pa_tagstruct_new(NULL, 0);
773             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
774             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
775             pa_tagstruct_putu32(t, s->index);
776             pa_tagstruct_putu32(t, (uint32_t) l);
777             pa_pstream_send_tagstruct(s->connection->pstream, t);
778
779 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
780             break;
781         }
782
783         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
784             pa_tagstruct *t;
785
786 /*             pa_log("signalling underflow"); */
787
788             /* Report that we're empty */
789             t = pa_tagstruct_new(NULL, 0);
790             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
791             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
792             pa_tagstruct_putu32(t, s->index);
793             pa_pstream_send_tagstruct(s->connection->pstream, t);
794             break;
795         }
796
797         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
798             pa_tagstruct *t;
799
800             /* Notify the user we're overflowed*/
801             t = pa_tagstruct_new(NULL, 0);
802             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
803             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
804             pa_tagstruct_putu32(t, s->index);
805             pa_pstream_send_tagstruct(s->connection->pstream, t);
806             break;
807         }
808
809         case PLAYBACK_STREAM_MESSAGE_STARTED:
810
811             if (s->connection->version >= 13) {
812                 pa_tagstruct *t;
813
814                 /* Notify the user we started playback */
815                 t = pa_tagstruct_new(NULL, 0);
816                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
817                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
818                 pa_tagstruct_putu32(t, s->index);
819                 pa_pstream_send_tagstruct(s->connection->pstream, t);
820             }
821
822             break;
823
824         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
825             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
826             break;
827
828         case PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH: {
829             pa_tagstruct *t;
830
831             s->buffer_attr.tlength = (uint32_t) offset;
832
833             t = pa_tagstruct_new(NULL, 0);
834             pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED);
835             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
836             pa_tagstruct_putu32(t, s->index);
837             pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
838             pa_tagstruct_putu32(t, s->buffer_attr.tlength);
839             pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
840             pa_tagstruct_putu32(t, s->buffer_attr.minreq);
841             pa_tagstruct_put_usec(t, s->configured_sink_latency);
842             pa_pstream_send_tagstruct(s->connection->pstream, t);
843
844             break;
845         }
846     }
847
848     return 0;
849 }
850
851 /* Called from main context */
852 static void fix_playback_buffer_attr(playback_stream *s) {
853     size_t frame_size, max_prebuf;
854     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
855
856     pa_assert(s);
857
858     /* This function will be called from the main thread, before as
859      * well as after the sink input has been activated using
860      * pa_sink_input_put()! That means it may not touch any
861      * ->thread_info data, such as the memblockq! */
862
863     frame_size = pa_frame_size(&s->sink_input->sample_spec);
864
865     if (s->buffer_attr.maxlength == (uint32_t) -1 || s->buffer_attr.maxlength > MAX_MEMBLOCKQ_LENGTH)
866         s->buffer_attr.maxlength = MAX_MEMBLOCKQ_LENGTH;
867     if (s->buffer_attr.maxlength <= 0)
868         s->buffer_attr.maxlength = (uint32_t) frame_size;
869
870     if (s->buffer_attr.tlength == (uint32_t) -1)
871         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
872     if (s->buffer_attr.tlength <= 0)
873         s->buffer_attr.tlength = (uint32_t) frame_size;
874
875     if (s->buffer_attr.minreq == (uint32_t) -1)
876         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
877     if (s->buffer_attr.minreq <= 0)
878         s->buffer_attr.minreq = (uint32_t) frame_size;
879
880     if (s->buffer_attr.tlength < s->buffer_attr.minreq+frame_size)
881         s->buffer_attr.tlength = s->buffer_attr.minreq+(uint32_t) frame_size;
882
883     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(s->buffer_attr.tlength, &s->sink_input->sample_spec);
884     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(s->buffer_attr.minreq, &s->sink_input->sample_spec);
885
886     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
887                 (double) tlength_usec / PA_USEC_PER_MSEC,
888                 (double) minreq_usec / PA_USEC_PER_MSEC);
889
890     if (s->early_requests) {
891
892         /* In early request mode we need to emulate the classic
893          * fragment-based playback model. We do this setting the sink
894          * latency to the fragment size. */
895
896         sink_usec = minreq_usec;
897         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
898
899     } else if (s->adjust_latency) {
900
901         /* So, the user asked us to adjust the latency of the stream
902          * buffer according to the what the sink can provide. The
903          * tlength passed in shall be the overall latency. Roughly
904          * half the latency will be spent on the hw buffer, the other
905          * half of it in the async buffer queue we maintain for each
906          * client. In between we'll have a safety space of size
907          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
908          * empty and needs to be filled, then our buffer must have
909          * enough data to fulfill this request immediatly and thus
910          * have at least the same tlength as the size of the hw
911          * buffer. It additionally needs space for 2 times minreq
912          * because if the buffer ran empty and a partial fillup
913          * happens immediately on the next iteration we need to be
914          * able to fulfill it and give the application also minreq
915          * time to fill it up again for the next request Makes 2 times
916          * minreq in plus.. */
917
918         if (tlength_usec > minreq_usec*2)
919             sink_usec = (tlength_usec - minreq_usec*2)/2;
920         else
921             sink_usec = 0;
922
923         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
924
925     } else {
926
927         /* Ok, the user didn't ask us to adjust the latency, but we
928          * still need to make sure that the parameters from the user
929          * do make sense. */
930
931         if (tlength_usec > minreq_usec*2)
932             sink_usec = (tlength_usec - minreq_usec*2);
933         else
934             sink_usec = 0;
935
936         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
937     }
938
939     s->configured_sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
940
941     if (s->early_requests) {
942
943         /* Ok, we didn't necessarily get what we were asking for, so
944          * let's tell the user */
945
946         minreq_usec = s->configured_sink_latency;
947
948     } else if (s->adjust_latency) {
949
950         /* Ok, we didn't necessarily get what we were asking for, so
951          * let's subtract from what we asked for for the remaining
952          * buffer space */
953
954         if (tlength_usec >= s->configured_sink_latency)
955             tlength_usec -= s->configured_sink_latency;
956     }
957
958     /* FIXME: This is actually larger than necessary, since not all of
959      * the sink latency is actually rewritable. */
960     if (tlength_usec < s->configured_sink_latency + 2*minreq_usec)
961         tlength_usec = s->configured_sink_latency + 2*minreq_usec;
962
963     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
964         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
965         s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
966
967     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
968         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
969         s->buffer_attr.minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
970
971     if (s->buffer_attr.minreq <= 0) {
972         s->buffer_attr.minreq = (uint32_t) frame_size;
973         s->buffer_attr.tlength += (uint32_t) frame_size*2;
974     }
975
976     if (s->buffer_attr.tlength <= s->buffer_attr.minreq)
977         s->buffer_attr.tlength = s->buffer_attr.minreq*2 + (uint32_t) frame_size;
978
979     max_prebuf = s->buffer_attr.tlength + (uint32_t)frame_size - s->buffer_attr.minreq;
980
981     if (s->buffer_attr.prebuf == (uint32_t) -1 ||
982         s->buffer_attr.prebuf > max_prebuf)
983         s->buffer_attr.prebuf = max_prebuf;
984 }
985
986 /* Called from main context */
987 static playback_stream* playback_stream_new(
988         pa_native_connection *c,
989         pa_sink *sink,
990         pa_sample_spec *ss,
991         pa_channel_map *map,
992         pa_buffer_attr *a,
993         pa_cvolume *volume,
994         pa_bool_t muted,
995         pa_bool_t muted_set,
996         uint32_t syncid,
997         uint32_t *missing,
998         pa_sink_input_flags_t flags,
999         pa_proplist *p,
1000         pa_bool_t adjust_latency,
1001         pa_bool_t early_requests,
1002         int *ret) {
1003
1004     playback_stream *s, *ssync;
1005     pa_sink_input *sink_input = NULL;
1006     pa_memchunk silence;
1007     uint32_t idx;
1008     int64_t start_index;
1009     pa_sink_input_new_data data;
1010
1011     pa_assert(c);
1012     pa_assert(ss);
1013     pa_assert(missing);
1014     pa_assert(p);
1015     pa_assert(ret);
1016
1017     /* Find syncid group */
1018     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
1019
1020         if (!playback_stream_isinstance(ssync))
1021             continue;
1022
1023         if (ssync->syncid == syncid)
1024             break;
1025     }
1026
1027     /* Synced streams must connect to the same sink */
1028     if (ssync) {
1029
1030         if (!sink)
1031             sink = ssync->sink_input->sink;
1032         else if (sink != ssync->sink_input->sink) {
1033             *ret = PA_ERR_INVALID;
1034             return NULL;
1035         }
1036     }
1037
1038     pa_sink_input_new_data_init(&data);
1039
1040     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1041     data.driver = __FILE__;
1042     data.module = c->options->module;
1043     data.client = c->client;
1044     data.sink = sink;
1045     pa_sink_input_new_data_set_sample_spec(&data, ss);
1046     pa_sink_input_new_data_set_channel_map(&data, map);
1047     if (volume)
1048         pa_sink_input_new_data_set_volume(&data, volume);
1049     if (muted_set)
1050         pa_sink_input_new_data_set_muted(&data, muted);
1051     data.sync_base = ssync ? ssync->sink_input : NULL;
1052
1053     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1054
1055     pa_sink_input_new_data_done(&data);
1056
1057     if (!sink_input)
1058         return NULL;
1059
1060     s = pa_msgobject_new(playback_stream);
1061     s->parent.parent.parent.free = playback_stream_free;
1062     s->parent.parent.process_msg = playback_stream_process_msg;
1063     s->connection = c;
1064     s->syncid = syncid;
1065     s->sink_input = sink_input;
1066     s->is_underrun = TRUE;
1067     s->drain_request = FALSE;
1068     pa_atomic_store(&s->missing, 0);
1069     s->buffer_attr = *a;
1070     s->adjust_latency = adjust_latency;
1071     s->early_requests = early_requests;
1072
1073     s->sink_input->parent.process_msg = sink_input_process_msg;
1074     s->sink_input->pop = sink_input_pop_cb;
1075     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1076     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1077     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1078     s->sink_input->kill = sink_input_kill_cb;
1079     s->sink_input->moving = sink_input_moving_cb;
1080     s->sink_input->suspend = sink_input_suspend_cb;
1081     s->sink_input->send_event = sink_input_send_event_cb;
1082     s->sink_input->userdata = s;
1083
1084     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1085
1086     fix_playback_buffer_attr(s);
1087
1088     pa_sink_input_get_silence(sink_input, &silence);
1089     s->memblockq = pa_memblockq_new(
1090             start_index,
1091             s->buffer_attr.maxlength,
1092             s->buffer_attr.tlength,
1093             pa_frame_size(&sink_input->sample_spec),
1094             s->buffer_attr.prebuf,
1095             s->buffer_attr.minreq,
1096             0,
1097             &silence);
1098     pa_memblock_unref(silence.memblock);
1099
1100     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1101
1102     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1103
1104     *ss = s->sink_input->sample_spec;
1105     *map = s->sink_input->channel_map;
1106
1107     pa_idxset_put(c->output_streams, s, &s->index);
1108
1109     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1110                 ((double) pa_bytes_to_usec(s->buffer_attr.tlength, &sink_input->sample_spec) + (double) s->configured_sink_latency) / PA_USEC_PER_MSEC,
1111                 (double) pa_bytes_to_usec(s->buffer_attr.tlength-s->buffer_attr.minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1112                 (double) pa_bytes_to_usec(s->buffer_attr.minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1113                 (double) s->configured_sink_latency / PA_USEC_PER_MSEC);
1114
1115     pa_sink_input_put(s->sink_input);
1116     return s;
1117 }
1118
1119 /* Called from IO context */
1120 static void playback_stream_request_bytes(playback_stream *s) {
1121     size_t m, minreq;
1122     int previous_missing;
1123
1124     playback_stream_assert_ref(s);
1125
1126     m = pa_memblockq_pop_missing(s->memblockq);
1127
1128     if (m <= 0)
1129         return;
1130
1131 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1132
1133     previous_missing = pa_atomic_add(&s->missing, (int) m);
1134     minreq = pa_memblockq_get_minreq(s->memblockq);
1135
1136     if (pa_memblockq_prebuf_active(s->memblockq) ||
1137         (previous_missing < (int) minreq && previous_missing + (int) m >= (int) minreq))
1138         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1139 }
1140
1141
1142 /* Called from main context */
1143 static void playback_stream_send_killed(playback_stream *p) {
1144     pa_tagstruct *t;
1145     playback_stream_assert_ref(p);
1146
1147     t = pa_tagstruct_new(NULL, 0);
1148     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1149     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1150     pa_tagstruct_putu32(t, p->index);
1151     pa_pstream_send_tagstruct(p->connection->pstream, t);
1152 }
1153
1154 /* Called from main context */
1155 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1156     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1157     pa_native_connection_assert_ref(c);
1158
1159     if (!c->protocol)
1160         return -1;
1161
1162     switch (code) {
1163
1164         case CONNECTION_MESSAGE_REVOKE:
1165             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1166             break;
1167
1168         case CONNECTION_MESSAGE_RELEASE:
1169             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1170             break;
1171     }
1172
1173     return 0;
1174 }
1175
1176 /* Called from main context */
1177 static void native_connection_unlink(pa_native_connection *c) {
1178     record_stream *r;
1179     output_stream *o;
1180
1181     pa_assert(c);
1182
1183     if (!c->protocol)
1184         return;
1185
1186     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1187
1188     if (c->options)
1189         pa_native_options_unref(c->options);
1190
1191     while ((r = pa_idxset_first(c->record_streams, NULL)))
1192         record_stream_unlink(r);
1193
1194     while ((o = pa_idxset_first(c->output_streams, NULL)))
1195         if (playback_stream_isinstance(o))
1196             playback_stream_unlink(PLAYBACK_STREAM(o));
1197         else
1198             upload_stream_unlink(UPLOAD_STREAM(o));
1199
1200     if (c->subscription)
1201         pa_subscription_free(c->subscription);
1202
1203     if (c->pstream)
1204         pa_pstream_unlink(c->pstream);
1205
1206     if (c->auth_timeout_event) {
1207         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1208         c->auth_timeout_event = NULL;
1209     }
1210
1211     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1212     c->protocol = NULL;
1213     pa_native_connection_unref(c);
1214 }
1215
1216 /* Called from main context */
1217 static void native_connection_free(pa_object *o) {
1218     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1219
1220     pa_assert(c);
1221
1222     native_connection_unlink(c);
1223
1224     pa_idxset_free(c->record_streams, NULL, NULL);
1225     pa_idxset_free(c->output_streams, NULL, NULL);
1226
1227     pa_pdispatch_unref(c->pdispatch);
1228     pa_pstream_unref(c->pstream);
1229     pa_client_free(c->client);
1230
1231     pa_xfree(c);
1232 }
1233
1234 /* Called from main context */
1235 static void native_connection_send_memblock(pa_native_connection *c) {
1236     uint32_t start;
1237     record_stream *r;
1238
1239     start = PA_IDXSET_INVALID;
1240     for (;;) {
1241         pa_memchunk chunk;
1242
1243         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1244             return;
1245
1246         if (start == PA_IDXSET_INVALID)
1247             start = c->rrobin_index;
1248         else if (start == c->rrobin_index)
1249             return;
1250
1251         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1252             pa_memchunk schunk = chunk;
1253
1254             if (schunk.length > r->buffer_attr.fragsize)
1255                 schunk.length = r->buffer_attr.fragsize;
1256
1257             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1258
1259             pa_memblockq_drop(r->memblockq, schunk.length);
1260             pa_memblock_unref(schunk.memblock);
1261
1262             return;
1263         }
1264     }
1265 }
1266
1267 /*** sink input callbacks ***/
1268
1269 /* Called from thread context */
1270 static void handle_seek(playback_stream *s, int64_t indexw) {
1271     playback_stream_assert_ref(s);
1272
1273 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1274
1275     if (s->sink_input->thread_info.underrun_for > 0) {
1276
1277 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1278
1279         if (pa_memblockq_is_readable(s->memblockq)) {
1280
1281             /* We just ended an underrun, let's ask the sink
1282              * for a complete rewind rewrite */
1283
1284             pa_log_debug("Requesting rewind due to end of underrun.");
1285             pa_sink_input_request_rewind(s->sink_input,
1286                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1287                                          FALSE, TRUE, FALSE);
1288         }
1289
1290     } else {
1291         int64_t indexr;
1292
1293         indexr = pa_memblockq_get_read_index(s->memblockq);
1294
1295         if (indexw < indexr) {
1296             /* OK, the sink already asked for this data, so
1297              * let's have it usk us again */
1298
1299             pa_log_debug("Requesting rewind due to rewrite.");
1300             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1301         }
1302     }
1303
1304     playback_stream_request_bytes(s);
1305 }
1306
1307 /* Called from thread context */
1308 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1309     pa_sink_input *i = PA_SINK_INPUT(o);
1310     playback_stream *s;
1311
1312     pa_sink_input_assert_ref(i);
1313     s = PLAYBACK_STREAM(i->userdata);
1314     playback_stream_assert_ref(s);
1315
1316     switch (code) {
1317
1318         case SINK_INPUT_MESSAGE_SEEK: {
1319             int64_t windex;
1320
1321             windex = pa_memblockq_get_write_index(s->memblockq);
1322
1323             /* The client side is incapable of accounting correctly
1324              * for seeks of a type != PA_SEEK_RELATIVE. We need to be
1325              * able to deal with that. */
1326
1327             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata), PA_PTR_TO_UINT(userdata) == PA_SEEK_RELATIVE);
1328
1329             handle_seek(s, windex);
1330             return 0;
1331         }
1332
1333         case SINK_INPUT_MESSAGE_POST_DATA: {
1334             int64_t windex;
1335
1336             pa_assert(chunk);
1337
1338             windex = pa_memblockq_get_write_index(s->memblockq);
1339
1340 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1341
1342             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1343                 pa_log_warn("Failed to push data into queue");
1344                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1345                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE, TRUE);
1346             }
1347
1348             handle_seek(s, windex);
1349
1350 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1351
1352             return 0;
1353         }
1354
1355         case SINK_INPUT_MESSAGE_DRAIN:
1356         case SINK_INPUT_MESSAGE_FLUSH:
1357         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1358         case SINK_INPUT_MESSAGE_TRIGGER: {
1359
1360             int64_t windex;
1361             pa_sink_input *isync;
1362             void (*func)(pa_memblockq *bq);
1363
1364             switch  (code) {
1365                 case SINK_INPUT_MESSAGE_FLUSH:
1366                     func = pa_memblockq_flush_write;
1367                     break;
1368
1369                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1370                     func = pa_memblockq_prebuf_force;
1371                     break;
1372
1373                 case SINK_INPUT_MESSAGE_DRAIN:
1374                 case SINK_INPUT_MESSAGE_TRIGGER:
1375                     func = pa_memblockq_prebuf_disable;
1376                     break;
1377
1378                 default:
1379                     pa_assert_not_reached();
1380             }
1381
1382             windex = pa_memblockq_get_write_index(s->memblockq);
1383             func(s->memblockq);
1384             handle_seek(s, windex);
1385
1386             /* Do the same for all other members in the sync group */
1387             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1388                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1389                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1390                 func(ssync->memblockq);
1391                 handle_seek(ssync, windex);
1392             }
1393
1394             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1395                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1396                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1397                 func(ssync->memblockq);
1398                 handle_seek(ssync, windex);
1399             }
1400
1401             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1402                 if (!pa_memblockq_is_readable(s->memblockq))
1403                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1404                 else {
1405                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1406                     s->drain_request = TRUE;
1407                 }
1408             }
1409
1410             return 0;
1411         }
1412
1413         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1414             /* Atomically get a snapshot of all timing parameters... */
1415             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1416             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1417             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1418             s->current_sink_latency = pa_sink_get_latency_within_thread(s->sink_input->sink);
1419             s->underrun_for = s->sink_input->thread_info.underrun_for;
1420             s->playing_for = s->sink_input->thread_info.playing_for;
1421
1422             return 0;
1423
1424         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1425             int64_t windex;
1426
1427             windex = pa_memblockq_get_write_index(s->memblockq);
1428
1429             pa_memblockq_prebuf_force(s->memblockq);
1430
1431             handle_seek(s, windex);
1432
1433             /* Fall through to the default handler */
1434             break;
1435         }
1436
1437         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1438             pa_usec_t *r = userdata;
1439
1440             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1441
1442             /* Fall through, the default handler will add in the extra
1443              * latency added by the resampler */
1444             break;
1445         }
1446
1447         case SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR: {
1448             pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1449             pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1450             return 0;
1451         }
1452     }
1453
1454     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1455 }
1456
1457 /* Called from thread context */
1458 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1459     playback_stream *s;
1460
1461     pa_sink_input_assert_ref(i);
1462     s = PLAYBACK_STREAM(i->userdata);
1463     playback_stream_assert_ref(s);
1464     pa_assert(chunk);
1465
1466 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1467
1468     if (pa_memblockq_is_readable(s->memblockq))
1469         s->is_underrun = FALSE;
1470     else {
1471         if (!s->is_underrun)
1472             pa_log_debug("Underrun on '%s', %lu bytes in queue.", pa_strnull(pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME)), (unsigned long) pa_memblockq_get_length(s->memblockq));
1473
1474         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1475             s->drain_request = FALSE;
1476             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1477         } else if (!s->is_underrun)
1478             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1479
1480         s->is_underrun = TRUE;
1481
1482         playback_stream_request_bytes(s);
1483     }
1484
1485     /* This call will not fail with prebuf=0, hence we check for
1486        underrun explicitly above */
1487     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1488         return -1;
1489
1490     chunk->length = PA_MIN(nbytes, chunk->length);
1491
1492     if (i->thread_info.underrun_for > 0)
1493         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1494
1495     pa_memblockq_drop(s->memblockq, chunk->length);
1496     playback_stream_request_bytes(s);
1497
1498     return 0;
1499 }
1500
1501 /* Called from thread context */
1502 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1503     playback_stream *s;
1504
1505     pa_sink_input_assert_ref(i);
1506     s = PLAYBACK_STREAM(i->userdata);
1507     playback_stream_assert_ref(s);
1508
1509     /* If we are in an underrun, then we don't rewind */
1510     if (i->thread_info.underrun_for > 0)
1511         return;
1512
1513     pa_memblockq_rewind(s->memblockq, nbytes);
1514 }
1515
1516 /* Called from thread context */
1517 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1518     playback_stream *s;
1519
1520     pa_sink_input_assert_ref(i);
1521     s = PLAYBACK_STREAM(i->userdata);
1522     playback_stream_assert_ref(s);
1523
1524     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1525 }
1526
1527 /* Called from thread context */
1528 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1529     playback_stream *s;
1530     size_t new_tlength, old_tlength;
1531
1532     pa_sink_input_assert_ref(i);
1533     s = PLAYBACK_STREAM(i->userdata);
1534     playback_stream_assert_ref(s);
1535
1536     old_tlength = pa_memblockq_get_tlength(s->memblockq);
1537     new_tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1538
1539     if (old_tlength < new_tlength) {
1540         pa_log_debug("max_request changed, trying to update from %zu to %zu.", old_tlength, new_tlength);
1541         pa_memblockq_set_tlength(s->memblockq, new_tlength);
1542         new_tlength = pa_memblockq_get_tlength(s->memblockq);
1543
1544         if (new_tlength == old_tlength)
1545             pa_log_debug("Failed to increase tlength");
1546         else {
1547             pa_log_debug("Notifying client about increased tlength");
1548             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UPDATE_TLENGTH, NULL, pa_memblockq_get_tlength(s->memblockq), NULL, NULL);
1549         }
1550     }
1551 }
1552
1553 /* Called from main context */
1554 static void sink_input_kill_cb(pa_sink_input *i) {
1555     playback_stream *s;
1556
1557     pa_sink_input_assert_ref(i);
1558     s = PLAYBACK_STREAM(i->userdata);
1559     playback_stream_assert_ref(s);
1560
1561     playback_stream_send_killed(s);
1562     playback_stream_unlink(s);
1563 }
1564
1565 /* Called from main context */
1566 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1567     playback_stream *s;
1568     pa_tagstruct *t;
1569
1570     pa_sink_input_assert_ref(i);
1571     s = PLAYBACK_STREAM(i->userdata);
1572     playback_stream_assert_ref(s);
1573
1574     if (s->connection->version < 15)
1575       return;
1576
1577     t = pa_tagstruct_new(NULL, 0);
1578     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1579     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1580     pa_tagstruct_putu32(t, s->index);
1581     pa_tagstruct_puts(t, event);
1582     pa_tagstruct_put_proplist(t, pl);
1583     pa_pstream_send_tagstruct(s->connection->pstream, t);
1584 }
1585
1586 /* Called from main context */
1587 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1588     playback_stream *s;
1589     pa_tagstruct *t;
1590
1591     pa_sink_input_assert_ref(i);
1592     s = PLAYBACK_STREAM(i->userdata);
1593     playback_stream_assert_ref(s);
1594
1595     if (s->connection->version < 12)
1596       return;
1597
1598     t = pa_tagstruct_new(NULL, 0);
1599     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1600     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1601     pa_tagstruct_putu32(t, s->index);
1602     pa_tagstruct_put_boolean(t, suspend);
1603     pa_pstream_send_tagstruct(s->connection->pstream, t);
1604 }
1605
1606 /* Called from main context */
1607 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1608     playback_stream *s;
1609     pa_tagstruct *t;
1610
1611     pa_sink_input_assert_ref(i);
1612     s = PLAYBACK_STREAM(i->userdata);
1613     playback_stream_assert_ref(s);
1614
1615     fix_playback_buffer_attr(s);
1616     pa_memblockq_apply_attr(s->memblockq, &s->buffer_attr);
1617     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1618
1619     if (s->connection->version < 12)
1620       return;
1621
1622     t = pa_tagstruct_new(NULL, 0);
1623     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1624     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1625     pa_tagstruct_putu32(t, s->index);
1626     pa_tagstruct_putu32(t, dest->index);
1627     pa_tagstruct_puts(t, dest->name);
1628     pa_tagstruct_put_boolean(t, pa_sink_get_state(dest) == PA_SINK_SUSPENDED);
1629
1630     if (s->connection->version >= 13) {
1631         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1632         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
1633         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
1634         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
1635         pa_tagstruct_put_usec(t, s->configured_sink_latency);
1636     }
1637
1638     pa_pstream_send_tagstruct(s->connection->pstream, t);
1639 }
1640
1641 /*** source_output callbacks ***/
1642
1643 /* Called from thread context */
1644 static int source_output_process_msg(pa_msgobject *_o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1645     pa_source_output *o = PA_SOURCE_OUTPUT(_o);
1646     record_stream *s;
1647
1648     pa_source_output_assert_ref(o);
1649     s = RECORD_STREAM(o->userdata);
1650     record_stream_assert_ref(s);
1651
1652     switch (code) {
1653         case SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY:
1654             /* Atomically get a snapshot of all timing parameters... */
1655             s->current_monitor_latency = o->source->monitor_of ? pa_sink_get_latency_within_thread(o->source->monitor_of) : 0;
1656             s->current_source_latency = pa_source_get_latency_within_thread(o->source);
1657             s->on_the_fly_snapshot = pa_atomic_load(&s->on_the_fly);
1658             return 0;
1659     }
1660
1661     return pa_source_output_process_msg(_o, code, userdata, offset, chunk);
1662 }
1663
1664 /* Called from thread context */
1665 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1666     record_stream *s;
1667
1668     pa_source_output_assert_ref(o);
1669     s = RECORD_STREAM(o->userdata);
1670     record_stream_assert_ref(s);
1671     pa_assert(chunk);
1672
1673     pa_atomic_add(&s->on_the_fly, chunk->length);
1674     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1675 }
1676
1677 static void source_output_kill_cb(pa_source_output *o) {
1678     record_stream *s;
1679
1680     pa_source_output_assert_ref(o);
1681     s = RECORD_STREAM(o->userdata);
1682     record_stream_assert_ref(s);
1683
1684     record_stream_send_killed(s);
1685     record_stream_unlink(s);
1686 }
1687
1688 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1689     record_stream *s;
1690
1691     pa_source_output_assert_ref(o);
1692     s = RECORD_STREAM(o->userdata);
1693     record_stream_assert_ref(s);
1694
1695     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1696
1697     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1698 }
1699
1700 /* Called from main context */
1701 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1702     record_stream *s;
1703     pa_tagstruct *t;
1704
1705     pa_source_output_assert_ref(o);
1706     s = RECORD_STREAM(o->userdata);
1707     record_stream_assert_ref(s);
1708
1709     if (s->connection->version < 15)
1710       return;
1711
1712     t = pa_tagstruct_new(NULL, 0);
1713     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1714     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1715     pa_tagstruct_putu32(t, s->index);
1716     pa_tagstruct_puts(t, event);
1717     pa_tagstruct_put_proplist(t, pl);
1718     pa_pstream_send_tagstruct(s->connection->pstream, t);
1719 }
1720
1721 /* Called from main context */
1722 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1723     record_stream *s;
1724     pa_tagstruct *t;
1725
1726     pa_source_output_assert_ref(o);
1727     s = RECORD_STREAM(o->userdata);
1728     record_stream_assert_ref(s);
1729
1730     if (s->connection->version < 12)
1731       return;
1732
1733     t = pa_tagstruct_new(NULL, 0);
1734     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1735     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1736     pa_tagstruct_putu32(t, s->index);
1737     pa_tagstruct_put_boolean(t, suspend);
1738     pa_pstream_send_tagstruct(s->connection->pstream, t);
1739 }
1740
1741 /* Called from main context */
1742 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1743     record_stream *s;
1744     pa_tagstruct *t;
1745
1746     pa_source_output_assert_ref(o);
1747     s = RECORD_STREAM(o->userdata);
1748     record_stream_assert_ref(s);
1749
1750     fix_record_buffer_attr_pre(s);
1751     pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
1752     pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
1753     fix_record_buffer_attr_post(s);
1754
1755     if (s->connection->version < 12)
1756       return;
1757
1758     t = pa_tagstruct_new(NULL, 0);
1759     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1760     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1761     pa_tagstruct_putu32(t, s->index);
1762     pa_tagstruct_putu32(t, dest->index);
1763     pa_tagstruct_puts(t, dest->name);
1764     pa_tagstruct_put_boolean(t, pa_source_get_state(dest) == PA_SOURCE_SUSPENDED);
1765
1766     if (s->connection->version >= 13) {
1767         pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
1768         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1769         pa_tagstruct_put_usec(t, s->configured_source_latency);
1770     }
1771
1772     pa_pstream_send_tagstruct(s->connection->pstream, t);
1773 }
1774
1775 /*** pdispatch callbacks ***/
1776
1777 static void protocol_error(pa_native_connection *c) {
1778     pa_log("protocol error, kicking client");
1779     native_connection_unlink(c);
1780 }
1781
/* If 'expression' is false, send 'error' as reply to request 'tag' on
 * 'pstream' and return from the enclosing function (which therefore must
 * return void). The expansion deliberately does NOT end in a semicolon, so
 * that "CHECK_VALIDITY(...);" is exactly one statement and stays safe
 * inside unbraced if/else bodies; callers supply the semicolon. */
#define CHECK_VALIDITY(pstream, expression, tag, error) do { \
    if (!(expression)) { \
        pa_pstream_send_error((pstream), (tag), (error)); \
        return; \
    } \
} while (0)
1788
1789 static pa_tagstruct *reply_new(uint32_t tag) {
1790     pa_tagstruct *reply;
1791
1792     reply = pa_tagstruct_new(NULL, 0);
1793     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1794     pa_tagstruct_putu32(reply, tag);
1795     return reply;
1796 }
1797
1798 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1799     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1800     playback_stream *s;
1801     uint32_t sink_index, syncid, missing;
1802     pa_buffer_attr attr;
1803     const char *name = NULL, *sink_name;
1804     pa_sample_spec ss;
1805     pa_channel_map map;
1806     pa_tagstruct *reply;
1807     pa_sink *sink = NULL;
1808     pa_cvolume volume;
1809     pa_bool_t
1810         corked = FALSE,
1811         no_remap = FALSE,
1812         no_remix = FALSE,
1813         fix_format = FALSE,
1814         fix_rate = FALSE,
1815         fix_channels = FALSE,
1816         no_move = FALSE,
1817         variable_rate = FALSE,
1818         muted = FALSE,
1819         adjust_latency = FALSE,
1820         early_requests = FALSE,
1821         dont_inhibit_auto_suspend = FALSE,
1822         muted_set = FALSE,
1823         fail_on_suspend = FALSE;
1824     pa_sink_input_flags_t flags = 0;
1825     pa_proplist *p;
1826     pa_bool_t volume_set = TRUE;
1827     int ret = PA_ERR_INVALID;
1828
1829     pa_native_connection_assert_ref(c);
1830     pa_assert(t);
1831     memset(&attr, 0, sizeof(attr));
1832
1833     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1834         pa_tagstruct_get(
1835                 t,
1836                 PA_TAG_SAMPLE_SPEC, &ss,
1837                 PA_TAG_CHANNEL_MAP, &map,
1838                 PA_TAG_U32, &sink_index,
1839                 PA_TAG_STRING, &sink_name,
1840                 PA_TAG_U32, &attr.maxlength,
1841                 PA_TAG_BOOLEAN, &corked,
1842                 PA_TAG_U32, &attr.tlength,
1843                 PA_TAG_U32, &attr.prebuf,
1844                 PA_TAG_U32, &attr.minreq,
1845                 PA_TAG_U32, &syncid,
1846                 PA_TAG_CVOLUME, &volume,
1847                 PA_TAG_INVALID) < 0) {
1848
1849         protocol_error(c);
1850         return;
1851     }
1852
1853     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1854     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1855     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1856     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1857     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1858     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1859     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1860     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1861
1862     p = pa_proplist_new();
1863
1864     if (name)
1865         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1866
1867     if (c->version >= 12)  {
1868         /* Since 0.9.8 the user can ask for a couple of additional flags */
1869
1870         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1871             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1872             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1873             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1874             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1875             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1876             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1877
1878             protocol_error(c);
1879             pa_proplist_free(p);
1880             return;
1881         }
1882     }
1883
1884     if (c->version >= 13) {
1885
1886         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1887             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1888             pa_tagstruct_get_proplist(t, p) < 0) {
1889             protocol_error(c);
1890             pa_proplist_free(p);
1891             return;
1892         }
1893     }
1894
1895     if (c->version >= 14) {
1896
1897         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1898             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1899             protocol_error(c);
1900             pa_proplist_free(p);
1901             return;
1902         }
1903     }
1904
1905     if (c->version >= 15) {
1906
1907         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1908             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1909             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1910             protocol_error(c);
1911             pa_proplist_free(p);
1912             return;
1913         }
1914     }
1915
1916     if (!pa_tagstruct_eof(t)) {
1917         protocol_error(c);
1918         pa_proplist_free(p);
1919         return;
1920     }
1921
1922     if (sink_index != PA_INVALID_INDEX) {
1923
1924         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1925             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1926             pa_proplist_free(p);
1927             return;
1928         }
1929
1930     } else if (sink_name) {
1931
1932         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1933             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1934             pa_proplist_free(p);
1935             return;
1936         }
1937     }
1938
1939     flags =
1940         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1941         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1942         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1943         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1944         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1945         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1946         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1947         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1948         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1949         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1950
1951     /* Only since protocol version 15 there's a seperate muted_set
1952      * flag. For older versions we synthesize it here */
1953     muted_set = muted_set || muted;
1954
1955     s = playback_stream_new(c, sink, &ss, &map, &attr, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1956     pa_proplist_free(p);
1957
1958     CHECK_VALIDITY(c->pstream, s, tag, ret);
1959
1960     reply = reply_new(tag);
1961     pa_tagstruct_putu32(reply, s->index);
1962     pa_assert(s->sink_input);
1963     pa_tagstruct_putu32(reply, s->sink_input->index);
1964     pa_tagstruct_putu32(reply, missing);
1965
1966 /*     pa_log("initial request is %u", missing); */
1967
1968     if (c->version >= 9) {
1969         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1970
1971         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
1972         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.tlength);
1973         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.prebuf);
1974         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.minreq);
1975     }
1976
1977     if (c->version >= 12) {
1978         /* Since 0.9.8 we support sending the chosen sample
1979          * spec/channel map/device/suspend status back to the
1980          * client */
1981
1982         pa_tagstruct_put_sample_spec(reply, &ss);
1983         pa_tagstruct_put_channel_map(reply, &map);
1984
1985         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1986         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1987
1988         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1989     }
1990
1991     if (c->version >= 13)
1992         pa_tagstruct_put_usec(reply, s->configured_sink_latency);
1993
1994     pa_pstream_send_tagstruct(c->pstream, reply);
1995 }
1996
1997 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1998     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1999     uint32_t channel;
2000
2001     pa_native_connection_assert_ref(c);
2002     pa_assert(t);
2003
2004     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2005         !pa_tagstruct_eof(t)) {
2006         protocol_error(c);
2007         return;
2008     }
2009
2010     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2011
2012     switch (command) {
2013
2014         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
2015             playback_stream *s;
2016             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
2017                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2018                 return;
2019             }
2020
2021             playback_stream_unlink(s);
2022             break;
2023         }
2024
2025         case PA_COMMAND_DELETE_RECORD_STREAM: {
2026             record_stream *s;
2027             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
2028                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2029                 return;
2030             }
2031
2032             record_stream_unlink(s);
2033             break;
2034         }
2035
2036         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
2037             upload_stream *s;
2038
2039             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
2040                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
2041                 return;
2042             }
2043
2044             upload_stream_unlink(s);
2045             break;
2046         }
2047
2048         default:
2049             pa_assert_not_reached();
2050     }
2051
2052     pa_pstream_send_simple_ack(c->pstream, tag);
2053 }
2054
2055 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2056     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2057     record_stream *s;
2058     pa_buffer_attr attr;
2059     uint32_t source_index;
2060     const char *name = NULL, *source_name;
2061     pa_sample_spec ss;
2062     pa_channel_map map;
2063     pa_tagstruct *reply;
2064     pa_source *source = NULL;
2065     pa_bool_t
2066         corked = FALSE,
2067         no_remap = FALSE,
2068         no_remix = FALSE,
2069         fix_format = FALSE,
2070         fix_rate = FALSE,
2071         fix_channels = FALSE,
2072         no_move = FALSE,
2073         variable_rate = FALSE,
2074         adjust_latency = FALSE,
2075         peak_detect = FALSE,
2076         early_requests = FALSE,
2077         dont_inhibit_auto_suspend = FALSE,
2078         fail_on_suspend = FALSE;
2079     pa_source_output_flags_t flags = 0;
2080     pa_proplist *p;
2081     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2082     pa_sink_input *direct_on_input = NULL;
2083     int ret = PA_ERR_INVALID;
2084
2085     pa_native_connection_assert_ref(c);
2086     pa_assert(t);
2087
2088     memset(&attr, 0, sizeof(attr));
2089
2090     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2091         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2092         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2093         pa_tagstruct_getu32(t, &source_index) < 0 ||
2094         pa_tagstruct_gets(t, &source_name) < 0 ||
2095         pa_tagstruct_getu32(t, &attr.maxlength) < 0 ||
2096         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2097         pa_tagstruct_getu32(t, &attr.fragsize) < 0) {
2098         protocol_error(c);
2099         return;
2100     }
2101
2102     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2103     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2104     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2105     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2106     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2107     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2108     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2109
2110     p = pa_proplist_new();
2111
2112     if (name)
2113         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2114
2115     if (c->version >= 12)  {
2116         /* Since 0.9.8 the user can ask for a couple of additional flags */
2117
2118         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2119             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2120             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2121             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2122             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2123             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2124             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2125
2126             protocol_error(c);
2127             pa_proplist_free(p);
2128             return;
2129         }
2130     }
2131
2132     if (c->version >= 13) {
2133
2134         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2135             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2136             pa_tagstruct_get_proplist(t, p) < 0 ||
2137             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2138             protocol_error(c);
2139             pa_proplist_free(p);
2140             return;
2141         }
2142     }
2143
2144     if (c->version >= 14) {
2145
2146         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2147             protocol_error(c);
2148             pa_proplist_free(p);
2149             return;
2150         }
2151     }
2152
2153     if (c->version >= 15) {
2154
2155         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2156             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2157             protocol_error(c);
2158             pa_proplist_free(p);
2159             return;
2160         }
2161     }
2162
2163     if (!pa_tagstruct_eof(t)) {
2164         protocol_error(c);
2165         pa_proplist_free(p);
2166         return;
2167     }
2168
2169     if (source_index != PA_INVALID_INDEX) {
2170
2171         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2172             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2173             pa_proplist_free(p);
2174             return;
2175         }
2176
2177     } else if (source_name) {
2178
2179         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2180             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2181             pa_proplist_free(p);
2182             return;
2183         }
2184     }
2185
2186     if (direct_on_input_idx != PA_INVALID_INDEX) {
2187
2188         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2189             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2190             pa_proplist_free(p);
2191             return;
2192         }
2193     }
2194
2195     flags =
2196         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2197         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2198         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2199         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2200         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2201         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2202         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2203         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2204         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2205         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2206
2207     s = record_stream_new(c, source, &ss, &map, peak_detect, &attr, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2208     pa_proplist_free(p);
2209
2210     CHECK_VALIDITY(c->pstream, s, tag, ret);
2211
2212     reply = reply_new(tag);
2213     pa_tagstruct_putu32(reply, s->index);
2214     pa_assert(s->source_output);
2215     pa_tagstruct_putu32(reply, s->source_output->index);
2216
2217     if (c->version >= 9) {
2218         /* Since 0.9 we support sending the buffer metrics back to the client */
2219
2220         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.maxlength);
2221         pa_tagstruct_putu32(reply, (uint32_t) s->buffer_attr.fragsize);
2222     }
2223
2224     if (c->version >= 12) {
2225         /* Since 0.9.8 we support sending the chosen sample
2226          * spec/channel map/device/suspend status back to the
2227          * client */
2228
2229         pa_tagstruct_put_sample_spec(reply, &ss);
2230         pa_tagstruct_put_channel_map(reply, &map);
2231
2232         pa_tagstruct_putu32(reply, s->source_output->source->index);
2233         pa_tagstruct_puts(reply, s->source_output->source->name);
2234
2235         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2236     }
2237
2238     if (c->version >= 13)
2239         pa_tagstruct_put_usec(reply, s->configured_source_latency);
2240
2241     pa_pstream_send_tagstruct(c->pstream, reply);
2242 }
2243
2244 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2245     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2246     int ret;
2247
2248     pa_native_connection_assert_ref(c);
2249     pa_assert(t);
2250
2251     if (!pa_tagstruct_eof(t)) {
2252         protocol_error(c);
2253         return;
2254     }
2255
2256     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2257     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2258     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2259
2260     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2261 }
2262
2263 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2264     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2265     const void*cookie;
2266     pa_tagstruct *reply;
2267     pa_bool_t shm_on_remote = FALSE, do_shm;
2268
2269     pa_native_connection_assert_ref(c);
2270     pa_assert(t);
2271
2272     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2273         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2274         !pa_tagstruct_eof(t)) {
2275         protocol_error(c);
2276         return;
2277     }
2278
2279     /* Minimum supported version */
2280     if (c->version < 8) {
2281         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2282         return;
2283     }
2284
2285     /* Starting with protocol version 13 the MSB of the version tag
2286        reflects if shm is available for this pa_native_connection or
2287        not. */
2288     if (c->version >= 13) {
2289         shm_on_remote = !!(c->version & 0x80000000U);
2290         c->version &= 0x7FFFFFFFU;
2291     }
2292
2293     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2294
2295     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2296
2297     if (!c->authorized) {
2298         pa_bool_t success = FALSE;
2299
2300 #ifdef HAVE_CREDS
2301         const pa_creds *creds;
2302
2303         if ((creds = pa_pdispatch_creds(pd))) {
2304             if (creds->uid == getuid())
2305                 success = TRUE;
2306             else if (c->options->auth_group) {
2307                 int r;
2308                 gid_t gid;
2309
2310                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2311                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2312                 else if (gid == creds->gid)
2313                     success = TRUE;
2314
2315                 if (!success) {
2316                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2317                         pa_log_warn("Failed to check group membership.");
2318                     else if (r > 0)
2319                         success = TRUE;
2320                 }
2321             }
2322
2323             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2324                         (unsigned long) creds->uid,
2325                         (unsigned long) creds->gid,
2326                         (int) success);
2327         }
2328 #endif
2329
2330         if (!success && c->options->auth_cookie) {
2331             const uint8_t *ac;
2332
2333             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2334                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2335                     success = TRUE;
2336         }
2337
2338         if (!success) {
2339             pa_log_warn("Denied access to client with invalid authorization data.");
2340             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2341             return;
2342         }
2343
2344         c->authorized = TRUE;
2345         if (c->auth_timeout_event) {
2346             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2347             c->auth_timeout_event = NULL;
2348         }
2349     }
2350
2351     /* Enable shared memory support if possible */
2352     do_shm =
2353         pa_mempool_is_shared(c->protocol->core->mempool) &&
2354         c->is_local;
2355
2356     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2357
2358     if (do_shm)
2359         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2360             do_shm = FALSE;
2361
2362 #ifdef HAVE_CREDS
2363     if (do_shm) {
2364         /* Only enable SHM if both sides are owned by the same
2365          * user. This is a security measure because otherwise data
2366          * private to the user might leak. */
2367
2368         const pa_creds *creds;
2369         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2370             do_shm = FALSE;
2371     }
2372 #endif
2373
2374     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2375     pa_pstream_enable_shm(c->pstream, do_shm);
2376
2377     reply = reply_new(tag);
2378     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2379
2380 #ifdef HAVE_CREDS
2381 {
2382     /* SHM support is only enabled after both sides made sure they are the same user. */
2383
2384     pa_creds ucred;
2385
2386     ucred.uid = getuid();
2387     ucred.gid = getgid();
2388
2389     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2390 }
2391 #else
2392     pa_pstream_send_tagstruct(c->pstream, reply);
2393 #endif
2394 }
2395
2396 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2397     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2398     const char *name = NULL;
2399     pa_proplist *p;
2400     pa_tagstruct *reply;
2401
2402     pa_native_connection_assert_ref(c);
2403     pa_assert(t);
2404
2405     p = pa_proplist_new();
2406
2407     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2408         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2409         !pa_tagstruct_eof(t)) {
2410
2411         protocol_error(c);
2412         pa_proplist_free(p);
2413         return;
2414     }
2415
2416     if (name)
2417         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2418             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2419             pa_proplist_free(p);
2420             return;
2421         }
2422
2423     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2424     pa_proplist_free(p);
2425
2426     reply = reply_new(tag);
2427
2428     if (c->version >= 13)
2429         pa_tagstruct_putu32(reply, c->client->index);
2430
2431     pa_pstream_send_tagstruct(c->pstream, reply);
2432 }
2433
2434 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2435     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2436     const char *name;
2437     uint32_t idx = PA_IDXSET_INVALID;
2438
2439     pa_native_connection_assert_ref(c);
2440     pa_assert(t);
2441
2442     if (pa_tagstruct_gets(t, &name) < 0 ||
2443         !pa_tagstruct_eof(t)) {
2444         protocol_error(c);
2445         return;
2446     }
2447
2448     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2449     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2450
2451     if (command == PA_COMMAND_LOOKUP_SINK) {
2452         pa_sink *sink;
2453         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2454             idx = sink->index;
2455     } else {
2456         pa_source *source;
2457         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2458         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2459             idx = source->index;
2460     }
2461
2462     if (idx == PA_IDXSET_INVALID)
2463         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2464     else {
2465         pa_tagstruct *reply;
2466         reply = reply_new(tag);
2467         pa_tagstruct_putu32(reply, idx);
2468         pa_pstream_send_tagstruct(c->pstream, reply);
2469     }
2470 }
2471
2472 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2473     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2474     uint32_t idx;
2475     playback_stream *s;
2476
2477     pa_native_connection_assert_ref(c);
2478     pa_assert(t);
2479
2480     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2481         !pa_tagstruct_eof(t)) {
2482         protocol_error(c);
2483         return;
2484     }
2485
2486     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2487     s = pa_idxset_get_by_index(c->output_streams, idx);
2488     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2489     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2490
2491     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2492 }
2493
2494 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2495     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2496     pa_tagstruct *reply;
2497     const pa_mempool_stat *stat;
2498
2499     pa_native_connection_assert_ref(c);
2500     pa_assert(t);
2501
2502     if (!pa_tagstruct_eof(t)) {
2503         protocol_error(c);
2504         return;
2505     }
2506
2507     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2508
2509     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2510
2511     reply = reply_new(tag);
2512     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2513     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2514     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2515     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2516     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2517     pa_pstream_send_tagstruct(c->pstream, reply);
2518 }
2519
2520 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2521     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2522     pa_tagstruct *reply;
2523     playback_stream *s;
2524     struct timeval tv, now;
2525     uint32_t idx;
2526
2527     pa_native_connection_assert_ref(c);
2528     pa_assert(t);
2529
2530     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2531         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2532         !pa_tagstruct_eof(t)) {
2533         protocol_error(c);
2534         return;
2535     }
2536
2537     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2538     s = pa_idxset_get_by_index(c->output_streams, idx);
2539     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2540     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2541
2542     /* Get an atomic snapshot of all timing parameters */
2543     pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2544
2545     reply = reply_new(tag);
2546     pa_tagstruct_put_usec(reply,
2547                           s->current_sink_latency +
2548                           pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec));
2549     pa_tagstruct_put_usec(reply, 0);
2550     pa_tagstruct_put_boolean(reply,
2551                              s->playing_for > 0 &&
2552                              pa_sink_get_state(s->sink_input->sink) == PA_SINK_RUNNING &&
2553                              pa_sink_input_get_state(s->sink_input) == PA_SINK_INPUT_RUNNING);
2554     pa_tagstruct_put_timeval(reply, &tv);
2555     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2556     pa_tagstruct_puts64(reply, s->write_index);
2557     pa_tagstruct_puts64(reply, s->read_index);
2558
2559     if (c->version >= 13) {
2560         pa_tagstruct_putu64(reply, s->underrun_for);
2561         pa_tagstruct_putu64(reply, s->playing_for);
2562     }
2563
2564     pa_pstream_send_tagstruct(c->pstream, reply);
2565 }
2566
2567 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2568     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2569     pa_tagstruct *reply;
2570     record_stream *s;
2571     struct timeval tv, now;
2572     uint32_t idx;
2573
2574     pa_native_connection_assert_ref(c);
2575     pa_assert(t);
2576
2577     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2578         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2579         !pa_tagstruct_eof(t)) {
2580         protocol_error(c);
2581         return;
2582     }
2583
2584     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2585     s = pa_idxset_get_by_index(c->record_streams, idx);
2586     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2587
2588     /* Get an atomic snapshot of all timing parameters */
2589     pa_assert_se(pa_asyncmsgq_send(s->source_output->source->asyncmsgq, PA_MSGOBJECT(s->source_output), SOURCE_OUTPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0);
2590
2591     reply = reply_new(tag);
2592     pa_tagstruct_put_usec(reply, s->current_monitor_latency);
2593     pa_tagstruct_put_usec(reply,
2594                           s->current_source_latency +
2595                           pa_bytes_to_usec(s->on_the_fly_snapshot, &s->source_output->sample_spec));
2596     pa_tagstruct_put_boolean(reply,
2597                              pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING &&
2598                              pa_source_output_get_state(s->source_output) == PA_SOURCE_OUTPUT_RUNNING);
2599     pa_tagstruct_put_timeval(reply, &tv);
2600     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2601     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2602     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2603     pa_pstream_send_tagstruct(c->pstream, reply);
2604 }
2605
2606 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2607     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2608     upload_stream *s;
2609     uint32_t length;
2610     const char *name = NULL;
2611     pa_sample_spec ss;
2612     pa_channel_map map;
2613     pa_tagstruct *reply;
2614     pa_proplist *p;
2615
2616     pa_native_connection_assert_ref(c);
2617     pa_assert(t);
2618
2619     if (pa_tagstruct_gets(t, &name) < 0 ||
2620         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2621         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2622         pa_tagstruct_getu32(t, &length) < 0) {
2623         protocol_error(c);
2624         return;
2625     }
2626
2627     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2628     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2629     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2630     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2631     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2632     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2633
2634     p = pa_proplist_new();
2635
2636     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2637         !pa_tagstruct_eof(t)) {
2638
2639         protocol_error(c);
2640         pa_proplist_free(p);
2641         return;
2642     }
2643
2644     if (c->version < 13)
2645         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2646     else if (!name)
2647         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2648             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2649
2650     if (!name || !pa_namereg_is_valid_name(name)) {
2651         pa_proplist_free(p);
2652         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2653     }
2654
2655     s = upload_stream_new(c, &ss, &map, name, length, p);
2656     pa_proplist_free(p);
2657
2658     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2659
2660     reply = reply_new(tag);
2661     pa_tagstruct_putu32(reply, s->index);
2662     pa_tagstruct_putu32(reply, length);
2663     pa_pstream_send_tagstruct(c->pstream, reply);
2664 }
2665
2666 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2667     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2668     uint32_t channel;
2669     upload_stream *s;
2670     uint32_t idx;
2671
2672     pa_native_connection_assert_ref(c);
2673     pa_assert(t);
2674
2675     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2676         !pa_tagstruct_eof(t)) {
2677         protocol_error(c);
2678         return;
2679     }
2680
2681     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2682
2683     s = pa_idxset_get_by_index(c->output_streams, channel);
2684     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2685     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2686
2687     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2688         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2689     else
2690         pa_pstream_send_simple_ack(c->pstream, tag);
2691
2692     upload_stream_unlink(s);
2693 }
2694
2695 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2696     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2697     uint32_t sink_index;
2698     pa_volume_t volume;
2699     pa_sink *sink;
2700     const char *name, *sink_name;
2701     uint32_t idx;
2702     pa_proplist *p;
2703     pa_tagstruct *reply;
2704
2705     pa_native_connection_assert_ref(c);
2706     pa_assert(t);
2707
2708     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2709
2710     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2711         pa_tagstruct_gets(t, &sink_name) < 0 ||
2712         pa_tagstruct_getu32(t, &volume) < 0 ||
2713         pa_tagstruct_gets(t, &name) < 0) {
2714         protocol_error(c);
2715         return;
2716     }
2717
2718     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2719     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2720     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2721     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2722
2723     if (sink_index != PA_INVALID_INDEX)
2724         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2725     else
2726         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2727
2728     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2729
2730     p = pa_proplist_new();
2731
2732     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2733         !pa_tagstruct_eof(t)) {
2734         protocol_error(c);
2735         pa_proplist_free(p);
2736         return;
2737     }
2738
2739     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2740
2741     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2742         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2743         pa_proplist_free(p);
2744         return;
2745     }
2746
2747     pa_proplist_free(p);
2748
2749     reply = reply_new(tag);
2750
2751     if (c->version >= 13)
2752         pa_tagstruct_putu32(reply, idx);
2753
2754     pa_pstream_send_tagstruct(c->pstream, reply);
2755 }
2756
2757 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2758     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2759     const char *name;
2760
2761     pa_native_connection_assert_ref(c);
2762     pa_assert(t);
2763
2764     if (pa_tagstruct_gets(t, &name) < 0 ||
2765         !pa_tagstruct_eof(t)) {
2766         protocol_error(c);
2767         return;
2768     }
2769
2770     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2771     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2772
2773     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2774         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2775         return;
2776     }
2777
2778     pa_pstream_send_simple_ack(c->pstream, tag);
2779 }
2780
2781 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2782     pa_assert(c);
2783     pa_assert(fixed);
2784     pa_assert(original);
2785
2786     *fixed = *original;
2787
2788     if (c->version < 12) {
2789         /* Before protocol version 12 we didn't support S32 samples,
2790          * so we need to lie about this to the client */
2791
2792         if (fixed->format == PA_SAMPLE_S32LE)
2793             fixed->format = PA_SAMPLE_FLOAT32LE;
2794         if (fixed->format == PA_SAMPLE_S32BE)
2795             fixed->format = PA_SAMPLE_FLOAT32BE;
2796     }
2797
2798     if (c->version < 15) {
2799         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2800             fixed->format = PA_SAMPLE_FLOAT32LE;
2801         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2802             fixed->format = PA_SAMPLE_FLOAT32BE;
2803     }
2804 }
2805
2806 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2807     pa_sample_spec fixed_ss;
2808
2809     pa_assert(t);
2810     pa_sink_assert_ref(sink);
2811
2812     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2813
2814     pa_tagstruct_put(
2815         t,
2816         PA_TAG_U32, sink->index,
2817         PA_TAG_STRING, sink->name,
2818         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2819         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2820         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2821         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2822         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE, FALSE),
2823         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2824         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2825         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2826         PA_TAG_USEC, pa_sink_get_latency(sink),
2827         PA_TAG_STRING, sink->driver,
2828         PA_TAG_U32, sink->flags,
2829         PA_TAG_INVALID);
2830
2831     if (c->version >= 13) {
2832         pa_tagstruct_put_proplist(t, sink->proplist);
2833         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2834     }
2835
2836     if (c->version >= 15) {
2837         pa_tagstruct_put_volume(t, sink->base_volume);
2838         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2839             pa_log_error("Internal sink state is invalid.");
2840         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2841         pa_tagstruct_putu32(t, sink->n_volume_steps);
2842         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2843     }
2844 }
2845
2846 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2847     pa_sample_spec fixed_ss;
2848
2849     pa_assert(t);
2850     pa_source_assert_ref(source);
2851
2852     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2853
2854     pa_tagstruct_put(
2855         t,
2856         PA_TAG_U32, source->index,
2857         PA_TAG_STRING, source->name,
2858         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2859         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2860         PA_TAG_CHANNEL_MAP, &source->channel_map,
2861         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2862         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2863         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2864         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2865         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2866         PA_TAG_USEC, pa_source_get_latency(source),
2867         PA_TAG_STRING, source->driver,
2868         PA_TAG_U32, source->flags,
2869         PA_TAG_INVALID);
2870
2871     if (c->version >= 13) {
2872         pa_tagstruct_put_proplist(t, source->proplist);
2873         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2874     }
2875
2876     if (c->version >= 15) {
2877         pa_tagstruct_put_volume(t, source->base_volume);
2878         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2879             pa_log_error("Internal source state is invalid.");
2880         pa_tagstruct_putu32(t, pa_source_get_state(source));
2881         pa_tagstruct_putu32(t, source->n_volume_steps);
2882         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2883     }
2884 }
2885
2886 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2887     pa_assert(t);
2888     pa_assert(client);
2889
2890     pa_tagstruct_putu32(t, client->index);
2891     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2892     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2893     pa_tagstruct_puts(t, client->driver);
2894
2895     if (c->version >= 13)
2896         pa_tagstruct_put_proplist(t, client->proplist);
2897 }
2898
2899 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2900     void *state = NULL;
2901     pa_card_profile *p;
2902
2903     pa_assert(t);
2904     pa_assert(card);
2905
2906     pa_tagstruct_putu32(t, card->index);
2907     pa_tagstruct_puts(t, card->name);
2908     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2909     pa_tagstruct_puts(t, card->driver);
2910
2911     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2912
2913     if (card->profiles) {
2914         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2915             pa_tagstruct_puts(t, p->name);
2916             pa_tagstruct_puts(t, p->description);
2917             pa_tagstruct_putu32(t, p->n_sinks);
2918             pa_tagstruct_putu32(t, p->n_sources);
2919             pa_tagstruct_putu32(t, p->priority);
2920         }
2921     }
2922
2923     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2924     pa_tagstruct_put_proplist(t, card->proplist);
2925 }
2926
2927 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2928     pa_assert(t);
2929     pa_assert(module);
2930
2931     pa_tagstruct_putu32(t, module->index);
2932     pa_tagstruct_puts(t, module->name);
2933     pa_tagstruct_puts(t, module->argument);
2934     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2935
2936     if (c->version < 15)
2937         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2938
2939     if (c->version >= 15)
2940         pa_tagstruct_put_proplist(t, module->proplist);
2941 }
2942
2943 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2944     pa_sample_spec fixed_ss;
2945     pa_usec_t sink_latency;
2946     pa_cvolume v;
2947
2948     pa_assert(t);
2949     pa_sink_input_assert_ref(s);
2950
2951     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2952
2953     pa_tagstruct_putu32(t, s->index);
2954     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2955     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2956     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2957     pa_tagstruct_putu32(t, s->sink->index);
2958     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2959     pa_tagstruct_put_channel_map(t, &s->channel_map);
2960     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s, &v, TRUE));
2961     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
2962     pa_tagstruct_put_usec(t, sink_latency);
2963     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
2964     pa_tagstruct_puts(t, s->driver);
2965     if (c->version >= 11)
2966         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
2967     if (c->version >= 13)
2968         pa_tagstruct_put_proplist(t, s->proplist);
2969 }
2970
2971 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
2972     pa_sample_spec fixed_ss;
2973     pa_usec_t source_latency;
2974
2975     pa_assert(t);
2976     pa_source_output_assert_ref(s);
2977
2978     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2979
2980     pa_tagstruct_putu32(t, s->index);
2981     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2982     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2983     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2984     pa_tagstruct_putu32(t, s->source->index);
2985     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2986     pa_tagstruct_put_channel_map(t, &s->channel_map);
2987     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
2988     pa_tagstruct_put_usec(t, source_latency);
2989     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
2990     pa_tagstruct_puts(t, s->driver);
2991
2992     if (c->version >= 13)
2993         pa_tagstruct_put_proplist(t, s->proplist);
2994 }
2995
2996 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
2997     pa_sample_spec fixed_ss;
2998     pa_cvolume v;
2999
3000     pa_assert(t);
3001     pa_assert(e);
3002
3003     if (e->memchunk.memblock)
3004         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
3005     else
3006         memset(&fixed_ss, 0, sizeof(fixed_ss));
3007
3008     pa_tagstruct_putu32(t, e->index);
3009     pa_tagstruct_puts(t, e->name);
3010
3011     if (e->volume_is_set)
3012         v = e->volume;
3013     else
3014         pa_cvolume_init(&v);
3015
3016     pa_tagstruct_put_cvolume(t, &v);
3017     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
3018     pa_tagstruct_put_sample_spec(t, &fixed_ss);
3019     pa_tagstruct_put_channel_map(t, &e->channel_map);
3020     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
3021     pa_tagstruct_put_boolean(t, e->lazy);
3022     pa_tagstruct_puts(t, e->filename);
3023
3024     if (c->version >= 13)
3025         pa_tagstruct_put_proplist(t, e->proplist);
3026 }
3027
3028 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3029     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3030     uint32_t idx;
3031     pa_sink *sink = NULL;
3032     pa_source *source = NULL;
3033     pa_client *client = NULL;
3034     pa_card *card = NULL;
3035     pa_module *module = NULL;
3036     pa_sink_input *si = NULL;
3037     pa_source_output *so = NULL;
3038     pa_scache_entry *sce = NULL;
3039     const char *name = NULL;
3040     pa_tagstruct *reply;
3041
3042     pa_native_connection_assert_ref(c);
3043     pa_assert(t);
3044
3045     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3046         (command != PA_COMMAND_GET_CLIENT_INFO &&
3047          command != PA_COMMAND_GET_MODULE_INFO &&
3048          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
3049          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
3050          pa_tagstruct_gets(t, &name) < 0) ||
3051         !pa_tagstruct_eof(t)) {
3052         protocol_error(c);
3053         return;
3054     }
3055
3056     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3057     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3058     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3059     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3060     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3061
3062     if (command == PA_COMMAND_GET_SINK_INFO) {
3063         if (idx != PA_INVALID_INDEX)
3064             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3065         else
3066             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3067     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
3068         if (idx != PA_INVALID_INDEX)
3069             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3070         else
3071             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3072     } else if (command == PA_COMMAND_GET_CARD_INFO) {
3073         if (idx != PA_INVALID_INDEX)
3074             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
3075         else
3076             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
3077     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
3078         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3079     else if (command == PA_COMMAND_GET_MODULE_INFO)
3080         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3081     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
3082         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3083     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
3084         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3085     else {
3086         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
3087         if (idx != PA_INVALID_INDEX)
3088             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3089         else
3090             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3091     }
3092
3093     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3094         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3095         return;
3096     }
3097
3098     reply = reply_new(tag);
3099     if (sink)
3100         sink_fill_tagstruct(c, reply, sink);
3101     else if (source)
3102         source_fill_tagstruct(c, reply, source);
3103     else if (client)
3104         client_fill_tagstruct(c, reply, client);
3105     else if (card)
3106         card_fill_tagstruct(c, reply, card);
3107     else if (module)
3108         module_fill_tagstruct(c, reply, module);
3109     else if (si)
3110         sink_input_fill_tagstruct(c, reply, si);
3111     else if (so)
3112         source_output_fill_tagstruct(c, reply, so);
3113     else
3114         scache_fill_tagstruct(c, reply, sce);
3115     pa_pstream_send_tagstruct(c->pstream, reply);
3116 }
3117
3118 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3119     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3120     pa_idxset *i;
3121     uint32_t idx;
3122     void *p;
3123     pa_tagstruct *reply;
3124
3125     pa_native_connection_assert_ref(c);
3126     pa_assert(t);
3127
3128     if (!pa_tagstruct_eof(t)) {
3129         protocol_error(c);
3130         return;
3131     }
3132
3133     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3134
3135     reply = reply_new(tag);
3136
3137     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3138         i = c->protocol->core->sinks;
3139     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3140         i = c->protocol->core->sources;
3141     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3142         i = c->protocol->core->clients;
3143     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3144         i = c->protocol->core->cards;
3145     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3146         i = c->protocol->core->modules;
3147     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3148         i = c->protocol->core->sink_inputs;
3149     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3150         i = c->protocol->core->source_outputs;
3151     else {
3152         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3153         i = c->protocol->core->scache;
3154     }
3155
3156     if (i) {
3157         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3158             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3159                 sink_fill_tagstruct(c, reply, p);
3160             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3161                 source_fill_tagstruct(c, reply, p);
3162             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3163                 client_fill_tagstruct(c, reply, p);
3164             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3165                 card_fill_tagstruct(c, reply, p);
3166             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3167                 module_fill_tagstruct(c, reply, p);
3168             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3169                 sink_input_fill_tagstruct(c, reply, p);
3170             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3171                 source_output_fill_tagstruct(c, reply, p);
3172             else {
3173                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3174                 scache_fill_tagstruct(c, reply, p);
3175             }
3176         }
3177     }
3178
3179     pa_pstream_send_tagstruct(c->pstream, reply);
3180 }
3181
3182 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3183     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3184     pa_tagstruct *reply;
3185     pa_sink *def_sink;
3186     pa_source *def_source;
3187     pa_sample_spec fixed_ss;
3188     char *h, *u;
3189
3190     pa_native_connection_assert_ref(c);
3191     pa_assert(t);
3192
3193     if (!pa_tagstruct_eof(t)) {
3194         protocol_error(c);
3195         return;
3196     }
3197
3198     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3199
3200     reply = reply_new(tag);
3201     pa_tagstruct_puts(reply, PACKAGE_NAME);
3202     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3203
3204     u = pa_get_user_name_malloc();
3205     pa_tagstruct_puts(reply, u);
3206     pa_xfree(u);
3207
3208     h = pa_get_host_name_malloc();
3209     pa_tagstruct_puts(reply, h);
3210     pa_xfree(h);
3211
3212     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3213     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3214
3215     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3216     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3217     def_source = pa_namereg_get_default_source(c->protocol->core);
3218     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3219
3220     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3221
3222     if (c->version >= 15)
3223         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3224
3225     pa_pstream_send_tagstruct(c->pstream, reply);
3226 }
3227
3228 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3229     pa_tagstruct *t;
3230     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3231
3232     pa_native_connection_assert_ref(c);
3233
3234     t = pa_tagstruct_new(NULL, 0);
3235     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3236     pa_tagstruct_putu32(t, (uint32_t) -1);
3237     pa_tagstruct_putu32(t, e);
3238     pa_tagstruct_putu32(t, idx);
3239     pa_pstream_send_tagstruct(c->pstream, t);
3240 }
3241
3242 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3243     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3244     pa_subscription_mask_t m;
3245
3246     pa_native_connection_assert_ref(c);
3247     pa_assert(t);
3248
3249     if (pa_tagstruct_getu32(t, &m) < 0 ||
3250         !pa_tagstruct_eof(t)) {
3251         protocol_error(c);
3252         return;
3253     }
3254
3255     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3256     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3257
3258     if (c->subscription)
3259         pa_subscription_free(c->subscription);
3260
3261     if (m != 0) {
3262         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3263         pa_assert(c->subscription);
3264     } else
3265         c->subscription = NULL;
3266
3267     pa_pstream_send_simple_ack(c->pstream, tag);
3268 }
3269
3270 static void command_set_volume(
3271         pa_pdispatch *pd,
3272         uint32_t command,
3273         uint32_t tag,
3274         pa_tagstruct *t,
3275         void *userdata) {
3276
3277     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3278     uint32_t idx;
3279     pa_cvolume volume;
3280     pa_sink *sink = NULL;
3281     pa_source *source = NULL;
3282     pa_sink_input *si = NULL;
3283     const char *name = NULL;
3284
3285     pa_native_connection_assert_ref(c);
3286     pa_assert(t);
3287
3288     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3289         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3290         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3291         pa_tagstruct_get_cvolume(t, &volume) ||
3292         !pa_tagstruct_eof(t)) {
3293         protocol_error(c);
3294         return;
3295     }
3296
3297     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3298     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3299     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3300     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3301     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3302     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3303
3304     switch (command) {
3305
3306         case PA_COMMAND_SET_SINK_VOLUME:
3307             if (idx != PA_INVALID_INDEX)
3308                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3309             else
3310                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3311             break;
3312
3313         case PA_COMMAND_SET_SOURCE_VOLUME:
3314             if (idx != PA_INVALID_INDEX)
3315                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3316             else
3317                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3318             break;
3319
3320         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3321             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3322             break;
3323
3324         default:
3325             pa_assert_not_reached();
3326     }
3327
3328     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3329
3330     if (sink)
3331         pa_sink_set_volume(sink, &volume, TRUE, TRUE, TRUE);
3332     else if (source)
3333         pa_source_set_volume(source, &volume);
3334     else if (si)
3335         pa_sink_input_set_volume(si, &volume, TRUE, TRUE);
3336
3337     pa_pstream_send_simple_ack(c->pstream, tag);
3338 }
3339
3340 static void command_set_mute(
3341         pa_pdispatch *pd,
3342         uint32_t command,
3343         uint32_t tag,
3344         pa_tagstruct *t,
3345         void *userdata) {
3346
3347     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3348     uint32_t idx;
3349     pa_bool_t mute;
3350     pa_sink *sink = NULL;
3351     pa_source *source = NULL;
3352     pa_sink_input *si = NULL;
3353     const char *name = NULL;
3354
3355     pa_native_connection_assert_ref(c);
3356     pa_assert(t);
3357
3358     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3359         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3360         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3361         pa_tagstruct_get_boolean(t, &mute) ||
3362         !pa_tagstruct_eof(t)) {
3363         protocol_error(c);
3364         return;
3365     }
3366
3367     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3368     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3369     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3370     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3371     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3372
3373     switch (command) {
3374
3375         case PA_COMMAND_SET_SINK_MUTE:
3376
3377             if (idx != PA_INVALID_INDEX)
3378                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3379             else
3380                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3381
3382             break;
3383
3384         case PA_COMMAND_SET_SOURCE_MUTE:
3385             if (idx != PA_INVALID_INDEX)
3386                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3387             else
3388                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3389
3390             break;
3391
3392         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3393             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3394             break;
3395
3396         default:
3397             pa_assert_not_reached();
3398     }
3399
3400     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3401
3402     if (sink)
3403         pa_sink_set_mute(sink, mute);
3404     else if (source)
3405         pa_source_set_mute(source, mute);
3406     else if (si)
3407         pa_sink_input_set_mute(si, mute, TRUE);
3408
3409     pa_pstream_send_simple_ack(c->pstream, tag);
3410 }
3411
3412 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3413     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3414     uint32_t idx;
3415     pa_bool_t b;
3416     playback_stream *s;
3417
3418     pa_native_connection_assert_ref(c);
3419     pa_assert(t);
3420
3421     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3422         pa_tagstruct_get_boolean(t, &b) < 0 ||
3423         !pa_tagstruct_eof(t)) {
3424         protocol_error(c);
3425         return;
3426     }
3427
3428     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3429     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3430     s = pa_idxset_get_by_index(c->output_streams, idx);
3431     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3432     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3433
3434     pa_sink_input_cork(s->sink_input, b);
3435
3436     if (b)
3437         s->is_underrun = TRUE;
3438
3439     pa_pstream_send_simple_ack(c->pstream, tag);
3440 }
3441
3442 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3443     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3444     uint32_t idx;
3445     playback_stream *s;
3446
3447     pa_native_connection_assert_ref(c);
3448     pa_assert(t);
3449
3450     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3451         !pa_tagstruct_eof(t)) {
3452         protocol_error(c);
3453         return;
3454     }
3455
3456     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3457     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3458     s = pa_idxset_get_by_index(c->output_streams, idx);
3459     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3460     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3461
3462     switch (command) {
3463         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3464             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3465             break;
3466
3467         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3468             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3469             break;
3470
3471         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3472             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3473             break;
3474
3475         default:
3476             pa_assert_not_reached();
3477     }
3478
3479     pa_pstream_send_simple_ack(c->pstream, tag);
3480 }
3481
3482 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3483     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3484     uint32_t idx;
3485     record_stream *s;
3486     pa_bool_t b;
3487
3488     pa_native_connection_assert_ref(c);
3489     pa_assert(t);
3490
3491     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3492         pa_tagstruct_get_boolean(t, &b) < 0 ||
3493         !pa_tagstruct_eof(t)) {
3494         protocol_error(c);
3495         return;
3496     }
3497
3498     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3499     s = pa_idxset_get_by_index(c->record_streams, idx);
3500     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3501
3502     pa_source_output_cork(s->source_output, b);
3503     pa_memblockq_prebuf_force(s->memblockq);
3504     pa_pstream_send_simple_ack(c->pstream, tag);
3505 }
3506
3507 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3508     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3509     uint32_t idx;
3510     record_stream *s;
3511
3512     pa_native_connection_assert_ref(c);
3513     pa_assert(t);
3514
3515     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3516         !pa_tagstruct_eof(t)) {
3517         protocol_error(c);
3518         return;
3519     }
3520
3521     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3522     s = pa_idxset_get_by_index(c->record_streams, idx);
3523     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3524
3525     pa_memblockq_flush_read(s->memblockq);
3526     pa_pstream_send_simple_ack(c->pstream, tag);
3527 }
3528
3529 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3530     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3531     uint32_t idx;
3532     pa_buffer_attr a;
3533     pa_tagstruct *reply;
3534
3535     pa_native_connection_assert_ref(c);
3536     pa_assert(t);
3537
3538     memset(&a, 0, sizeof(a));
3539
3540     if (pa_tagstruct_getu32(t, &idx) < 0) {
3541         protocol_error(c);
3542         return;
3543     }
3544
3545     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3546
3547     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3548         playback_stream *s;
3549         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3550
3551         s = pa_idxset_get_by_index(c->output_streams, idx);
3552         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3553         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3554
3555         if (pa_tagstruct_get(
3556                     t,
3557                     PA_TAG_U32, &a.maxlength,
3558                     PA_TAG_U32, &a.tlength,
3559                     PA_TAG_U32, &a.prebuf,
3560                     PA_TAG_U32, &a.minreq,
3561                     PA_TAG_INVALID) < 0 ||
3562             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3563             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3564             !pa_tagstruct_eof(t)) {
3565             protocol_error(c);
3566             return;
3567         }
3568
3569         s->adjust_latency = adjust_latency;
3570         s->early_requests = early_requests;
3571         s->buffer_attr = a;
3572
3573         fix_playback_buffer_attr(s);
3574         pa_assert_se(pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_BUFFER_ATTR, NULL, 0, NULL) == 0);
3575
3576         reply = reply_new(tag);
3577         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3578         pa_tagstruct_putu32(reply, s->buffer_attr.tlength);
3579         pa_tagstruct_putu32(reply, s->buffer_attr.prebuf);
3580         pa_tagstruct_putu32(reply, s->buffer_attr.minreq);
3581
3582         if (c->version >= 13)
3583             pa_tagstruct_put_usec(reply, s->configured_sink_latency);
3584
3585     } else {
3586         record_stream *s;
3587         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3588         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3589
3590         s = pa_idxset_get_by_index(c->record_streams, idx);
3591         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3592
3593         if (pa_tagstruct_get(
3594                     t,
3595                     PA_TAG_U32, &a.maxlength,
3596                     PA_TAG_U32, &a.fragsize,
3597                     PA_TAG_INVALID) < 0 ||
3598             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3599             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3600             !pa_tagstruct_eof(t)) {
3601             protocol_error(c);
3602             return;
3603         }
3604
3605         s->adjust_latency = adjust_latency;
3606         s->early_requests = early_requests;
3607         s->buffer_attr = a;
3608
3609         fix_record_buffer_attr_pre(s);
3610         pa_memblockq_set_maxlength(s->memblockq, s->buffer_attr.maxlength);
3611         pa_memblockq_get_attr(s->memblockq, &s->buffer_attr);
3612         fix_record_buffer_attr_post(s);
3613
3614         reply = reply_new(tag);
3615         pa_tagstruct_putu32(reply, s->buffer_attr.maxlength);
3616         pa_tagstruct_putu32(reply, s->buffer_attr.fragsize);
3617
3618         if (c->version >= 13)
3619             pa_tagstruct_put_usec(reply, s->configured_source_latency);
3620     }
3621
3622     pa_pstream_send_tagstruct(c->pstream, reply);
3623 }
3624
3625 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3626     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3627     uint32_t idx;
3628     uint32_t rate;
3629
3630     pa_native_connection_assert_ref(c);
3631     pa_assert(t);
3632
3633     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3634         pa_tagstruct_getu32(t, &rate) < 0 ||
3635         !pa_tagstruct_eof(t)) {
3636         protocol_error(c);
3637         return;
3638     }
3639
3640     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3641     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3642
3643     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3644         playback_stream *s;
3645
3646         s = pa_idxset_get_by_index(c->output_streams, idx);
3647         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3648         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3649
3650         pa_sink_input_set_rate(s->sink_input, rate);
3651
3652     } else {
3653         record_stream *s;
3654         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3655
3656         s = pa_idxset_get_by_index(c->record_streams, idx);
3657         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3658
3659         pa_source_output_set_rate(s->source_output, rate);
3660     }
3661
3662     pa_pstream_send_simple_ack(c->pstream, tag);
3663 }
3664
3665 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3666     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3667     uint32_t idx;
3668     uint32_t mode;
3669     pa_proplist *p;
3670
3671     pa_native_connection_assert_ref(c);
3672     pa_assert(t);
3673
3674     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3675
3676     p = pa_proplist_new();
3677
3678     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3679
3680         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3681             pa_tagstruct_get_proplist(t, p) < 0 ||
3682             !pa_tagstruct_eof(t)) {
3683             protocol_error(c);
3684             pa_proplist_free(p);
3685             return;
3686         }
3687
3688     } else {
3689
3690         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3691             pa_tagstruct_getu32(t, &mode) < 0 ||
3692             pa_tagstruct_get_proplist(t, p) < 0 ||
3693             !pa_tagstruct_eof(t)) {
3694             protocol_error(c);
3695             pa_proplist_free(p);
3696             return;
3697         }
3698     }
3699
3700     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3701         pa_proplist_free(p);
3702         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3703     }
3704
3705     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3706         playback_stream *s;
3707
3708         s = pa_idxset_get_by_index(c->output_streams, idx);
3709         if (!s || !playback_stream_isinstance(s)) {
3710             pa_proplist_free(p);
3711             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3712         }
3713         pa_sink_input_update_proplist(s->sink_input, mode, p);
3714
3715     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3716         record_stream *s;
3717
3718         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3719             pa_proplist_free(p);
3720             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3721         }
3722         pa_source_output_update_proplist(s->source_output, mode, p);
3723
3724     } else {
3725         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3726
3727         pa_client_update_proplist(c->client, mode, p);
3728     }
3729
3730     pa_pstream_send_simple_ack(c->pstream, tag);
3731     pa_proplist_free(p);
3732 }
3733
3734 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3735     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3736     uint32_t idx;
3737     unsigned changed = 0;
3738     pa_proplist *p;
3739     pa_strlist *l = NULL;
3740
3741     pa_native_connection_assert_ref(c);
3742     pa_assert(t);
3743
3744     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3745
3746     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3747
3748         if (pa_tagstruct_getu32(t, &idx) < 0) {
3749             protocol_error(c);
3750             return;
3751         }
3752     }
3753
3754     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3755         playback_stream *s;
3756
3757         s = pa_idxset_get_by_index(c->output_streams, idx);
3758         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3759         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3760
3761         p = s->sink_input->proplist;
3762
3763     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3764         record_stream *s;
3765
3766         s = pa_idxset_get_by_index(c->record_streams, idx);
3767         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3768
3769         p = s->source_output->proplist;
3770     } else {
3771         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3772
3773         p = c->client->proplist;
3774     }
3775
3776     for (;;) {
3777         const char *k;
3778
3779         if (pa_tagstruct_gets(t, &k) < 0) {
3780             protocol_error(c);
3781             pa_strlist_free(l);
3782             return;
3783         }
3784
3785         if (!k)
3786             break;
3787
3788         l = pa_strlist_prepend(l, k);
3789     }
3790
3791     if (!pa_tagstruct_eof(t)) {
3792         protocol_error(c);
3793         pa_strlist_free(l);
3794         return;
3795     }
3796
3797     for (;;) {
3798         char *z;
3799
3800         l = pa_strlist_pop(l, &z);
3801
3802         if (!z)
3803             break;
3804
3805         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3806         pa_xfree(z);
3807     }
3808
3809     pa_pstream_send_simple_ack(c->pstream, tag);
3810
3811     if (changed) {
3812         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3813             playback_stream *s;
3814
3815             s = pa_idxset_get_by_index(c->output_streams, idx);
3816             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3817
3818         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3819             record_stream *s;
3820
3821             s = pa_idxset_get_by_index(c->record_streams, idx);
3822             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3823
3824         } else {
3825             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3826             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3827         }
3828     }
3829 }
3830
3831 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3832     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3833     const char *s;
3834
3835     pa_native_connection_assert_ref(c);
3836     pa_assert(t);
3837
3838     if (pa_tagstruct_gets(t, &s) < 0 ||
3839         !pa_tagstruct_eof(t)) {
3840         protocol_error(c);
3841         return;
3842     }
3843
3844     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3845     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3846
3847     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3848         pa_source *source;
3849
3850         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3851         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3852
3853         pa_namereg_set_default_source(c->protocol->core, source);
3854     } else {
3855         pa_sink *sink;
3856         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3857
3858         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3859         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3860
3861         pa_namereg_set_default_sink(c->protocol->core, sink);
3862     }
3863
3864     pa_pstream_send_simple_ack(c->pstream, tag);
3865 }
3866
3867 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3868     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3869     uint32_t idx;
3870     const char *name;
3871
3872     pa_native_connection_assert_ref(c);
3873     pa_assert(t);
3874
3875     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3876         pa_tagstruct_gets(t, &name) < 0 ||
3877         !pa_tagstruct_eof(t)) {
3878         protocol_error(c);
3879         return;
3880     }
3881
3882     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3883     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3884
3885     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3886         playback_stream *s;
3887
3888         s = pa_idxset_get_by_index(c->output_streams, idx);
3889         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3890         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3891
3892         pa_sink_input_set_name(s->sink_input, name);
3893
3894     } else {
3895         record_stream *s;
3896         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3897
3898         s = pa_idxset_get_by_index(c->record_streams, idx);
3899         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3900
3901         pa_source_output_set_name(s->source_output, name);
3902     }
3903
3904     pa_pstream_send_simple_ack(c->pstream, tag);
3905 }
3906
3907 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3908     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3909     uint32_t idx;
3910
3911     pa_native_connection_assert_ref(c);
3912     pa_assert(t);
3913
3914     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3915         !pa_tagstruct_eof(t)) {
3916         protocol_error(c);
3917         return;
3918     }
3919
3920     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3921
3922     if (command == PA_COMMAND_KILL_CLIENT) {
3923         pa_client *client;
3924
3925         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3926         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3927
3928         pa_native_connection_ref(c);
3929         pa_client_kill(client);
3930
3931     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3932         pa_sink_input *s;
3933
3934         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3935         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3936
3937         pa_native_connection_ref(c);
3938         pa_sink_input_kill(s);
3939     } else {
3940         pa_source_output *s;
3941
3942         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3943
3944         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3945         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3946
3947         pa_native_connection_ref(c);
3948         pa_source_output_kill(s);
3949     }
3950
3951     pa_pstream_send_simple_ack(c->pstream, tag);
3952     pa_native_connection_unref(c);
3953 }
3954
3955 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3956     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3957     pa_module *m;
3958     const char *name, *argument;
3959     pa_tagstruct *reply;
3960
3961     pa_native_connection_assert_ref(c);
3962     pa_assert(t);
3963
3964     if (pa_tagstruct_gets(t, &name) < 0 ||
3965         pa_tagstruct_gets(t, &argument) < 0 ||
3966         !pa_tagstruct_eof(t)) {
3967         protocol_error(c);
3968         return;
3969     }
3970
3971     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3972     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
3973     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
3974
3975     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
3976         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
3977         return;
3978     }
3979
3980     reply = reply_new(tag);
3981     pa_tagstruct_putu32(reply, m->index);
3982     pa_pstream_send_tagstruct(c->pstream, reply);
3983 }
3984
3985 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3986     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3987     uint32_t idx;
3988     pa_module *m;
3989
3990     pa_native_connection_assert_ref(c);
3991     pa_assert(t);
3992
3993     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3994         !pa_tagstruct_eof(t)) {
3995         protocol_error(c);
3996         return;
3997     }
3998
3999     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4000     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4001     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
4002
4003     pa_module_unload_request(m, FALSE);
4004     pa_pstream_send_simple_ack(c->pstream, tag);
4005 }
4006
4007 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4008     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4009     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
4010     const char *name_device = NULL;
4011
4012     pa_native_connection_assert_ref(c);
4013     pa_assert(t);
4014
4015     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4016         pa_tagstruct_getu32(t, &idx_device) < 0 ||
4017         pa_tagstruct_gets(t, &name_device) < 0 ||
4018         !pa_tagstruct_eof(t)) {
4019         protocol_error(c);
4020         return;
4021     }
4022
4023     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4024     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4025
4026     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
4027     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
4028     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
4029     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4030
4031     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
4032         pa_sink_input *si = NULL;
4033         pa_sink *sink = NULL;
4034
4035         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
4036
4037         if (idx_device != PA_INVALID_INDEX)
4038             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
4039         else
4040             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
4041
4042         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
4043
4044         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
4045             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4046             return;
4047         }
4048     } else {
4049         pa_source_output *so = NULL;
4050         pa_source *source;
4051
4052         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
4053
4054         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
4055
4056         if (idx_device != PA_INVALID_INDEX)
4057             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
4058         else
4059             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
4060
4061         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
4062
4063         if (pa_source_output_move_to(so, source, TRUE) < 0) {
4064             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4065             return;
4066         }
4067     }
4068
4069     pa_pstream_send_simple_ack(c->pstream, tag);
4070 }
4071
4072 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4073     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4074     uint32_t idx = PA_INVALID_INDEX;
4075     const char *name = NULL;
4076     pa_bool_t b;
4077
4078     pa_native_connection_assert_ref(c);
4079     pa_assert(t);
4080
4081     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4082         pa_tagstruct_gets(t, &name) < 0 ||
4083         pa_tagstruct_get_boolean(t, &b) < 0 ||
4084         !pa_tagstruct_eof(t)) {
4085         protocol_error(c);
4086         return;
4087     }
4088
4089     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4090     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
4091     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4092     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4093     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4094
4095     if (command == PA_COMMAND_SUSPEND_SINK) {
4096
4097         if (idx == PA_INVALID_INDEX && name && !*name) {
4098
4099             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
4100
4101             if (pa_sink_suspend_all(c->protocol->core, b) < 0) {
4102                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4103                 return;
4104             }
4105         } else {
4106             pa_sink *sink = NULL;
4107
4108             if (idx != PA_INVALID_INDEX)
4109                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4110             else
4111                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4112
4113             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4114
4115             if (pa_sink_suspend(sink, b) < 0) {
4116                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4117                 return;
4118             }
4119         }
4120     } else {
4121
4122         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4123
4124         if (idx == PA_INVALID_INDEX && name && !*name) {
4125
4126             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4127
4128             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
4129                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4130                 return;
4131             }
4132
4133         } else {
4134             pa_source *source;
4135
4136             if (idx != PA_INVALID_INDEX)
4137                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4138             else
4139                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4140
4141             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4142
4143             if (pa_source_suspend(source, b) < 0) {
4144                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4145                 return;
4146             }
4147         }
4148     }
4149
4150     pa_pstream_send_simple_ack(c->pstream, tag);
4151 }
4152
4153 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4154     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4155     uint32_t idx = PA_INVALID_INDEX;
4156     const char *name = NULL;
4157     pa_module *m;
4158     pa_native_protocol_ext_cb_t cb;
4159
4160     pa_native_connection_assert_ref(c);
4161     pa_assert(t);
4162
4163     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4164         pa_tagstruct_gets(t, &name) < 0) {
4165         protocol_error(c);
4166         return;
4167     }
4168
4169     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4170     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4171     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4172     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4173     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4174
4175     if (idx != PA_INVALID_INDEX)
4176         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4177     else {
4178         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4179             if (strcmp(name, m->name) == 0)
4180                 break;
4181     }
4182
4183     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4184     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4185
4186     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4187     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4188
4189     if (cb(c->protocol, m, c, tag, t) < 0)
4190         protocol_error(c);
4191 }
4192
4193 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4194     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4195     uint32_t idx = PA_INVALID_INDEX;
4196     const char *name = NULL, *profile = NULL;
4197     pa_card *card = NULL;
4198
4199     pa_native_connection_assert_ref(c);
4200     pa_assert(t);
4201
4202     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4203         pa_tagstruct_gets(t, &name) < 0 ||
4204         pa_tagstruct_gets(t, &profile) < 0 ||
4205         !pa_tagstruct_eof(t)) {
4206         protocol_error(c);
4207         return;
4208     }
4209
4210     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4211     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4212     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4213     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4214     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4215
4216     if (idx != PA_INVALID_INDEX)
4217         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4218     else
4219         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4220
4221     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4222
4223     if (pa_card_set_profile(card, profile, TRUE) < 0) {
4224         pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4225         return;
4226     }
4227
4228     pa_pstream_send_simple_ack(c->pstream, tag);
4229 }
4230
4231 /*** pstream callbacks ***/
4232
4233 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4234     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4235
4236     pa_assert(p);
4237     pa_assert(packet);
4238     pa_native_connection_assert_ref(c);
4239
4240     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4241         pa_log("invalid packet.");
4242         native_connection_unlink(c);
4243     }
4244 }
4245
4246 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4247     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4248     output_stream *stream;
4249
4250     pa_assert(p);
4251     pa_assert(chunk);
4252     pa_native_connection_assert_ref(c);
4253
4254     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4255         pa_log_debug("Client sent block for invalid stream.");
4256         /* Ignoring */
4257         return;
4258     }
4259
4260 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4261
4262     if (playback_stream_isinstance(stream)) {
4263         playback_stream *ps = PLAYBACK_STREAM(stream);
4264
4265         if (chunk->memblock) {
4266             if (seek != PA_SEEK_RELATIVE || offset != 0)
4267                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4268
4269             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4270         } else
4271             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4272
4273     } else {
4274         upload_stream *u = UPLOAD_STREAM(stream);
4275         size_t l;
4276
4277         if (!u->memchunk.memblock) {
4278             if (u->length == chunk->length && chunk->memblock) {
4279                 u->memchunk = *chunk;
4280                 pa_memblock_ref(u->memchunk.memblock);
4281                 u->length = 0;
4282             } else {
4283                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4284                 u->memchunk.index = u->memchunk.length = 0;
4285             }
4286         }
4287
4288         pa_assert(u->memchunk.memblock);
4289
4290         l = u->length;
4291         if (l > chunk->length)
4292             l = chunk->length;
4293
4294         if (l > 0) {
4295             void *dst;
4296             dst = pa_memblock_acquire(u->memchunk.memblock);
4297
4298             if (chunk->memblock) {
4299                 void *src;
4300                 src = pa_memblock_acquire(chunk->memblock);
4301
4302                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4303                        (uint8_t*) src + chunk->index, l);
4304
4305                 pa_memblock_release(chunk->memblock);
4306             } else
4307                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4308
4309             pa_memblock_release(u->memchunk.memblock);
4310
4311             u->memchunk.length += l;
4312             u->length -= l;
4313         }
4314     }
4315 }
4316
4317 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4318     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4319
4320     pa_assert(p);
4321     pa_native_connection_assert_ref(c);
4322
4323     native_connection_unlink(c);
4324     pa_log_info("Connection died.");
4325 }
4326
4327 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4328     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4329
4330     pa_assert(p);
4331     pa_native_connection_assert_ref(c);
4332
4333     native_connection_send_memblock(c);
4334 }
4335
4336 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4337     pa_thread_mq *q;
4338
4339     if (!(q = pa_thread_mq_get()))
4340         pa_pstream_send_revoke(p, block_id);
4341     else
4342         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4343 }
4344
4345 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4346     pa_thread_mq *q;
4347
4348     if (!(q = pa_thread_mq_get()))
4349         pa_pstream_send_release(p, block_id);
4350     else
4351         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4352 }
4353
4354 /*** client callbacks ***/
4355
4356 static void client_kill_cb(pa_client *c) {
4357     pa_assert(c);
4358
4359     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4360     pa_log_info("Connection killed.");
4361 }
4362
4363 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4364     pa_tagstruct *t;
4365     pa_native_connection *c;
4366
4367     pa_assert(client);
4368     c = PA_NATIVE_CONNECTION(client->userdata);
4369     pa_native_connection_assert_ref(c);
4370
4371     if (c->version < 15)
4372       return;
4373
4374     t = pa_tagstruct_new(NULL, 0);
4375     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4376     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4377     pa_tagstruct_puts(t, event);
4378     pa_tagstruct_put_proplist(t, pl);
4379     pa_pstream_send_tagstruct(c->pstream, t);
4380 }
4381
4382 /*** module entry points ***/
4383
4384 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4385     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4386
4387     pa_assert(m);
4388     pa_assert(tv);
4389     pa_native_connection_assert_ref(c);
4390     pa_assert(c->auth_timeout_event == e);
4391
4392     if (!c->authorized) {
4393         native_connection_unlink(c);
4394         pa_log_info("Connection terminated due to authentication timeout.");
4395     }
4396 }
4397
4398 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4399     pa_native_connection *c;
4400     char pname[128];
4401     pa_client *client;
4402     pa_client_new_data data;
4403
4404     pa_assert(p);
4405     pa_assert(io);
4406     pa_assert(o);
4407
4408     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4409         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4410         pa_iochannel_free(io);
4411         return;
4412     }
4413
4414     pa_client_new_data_init(&data);
4415     data.module = o->module;
4416     data.driver = __FILE__;
4417     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4418     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4419     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4420     client = pa_client_new(p->core, &data);
4421     pa_client_new_data_done(&data);
4422
4423     if (!client)
4424         return;
4425
4426     c = pa_msgobject_new(pa_native_connection);
4427     c->parent.parent.free = native_connection_free;
4428     c->parent.process_msg = native_connection_process_msg;
4429     c->protocol = p;
4430     c->options = pa_native_options_ref(o);
4431     c->authorized = FALSE;
4432
4433     if (o->auth_anonymous) {
4434         pa_log_info("Client authenticated anonymously.");
4435         c->authorized = TRUE;
4436     }
4437
4438     if (!c->authorized &&
4439         o->auth_ip_acl &&
4440         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4441
4442         pa_log_info("Client authenticated by IP ACL.");
4443         c->authorized = TRUE;
4444     }
4445
4446     if (!c->authorized) {
4447         struct timeval tv;
4448         pa_gettimeofday(&tv);
4449         tv.tv_sec += AUTH_TIMEOUT;
4450         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4451     } else
4452         c->auth_timeout_event = NULL;
4453
4454     c->is_local = pa_iochannel_socket_is_local(io);
4455     c->version = 8;
4456
4457     c->client = client;
4458     c->client->kill = client_kill_cb;
4459     c->client->send_event = client_send_event_cb;
4460     c->client->userdata = c;
4461
4462     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4463     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4464     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4465     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4466     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4467     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4468     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4469
4470     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4471
4472     c->record_streams = pa_idxset_new(NULL, NULL);
4473     c->output_streams = pa_idxset_new(NULL, NULL);
4474
4475     c->rrobin_index = PA_IDXSET_INVALID;
4476     c->subscription = NULL;
4477
4478     pa_idxset_put(p->connections, c, NULL);
4479
4480 #ifdef HAVE_CREDS
4481     if (pa_iochannel_creds_supported(io))
4482         pa_iochannel_creds_enable(io);
4483 #endif
4484
4485     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4486 }
4487
4488 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4489     pa_native_connection *c;
4490     void *state = NULL;
4491
4492     pa_assert(p);
4493     pa_assert(m);
4494
4495     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4496         if (c->options->module == m)
4497             native_connection_unlink(c);
4498 }
4499
4500 static pa_native_protocol* native_protocol_new(pa_core *c) {
4501     pa_native_protocol *p;
4502     pa_native_hook_t h;
4503
4504     pa_assert(c);
4505
4506     p = pa_xnew(pa_native_protocol, 1);
4507     PA_REFCNT_INIT(p);
4508     p->core = c;
4509     p->connections = pa_idxset_new(NULL, NULL);
4510
4511     p->servers = NULL;
4512
4513     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4514
4515     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4516         pa_hook_init(&p->hooks[h], p);
4517
4518     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4519
4520     return p;
4521 }
4522
4523 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4524     pa_native_protocol *p;
4525
4526     if ((p = pa_shared_get(c, "native-protocol")))
4527         return pa_native_protocol_ref(p);
4528
4529     return native_protocol_new(c);
4530 }
4531
4532 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4533     pa_assert(p);
4534     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4535
4536     PA_REFCNT_INC(p);
4537
4538     return p;
4539 }
4540
4541 void pa_native_protocol_unref(pa_native_protocol *p) {
4542     pa_native_connection *c;
4543     pa_native_hook_t h;
4544
4545     pa_assert(p);
4546     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4547
4548     if (PA_REFCNT_DEC(p) > 0)
4549         return;
4550
4551     while ((c = pa_idxset_first(p->connections, NULL)))
4552         native_connection_unlink(c);
4553
4554     pa_idxset_free(p->connections, NULL, NULL);
4555
4556     pa_strlist_free(p->servers);
4557
4558     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4559         pa_hook_done(&p->hooks[h]);
4560
4561     pa_hashmap_free(p->extensions, NULL, NULL);
4562
4563     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4564
4565     pa_xfree(p);
4566 }
4567
4568 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4569     pa_assert(p);
4570     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4571     pa_assert(name);
4572
4573     p->servers = pa_strlist_prepend(p->servers, name);
4574
4575     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4576 }
4577
4578 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4579     pa_assert(p);
4580     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4581     pa_assert(name);
4582
4583     p->servers = pa_strlist_remove(p->servers, name);
4584
4585     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4586 }
4587
4588 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4589     pa_assert(p);
4590     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4591
4592     return p->hooks;
4593 }
4594
4595 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4596     pa_assert(p);
4597     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4598
4599     return p->servers;
4600 }
4601
4602 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4603     pa_assert(p);
4604     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4605     pa_assert(m);
4606     pa_assert(cb);
4607     pa_assert(!pa_hashmap_get(p->extensions, m));
4608
4609     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4610     return 0;
4611 }
4612
4613 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4614     pa_assert(p);
4615     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4616     pa_assert(m);
4617
4618     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4619 }
4620
4621 pa_native_options* pa_native_options_new(void) {
4622     pa_native_options *o;
4623
4624     o = pa_xnew0(pa_native_options, 1);
4625     PA_REFCNT_INIT(o);
4626
4627     return o;
4628 }
4629
4630 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4631     pa_assert(o);
4632     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4633
4634     PA_REFCNT_INC(o);
4635
4636     return o;
4637 }
4638
4639 void pa_native_options_unref(pa_native_options *o) {
4640     pa_assert(o);
4641     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4642
4643     if (PA_REFCNT_DEC(o) > 0)
4644         return;
4645
4646     pa_xfree(o->auth_group);
4647
4648     if (o->auth_ip_acl)
4649         pa_ip_acl_free(o->auth_ip_acl);
4650
4651     if (o->auth_cookie)
4652         pa_auth_cookie_unref(o->auth_cookie);
4653
4654     pa_xfree(o);
4655 }
4656
4657 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4658     pa_bool_t enabled;
4659     const char *acl;
4660
4661     pa_assert(o);
4662     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4663     pa_assert(ma);
4664
4665     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4666         pa_log("auth-anonymous= expects a boolean argument.");
4667         return -1;
4668     }
4669
4670     enabled = TRUE;
4671     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4672         pa_log("auth-group-enabled= expects a boolean argument.");
4673         return -1;
4674     }
4675
4676     pa_xfree(o->auth_group);
4677     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4678
4679 #ifndef HAVE_CREDS
4680     if (o->auth_group)
4681         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4682 #endif
4683
4684     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4685         pa_ip_acl *ipa;
4686
4687         if (!(ipa = pa_ip_acl_new(acl))) {
4688             pa_log("Failed to parse IP ACL '%s'", acl);
4689             return -1;
4690         }
4691
4692         if (o->auth_ip_acl)
4693             pa_ip_acl_free(o->auth_ip_acl);
4694
4695         o->auth_ip_acl = ipa;
4696     }
4697
4698     enabled = TRUE;
4699     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4700         pa_log("auth-cookie-enabled= expects a boolean argument.");
4701         return -1;
4702     }
4703
4704     if (o->auth_cookie)
4705         pa_auth_cookie_unref(o->auth_cookie);
4706
4707     if (enabled) {
4708         const char *cn;
4709
4710         /* The new name for this is 'auth-cookie', for compat reasons
4711          * we check the old name too */
4712         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4713             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4714                 cn = PA_NATIVE_COOKIE_FILE;
4715
4716         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4717             return -1;
4718
4719     } else
4720           o->auth_cookie = NULL;
4721
4722     return 0;
4723 }
4724
4725 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4726     pa_native_connection_assert_ref(c);
4727
4728     return c->pstream;
4729 }