02d81db3ed0856405e2a1f08c2b383a9b9f7ffd6
[profile/ivi/pulseaudio-panda.git] / polyp / protocol-native.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <string.h>
27 #include <stdio.h>
28 #include <assert.h>
29 #include <stdlib.h>
30
31 #include "protocol-native.h"
32 #include "native-common.h"
33 #include "packet.h"
34 #include "client.h"
35 #include "source-output.h"
36 #include "sink-input.h"
37 #include "pstream.h"
38 #include "tagstruct.h"
39 #include "pdispatch.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "namereg.h"
43 #include "scache.h"
44 #include "xmalloc.h"
45 #include "util.h"
46 #include "subscribe.h"
47 #include "log.h"
48 #include "autoload.h"
49 #include "authkey-prop.h"
50 #include "strlist.h"
51 #include "props.h"
52
53 struct connection;
54 struct pa_protocol_native;
55
/* State kept for one record stream of a client connection. */
56 struct record_stream {
57     struct connection *connection;          /* connection that owns this stream */
58     uint32_t index;                         /* index assigned by pa_idxset_put() in connection->record_streams */
59     struct pa_source_output *source_output; /* source output created in record_stream_new() */
60     struct pa_memblockq *memblockq;         /* filled by source_output_push_cb(), emptied by send_memblock() */
61     size_t fragment_size;                   /* largest chunk sent per memblock; a non-zero multiple of the frame size */
62 };
63
/* State kept for one playback stream of a client connection. */
64 struct playback_stream {
65     int type;                        /* PLAYBACK_STREAM; must stay the first member, streams are inspected through struct output_stream */
66     struct connection *connection;   /* connection that owns this stream */
67     uint32_t index;                  /* index assigned by pa_idxset_put() in connection->output_streams */
68     struct pa_sink_input *sink_input; /* sink input created in playback_stream_new() */
69     struct pa_memblockq *memblockq;  /* queue read by sink_input_peek_cb()/sink_input_drop_cb() */
70     size_t requested_bytes;          /* bytes already asked for from the client (see request_bytes()) */
71     int drain_request;               /* non-zero while a drain reply is still owed to the client */
72     uint32_t drain_tag;              /* tag to answer once the pending drain completes or the stream is freed */
73 };
74
/* State kept for one upload stream of a client connection. */
75 struct upload_stream {
76     int type;                        /* UPLOAD_STREAM; must stay the first member, streams are inspected through struct output_stream */
77     struct connection *connection;   /* connection that owns this stream */
78     uint32_t index;                  /* index assigned by pa_idxset_put() in connection->output_streams */
79     struct pa_memchunk memchunk;     /* starts out empty (NULL memblock, index 0, length 0) */
80     size_t length;                   /* length passed to upload_stream_new() */
81     char *name;                      /* private copy made with pa_xstrdup(), released in upload_stream_free() */
82     struct pa_sample_spec sample_spec; /* copy of the sample spec passed to upload_stream_new() */
83 };
84
/* Common prefix of struct playback_stream and struct upload_stream. Entries
 * of connection->output_streams are read through this type to find out which
 * of the two they really are before being cast. */
85 struct output_stream {
86     int type; /* UPLOAD_STREAM or PLAYBACK_STREAM */
87 };
88
/* Values stored in the `type` member of the entries of connection->output_streams. */
89 enum {
90     UPLOAD_STREAM,  /* entry is a struct upload_stream */
91     PLAYBACK_STREAM /* entry is a struct playback_stream */
92 };
93
/* Per-client state of the native protocol. */
94 struct connection {
95     int authorized;                    /* set to 1 by command_auth() once the cookie matched; most commands require it */
96     struct pa_protocol_native *protocol; /* protocol instance this connection belongs to */
97     struct pa_client *client;          /* client object, released in connection_free() */
98     struct pa_pstream *pstream;        /* packet stream used for all replies, errors and memblocks */
99     struct pa_pdispatch *pdispatch;    /* dispatcher, unreferenced in connection_free() */
100     struct pa_idxset *record_streams, *output_streams; /* struct record_stream entries / playback and upload stream entries */
101     uint32_t rrobin_index;             /* cursor used by send_memblock() with pa_idxset_rrobin() */
102     struct pa_subscription *subscription; /* optional; freed in connection_free() when non-NULL */
103 };
104
/* State of one native protocol instance, shared by all of its connections. */
105 struct pa_protocol_native {
106     struct pa_module *module;          /* recorded as owner of the sink inputs and source outputs created here */
107     int public;                        /* NOTE(review): not used in the visible part of the file; presumably disables authentication - confirm */
108     struct pa_core *core;              /* core providing sinks, sources, mainloop and memblock_stat */
109     struct pa_socket_server *server;   /* NOTE(review): presumably the listening socket; not used in the visible part - confirm */
110     struct pa_idxset *connections;     /* all struct connection objects of this instance */
111     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH]; /* cookie a client must present in command_auth() */
112     int auth_cookie_in_property;       /* NOTE(review): not used in the visible part of the file - confirm meaning */
113 };
114
/* Sink input callbacks installed on every playback stream in playback_stream_new(). */
115 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
116 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
117 static void sink_input_kill_cb(struct pa_sink_input *i);
118 static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i);
119
/* Sends PA_COMMAND_REQUEST to the client when the playback queue is missing data. */
120 static void request_bytes(struct playback_stream*s);
121
/* Source output callbacks installed on every record stream in record_stream_new(). */
122 static void source_output_kill_cb(struct pa_source_output *o);
123 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk);
124 static pa_usec_t source_output_get_latency_cb(struct pa_source_output *o);
125
/* Command handlers referenced from command_table; they all share one signature
 * and receive the struct connection as userdata. */
