fix a comment
[profile/ivi/pulseaudio.git] / src / pulsecore / protocol-native.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31
32 #include <pulse/timeval.h>
33 #include <pulse/version.h>
34 #include <pulse/utf8.h>
35 #include <pulse/util.h>
36 #include <pulse/xmalloc.h>
37
38 #include <pulsecore/native-common.h>
39 #include <pulsecore/packet.h>
40 #include <pulsecore/client.h>
41 #include <pulsecore/source-output.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/pstream.h>
44 #include <pulsecore/tagstruct.h>
45 #include <pulsecore/pdispatch.h>
46 #include <pulsecore/pstream-util.h>
47 #include <pulsecore/authkey.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/core-scache.h>
50 #include <pulsecore/core-subscribe.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/strlist.h>
53 #include <pulsecore/shared.h>
54 #include <pulsecore/sample-util.h>
55 #include <pulsecore/llist.h>
56 #include <pulsecore/creds.h>
57 #include <pulsecore/core-util.h>
58 #include <pulsecore/ipacl.h>
59 #include <pulsecore/thread-mq.h>
60
61 #include "protocol-native.h"
62
63 /* Kick a client if it doesn't authenticate within this time (presumably seconds -- TODO confirm where the timer is armed) */
64 #define AUTH_TIMEOUT 60
65
66 /* Don't accept more connections than this */
67 #define MAX_CONNECTIONS 64
68
69 #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB; upper clamp applied to a client-requested maxlength */
70 #define DEFAULT_TLENGTH_MSEC 2000 /* 2s; used when the client passes (uint32_t) -1 as tlength */
71 #define DEFAULT_PROCESS_MSEC 20   /* 20ms; used when the client passes (uint32_t) -1 as minreq */
72 #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC /* used when the client passes (uint32_t) -1 as fragsize */
73
74 struct pa_native_protocol; /* forward declaration; the members are declared further down */
75
76 typedef struct record_stream { /* State kept for one record stream of a native-protocol connection */
77     pa_msgobject parent; /* base object; record_stream_new() installs the free and process_msg handlers through it */
78
79     pa_native_connection *connection; /* owning connection; set to NULL by record_stream_unlink() */
80     uint32_t index; /* index assigned by pa_idxset_put() on connection->record_streams */
81
82     pa_source_output *source_output; /* unlinked, unref'd and set to NULL in record_stream_unlink() */
83     pa_memblockq *memblockq; /* receives chunks on RECORD_STREAM_MESSAGE_POST_DATA; freed in record_stream_free() */
84     size_t fragment_size; /* frame-aligned fragment size in bytes, computed by fix_record_buffer_attr_post() */
85     pa_usec_t source_latency; /* result of pa_source_output_set_requested_latency(), or 0 when no latency was requested */
86 } record_stream;
87
88 PA_DECLARE_CLASS(record_stream);
89 #define RECORD_STREAM(o) (record_stream_cast(o)) /* record_stream_cast presumably comes from PA_DECLARE_CLASS -- TODO confirm */
90 static PA_DEFINE_CHECK_TYPE(record_stream, pa_msgobject);
91
92 typedef struct output_stream { /* Common base embedded as `parent` by playback_stream and upload_stream */
93     pa_msgobject parent;
94 } output_stream;
95
96 PA_DECLARE_CLASS(output_stream);
97 #define OUTPUT_STREAM(o) (output_stream_cast(o)) /* output_stream_cast presumably comes from PA_DECLARE_CLASS -- TODO confirm */
98 static PA_DEFINE_CHECK_TYPE(output_stream, pa_msgobject);
99
100 typedef struct playback_stream { /* State kept for one playback stream of a native-protocol connection */
101     output_stream parent; /* base object, the same base upload_stream embeds */
102
103     pa_native_connection *connection; /* owning connection; set to NULL by playback_stream_unlink() */
104     uint32_t index; /* stream index reported to the client in REQUEST/UNDERFLOW/OVERFLOW/STARTED messages */
105
106     pa_sink_input *sink_input; /* unlinked, unref'd and set to NULL in playback_stream_unlink() */
107     pa_memblockq *memblockq; /* freed in playback_stream_free() */
108     pa_bool_t is_underrun:1;
109     pa_bool_t drain_request:1; /* if still set at unlink time, PA_ERR_NOENTITY is sent for drain_tag */
110     uint32_t drain_tag; /* tag answered with an error by playback_stream_unlink() when drain_request is set */
111     uint32_t syncid;
112
113     pa_atomic_t missing; /* atomically read and reset to 0 when PLAYBACK_STREAM_MESSAGE_REQUEST_DATA is handled */
114     size_t minreq;
115     pa_usec_t sink_latency;
116
117     /* Only updated after SINK_INPUT_MESSAGE_UPDATE_LATENCY */
118     int64_t read_index, write_index;
119     size_t render_memblockq_length;
120 } playback_stream;
121
122 PA_DECLARE_CLASS(playback_stream);
123 #define PLAYBACK_STREAM(o) (playback_stream_cast(o)) /* playback_stream_cast presumably comes from PA_DECLARE_CLASS -- TODO confirm */
124 static PA_DEFINE_CHECK_TYPE(playback_stream, output_stream);
125
126 typedef struct upload_stream { /* State kept while a client uploads a sample over a native-protocol connection */
127     output_stream parent; /* base object, the same base playback_stream embeds */
128
129     pa_native_connection *connection; /* owning connection; set to NULL by upload_stream_unlink() */
130     uint32_t index; /* index assigned by pa_idxset_put() on connection->output_streams */
131
132     pa_memchunk memchunk; /* reset in upload_stream_new(); its memblock, if any, is unref'd in upload_stream_free() */
133     size_t length; /* copy of the `length` argument of upload_stream_new() (asserted > 0 there) */
134     char *name; /* private copy made with pa_xstrdup(); released in upload_stream_free() */
135     pa_sample_spec sample_spec; /* copied by value from the caller in upload_stream_new() */
136     pa_channel_map channel_map; /* copied by value from the caller in upload_stream_new() */
137     pa_proplist *proplist; /* copy of the caller's list merged with the client's proplist; freed in upload_stream_free() */
138 } upload_stream;
139
140 PA_DECLARE_CLASS(upload_stream);
141 #define UPLOAD_STREAM(o) (upload_stream_cast(o)) /* upload_stream_cast presumably comes from PA_DECLARE_CLASS -- TODO confirm */
142 static PA_DEFINE_CHECK_TYPE(upload_stream, output_stream);
143
144 struct pa_native_connection { /* State of one client connection speaking the native protocol */
145     pa_msgobject parent;
146     pa_native_protocol *protocol; /* protocol->core is the core passed to pa_source_output_new() */
147     pa_native_options *options; /* options->module is recorded as the module of new source outputs */
148     pa_bool_t authorized:1;
149     pa_bool_t is_local:1;
150     uint32_t version; /* compared against 13 before PA_COMMAND_STARTED is sent to the client */
151     pa_client *client; /* its proplist is merged into the proplist of upload streams */
152     pa_pstream *pstream; /* transport used by the stream code to send tagstructs, errors and acks */
153     pa_pdispatch *pdispatch;
154     pa_idxset *record_streams, *output_streams; /* record_stream objects / playback_stream and upload_stream objects */
155     uint32_t rrobin_index;
156     pa_subscription *subscription;
157     pa_time_event *auth_timeout_event;
158 };
159
160 PA_DECLARE_CLASS(pa_native_connection);
161 #define PA_NATIVE_CONNECTION(o) (pa_native_connection_cast(o)) /* pa_native_connection_cast presumably comes from PA_DECLARE_CLASS -- TODO confirm */
162 static PA_DEFINE_CHECK_TYPE(pa_native_connection, pa_msgobject);
163
164 struct pa_native_protocol { /* Reference-counted state shared by the connections of the native protocol */
165     PA_REFCNT_DECLARE;
166
167     pa_core *core; /* handed to pa_source_output_new() when a record stream is created */
168     pa_idxset *connections;
169
170     pa_strlist *servers;
171     pa_hook hooks[PA_NATIVE_HOOK_MAX]; /* one hook slot per value below PA_NATIVE_HOOK_MAX */
172
173     pa_hashmap *extensions;
174 };
175
176 enum { /* Extra sink-input message codes; numbering continues from PA_SINK_INPUT_MESSAGE_MAX */
177     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
178     SINK_INPUT_MESSAGE_DRAIN, /* disabled prebuf, get playback started. */
179     SINK_INPUT_MESSAGE_FLUSH,
180     SINK_INPUT_MESSAGE_TRIGGER,
181     SINK_INPUT_MESSAGE_SEEK,
182     SINK_INPUT_MESSAGE_PREBUF_FORCE,
183     SINK_INPUT_MESSAGE_UPDATE_LATENCY
184 };
185
186 enum { /* Message codes handled by playback_stream_process_msg() */
187     PLAYBACK_STREAM_MESSAGE_REQUEST_DATA,      /* data requested from sink input from the main loop */
188     PLAYBACK_STREAM_MESSAGE_UNDERFLOW,         /* forwarded to the client as PA_COMMAND_UNDERFLOW */
189     PLAYBACK_STREAM_MESSAGE_OVERFLOW,          /* forwarded to the client as PA_COMMAND_OVERFLOW */
190     PLAYBACK_STREAM_MESSAGE_DRAIN_ACK,         /* userdata carries the tag that gets a simple ack */
191     PLAYBACK_STREAM_MESSAGE_STARTED            /* forwarded as PA_COMMAND_STARTED when the connection version is >= 13 */
192 };
193
194 enum { /* Message codes handled by record_stream_process_msg() */
195     RECORD_STREAM_MESSAGE_POST_DATA         /* data from source output to main loop */
196 };
197
198 enum { /* Connection-level message codes; their handler is not part of the code shown up to here */
199     CONNECTION_MESSAGE_RELEASE,
200     CONNECTION_MESSAGE_REVOKE
201 };
202 /* Forward declarations: sink-input callbacks */
203 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
204 static void sink_input_kill_cb(pa_sink_input *i);
205 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
206 static void sink_input_moved_cb(pa_sink_input *i);
207 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
208 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
209 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes);
210 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl);
211 /* Forward declarations: helpers; native_connection_send_memblock() is called by record_stream_process_msg() */
212 static void native_connection_send_memblock(pa_native_connection *c);
213 static void playback_stream_request_bytes(struct playback_stream*s);
214 /* Forward declarations: source-output callbacks installed by record_stream_new() */
215 static void source_output_kill_cb(pa_source_output *o);
216 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
217 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend);
218 static void source_output_moved_cb(pa_source_output *o);
219 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
220 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl);
221 /* Forward declaration: sink-input message handler */
222 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
223 /* Forward declarations: command handlers referenced by command_table[]; all share the pa_pdispatch callback signature */
224 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
225 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
226 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
227 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
228 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
229 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
230 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
231 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
232 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
233 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
234 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
235 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
236 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
237 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
238 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
239 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
240 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
241 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
242 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
243 static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
244 static void command_set_mute(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
245 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
246 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
247 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
248 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
249 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
250 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
251 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
252 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
253 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
254 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
255 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
256 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
257 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
258 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
259 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
260 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
261 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
262 /* Handler table indexed by command code; NULL entries have no handler registered here */
263 static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
264     [PA_COMMAND_ERROR] = NULL,
265     [PA_COMMAND_TIMEOUT] = NULL,
266     [PA_COMMAND_REPLY] = NULL,
267     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
268     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
269     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = command_drain_playback_stream,
270     [PA_COMMAND_CREATE_RECORD_STREAM] = command_create_record_stream,
271     [PA_COMMAND_DELETE_RECORD_STREAM] = command_delete_stream,
272     [PA_COMMAND_AUTH] = command_auth,
273     [PA_COMMAND_REQUEST] = NULL,
274     [PA_COMMAND_EXIT] = command_exit,
275     [PA_COMMAND_SET_CLIENT_NAME] = command_set_client_name,
276     [PA_COMMAND_LOOKUP_SINK] = command_lookup,
277     [PA_COMMAND_LOOKUP_SOURCE] = command_lookup,
278     [PA_COMMAND_STAT] = command_stat,
279     [PA_COMMAND_GET_PLAYBACK_LATENCY] = command_get_playback_latency,
280     [PA_COMMAND_GET_RECORD_LATENCY] = command_get_record_latency,
281     [PA_COMMAND_CREATE_UPLOAD_STREAM] = command_create_upload_stream,
282     [PA_COMMAND_DELETE_UPLOAD_STREAM] = command_delete_stream,
283     [PA_COMMAND_FINISH_UPLOAD_STREAM] = command_finish_upload_stream,
284     [PA_COMMAND_PLAY_SAMPLE] = command_play_sample,
285     [PA_COMMAND_REMOVE_SAMPLE] = command_remove_sample,
286     [PA_COMMAND_GET_SINK_INFO] = command_get_info,
287     [PA_COMMAND_GET_SOURCE_INFO] = command_get_info,
288     [PA_COMMAND_GET_CLIENT_INFO] = command_get_info,
289     [PA_COMMAND_GET_CARD_INFO] = command_get_info,
290     [PA_COMMAND_GET_MODULE_INFO] = command_get_info,
291     [PA_COMMAND_GET_SINK_INPUT_INFO] = command_get_info,
292     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = command_get_info,
293     [PA_COMMAND_GET_SAMPLE_INFO] = command_get_info,
294     [PA_COMMAND_GET_SINK_INFO_LIST] = command_get_info_list,
295     [PA_COMMAND_GET_SOURCE_INFO_LIST] = command_get_info_list,
296     [PA_COMMAND_GET_MODULE_INFO_LIST] = command_get_info_list,
297     [PA_COMMAND_GET_CLIENT_INFO_LIST] = command_get_info_list,
298     [PA_COMMAND_GET_CARD_INFO_LIST] = command_get_info_list,
299     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = command_get_info_list,
300     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = command_get_info_list,
301     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = command_get_info_list,
302     [PA_COMMAND_GET_SERVER_INFO] = command_get_server_info,
303     [PA_COMMAND_SUBSCRIBE] = command_subscribe,
304     /* volume: one shared handler for sinks, sink inputs and sources */
305     [PA_COMMAND_SET_SINK_VOLUME] = command_set_volume,
306     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
307     [PA_COMMAND_SET_SOURCE_VOLUME] = command_set_volume,
308     /* mute: one shared handler for sinks, sink inputs and sources */
309     [PA_COMMAND_SET_SINK_MUTE] = command_set_mute,
310     [PA_COMMAND_SET_SINK_INPUT_MUTE] = command_set_mute,
311     [PA_COMMAND_SET_SOURCE_MUTE] = command_set_mute,
312     /* suspend */
313     [PA_COMMAND_SUSPEND_SINK] = command_suspend,
314     [PA_COMMAND_SUSPEND_SOURCE] = command_suspend,
315     /* playback stream control; flush, trigger and prebuf share one handler */
316     [PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
317     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
318     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
319     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_flush_or_prebuf_playback_stream,
320     /* record stream control */
321     [PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
322     [PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
323     /* defaults, stream names, kill, module load/unload */
324     [PA_COMMAND_SET_DEFAULT_SINK] = command_set_default_sink_or_source,
325     [PA_COMMAND_SET_DEFAULT_SOURCE] = command_set_default_sink_or_source,
326     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = command_set_stream_name,
327     [PA_COMMAND_SET_RECORD_STREAM_NAME] = command_set_stream_name,
328     [PA_COMMAND_KILL_CLIENT] = command_kill,
329     [PA_COMMAND_KILL_SINK_INPUT] = command_kill,
330     [PA_COMMAND_KILL_SOURCE_OUTPUT] = command_kill,
331     [PA_COMMAND_LOAD_MODULE] = command_load_module,
332     [PA_COMMAND_UNLOAD_MODULE] = command_unload_module,
333     /* autoload commands are marked obsolete in their names and get no handler */
334     [PA_COMMAND_GET_AUTOLOAD_INFO___OBSOLETE] = NULL,
335     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST___OBSOLETE] = NULL,
336     [PA_COMMAND_ADD_AUTOLOAD___OBSOLETE] = NULL,
337     [PA_COMMAND_REMOVE_AUTOLOAD___OBSOLETE] = NULL,
338     /* moving streams */
339     [PA_COMMAND_MOVE_SINK_INPUT] = command_move_stream,
340     [PA_COMMAND_MOVE_SOURCE_OUTPUT] = command_move_stream,
341     /* buffer attributes */
342     [PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
343     [PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR] = command_set_stream_buffer_attr,
344     /* sample rate */
345     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
346     [PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE] = command_update_stream_sample_rate,
347     /* property lists: update */
348     [PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST] = command_update_proplist,
349     [PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST] = command_update_proplist,
350     [PA_COMMAND_UPDATE_CLIENT_PROPLIST] = command_update_proplist,
351     /* property lists: remove */
352     [PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST] = command_remove_proplist,
353     [PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST] = command_remove_proplist,
354     [PA_COMMAND_REMOVE_CLIENT_PROPLIST] = command_remove_proplist,
355     /* cards */
356     [PA_COMMAND_SET_CARD_PROFILE] = command_set_card_profile,
357     /* extensions */
358     [PA_COMMAND_EXTENSION] = command_extension
359 };
360
361 /* structure management */
362
363 static void upload_stream_unlink(upload_stream *s) {
364     pa_assert(s);
365
366     if (!s->connection)
367         return;
368
369     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
370     s->connection = NULL;
371     upload_stream_unref(s);
372 }
373
374 static void upload_stream_free(pa_object *o) {
375     upload_stream *s = UPLOAD_STREAM(o);
376     pa_assert(s);
377
378     upload_stream_unlink(s);
379
380     pa_xfree(s->name);
381
382     if (s->proplist)
383         pa_proplist_free(s->proplist);
384
385     if (s->memchunk.memblock)
386         pa_memblock_unref(s->memchunk.memblock);
387
388     pa_xfree(s);
389 }
390
391 static upload_stream* upload_stream_new(
392         pa_native_connection *c,
393         const pa_sample_spec *ss,
394         const pa_channel_map *map,
395         const char *name,
396         size_t length,
397         pa_proplist *p) {
398
399     upload_stream *s;
400
401     pa_assert(c);
402     pa_assert(ss);
403     pa_assert(name);
404     pa_assert(length > 0);
405     pa_assert(p);
406
407     s = pa_msgobject_new(upload_stream);
408     s->parent.parent.parent.free = upload_stream_free;
409     s->connection = c;
410     s->sample_spec = *ss;
411     s->channel_map = *map;
412     s->name = pa_xstrdup(name);
413     pa_memchunk_reset(&s->memchunk);
414     s->length = length;
415     s->proplist = pa_proplist_copy(p);
416     pa_proplist_update(s->proplist, PA_UPDATE_MERGE, c->client->proplist);
417
418     pa_idxset_put(c->output_streams, s, &s->index);
419
420     return s;
421 }
422
423 static void record_stream_unlink(record_stream *s) {
424     pa_assert(s);
425
426     if (!s->connection)
427         return;
428
429     if (s->source_output) {
430         pa_source_output_unlink(s->source_output);
431         pa_source_output_unref(s->source_output);
432         s->source_output = NULL;
433     }
434
435     pa_assert_se(pa_idxset_remove_by_data(s->connection->record_streams, s, NULL) == s);
436     s->connection = NULL;
437     record_stream_unref(s);
438 }
439
440 static void record_stream_free(pa_object *o) {
441     record_stream *s = RECORD_STREAM(o);
442     pa_assert(s);
443
444     record_stream_unlink(s);
445
446     pa_memblockq_free(s->memblockq);
447     pa_xfree(s);
448 }
449
450 static int record_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
451     record_stream *s = RECORD_STREAM(o);
452     record_stream_assert_ref(s);
453
454     if (!s->connection)
455         return -1;
456
457     switch (code) {
458
459         case RECORD_STREAM_MESSAGE_POST_DATA:
460
461             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
462 /*                 pa_log_warn("Failed to push data into output queue."); */
463                 return -1;
464             }
465
466             if (!pa_pstream_is_pending(s->connection->pstream))
467                 native_connection_send_memblock(s->connection);
468
469             break;
470     }
471
472     return 0;
473 }
474
475 static void fix_record_buffer_attr_pre(
476         record_stream *s,
477         pa_bool_t adjust_latency,
478         pa_bool_t early_requests,
479         uint32_t *maxlength,
480         uint32_t *fragsize) {
481
482     size_t frame_size;
483     pa_usec_t orig_fragsize_usec, fragsize_usec, source_usec;
484
485     pa_assert(s);
486     pa_assert(maxlength);
487     pa_assert(fragsize);
488
489     frame_size = pa_frame_size(&s->source_output->sample_spec);
490
491     if (*maxlength == (uint32_t) -1 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
492         *maxlength = MAX_MEMBLOCKQ_LENGTH;
493     if (*maxlength <= 0)
494         *maxlength = (uint32_t) frame_size;
495
496     if (*fragsize == (uint32_t) -1)
497         *fragsize = (uint32_t) pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &s->source_output->sample_spec);
498     if (*fragsize <= 0)
499         *fragsize = (uint32_t) frame_size;
500
501     orig_fragsize_usec = fragsize_usec = pa_bytes_to_usec(*fragsize, &s->source_output->sample_spec);
502
503     if (early_requests) {
504
505         /* In early request mode we need to emulate the classic
506          * fragment-based playback model. We do this setting the source
507          * latency to the fragment size. */
508
509         source_usec = fragsize_usec;
510
511     } else if (adjust_latency) {
512
513         /* So, the user asked us to adjust the latency according to
514          * what the source can provide. Half the latency will be
515          * spent on the hw buffer, half of it in the async buffer
516          * queue we maintain for each client. */
517
518         source_usec = fragsize_usec/2;
519
520     } else {
521
522         /* Ok, the user didn't ask us to adjust the latency, hence we
523          * don't */
524
525         source_usec = 0;
526     }
527
528     if (source_usec > 0)
529         s->source_latency = pa_source_output_set_requested_latency(s->source_output, source_usec);
530     else
531         s->source_latency = 0;
532
533     if (early_requests) {
534
535         /* Ok, we didn't necessarily get what we were asking for, so
536          * let's tell the user */
537
538         fragsize_usec = s->source_latency;
539
540     } else if (adjust_latency) {
541
542         /* Now subtract what we actually got */
543
544         if (fragsize_usec >= s->source_latency*2)
545             fragsize_usec -= s->source_latency;
546         else
547             fragsize_usec = s->source_latency;
548     }
549
550     if (pa_usec_to_bytes(orig_fragsize_usec, &s->source_output->sample_spec) !=
551         pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec))
552
553         *fragsize = (uint32_t) pa_usec_to_bytes(fragsize_usec, &s->source_output->sample_spec);
554
555     if (*fragsize <= 0)
556         *fragsize = (uint32_t) frame_size;
557 }
558
559 static void fix_record_buffer_attr_post(
560         record_stream *s,
561         uint32_t *maxlength,
562         uint32_t *fragsize) {
563
564     size_t base;
565
566     pa_assert(s);
567     pa_assert(maxlength);
568     pa_assert(fragsize);
569
570     *maxlength = (uint32_t) pa_memblockq_get_maxlength(s->memblockq);
571
572     base = pa_frame_size(&s->source_output->sample_spec);
573
574     s->fragment_size = (*fragsize/base)*base;
575     if (s->fragment_size <= 0)
576         s->fragment_size = base;
577
578     if (s->fragment_size > *maxlength)
579         s->fragment_size = *maxlength;
580
581     *fragsize = (uint32_t) s->fragment_size;
582 }
583
584 static record_stream* record_stream_new(
585         pa_native_connection *c,
586         pa_source *source,
587         pa_sample_spec *ss,
588         pa_channel_map *map,
589         pa_bool_t peak_detect,
590         uint32_t *maxlength,
591         uint32_t *fragsize,
592         pa_source_output_flags_t flags,
593         pa_proplist *p,
594         pa_bool_t adjust_latency,
595         pa_sink_input *direct_on_input,
596         pa_bool_t early_requests,
597         int *ret) {
598
599     record_stream *s;
600     pa_source_output *source_output = NULL;
601     size_t base;
602     pa_source_output_new_data data;
603
604     pa_assert(c);
605     pa_assert(ss);
606     pa_assert(maxlength);
607     pa_assert(p);
608     pa_assert(ret);
609
610     pa_source_output_new_data_init(&data);
611
612     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
613     data.driver = __FILE__;
614     data.module = c->options->module;
615     data.client = c->client;
616     data.source = source;
617     data.direct_on_input = direct_on_input;
618     pa_source_output_new_data_set_sample_spec(&data, ss);
619     pa_source_output_new_data_set_channel_map(&data, map);
620     if (peak_detect)
621         data.resample_method = PA_RESAMPLER_PEAKS;
622
623     *ret = -pa_source_output_new(&source_output, c->protocol->core, &data, flags);
624
625     pa_source_output_new_data_done(&data);
626
627     if (!source_output)
628         return NULL;
629
630     s = pa_msgobject_new(record_stream);
631     s->parent.parent.free = record_stream_free;
632     s->parent.process_msg = record_stream_process_msg;
633     s->connection = c;
634     s->source_output = source_output;
635
636     s->source_output->push = source_output_push_cb;
637     s->source_output->kill = source_output_kill_cb;
638     s->source_output->get_latency = source_output_get_latency_cb;
639     s->source_output->moved = source_output_moved_cb;
640     s->source_output->suspend = source_output_suspend_cb;
641     s->source_output->send_event = source_output_send_event_cb;
642     s->source_output->userdata = s;
643
644     fix_record_buffer_attr_pre(s, adjust_latency, early_requests, maxlength, fragsize);
645
646     s->memblockq = pa_memblockq_new(
647             0,
648             *maxlength,
649             0,
650             base = pa_frame_size(&source_output->sample_spec),
651             1,
652             0,
653             0,
654             NULL);
655
656     fix_record_buffer_attr_post(s, maxlength, fragsize);
657
658     *ss = s->source_output->sample_spec;
659     *map = s->source_output->channel_map;
660
661     pa_idxset_put(c->record_streams, s, &s->index);
662
663     pa_log_info("Final latency %0.2f ms = %0.2f ms + %0.2f ms",
664                 ((double) pa_bytes_to_usec(s->fragment_size, &source_output->sample_spec) + (double) s->source_latency) / PA_USEC_PER_MSEC,
665                 (double) pa_bytes_to_usec(s->fragment_size, &source_output->sample_spec) / PA_USEC_PER_MSEC,
666                 (double) s->source_latency / PA_USEC_PER_MSEC);
667
668     pa_source_output_put(s->source_output);
669     return s;
670 }
671
672 static void record_stream_send_killed(record_stream *r) {
673     pa_tagstruct *t;
674     record_stream_assert_ref(r);
675
676     t = pa_tagstruct_new(NULL, 0);
677     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
678     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
679     pa_tagstruct_putu32(t, r->index);
680     pa_pstream_send_tagstruct(r->connection->pstream, t);
681 }
682
683 static void playback_stream_unlink(playback_stream *s) {
684     pa_assert(s);
685
686     if (!s->connection)
687         return;
688
689     if (s->sink_input) {
690         pa_sink_input_unlink(s->sink_input);
691         pa_sink_input_unref(s->sink_input);
692         s->sink_input = NULL;
693     }
694
695     if (s->drain_request)
696         pa_pstream_send_error(s->connection->pstream, s->drain_tag, PA_ERR_NOENTITY);
697
698     pa_assert_se(pa_idxset_remove_by_data(s->connection->output_streams, s, NULL) == s);
699     s->connection = NULL;
700     playback_stream_unref(s);
701 }
702
703 static void playback_stream_free(pa_object* o) {
704     playback_stream *s = PLAYBACK_STREAM(o);
705     pa_assert(s);
706
707     playback_stream_unlink(s);
708
709     pa_memblockq_free(s->memblockq);
710     pa_xfree(s);
711 }
712
713 static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
714     playback_stream *s = PLAYBACK_STREAM(o);
715     playback_stream_assert_ref(s);
716
717     if (!s->connection)
718         return -1;
719
720     switch (code) {
721         case PLAYBACK_STREAM_MESSAGE_REQUEST_DATA: {
722             pa_tagstruct *t;
723             uint32_t l = 0;
724
725             for (;;) {
726                 if ((l = (uint32_t) pa_atomic_load(&s->missing)) <= 0)
727                     break;
728
729                 if (pa_atomic_cmpxchg(&s->missing, (int) l, 0))
730                     break;
731             }
732
733             if (l <= 0)
734                 break;
735
736             t = pa_tagstruct_new(NULL, 0);
737             pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
738             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
739             pa_tagstruct_putu32(t, s->index);
740             pa_tagstruct_putu32(t, l);
741             pa_pstream_send_tagstruct(s->connection->pstream, t);
742
743 /*             pa_log("Requesting %lu bytes", (unsigned long) l); */
744             break;
745         }
746
747         case PLAYBACK_STREAM_MESSAGE_UNDERFLOW: {
748             pa_tagstruct *t;
749
750 /*             pa_log("signalling underflow"); */
751
752             /* Report that we're empty */
753             t = pa_tagstruct_new(NULL, 0);
754             pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
755             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
756             pa_tagstruct_putu32(t, s->index);
757             pa_pstream_send_tagstruct(s->connection->pstream, t);
758             break;
759         }
760
761         case PLAYBACK_STREAM_MESSAGE_OVERFLOW: {
762             pa_tagstruct *t;
763
764             /* Notify the user we're overflowed*/
765             t = pa_tagstruct_new(NULL, 0);
766             pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
767             pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
768             pa_tagstruct_putu32(t, s->index);
769             pa_pstream_send_tagstruct(s->connection->pstream, t);
770             break;
771         }
772
773         case PLAYBACK_STREAM_MESSAGE_STARTED:
774
775             if (s->connection->version >= 13) {
776                 pa_tagstruct *t;
777
778                 /* Notify the user we started playback */
779                 t = pa_tagstruct_new(NULL, 0);
780                 pa_tagstruct_putu32(t, PA_COMMAND_STARTED);
781                 pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
782                 pa_tagstruct_putu32(t, s->index);
783                 pa_pstream_send_tagstruct(s->connection->pstream, t);
784             }
785
786             break;
787
788         case PLAYBACK_STREAM_MESSAGE_DRAIN_ACK:
789             pa_pstream_send_simple_ack(s->connection->pstream, PA_PTR_TO_UINT(userdata));
790             break;
791     }
792
793     return 0;
794 }
795
796 static void fix_playback_buffer_attr_pre(
797         playback_stream *s,
798         pa_bool_t adjust_latency,
799         pa_bool_t early_requests,
800         uint32_t *maxlength,
801         uint32_t *tlength,
802         uint32_t* prebuf,
803         uint32_t* minreq) {
804
805     size_t frame_size;
806     pa_usec_t orig_tlength_usec, tlength_usec, orig_minreq_usec, minreq_usec, sink_usec;
807
808     pa_assert(s);
809     pa_assert(maxlength);
810     pa_assert(tlength);
811     pa_assert(prebuf);
812     pa_assert(minreq);
813
814     frame_size = pa_frame_size(&s->sink_input->sample_spec);
815
816     if (*maxlength == (uint32_t) -1 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
817         *maxlength = MAX_MEMBLOCKQ_LENGTH;
818     if (*maxlength <= 0)
819         *maxlength = (uint32_t) frame_size;
820
821     if (*tlength == (uint32_t) -1)
822         *tlength = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
823     if (*tlength <= 0)
824         *tlength = (uint32_t) frame_size;
825
826     if (*minreq == (uint32_t) -1)
827         *minreq = (uint32_t) pa_usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &s->sink_input->sample_spec);
828     if (*minreq <= 0)
829         *minreq = (uint32_t) frame_size;
830
831     if (*tlength < *minreq+frame_size)
832         *tlength = *minreq+(uint32_t) frame_size;
833
834     orig_tlength_usec = tlength_usec = pa_bytes_to_usec(*tlength, &s->sink_input->sample_spec);
835     orig_minreq_usec = minreq_usec = pa_bytes_to_usec(*minreq, &s->sink_input->sample_spec);
836
837     pa_log_info("Requested tlength=%0.2f ms, minreq=%0.2f ms",
838                 (double) tlength_usec / PA_USEC_PER_MSEC,
839                 (double) minreq_usec / PA_USEC_PER_MSEC);
840
841     if (early_requests) {
842
843         /* In early request mode we need to emulate the classic
844          * fragment-based playback model. We do this setting the sink
845          * latency to the fragment size. */
846
847         sink_usec = minreq_usec;
848
849         pa_log_debug("Early requests mode enabled, configuring sink latency to minreq.");
850
851     } else if (adjust_latency) {
852
853         /* So, the user asked us to adjust the latency of the stream
854          * buffer according to the what the sink can provide. The
855          * tlength passed in shall be the overall latency. Roughly
856          * half the latency will be spent on the hw buffer, the other
857          * half of it in the async buffer queue we maintain for each
858          * client. In between we'll have a safety space of size
859          * 2*minreq. Why the 2*minreq? When the hw buffer is completey
860          * empty and needs to be filled, then our buffer must have
861          * enough data to fulfill this request immediatly and thus
862          * have at least the same tlength as the size of the hw
863          * buffer. It additionally needs space for 2 times minreq
864          * because if the buffer ran empty and a partial fillup
865          * happens immediately on the next iteration we need to be
866          * able to fulfill it and give the application also minreq
867          * time to fill it up again for the next request Makes 2 times
868          * minreq in plus.. */
869
870         if (tlength_usec > minreq_usec*2)
871             sink_usec = (tlength_usec - minreq_usec*2)/2;
872         else
873             sink_usec = 0;
874
875         pa_log_debug("Adjust latency mode enabled, configuring sink latency to half of overall latency.");
876
877     } else {
878
879         /* Ok, the user didn't ask us to adjust the latency, but we
880          * still need to make sure that the parameters from the user
881          * do make sense. */
882
883         if (tlength_usec > minreq_usec*2)
884             sink_usec = (tlength_usec - minreq_usec*2);
885         else
886             sink_usec = 0;
887
888         pa_log_debug("Traditional mode enabled, modifying sink usec only for compat with minreq.");
889     }
890
891     s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, sink_usec);
892
893     if (early_requests) {
894
895         /* Ok, we didn't necessarily get what we were asking for, so
896          * let's tell the user */
897
898         minreq_usec = s->sink_latency;
899
900     } else if (adjust_latency) {
901
902         /* Ok, we didn't necessarily get what we were asking for, so
903          * let's subtract from what we asked for for the remaining
904          * buffer space */
905
906         if (tlength_usec >= s->sink_latency)
907             tlength_usec -= s->sink_latency;
908     }
909
910     /* FIXME: This is actually larger than necessary, since not all of
911      * the sink latency is actually rewritable. */
912     if (tlength_usec < s->sink_latency + 2*minreq_usec)
913         tlength_usec = s->sink_latency + 2*minreq_usec;
914
915     if (pa_usec_to_bytes_round_up(orig_tlength_usec, &s->sink_input->sample_spec) !=
916         pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec))
917         *tlength = (uint32_t) pa_usec_to_bytes_round_up(tlength_usec, &s->sink_input->sample_spec);
918
919     if (pa_usec_to_bytes(orig_minreq_usec, &s->sink_input->sample_spec) !=
920         pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec))
921         *minreq = (uint32_t) pa_usec_to_bytes(minreq_usec, &s->sink_input->sample_spec);
922
923     if (*minreq <= 0) {
924         *minreq = (uint32_t) frame_size;
925         *tlength += (uint32_t) frame_size*2;
926     }
927
928     if (*tlength <= *minreq)
929         *tlength = *minreq*2 + (uint32_t) frame_size;
930
931     if (*prebuf == (uint32_t) -1 || *prebuf > *tlength)
932         *prebuf = *tlength;
933 }
934
935 static void fix_playback_buffer_attr_post(
936         playback_stream *s,
937         uint32_t *maxlength,
938         uint32_t *tlength,
939         uint32_t* prebuf,
940         uint32_t* minreq) {
941
942     pa_assert(s);
943     pa_assert(maxlength);
944     pa_assert(tlength);
945     pa_assert(prebuf);
946     pa_assert(minreq);
947
948     *maxlength = (uint32_t) pa_memblockq_get_maxlength(s->memblockq);
949     *tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq);
950     *prebuf = (uint32_t) pa_memblockq_get_prebuf(s->memblockq);
951     *minreq = (uint32_t) pa_memblockq_get_minreq(s->memblockq);
952
953     s->minreq = *minreq;
954 }
955
956 static playback_stream* playback_stream_new(
957         pa_native_connection *c,
958         pa_sink *sink,
959         pa_sample_spec *ss,
960         pa_channel_map *map,
961         uint32_t *maxlength,
962         uint32_t *tlength,
963         uint32_t *prebuf,
964         uint32_t *minreq,
965         pa_cvolume *volume,
966         pa_bool_t muted,
967         pa_bool_t muted_set,
968         uint32_t syncid,
969         uint32_t *missing,
970         pa_sink_input_flags_t flags,
971         pa_proplist *p,
972         pa_bool_t adjust_latency,
973         pa_bool_t early_requests,
974         int *ret) {
975
976     playback_stream *s, *ssync;
977     pa_sink_input *sink_input = NULL;
978     pa_memchunk silence;
979     uint32_t idx;
980     int64_t start_index;
981     pa_sink_input_new_data data;
982
983     pa_assert(c);
984     pa_assert(ss);
985     pa_assert(maxlength);
986     pa_assert(tlength);
987     pa_assert(prebuf);
988     pa_assert(minreq);
989     pa_assert(missing);
990     pa_assert(p);
991     pa_assert(ret);
992
993     /* Find syncid group */
994     for (ssync = pa_idxset_first(c->output_streams, &idx); ssync; ssync = pa_idxset_next(c->output_streams, &idx)) {
995
996         if (!playback_stream_isinstance(ssync))
997             continue;
998
999         if (ssync->syncid == syncid)
1000             break;
1001     }
1002
1003     /* Synced streams must connect to the same sink */
1004     if (ssync) {
1005
1006         if (!sink)
1007             sink = ssync->sink_input->sink;
1008         else if (sink != ssync->sink_input->sink) {
1009             *ret = PA_ERR_INVALID;
1010             return NULL;
1011         }
1012     }
1013
1014     pa_sink_input_new_data_init(&data);
1015
1016     pa_proplist_update(data.proplist, PA_UPDATE_REPLACE, p);
1017     data.driver = __FILE__;
1018     data.module = c->options->module;
1019     data.client = c->client;
1020     data.sink = sink;
1021     pa_sink_input_new_data_set_sample_spec(&data, ss);
1022     pa_sink_input_new_data_set_channel_map(&data, map);
1023     if (volume)
1024         pa_sink_input_new_data_set_volume(&data, volume);
1025     if (muted_set)
1026         pa_sink_input_new_data_set_muted(&data, muted);
1027     data.sync_base = ssync ? ssync->sink_input : NULL;
1028
1029     *ret = -pa_sink_input_new(&sink_input, c->protocol->core, &data, flags);
1030
1031     pa_sink_input_new_data_done(&data);
1032
1033     if (!sink_input)
1034         return NULL;
1035
1036     s = pa_msgobject_new(playback_stream);
1037     s->parent.parent.parent.free = playback_stream_free;
1038     s->parent.parent.process_msg = playback_stream_process_msg;
1039     s->connection = c;
1040     s->syncid = syncid;
1041     s->sink_input = sink_input;
1042     s->is_underrun = TRUE;
1043     s->drain_request = FALSE;
1044     pa_atomic_store(&s->missing, 0);
1045
1046     s->sink_input->parent.process_msg = sink_input_process_msg;
1047     s->sink_input->pop = sink_input_pop_cb;
1048     s->sink_input->process_rewind = sink_input_process_rewind_cb;
1049     s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1050     s->sink_input->update_max_request = sink_input_update_max_request_cb;
1051     s->sink_input->kill = sink_input_kill_cb;
1052     s->sink_input->moved = sink_input_moved_cb;
1053     s->sink_input->suspend = sink_input_suspend_cb;
1054     s->sink_input->send_event = sink_input_send_event_cb;
1055     s->sink_input->userdata = s;
1056
1057     start_index = ssync ? pa_memblockq_get_read_index(ssync->memblockq) : 0;
1058
1059     fix_playback_buffer_attr_pre(s, adjust_latency, early_requests, maxlength, tlength, prebuf, minreq);
1060     pa_sink_input_get_silence(sink_input, &silence);
1061
1062     s->memblockq = pa_memblockq_new(
1063             start_index,
1064             *maxlength,
1065             *tlength,
1066             pa_frame_size(&sink_input->sample_spec),
1067             *prebuf,
1068             *minreq,
1069             0,
1070             &silence);
1071
1072     pa_memblock_unref(silence.memblock);
1073     fix_playback_buffer_attr_post(s, maxlength, tlength, prebuf, minreq);
1074
1075     *missing = (uint32_t) pa_memblockq_pop_missing(s->memblockq);
1076
1077     *ss = s->sink_input->sample_spec;
1078     *map = s->sink_input->channel_map;
1079
1080     pa_idxset_put(c->output_streams, s, &s->index);
1081
1082     pa_log_info("Final latency %0.2f ms = %0.2f ms + 2*%0.2f ms + %0.2f ms",
1083                 ((double) pa_bytes_to_usec(*tlength, &sink_input->sample_spec) + (double) s->sink_latency) / PA_USEC_PER_MSEC,
1084                 (double) pa_bytes_to_usec(*tlength-*minreq*2, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1085                 (double) pa_bytes_to_usec(*minreq, &sink_input->sample_spec) / PA_USEC_PER_MSEC,
1086                 (double) s->sink_latency / PA_USEC_PER_MSEC);
1087
1088     pa_sink_input_put(s->sink_input);
1089     return s;
1090 }
1091
1092 /* Called from thread context */
1093 static void playback_stream_request_bytes(playback_stream *s) {
1094     size_t m, previous_missing;
1095
1096     playback_stream_assert_ref(s);
1097
1098     m = pa_memblockq_pop_missing(s->memblockq);
1099
1100     if (m <= 0)
1101         return;
1102
1103 /*     pa_log("request_bytes(%lu)", (unsigned long) m); */
1104
1105     previous_missing = (size_t) pa_atomic_add(&s->missing, (int) m);
1106
1107     if (pa_memblockq_prebuf_active(s->memblockq) ||
1108         (previous_missing < s->minreq && previous_missing+m >= s->minreq))
1109         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1110 }
1111
1112
1113 static void playback_stream_send_killed(playback_stream *p) {
1114     pa_tagstruct *t;
1115     playback_stream_assert_ref(p);
1116
1117     t = pa_tagstruct_new(NULL, 0);
1118     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
1119     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1120     pa_tagstruct_putu32(t, p->index);
1121     pa_pstream_send_tagstruct(p->connection->pstream, t);
1122 }
1123
1124 static int native_connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1125     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1126     pa_native_connection_assert_ref(c);
1127
1128     if (!c->protocol)
1129         return -1;
1130
1131     switch (code) {
1132
1133         case CONNECTION_MESSAGE_REVOKE:
1134             pa_pstream_send_revoke(c->pstream, PA_PTR_TO_UINT(userdata));
1135             break;
1136
1137         case CONNECTION_MESSAGE_RELEASE:
1138             pa_pstream_send_release(c->pstream, PA_PTR_TO_UINT(userdata));
1139             break;
1140     }
1141
1142     return 0;
1143 }
1144
1145 static void native_connection_unlink(pa_native_connection *c) {
1146     record_stream *r;
1147     output_stream *o;
1148
1149     pa_assert(c);
1150
1151     if (!c->protocol)
1152         return;
1153
1154     pa_hook_fire(&c->protocol->hooks[PA_NATIVE_HOOK_CONNECTION_UNLINK], c);
1155
1156     if (c->options)
1157         pa_native_options_unref(c->options);
1158
1159     while ((r = pa_idxset_first(c->record_streams, NULL)))
1160         record_stream_unlink(r);
1161
1162     while ((o = pa_idxset_first(c->output_streams, NULL)))
1163         if (playback_stream_isinstance(o))
1164             playback_stream_unlink(PLAYBACK_STREAM(o));
1165         else
1166             upload_stream_unlink(UPLOAD_STREAM(o));
1167
1168     if (c->subscription)
1169         pa_subscription_free(c->subscription);
1170
1171     if (c->pstream)
1172         pa_pstream_unlink(c->pstream);
1173
1174     if (c->auth_timeout_event) {
1175         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
1176         c->auth_timeout_event = NULL;
1177     }
1178
1179     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
1180     c->protocol = NULL;
1181     pa_native_connection_unref(c);
1182 }
1183
1184 static void native_connection_free(pa_object *o) {
1185     pa_native_connection *c = PA_NATIVE_CONNECTION(o);
1186
1187     pa_assert(c);
1188
1189     native_connection_unlink(c);
1190
1191     pa_idxset_free(c->record_streams, NULL, NULL);
1192     pa_idxset_free(c->output_streams, NULL, NULL);
1193
1194     pa_pdispatch_unref(c->pdispatch);
1195     pa_pstream_unref(c->pstream);
1196     pa_client_free(c->client);
1197
1198     pa_xfree(c);
1199 }
1200
1201 static void native_connection_send_memblock(pa_native_connection *c) {
1202     uint32_t start;
1203     record_stream *r;
1204
1205     start = PA_IDXSET_INVALID;
1206     for (;;) {
1207         pa_memchunk chunk;
1208
1209         if (!(r = RECORD_STREAM(pa_idxset_rrobin(c->record_streams, &c->rrobin_index))))
1210             return;
1211
1212         if (start == PA_IDXSET_INVALID)
1213             start = c->rrobin_index;
1214         else if (start == c->rrobin_index)
1215             return;
1216
1217         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
1218             pa_memchunk schunk = chunk;
1219
1220             if (schunk.length > r->fragment_size)
1221                 schunk.length = r->fragment_size;
1222
1223             pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
1224
1225             pa_memblockq_drop(r->memblockq, schunk.length);
1226             pa_memblock_unref(schunk.memblock);
1227
1228             return;
1229         }
1230     }
1231 }
1232
1233 /*** sink input callbacks ***/
1234
1235 static void handle_seek(playback_stream *s, int64_t indexw) {
1236     playback_stream_assert_ref(s);
1237
1238 /*     pa_log("handle_seek: %llu -- %i", (unsigned long long) s->sink_input->thread_info.underrun_for, pa_memblockq_is_readable(s->memblockq)); */
1239
1240     if (s->sink_input->thread_info.underrun_for > 0) {
1241
1242 /*         pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
1243
1244         if (pa_memblockq_is_readable(s->memblockq)) {
1245
1246             /* We just ended an underrun, let's ask the sink
1247              * for a complete rewind rewrite */
1248
1249             pa_log_debug("Requesting rewind due to end of underrun.");
1250             pa_sink_input_request_rewind(s->sink_input,
1251                                          (size_t) (s->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : s->sink_input->thread_info.underrun_for),
1252                                          FALSE, TRUE, FALSE);
1253         }
1254
1255     } else {
1256         int64_t indexr;
1257
1258         indexr = pa_memblockq_get_read_index(s->memblockq);
1259
1260         if (indexw < indexr) {
1261             /* OK, the sink already asked for this data, so
1262              * let's have it usk us again */
1263
1264             pa_log_debug("Requesting rewind due to rewrite.");
1265             pa_sink_input_request_rewind(s->sink_input, (size_t) (indexr - indexw), TRUE, FALSE, FALSE);
1266         }
1267     }
1268
1269     playback_stream_request_bytes(s);
1270 }
1271
1272 /* Called from thread context */
1273 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1274     pa_sink_input *i = PA_SINK_INPUT(o);
1275     playback_stream *s;
1276
1277     pa_sink_input_assert_ref(i);
1278     s = PLAYBACK_STREAM(i->userdata);
1279     playback_stream_assert_ref(s);
1280
1281     switch (code) {
1282
1283         case SINK_INPUT_MESSAGE_SEEK: {
1284             int64_t windex;
1285
1286             windex = pa_memblockq_get_write_index(s->memblockq);
1287             pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
1288
1289             handle_seek(s, windex);
1290             return 0;
1291         }
1292
1293         case SINK_INPUT_MESSAGE_POST_DATA: {
1294             int64_t windex;
1295
1296             pa_assert(chunk);
1297
1298             windex = pa_memblockq_get_write_index(s->memblockq);
1299
1300 /*             pa_log("sink input post: %lu %lli", (unsigned long) chunk->length, (long long) windex); */
1301
1302             if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
1303                 pa_log_warn("Failed to push data into queue");
1304                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
1305                 pa_memblockq_seek(s->memblockq, (int64_t) chunk->length, PA_SEEK_RELATIVE);
1306             }
1307
1308             handle_seek(s, windex);
1309
1310 /*             pa_log("sink input post2: %lu", (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1311
1312             return 0;
1313         }
1314
1315         case SINK_INPUT_MESSAGE_DRAIN:
1316         case SINK_INPUT_MESSAGE_FLUSH:
1317         case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1318         case SINK_INPUT_MESSAGE_TRIGGER: {
1319
1320             int64_t windex;
1321             pa_sink_input *isync;
1322             void (*func)(pa_memblockq *bq);
1323
1324             switch  (code) {
1325                 case SINK_INPUT_MESSAGE_FLUSH:
1326                     func = pa_memblockq_flush_write;
1327                     break;
1328
1329                 case SINK_INPUT_MESSAGE_PREBUF_FORCE:
1330                     func = pa_memblockq_prebuf_force;
1331                     break;
1332
1333                 case SINK_INPUT_MESSAGE_DRAIN:
1334                 case SINK_INPUT_MESSAGE_TRIGGER:
1335                     func = pa_memblockq_prebuf_disable;
1336                     break;
1337
1338                 default:
1339                     pa_assert_not_reached();
1340             }
1341
1342             windex = pa_memblockq_get_write_index(s->memblockq);
1343             func(s->memblockq);
1344             handle_seek(s, windex);
1345
1346             /* Do the same for all other members in the sync group */
1347             for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
1348                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1349                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1350                 func(ssync->memblockq);
1351                 handle_seek(ssync, windex);
1352             }
1353
1354             for (isync = i->sync_next; isync; isync = isync->sync_next) {
1355                 playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
1356                 windex = pa_memblockq_get_write_index(ssync->memblockq);
1357                 func(ssync->memblockq);
1358                 handle_seek(ssync, windex);
1359             }
1360
1361             if (code == SINK_INPUT_MESSAGE_DRAIN) {
1362                 if (!pa_memblockq_is_readable(s->memblockq))
1363                     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
1364                 else {
1365                     s->drain_tag = PA_PTR_TO_UINT(userdata);
1366                     s->drain_request = TRUE;
1367                 }
1368             }
1369
1370             return 0;
1371         }
1372
1373         case SINK_INPUT_MESSAGE_UPDATE_LATENCY:
1374
1375             s->read_index = pa_memblockq_get_read_index(s->memblockq);
1376             s->write_index = pa_memblockq_get_write_index(s->memblockq);
1377             s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
1378             return 0;
1379
1380         case PA_SINK_INPUT_MESSAGE_SET_STATE: {
1381             int64_t windex;
1382
1383             windex = pa_memblockq_get_write_index(s->memblockq);
1384
1385             pa_memblockq_prebuf_force(s->memblockq);
1386
1387             handle_seek(s, windex);
1388
1389             /* Fall through to the default handler */
1390             break;
1391         }
1392
1393         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1394             pa_usec_t *r = userdata;
1395
1396             *r = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &i->sample_spec);
1397
1398             /* Fall through, the default handler will add in the extra
1399              * latency added by the resampler */
1400             break;
1401         }
1402     }
1403
1404     return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1405 }
1406
1407 /* Called from thread context */
1408 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
1409     playback_stream *s;
1410
1411     pa_sink_input_assert_ref(i);
1412     s = PLAYBACK_STREAM(i->userdata);
1413     playback_stream_assert_ref(s);
1414     pa_assert(chunk);
1415
1416 /*     pa_log("%s, pop(): %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1417
1418     if (pa_memblockq_is_readable(s->memblockq))
1419         s->is_underrun = FALSE;
1420     else {
1421 /*         pa_log("%s, UNDERRUN: %lu", pa_proplist_gets(i->proplist, PA_PROP_MEDIA_NAME), (unsigned long) pa_memblockq_get_length(s->memblockq)); */
1422
1423         if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
1424             s->drain_request = FALSE;
1425             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
1426         } else if (!s->is_underrun)
1427             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
1428
1429         s->is_underrun = TRUE;
1430
1431         playback_stream_request_bytes(s);
1432     }
1433
1434     /* This call will not fail with prebuf=0, hence we check for
1435        underrun explicitly above */
1436     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
1437         return -1;
1438
1439     chunk->length = PA_MIN(nbytes, chunk->length);
1440
1441     if (i->thread_info.underrun_for > 0)
1442         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_STARTED, NULL, 0, NULL, NULL);
1443
1444     pa_memblockq_drop(s->memblockq, chunk->length);
1445     playback_stream_request_bytes(s);
1446
1447     return 0;
1448 }
1449
1450 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1451     playback_stream *s;
1452
1453     pa_sink_input_assert_ref(i);
1454     s = PLAYBACK_STREAM(i->userdata);
1455     playback_stream_assert_ref(s);
1456
1457     /* If we are in an underrun, then we don't rewind */
1458     if (i->thread_info.underrun_for > 0)
1459         return;
1460
1461     pa_memblockq_rewind(s->memblockq, nbytes);
1462 }
1463
1464 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1465     playback_stream *s;
1466
1467     pa_sink_input_assert_ref(i);
1468     s = PLAYBACK_STREAM(i->userdata);
1469     playback_stream_assert_ref(s);
1470
1471     pa_memblockq_set_maxrewind(s->memblockq, nbytes);
1472 }
1473
1474 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1475     playback_stream *s;
1476     size_t tlength;
1477
1478     pa_sink_input_assert_ref(i);
1479     s = PLAYBACK_STREAM(i->userdata);
1480     playback_stream_assert_ref(s);
1481
1482     tlength = nbytes+2*pa_memblockq_get_minreq(s->memblockq);
1483
1484     if (pa_memblockq_get_tlength(s->memblockq) < tlength)
1485         pa_memblockq_set_tlength(s->memblockq, tlength);
1486 }
1487
1488 /* Called from main context */
1489 static void sink_input_kill_cb(pa_sink_input *i) {
1490     playback_stream *s;
1491
1492     pa_sink_input_assert_ref(i);
1493     s = PLAYBACK_STREAM(i->userdata);
1494     playback_stream_assert_ref(s);
1495
1496     playback_stream_send_killed(s);
1497     playback_stream_unlink(s);
1498 }
1499
1500 /* Called from main context */
1501 static void sink_input_send_event_cb(pa_sink_input *i, const char *event, pa_proplist *pl) {
1502     playback_stream *s;
1503     pa_tagstruct *t;
1504
1505     pa_sink_input_assert_ref(i);
1506     s = PLAYBACK_STREAM(i->userdata);
1507     playback_stream_assert_ref(s);
1508
1509     if (s->connection->version < 15)
1510       return;
1511
1512     t = pa_tagstruct_new(NULL, 0);
1513     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_EVENT);
1514     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1515     pa_tagstruct_putu32(t, s->index);
1516     pa_tagstruct_puts(t, event);
1517     pa_tagstruct_put_proplist(t, pl);
1518     pa_pstream_send_tagstruct(s->connection->pstream, t);
1519 }
1520
1521 /* Called from main context */
1522 static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend) {
1523     playback_stream *s;
1524     pa_tagstruct *t;
1525
1526     pa_sink_input_assert_ref(i);
1527     s = PLAYBACK_STREAM(i->userdata);
1528     playback_stream_assert_ref(s);
1529
1530     if (s->connection->version < 12)
1531       return;
1532
1533     t = pa_tagstruct_new(NULL, 0);
1534     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_SUSPENDED);
1535     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1536     pa_tagstruct_putu32(t, s->index);
1537     pa_tagstruct_put_boolean(t, suspend);
1538     pa_pstream_send_tagstruct(s->connection->pstream, t);
1539 }
1540
1541 /* Called from main context */
1542 static void sink_input_moved_cb(pa_sink_input *i) {
1543     playback_stream *s;
1544     pa_tagstruct *t;
1545     uint32_t maxlength, tlength, prebuf, minreq;
1546
1547     pa_sink_input_assert_ref(i);
1548     s = PLAYBACK_STREAM(i->userdata);
1549     playback_stream_assert_ref(s);
1550
1551     maxlength = (uint32_t) pa_memblockq_get_maxlength(s->memblockq);
1552     tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq);
1553     prebuf = (uint32_t) pa_memblockq_get_prebuf(s->memblockq);
1554     minreq = (uint32_t) pa_memblockq_get_minreq(s->memblockq);
1555
1556     fix_playback_buffer_attr_pre(s, TRUE, FALSE, &maxlength, &tlength, &prebuf, &minreq);
1557     pa_memblockq_set_maxlength(s->memblockq, maxlength);
1558     pa_memblockq_set_tlength(s->memblockq, tlength);
1559     pa_memblockq_set_prebuf(s->memblockq, prebuf);
1560     pa_memblockq_set_minreq(s->memblockq, minreq);
1561     fix_playback_buffer_attr_post(s, &maxlength, &tlength, &prebuf, &minreq);
1562
1563     if (s->connection->version < 12)
1564       return;
1565
1566     t = pa_tagstruct_new(NULL, 0);
1567     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_MOVED);
1568     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1569     pa_tagstruct_putu32(t, s->index);
1570     pa_tagstruct_putu32(t, i->sink->index);
1571     pa_tagstruct_puts(t, i->sink->name);
1572     pa_tagstruct_put_boolean(t, pa_sink_get_state(i->sink) == PA_SINK_SUSPENDED);
1573
1574     if (s->connection->version >= 13) {
1575         pa_tagstruct_putu32(t, maxlength);
1576         pa_tagstruct_putu32(t, tlength);
1577         pa_tagstruct_putu32(t, prebuf);
1578         pa_tagstruct_putu32(t, minreq);
1579         pa_tagstruct_put_usec(t, s->sink_latency);
1580     }
1581
1582     pa_pstream_send_tagstruct(s->connection->pstream, t);
1583 }
1584
1585 /*** source_output callbacks ***/
1586
1587 /* Called from thread context */
1588 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1589     record_stream *s;
1590
1591     pa_source_output_assert_ref(o);
1592     s = RECORD_STREAM(o->userdata);
1593     record_stream_assert_ref(s);
1594     pa_assert(chunk);
1595
1596     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), RECORD_STREAM_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1597 }
1598
1599 static void source_output_kill_cb(pa_source_output *o) {
1600     record_stream *s;
1601
1602     pa_source_output_assert_ref(o);
1603     s = RECORD_STREAM(o->userdata);
1604     record_stream_assert_ref(s);
1605
1606     record_stream_send_killed(s);
1607     record_stream_unlink(s);
1608 }
1609
1610 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1611     record_stream *s;
1612
1613     pa_source_output_assert_ref(o);
1614     s = RECORD_STREAM(o->userdata);
1615     record_stream_assert_ref(s);
1616
1617     /*pa_log("get_latency: %u", pa_memblockq_get_length(s->memblockq));*/
1618
1619     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
1620 }
1621
1622 /* Called from main context */
1623 static void source_output_send_event_cb(pa_source_output *o, const char *event, pa_proplist *pl) {
1624     record_stream *s;
1625     pa_tagstruct *t;
1626
1627     pa_source_output_assert_ref(o);
1628     s = RECORD_STREAM(o->userdata);
1629     record_stream_assert_ref(s);
1630
1631     if (s->connection->version < 15)
1632       return;
1633
1634     t = pa_tagstruct_new(NULL, 0);
1635     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_EVENT);
1636     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1637     pa_tagstruct_putu32(t, s->index);
1638     pa_tagstruct_puts(t, event);
1639     pa_tagstruct_put_proplist(t, pl);
1640     pa_pstream_send_tagstruct(s->connection->pstream, t);
1641 }
1642
1643 /* Called from main context */
1644 static void source_output_suspend_cb(pa_source_output *o, pa_bool_t suspend) {
1645     record_stream *s;
1646     pa_tagstruct *t;
1647
1648     pa_source_output_assert_ref(o);
1649     s = RECORD_STREAM(o->userdata);
1650     record_stream_assert_ref(s);
1651
1652     if (s->connection->version < 12)
1653       return;
1654
1655     t = pa_tagstruct_new(NULL, 0);
1656     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_SUSPENDED);
1657     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1658     pa_tagstruct_putu32(t, s->index);
1659     pa_tagstruct_put_boolean(t, suspend);
1660     pa_pstream_send_tagstruct(s->connection->pstream, t);
1661 }
1662
1663 /* Called from main context */
1664 static void source_output_moved_cb(pa_source_output *o) {
1665     record_stream *s;
1666     pa_tagstruct *t;
1667     uint32_t maxlength, fragsize;
1668
1669     pa_source_output_assert_ref(o);
1670     s = RECORD_STREAM(o->userdata);
1671     record_stream_assert_ref(s);
1672
1673     fragsize = (uint32_t) s->fragment_size;
1674     maxlength = (uint32_t) pa_memblockq_get_length(s->memblockq);
1675
1676     fix_record_buffer_attr_pre(s, TRUE, FALSE, &maxlength, &fragsize);
1677     pa_memblockq_set_maxlength(s->memblockq, maxlength);
1678     fix_record_buffer_attr_post(s, &maxlength, &fragsize);
1679
1680     if (s->connection->version < 12)
1681       return;
1682
1683     t = pa_tagstruct_new(NULL, 0);
1684     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_MOVED);
1685     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
1686     pa_tagstruct_putu32(t, s->index);
1687     pa_tagstruct_putu32(t, o->source->index);
1688     pa_tagstruct_puts(t, o->source->name);
1689     pa_tagstruct_put_boolean(t, pa_source_get_state(o->source) == PA_SOURCE_SUSPENDED);
1690
1691     if (s->connection->version >= 13) {
1692         pa_tagstruct_putu32(t, maxlength);
1693         pa_tagstruct_putu32(t, fragsize);
1694         pa_tagstruct_put_usec(t, s->source_latency);
1695     }
1696
1697     pa_pstream_send_tagstruct(s->connection->pstream, t);
1698 }
1699
1700 /*** pdispatch callbacks ***/
1701
1702 static void protocol_error(pa_native_connection *c) {
1703     pa_log("protocol error, kicking client");
1704     native_connection_unlink(c);
1705 }
1706
/* Precondition check for command handlers: if 'expression' is false, the
 * error code 'error' is sent on 'pstream' for request 'tag' and the
 * *calling function returns*. Because of the bare 'return', this is only
 * usable inside functions returning void. 'expression' is evaluated
 * exactly once.
 *
 * The expansion itself ends in a semicolon, so a call site compiles with
 * or without its own trailing ';'. Do not drop it without auditing every
 * use of the macro. */