126 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
127 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
128 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
129 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
130 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
131 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
132 static void command_set_client_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
133 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
134 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
135 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
136 static void command_get_record_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
137 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
138 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
139 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
140 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
141 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
142 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
143 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
144 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
145 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
146 static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
147 static void command_flush_or_trigger_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
148 static void command_set_default_sink_or_source(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
149 static void command_set_stream_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
150 static void command_kill(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
151 static void command_load_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
152 static void command_unload_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
153 static void command_add_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
154 static void command_remove_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
155 static void command_get_autoload_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
156 static void command_get_autoload_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
157 static void command_cork_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
158 static void command_flush_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
159
/* Maps native protocol command ids to their handlers. Entries initialised
 * with { NULL } have no handler in this table. Several commands share one
 * handler, which then tells them apart by its `command` argument (see e.g.
 * command_delete_stream() and command_lookup()). */
160 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
161     [PA_COMMAND_ERROR] = { NULL },
162     [PA_COMMAND_TIMEOUT] = { NULL },
163     [PA_COMMAND_REPLY] = { NULL },
164     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
165     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_stream },
166     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = { command_drain_playback_stream },
167     [PA_COMMAND_CREATE_RECORD_STREAM] = { command_create_record_stream },
168     [PA_COMMAND_DELETE_RECORD_STREAM] = { command_delete_stream },
169     [PA_COMMAND_AUTH] = { command_auth },
170     [PA_COMMAND_REQUEST] = { NULL },
171     [PA_COMMAND_EXIT] = { command_exit },
172     [PA_COMMAND_SET_CLIENT_NAME] = { command_set_client_name },
173     [PA_COMMAND_LOOKUP_SINK] = { command_lookup },
174     [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
175     [PA_COMMAND_STAT] = { command_stat },
176     [PA_COMMAND_GET_PLAYBACK_LATENCY] = { command_get_playback_latency },
177     [PA_COMMAND_GET_RECORD_LATENCY] = { command_get_record_latency },
178     [PA_COMMAND_CREATE_UPLOAD_STREAM] = { command_create_upload_stream },
179     [PA_COMMAND_DELETE_UPLOAD_STREAM] = { command_delete_stream },
180     [PA_COMMAND_FINISH_UPLOAD_STREAM] = { command_finish_upload_stream },
181     [PA_COMMAND_PLAY_SAMPLE] = { command_play_sample },
182     [PA_COMMAND_REMOVE_SAMPLE] = { command_remove_sample },
183     [PA_COMMAND_GET_SINK_INFO] = { command_get_info },
184     [PA_COMMAND_GET_SOURCE_INFO] = { command_get_info },
185     [PA_COMMAND_GET_CLIENT_INFO] = { command_get_info },
186     [PA_COMMAND_GET_MODULE_INFO] = { command_get_info },
187     [PA_COMMAND_GET_SINK_INPUT_INFO] = { command_get_info },
188     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = { command_get_info },
189     [PA_COMMAND_GET_SAMPLE_INFO] = { command_get_info },
190     [PA_COMMAND_GET_SINK_INFO_LIST] = { command_get_info_list },
191     [PA_COMMAND_GET_SOURCE_INFO_LIST] = { command_get_info_list },
192     [PA_COMMAND_GET_MODULE_INFO_LIST] = { command_get_info_list },
193     [PA_COMMAND_GET_CLIENT_INFO_LIST] = { command_get_info_list },
194     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = { command_get_info_list },
195     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = { command_get_info_list },
196     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = { command_get_info_list },
197     [PA_COMMAND_GET_SERVER_INFO] = { command_get_server_info },
198     [PA_COMMAND_SUBSCRIBE] = { command_subscribe },
199
200     [PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume },
201     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume },
202     
203     [PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream },
204     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_or_trigger_playback_stream },
205     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = { command_flush_or_trigger_playback_stream },
206     [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = { command_flush_or_trigger_playback_stream },
207     
208     [PA_COMMAND_CORK_RECORD_STREAM] = { command_cork_record_stream },
209     [PA_COMMAND_FLUSH_RECORD_STREAM] = { command_flush_record_stream },
210     
211     [PA_COMMAND_SET_DEFAULT_SINK] = { command_set_default_sink_or_source },
212     [PA_COMMAND_SET_DEFAULT_SOURCE] = { command_set_default_sink_or_source },
213     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = { command_set_stream_name }, 
214     [PA_COMMAND_SET_RECORD_STREAM_NAME] = { command_set_stream_name },
215     [PA_COMMAND_KILL_CLIENT] = { command_kill },
216     [PA_COMMAND_KILL_SINK_INPUT] = { command_kill },
217     [PA_COMMAND_KILL_SOURCE_OUTPUT] = { command_kill },
218     [PA_COMMAND_LOAD_MODULE] = { command_load_module },
219     [PA_COMMAND_UNLOAD_MODULE] = { command_unload_module },
220     [PA_COMMAND_GET_AUTOLOAD_INFO] = { command_get_autoload_info },
221     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST] = { command_get_autoload_info_list },
222     [PA_COMMAND_ADD_AUTOLOAD] = { command_add_autoload },
223     [PA_COMMAND_REMOVE_AUTOLOAD] = { command_remove_autoload },
224
225 };
226
227 /* structure management */
228
229 static struct upload_stream* upload_stream_new(struct connection *c, const struct pa_sample_spec *ss, const char *name, size_t length) {
230     struct upload_stream *s;
231     assert(c && ss && name && length);
232     
233     s = pa_xmalloc(sizeof(struct upload_stream));
234     s->type = UPLOAD_STREAM;
235     s->connection = c;
236     s->sample_spec = *ss;
237     s->name = pa_xstrdup(name);
238
239     s->memchunk.memblock = NULL;
240     s->memchunk.index = 0;
241     s->memchunk.length = 0;
242
243     s->length = length;
244     
245     pa_idxset_put(c->output_streams, s, &s->index);
246     return s;
247 }
248
249 static void upload_stream_free(struct upload_stream *o) {
250     assert(o && o->connection);
251
252     pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
253
254     pa_xfree(o->name);
255     
256     if (o->memchunk.memblock)
257         pa_memblock_unref(o->memchunk.memblock);
258     
259     pa_xfree(o);
260 }
261
262 static struct record_stream* record_stream_new(struct connection *c, struct pa_source *source, const struct pa_sample_spec *ss, const char *name, size_t maxlength, size_t fragment_size) {
263     struct record_stream *s;
264     struct pa_source_output *source_output;
265     size_t base;
266     assert(c && source && ss && name && maxlength);
267
268     if (!(source_output = pa_source_output_new(source, name, ss, -1)))
269         return NULL;
270
271     s = pa_xmalloc(sizeof(struct record_stream));
272     s->connection = c;
273     s->source_output = source_output;
274     s->source_output->push = source_output_push_cb;
275     s->source_output->kill = source_output_kill_cb;
276     s->source_output->get_latency = source_output_get_latency_cb;
277     s->source_output->userdata = s;
278     s->source_output->owner = c->protocol->module;
279     s->source_output->client = c->client;
280
281     s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0, c->protocol->core->memblock_stat);
282     assert(s->memblockq);
283
284     s->fragment_size = (fragment_size/base)*base;
285     if (!s->fragment_size)
286         s->fragment_size = base;
287
288     pa_idxset_put(c->record_streams, s, &s->index);
289     return s;
290 }
291
292 static void record_stream_free(struct record_stream* r) {
293     assert(r && r->connection);
294
295     pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
296     pa_source_output_disconnect(r->source_output);
297     pa_source_output_unref(r->source_output);
298     pa_memblockq_free(r->memblockq);
299     pa_xfree(r);
300 }
301
302 static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, const struct pa_sample_spec *ss, const char *name,
303                                                    size_t maxlength,
304                                                    size_t tlength,
305                                                    size_t prebuf,
306                                                    size_t minreq,
307                                                    pa_volume_t volume) {
308     struct playback_stream *s;
309     struct pa_sink_input *sink_input;
310     assert(c && sink && ss && name && maxlength);
311
312     if (!(sink_input = pa_sink_input_new(sink, name, ss, 0, -1)))
313         return NULL;
314     
315     s = pa_xmalloc(sizeof(struct playback_stream));
316     s->type = PLAYBACK_STREAM;
317     s->connection = c;
318     s->sink_input = sink_input;
319     
320     s->sink_input->peek = sink_input_peek_cb;
321     s->sink_input->drop = sink_input_drop_cb;
322     s->sink_input->kill = sink_input_kill_cb;
323     s->sink_input->get_latency = sink_input_get_latency_cb;
324     s->sink_input->userdata = s;
325     s->sink_input->owner = c->protocol->module;
326     s->sink_input->client = c->client;
327     
328     s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq, c->protocol->core->memblock_stat);
329     assert(s->memblockq);
330
331     s->requested_bytes = 0;
332     s->drain_request = 0;
333
334     s->sink_input->volume = volume;
335     
336     pa_idxset_put(c->output_streams, s, &s->index);
337     return s;
338 }
339
340 static void playback_stream_free(struct playback_stream* p) {
341     assert(p && p->connection);
342
343     if (p->drain_request)
344         pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
345
346     pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
347     pa_sink_input_disconnect(p->sink_input);
348     pa_sink_input_unref(p->sink_input);
349     pa_memblockq_free(p->memblockq);
350     pa_xfree(p);
351 }
352
353 static void connection_free(struct connection *c) {
354     struct record_stream *r;
355     struct output_stream *o;
356     assert(c && c->protocol);
357
358     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
359     while ((r = pa_idxset_first(c->record_streams, NULL)))
360         record_stream_free(r);
361     pa_idxset_free(c->record_streams, NULL, NULL);
362
363     while ((o = pa_idxset_first(c->output_streams, NULL)))
364         if (o->type == PLAYBACK_STREAM)
365             playback_stream_free((struct playback_stream*) o);
366         else
367             upload_stream_free((struct upload_stream*) o);
368     pa_idxset_free(c->output_streams, NULL, NULL);
369
370     pa_pdispatch_unref(c->pdispatch);
371     pa_pstream_close(c->pstream);
372     pa_pstream_unref(c->pstream);
373     pa_client_free(c->client);
374
375     if (c->subscription)
376         pa_subscription_free(c->subscription);
377     
378     pa_xfree(c);
379 }
380
381 static void request_bytes(struct playback_stream *s) {
382     struct pa_tagstruct *t;
383     size_t l;
384     assert(s);
385
386     if (!(l = pa_memblockq_missing(s->memblockq)))
387         return;
388     
389     if (l <= s->requested_bytes)
390         return;
391
392     l -= s->requested_bytes;
393
394     if (l < pa_memblockq_get_minreq(s->memblockq))
395         return;
396     
397     s->requested_bytes += l;
398
399     t = pa_tagstruct_new(NULL, 0);
400     assert(t);
401     pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
402     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
403     pa_tagstruct_putu32(t, s->index);
404     pa_tagstruct_putu32(t, l);
405     pa_pstream_send_tagstruct(s->connection->pstream, t);
406
407 /*     pa_log(__FILE__": Requesting %u bytes\n", l); */
408 }
409
410 static void send_memblock(struct connection *c) {
411     uint32_t start;
412     struct record_stream *r;
413
414     start = PA_IDXSET_INVALID;
415     for (;;) {
416         struct pa_memchunk chunk;
417         
418         if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
419             return;
420
421         if (start == PA_IDXSET_INVALID)
422             start = c->rrobin_index;
423         else if (start == c->rrobin_index)
424             return;
425
426         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
427             struct pa_memchunk schunk = chunk;
428             
429             if (schunk.length > r->fragment_size)
430                 schunk.length = r->fragment_size;
431
432             pa_pstream_send_memblock(c->pstream, r->index, 0, &schunk);
433             pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
434             pa_memblock_unref(schunk.memblock);
435             
436             return;
437         }
438     }
439 }
440
441 static void send_playback_stream_killed(struct playback_stream *p) {
442     struct pa_tagstruct *t;
443     assert(p);
444
445     t = pa_tagstruct_new(NULL, 0);
446     assert(t);
447     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
448     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
449     pa_tagstruct_putu32(t, p->index);
450     pa_pstream_send_tagstruct(p->connection->pstream, t);
451 }
452
453 static void send_record_stream_killed(struct record_stream *r) {
454     struct pa_tagstruct *t;
455     assert(r);
456
457     t = pa_tagstruct_new(NULL, 0);
458     assert(t);
459     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
460     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
461     pa_tagstruct_putu32(t, r->index);
462     pa_pstream_send_tagstruct(r->connection->pstream, t);
463 }
464
465 /*** sinkinput callbacks ***/
466
467 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
468     struct playback_stream *s;
469     assert(i && i->userdata && chunk);
470     s = i->userdata;
471
472     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
473         return -1;
474
475     return 0;
476 }
477
478 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
479     struct playback_stream *s;
480     assert(i && i->userdata && length);
481     s = i->userdata;
482
483     pa_memblockq_drop(s->memblockq, chunk, length);
484     request_bytes(s);
485
486     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
487         pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
488         s->drain_request = 0;
489     }
490
491 /*     pa_log(__FILE__": after_drop: %u\n", pa_memblockq_get_length(s->memblockq)); */
492 }
493
494 static void sink_input_kill_cb(struct pa_sink_input *i) {
495     assert(i && i->userdata);
496     send_playback_stream_killed((struct playback_stream *) i->userdata);
497     playback_stream_free((struct playback_stream *) i->userdata);
498 }
499
500 static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i) {
501     struct playback_stream *s;
502     assert(i && i->userdata);
503     s = i->userdata;
504
505     /*pa_log(__FILE__": get_latency: %u\n", pa_memblockq_get_length(s->memblockq));*/
506     
507     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
508 }
509
510 /*** source_output callbacks ***/
511
512 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
513     struct record_stream *s;
514     assert(o && o->userdata && chunk);
515     s = o->userdata;
516     
517     pa_memblockq_push_align(s->memblockq, chunk, 0);
518     if (!pa_pstream_is_pending(s->connection->pstream))
519         send_memblock(s->connection);
520 }
521
522 static void source_output_kill_cb(struct pa_source_output *o) {
523     assert(o && o->userdata);
524     send_record_stream_killed((struct record_stream *) o->userdata);
525     record_stream_free((struct record_stream *) o->userdata);
526 }
527
528 static pa_usec_t source_output_get_latency_cb(struct pa_source_output *o) {
529     struct record_stream *s;
530     assert(o && o->userdata);
531     s = o->userdata;
532
533     /*pa_log(__FILE__": get_latency: %u\n", pa_memblockq_get_length(s->memblockq));*/
534     
535     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
536 }
537
538 /*** pdispatch callbacks ***/
539
/* Called by the command handlers when a request cannot be parsed: log the
 * incident and drop the client. The connection is freed here, so callers
 * must not touch c afterwards. */
static void protocol_error(struct connection *c) {
    pa_log(__FILE__": protocol error, kicking client\n");

    connection_free(c);
}
544
545 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
546     struct connection *c = userdata;
547     struct playback_stream *s;
548     size_t maxlength, tlength, prebuf, minreq;
549     uint32_t sink_index;
550     const char *name, *sink_name;
551     struct pa_sample_spec ss;
552     struct pa_tagstruct *reply;
553     struct pa_sink *sink;
554     pa_volume_t volume;
555     int corked;
556     assert(c && t && c->protocol && c->protocol->core);
557     
558     if (pa_tagstruct_gets(t, &name) < 0 || !name ||
559         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
560         pa_tagstruct_getu32(t, &sink_index) < 0 ||
561         pa_tagstruct_gets(t, &sink_name) < 0 ||
562         pa_tagstruct_getu32(t, &maxlength) < 0 ||
563         pa_tagstruct_get_boolean(t, &corked) < 0 ||
564         pa_tagstruct_getu32(t, &tlength) < 0 ||
565         pa_tagstruct_getu32(t, &prebuf) < 0 ||
566         pa_tagstruct_getu32(t, &minreq) < 0 ||
567         pa_tagstruct_getu32(t, &volume) < 0 ||
568         !pa_tagstruct_eof(t)) {
569         protocol_error(c);
570         return;
571     }
572
573
574     if (!c->authorized) {
575         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
576         return;
577     }
578
579     if (sink_index != (uint32_t) -1)
580         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
581     else
582         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1);
583
584     if (!sink) {
585         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
586         return;
587     }
588     
589     if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq, volume))) {
590         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
591         return;
592     }
593
594     pa_sink_input_cork(s->sink_input, corked);
595     
596     reply = pa_tagstruct_new(NULL, 0);
597     assert(reply);
598     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
599     pa_tagstruct_putu32(reply, tag);
600     pa_tagstruct_putu32(reply, s->index);
601     assert(s->sink_input);
602     pa_tagstruct_putu32(reply, s->sink_input->index);
603     pa_tagstruct_putu32(reply, s->requested_bytes = pa_memblockq_missing(s->memblockq));
604     pa_pstream_send_tagstruct(c->pstream, reply);
605     request_bytes(s);
606 }
607
608 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
609     struct connection *c = userdata;
610     uint32_t channel;
611     assert(c && t);
612     
613     if (pa_tagstruct_getu32(t, &channel) < 0 ||
614         !pa_tagstruct_eof(t)) {
615         protocol_error(c);
616         return;
617     }
618
619     if (!c->authorized) {
620         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
621         return;
622     }
623
624     if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
625         struct playback_stream *s;
626         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
627             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
628             return;
629         }
630
631         playback_stream_free(s);
632     } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
633         struct record_stream *s;
634         if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
635             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
636             return;
637         }
638
639         record_stream_free(s);
640     } else {
641         struct upload_stream *s;
642         assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
643         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
644             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
645             return;
646         }
647
648         upload_stream_free(s);
649     }
650             
651     pa_pstream_send_simple_ack(c->pstream, tag);
652 }
653
654 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
655     struct connection *c = userdata;
656     struct record_stream *s;
657     size_t maxlength, fragment_size;
658     uint32_t source_index;
659     const char *name, *source_name;
660     struct pa_sample_spec ss;
661     struct pa_tagstruct *reply;
662     struct pa_source *source;
663     int corked;
664     assert(c && t && c->protocol && c->protocol->core);
665     
666     if (pa_tagstruct_gets(t, &name) < 0 || !name ||
667         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
668         pa_tagstruct_getu32(t, &source_index) < 0 ||
669         pa_tagstruct_gets(t, &source_name) < 0 ||
670         pa_tagstruct_getu32(t, &maxlength) < 0 ||
671         pa_tagstruct_get_boolean(t, &corked) < 0 ||
672         pa_tagstruct_getu32(t, &fragment_size) < 0 ||
673         !pa_tagstruct_eof(t)) {
674         protocol_error(c);
675         return;
676     }
677
678     if (!c->authorized) {
679         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
680         return;
681     }
682
683     if (source_index != (uint32_t) -1)
684         source = pa_idxset_get_by_index(c->protocol->core->sources, source_index);
685     else
686         source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE, 1);
687
688     if (!source) {
689         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
690         return;
691     }
692     
693     if (!(s = record_stream_new(c, source, &ss, name, maxlength, fragment_size))) {
694         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
695         return;
696     }
697
698     pa_source_output_cork(s->source_output, corked);
699     
700     reply = pa_tagstruct_new(NULL, 0);
701     assert(reply);
702     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
703     pa_tagstruct_putu32(reply, tag);
704     pa_tagstruct_putu32(reply, s->index);
705     assert(s->source_output);
706     pa_tagstruct_putu32(reply, s->source_output->index);
707     pa_pstream_send_tagstruct(c->pstream, reply);
708 }
709
710 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
711     struct connection *c = userdata;
712     assert(c && t);
713     
714     if (!pa_tagstruct_eof(t)) {
715         protocol_error(c);
716         return;
717     }
718
719     if (!c->authorized) {
720         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
721         return;
722     }
723     
724     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
725     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
726     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
727     return;
728 }
729
730 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
731     struct connection *c = userdata;
732     const void*cookie;
733     assert(c && t);
734
735     if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
736         !pa_tagstruct_eof(t)) {
737         protocol_error(c);
738         return;
739     }
740
741     if (!c->authorized) {
742         if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
743             pa_log(__FILE__": Denied access to client with invalid authorization key.\n");
744             pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
745             return;
746         }
747         
748         c->authorized = 1;
749     }
750     
751     pa_pstream_send_simple_ack(c->pstream, tag);
752     return;
753 }
754
755 static void command_set_client_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
756     struct connection *c = userdata;
757     const char *name;
758     assert(c && t);
759
760     if (pa_tagstruct_gets(t, &name) < 0 || !name ||
761         !pa_tagstruct_eof(t)) {
762         protocol_error(c);
763         return;
764     }
765
766     pa_client_set_name(c->client, name);
767     pa_pstream_send_simple_ack(c->pstream, tag);
768     return;
769 }
770
771 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
772     struct connection *c = userdata;
773     const char *name;
774     uint32_t index = PA_IDXSET_INVALID;
775     assert(c && t);
776
777     if (pa_tagstruct_gets(t, &name) < 0 || !name ||
778         !pa_tagstruct_eof(t)) {
779         protocol_error(c);
780         return;
781     }
782
783     if (!c->authorized) {
784         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
785         return;
786     }
787
788     if (command == PA_COMMAND_LOOKUP_SINK) {
789         struct pa_sink *sink;
790         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1)))
791             index = sink->index;
792     } else {
793         struct pa_source *source;
794         assert(command == PA_COMMAND_LOOKUP_SOURCE);
795         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1)))
796             index = source->index;
797     }
798
799     if (index == PA_IDXSET_INVALID)
800         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
801     else {
802         struct pa_tagstruct *reply;
803         reply = pa_tagstruct_new(NULL, 0);
804         assert(reply);
805         pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
806         pa_tagstruct_putu32(reply, tag);
807         pa_tagstruct_putu32(reply, index);
808         pa_pstream_send_tagstruct(c->pstream, reply);
809     }
810 }
811
812 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
813     struct connection *c = userdata;
814     uint32_t index;
815     struct playback_stream *s;
816     assert(c && t);
817
818     if (pa_tagstruct_getu32(t, &index) < 0 ||
819         !pa_tagstruct_eof(t)) {
820         protocol_error(c);
821         return;
822     }
823
824     if (!c->authorized) {
825         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
826         return;
827     }
828
829     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
830         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
831         return;
832     }
833
834     s->drain_request = 0;
835
836     pa_memblockq_prebuf_disable(s->memblockq);
837     
838     if (!pa_memblockq_is_readable(s->memblockq)) {
839 /*         pa_log("immediate drain: %u\n", pa_memblockq_get_length(s->memblockq)); */
840         pa_pstream_send_simple_ack(c->pstream, tag);
841     } else {
842 /*         pa_log("slow drain triggered\n"); */
843         s->drain_request = 1;
844         s->drain_tag = tag;
845
846         pa_sink_notify(s->sink_input->sink);
847     }
848
849
850 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
851     struct connection *c = userdata;
852     struct pa_tagstruct *reply;
853     assert(c && t);
854
855     if (!pa_tagstruct_eof(t)) {
856         protocol_error(c);
857         return;
858     }
859
860     if (!c->authorized) {
861         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
862         return;
863     }
864
865     reply = pa_tagstruct_new(NULL, 0);
866     assert(reply);
867     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
868     pa_tagstruct_putu32(reply, tag);
869     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
870     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
871     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
872     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
873     pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core));
874     pa_pstream_send_tagstruct(c->pstream, reply);
875 }
876
877 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
878     struct connection *c = userdata;
879     struct pa_tagstruct *reply;
880     struct playback_stream *s;
881     struct timeval tv, now;
882     uint64_t counter;
883     uint32_t index;
884     assert(c && t);
885     
886     if (pa_tagstruct_getu32(t, &index) < 0 ||
887         pa_tagstruct_get_timeval(t, &tv) < 0 ||
888         pa_tagstruct_getu64(t, &counter) < 0 ||
889         !pa_tagstruct_eof(t)) {
890         protocol_error(c);
891         return;
892     }
893
894     if (!c->authorized) {
895         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
896         return;
897     }
898
899     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
900         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
901         return;
902     }
903
904     reply = pa_tagstruct_new(NULL, 0);
905     assert(reply);
906     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
907     pa_tagstruct_putu32(reply, tag);
908     pa_tagstruct_put_usec(reply, pa_sink_input_get_latency(s->sink_input));
909     pa_tagstruct_put_usec(reply, pa_sink_get_latency(s->sink_input->sink));
910     pa_tagstruct_put_usec(reply, 0);
911     pa_tagstruct_put_boolean(reply, pa_memblockq_is_readable(s->memblockq));
912     pa_tagstruct_putu32(reply, pa_memblockq_get_length(s->memblockq));
913     pa_tagstruct_put_timeval(reply, &tv);
914     gettimeofday(&now, NULL);
915     pa_tagstruct_put_timeval(reply, &now);
916     pa_tagstruct_putu64(reply, counter);
917     pa_pstream_send_tagstruct(c->pstream, reply);
918 }
919
920 static void command_get_record_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
921     struct connection *c = userdata;
922     struct pa_tagstruct *reply;
923     struct record_stream *s;
924     struct timeval tv, now;
925     uint64_t counter;
926     uint32_t index;
927     assert(c && t);
928
929     if (pa_tagstruct_getu32(t, &index) < 0 ||
930         pa_tagstruct_get_timeval(t, &tv) < 0 ||
931         pa_tagstruct_getu64(t, &counter) < 0 ||
932         !pa_tagstruct_eof(t)) {
933         protocol_error(c);
934         return;
935     }
936
937     if (!c->authorized) {
938         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
939         return;
940     }
941
942     if (!(s = pa_idxset_get_by_index(c->record_streams, index))) {
943         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
944         return;
945     }
946
947     reply = pa_tagstruct_new(NULL, 0);
948     assert(reply);
949     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
950     pa_tagstruct_putu32(reply, tag);
951     pa_tagstruct_put_usec(reply, pa_source_output_get_latency(s->source_output));
952     pa_tagstruct_put_usec(reply, s->source_output->source->monitor_of ? pa_sink_get_latency(s->source_output->source->monitor_of) : 0);
953     pa_tagstruct_put_usec(reply, pa_source_get_latency(s->source_output->source));
954     pa_tagstruct_put_boolean(reply, 0);
955     pa_tagstruct_putu32(reply, pa_memblockq_get_length(s->memblockq));
956     pa_tagstruct_put_timeval(reply, &tv);
957     gettimeofday(&now, NULL);
958     pa_tagstruct_put_timeval(reply, &now);
959     pa_tagstruct_putu64(reply, counter);
960     pa_pstream_send_tagstruct(c->pstream, reply);
961 }
962
963
964 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
965     struct connection *c = userdata;
966     struct upload_stream *s;
967     size_t length;
968     const char *name;
969     struct pa_sample_spec ss;
970     struct pa_tagstruct *reply;
971     assert(c && t && c->protocol && c->protocol->core);
972     
973     if (pa_tagstruct_gets(t, &name) < 0 || !name ||
974         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
975         pa_tagstruct_getu32(t, &length) < 0 ||
976         !pa_tagstruct_eof(t)) {
977         protocol_error(c);
978         return;
979     }
980
981     if (!c->authorized) {
982         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
983         return;
984     }
985
986     if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
987         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
988         return;
989     }
990     
991     if (!(s = upload_stream_new(c, &ss, name, length))) {
992         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
993         return;
994     }
995     
996     reply = pa_tagstruct_new(NULL, 0);
997     assert(reply);
998     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
999     pa_tagstruct_putu32(reply, tag);
1000     pa_tagstruct_putu32(reply, s->index);
1001     pa_tagstruct_putu32(reply, length);
1002     pa_pstream_send_tagstruct(c->pstream, reply);
1003 }
1004
1005 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1006     struct connection *c = userdata;
1007     uint32_t channel;
1008     struct upload_stream *s;
1009     uint32_t index;
1010     assert(c && t);
1011     
1012     if (pa_tagstruct_getu32(t, &channel) < 0 ||
1013         !pa_tagstruct_eof(t)) {
1014         protocol_error(c);
1015         return;
1016     }
1017
1018     if (!c->authorized) {
1019         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1020         return;
1021     }
1022
1023     if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
1024         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
1025         return;
1026     }
1027
1028     pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->memchunk, &index);
1029     pa_pstream_send_simple_ack(c->pstream, tag);
1030     upload_stream_free(s);
1031 }
1032
1033 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1034     struct connection *c = userdata;
1035     uint32_t sink_index, volume;
1036     struct pa_sink *sink;
1037     const char *name, *sink_name;
1038     assert(c && t);
1039
1040     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
1041         pa_tagstruct_gets(t, &sink_name) < 0 ||
1042         pa_tagstruct_getu32(t, &volume) < 0 ||
1043         pa_tagstruct_gets(t, &name) < 0 || !name || 
1044         !pa_tagstruct_eof(t)) {
1045         protocol_error(c);
1046         return;
1047     }
1048     
1049     if (!c->authorized) {
1050         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1051         return;
1052     }
1053
1054     if (sink_index != (uint32_t) -1)
1055         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
1056     else
1057         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1);
1058
1059     if (!sink) {
1060         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1061         return;
1062     }
1063
1064     if (pa_scache_play_item(c->protocol->core, name, sink, volume) < 0) {
1065         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1066         return;
1067     }
1068
1069     pa_pstream_send_simple_ack(c->pstream, tag);
1070 }
1071
1072 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1073     struct connection *c = userdata;
1074     const char *name;
1075     assert(c && t);
1076
1077     if (pa_tagstruct_gets(t, &name) < 0 || !name || 
1078         !pa_tagstruct_eof(t)) {
1079         protocol_error(c);
1080         return;
1081     }
1082
1083     if (!c->authorized) {
1084         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1085         return;
1086     }
1087
1088     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
1089         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1090         return;
1091     }
1092
1093     pa_pstream_send_simple_ack(c->pstream, tag);
1094 }
1095
1096 static void sink_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink *sink) {
1097     assert(t && sink);
1098     pa_tagstruct_putu32(t, sink->index);
1099     pa_tagstruct_puts(t, sink->name);
1100     pa_tagstruct_puts(t, sink->description);
1101     pa_tagstruct_put_sample_spec(t, &sink->sample_spec);
1102     pa_tagstruct_putu32(t, sink->owner ? sink->owner->index : (uint32_t) -1);
1103     pa_tagstruct_putu32(t, sink->volume);
1104     pa_tagstruct_putu32(t, sink->monitor_source->index);
1105     pa_tagstruct_puts(t, sink->monitor_source->name);
1106     pa_tagstruct_put_usec(t, pa_sink_get_latency(sink));
1107 }
1108
1109 static void source_fill_tagstruct(struct pa_tagstruct *t, struct pa_source *source) {
1110     assert(t && source);
1111     pa_tagstruct_putu32(t, source->index);
1112     pa_tagstruct_puts(t, source->name);
1113     pa_tagstruct_puts(t, source->description);
1114     pa_tagstruct_put_sample_spec(t, &source->sample_spec);
1115     pa_tagstruct_putu32(t, source->owner ? source->owner->index : (uint32_t) -1);
1116     pa_tagstruct_putu32(t, source->monitor_of ? source->monitor_of->index : (uint32_t) -1);
1117     pa_tagstruct_puts(t, source->monitor_of ? source->monitor_of->name : NULL);
1118     pa_tagstruct_put_usec(t, pa_source_get_latency(source));
1119 }
1120
1121 static void client_fill_tagstruct(struct pa_tagstruct *t, struct pa_client *client) {
1122     assert(t && client);
1123     pa_tagstruct_putu32(t, client->index);
1124     pa_tagstruct_puts(t, client->name);
1125     pa_tagstruct_puts(t, client->protocol_name);
1126     pa_tagstruct_putu32(t, client->owner ? client->owner->index : (uint32_t) -1);
1127 }
1128
1129 static void module_fill_tagstruct(struct pa_tagstruct *t, struct pa_module *module) {
1130     assert(t && module);
1131     pa_tagstruct_putu32(t, module->index);
1132     pa_tagstruct_puts(t, module->name);
1133     pa_tagstruct_puts(t, module->argument);
1134     pa_tagstruct_putu32(t, module->n_used);
1135     pa_tagstruct_put_boolean(t, module->auto_unload);
1136 }
1137
1138 static void sink_input_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink_input *s) {
1139     assert(t && s);
1140     pa_tagstruct_putu32(t, s->index);
1141     pa_tagstruct_puts(t, s->name);
1142     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1143     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1144     pa_tagstruct_putu32(t, s->sink->index);
1145     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1146     pa_tagstruct_putu32(t, s->volume);
1147     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s));
1148     pa_tagstruct_put_usec(t, pa_sink_get_latency(s->sink));
1149 }
1150
1151 static void source_output_fill_tagstruct(struct pa_tagstruct *t, struct pa_source_output *s) {
1152     assert(t && s);
1153     pa_tagstruct_putu32(t, s->index);
1154     pa_tagstruct_puts(t, s->name);
1155     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1156     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1157     pa_tagstruct_putu32(t, s->source->index);
1158     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1159     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s));
1160     pa_tagstruct_put_usec(t, pa_source_get_latency(s->source));
1161 }
1162
1163 static void scache_fill_tagstruct(struct pa_tagstruct *t, struct pa_scache_entry *e) {
1164     assert(t && e);
1165     pa_tagstruct_putu32(t, e->index);
1166     pa_tagstruct_puts(t, e->name);
1167     pa_tagstruct_putu32(t, e->volume);
1168     pa_tagstruct_put_usec(t, pa_bytes_to_usec(e->memchunk.length, &e->sample_spec));
1169     pa_tagstruct_put_sample_spec(t, &e->sample_spec);
1170     pa_tagstruct_putu32(t, e->memchunk.length);
1171     pa_tagstruct_put_boolean(t, e->lazy);
1172     pa_tagstruct_puts(t, e->filename);
1173 }
1174
1175 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1176     struct connection *c = userdata;
1177     uint32_t index;
1178     struct pa_sink *sink = NULL;
1179     struct pa_source *source = NULL;
1180     struct pa_client *client = NULL;
1181     struct pa_module *module = NULL;
1182     struct pa_sink_input *si = NULL;
1183     struct pa_source_output *so = NULL;
1184     struct pa_scache_entry *sce = NULL;
1185     const char *name;
1186     struct pa_tagstruct *reply;
1187     assert(c && t);
1188
1189     
1190     if (pa_tagstruct_getu32(t, &index) < 0 ||
1191         (command != PA_COMMAND_GET_CLIENT_INFO &&
1192          command != PA_COMMAND_GET_MODULE_INFO &&
1193          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
1194          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
1195          pa_tagstruct_gets(t, &name) < 0) ||
1196         !pa_tagstruct_eof(t)) {
1197         protocol_error(c);
1198         return;
1199     }
1200     
1201     if (!c->authorized) {
1202         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1203         return;
1204     }
1205
1206     if (command == PA_COMMAND_GET_SINK_INFO) {
1207         if (index != (uint32_t) -1)
1208             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1209         else
1210             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
1211     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
1212         if (index != (uint32_t) -1)
1213             source = pa_idxset_get_by_index(c->protocol->core->sources, index);
1214         else
1215             source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1);
1216     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
1217         client = pa_idxset_get_by_index(c->protocol->core->clients, index);
1218     else if (command == PA_COMMAND_GET_MODULE_INFO) 
1219         module = pa_idxset_get_by_index(c->protocol->core->modules, index);
1220     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
1221         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1222     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
1223         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, index);
1224     else {
1225         assert(command == PA_COMMAND_GET_SAMPLE_INFO);
1226         if (index != (uint32_t) -1)
1227             sce = pa_idxset_get_by_index(c->protocol->core->scache, index);
1228         else
1229             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE, 0);
1230     }
1231             
1232     if (!sink && !source && !client && !module && !si && !so && !sce) {
1233         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1234         return;
1235     }
1236
1237     reply = pa_tagstruct_new(NULL, 0);
1238     assert(reply);
1239     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1240     pa_tagstruct_putu32(reply, tag); 
1241     if (sink)
1242         sink_fill_tagstruct(reply, sink);
1243     else if (source)
1244         source_fill_tagstruct(reply, source);
1245     else if (client)
1246         client_fill_tagstruct(reply, client);
1247     else if (module)
1248         module_fill_tagstruct(reply, module);
1249     else if (si)
1250         sink_input_fill_tagstruct(reply, si);
1251     else if (so)
1252         source_output_fill_tagstruct(reply, so);
1253     else
1254         scache_fill_tagstruct(reply, sce);
1255     pa_pstream_send_tagstruct(c->pstream, reply);
1256 }
1257
1258 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1259     struct connection *c = userdata;
1260     struct pa_idxset *i;
1261     uint32_t index;
1262     void *p;
1263     struct pa_tagstruct *reply;
1264     assert(c && t);
1265
1266     if (!pa_tagstruct_eof(t)) {
1267         protocol_error(c);
1268         return;
1269     }
1270     
1271     if (!c->authorized) {
1272         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1273         return;
1274     }
1275
1276     reply = pa_tagstruct_new(NULL, 0);
1277     assert(reply);
1278     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1279     pa_tagstruct_putu32(reply, tag);
1280
1281     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1282         i = c->protocol->core->sinks;
1283     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1284         i = c->protocol->core->sources;
1285     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1286         i = c->protocol->core->clients;
1287     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1288         i = c->protocol->core->modules;
1289     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1290         i = c->protocol->core->sink_inputs;
1291     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
1292         i = c->protocol->core->source_outputs;
1293     else {
1294         assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1295         i = c->protocol->core->scache;
1296     }
1297
1298     if (i) {
1299         for (p = pa_idxset_first(i, &index); p; p = pa_idxset_next(i, &index)) {
1300             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1301                 sink_fill_tagstruct(reply, p);
1302             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1303                 source_fill_tagstruct(reply, p);
1304             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1305                 client_fill_tagstruct(reply, p);
1306             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1307                 module_fill_tagstruct(reply, p);
1308             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1309                 sink_input_fill_tagstruct(reply, p);
1310             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST) 
1311                 source_output_fill_tagstruct(reply, p);
1312             else {
1313                 assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1314                 scache_fill_tagstruct(reply, p);
1315             }
1316         }
1317     }
1318     
1319     pa_pstream_send_tagstruct(c->pstream, reply);
1320 }
1321
1322 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1323     struct connection *c = userdata;
1324     struct pa_tagstruct *reply;
1325     char txt[256];
1326     const char *n;
1327     assert(c && t);
1328
1329     if (!pa_tagstruct_eof(t)) {
1330         protocol_error(c);
1331         return;
1332     }
1333     
1334     if (!c->authorized) {
1335         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1336         return;
1337     }
1338
1339     reply = pa_tagstruct_new(NULL, 0);
1340     assert(reply);
1341     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1342     pa_tagstruct_putu32(reply, tag);
1343     pa_tagstruct_puts(reply, PACKAGE_NAME);
1344     pa_tagstruct_puts(reply, PACKAGE_VERSION);
1345     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
1346     pa_tagstruct_puts(reply, pa_get_fqdn(txt, sizeof(txt)));
1347     pa_tagstruct_put_sample_spec(reply, &c->protocol->core->default_sample_spec);
1348
1349     n = pa_namereg_get_default_sink_name(c->protocol->core);
1350     pa_tagstruct_puts(reply, n);
1351     n = pa_namereg_get_default_source_name(c->protocol->core);
1352     pa_tagstruct_puts(reply, n);
1353     pa_pstream_send_tagstruct(c->pstream, reply);
1354 }
1355
1356 static void subscription_cb(struct pa_core *core, enum pa_subscription_event_type e, uint32_t index, void *userdata) {
1357     struct pa_tagstruct *t;
1358     struct connection *c = userdata;
1359     assert(c && core);
1360
1361     t = pa_tagstruct_new(NULL, 0);
1362     assert(t);
1363     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
1364     pa_tagstruct_putu32(t, (uint32_t) -1);
1365     pa_tagstruct_putu32(t, e);
1366     pa_tagstruct_putu32(t, index);
1367     pa_pstream_send_tagstruct(c->pstream, t);
1368 }
1369
1370 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1371     struct connection *c = userdata;
1372     enum pa_subscription_mask m;
1373     assert(c && t);
1374
1375     if (pa_tagstruct_getu32(t, &m) < 0 ||
1376         !pa_tagstruct_eof(t)) {
1377         protocol_error(c);
1378         return;
1379     }
1380     
1381     if (!c->authorized) {
1382         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1383         return;
1384     }
1385
1386     if (c->subscription)
1387         pa_subscription_free(c->subscription);
1388
1389     if (m != 0) {
1390         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
1391         assert(c->subscription);
1392     } else
1393         c->subscription = NULL;
1394
1395     pa_pstream_send_simple_ack(c->pstream, tag);
1396 }
1397
1398 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1399     struct connection *c = userdata;
1400     uint32_t index, volume;
1401     struct pa_sink *sink = NULL;
1402     struct pa_sink_input *si = NULL;
1403     const char *name = NULL;
1404     assert(c && t);
1405
1406     if (pa_tagstruct_getu32(t, &index) < 0 ||
1407         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
1408         pa_tagstruct_getu32(t, &volume) ||
1409         !pa_tagstruct_eof(t)) {
1410         protocol_error(c);
1411         return;
1412     }
1413     
1414     if (!c->authorized) {
1415         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1416         return;
1417     }
1418
1419     if (command == PA_COMMAND_SET_SINK_VOLUME) {
1420         if (index != (uint32_t) -1)
1421             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1422         else
1423             sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1);
1424     }  else {
1425         assert(command == PA_COMMAND_SET_SINK_INPUT_VOLUME);
1426         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1427     }
1428
1429     if (!si && !sink) {
1430         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1431         return;
1432     }
1433
1434     if (sink)
1435         pa_sink_set_volume(sink, volume);
1436     else if (si)
1437         pa_sink_input_set_volume(si, volume);
1438
1439     pa_pstream_send_simple_ack(c->pstream, tag);
1440 }
1441
1442 static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1443     struct connection *c = userdata;
1444     uint32_t index;
1445     int b;
1446     struct playback_stream *s;
1447     assert(c && t);
1448
1449     if (pa_tagstruct_getu32(t, &index) < 0 ||
1450         pa_tagstruct_get_boolean(t, &b) < 0 ||
1451         !pa_tagstruct_eof(t)) {
1452         protocol_error(c);
1453         return;
1454     }
1455
1456     if (!c->authorized) {
1457         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1458         return;
1459     }
1460
1461     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1462         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1463         return;
1464     }
1465
1466     pa_sink_input_cork(s->sink_input, b);
1467     pa_pstream_send_simple_ack(c->pstream, tag);
1468 }
1469
1470 static void command_flush_or_trigger_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1471     struct connection *c = userdata;
1472     uint32_t index;
1473     struct playback_stream *s;
1474     assert(c && t);
1475
1476     if (pa_tagstruct_getu32(t, &index) < 0 ||
1477         !pa_tagstruct_eof(t)) {
1478         protocol_error(c);
1479         return;
1480     }
1481
1482     if (!c->authorized) {
1483         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1484         return;
1485     }
1486
1487     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1488         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1489         return;
1490     }
1491
1492     if (command == PA_COMMAND_PREBUF_PLAYBACK_STREAM)
1493         pa_memblockq_prebuf_reenable(s->memblockq);
1494     else if (command == PA_COMMAND_TRIGGER_PLAYBACK_STREAM)
1495         pa_memblockq_prebuf_disable(s->memblockq);
1496     else {
1497         assert(command == PA_COMMAND_FLUSH_PLAYBACK_STREAM);
1498         pa_memblockq_flush(s->memblockq);
1499         /*pa_log(__FILE__": flush: %u\n", pa_memblockq_get_length(s->memblockq));*/
1500     }
1501
1502     pa_sink_notify(s->sink_input->sink);
1503     pa_pstream_send_simple_ack(c->pstream, tag);
1504     request_bytes(s);
1505 }
1506
1507 static void command_cork_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1508     struct connection *c = userdata;
1509     uint32_t index;
1510     struct record_stream *s;
1511     int b;
1512     assert(c && t);
1513
1514     if (pa_tagstruct_getu32(t, &index) < 0 ||
1515         pa_tagstruct_get_boolean(t, &b) < 0 ||
1516         !pa_tagstruct_eof(t)) {
1517         protocol_error(c);
1518         return;
1519     }
1520
1521     if (!c->authorized) {
1522         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1523         return;
1524     }
1525
1526     if (!(s = pa_idxset_get_by_index(c->record_streams, index))) {
1527         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1528         return;
1529     }
1530
1531     pa_source_output_cork(s->source_output, b);
1532     pa_pstream_send_simple_ack(c->pstream, tag);
1533 }
1534
1535 static void command_flush_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1536     struct connection *c = userdata;
1537     uint32_t index;
1538     struct record_stream *s;
1539     assert(c && t);
1540
1541     if (pa_tagstruct_getu32(t, &index) < 0 ||
1542         !pa_tagstruct_eof(t)) {
1543         protocol_error(c);
1544         return;
1545     }
1546
1547     if (!c->authorized) {
1548         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1549         return;
1550     }
1551
1552     if (!(s = pa_idxset_get_by_index(c->record_streams, index))) {
1553         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1554         return;
1555     }
1556
1557     pa_memblockq_flush(s->memblockq);
1558     pa_pstream_send_simple_ack(c->pstream, tag);
1559 }
1560
1561 static void command_set_default_sink_or_source(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1562     struct connection *c = userdata;
1563     uint32_t index;
1564     const char *s;
1565     assert(c && t);
1566
1567     if (pa_tagstruct_getu32(t, &index) < 0 ||
1568         pa_tagstruct_gets(t, &s) < 0 || !s ||
1569         !pa_tagstruct_eof(t)) {
1570         protocol_error(c);
1571         return;
1572     }
1573
1574     if (!c->authorized) {
1575         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1576         return;
1577     }
1578
1579     pa_namereg_set_default(c->protocol->core, s, command == PA_COMMAND_SET_DEFAULT_SOURCE ? PA_NAMEREG_SOURCE : PA_NAMEREG_SINK);
1580     pa_pstream_send_simple_ack(c->pstream, tag);
1581 }
1582
1583 static void command_set_stream_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1584     struct connection *c = userdata;
1585     uint32_t index;
1586     const char *name;
1587     assert(c && t);
1588
1589     if (pa_tagstruct_getu32(t, &index) < 0 ||
1590         pa_tagstruct_gets(t, &name) < 0 || !name || 
1591         !pa_tagstruct_eof(t)) {
1592         protocol_error(c);
1593         return;
1594     }
1595     
1596     if (!c->authorized) {
1597         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1598         return;
1599     }
1600
1601     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
1602         struct playback_stream *s;
1603         
1604         if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1605             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1606             return;
1607         }
1608
1609         pa_sink_input_set_name(s->sink_input, name);
1610         
1611     } else {
1612         struct record_stream *s;
1613         
1614         if (!(s = pa_idxset_get_by_index(c->record_streams, index))) {
1615             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1616             return;
1617         }
1618
1619         pa_source_output_set_name(s->source_output, name);
1620     }
1621
1622     pa_pstream_send_simple_ack(c->pstream, tag);
1623 }
1624
1625 static void command_kill(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1626     struct connection *c = userdata;
1627     uint32_t index;
1628     assert(c && t);
1629
1630     if (pa_tagstruct_getu32(t, &index) < 0 ||
1631         !pa_tagstruct_eof(t)) {
1632         protocol_error(c);
1633         return;
1634     }
1635     
1636     if (!c->authorized) {
1637         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1638         return;
1639     }
1640
1641     if (command == PA_COMMAND_KILL_CLIENT) {
1642         struct pa_client *client;
1643         
1644         if (!(client = pa_idxset_get_by_index(c->protocol->core->clients, index))) {
1645             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1646             return;
1647         }
1648
1649         pa_client_kill(client);
1650     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
1651         struct pa_sink_input *s;
1652         
1653         if (!(s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index))) {
1654             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1655             return;
1656         }
1657
1658         pa_sink_input_kill(s);
1659     } else {
1660         struct pa_source_output *s;
1661
1662         assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
1663         
1664         if (!(s = pa_idxset_get_by_index(c->protocol->core->source_outputs, index))) {
1665             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1666             return;
1667         }
1668
1669         pa_source_output_kill(s);
1670     }
1671
1672     pa_pstream_send_simple_ack(c->pstream, tag);
1673 }
1674
1675 static void command_load_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1676     struct connection *c = userdata;
1677     struct pa_module *m;
1678     const char *name, *argument;
1679     struct pa_tagstruct *reply;
1680     assert(c && t);
1681
1682     if (pa_tagstruct_gets(t, &name) < 0 || !name ||
1683         pa_tagstruct_gets(t, &argument) < 0 ||
1684         !pa_tagstruct_eof(t)) {
1685         protocol_error(c);
1686         return;
1687     }
1688     
1689     if (!c->authorized) {
1690         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1691         return;
1692     }
1693
1694     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
1695         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INITFAILED);
1696         return;
1697     }
1698
1699     reply = pa_tagstruct_new(NULL, 0);
1700     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1701     pa_tagstruct_putu32(reply, tag);
1702     pa_tagstruct_putu32(reply, m->index);
1703     pa_pstream_send_tagstruct(c->pstream, reply);
1704 }
1705
1706 static void command_unload_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1707     struct connection *c = userdata;
1708     uint32_t index;
1709     struct pa_module *m;
1710     assert(c && t);
1711
1712     if (pa_tagstruct_getu32(t, &index) < 0 ||
1713         !pa_tagstruct_eof(t)) {
1714         protocol_error(c);
1715         return;
1716     }
1717     
1718     if (!c->authorized) {
1719         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1720         return;
1721     }
1722
1723     if (!(m = pa_idxset_get_by_index(c->protocol->core->modules, index))) {
1724         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1725         return;
1726     }
1727
1728     pa_module_unload_request(m);
1729     pa_pstream_send_simple_ack(c->pstream, tag);
1730 }
1731
1732 static void command_add_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1733     struct connection *c = userdata;
1734     const char *name, *module, *argument;
1735     uint32_t type;
1736     uint32_t index;
1737     struct pa_tagstruct *reply;
1738     assert(c && t);
1739
1740     if (pa_tagstruct_gets(t, &name) < 0 || !name ||
1741         pa_tagstruct_getu32(t, &type) < 0 || type > 1 ||
1742         pa_tagstruct_gets(t, &module) < 0 || !module ||
1743         pa_tagstruct_gets(t, &argument) < 0 ||
1744         !pa_tagstruct_eof(t)) {
1745         protocol_error(c);
1746         return;
1747     }
1748     
1749     if (!c->authorized) {
1750         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1751         return;
1752     }
1753
1754     if (pa_autoload_add(c->protocol->core, name, type == 0 ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE, module, argument, &index) < 0) {
1755         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
1756         return;
1757     }
1758
1759     reply = pa_tagstruct_new(NULL, 0);
1760     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1761     pa_tagstruct_putu32(reply, tag);
1762     pa_tagstruct_putu32(reply, index);
1763     pa_pstream_send_tagstruct(c->pstream, reply);
1764 }
1765
1766 static void command_remove_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1767     struct connection *c = userdata;
1768     const char *name = NULL;
1769     uint32_t type, index = PA_IDXSET_INVALID;
1770     int r;
1771     assert(c && t);
1772
1773     if ((pa_tagstruct_getu32(t, &index) < 0 &&
1774         (pa_tagstruct_gets(t, &name) < 0 ||
1775          pa_tagstruct_getu32(t, &type) < 0)) ||
1776         (!name && index == PA_IDXSET_INVALID) ||
1777         (name && type > 1) ||
1778         !pa_tagstruct_eof(t)) {
1779         protocol_error(c);
1780         return;
1781     }
1782     
1783     if (!c->authorized) {
1784         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1785         return;
1786     }
1787
1788     if (name) 
1789         r = pa_autoload_remove_by_name(c->protocol->core, name, type == 0 ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE);
1790     else
1791         r = pa_autoload_remove_by_index(c->protocol->core, index);
1792
1793     if (r < 0) {
1794         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1795         return;
1796     }
1797
1798     pa_pstream_send_simple_ack(c->pstream, tag);
1799 }
1800
1801 static void autoload_fill_tagstruct(struct pa_tagstruct *t, const struct pa_autoload_entry *e) {
1802     assert(t && e);
1803
1804     pa_tagstruct_putu32(t, e->index);
1805     pa_tagstruct_puts(t, e->name);
1806     pa_tagstruct_putu32(t, e->type == PA_NAMEREG_SINK ? 0 : 1);
1807     pa_tagstruct_puts(t, e->module);
1808     pa_tagstruct_puts(t, e->argument);
1809 }
1810
1811 static void command_get_autoload_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1812     struct connection *c = userdata;
1813     const struct pa_autoload_entry *a = NULL;
1814     uint32_t type, index;
1815     const char *name;
1816     struct pa_tagstruct *reply;
1817     assert(c && t);
1818
1819     if ((pa_tagstruct_getu32(t, &index) < 0 &&
1820         (pa_tagstruct_gets(t, &name) < 0 ||
1821          pa_tagstruct_getu32(t, &type) < 0)) ||
1822         (!name && index == PA_IDXSET_INVALID) ||
1823         (name && type > 1) ||
1824         !pa_tagstruct_eof(t)) {
1825         protocol_error(c);
1826         return;
1827     }
1828
1829     if (!c->authorized) {
1830         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1831         return;
1832     }
1833
1834
1835     if (name)
1836         a = pa_autoload_get_by_name(c->protocol->core, name, type == 0 ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE);
1837     else
1838         a = pa_autoload_get_by_index(c->protocol->core, index);
1839
1840     if (!a) {
1841         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1842         return;
1843     }
1844
1845     reply = pa_tagstruct_new(NULL, 0);
1846     assert(reply);
1847     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1848     pa_tagstruct_putu32(reply, tag);
1849     autoload_fill_tagstruct(reply, a);
1850     pa_pstream_send_tagstruct(c->pstream, reply);
1851 }
1852
1853 static void command_get_autoload_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1854     struct connection *c = userdata;
1855     struct pa_tagstruct *reply;
1856     assert(c && t);
1857
1858     if (!pa_tagstruct_eof(t)) {
1859         protocol_error(c);
1860         return;
1861     }
1862     
1863     if (!c->authorized) {
1864         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1865         return;
1866     }
1867
1868     reply = pa_tagstruct_new(NULL, 0);
1869     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1870     pa_tagstruct_putu32(reply, tag);
1871
1872     if (c->protocol->core->autoload_hashmap) {
1873         struct pa_autoload_entry *a;
1874         void *state = NULL;
1875
1876         while ((a = pa_hashmap_iterate(c->protocol->core->autoload_hashmap, &state, NULL)))
1877             autoload_fill_tagstruct(reply, a);
1878     }
1879     
1880     pa_pstream_send_tagstruct(c->pstream, reply);
1881 }
1882
1883 /*** pstream callbacks ***/
1884
1885 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
1886     struct connection *c = userdata;
1887     assert(p && packet && packet->data && c);
1888
1889     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
1890         pa_log(__FILE__": invalid packet.\n");
1891         connection_free(c);
1892     }
1893 }
1894
1895 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
1896     struct connection *c = userdata;
1897     struct output_stream *stream;
1898     assert(p && chunk && userdata);
1899     
1900     if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
1901         pa_log(__FILE__": client sent block for invalid stream.\n");
1902         connection_free(c);
1903         return;
1904     }
1905
1906     if (stream->type == PLAYBACK_STREAM) {
1907         struct playback_stream *p = (struct playback_stream*) stream;
1908         if (chunk->length >= p->requested_bytes)
1909             p->requested_bytes = 0;
1910         else
1911             p->requested_bytes -= chunk->length;
1912         
1913         pa_memblockq_push_align(p->memblockq, chunk, delta);
1914         assert(p->sink_input);
1915 /*         pa_log(__FILE__": after_recv: %u\n", pa_memblockq_get_length(p->memblockq)); */
1916
1917         pa_sink_notify(p->sink_input->sink);
1918 /*          pa_log(__FILE__": Recieved %u bytes.\n", chunk->length);  */
1919
1920     } else {
1921         struct upload_stream *u = (struct upload_stream*) stream;
1922         size_t l;
1923         assert(u->type == UPLOAD_STREAM);
1924
1925         if (!u->memchunk.memblock) {
1926             if (u->length == chunk->length) {
1927                 u->memchunk = *chunk;
1928                 pa_memblock_ref(u->memchunk.memblock);
1929                 u->length = 0;
1930             } else {
1931                 u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
1932                 u->memchunk.index = u->memchunk.length = 0;
1933             }
1934         }
1935         
1936         assert(u->memchunk.memblock);
1937         
1938         l = u->length; 
1939         if (l > chunk->length)
1940             l = chunk->length;
1941
1942         if (l > 0) {
1943             memcpy((uint8_t*) u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length,
1944                    (uint8_t*) chunk->memblock->data+chunk->index, l);
1945             u->memchunk.length += l;
1946             u->length -= l;
1947         }
1948     }
1949 }
1950
/* pstream callback invoked when the stream has died: the connection that
 * was registered as userdata is freed. */