#define CHECK_VALIDITY(pstream, expression, tag, error) do { \
        if (!(expression)) { \
            pa_pstream_send_error((pstream), (tag), (error)); \
            return; \
        } \
    } while(0);
1713
1714 static pa_tagstruct *reply_new(uint32_t tag) {
1715     pa_tagstruct *reply;
1716
1717     reply = pa_tagstruct_new(NULL, 0);
1718     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1719     pa_tagstruct_putu32(reply, tag);
1720     return reply;
1721 }
1722
1723 static void command_create_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1724     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1725     playback_stream *s;
1726     uint32_t maxlength, tlength, prebuf, minreq, sink_index, syncid, missing;
1727     const char *name = NULL, *sink_name;
1728     pa_sample_spec ss;
1729     pa_channel_map map;
1730     pa_tagstruct *reply;
1731     pa_sink *sink = NULL;
1732     pa_cvolume volume;
1733     pa_bool_t
1734         corked = FALSE,
1735         no_remap = FALSE,
1736         no_remix = FALSE,
1737         fix_format = FALSE,
1738         fix_rate = FALSE,
1739         fix_channels = FALSE,
1740         no_move = FALSE,
1741         variable_rate = FALSE,
1742         muted = FALSE,
1743         adjust_latency = FALSE,
1744         early_requests = FALSE,
1745         dont_inhibit_auto_suspend = FALSE,
1746         muted_set = FALSE,
1747         fail_on_suspend = FALSE;
1748     pa_sink_input_flags_t flags = 0;
1749     pa_proplist *p;
1750     pa_bool_t volume_set = TRUE;
1751     int ret = PA_ERR_INVALID;
1752
1753     pa_native_connection_assert_ref(c);
1754     pa_assert(t);
1755
1756     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
1757         pa_tagstruct_get(
1758                 t,
1759                 PA_TAG_SAMPLE_SPEC, &ss,
1760                 PA_TAG_CHANNEL_MAP, &map,
1761                 PA_TAG_U32, &sink_index,
1762                 PA_TAG_STRING, &sink_name,
1763                 PA_TAG_U32, &maxlength,
1764                 PA_TAG_BOOLEAN, &corked,
1765                 PA_TAG_U32, &tlength,
1766                 PA_TAG_U32, &prebuf,
1767                 PA_TAG_U32, &minreq,
1768                 PA_TAG_U32, &syncid,
1769                 PA_TAG_CVOLUME, &volume,
1770                 PA_TAG_INVALID) < 0) {
1771
1772         protocol_error(c);
1773         return;
1774     }
1775
1776     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1777     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
1778     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
1779     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
1780     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
1781     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
1782     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
1783     CHECK_VALIDITY(c->pstream, map.channels == ss.channels && volume.channels == ss.channels, tag, PA_ERR_INVALID);
1784
1785     p = pa_proplist_new();
1786
1787     if (name)
1788         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
1789
1790     if (c->version >= 12)  {
1791         /* Since 0.9.8 the user can ask for a couple of additional flags */
1792
1793         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
1794             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
1795             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
1796             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
1797             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
1798             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
1799             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
1800
1801             protocol_error(c);
1802             pa_proplist_free(p);
1803             return;
1804         }
1805     }
1806
1807     if (c->version >= 13) {
1808
1809         if (pa_tagstruct_get_boolean(t, &muted) < 0 ||
1810             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
1811             pa_tagstruct_get_proplist(t, p) < 0) {
1812             protocol_error(c);
1813             pa_proplist_free(p);
1814             return;
1815         }
1816     }
1817
1818     if (c->version >= 14) {
1819
1820         if (pa_tagstruct_get_boolean(t, &volume_set) < 0 ||
1821             pa_tagstruct_get_boolean(t, &early_requests) < 0) {
1822             protocol_error(c);
1823             pa_proplist_free(p);
1824             return;
1825         }
1826     }
1827
1828     if (c->version >= 15) {
1829
1830         if (pa_tagstruct_get_boolean(t, &muted_set) < 0 ||
1831             pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
1832             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
1833             protocol_error(c);
1834             pa_proplist_free(p);
1835             return;
1836         }
1837     }
1838
1839     if (!pa_tagstruct_eof(t)) {
1840         protocol_error(c);
1841         pa_proplist_free(p);
1842         return;
1843     }
1844
1845     if (sink_index != PA_INVALID_INDEX) {
1846
1847         if (!(sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index))) {
1848             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1849             pa_proplist_free(p);
1850             return;
1851         }
1852
1853     } else if (sink_name) {
1854
1855         if (!(sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK))) {
1856             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
1857             pa_proplist_free(p);
1858             return;
1859         }
1860     }
1861
1862     flags =
1863         (corked ?  PA_SINK_INPUT_START_CORKED : 0) |
1864         (no_remap ?  PA_SINK_INPUT_NO_REMAP : 0) |
1865         (no_remix ?  PA_SINK_INPUT_NO_REMIX : 0) |
1866         (fix_format ?  PA_SINK_INPUT_FIX_FORMAT : 0) |
1867         (fix_rate ?  PA_SINK_INPUT_FIX_RATE : 0) |
1868         (fix_channels ?  PA_SINK_INPUT_FIX_CHANNELS : 0) |
1869         (no_move ?  PA_SINK_INPUT_DONT_MOVE : 0) |
1870         (variable_rate ?  PA_SINK_INPUT_VARIABLE_RATE : 0) |
1871         (dont_inhibit_auto_suspend ? PA_SINK_INPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
1872         (fail_on_suspend ? PA_SINK_INPUT_FAIL_ON_SUSPEND : 0);
1873
1874     /* Only since protocol version 15 there's a seperate muted_set
1875      * flag. For older versions we synthesize it here */
1876     muted_set = muted_set || muted;
1877
1878     s = playback_stream_new(c, sink, &ss, &map, &maxlength, &tlength, &prebuf, &minreq, volume_set ? &volume : NULL, muted, muted_set, syncid, &missing, flags, p, adjust_latency, early_requests, &ret);
1879     pa_proplist_free(p);
1880
1881     CHECK_VALIDITY(c->pstream, s, tag, ret);
1882
1883     reply = reply_new(tag);
1884     pa_tagstruct_putu32(reply, s->index);
1885     pa_assert(s->sink_input);
1886     pa_tagstruct_putu32(reply, s->sink_input->index);
1887     pa_tagstruct_putu32(reply, missing);
1888
1889 /*     pa_log("initial request is %u", missing); */
1890
1891     if (c->version >= 9) {
1892         /* Since 0.9.0 we support sending the buffer metrics back to the client */
1893
1894         pa_tagstruct_putu32(reply, (uint32_t) maxlength);
1895         pa_tagstruct_putu32(reply, (uint32_t) tlength);
1896         pa_tagstruct_putu32(reply, (uint32_t) prebuf);
1897         pa_tagstruct_putu32(reply, (uint32_t) minreq);
1898     }
1899
1900     if (c->version >= 12) {
1901         /* Since 0.9.8 we support sending the chosen sample
1902          * spec/channel map/device/suspend status back to the
1903          * client */
1904
1905         pa_tagstruct_put_sample_spec(reply, &ss);
1906         pa_tagstruct_put_channel_map(reply, &map);
1907
1908         pa_tagstruct_putu32(reply, s->sink_input->sink->index);
1909         pa_tagstruct_puts(reply, s->sink_input->sink->name);
1910
1911         pa_tagstruct_put_boolean(reply, pa_sink_get_state(s->sink_input->sink) == PA_SINK_SUSPENDED);
1912     }
1913
1914     if (c->version >= 13)
1915         pa_tagstruct_put_usec(reply, s->sink_latency);
1916
1917     pa_pstream_send_tagstruct(c->pstream, reply);
1918 }
1919
1920 static void command_delete_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1921     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1922     uint32_t channel;
1923
1924     pa_native_connection_assert_ref(c);
1925     pa_assert(t);
1926
1927     if (pa_tagstruct_getu32(t, &channel) < 0 ||
1928         !pa_tagstruct_eof(t)) {
1929         protocol_error(c);
1930         return;
1931     }
1932
1933     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
1934
1935     switch (command) {
1936
1937         case PA_COMMAND_DELETE_PLAYBACK_STREAM: {
1938             playback_stream *s;
1939             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !playback_stream_isinstance(s)) {
1940                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1941                 return;
1942             }
1943
1944             playback_stream_unlink(s);
1945             break;
1946         }
1947
1948         case PA_COMMAND_DELETE_RECORD_STREAM: {
1949             record_stream *s;
1950             if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
1951                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1952                 return;
1953             }
1954
1955             record_stream_unlink(s);
1956             break;
1957         }
1958
1959         case PA_COMMAND_DELETE_UPLOAD_STREAM: {
1960             upload_stream *s;
1961
1962             if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || !upload_stream_isinstance(s)) {
1963                 pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
1964                 return;
1965             }
1966
1967             upload_stream_unlink(s);
1968             break;
1969         }
1970
1971         default:
1972             pa_assert_not_reached();
1973     }
1974
1975     pa_pstream_send_simple_ack(c->pstream, tag);
1976 }
1977
1978 static void command_create_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1979     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
1980     record_stream *s;
1981     uint32_t maxlength, fragment_size;
1982     uint32_t source_index;
1983     const char *name = NULL, *source_name;
1984     pa_sample_spec ss;
1985     pa_channel_map map;
1986     pa_tagstruct *reply;
1987     pa_source *source = NULL;
1988     pa_bool_t
1989         corked = FALSE,
1990         no_remap = FALSE,
1991         no_remix = FALSE,
1992         fix_format = FALSE,
1993         fix_rate = FALSE,
1994         fix_channels = FALSE,
1995         no_move = FALSE,
1996         variable_rate = FALSE,
1997         adjust_latency = FALSE,
1998         peak_detect = FALSE,
1999         early_requests = FALSE,
2000         dont_inhibit_auto_suspend = FALSE,
2001         fail_on_suspend = FALSE;
2002     pa_source_output_flags_t flags = 0;
2003     pa_proplist *p;
2004     uint32_t direct_on_input_idx = PA_INVALID_INDEX;
2005     pa_sink_input *direct_on_input = NULL;
2006     int ret = PA_ERR_INVALID;
2007
2008     pa_native_connection_assert_ref(c);
2009     pa_assert(t);
2010
2011     if ((c->version < 13 && (pa_tagstruct_gets(t, &name) < 0 || !name)) ||
2012         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2013         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2014         pa_tagstruct_getu32(t, &source_index) < 0 ||
2015         pa_tagstruct_gets(t, &source_name) < 0 ||
2016         pa_tagstruct_getu32(t, &maxlength) < 0 ||
2017         pa_tagstruct_get_boolean(t, &corked) < 0 ||
2018         pa_tagstruct_getu32(t, &fragment_size) < 0) {
2019         protocol_error(c);
2020         return;
2021     }
2022
2023     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2024     CHECK_VALIDITY(c->pstream, !source_name || pa_namereg_is_valid_name(source_name), tag, PA_ERR_INVALID);
2025     CHECK_VALIDITY(c->pstream, source_index == PA_INVALID_INDEX || !source_name, tag, PA_ERR_INVALID);
2026     CHECK_VALIDITY(c->pstream, !source_name || source_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2027     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2028     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2029     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2030
2031     p = pa_proplist_new();
2032
2033     if (name)
2034         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2035
2036     if (c->version >= 12)  {
2037         /* Since 0.9.8 the user can ask for a couple of additional flags */
2038
2039         if (pa_tagstruct_get_boolean(t, &no_remap) < 0 ||
2040             pa_tagstruct_get_boolean(t, &no_remix) < 0 ||
2041             pa_tagstruct_get_boolean(t, &fix_format) < 0 ||
2042             pa_tagstruct_get_boolean(t, &fix_rate) < 0 ||
2043             pa_tagstruct_get_boolean(t, &fix_channels) < 0 ||
2044             pa_tagstruct_get_boolean(t, &no_move) < 0 ||
2045             pa_tagstruct_get_boolean(t, &variable_rate) < 0) {
2046
2047             protocol_error(c);
2048             pa_proplist_free(p);
2049             return;
2050         }
2051     }
2052
2053     if (c->version >= 13) {
2054
2055         if (pa_tagstruct_get_boolean(t, &peak_detect) < 0 ||
2056             pa_tagstruct_get_boolean(t, &adjust_latency) < 0 ||
2057             pa_tagstruct_get_proplist(t, p) < 0 ||
2058             pa_tagstruct_getu32(t, &direct_on_input_idx) < 0) {
2059             protocol_error(c);
2060             pa_proplist_free(p);
2061             return;
2062         }
2063     }
2064
2065     if (c->version >= 14) {
2066
2067         if (pa_tagstruct_get_boolean(t, &early_requests) < 0) {
2068             protocol_error(c);
2069             pa_proplist_free(p);
2070             return;
2071         }
2072     }
2073
2074     if (c->version >= 15) {
2075
2076         if (pa_tagstruct_get_boolean(t, &dont_inhibit_auto_suspend) < 0 ||
2077             pa_tagstruct_get_boolean(t, &fail_on_suspend) < 0) {
2078             protocol_error(c);
2079             pa_proplist_free(p);
2080             return;
2081         }
2082     }
2083
2084     if (!pa_tagstruct_eof(t)) {
2085         protocol_error(c);
2086         pa_proplist_free(p);
2087         return;
2088     }
2089
2090     if (source_index != PA_INVALID_INDEX) {
2091
2092         if (!(source = pa_idxset_get_by_index(c->protocol->core->sources, source_index))) {
2093             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2094             pa_proplist_free(p);
2095             return;
2096         }
2097
2098     } else if (source_name) {
2099
2100         if (!(source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE))) {
2101             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2102             pa_proplist_free(p);
2103             return;
2104         }
2105     }
2106
2107     if (direct_on_input_idx != PA_INVALID_INDEX) {
2108
2109         if (!(direct_on_input = pa_idxset_get_by_index(c->protocol->core->sink_inputs, direct_on_input_idx))) {
2110             pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2111             pa_proplist_free(p);
2112             return;
2113         }
2114     }
2115
2116     flags =
2117         (corked ?  PA_SOURCE_OUTPUT_START_CORKED : 0) |
2118         (no_remap ?  PA_SOURCE_OUTPUT_NO_REMAP : 0) |
2119         (no_remix ?  PA_SOURCE_OUTPUT_NO_REMIX : 0) |
2120         (fix_format ?  PA_SOURCE_OUTPUT_FIX_FORMAT : 0) |
2121         (fix_rate ?  PA_SOURCE_OUTPUT_FIX_RATE : 0) |
2122         (fix_channels ?  PA_SOURCE_OUTPUT_FIX_CHANNELS : 0) |
2123         (no_move ?  PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
2124         (variable_rate ?  PA_SOURCE_OUTPUT_VARIABLE_RATE : 0) |
2125         (dont_inhibit_auto_suspend ? PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND : 0) |
2126         (fail_on_suspend ? PA_SOURCE_OUTPUT_FAIL_ON_SUSPEND : 0);
2127
2128     s = record_stream_new(c, source, &ss, &map, peak_detect, &maxlength, &fragment_size, flags, p, adjust_latency, direct_on_input, early_requests, &ret);
2129     pa_proplist_free(p);
2130
2131     CHECK_VALIDITY(c->pstream, s, tag, ret);
2132
2133     reply = reply_new(tag);
2134     pa_tagstruct_putu32(reply, s->index);
2135     pa_assert(s->source_output);
2136     pa_tagstruct_putu32(reply, s->source_output->index);
2137
2138     if (c->version >= 9) {
2139         /* Since 0.9 we support sending the buffer metrics back to the client */
2140
2141         pa_tagstruct_putu32(reply, (uint32_t) maxlength);
2142         pa_tagstruct_putu32(reply, (uint32_t) fragment_size);
2143     }
2144
2145     if (c->version >= 12) {
2146         /* Since 0.9.8 we support sending the chosen sample
2147          * spec/channel map/device/suspend status back to the
2148          * client */
2149
2150         pa_tagstruct_put_sample_spec(reply, &ss);
2151         pa_tagstruct_put_channel_map(reply, &map);
2152
2153         pa_tagstruct_putu32(reply, s->source_output->source->index);
2154         pa_tagstruct_puts(reply, s->source_output->source->name);
2155
2156         pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_SUSPENDED);
2157     }
2158
2159     if (c->version >= 13)
2160         pa_tagstruct_put_usec(reply, s->source_latency);
2161
2162     pa_pstream_send_tagstruct(c->pstream, reply);
2163 }
2164
2165 static void command_exit(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2166     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2167     int ret;
2168
2169     pa_native_connection_assert_ref(c);
2170     pa_assert(t);
2171
2172     if (!pa_tagstruct_eof(t)) {
2173         protocol_error(c);
2174         return;
2175     }
2176
2177     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2178     ret = pa_core_exit(c->protocol->core, FALSE, 0);
2179     CHECK_VALIDITY(c->pstream, ret >= 0, tag, PA_ERR_ACCESS);
2180
2181     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
2182 }
2183
2184 static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2185     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2186     const void*cookie;
2187     pa_tagstruct *reply;
2188     pa_bool_t shm_on_remote = FALSE, do_shm;
2189
2190     pa_native_connection_assert_ref(c);
2191     pa_assert(t);
2192
2193     if (pa_tagstruct_getu32(t, &c->version) < 0 ||
2194         pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
2195         !pa_tagstruct_eof(t)) {
2196         protocol_error(c);
2197         return;
2198     }
2199
2200     /* Minimum supported version */
2201     if (c->version < 8) {
2202         pa_pstream_send_error(c->pstream, tag, PA_ERR_VERSION);
2203         return;
2204     }
2205
2206     /* Starting with protocol version 13 the MSB of the version tag
2207        reflects if shm is available for this pa_native_connection or
2208        not. */
2209     if (c->version >= 13) {
2210         shm_on_remote = !!(c->version & 0x80000000U);
2211         c->version &= 0x7FFFFFFFU;
2212     }
2213
2214     pa_log_debug("Protocol version: remote %u, local %u", c->version, PA_PROTOCOL_VERSION);
2215
2216     pa_proplist_setf(c->client->proplist, "native-protocol.version", "%u", c->version);
2217
2218     if (!c->authorized) {
2219         pa_bool_t success = FALSE;
2220
2221 #ifdef HAVE_CREDS
2222         const pa_creds *creds;
2223
2224         if ((creds = pa_pdispatch_creds(pd))) {
2225             if (creds->uid == getuid())
2226                 success = TRUE;
2227             else if (c->options->auth_group) {
2228                 int r;
2229                 gid_t gid;
2230
2231                 if ((gid = pa_get_gid_of_group(c->options->auth_group)) == (gid_t) -1)
2232                     pa_log_warn("Failed to get GID of group '%s'", c->options->auth_group);
2233                 else if (gid == creds->gid)
2234                     success = TRUE;
2235
2236                 if (!success) {
2237                     if ((r = pa_uid_in_group(creds->uid, c->options->auth_group)) < 0)
2238                         pa_log_warn("Failed to check group membership.");
2239                     else if (r > 0)
2240                         success = TRUE;
2241                 }
2242             }
2243
2244             pa_log_info("Got credentials: uid=%lu gid=%lu success=%i",
2245                         (unsigned long) creds->uid,
2246                         (unsigned long) creds->gid,
2247                         (int) success);
2248         }
2249 #endif
2250
2251         if (!success && c->options->auth_cookie) {
2252             const uint8_t *ac;
2253
2254             if ((ac = pa_auth_cookie_read(c->options->auth_cookie, PA_NATIVE_COOKIE_LENGTH)))
2255                 if (memcmp(ac, cookie, PA_NATIVE_COOKIE_LENGTH) == 0)
2256                     success = TRUE;
2257         }
2258
2259         if (!success) {
2260             pa_log_warn("Denied access to client with invalid authorization data.");
2261             pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
2262             return;
2263         }
2264
2265         c->authorized = TRUE;
2266         if (c->auth_timeout_event) {
2267             c->protocol->core->mainloop->time_free(c->auth_timeout_event);
2268             c->auth_timeout_event = NULL;
2269         }
2270     }
2271
2272     /* Enable shared memory support if possible */
2273     do_shm =
2274         pa_mempool_is_shared(c->protocol->core->mempool) &&
2275         c->is_local;
2276
2277     pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));
2278
2279     if (do_shm)
2280         if (c->version < 10 || (c->version >= 13 && !shm_on_remote))
2281             do_shm = FALSE;
2282
2283 #ifdef HAVE_CREDS
2284     if (do_shm) {
2285         /* Only enable SHM if both sides are owned by the same
2286          * user. This is a security measure because otherwise data
2287          * private to the user might leak. */
2288
2289         const pa_creds *creds;
2290         if (!(creds = pa_pdispatch_creds(pd)) || getuid() != creds->uid)
2291             do_shm = FALSE;
2292     }
2293 #endif
2294
2295     pa_log_debug("Negotiated SHM: %s", pa_yes_no(do_shm));
2296     pa_pstream_enable_shm(c->pstream, do_shm);
2297
2298     reply = reply_new(tag);
2299     pa_tagstruct_putu32(reply, PA_PROTOCOL_VERSION | (do_shm ? 0x80000000 : 0));
2300
2301 #ifdef HAVE_CREDS
2302 {
2303     /* SHM support is only enabled after both sides made sure they are the same user. */
2304
2305     pa_creds ucred;
2306
2307     ucred.uid = getuid();
2308     ucred.gid = getgid();
2309
2310     pa_pstream_send_tagstruct_with_creds(c->pstream, reply, &ucred);
2311 }
2312 #else
2313     pa_pstream_send_tagstruct(c->pstream, reply);
2314 #endif
2315 }
2316
2317 static void command_set_client_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2318     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2319     const char *name = NULL;
2320     pa_proplist *p;
2321     pa_tagstruct *reply;
2322
2323     pa_native_connection_assert_ref(c);
2324     pa_assert(t);
2325
2326     p = pa_proplist_new();
2327
2328     if ((c->version < 13 && pa_tagstruct_gets(t, &name) < 0) ||
2329         (c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2330         !pa_tagstruct_eof(t)) {
2331
2332         protocol_error(c);
2333         pa_proplist_free(p);
2334         return;
2335     }
2336
2337     if (name)
2338         if (pa_proplist_sets(p, PA_PROP_APPLICATION_NAME, name) < 0) {
2339             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
2340             pa_proplist_free(p);
2341             return;
2342         }
2343
2344     pa_client_update_proplist(c->client, PA_UPDATE_REPLACE, p);
2345     pa_proplist_free(p);
2346
2347     reply = reply_new(tag);
2348
2349     if (c->version >= 13)
2350         pa_tagstruct_putu32(reply, c->client->index);
2351
2352     pa_pstream_send_tagstruct(c->pstream, reply);
2353 }
2354
2355 static void command_lookup(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2356     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2357     const char *name;
2358     uint32_t idx = PA_IDXSET_INVALID;
2359
2360     pa_native_connection_assert_ref(c);
2361     pa_assert(t);
2362
2363     if (pa_tagstruct_gets(t, &name) < 0 ||
2364         !pa_tagstruct_eof(t)) {
2365         protocol_error(c);
2366         return;
2367     }
2368
2369     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2370     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2371
2372     if (command == PA_COMMAND_LOOKUP_SINK) {
2373         pa_sink *sink;
2374         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
2375             idx = sink->index;
2376     } else {
2377         pa_source *source;
2378         pa_assert(command == PA_COMMAND_LOOKUP_SOURCE);
2379         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
2380             idx = source->index;
2381     }
2382
2383     if (idx == PA_IDXSET_INVALID)
2384         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2385     else {
2386         pa_tagstruct *reply;
2387         reply = reply_new(tag);
2388         pa_tagstruct_putu32(reply, idx);
2389         pa_pstream_send_tagstruct(c->pstream, reply);
2390     }
2391 }
2392
2393 static void command_drain_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2394     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2395     uint32_t idx;
2396     playback_stream *s;
2397
2398     pa_native_connection_assert_ref(c);
2399     pa_assert(t);
2400
2401     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2402         !pa_tagstruct_eof(t)) {
2403         protocol_error(c);
2404         return;
2405     }
2406
2407     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2408     s = pa_idxset_get_by_index(c->output_streams, idx);
2409     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2410     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2411
2412     pa_asyncmsgq_post(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_DRAIN, PA_UINT_TO_PTR(tag), 0, NULL, NULL);
2413 }
2414
2415 static void command_stat(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2416     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2417     pa_tagstruct *reply;
2418     const pa_mempool_stat *stat;
2419
2420     pa_native_connection_assert_ref(c);
2421     pa_assert(t);
2422
2423     if (!pa_tagstruct_eof(t)) {
2424         protocol_error(c);
2425         return;
2426     }
2427
2428     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2429
2430     stat = pa_mempool_get_stat(c->protocol->core->mempool);
2431
2432     reply = reply_new(tag);
2433     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_allocated));
2434     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->allocated_size));
2435     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->n_accumulated));
2436     pa_tagstruct_putu32(reply, (uint32_t) pa_atomic_load(&stat->accumulated_size));
2437     pa_tagstruct_putu32(reply, (uint32_t) pa_scache_total_size(c->protocol->core));
2438     pa_pstream_send_tagstruct(c->pstream, reply);
2439 }
2440
2441 static void command_get_playback_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2442     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2443     pa_tagstruct *reply;
2444     playback_stream *s;
2445     struct timeval tv, now;
2446     uint32_t idx;
2447     pa_usec_t latency;
2448
2449     pa_native_connection_assert_ref(c);
2450     pa_assert(t);
2451
2452     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2453         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2454         !pa_tagstruct_eof(t)) {
2455         protocol_error(c);
2456         return;
2457     }
2458
2459     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2460     s = pa_idxset_get_by_index(c->output_streams, idx);
2461     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2462     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2463     CHECK_VALIDITY(c->pstream, pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_UPDATE_LATENCY, s, 0, NULL) == 0, tag, PA_ERR_NOENTITY)
2464
2465     reply = reply_new(tag);
2466
2467     latency = pa_sink_get_latency(s->sink_input->sink);
2468     latency += pa_bytes_to_usec(s->render_memblockq_length, &s->sink_input->sample_spec);
2469
2470     pa_tagstruct_put_usec(reply, latency);
2471
2472     pa_tagstruct_put_usec(reply, 0);
2473     pa_tagstruct_put_boolean(reply, s->sink_input->thread_info.playing_for > 0);
2474     pa_tagstruct_put_timeval(reply, &tv);
2475     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2476     pa_tagstruct_puts64(reply, s->write_index);
2477     pa_tagstruct_puts64(reply, s->read_index);
2478
2479     if (c->version >= 13) {
2480         pa_tagstruct_putu64(reply, s->sink_input->thread_info.underrun_for);
2481         pa_tagstruct_putu64(reply, s->sink_input->thread_info.playing_for);
2482     }
2483
2484     pa_pstream_send_tagstruct(c->pstream, reply);
2485 }
2486
2487 static void command_get_record_latency(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2488     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2489     pa_tagstruct *reply;
2490     record_stream *s;
2491     struct timeval tv, now;
2492     uint32_t idx;
2493
2494     pa_native_connection_assert_ref(c);
2495     pa_assert(t);
2496
2497     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2498         pa_tagstruct_get_timeval(t, &tv) < 0 ||
2499         !pa_tagstruct_eof(t)) {
2500         protocol_error(c);
2501         return;
2502     }
2503
2504     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2505     s = pa_idxset_get_by_index(c->record_streams, idx);
2506     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2507
2508     reply = reply_new(tag);
2509     pa_tagstruct_put_usec(reply, s->source_output->source->monitor_of ? pa_sink_get_latency(s->source_output->source->monitor_of) : 0);
2510     pa_tagstruct_put_usec(reply, pa_source_get_latency(s->source_output->source));
2511     pa_tagstruct_put_boolean(reply, pa_source_get_state(s->source_output->source) == PA_SOURCE_RUNNING);
2512     pa_tagstruct_put_timeval(reply, &tv);
2513     pa_tagstruct_put_timeval(reply, pa_gettimeofday(&now));
2514     pa_tagstruct_puts64(reply, pa_memblockq_get_write_index(s->memblockq));
2515     pa_tagstruct_puts64(reply, pa_memblockq_get_read_index(s->memblockq));
2516     pa_pstream_send_tagstruct(c->pstream, reply);
2517 }
2518
2519 static void command_create_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2520     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2521     upload_stream *s;
2522     uint32_t length;
2523     const char *name = NULL;
2524     pa_sample_spec ss;
2525     pa_channel_map map;
2526     pa_tagstruct *reply;
2527     pa_proplist *p;
2528
2529     pa_native_connection_assert_ref(c);
2530     pa_assert(t);
2531
2532     if (pa_tagstruct_gets(t, &name) < 0 ||
2533         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
2534         pa_tagstruct_get_channel_map(t, &map) < 0 ||
2535         pa_tagstruct_getu32(t, &length) < 0) {
2536         protocol_error(c);
2537         return;
2538     }
2539
2540     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2541     CHECK_VALIDITY(c->pstream, pa_sample_spec_valid(&ss), tag, PA_ERR_INVALID);
2542     CHECK_VALIDITY(c->pstream, pa_channel_map_valid(&map), tag, PA_ERR_INVALID);
2543     CHECK_VALIDITY(c->pstream, map.channels == ss.channels, tag, PA_ERR_INVALID);
2544     CHECK_VALIDITY(c->pstream, (length % pa_frame_size(&ss)) == 0 && length > 0, tag, PA_ERR_INVALID);
2545     CHECK_VALIDITY(c->pstream, length <= PA_SCACHE_ENTRY_SIZE_MAX, tag, PA_ERR_TOOLARGE);
2546
2547     p = pa_proplist_new();
2548
2549     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2550         !pa_tagstruct_eof(t)) {
2551
2552         protocol_error(c);
2553         pa_proplist_free(p);
2554         return;
2555     }
2556
2557     if (c->version < 13)
2558         pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2559     else if (!name)
2560         if (!(name = pa_proplist_gets(p, PA_PROP_EVENT_ID)))
2561             name = pa_proplist_gets(p, PA_PROP_MEDIA_NAME);
2562
2563     if (!name || !pa_namereg_is_valid_name(name)) {
2564         pa_proplist_free(p);
2565         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
2566     }
2567
2568     s = upload_stream_new(c, &ss, &map, name, length, p);
2569     pa_proplist_free(p);
2570
2571     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
2572
2573     reply = reply_new(tag);
2574     pa_tagstruct_putu32(reply, s->index);
2575     pa_tagstruct_putu32(reply, length);
2576     pa_pstream_send_tagstruct(c->pstream, reply);
2577 }
2578
2579 static void command_finish_upload_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2580     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2581     uint32_t channel;
2582     upload_stream *s;
2583     uint32_t idx;
2584
2585     pa_native_connection_assert_ref(c);
2586     pa_assert(t);
2587
2588     if (pa_tagstruct_getu32(t, &channel) < 0 ||
2589         !pa_tagstruct_eof(t)) {
2590         protocol_error(c);
2591         return;
2592     }
2593
2594     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2595
2596     s = pa_idxset_get_by_index(c->output_streams, channel);
2597     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
2598     CHECK_VALIDITY(c->pstream, upload_stream_isinstance(s), tag, PA_ERR_NOENTITY);
2599
2600     if (pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->channel_map, &s->memchunk, s->proplist, &idx) < 0)
2601         pa_pstream_send_error(c->pstream, tag, PA_ERR_INTERNAL);
2602     else
2603         pa_pstream_send_simple_ack(c->pstream, tag);
2604
2605     upload_stream_unlink(s);
2606 }
2607
2608 static void command_play_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2609     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2610     uint32_t sink_index;
2611     pa_volume_t volume;
2612     pa_sink *sink;
2613     const char *name, *sink_name;
2614     uint32_t idx;
2615     pa_proplist *p;
2616     pa_tagstruct *reply;
2617
2618     pa_native_connection_assert_ref(c);
2619     pa_assert(t);
2620
2621     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2622
2623     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
2624         pa_tagstruct_gets(t, &sink_name) < 0 ||
2625         pa_tagstruct_getu32(t, &volume) < 0 ||
2626         pa_tagstruct_gets(t, &name) < 0) {
2627         protocol_error(c);
2628         return;
2629     }
2630
2631     CHECK_VALIDITY(c->pstream, !sink_name || pa_namereg_is_valid_name(sink_name), tag, PA_ERR_INVALID);
2632     CHECK_VALIDITY(c->pstream, sink_index == PA_INVALID_INDEX || !sink_name, tag, PA_ERR_INVALID);
2633     CHECK_VALIDITY(c->pstream, !sink_name || sink_index == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2634     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2635
2636     if (sink_index != PA_INVALID_INDEX)
2637         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
2638     else
2639         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
2640
2641     CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
2642
2643     p = pa_proplist_new();
2644
2645     if ((c->version >= 13 && pa_tagstruct_get_proplist(t, p) < 0) ||
2646         !pa_tagstruct_eof(t)) {
2647         protocol_error(c);
2648         pa_proplist_free(p);
2649         return;
2650     }
2651
2652     pa_proplist_update(p, PA_UPDATE_MERGE, c->client->proplist);
2653
2654     if (pa_scache_play_item(c->protocol->core, name, sink, volume, p, &idx) < 0) {
2655         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2656         pa_proplist_free(p);
2657         return;
2658     }
2659
2660     pa_proplist_free(p);
2661
2662     reply = reply_new(tag);
2663
2664     if (c->version >= 13)
2665         pa_tagstruct_putu32(reply, idx);
2666
2667     pa_pstream_send_tagstruct(c->pstream, reply);
2668 }
2669
2670 static void command_remove_sample(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2671     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2672     const char *name;
2673
2674     pa_native_connection_assert_ref(c);
2675     pa_assert(t);
2676
2677     if (pa_tagstruct_gets(t, &name) < 0 ||
2678         !pa_tagstruct_eof(t)) {
2679         protocol_error(c);
2680         return;
2681     }
2682
2683     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2684     CHECK_VALIDITY(c->pstream, name && pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2685
2686     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
2687         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
2688         return;
2689     }
2690
2691     pa_pstream_send_simple_ack(c->pstream, tag);
2692 }
2693
2694 static void fixup_sample_spec(pa_native_connection *c, pa_sample_spec *fixed, const pa_sample_spec *original) {
2695     pa_assert(c);
2696     pa_assert(fixed);
2697     pa_assert(original);
2698
2699     *fixed = *original;
2700
2701     if (c->version < 12) {
2702         /* Before protocol version 12 we didn't support S32 samples,
2703          * so we need to lie about this to the client */
2704
2705         if (fixed->format == PA_SAMPLE_S32LE)
2706             fixed->format = PA_SAMPLE_FLOAT32LE;
2707         if (fixed->format == PA_SAMPLE_S32BE)
2708             fixed->format = PA_SAMPLE_FLOAT32BE;
2709     }
2710
2711     if (c->version < 15) {
2712         if (fixed->format == PA_SAMPLE_S24LE || fixed->format == PA_SAMPLE_S24_32LE)
2713             fixed->format = PA_SAMPLE_FLOAT32LE;
2714         if (fixed->format == PA_SAMPLE_S24BE || fixed->format == PA_SAMPLE_S24_32BE)
2715             fixed->format = PA_SAMPLE_FLOAT32BE;
2716     }
2717 }
2718
2719 static void sink_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink *sink) {
2720     pa_sample_spec fixed_ss;
2721
2722     pa_assert(t);
2723     pa_sink_assert_ref(sink);
2724
2725     fixup_sample_spec(c, &fixed_ss, &sink->sample_spec);
2726
2727     pa_tagstruct_put(
2728         t,
2729         PA_TAG_U32, sink->index,
2730         PA_TAG_STRING, sink->name,
2731         PA_TAG_STRING, pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2732         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2733         PA_TAG_CHANNEL_MAP, &sink->channel_map,
2734         PA_TAG_U32, sink->module ? sink->module->index : PA_INVALID_INDEX,
2735         PA_TAG_CVOLUME, pa_sink_get_volume(sink, FALSE),
2736         PA_TAG_BOOLEAN, pa_sink_get_mute(sink, FALSE),
2737         PA_TAG_U32, sink->monitor_source ? sink->monitor_source->index : PA_INVALID_INDEX,
2738         PA_TAG_STRING, sink->monitor_source ? sink->monitor_source->name : NULL,
2739         PA_TAG_USEC, pa_sink_get_latency(sink),
2740         PA_TAG_STRING, sink->driver,
2741         PA_TAG_U32, sink->flags,
2742         PA_TAG_INVALID);
2743
2744     if (c->version >= 13) {
2745         pa_tagstruct_put_proplist(t, sink->proplist);
2746         pa_tagstruct_put_usec(t, pa_sink_get_requested_latency(sink));
2747     }
2748
2749     if (c->version >= 15) {
2750         pa_tagstruct_put_volume(t, sink->base_volume);
2751         if (PA_UNLIKELY(pa_sink_get_state(sink) == PA_SINK_INVALID_STATE))
2752             pa_log_error("Internal sink state is invalid.");
2753         pa_tagstruct_putu32(t, pa_sink_get_state(sink));
2754         pa_tagstruct_putu32(t, sink->n_volume_steps);
2755         pa_tagstruct_putu32(t, sink->card ? sink->card->index : PA_INVALID_INDEX);
2756     }
2757 }
2758
2759 static void source_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source *source) {
2760     pa_sample_spec fixed_ss;
2761
2762     pa_assert(t);
2763     pa_source_assert_ref(source);
2764
2765     fixup_sample_spec(c, &fixed_ss, &source->sample_spec);
2766
2767     pa_tagstruct_put(
2768         t,
2769         PA_TAG_U32, source->index,
2770         PA_TAG_STRING, source->name,
2771         PA_TAG_STRING, pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)),
2772         PA_TAG_SAMPLE_SPEC, &fixed_ss,
2773         PA_TAG_CHANNEL_MAP, &source->channel_map,
2774         PA_TAG_U32, source->module ? source->module->index : PA_INVALID_INDEX,
2775         PA_TAG_CVOLUME, pa_source_get_volume(source, FALSE),
2776         PA_TAG_BOOLEAN, pa_source_get_mute(source, FALSE),
2777         PA_TAG_U32, source->monitor_of ? source->monitor_of->index : PA_INVALID_INDEX,
2778         PA_TAG_STRING, source->monitor_of ? source->monitor_of->name : NULL,
2779         PA_TAG_USEC, pa_source_get_latency(source),
2780         PA_TAG_STRING, source->driver,
2781         PA_TAG_U32, source->flags,
2782         PA_TAG_INVALID);
2783
2784     if (c->version >= 13) {
2785         pa_tagstruct_put_proplist(t, source->proplist);
2786         pa_tagstruct_put_usec(t, pa_source_get_requested_latency(source));
2787     }
2788
2789     if (c->version >= 15) {
2790         pa_tagstruct_put_volume(t, source->base_volume);
2791         if (PA_UNLIKELY(pa_source_get_state(source) == PA_SOURCE_INVALID_STATE))
2792             pa_log_error("Internal source state is invalid.");
2793         pa_tagstruct_putu32(t, pa_source_get_state(source));
2794         pa_tagstruct_putu32(t, source->n_volume_steps);
2795         pa_tagstruct_putu32(t, source->card ? source->card->index : PA_INVALID_INDEX);
2796     }
2797 }
2798
2799 static void client_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_client *client) {
2800     pa_assert(t);
2801     pa_assert(client);
2802
2803     pa_tagstruct_putu32(t, client->index);
2804     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(client->proplist, PA_PROP_APPLICATION_NAME)));
2805     pa_tagstruct_putu32(t, client->module ? client->module->index : PA_INVALID_INDEX);
2806     pa_tagstruct_puts(t, client->driver);
2807
2808     if (c->version >= 13)
2809         pa_tagstruct_put_proplist(t, client->proplist);
2810 }
2811
2812 static void card_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_card *card) {
2813     void *state = NULL;
2814     pa_card_profile *p;
2815
2816     pa_assert(t);
2817     pa_assert(card);
2818
2819     pa_tagstruct_putu32(t, card->index);
2820     pa_tagstruct_puts(t, card->name);
2821     pa_tagstruct_putu32(t, card->module ? card->module->index : PA_INVALID_INDEX);
2822     pa_tagstruct_puts(t, card->driver);
2823
2824     pa_tagstruct_putu32(t, card->profiles ? pa_hashmap_size(card->profiles) : 0);
2825
2826     if (card->profiles) {
2827         while ((p = pa_hashmap_iterate(card->profiles, &state, NULL))) {
2828             pa_tagstruct_puts(t, p->name);
2829             pa_tagstruct_puts(t, p->description);
2830             pa_tagstruct_putu32(t, p->n_sinks);
2831             pa_tagstruct_putu32(t, p->n_sources);
2832             pa_tagstruct_putu32(t, p->priority);
2833         }
2834     }
2835
2836     pa_tagstruct_puts(t, card->active_profile ? card->active_profile->name : NULL);
2837     pa_tagstruct_put_proplist(t, card->proplist);
2838 }
2839
2840 static void module_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_module *module) {
2841     pa_assert(t);
2842     pa_assert(module);
2843
2844     pa_tagstruct_putu32(t, module->index);
2845     pa_tagstruct_puts(t, module->name);
2846     pa_tagstruct_puts(t, module->argument);
2847     pa_tagstruct_putu32(t, (uint32_t) pa_module_get_n_used(module));
2848
2849     if (c->version < 15)
2850         pa_tagstruct_put_boolean(t, FALSE); /* autoload is obsolete */
2851
2852     if (c->version >= 15)
2853         pa_tagstruct_put_proplist(t, module->proplist);
2854 }
2855
2856 static void sink_input_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_sink_input *s) {
2857     pa_sample_spec fixed_ss;
2858     pa_usec_t sink_latency;
2859
2860     pa_assert(t);
2861     pa_sink_input_assert_ref(s);
2862
2863     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2864
2865     pa_tagstruct_putu32(t, s->index);
2866     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2867     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2868     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2869     pa_tagstruct_putu32(t, s->sink->index);
2870     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2871     pa_tagstruct_put_channel_map(t, &s->channel_map);
2872     pa_tagstruct_put_cvolume(t, pa_sink_input_get_volume(s));
2873     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s, &sink_latency));
2874     pa_tagstruct_put_usec(t, sink_latency);
2875     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_sink_input_get_resample_method(s)));
2876     pa_tagstruct_puts(t, s->driver);
2877     if (c->version >= 11)
2878         pa_tagstruct_put_boolean(t, pa_sink_input_get_mute(s));
2879     if (c->version >= 13)
2880         pa_tagstruct_put_proplist(t, s->proplist);
2881 }
2882
2883 static void source_output_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_source_output *s) {
2884     pa_sample_spec fixed_ss;
2885     pa_usec_t source_latency;
2886
2887     pa_assert(t);
2888     pa_source_output_assert_ref(s);
2889
2890     fixup_sample_spec(c, &fixed_ss, &s->sample_spec);
2891
2892     pa_tagstruct_putu32(t, s->index);
2893     pa_tagstruct_puts(t, pa_strnull(pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME)));
2894     pa_tagstruct_putu32(t, s->module ? s->module->index : PA_INVALID_INDEX);
2895     pa_tagstruct_putu32(t, s->client ? s->client->index : PA_INVALID_INDEX);
2896     pa_tagstruct_putu32(t, s->source->index);
2897     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2898     pa_tagstruct_put_channel_map(t, &s->channel_map);
2899     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s, &source_latency));
2900     pa_tagstruct_put_usec(t, source_latency);
2901     pa_tagstruct_puts(t, pa_resample_method_to_string(pa_source_output_get_resample_method(s)));
2902     pa_tagstruct_puts(t, s->driver);
2903
2904     if (c->version >= 13)
2905         pa_tagstruct_put_proplist(t, s->proplist);
2906 }
2907
2908 static void scache_fill_tagstruct(pa_native_connection *c, pa_tagstruct *t, pa_scache_entry *e) {
2909     pa_sample_spec fixed_ss;
2910     pa_cvolume v;
2911
2912     pa_assert(t);
2913     pa_assert(e);
2914
2915     if (e->memchunk.memblock)
2916         fixup_sample_spec(c, &fixed_ss, &e->sample_spec);
2917     else
2918         memset(&fixed_ss, 0, sizeof(fixed_ss));
2919
2920     pa_tagstruct_putu32(t, e->index);
2921     pa_tagstruct_puts(t, e->name);
2922
2923     if (e->volume_is_set)
2924         v = e->volume;
2925     else
2926         pa_cvolume_init(&v);
2927
2928     pa_tagstruct_put_cvolume(t, &v);
2929     pa_tagstruct_put_usec(t, e->memchunk.memblock ? pa_bytes_to_usec(e->memchunk.length, &e->sample_spec) : 0);
2930     pa_tagstruct_put_sample_spec(t, &fixed_ss);
2931     pa_tagstruct_put_channel_map(t, &e->channel_map);
2932     pa_tagstruct_putu32(t, (uint32_t) e->memchunk.length);
2933     pa_tagstruct_put_boolean(t, e->lazy);
2934     pa_tagstruct_puts(t, e->filename);
2935
2936     if (c->version >= 13)
2937         pa_tagstruct_put_proplist(t, e->proplist);
2938 }
2939
2940 static void command_get_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2941     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
2942     uint32_t idx;
2943     pa_sink *sink = NULL;
2944     pa_source *source = NULL;
2945     pa_client *client = NULL;
2946     pa_card *card = NULL;
2947     pa_module *module = NULL;
2948     pa_sink_input *si = NULL;
2949     pa_source_output *so = NULL;
2950     pa_scache_entry *sce = NULL;
2951     const char *name = NULL;
2952     pa_tagstruct *reply;
2953
2954     pa_native_connection_assert_ref(c);
2955     pa_assert(t);
2956
2957     if (pa_tagstruct_getu32(t, &idx) < 0 ||
2958         (command != PA_COMMAND_GET_CLIENT_INFO &&
2959          command != PA_COMMAND_GET_MODULE_INFO &&
2960          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
2961          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
2962          pa_tagstruct_gets(t, &name) < 0) ||
2963         !pa_tagstruct_eof(t)) {
2964         protocol_error(c);
2965         return;
2966     }
2967
2968     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
2969     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
2970     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
2971     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
2972     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
2973
2974     if (command == PA_COMMAND_GET_SINK_INFO) {
2975         if (idx != PA_INVALID_INDEX)
2976             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
2977         else
2978             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
2979     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
2980         if (idx != PA_INVALID_INDEX)
2981             source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
2982         else
2983             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
2984     } else if (command == PA_COMMAND_GET_CARD_INFO) {
2985         if (idx != PA_INVALID_INDEX)
2986             card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
2987         else
2988             card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
2989     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
2990         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
2991     else if (command == PA_COMMAND_GET_MODULE_INFO)
2992         module = pa_idxset_get_by_index(c->protocol->core->modules, idx);
2993     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
2994         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
2995     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
2996         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
2997     else {
2998         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO);
2999         if (idx != PA_INVALID_INDEX)
3000             sce = pa_idxset_get_by_index(c->protocol->core->scache, idx);
3001         else
3002             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE);
3003     }
3004
3005     if (!sink && !source && !client && !card && !module && !si && !so && !sce) {
3006         pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
3007         return;
3008     }
3009
3010     reply = reply_new(tag);
3011     if (sink)
3012         sink_fill_tagstruct(c, reply, sink);
3013     else if (source)
3014         source_fill_tagstruct(c, reply, source);
3015     else if (client)
3016         client_fill_tagstruct(c, reply, client);
3017     else if (card)
3018         card_fill_tagstruct(c, reply, card);
3019     else if (module)
3020         module_fill_tagstruct(c, reply, module);
3021     else if (si)
3022         sink_input_fill_tagstruct(c, reply, si);
3023     else if (so)
3024         source_output_fill_tagstruct(c, reply, so);
3025     else
3026         scache_fill_tagstruct(c, reply, sce);
3027     pa_pstream_send_tagstruct(c->pstream, reply);
3028 }
3029
3030 static void command_get_info_list(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3031     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3032     pa_idxset *i;
3033     uint32_t idx;
3034     void *p;
3035     pa_tagstruct *reply;
3036
3037     pa_native_connection_assert_ref(c);
3038     pa_assert(t);
3039
3040     if (!pa_tagstruct_eof(t)) {
3041         protocol_error(c);
3042         return;
3043     }
3044
3045     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3046
3047     reply = reply_new(tag);
3048
3049     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3050         i = c->protocol->core->sinks;
3051     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3052         i = c->protocol->core->sources;
3053     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3054         i = c->protocol->core->clients;
3055     else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3056         i = c->protocol->core->cards;
3057     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3058         i = c->protocol->core->modules;
3059     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3060         i = c->protocol->core->sink_inputs;
3061     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3062         i = c->protocol->core->source_outputs;
3063     else {
3064         pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3065         i = c->protocol->core->scache;
3066     }
3067
3068     if (i) {
3069         for (p = pa_idxset_first(i, &idx); p; p = pa_idxset_next(i, &idx)) {
3070             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
3071                 sink_fill_tagstruct(c, reply, p);
3072             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
3073                 source_fill_tagstruct(c, reply, p);
3074             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
3075                 client_fill_tagstruct(c, reply, p);
3076             else if (command == PA_COMMAND_GET_CARD_INFO_LIST)
3077                 card_fill_tagstruct(c, reply, p);
3078             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
3079                 module_fill_tagstruct(c, reply, p);
3080             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
3081                 sink_input_fill_tagstruct(c, reply, p);
3082             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
3083                 source_output_fill_tagstruct(c, reply, p);
3084             else {
3085                 pa_assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
3086                 scache_fill_tagstruct(c, reply, p);
3087             }
3088         }
3089     }
3090
3091     pa_pstream_send_tagstruct(c->pstream, reply);
3092 }
3093
3094 static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3095     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3096     pa_tagstruct *reply;
3097     char txt[256];
3098     pa_sink *def_sink;
3099     pa_source *def_source;
3100     pa_sample_spec fixed_ss;
3101
3102     pa_native_connection_assert_ref(c);
3103     pa_assert(t);
3104
3105     if (!pa_tagstruct_eof(t)) {
3106         protocol_error(c);
3107         return;
3108     }
3109
3110     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3111
3112     reply = reply_new(tag);
3113     pa_tagstruct_puts(reply, PACKAGE_NAME);
3114     pa_tagstruct_puts(reply, PACKAGE_VERSION);
3115     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
3116     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
3117
3118     fixup_sample_spec(c, &fixed_ss, &c->protocol->core->default_sample_spec);
3119     pa_tagstruct_put_sample_spec(reply, &fixed_ss);
3120
3121     def_sink = pa_namereg_get_default_sink(c->protocol->core);
3122     pa_tagstruct_puts(reply, def_sink ? def_sink->name : NULL);
3123     def_source = pa_namereg_get_default_source(c->protocol->core);
3124     pa_tagstruct_puts(reply, def_source ? def_source->name : NULL);
3125
3126     pa_tagstruct_putu32(reply, c->protocol->core->cookie);
3127
3128     if (c->version >= 15)
3129         pa_tagstruct_put_channel_map(reply, &c->protocol->core->default_channel_map);
3130
3131     pa_pstream_send_tagstruct(c->pstream, reply);
3132 }
3133
3134 static void subscription_cb(pa_core *core, pa_subscription_event_type_t e, uint32_t idx, void *userdata) {
3135     pa_tagstruct *t;
3136     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3137
3138     pa_native_connection_assert_ref(c);
3139
3140     t = pa_tagstruct_new(NULL, 0);
3141     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
3142     pa_tagstruct_putu32(t, (uint32_t) -1);
3143     pa_tagstruct_putu32(t, e);
3144     pa_tagstruct_putu32(t, idx);
3145     pa_pstream_send_tagstruct(c->pstream, t);
3146 }
3147
3148 static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3149     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3150     pa_subscription_mask_t m;
3151
3152     pa_native_connection_assert_ref(c);
3153     pa_assert(t);
3154
3155     if (pa_tagstruct_getu32(t, &m) < 0 ||
3156         !pa_tagstruct_eof(t)) {
3157         protocol_error(c);
3158         return;
3159     }
3160
3161     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3162     CHECK_VALIDITY(c->pstream, (m & ~PA_SUBSCRIPTION_MASK_ALL) == 0, tag, PA_ERR_INVALID);
3163
3164     if (c->subscription)
3165         pa_subscription_free(c->subscription);
3166
3167     if (m != 0) {
3168         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
3169         pa_assert(c->subscription);
3170     } else
3171         c->subscription = NULL;
3172
3173     pa_pstream_send_simple_ack(c->pstream, tag);
3174 }
3175
3176 static void command_set_volume(
3177         pa_pdispatch *pd,
3178         uint32_t command,
3179         uint32_t tag,
3180         pa_tagstruct *t,
3181         void *userdata) {
3182
3183     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3184     uint32_t idx;
3185     pa_cvolume volume;
3186     pa_sink *sink = NULL;
3187     pa_source *source = NULL;
3188     pa_sink_input *si = NULL;
3189     const char *name = NULL;
3190
3191     pa_native_connection_assert_ref(c);
3192     pa_assert(t);
3193
3194     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3195         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3196         (command == PA_COMMAND_SET_SOURCE_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
3197         pa_tagstruct_get_cvolume(t, &volume) ||
3198         !pa_tagstruct_eof(t)) {
3199         protocol_error(c);
3200         return;
3201     }
3202
3203     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3204     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3205     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3206     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3207     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3208     CHECK_VALIDITY(c->pstream, pa_cvolume_valid(&volume), tag, PA_ERR_INVALID);
3209
3210     switch (command) {
3211
3212         case PA_COMMAND_SET_SINK_VOLUME:
3213             if (idx != PA_INVALID_INDEX)
3214                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3215             else
3216                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3217             break;
3218
3219         case PA_COMMAND_SET_SOURCE_VOLUME:
3220             if (idx != PA_INVALID_INDEX)
3221                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3222             else
3223                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3224             break;
3225
3226         case PA_COMMAND_SET_SINK_INPUT_VOLUME:
3227             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3228             break;
3229
3230         default:
3231             pa_assert_not_reached();
3232     }
3233
3234     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3235
3236     if (sink)
3237         pa_sink_set_volume(sink, &volume, TRUE, TRUE);
3238     else if (source)
3239         pa_source_set_volume(source, &volume);
3240     else if (si)
3241         pa_sink_input_set_volume(si, &volume, TRUE);
3242
3243     pa_pstream_send_simple_ack(c->pstream, tag);
3244 }
3245
3246 static void command_set_mute(
3247         pa_pdispatch *pd,
3248         uint32_t command,
3249         uint32_t tag,
3250         pa_tagstruct *t,
3251         void *userdata) {
3252
3253     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3254     uint32_t idx;
3255     pa_bool_t mute;
3256     pa_sink *sink = NULL;
3257     pa_source *source = NULL;
3258     pa_sink_input *si = NULL;
3259     const char *name = NULL;
3260
3261     pa_native_connection_assert_ref(c);
3262     pa_assert(t);
3263
3264     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3265         (command == PA_COMMAND_SET_SINK_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3266         (command == PA_COMMAND_SET_SOURCE_MUTE && pa_tagstruct_gets(t, &name) < 0) ||
3267         pa_tagstruct_get_boolean(t, &mute) ||
3268         !pa_tagstruct_eof(t)) {
3269         protocol_error(c);
3270         return;
3271     }
3272
3273     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3274     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
3275     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3276     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3277     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3278
3279     switch (command) {
3280
3281         case PA_COMMAND_SET_SINK_MUTE:
3282
3283             if (idx != PA_INVALID_INDEX)
3284                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
3285             else
3286                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
3287
3288             break;
3289
3290         case PA_COMMAND_SET_SOURCE_MUTE:
3291             if (idx != PA_INVALID_INDEX)
3292                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
3293             else
3294                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
3295
3296             break;
3297
3298         case PA_COMMAND_SET_SINK_INPUT_MUTE:
3299             si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3300             break;
3301
3302         default:
3303             pa_assert_not_reached();
3304     }
3305
3306     CHECK_VALIDITY(c->pstream, si || sink || source, tag, PA_ERR_NOENTITY);
3307
3308     if (sink)
3309         pa_sink_set_mute(sink, mute);
3310     else if (source)
3311         pa_source_set_mute(source, mute);
3312     else if (si)
3313         pa_sink_input_set_mute(si, mute, TRUE);
3314
3315     pa_pstream_send_simple_ack(c->pstream, tag);
3316 }
3317
3318 static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3319     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3320     uint32_t idx;
3321     pa_bool_t b;
3322     playback_stream *s;
3323
3324     pa_native_connection_assert_ref(c);
3325     pa_assert(t);
3326
3327     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3328         pa_tagstruct_get_boolean(t, &b) < 0 ||
3329         !pa_tagstruct_eof(t)) {
3330         protocol_error(c);
3331         return;
3332     }
3333
3334     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3335     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3336     s = pa_idxset_get_by_index(c->output_streams, idx);
3337     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3338     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3339
3340     pa_sink_input_cork(s->sink_input, b);
3341
3342     if (b)
3343         s->is_underrun = TRUE;
3344
3345     pa_pstream_send_simple_ack(c->pstream, tag);
3346 }
3347
3348 static void command_trigger_or_flush_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3349     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3350     uint32_t idx;
3351     playback_stream *s;
3352
3353     pa_native_connection_assert_ref(c);
3354     pa_assert(t);
3355
3356     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3357         !pa_tagstruct_eof(t)) {
3358         protocol_error(c);
3359         return;
3360     }
3361
3362     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3363     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3364     s = pa_idxset_get_by_index(c->output_streams, idx);
3365     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3366     CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3367
3368     switch (command) {
3369         case PA_COMMAND_FLUSH_PLAYBACK_STREAM:
3370             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_FLUSH, NULL, 0, NULL);
3371             break;
3372
3373         case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
3374             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_PREBUF_FORCE, NULL, 0, NULL);
3375             break;
3376
3377         case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
3378             pa_asyncmsgq_send(s->sink_input->sink->asyncmsgq, PA_MSGOBJECT(s->sink_input), SINK_INPUT_MESSAGE_TRIGGER, NULL, 0, NULL);
3379             break;
3380
3381         default:
3382             pa_assert_not_reached();
3383     }
3384
3385     pa_pstream_send_simple_ack(c->pstream, tag);
3386 }
3387
3388 static void command_cork_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3389     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3390     uint32_t idx;
3391     record_stream *s;
3392     pa_bool_t b;
3393
3394     pa_native_connection_assert_ref(c);
3395     pa_assert(t);
3396
3397     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3398         pa_tagstruct_get_boolean(t, &b) < 0 ||
3399         !pa_tagstruct_eof(t)) {
3400         protocol_error(c);
3401         return;
3402     }
3403
3404     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3405     s = pa_idxset_get_by_index(c->record_streams, idx);
3406     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3407
3408     pa_source_output_cork(s->source_output, b);
3409     pa_memblockq_prebuf_force(s->memblockq);
3410     pa_pstream_send_simple_ack(c->pstream, tag);
3411 }
3412
3413 static void command_flush_record_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3414     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3415     uint32_t idx;
3416     record_stream *s;
3417
3418     pa_native_connection_assert_ref(c);
3419     pa_assert(t);
3420
3421     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3422         !pa_tagstruct_eof(t)) {
3423         protocol_error(c);
3424         return;
3425     }
3426
3427     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3428     s = pa_idxset_get_by_index(c->record_streams, idx);
3429     CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3430
3431     pa_memblockq_flush_read(s->memblockq);
3432     pa_pstream_send_simple_ack(c->pstream, tag);
3433 }
3434
3435 static void command_set_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3436     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3437     uint32_t idx;
3438     uint32_t maxlength, tlength, prebuf, minreq, fragsize;
3439     pa_tagstruct *reply;
3440
3441     pa_native_connection_assert_ref(c);
3442     pa_assert(t);
3443
3444     if (pa_tagstruct_getu32(t, &idx) < 0) {
3445         protocol_error(c);
3446         return;
3447     }
3448
3449     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3450
3451     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
3452         playback_stream *s;
3453         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3454
3455         s = pa_idxset_get_by_index(c->output_streams, idx);
3456         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3457         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3458
3459         if (pa_tagstruct_get(
3460                     t,
3461                     PA_TAG_U32, &maxlength,
3462                     PA_TAG_U32, &tlength,
3463                     PA_TAG_U32, &prebuf,
3464                     PA_TAG_U32, &minreq,
3465                     PA_TAG_INVALID) < 0 ||
3466             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3467             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3468             !pa_tagstruct_eof(t)) {
3469             protocol_error(c);
3470             return;
3471         }
3472
3473         fix_playback_buffer_attr_pre(s, adjust_latency, early_requests, &maxlength, &tlength, &prebuf, &minreq);
3474         pa_memblockq_set_maxlength(s->memblockq, maxlength);
3475         pa_memblockq_set_tlength(s->memblockq, tlength);
3476         pa_memblockq_set_prebuf(s->memblockq, prebuf);
3477         pa_memblockq_set_minreq(s->memblockq, minreq);
3478         fix_playback_buffer_attr_post(s, &maxlength, &tlength, &prebuf, &minreq);
3479
3480         reply = reply_new(tag);
3481         pa_tagstruct_putu32(reply, maxlength);
3482         pa_tagstruct_putu32(reply, tlength);
3483         pa_tagstruct_putu32(reply, prebuf);
3484         pa_tagstruct_putu32(reply, minreq);
3485
3486         if (c->version >= 13)
3487             pa_tagstruct_put_usec(reply, s->sink_latency);
3488
3489     } else {
3490         record_stream *s;
3491         pa_bool_t adjust_latency = FALSE, early_requests = FALSE;
3492         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR);
3493
3494         s = pa_idxset_get_by_index(c->record_streams, idx);
3495         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3496
3497         if (pa_tagstruct_get(
3498                     t,
3499                     PA_TAG_U32, &maxlength,
3500                     PA_TAG_U32, &fragsize,
3501                     PA_TAG_INVALID) < 0 ||
3502             (c->version >= 13 && pa_tagstruct_get_boolean(t, &adjust_latency) < 0) ||
3503             (c->version >= 14 && pa_tagstruct_get_boolean(t, &early_requests) < 0) ||
3504             !pa_tagstruct_eof(t)) {
3505             protocol_error(c);
3506             return;
3507         }
3508
3509         fix_record_buffer_attr_pre(s, adjust_latency, early_requests, &maxlength, &fragsize);
3510         pa_memblockq_set_maxlength(s->memblockq, maxlength);
3511         fix_record_buffer_attr_post(s, &maxlength, &fragsize);
3512
3513         reply = reply_new(tag);
3514         pa_tagstruct_putu32(reply, maxlength);
3515         pa_tagstruct_putu32(reply, fragsize);
3516
3517         if (c->version >= 13)
3518             pa_tagstruct_put_usec(reply, s->source_latency);
3519     }
3520
3521     pa_pstream_send_tagstruct(c->pstream, reply);
3522 }
3523
3524 static void command_update_stream_sample_rate(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3525     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3526     uint32_t idx;
3527     uint32_t rate;
3528
3529     pa_native_connection_assert_ref(c);
3530     pa_assert(t);
3531
3532     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3533         pa_tagstruct_getu32(t, &rate) < 0 ||
3534         !pa_tagstruct_eof(t)) {
3535         protocol_error(c);
3536         return;
3537     }
3538
3539     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3540     CHECK_VALIDITY(c->pstream, rate > 0 && rate <= PA_RATE_MAX, tag, PA_ERR_INVALID);
3541
3542     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE) {
3543         playback_stream *s;
3544
3545         s = pa_idxset_get_by_index(c->output_streams, idx);
3546         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3547         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3548
3549         pa_sink_input_set_rate(s->sink_input, rate);
3550
3551     } else {
3552         record_stream *s;
3553         pa_assert(command == PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE);
3554
3555         s = pa_idxset_get_by_index(c->record_streams, idx);
3556         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3557
3558         pa_source_output_set_rate(s->source_output, rate);
3559     }
3560
3561     pa_pstream_send_simple_ack(c->pstream, tag);
3562 }
3563
3564 static void command_update_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3565     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3566     uint32_t idx;
3567     uint32_t mode;
3568     pa_proplist *p;
3569
3570     pa_native_connection_assert_ref(c);
3571     pa_assert(t);
3572
3573     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3574
3575     p = pa_proplist_new();
3576
3577     if (command == PA_COMMAND_UPDATE_CLIENT_PROPLIST) {
3578
3579         if (pa_tagstruct_getu32(t, &mode) < 0 ||
3580             pa_tagstruct_get_proplist(t, p) < 0 ||
3581             !pa_tagstruct_eof(t)) {
3582             protocol_error(c);
3583             pa_proplist_free(p);
3584             return;
3585         }
3586
3587     } else {
3588
3589         if (pa_tagstruct_getu32(t, &idx) < 0 ||
3590             pa_tagstruct_getu32(t, &mode) < 0 ||
3591             pa_tagstruct_get_proplist(t, p) < 0 ||
3592             !pa_tagstruct_eof(t)) {
3593             protocol_error(c);
3594             pa_proplist_free(p);
3595             return;
3596         }
3597     }
3598
3599     if (!(mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE)) {
3600         pa_proplist_free(p);
3601         CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_INVALID);
3602     }
3603
3604     if (command == PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST) {
3605         playback_stream *s;
3606
3607         s = pa_idxset_get_by_index(c->output_streams, idx);
3608         if (!s || !playback_stream_isinstance(s)) {
3609             pa_proplist_free(p);
3610             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3611         }
3612         pa_sink_input_update_proplist(s->sink_input, mode, p);
3613
3614     } else if (command == PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST) {
3615         record_stream *s;
3616
3617         if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
3618             pa_proplist_free(p);
3619             CHECK_VALIDITY(c->pstream, FALSE, tag, PA_ERR_NOENTITY);
3620         }
3621         pa_source_output_update_proplist(s->source_output, mode, p);
3622
3623     } else {
3624         pa_assert(command == PA_COMMAND_UPDATE_CLIENT_PROPLIST);
3625
3626         pa_client_update_proplist(c->client, mode, p);
3627     }
3628
3629     pa_pstream_send_simple_ack(c->pstream, tag);
3630     pa_proplist_free(p);
3631 }
3632
3633 static void command_remove_proplist(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3634     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3635     uint32_t idx;
3636     unsigned changed = 0;
3637     pa_proplist *p;
3638     pa_strlist *l = NULL;
3639
3640     pa_native_connection_assert_ref(c);
3641     pa_assert(t);
3642
3643     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3644
3645     if (command != PA_COMMAND_REMOVE_CLIENT_PROPLIST) {
3646
3647         if (pa_tagstruct_getu32(t, &idx) < 0) {
3648             protocol_error(c);
3649             return;
3650         }
3651     }
3652
3653     if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3654         playback_stream *s;
3655
3656         s = pa_idxset_get_by_index(c->output_streams, idx);
3657         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3658         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3659
3660         p = s->sink_input->proplist;
3661
3662     } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3663         record_stream *s;
3664
3665         s = pa_idxset_get_by_index(c->record_streams, idx);
3666         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3667
3668         p = s->source_output->proplist;
3669     } else {
3670         pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3671
3672         p = c->client->proplist;
3673     }
3674
3675     for (;;) {
3676         const char *k;
3677
3678         if (pa_tagstruct_gets(t, &k) < 0) {
3679             protocol_error(c);
3680             pa_strlist_free(l);
3681             return;
3682         }
3683
3684         if (!k)
3685             break;
3686
3687         l = pa_strlist_prepend(l, k);
3688     }
3689
3690     if (!pa_tagstruct_eof(t)) {
3691         protocol_error(c);
3692         pa_strlist_free(l);
3693         return;
3694     }
3695
3696     for (;;) {
3697         char *z;
3698
3699         l = pa_strlist_pop(l, &z);
3700
3701         if (!z)
3702             break;
3703
3704         changed += (unsigned) (pa_proplist_unset(p, z) >= 0);
3705         pa_xfree(z);
3706     }
3707
3708     pa_pstream_send_simple_ack(c->pstream, tag);
3709
3710     if (changed) {
3711         if (command == PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST) {
3712             playback_stream *s;
3713
3714             s = pa_idxset_get_by_index(c->output_streams, idx);
3715             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->sink_input->index);
3716
3717         } else if (command == PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST) {
3718             record_stream *s;
3719
3720             s = pa_idxset_get_by_index(c->record_streams, idx);
3721             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_CHANGE, s->source_output->index);
3722
3723         } else {
3724             pa_assert(command == PA_COMMAND_REMOVE_CLIENT_PROPLIST);
3725             pa_subscription_post(c->protocol->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_CHANGE, c->client->index);
3726         }
3727     }
3728 }
3729
3730 static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3731     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3732     const char *s;
3733
3734     pa_native_connection_assert_ref(c);
3735     pa_assert(t);
3736
3737     if (pa_tagstruct_gets(t, &s) < 0 ||
3738         !pa_tagstruct_eof(t)) {
3739         protocol_error(c);
3740         return;
3741     }
3742
3743     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3744     CHECK_VALIDITY(c->pstream, !s || pa_namereg_is_valid_name(s), tag, PA_ERR_INVALID);
3745
3746     if (command == PA_COMMAND_SET_DEFAULT_SOURCE) {
3747         pa_source *source;
3748
3749         source = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SOURCE);
3750         CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
3751
3752         pa_namereg_set_default_source(c->protocol->core, source);
3753     } else {
3754         pa_sink *sink;
3755         pa_assert(command == PA_COMMAND_SET_DEFAULT_SINK);
3756
3757         sink = pa_namereg_get(c->protocol->core, s, PA_NAMEREG_SINK);
3758         CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
3759
3760         pa_namereg_set_default_sink(c->protocol->core, sink);
3761     }
3762
3763     pa_pstream_send_simple_ack(c->pstream, tag);
3764 }
3765
3766 static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3767     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3768     uint32_t idx;
3769     const char *name;
3770
3771     pa_native_connection_assert_ref(c);
3772     pa_assert(t);
3773
3774     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3775         pa_tagstruct_gets(t, &name) < 0 ||
3776         !pa_tagstruct_eof(t)) {
3777         protocol_error(c);
3778         return;
3779     }
3780
3781     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3782     CHECK_VALIDITY(c->pstream, name && pa_utf8_valid(name), tag, PA_ERR_INVALID);
3783
3784     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
3785         playback_stream *s;
3786
3787         s = pa_idxset_get_by_index(c->output_streams, idx);
3788         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3789         CHECK_VALIDITY(c->pstream, playback_stream_isinstance(s), tag, PA_ERR_NOENTITY);
3790
3791         pa_sink_input_set_name(s->sink_input, name);
3792
3793     } else {
3794         record_stream *s;
3795         pa_assert(command == PA_COMMAND_SET_RECORD_STREAM_NAME);
3796
3797         s = pa_idxset_get_by_index(c->record_streams, idx);
3798         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3799
3800         pa_source_output_set_name(s->source_output, name);
3801     }
3802
3803     pa_pstream_send_simple_ack(c->pstream, tag);
3804 }
3805
3806 static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3807     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3808     uint32_t idx;
3809
3810     pa_native_connection_assert_ref(c);
3811     pa_assert(t);
3812
3813     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3814         !pa_tagstruct_eof(t)) {
3815         protocol_error(c);
3816         return;
3817     }
3818
3819     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3820
3821     if (command == PA_COMMAND_KILL_CLIENT) {
3822         pa_client *client;
3823
3824         client = pa_idxset_get_by_index(c->protocol->core->clients, idx);
3825         CHECK_VALIDITY(c->pstream, client, tag, PA_ERR_NOENTITY);
3826
3827         pa_native_connection_ref(c);
3828         pa_client_kill(client);
3829
3830     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
3831         pa_sink_input *s;
3832
3833         s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3834         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3835
3836         pa_native_connection_ref(c);
3837         pa_sink_input_kill(s);
3838     } else {
3839         pa_source_output *s;
3840
3841         pa_assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
3842
3843         s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3844         CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_NOENTITY);
3845
3846         pa_native_connection_ref(c);
3847         pa_source_output_kill(s);
3848     }
3849
3850     pa_pstream_send_simple_ack(c->pstream, tag);
3851     pa_native_connection_unref(c);
3852 }
3853
3854 static void command_load_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3855     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3856     pa_module *m;
3857     const char *name, *argument;
3858     pa_tagstruct *reply;
3859
3860     pa_native_connection_assert_ref(c);
3861     pa_assert(t);
3862
3863     if (pa_tagstruct_gets(t, &name) < 0 ||
3864         pa_tagstruct_gets(t, &argument) < 0 ||
3865         !pa_tagstruct_eof(t)) {
3866         protocol_error(c);
3867         return;
3868     }
3869
3870     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3871     CHECK_VALIDITY(c->pstream, name && *name && pa_utf8_valid(name) && !strchr(name, '/'), tag, PA_ERR_INVALID);
3872     CHECK_VALIDITY(c->pstream, !argument || pa_utf8_valid(argument), tag, PA_ERR_INVALID);
3873
3874     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
3875         pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
3876         return;
3877     }
3878
3879     reply = reply_new(tag);
3880     pa_tagstruct_putu32(reply, m->index);
3881     pa_pstream_send_tagstruct(c->pstream, reply);
3882 }
3883
3884 static void command_unload_module(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3885     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3886     uint32_t idx;
3887     pa_module *m;
3888
3889     pa_native_connection_assert_ref(c);
3890     pa_assert(t);
3891
3892     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3893         !pa_tagstruct_eof(t)) {
3894         protocol_error(c);
3895         return;
3896     }
3897
3898     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3899     m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
3900     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOENTITY);
3901
3902     pa_module_unload_request(m, FALSE);
3903     pa_pstream_send_simple_ack(c->pstream, tag);
3904 }
3905
3906 static void command_move_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3907     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3908     uint32_t idx = PA_INVALID_INDEX, idx_device = PA_INVALID_INDEX;
3909     const char *name_device = NULL;
3910
3911     pa_native_connection_assert_ref(c);
3912     pa_assert(t);
3913
3914     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3915         pa_tagstruct_getu32(t, &idx_device) < 0 ||
3916         pa_tagstruct_gets(t, &name_device) < 0 ||
3917         !pa_tagstruct_eof(t)) {
3918         protocol_error(c);
3919         return;
3920     }
3921
3922     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3923     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3924
3925     CHECK_VALIDITY(c->pstream, !name_device || pa_namereg_is_valid_name(name_device), tag, PA_ERR_INVALID);
3926     CHECK_VALIDITY(c->pstream, idx_device != PA_INVALID_INDEX || name_device, tag, PA_ERR_INVALID);
3927     CHECK_VALIDITY(c->pstream, idx_device == PA_INVALID_INDEX || !name_device, tag, PA_ERR_INVALID);
3928     CHECK_VALIDITY(c->pstream, !name_device || idx_device == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3929
3930     if (command == PA_COMMAND_MOVE_SINK_INPUT) {
3931         pa_sink_input *si = NULL;
3932         pa_sink *sink = NULL;
3933
3934         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx);
3935
3936         if (idx_device != PA_INVALID_INDEX)
3937             sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx_device);
3938         else
3939             sink = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SINK);
3940
3941         CHECK_VALIDITY(c->pstream, si && sink, tag, PA_ERR_NOENTITY);
3942
3943         if (pa_sink_input_move_to(si, sink, TRUE) < 0) {
3944             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
3945             return;
3946         }
3947     } else {
3948         pa_source_output *so = NULL;
3949         pa_source *source;
3950
3951         pa_assert(command == PA_COMMAND_MOVE_SOURCE_OUTPUT);
3952
3953         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx);
3954
3955         if (idx_device != PA_INVALID_INDEX)
3956             source = pa_idxset_get_by_index(c->protocol->core->sources, idx_device);
3957         else
3958             source = pa_namereg_get(c->protocol->core, name_device, PA_NAMEREG_SOURCE);
3959
3960         CHECK_VALIDITY(c->pstream, so && source, tag, PA_ERR_NOENTITY);
3961
3962         if (pa_source_output_move_to(so, source, TRUE) < 0) {
3963             pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
3964             return;
3965         }
3966     }
3967
3968     pa_pstream_send_simple_ack(c->pstream, tag);
3969 }
3970
3971 static void command_suspend(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
3972     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
3973     uint32_t idx = PA_INVALID_INDEX;
3974     const char *name = NULL;
3975     pa_bool_t b;
3976
3977     pa_native_connection_assert_ref(c);
3978     pa_assert(t);
3979
3980     if (pa_tagstruct_getu32(t, &idx) < 0 ||
3981         pa_tagstruct_gets(t, &name) < 0 ||
3982         pa_tagstruct_get_boolean(t, &b) < 0 ||
3983         !pa_tagstruct_eof(t)) {
3984         protocol_error(c);
3985         return;
3986     }
3987
3988     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
3989     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name) || *name == 0, tag, PA_ERR_INVALID);
3990     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
3991     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
3992     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
3993
3994     if (command == PA_COMMAND_SUSPEND_SINK) {
3995
3996         if (idx == PA_INVALID_INDEX && name && !*name) {
3997
3998             pa_log_debug("%s all sinks", b ? "Suspending" : "Resuming");
3999
4000             if (pa_sink_suspend_all(c->protocol->core, b) < 0) {
4001                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4002                 return;
4003             }
4004         } else {
4005             pa_sink *sink = NULL;
4006
4007             if (idx != PA_INVALID_INDEX)
4008                 sink = pa_idxset_get_by_index(c->protocol->core->sinks, idx);
4009             else
4010                 sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK);
4011
4012             CHECK_VALIDITY(c->pstream, sink, tag, PA_ERR_NOENTITY);
4013
4014             if (pa_sink_suspend(sink, b) < 0) {
4015                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4016                 return;
4017             }
4018         }
4019     } else {
4020
4021         pa_assert(command == PA_COMMAND_SUSPEND_SOURCE);
4022
4023         if (idx == PA_INVALID_INDEX && name && !*name) {
4024
4025             pa_log_debug("%s all sources", b ? "Suspending" : "Resuming");
4026
4027             if (pa_source_suspend_all(c->protocol->core, b) < 0) {
4028                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4029                 return;
4030             }
4031
4032         } else {
4033             pa_source *source;
4034
4035             if (idx != PA_INVALID_INDEX)
4036                 source = pa_idxset_get_by_index(c->protocol->core->sources, idx);
4037             else
4038                 source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE);
4039
4040             CHECK_VALIDITY(c->pstream, source, tag, PA_ERR_NOENTITY);
4041
4042             if (pa_source_suspend(source, b) < 0) {
4043                 pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4044                 return;
4045             }
4046         }
4047     }
4048
4049     pa_pstream_send_simple_ack(c->pstream, tag);
4050 }
4051
4052 static void command_extension(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4053     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4054     uint32_t idx = PA_INVALID_INDEX;
4055     const char *name = NULL;
4056     pa_module *m;
4057     pa_native_protocol_ext_cb_t cb;
4058
4059     pa_native_connection_assert_ref(c);
4060     pa_assert(t);
4061
4062     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4063         pa_tagstruct_gets(t, &name) < 0) {
4064         protocol_error(c);
4065         return;
4066     }
4067
4068     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4069     CHECK_VALIDITY(c->pstream, !name || pa_utf8_valid(name), tag, PA_ERR_INVALID);
4070     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4071     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4072     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4073
4074     if (idx != PA_INVALID_INDEX)
4075         m = pa_idxset_get_by_index(c->protocol->core->modules, idx);
4076     else {
4077         for (m = pa_idxset_first(c->protocol->core->modules, &idx); m; m = pa_idxset_next(c->protocol->core->modules, &idx))
4078             if (strcmp(name, m->name) == 0)
4079                 break;
4080     }
4081
4082     CHECK_VALIDITY(c->pstream, m, tag, PA_ERR_NOEXTENSION);
4083     CHECK_VALIDITY(c->pstream, m->load_once || idx != PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4084
4085     cb = (pa_native_protocol_ext_cb_t) (unsigned long) pa_hashmap_get(c->protocol->extensions, m);
4086     CHECK_VALIDITY(c->pstream, cb, tag, PA_ERR_NOEXTENSION);
4087
4088     if (cb(c->protocol, m, c, tag, t) < 0)
4089         protocol_error(c);
4090 }
4091
4092 static void command_set_card_profile(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
4093     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4094     uint32_t idx = PA_INVALID_INDEX;
4095     const char *name = NULL, *profile = NULL;
4096     pa_card *card = NULL;
4097
4098     pa_native_connection_assert_ref(c);
4099     pa_assert(t);
4100
4101     if (pa_tagstruct_getu32(t, &idx) < 0 ||
4102         pa_tagstruct_gets(t, &name) < 0 ||
4103         pa_tagstruct_gets(t, &profile) < 0 ||
4104         !pa_tagstruct_eof(t)) {
4105         protocol_error(c);
4106         return;
4107     }
4108
4109     CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
4110     CHECK_VALIDITY(c->pstream, !name || pa_namereg_is_valid_name(name), tag, PA_ERR_INVALID);
4111     CHECK_VALIDITY(c->pstream, idx != PA_INVALID_INDEX || name, tag, PA_ERR_INVALID);
4112     CHECK_VALIDITY(c->pstream, idx == PA_INVALID_INDEX || !name, tag, PA_ERR_INVALID);
4113     CHECK_VALIDITY(c->pstream, !name || idx == PA_INVALID_INDEX, tag, PA_ERR_INVALID);
4114
4115     if (idx != PA_INVALID_INDEX)
4116         card = pa_idxset_get_by_index(c->protocol->core->cards, idx);
4117     else
4118         card = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_CARD);
4119
4120     CHECK_VALIDITY(c->pstream, card, tag, PA_ERR_NOENTITY);
4121
4122     if (pa_card_set_profile(card, profile) < 0) {
4123         pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
4124         return;
4125     }
4126
4127     pa_pstream_send_simple_ack(c->pstream, tag);
4128 }
4129
4130 /*** pstream callbacks ***/
4131
4132 static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, const pa_creds *creds, void *userdata) {
4133     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4134
4135     pa_assert(p);
4136     pa_assert(packet);
4137     pa_native_connection_assert_ref(c);
4138
4139     if (pa_pdispatch_run(c->pdispatch, packet, creds, c) < 0) {
4140         pa_log("invalid packet.");
4141         native_connection_unlink(c);
4142     }
4143 }
4144
4145 static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
4146     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4147     output_stream *stream;
4148
4149     pa_assert(p);
4150     pa_assert(chunk);
4151     pa_native_connection_assert_ref(c);
4152
4153     if (!(stream = OUTPUT_STREAM(pa_idxset_get_by_index(c->output_streams, channel)))) {
4154         pa_log("client sent block for invalid stream.");
4155         /* Ignoring */
4156         return;
4157     }
4158
4159 /*     pa_log("got %lu bytes", (unsigned long) chunk->length); */
4160
4161     if (playback_stream_isinstance(stream)) {
4162         playback_stream *ps = PLAYBACK_STREAM(stream);
4163
4164         if (chunk->memblock) {
4165             if (seek != PA_SEEK_RELATIVE || offset != 0)
4166                 pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset, NULL, NULL);
4167
4168             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
4169         } else
4170             pa_asyncmsgq_post(ps->sink_input->sink->asyncmsgq, PA_MSGOBJECT(ps->sink_input), SINK_INPUT_MESSAGE_SEEK, PA_UINT_TO_PTR(seek), offset+chunk->length, NULL, NULL);
4171
4172     } else {
4173         upload_stream *u = UPLOAD_STREAM(stream);
4174         size_t l;
4175
4176         if (!u->memchunk.memblock) {
4177             if (u->length == chunk->length && chunk->memblock) {
4178                 u->memchunk = *chunk;
4179                 pa_memblock_ref(u->memchunk.memblock);
4180                 u->length = 0;
4181             } else {
4182                 u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
4183                 u->memchunk.index = u->memchunk.length = 0;
4184             }
4185         }
4186
4187         pa_assert(u->memchunk.memblock);
4188
4189         l = u->length;
4190         if (l > chunk->length)
4191             l = chunk->length;
4192
4193         if (l > 0) {
4194             void *dst;
4195             dst = pa_memblock_acquire(u->memchunk.memblock);
4196
4197             if (chunk->memblock) {
4198                 void *src;
4199                 src = pa_memblock_acquire(chunk->memblock);
4200
4201                 memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
4202                        (uint8_t*) src + chunk->index, l);
4203
4204                 pa_memblock_release(chunk->memblock);
4205             } else
4206                 pa_silence_memory((uint8_t*) dst + u->memchunk.index + u->memchunk.length, l, &u->sample_spec);
4207
4208             pa_memblock_release(u->memchunk.memblock);
4209
4210             u->memchunk.length += l;
4211             u->length -= l;
4212         }
4213     }
4214 }
4215
4216 static void pstream_die_callback(pa_pstream *p, void *userdata) {
4217     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4218
4219     pa_assert(p);
4220     pa_native_connection_assert_ref(c);
4221
4222     native_connection_unlink(c);
4223     pa_log_info("Connection died.");
4224 }
4225
4226 static void pstream_drain_callback(pa_pstream *p, void *userdata) {
4227     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4228
4229     pa_assert(p);
4230     pa_native_connection_assert_ref(c);
4231
4232     native_connection_send_memblock(c);
4233 }
4234
4235 static void pstream_revoke_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4236     pa_thread_mq *q;
4237
4238     if (!(q = pa_thread_mq_get()))
4239         pa_pstream_send_revoke(p, block_id);
4240     else
4241         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_REVOKE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4242 }
4243
4244 static void pstream_release_callback(pa_pstream *p, uint32_t block_id, void *userdata) {
4245     pa_thread_mq *q;
4246
4247     if (!(q = pa_thread_mq_get()))
4248         pa_pstream_send_release(p, block_id);
4249     else
4250         pa_asyncmsgq_post(q->outq, PA_MSGOBJECT(userdata), CONNECTION_MESSAGE_RELEASE, PA_UINT_TO_PTR(block_id), 0, NULL, NULL);
4251 }
4252
4253 /*** client callbacks ***/
4254
4255 static void client_kill_cb(pa_client *c) {
4256     pa_assert(c);
4257
4258     native_connection_unlink(PA_NATIVE_CONNECTION(c->userdata));
4259     pa_log_info("Connection killed.");
4260 }
4261
4262 static void client_send_event_cb(pa_client *client, const char*event, pa_proplist *pl) {
4263     pa_tagstruct *t;
4264     pa_native_connection *c;
4265
4266     pa_assert(client);
4267     c = PA_NATIVE_CONNECTION(client->userdata);
4268     pa_native_connection_assert_ref(c);
4269
4270     if (c->version < 15)
4271       return;
4272
4273     t = pa_tagstruct_new(NULL, 0);
4274     pa_tagstruct_putu32(t, PA_COMMAND_CLIENT_EVENT);
4275     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
4276     pa_tagstruct_puts(t, event);
4277     pa_tagstruct_put_proplist(t, pl);
4278     pa_pstream_send_tagstruct(c->pstream, t);
4279 }
4280
4281 /*** module entry points ***/
4282
4283 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
4284     pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
4285
4286     pa_assert(m);
4287     pa_assert(tv);
4288     pa_native_connection_assert_ref(c);
4289     pa_assert(c->auth_timeout_event == e);
4290
4291     if (!c->authorized) {
4292         native_connection_unlink(c);
4293         pa_log_info("Connection terminated due to authentication timeout.");
4294     }
4295 }
4296
4297 void pa_native_protocol_connect(pa_native_protocol *p, pa_iochannel *io, pa_native_options *o) {
4298     pa_native_connection *c;
4299     char pname[128];
4300     pa_client *client;
4301     pa_client_new_data data;
4302
4303     pa_assert(p);
4304     pa_assert(io);
4305     pa_assert(o);
4306
4307     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
4308         pa_log_warn("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
4309         pa_iochannel_free(io);
4310         return;
4311     }
4312
4313     pa_client_new_data_init(&data);
4314     data.module = o->module;
4315     data.driver = __FILE__;
4316     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
4317     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "Native client (%s)", pname);
4318     pa_proplist_sets(data.proplist, "native-protocol.peer", pname);
4319     client = pa_client_new(p->core, &data);
4320     pa_client_new_data_done(&data);
4321
4322     if (!client)
4323         return;
4324
4325     c = pa_msgobject_new(pa_native_connection);
4326     c->parent.parent.free = native_connection_free;
4327     c->parent.process_msg = native_connection_process_msg;
4328     c->protocol = p;
4329     c->options = pa_native_options_ref(o);
4330     c->authorized = FALSE;
4331
4332     if (o->auth_anonymous) {
4333         pa_log_info("Client authenticated anonymously.");
4334         c->authorized = TRUE;
4335     }
4336
4337     if (!c->authorized &&
4338         o->auth_ip_acl &&
4339         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
4340
4341         pa_log_info("Client authenticated by IP ACL.");
4342         c->authorized = TRUE;
4343     }
4344
4345     if (!c->authorized) {
4346         struct timeval tv;
4347         pa_gettimeofday(&tv);
4348         tv.tv_sec += AUTH_TIMEOUT;
4349         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
4350     } else
4351         c->auth_timeout_event = NULL;
4352
4353     c->is_local = pa_iochannel_socket_is_local(io);
4354     c->version = 8;
4355
4356     c->client = client;
4357     c->client->kill = client_kill_cb;
4358     c->client->send_event = client_send_event_cb;
4359     c->client->userdata = c;
4360
4361     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
4362     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
4363     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
4364     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
4365     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
4366     pa_pstream_set_revoke_callback(c->pstream, pstream_revoke_callback, c);
4367     pa_pstream_set_release_callback(c->pstream, pstream_release_callback, c);
4368
4369     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
4370
4371     c->record_streams = pa_idxset_new(NULL, NULL);
4372     c->output_streams = pa_idxset_new(NULL, NULL);
4373
4374     c->rrobin_index = PA_IDXSET_INVALID;
4375     c->subscription = NULL;
4376
4377     pa_idxset_put(p->connections, c, NULL);
4378
4379 #ifdef HAVE_CREDS
4380     if (pa_iochannel_creds_supported(io))
4381         pa_iochannel_creds_enable(io);
4382 #endif
4383
4384     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_CONNECTION_PUT], c);
4385 }
4386
4387 void pa_native_protocol_disconnect(pa_native_protocol *p, pa_module *m) {
4388     pa_native_connection *c;
4389     void *state = NULL;
4390
4391     pa_assert(p);
4392     pa_assert(m);
4393
4394     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
4395         if (c->options->module == m)
4396             native_connection_unlink(c);
4397 }
4398
4399 static pa_native_protocol* native_protocol_new(pa_core *c) {
4400     pa_native_protocol *p;
4401     pa_native_hook_t h;
4402
4403     pa_assert(c);
4404
4405     p = pa_xnew(pa_native_protocol, 1);
4406     PA_REFCNT_INIT(p);
4407     p->core = c;
4408     p->connections = pa_idxset_new(NULL, NULL);
4409
4410     p->servers = NULL;
4411
4412     p->extensions = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
4413
4414     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4415         pa_hook_init(&p->hooks[h], p);
4416
4417     pa_assert_se(pa_shared_set(c, "native-protocol", p) >= 0);
4418
4419     return p;
4420 }
4421
4422 pa_native_protocol* pa_native_protocol_get(pa_core *c) {
4423     pa_native_protocol *p;
4424
4425     if ((p = pa_shared_get(c, "native-protocol")))
4426         return pa_native_protocol_ref(p);
4427
4428     return native_protocol_new(c);
4429 }
4430
4431 pa_native_protocol* pa_native_protocol_ref(pa_native_protocol *p) {
4432     pa_assert(p);
4433     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4434
4435     PA_REFCNT_INC(p);
4436
4437     return p;
4438 }
4439
4440 void pa_native_protocol_unref(pa_native_protocol *p) {
4441     pa_native_connection *c;
4442     pa_native_hook_t h;
4443
4444     pa_assert(p);
4445     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4446
4447     if (PA_REFCNT_DEC(p) > 0)
4448         return;
4449
4450     while ((c = pa_idxset_first(p->connections, NULL)))
4451         native_connection_unlink(c);
4452
4453     pa_idxset_free(p->connections, NULL, NULL);
4454
4455     pa_strlist_free(p->servers);
4456
4457     for (h = 0; h < PA_NATIVE_HOOK_MAX; h++)
4458         pa_hook_done(&p->hooks[h]);
4459
4460     pa_hashmap_free(p->extensions, NULL, NULL);
4461
4462     pa_assert_se(pa_shared_remove(p->core, "native-protocol") >= 0);
4463
4464     pa_xfree(p);
4465 }
4466
4467 void pa_native_protocol_add_server_string(pa_native_protocol *p, const char *name) {
4468     pa_assert(p);
4469     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4470     pa_assert(name);
4471
4472     p->servers = pa_strlist_prepend(p->servers, name);
4473
4474     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4475 }
4476
4477 void pa_native_protocol_remove_server_string(pa_native_protocol *p, const char *name) {
4478     pa_assert(p);
4479     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4480     pa_assert(name);
4481
4482     p->servers = pa_strlist_remove(p->servers, name);
4483
4484     pa_hook_fire(&p->hooks[PA_NATIVE_HOOK_SERVERS_CHANGED], p->servers);
4485 }
4486
4487 pa_hook *pa_native_protocol_hooks(pa_native_protocol *p) {
4488     pa_assert(p);
4489     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4490
4491     return p->hooks;
4492 }
4493
4494 pa_strlist *pa_native_protocol_servers(pa_native_protocol *p) {
4495     pa_assert(p);
4496     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4497
4498     return p->servers;
4499 }
4500
4501 int pa_native_protocol_install_ext(pa_native_protocol *p, pa_module *m, pa_native_protocol_ext_cb_t cb) {
4502     pa_assert(p);
4503     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4504     pa_assert(m);
4505     pa_assert(cb);
4506     pa_assert(!pa_hashmap_get(p->extensions, m));
4507
4508     pa_assert_se(pa_hashmap_put(p->extensions, m, (void*) (unsigned long) cb) == 0);
4509     return 0;
4510 }
4511
4512 void pa_native_protocol_remove_ext(pa_native_protocol *p, pa_module *m) {
4513     pa_assert(p);
4514     pa_assert(PA_REFCNT_VALUE(p) >= 1);
4515     pa_assert(m);
4516
4517     pa_assert_se(pa_hashmap_remove(p->extensions, m));
4518 }
4519
4520 pa_native_options* pa_native_options_new(void) {
4521     pa_native_options *o;
4522
4523     o = pa_xnew0(pa_native_options, 1);
4524     PA_REFCNT_INIT(o);
4525
4526     return o;
4527 }
4528
4529 pa_native_options* pa_native_options_ref(pa_native_options *o) {
4530     pa_assert(o);
4531     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4532
4533     PA_REFCNT_INC(o);
4534
4535     return o;
4536 }
4537
4538 void pa_native_options_unref(pa_native_options *o) {
4539     pa_assert(o);
4540     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4541
4542     if (PA_REFCNT_DEC(o) > 0)
4543         return;
4544
4545     pa_xfree(o->auth_group);
4546
4547     if (o->auth_ip_acl)
4548         pa_ip_acl_free(o->auth_ip_acl);
4549
4550     if (o->auth_cookie)
4551         pa_auth_cookie_unref(o->auth_cookie);
4552
4553     pa_xfree(o);
4554 }
4555
4556 int pa_native_options_parse(pa_native_options *o, pa_core *c, pa_modargs *ma) {
4557     pa_bool_t enabled;
4558     const char *acl;
4559
4560     pa_assert(o);
4561     pa_assert(PA_REFCNT_VALUE(o) >= 1);
4562     pa_assert(ma);
4563
4564     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
4565         pa_log("auth-anonymous= expects a boolean argument.");
4566         return -1;
4567     }
4568
4569     enabled = TRUE;
4570     if (pa_modargs_get_value_boolean(ma, "auth-group-enabled", &enabled) < 0) {
4571         pa_log("auth-group-enabled= expects a boolean argument.");
4572         return -1;
4573     }
4574
4575     pa_xfree(o->auth_group);
4576     o->auth_group = enabled ? pa_xstrdup(pa_modargs_get_value(ma, "auth-group", pa_in_system_mode() ? PA_ACCESS_GROUP : NULL)) : NULL;
4577
4578 #ifndef HAVE_CREDS
4579     if (o->auth_group)
4580         pa_log_warn("Authentication group configured, but not available on local system. Ignoring.");
4581 #endif
4582
4583     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
4584         pa_ip_acl *ipa;
4585
4586         if (!(ipa = pa_ip_acl_new(acl))) {
4587             pa_log("Failed to parse IP ACL '%s'", acl);
4588             return -1;
4589         }
4590
4591         if (o->auth_ip_acl)
4592             pa_ip_acl_free(o->auth_ip_acl);
4593
4594         o->auth_ip_acl = ipa;
4595     }
4596
4597     enabled = TRUE;
4598     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
4599         pa_log("auth-cookie-enabled= expects a boolean argument.");
4600         return -1;
4601     }
4602
4603     if (o->auth_cookie)
4604         pa_auth_cookie_unref(o->auth_cookie);
4605
4606     if (enabled) {
4607         const char *cn;
4608
4609         /* The new name for this is 'auth-cookie', for compat reasons
4610          * we check the old name too */
4611         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
4612             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
4613                 cn = PA_NATIVE_COOKIE_FILE;
4614
4615         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, PA_NATIVE_COOKIE_LENGTH)))
4616             return -1;
4617
4618     } else
4619           o->auth_cookie = NULL;
4620
4621     return 0;
4622 }
4623
4624 pa_pstream* pa_native_connection_get_pstream(pa_native_connection *c) {
4625     pa_native_connection_assert_ref(c);
4626
4627     return c->pstream;
4628 }