static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
    struct connection *c = (struct connection*) userdata;

    assert(p && c);

    connection_free(c);
}
1958
1959
/* pstream drain callback: calls send_memblock() for the connection that
 * was registered as userdata. */
static void pstream_drain_callback(struct pa_pstream *p, void *userdata) {
    struct connection *c = (struct connection*) userdata;

    assert(p && c);

    send_memblock(c);
}
1966
1967 /*** client callbacks ***/
1968
1969 static void client_kill_cb(struct pa_client *c) {
1970     assert(c && c->userdata);
1971     connection_free(c->userdata);
1972 }
1973
1974 /*** socket server callbacks ***/
1975
1976 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
1977     struct pa_protocol_native *p = userdata;
1978     struct connection *c;
1979     assert(io && p);
1980
1981     c = pa_xmalloc(sizeof(struct connection));
1982
1983     c->authorized =!! p->public;
1984     c->protocol = p;
1985     assert(p->core);
1986     c->client = pa_client_new(p->core, "NATIVE", "Client");
1987     assert(c->client);
1988     c->client->kill = client_kill_cb;
1989     c->client->userdata = c;
1990     c->client->owner = p->module;
1991     
1992     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
1993     assert(c->pstream);
1994
1995     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
1996     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
1997     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
1998     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
1999
2000     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
2001     assert(c->pdispatch);
2002
2003     c->record_streams = pa_idxset_new(NULL, NULL);
2004     c->output_streams = pa_idxset_new(NULL, NULL);
2005     assert(c->record_streams && c->output_streams);
2006
2007     c->rrobin_index = PA_IDXSET_INVALID;
2008     c->subscription = NULL;
2009
2010     pa_idxset_put(p->connections, c, NULL);
2011 }
2012
2013 /*** module entry points ***/
2014
2015 static int load_key(struct pa_protocol_native*p, const char*fn) {
2016     assert(p);
2017
2018     p->auth_cookie_in_property = 0;
2019     
2020     if (!fn && pa_authkey_prop_get(p->core, PA_NATIVE_COOKIE_PROPERTY_NAME, p->auth_cookie, sizeof(p->auth_cookie)) >= 0) {
2021         pa_log(__FILE__": using already loaded auth cookie.\n");
2022         pa_authkey_prop_ref(p->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
2023         p->auth_cookie_in_property = 1;
2024         return 0;
2025     }
2026     
2027     if (!fn)
2028         fn = PA_NATIVE_COOKIE_FILE;
2029
2030     if (pa_authkey_load_from_home(fn, p->auth_cookie, sizeof(p->auth_cookie)) < 0)
2031         return -1;
2032
2033     pa_log(__FILE__": loading cookie from disk.\n");
2034
2035     if (pa_authkey_prop_put(p->core, PA_NATIVE_COOKIE_PROPERTY_NAME, p->auth_cookie, sizeof(p->auth_cookie)) >= 0)
2036         p->auth_cookie_in_property = 1;
2037         
2038     return 0;
2039 }
2040
2041 static struct pa_protocol_native* protocol_new_internal(struct pa_core *c, struct pa_module *m, struct pa_modargs *ma) {
2042     struct pa_protocol_native *p;
2043     int public = 0;
2044     assert(c && ma);
2045
2046     if (pa_modargs_get_value_boolean(ma, "public", &public) < 0) {
2047         pa_log(__FILE__": public= expects a boolean argument.\n");
2048         return NULL;
2049     }
2050     
2051     p = pa_xmalloc(sizeof(struct pa_protocol_native));
2052     p->core = c;
2053     p->module = m;
2054     p->public = public;
2055     p->server = NULL;
2056
2057     if (load_key(p, pa_modargs_get_value(ma, "cookie", NULL)) < 0) {
2058         pa_xfree(p);
2059         return NULL;
2060     }
2061
2062     p->connections = pa_idxset_new(NULL, NULL);
2063     assert(p->connections);
2064
2065     return p;
2066 }
2067
2068 struct pa_protocol_native* pa_protocol_native_new(struct pa_core *core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) {
2069     char t[256];
2070     struct pa_protocol_native *p;
2071
2072     if (!(p = protocol_new_internal(core, m, ma)))
2073         return NULL;
2074     
2075     p->server = server;
2076     pa_socket_server_set_callback(p->server, on_connection, p);
2077
2078     if (pa_socket_server_get_address(p->server, t, sizeof(t))) {
2079         struct pa_strlist *l;
2080         l = pa_property_get(core, PA_NATIVE_SERVER_PROPERTY_NAME);
2081         l = pa_strlist_prepend(l, t);
2082         pa_property_replace(core, PA_NATIVE_SERVER_PROPERTY_NAME, l);
2083     }
2084     
2085     return p;
2086 }
2087
2088 void pa_protocol_native_free(struct pa_protocol_native *p) {
2089     struct connection *c;
2090     assert(p);
2091
2092     while ((c = pa_idxset_first(p->connections, NULL)))
2093         connection_free(c);
2094     pa_idxset_free(p->connections, NULL, NULL);
2095
2096     if (p->server) {
2097         char t[256];
2098         
2099         if (pa_socket_server_get_address(p->server, t, sizeof(t))) {
2100             struct pa_strlist *l;
2101             l = pa_property_get(p->core, PA_NATIVE_SERVER_PROPERTY_NAME);
2102             l = pa_strlist_remove(l, t);
2103
2104             if (l)
2105                 pa_property_replace(p->core, PA_NATIVE_SERVER_PROPERTY_NAME, l);
2106             else
2107                 pa_property_remove(p->core, PA_NATIVE_SERVER_PROPERTY_NAME);
2108         }
2109         
2110         pa_socket_server_unref(p->server);
2111     }
2112
2113     if (p->auth_cookie_in_property)
2114         pa_authkey_prop_unref(p->core, PA_NATIVE_COOKIE_PROPERTY_NAME);
2115
2116     pa_xfree(p);
2117 }
2118
2119 struct pa_protocol_native* pa_protocol_native_new_iochannel(struct pa_core*core, struct pa_iochannel *io, struct pa_module *m, struct pa_modargs *ma) {
2120     struct pa_protocol_native *p;
2121
2122     if (!(p = protocol_new_internal(core, m, ma)))
2123         return NULL;
2124
2125     on_connection(NULL, io, p);
2126     
2127     return p;
2128 }