fix public= on native and esound protocol
[profile/ivi/pulseaudio-panda.git] / polyp / protocol-native.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <string.h>
27 #include <stdio.h>
28 #include <assert.h>
29 #include <stdlib.h>
30
31 #include "protocol-native.h"
32 #include "native-common.h"
33 #include "packet.h"
34 #include "client.h"
35 #include "source-output.h"
36 #include "sink-input.h"
37 #include "pstream.h"
38 #include "tagstruct.h"
39 #include "pdispatch.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "namereg.h"
43 #include "scache.h"
44 #include "xmalloc.h"
45 #include "util.h"
46 #include "subscribe.h"
47 #include "log.h"
48
49 struct connection;
50 struct pa_protocol_native;
51
52 struct record_stream {
53     struct connection *connection;
54     uint32_t index;
55     struct pa_source_output *source_output;
56     struct pa_memblockq *memblockq;
57     size_t fragment_size;
58 };
59
60 struct playback_stream {
61     int type;
62     struct connection *connection;
63     uint32_t index;
64     struct pa_sink_input *sink_input;
65     struct pa_memblockq *memblockq;
66     size_t requested_bytes;
67     int drain_request;
68     uint32_t drain_tag;
69 };
70
71 struct upload_stream {
72     int type;
73     struct connection *connection;
74     uint32_t index;
75     struct pa_memchunk memchunk;
76     size_t length;
77     char *name;
78     struct pa_sample_spec sample_spec;
79 };
80
81 struct output_stream {
82     int type;
83 };
84
85 enum {
86     UPLOAD_STREAM,
87     PLAYBACK_STREAM
88 };
89
90 struct connection {
91     int authorized;
92     struct pa_protocol_native *protocol;
93     struct pa_client *client;
94     struct pa_pstream *pstream;
95     struct pa_pdispatch *pdispatch;
96     struct pa_idxset *record_streams, *output_streams;
97     uint32_t rrobin_index;
98     struct pa_subscription *subscription;
99 };
100
101 struct pa_protocol_native {
102     struct pa_module *module;
103     int public;
104     struct pa_core *core;
105     struct pa_socket_server *server;
106     struct pa_idxset *connections;
107     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
108 };
109
110 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
111 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
112 static void sink_input_kill_cb(struct pa_sink_input *i);
113 static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i);
114
115 static void request_bytes(struct playback_stream*s);
116
117 static void source_output_kill_cb(struct pa_source_output *o);
118 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk);
119
120 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
121 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
122 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
123 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
124 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
125 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
126 static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
127 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
128 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
129 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
130 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
131 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
132 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
133 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
134 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
135 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
136 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
137 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
138 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
139 static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
140 static void command_flush_or_trigger_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
141 static void command_set_default_sink_or_source(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
142
143 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
144     [PA_COMMAND_ERROR] = { NULL },
145     [PA_COMMAND_TIMEOUT] = { NULL },
146     [PA_COMMAND_REPLY] = { NULL },
147     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
148     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_stream },
149     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = { command_drain_playback_stream },
150     [PA_COMMAND_CREATE_RECORD_STREAM] = { command_create_record_stream },
151     [PA_COMMAND_DELETE_RECORD_STREAM] = { command_delete_stream },
152     [PA_COMMAND_AUTH] = { command_auth },
153     [PA_COMMAND_REQUEST] = { NULL },
154     [PA_COMMAND_EXIT] = { command_exit },
155     [PA_COMMAND_SET_NAME] = { command_set_name },
156     [PA_COMMAND_LOOKUP_SINK] = { command_lookup },
157     [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
158     [PA_COMMAND_STAT] = { command_stat },
159     [PA_COMMAND_GET_PLAYBACK_LATENCY] = { command_get_playback_latency },
160     [PA_COMMAND_CREATE_UPLOAD_STREAM] = { command_create_upload_stream },
161     [PA_COMMAND_DELETE_UPLOAD_STREAM] = { command_delete_stream },
162     [PA_COMMAND_FINISH_UPLOAD_STREAM] = { command_finish_upload_stream },
163     [PA_COMMAND_PLAY_SAMPLE] = { command_play_sample },
164     [PA_COMMAND_REMOVE_SAMPLE] = { command_remove_sample },
165     [PA_COMMAND_GET_SINK_INFO] = { command_get_info },
166     [PA_COMMAND_GET_SOURCE_INFO] = { command_get_info },
167     [PA_COMMAND_GET_CLIENT_INFO] = { command_get_info },
168     [PA_COMMAND_GET_MODULE_INFO] = { command_get_info },
169     [PA_COMMAND_GET_SINK_INPUT_INFO] = { command_get_info },
170     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = { command_get_info },
171     [PA_COMMAND_GET_SAMPLE_INFO] = { command_get_info },
172     [PA_COMMAND_GET_SINK_INFO_LIST] = { command_get_info_list },
173     [PA_COMMAND_GET_SOURCE_INFO_LIST] = { command_get_info_list },
174     [PA_COMMAND_GET_MODULE_INFO_LIST] = { command_get_info_list },
175     [PA_COMMAND_GET_CLIENT_INFO_LIST] = { command_get_info_list },
176     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = { command_get_info_list },
177     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = { command_get_info_list },
178     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = { command_get_info_list },
179     [PA_COMMAND_GET_SERVER_INFO] = { command_get_server_info },
180     [PA_COMMAND_SUBSCRIBE] = { command_subscribe },
181     [PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume },
182     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume },
183     [PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream },
184     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_or_trigger_playback_stream },
185     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = { command_flush_or_trigger_playback_stream },
186     [PA_COMMAND_SET_DEFAULT_SINK] = { command_set_default_sink_or_source },
187     [PA_COMMAND_SET_DEFAULT_SOURCE] = { command_set_default_sink_or_source },
188 };
189
190 /* structure management */
191
192 static struct upload_stream* upload_stream_new(struct connection *c, const struct pa_sample_spec *ss, const char *name, size_t length) {
193     struct upload_stream *s;
194     assert(c && ss && name && length);
195     
196     s = pa_xmalloc(sizeof(struct upload_stream));
197     s->type = UPLOAD_STREAM;
198     s->connection = c;
199     s->sample_spec = *ss;
200     s->name = pa_xstrdup(name);
201
202     s->memchunk.memblock = NULL;
203     s->memchunk.index = 0;
204     s->memchunk.length = 0;
205
206     s->length = length;
207     
208     pa_idxset_put(c->output_streams, s, &s->index);
209     return s;
210 }
211
212 static void upload_stream_free(struct upload_stream *o) {
213     assert(o && o->connection);
214
215     pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
216
217     pa_xfree(o->name);
218     
219     if (o->memchunk.memblock)
220         pa_memblock_unref(o->memchunk.memblock);
221     
222     pa_xfree(o);
223 }
224
225 static struct record_stream* record_stream_new(struct connection *c, struct pa_source *source, const struct pa_sample_spec *ss, const char *name, size_t maxlength, size_t fragment_size) {
226     struct record_stream *s;
227     struct pa_source_output *source_output;
228     size_t base;
229     assert(c && source && ss && name && maxlength);
230
231     if (!(source_output = pa_source_output_new(source, name, ss)))
232         return NULL;
233
234     s = pa_xmalloc(sizeof(struct record_stream));
235     s->connection = c;
236     s->source_output = source_output;
237     s->source_output->push = source_output_push_cb;
238     s->source_output->kill = source_output_kill_cb;
239     s->source_output->userdata = s;
240     s->source_output->owner = c->protocol->module;
241     s->source_output->client = c->client;
242
243     s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0, c->protocol->core->memblock_stat);
244     assert(s->memblockq);
245
246     s->fragment_size = (fragment_size/base)*base;
247     if (!s->fragment_size)
248         s->fragment_size = base;
249
250     pa_idxset_put(c->record_streams, s, &s->index);
251     return s;
252 }
253
254 static void record_stream_free(struct record_stream* r) {
255     assert(r && r->connection);
256
257     pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
258     pa_source_output_free(r->source_output);
259     pa_memblockq_free(r->memblockq);
260     pa_xfree(r);
261 }
262
263 static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, const struct pa_sample_spec *ss, const char *name,
264                                                    size_t maxlength,
265                                                    size_t tlength,
266                                                    size_t prebuf,
267                                                    size_t minreq) {
268     struct playback_stream *s;
269     struct pa_sink_input *sink_input;
270     assert(c && sink && ss && name && maxlength);
271
272     if (!(sink_input = pa_sink_input_new(sink, name, ss)))
273         return NULL;
274     
275     s = pa_xmalloc(sizeof(struct playback_stream));
276     s->type = PLAYBACK_STREAM;
277     s->connection = c;
278     s->sink_input = sink_input;
279     
280     s->sink_input->peek = sink_input_peek_cb;
281     s->sink_input->drop = sink_input_drop_cb;
282     s->sink_input->kill = sink_input_kill_cb;
283     s->sink_input->get_latency = sink_input_get_latency_cb;
284     s->sink_input->userdata = s;
285     s->sink_input->owner = c->protocol->module;
286     s->sink_input->client = c->client;
287     
288     s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq, c->protocol->core->memblock_stat);
289     assert(s->memblockq);
290
291     s->requested_bytes = 0;
292     s->drain_request = 0;
293     
294     pa_idxset_put(c->output_streams, s, &s->index);
295     return s;
296 }
297
298 static void playback_stream_free(struct playback_stream* p) {
299     assert(p && p->connection);
300
301     if (p->drain_request)
302         pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
303
304     pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
305     pa_sink_input_free(p->sink_input);
306     pa_memblockq_free(p->memblockq);
307     pa_xfree(p);
308 }
309
310 static void connection_free(struct connection *c) {
311     struct record_stream *r;
312     struct output_stream *o;
313     assert(c && c->protocol);
314
315     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
316     while ((r = pa_idxset_first(c->record_streams, NULL)))
317         record_stream_free(r);
318     pa_idxset_free(c->record_streams, NULL, NULL);
319
320     while ((o = pa_idxset_first(c->output_streams, NULL)))
321         if (o->type == PLAYBACK_STREAM)
322             playback_stream_free((struct playback_stream*) o);
323         else
324             upload_stream_free((struct upload_stream*) o);
325     pa_idxset_free(c->output_streams, NULL, NULL);
326
327     pa_pdispatch_unref(c->pdispatch);
328     pa_pstream_close(c->pstream);
329     pa_pstream_unref(c->pstream);
330     pa_client_free(c->client);
331
332     if (c->subscription)
333         pa_subscription_free(c->subscription);
334     
335     pa_xfree(c);
336 }
337
338 static void request_bytes(struct playback_stream *s) {
339     struct pa_tagstruct *t;
340     size_t l;
341     assert(s);
342
343     if (!(l = pa_memblockq_missing(s->memblockq)))
344         return;
345     
346     if (l <= s->requested_bytes)
347         return;
348
349     l -= s->requested_bytes;
350
351     if (l < pa_memblockq_get_minreq(s->memblockq))
352         return;
353     
354     s->requested_bytes += l;
355
356     t = pa_tagstruct_new(NULL, 0);
357     assert(t);
358     pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
359     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
360     pa_tagstruct_putu32(t, s->index);
361     pa_tagstruct_putu32(t, l);
362     pa_pstream_send_tagstruct(s->connection->pstream, t);
363
364 /*     pa_log(__FILE__": Requesting %u bytes\n", l); */
365 }
366
367 static void send_memblock(struct connection *c) {
368     uint32_t start;
369     struct record_stream *r;
370
371     start = PA_IDXSET_INVALID;
372     for (;;) {
373         struct pa_memchunk chunk;
374         
375         if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
376             return;
377
378         if (start == PA_IDXSET_INVALID)
379             start = c->rrobin_index;
380         else if (start == c->rrobin_index)
381             return;
382
383         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
384             struct pa_memchunk schunk = chunk;
385             
386             if (schunk.length > r->fragment_size)
387                 schunk.length = r->fragment_size;
388
389             pa_pstream_send_memblock(c->pstream, r->index, 0, &schunk);
390             pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
391             pa_memblock_unref(schunk.memblock);
392             
393             return;
394         }
395     }
396 }
397
398 static void send_playback_stream_killed(struct playback_stream *p) {
399     struct pa_tagstruct *t;
400     assert(p);
401
402     t = pa_tagstruct_new(NULL, 0);
403     assert(t);
404     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
405     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
406     pa_tagstruct_putu32(t, p->index);
407     pa_pstream_send_tagstruct(p->connection->pstream, t);
408 }
409
410 static void send_record_stream_killed(struct record_stream *r) {
411     struct pa_tagstruct *t;
412     assert(r);
413
414     t = pa_tagstruct_new(NULL, 0);
415     assert(t);
416     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
417     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
418     pa_tagstruct_putu32(t, r->index);
419     pa_pstream_send_tagstruct(r->connection->pstream, t);
420 }
421
422
423 /*** sinkinput callbacks ***/
424
425 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
426     struct playback_stream *s;
427     assert(i && i->userdata && chunk);
428     s = i->userdata;
429
430     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
431         return -1;
432
433     return 0;
434 }
435
436 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
437     struct playback_stream *s;
438     assert(i && i->userdata && length);
439     s = i->userdata;
440
441     pa_memblockq_drop(s->memblockq, chunk, length);
442     request_bytes(s);
443
444     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
445         pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
446         s->drain_request = 0;
447     }
448
449     /*pa_log(__FILE__": after_drop: %u\n", pa_memblockq_get_length(s->memblockq));*/
450 }
451
452 static void sink_input_kill_cb(struct pa_sink_input *i) {
453     assert(i && i->userdata);
454     send_playback_stream_killed((struct playback_stream *) i->userdata);
455     playback_stream_free((struct playback_stream *) i->userdata);
456 }
457
458 static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i) {
459     struct playback_stream *s;
460     assert(i && i->userdata);
461     s = i->userdata;
462
463     /*pa_log(__FILE__": get_latency: %u\n", pa_memblockq_get_length(s->memblockq));*/
464     
465     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
466 }
467
468 /*** source_output callbacks ***/
469
470 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
471     struct record_stream *s;
472     assert(o && o->userdata && chunk);
473     s = o->userdata;
474     
475     pa_memblockq_push_align(s->memblockq, chunk, 0);
476     if (!pa_pstream_is_pending(s->connection->pstream))
477         send_memblock(s->connection);
478 }
479
480 static void source_output_kill_cb(struct pa_source_output *o) {
481     assert(o && o->userdata);
482     send_record_stream_killed((struct record_stream *) o->userdata);
483     record_stream_free((struct record_stream *) o->userdata);
484 }
485
486 /*** pdispatch callbacks ***/
487
488 static void protocol_error(struct connection *c) {
489     pa_log(__FILE__": protocol error, kicking client\n");
490     connection_free(c);
491 }
492
493 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
494     struct connection *c = userdata;
495     struct playback_stream *s;
496     size_t maxlength, tlength, prebuf, minreq;
497     uint32_t sink_index;
498     const char *name, *sink_name;
499     struct pa_sample_spec ss;
500     struct pa_tagstruct *reply;
501     struct pa_sink *sink;
502     assert(c && t && c->protocol && c->protocol->core);
503     
504     if (pa_tagstruct_gets(t, &name) < 0 ||
505         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
506         pa_tagstruct_getu32(t, &sink_index) < 0 ||
507         pa_tagstruct_gets(t, &sink_name) < 0 ||
508         pa_tagstruct_getu32(t, &maxlength) < 0 ||
509         pa_tagstruct_getu32(t, &tlength) < 0 ||
510         pa_tagstruct_getu32(t, &prebuf) < 0 ||
511         pa_tagstruct_getu32(t, &minreq) < 0 ||
512         !pa_tagstruct_eof(t)) {
513         protocol_error(c);
514         return;
515     }
516
517     if (!c->authorized) {
518         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
519         return;
520     }
521
522     if (sink_index != (uint32_t) -1)
523         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
524     else
525         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
526
527     if (!sink) {
528         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
529         return;
530     }
531     
532     if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq))) {
533         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
534         return;
535     }
536     
537     reply = pa_tagstruct_new(NULL, 0);
538     assert(reply);
539     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
540     pa_tagstruct_putu32(reply, tag);
541     pa_tagstruct_putu32(reply, s->index);
542     assert(s->sink_input);
543     pa_tagstruct_putu32(reply, s->sink_input->index);
544     pa_tagstruct_putu32(reply, s->requested_bytes = pa_memblockq_missing(s->memblockq));
545     pa_pstream_send_tagstruct(c->pstream, reply);
546     request_bytes(s);
547 }
548
549 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
550     struct connection *c = userdata;
551     uint32_t channel;
552     assert(c && t);
553     
554     if (pa_tagstruct_getu32(t, &channel) < 0 ||
555         !pa_tagstruct_eof(t)) {
556         protocol_error(c);
557         return;
558     }
559
560     if (!c->authorized) {
561         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
562         return;
563     }
564
565     if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
566         struct playback_stream *s;
567         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
568             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
569             return;
570         }
571
572         playback_stream_free(s);
573     } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
574         struct record_stream *s;
575         if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
576             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
577             return;
578         }
579
580         record_stream_free(s);
581     } else {
582         struct upload_stream *s;
583         assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
584         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
585             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
586             return;
587         }
588
589         upload_stream_free(s);
590     }
591             
592     pa_pstream_send_simple_ack(c->pstream, tag);
593 }
594
595 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
596     struct connection *c = userdata;
597     struct record_stream *s;
598     size_t maxlength, fragment_size;
599     uint32_t source_index;
600     const char *name, *source_name;
601     struct pa_sample_spec ss;
602     struct pa_tagstruct *reply;
603     struct pa_source *source;
604     assert(c && t && c->protocol && c->protocol->core);
605     
606     if (pa_tagstruct_gets(t, &name) < 0 ||
607         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
608         pa_tagstruct_getu32(t, &source_index) < 0 ||
609         pa_tagstruct_gets(t, &source_name) < 0 ||
610         pa_tagstruct_getu32(t, &maxlength) < 0 ||
611         pa_tagstruct_getu32(t, &fragment_size) < 0 ||
612         !pa_tagstruct_eof(t)) {
613         protocol_error(c);
614         return;
615     }
616
617     if (!c->authorized) {
618         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
619         return;
620     }
621
622     if (source_index != (uint32_t) -1)
623         source = pa_idxset_get_by_index(c->protocol->core->sources, source_index);
624     else
625         source = pa_namereg_get(c->protocol->core, *source_name ? source_name : NULL, PA_NAMEREG_SOURCE, 1);
626
627     if (!source) {
628         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
629         return;
630     }
631     
632     if (!(s = record_stream_new(c, source, &ss, name, maxlength, fragment_size))) {
633         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
634         return;
635     }
636     
637     reply = pa_tagstruct_new(NULL, 0);
638     assert(reply);
639     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
640     pa_tagstruct_putu32(reply, tag);
641     pa_tagstruct_putu32(reply, s->index);
642     assert(s->source_output);
643     pa_tagstruct_putu32(reply, s->source_output->index);
644     pa_pstream_send_tagstruct(c->pstream, reply);
645 }
646
647 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
648     struct connection *c = userdata;
649     assert(c && t);
650     
651     if (!pa_tagstruct_eof(t)) {
652         protocol_error(c);
653         return;
654     }
655
656     if (!c->authorized) {
657         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
658         return;
659     }
660     
661     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
662     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
663     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
664     return;
665 }
666
667 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
668     struct connection *c = userdata;
669     const void*cookie;
670     assert(c && t);
671
672     if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
673         !pa_tagstruct_eof(t)) {
674         protocol_error(c);
675         return;
676     }
677
678     if (!c->authorized) {
679         if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
680             pa_log(__FILE__": Denied access to client with invalid authorization key.\n");
681             pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
682             return;
683         }
684         
685         c->authorized = 1;
686     }
687     
688     pa_pstream_send_simple_ack(c->pstream, tag);
689     return;
690 }
691
692 static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
693     struct connection *c = userdata;
694     const char *name;
695     assert(c && t);
696
697     if (pa_tagstruct_gets(t, &name) < 0 ||
698         !pa_tagstruct_eof(t)) {
699         protocol_error(c);
700         return;
701     }
702
703     pa_client_rename(c->client, name);
704     pa_pstream_send_simple_ack(c->pstream, tag);
705     return;
706 }
707
708 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
709     struct connection *c = userdata;
710     const char *name;
711     uint32_t index = PA_IDXSET_INVALID;
712     assert(c && t);
713
714     if (pa_tagstruct_gets(t, &name) < 0 ||
715         !pa_tagstruct_eof(t)) {
716         protocol_error(c);
717         return;
718     }
719
720     if (!c->authorized) {
721         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
722         return;
723     }
724
725     if (command == PA_COMMAND_LOOKUP_SINK) {
726         struct pa_sink *sink;
727         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1)))
728             index = sink->index;
729     } else {
730         struct pa_source *source;
731         assert(command == PA_COMMAND_LOOKUP_SOURCE);
732         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1)))
733             index = source->index;
734     }
735
736     if (index == PA_IDXSET_INVALID)
737         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
738     else {
739         struct pa_tagstruct *reply;
740         reply = pa_tagstruct_new(NULL, 0);
741         assert(reply);
742         pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
743         pa_tagstruct_putu32(reply, tag);
744         pa_tagstruct_putu32(reply, index);
745         pa_pstream_send_tagstruct(c->pstream, reply);
746     }
747 }
748
749 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
750     struct connection *c = userdata;
751     uint32_t index;
752     struct playback_stream *s;
753     assert(c && t);
754
755     if (pa_tagstruct_getu32(t, &index) < 0 ||
756         !pa_tagstruct_eof(t)) {
757         protocol_error(c);
758         return;
759     }
760
761     if (!c->authorized) {
762         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
763         return;
764     }
765
766     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
767         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
768         return;
769     }
770
771     s->drain_request = 0;
772
773     pa_memblockq_prebuf_disable(s->memblockq);
774     
775     if (!pa_memblockq_is_readable(s->memblockq)) {
776         pa_pstream_send_simple_ack(c->pstream, tag);
777     } else {
778         s->drain_request = 1;
779         s->drain_tag = tag;
780
781         pa_sink_notify(s->sink_input->sink);
782     }
783
784
785 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
786     struct connection *c = userdata;
787     struct pa_tagstruct *reply;
788     assert(c && t);
789
790     if (!pa_tagstruct_eof(t)) {
791         protocol_error(c);
792         return;
793     }
794
795     if (!c->authorized) {
796         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
797         return;
798     }
799
800     reply = pa_tagstruct_new(NULL, 0);
801     assert(reply);
802     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
803     pa_tagstruct_putu32(reply, tag);
804     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
805     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
806     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
807     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
808     pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core));
809     pa_pstream_send_tagstruct(c->pstream, reply);
810 }
811
812 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
813     struct connection *c = userdata;
814     struct pa_tagstruct *reply;
815     struct playback_stream *s;
816     struct timeval tv, now;
817     uint32_t index;
818     assert(c && t);
819
820     if (pa_tagstruct_getu32(t, &index) < 0 ||
821         pa_tagstruct_get_timeval(t, &tv) < 0 ||
822         !pa_tagstruct_eof(t)) {
823         protocol_error(c);
824         return;
825     }
826
827     if (!c->authorized) {
828         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
829         return;
830     }
831
832     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
833         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
834         return;
835     }
836
837     reply = pa_tagstruct_new(NULL, 0);
838     assert(reply);
839     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
840     pa_tagstruct_putu32(reply, tag);
841     pa_tagstruct_put_usec(reply, pa_sink_input_get_latency(s->sink_input));
842     pa_tagstruct_put_usec(reply, pa_sink_get_latency(s->sink_input->sink));
843     pa_tagstruct_put_boolean(reply, pa_memblockq_is_readable(s->memblockq));
844     pa_tagstruct_putu32(reply, pa_memblockq_get_length(s->memblockq));
845     pa_tagstruct_put_timeval(reply, &tv);
846     gettimeofday(&now, NULL);
847     pa_tagstruct_put_timeval(reply, &now);
848     pa_pstream_send_tagstruct(c->pstream, reply);
849 }
850
851 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
852     struct connection *c = userdata;
853     struct upload_stream *s;
854     size_t length;
855     const char *name;
856     struct pa_sample_spec ss;
857     struct pa_tagstruct *reply;
858     assert(c && t && c->protocol && c->protocol->core);
859     
860     if (pa_tagstruct_gets(t, &name) < 0 ||
861         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
862         pa_tagstruct_getu32(t, &length) < 0 ||
863         !pa_tagstruct_eof(t)) {
864         protocol_error(c);
865         return;
866     }
867
868     if (!c->authorized) {
869         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
870         return;
871     }
872
873     if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
874         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
875         return;
876     }
877     
878     if (!(s = upload_stream_new(c, &ss, name, length))) {
879         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
880         return;
881     }
882     
883     reply = pa_tagstruct_new(NULL, 0);
884     assert(reply);
885     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
886     pa_tagstruct_putu32(reply, tag);
887     pa_tagstruct_putu32(reply, s->index);
888     pa_pstream_send_tagstruct(c->pstream, reply);
889     
890     reply = pa_tagstruct_new(NULL, 0);
891     assert(reply);
892     pa_tagstruct_putu32(reply, PA_COMMAND_REQUEST);
893     pa_tagstruct_putu32(reply, (uint32_t) -1); /* tag */
894     pa_tagstruct_putu32(reply, s->index);
895     pa_tagstruct_putu32(reply, length);
896     pa_pstream_send_tagstruct(c->pstream, reply);
897 }
898
899 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
900     struct connection *c = userdata;
901     uint32_t channel;
902     struct upload_stream *s;
903     uint32_t index;
904     assert(c && t);
905     
906     if (pa_tagstruct_getu32(t, &channel) < 0 ||
907         !pa_tagstruct_eof(t)) {
908         protocol_error(c);
909         return;
910     }
911
912     if (!c->authorized) {
913         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
914         return;
915     }
916
917     if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
918         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
919         return;
920     }
921
922     pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->memchunk, &index);
923     pa_pstream_send_simple_ack(c->pstream, tag);
924     upload_stream_free(s);
925 }
926
927 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
928     struct connection *c = userdata;
929     uint32_t sink_index, volume;
930     struct pa_sink *sink;
931     const char *name, *sink_name;
932     assert(c && t);
933
934     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
935         pa_tagstruct_gets(t, &sink_name) < 0 ||
936         pa_tagstruct_getu32(t, &volume) < 0 ||
937         pa_tagstruct_gets(t, &name) < 0 ||
938         !pa_tagstruct_eof(t)) {
939         protocol_error(c);
940         return;
941     }
942     
943     if (!c->authorized) {
944         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
945         return;
946     }
947
948     if (sink_index != (uint32_t) -1)
949         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
950     else
951         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
952
953     if (!sink) {
954         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
955         return;
956     }
957
958     if (pa_scache_play_item(c->protocol->core, name, sink, volume) < 0) {
959         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
960         return;
961     }
962
963     pa_pstream_send_simple_ack(c->pstream, tag);
964 }
965
966 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
967     struct connection *c = userdata;
968     const char *name;
969     assert(c && t);
970
971     if (pa_tagstruct_gets(t, &name) < 0 ||
972         !pa_tagstruct_eof(t)) {
973         protocol_error(c);
974         return;
975     }
976
977     if (!c->authorized) {
978         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
979         return;
980     }
981
982     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
983         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
984         return;
985     }
986
987     pa_pstream_send_simple_ack(c->pstream, tag);
988 }
989
990 static void sink_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink *sink) {
991     assert(t && sink);
992     pa_tagstruct_putu32(t, sink->index);
993     pa_tagstruct_puts(t, sink->name);
994     pa_tagstruct_puts(t, sink->description ? sink->description : "");
995     pa_tagstruct_put_sample_spec(t, &sink->sample_spec);
996     pa_tagstruct_putu32(t, sink->owner ? sink->owner->index : (uint32_t) -1);
997     pa_tagstruct_putu32(t, sink->volume);
998     pa_tagstruct_putu32(t, sink->monitor_source->index);
999     pa_tagstruct_puts(t, sink->monitor_source->name);
1000     pa_tagstruct_put_usec(t, pa_sink_get_latency(sink));
1001 }
1002
1003 static void source_fill_tagstruct(struct pa_tagstruct *t, struct pa_source *source) {
1004     assert(t && source);
1005     pa_tagstruct_putu32(t, source->index);
1006     pa_tagstruct_puts(t, source->name);
1007     pa_tagstruct_puts(t, source->description ? source->description : "");
1008     pa_tagstruct_put_sample_spec(t, &source->sample_spec);
1009     pa_tagstruct_putu32(t, source->owner ? source->owner->index : (uint32_t) -1);
1010     pa_tagstruct_putu32(t, source->monitor_of ? source->monitor_of->index : (uint32_t) -1);
1011     pa_tagstruct_puts(t, source->monitor_of ? source->monitor_of->name : "");
1012 }
1013
1014 static void client_fill_tagstruct(struct pa_tagstruct *t, struct pa_client *client) {
1015     assert(t && client);
1016     pa_tagstruct_putu32(t, client->index);
1017     pa_tagstruct_puts(t, client->name);
1018     pa_tagstruct_puts(t, client->protocol_name);
1019     pa_tagstruct_putu32(t, client->owner ? client->owner->index : (uint32_t) -1);
1020 }
1021
1022 static void module_fill_tagstruct(struct pa_tagstruct *t, struct pa_module *module) {
1023     assert(t && module);
1024     pa_tagstruct_putu32(t, module->index);
1025     pa_tagstruct_puts(t, module->name);
1026     pa_tagstruct_puts(t, module->argument ? module->argument : "");
1027     pa_tagstruct_putu32(t, module->n_used);
1028     pa_tagstruct_put_boolean(t, module->auto_unload);
1029 }
1030
1031 static void sink_input_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink_input *s) {
1032     assert(t && s);
1033     pa_tagstruct_putu32(t, s->index);
1034     pa_tagstruct_puts(t, s->name ? s->name : "");
1035     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1036     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1037     pa_tagstruct_putu32(t, s->sink->index);
1038     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1039     pa_tagstruct_putu32(t, s->volume);
1040     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s));
1041     pa_tagstruct_put_usec(t, pa_sink_get_latency(s->sink));
1042 }
1043
1044 static void source_output_fill_tagstruct(struct pa_tagstruct *t, struct pa_source_output *s) {
1045     assert(t && s);
1046     pa_tagstruct_putu32(t, s->index);
1047     pa_tagstruct_puts(t, s->name ? s->name : "");
1048     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1049     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1050     pa_tagstruct_putu32(t, s->source->index);
1051     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1052 }
1053
1054 static void scache_fill_tagstruct(struct pa_tagstruct *t, struct pa_scache_entry *e) {
1055     assert(t && e);
1056     pa_tagstruct_putu32(t, e->index);
1057     pa_tagstruct_puts(t, e->name);
1058     pa_tagstruct_putu32(t, e->volume);
1059     pa_tagstruct_put_usec(t, pa_bytes_to_usec(e->memchunk.length, &e->sample_spec));
1060     pa_tagstruct_put_sample_spec(t, &e->sample_spec);
1061     pa_tagstruct_putu32(t, e->memchunk.length);
1062 }
1063
1064 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1065     struct connection *c = userdata;
1066     uint32_t index;
1067     struct pa_sink *sink = NULL;
1068     struct pa_source *source = NULL;
1069     struct pa_client *client = NULL;
1070     struct pa_module *module = NULL;
1071     struct pa_sink_input *si = NULL;
1072     struct pa_source_output *so = NULL;
1073     struct pa_scache_entry *sce = NULL;
1074     const char *name;
1075     struct pa_tagstruct *reply;
1076     assert(c && t);
1077
1078     
1079     if (pa_tagstruct_getu32(t, &index) < 0 ||
1080         (command != PA_COMMAND_GET_CLIENT_INFO &&
1081          command != PA_COMMAND_GET_MODULE_INFO &&
1082          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
1083          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
1084          pa_tagstruct_gets(t, &name) < 0) ||
1085         !pa_tagstruct_eof(t)) {
1086         protocol_error(c);
1087         return;
1088     }
1089     
1090     if (!c->authorized) {
1091         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1092         return;
1093     }
1094
1095     if (command == PA_COMMAND_GET_SINK_INFO) {
1096         if (index != (uint32_t) -1)
1097             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1098         else
1099             sink = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SINK, 1);
1100     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
1101         if (index != (uint32_t) -1)
1102             source = pa_idxset_get_by_index(c->protocol->core->sources, index);
1103         else
1104             source = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SOURCE, 1);
1105     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
1106         client = pa_idxset_get_by_index(c->protocol->core->clients, index);
1107     else if (command == PA_COMMAND_GET_MODULE_INFO) 
1108         module = pa_idxset_get_by_index(c->protocol->core->modules, index);
1109     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
1110         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1111     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
1112         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, index);
1113     else {
1114         assert(command == PA_COMMAND_GET_SAMPLE_INFO && name);
1115         if (index != (uint32_t) -1)
1116             sce = pa_idxset_get_by_index(c->protocol->core->scache, index);
1117         else
1118             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE, 0);
1119     }
1120             
1121     if (!sink && !source && !client && !module && !si && !so && !sce) {
1122         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1123         return;
1124     }
1125
1126     reply = pa_tagstruct_new(NULL, 0);
1127     assert(reply);
1128     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1129     pa_tagstruct_putu32(reply, tag); 
1130     if (sink)
1131         sink_fill_tagstruct(reply, sink);
1132     else if (source)
1133         source_fill_tagstruct(reply, source);
1134     else if (client)
1135         client_fill_tagstruct(reply, client);
1136     else if (module)
1137         module_fill_tagstruct(reply, module);
1138     else if (si)
1139         sink_input_fill_tagstruct(reply, si);
1140     else if (so)
1141         source_output_fill_tagstruct(reply, so);
1142     else
1143         scache_fill_tagstruct(reply, sce);
1144     pa_pstream_send_tagstruct(c->pstream, reply);
1145 }
1146
1147 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1148     struct connection *c = userdata;
1149     struct pa_idxset *i;
1150     uint32_t index;
1151     void *p;
1152     struct pa_tagstruct *reply;
1153     assert(c && t);
1154
1155     if (!pa_tagstruct_eof(t)) {
1156         protocol_error(c);
1157         return;
1158     }
1159     
1160     if (!c->authorized) {
1161         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1162         return;
1163     }
1164
1165     reply = pa_tagstruct_new(NULL, 0);
1166     assert(reply);
1167     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1168     pa_tagstruct_putu32(reply, tag);
1169
1170     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1171         i = c->protocol->core->sinks;
1172     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1173         i = c->protocol->core->sources;
1174     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1175         i = c->protocol->core->clients;
1176     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1177         i = c->protocol->core->modules;
1178     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1179         i = c->protocol->core->sink_inputs;
1180     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
1181         i = c->protocol->core->source_outputs;
1182     else {
1183         assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1184         i = c->protocol->core->scache;
1185     }
1186
1187     if (i) {
1188         for (p = pa_idxset_first(i, &index); p; p = pa_idxset_next(i, &index)) {
1189             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1190                 sink_fill_tagstruct(reply, p);
1191             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1192                 source_fill_tagstruct(reply, p);
1193             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1194                 client_fill_tagstruct(reply, p);
1195             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1196                 module_fill_tagstruct(reply, p);
1197             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1198                 sink_input_fill_tagstruct(reply, p);
1199             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST) 
1200                 source_output_fill_tagstruct(reply, p);
1201             else {
1202                 assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1203                 scache_fill_tagstruct(reply, p);
1204             }
1205         }
1206     }
1207     
1208     pa_pstream_send_tagstruct(c->pstream, reply);
1209 }
1210
1211 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1212     struct connection *c = userdata;
1213     struct pa_tagstruct *reply;
1214     char txt[256];
1215     const char *n;
1216     assert(c && t);
1217
1218     if (!pa_tagstruct_eof(t)) {
1219         protocol_error(c);
1220         return;
1221     }
1222     
1223     if (!c->authorized) {
1224         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1225         return;
1226     }
1227
1228     reply = pa_tagstruct_new(NULL, 0);
1229     assert(reply);
1230     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1231     pa_tagstruct_putu32(reply, tag);
1232     pa_tagstruct_puts(reply, PACKAGE_NAME);
1233     pa_tagstruct_puts(reply, PACKAGE_VERSION);
1234     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
1235     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
1236     pa_tagstruct_put_sample_spec(reply, &c->protocol->core->default_sample_spec);
1237
1238     n = pa_namereg_get_default_sink_name(c->protocol->core);
1239     pa_tagstruct_puts(reply, n ? n : "");
1240     n = pa_namereg_get_default_source_name(c->protocol->core);
1241     pa_tagstruct_puts(reply, n ? n : "");
1242     pa_pstream_send_tagstruct(c->pstream, reply);
1243 }
1244
1245 static void subscription_cb(struct pa_core *core, enum pa_subscription_event_type e, uint32_t index, void *userdata) {
1246     struct pa_tagstruct *t;
1247     struct connection *c = userdata;
1248     assert(c && core);
1249
1250     t = pa_tagstruct_new(NULL, 0);
1251     assert(t);
1252     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
1253     pa_tagstruct_putu32(t, (uint32_t) -1);
1254     pa_tagstruct_putu32(t, e);
1255     pa_tagstruct_putu32(t, index);
1256     pa_pstream_send_tagstruct(c->pstream, t);
1257 }
1258
1259 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1260     struct connection *c = userdata;
1261     enum pa_subscription_mask m;
1262     assert(c && t);
1263
1264     if (pa_tagstruct_getu32(t, &m) < 0 ||
1265         !pa_tagstruct_eof(t)) {
1266         protocol_error(c);
1267         return;
1268     }
1269     
1270     if (!c->authorized) {
1271         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1272         return;
1273     }
1274
1275     if (c->subscription)
1276         pa_subscription_free(c->subscription);
1277
1278     if (m != 0) {
1279         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
1280         assert(c->subscription);
1281     } else
1282         c->subscription = NULL;
1283
1284     pa_pstream_send_simple_ack(c->pstream, tag);
1285 }
1286
1287 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1288     struct connection *c = userdata;
1289     uint32_t index, volume;
1290     struct pa_sink *sink = NULL;
1291     struct pa_sink_input *si = NULL;
1292     const char *name = NULL;
1293     assert(c && t);
1294
1295     if (pa_tagstruct_getu32(t, &index) < 0 ||
1296         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
1297         pa_tagstruct_getu32(t, &volume) ||
1298         !pa_tagstruct_eof(t)) {
1299         protocol_error(c);
1300         return;
1301     }
1302     
1303     if (!c->authorized) {
1304         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1305         return;
1306     }
1307
1308     if (command == PA_COMMAND_SET_SINK_VOLUME) {
1309         if (index != (uint32_t) -1)
1310             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1311         else
1312             sink = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SINK, 1);
1313     }  else {
1314         assert(command == PA_COMMAND_SET_SINK_INPUT_VOLUME);
1315         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1316     }
1317
1318     if (!si && !sink) {
1319         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1320         return;
1321     }
1322
1323     if (sink)
1324         pa_sink_set_volume(sink, volume);
1325     else if (si)
1326         pa_sink_input_set_volume(si, volume);
1327
1328     pa_pstream_send_simple_ack(c->pstream, tag);
1329 }
1330
1331 static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1332     struct connection *c = userdata;
1333     uint32_t index;
1334     uint32_t b;
1335     struct playback_stream *s;
1336     assert(c && t);
1337
1338     if (pa_tagstruct_getu32(t, &index) < 0 ||
1339         pa_tagstruct_getu32(t, &b) < 0 ||
1340         !pa_tagstruct_eof(t)) {
1341         protocol_error(c);
1342         return;
1343     }
1344
1345     if (!c->authorized) {
1346         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1347         return;
1348     }
1349
1350     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1351         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1352         return;
1353     }
1354
1355     pa_sink_input_cork(s->sink_input, b);
1356     pa_pstream_send_simple_ack(c->pstream, tag);
1357 }
1358
1359 static void command_flush_or_trigger_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1360     struct connection *c = userdata;
1361     uint32_t index;
1362     struct playback_stream *s;
1363     assert(c && t);
1364
1365     if (pa_tagstruct_getu32(t, &index) < 0 ||
1366         !pa_tagstruct_eof(t)) {
1367         protocol_error(c);
1368         return;
1369     }
1370
1371     if (!c->authorized) {
1372         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1373         return;
1374     }
1375
1376     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1377         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1378         return;
1379     }
1380
1381     if (command == PA_COMMAND_TRIGGER_PLAYBACK_STREAM)
1382         pa_memblockq_prebuf_disable(s->memblockq);
1383     else {
1384         assert(command == PA_COMMAND_FLUSH_PLAYBACK_STREAM);
1385         pa_memblockq_flush(s->memblockq);
1386         /*pa_log(__FILE__": flush: %u\n", pa_memblockq_get_length(s->memblockq));*/
1387     }
1388
1389     pa_sink_notify(s->sink_input->sink);
1390     pa_pstream_send_simple_ack(c->pstream, tag);
1391     request_bytes(s);
1392 }
1393
1394 static void command_set_default_sink_or_source(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1395     struct connection *c = userdata;
1396     uint32_t index;
1397     const char *s;
1398     assert(c && t);
1399
1400     if (pa_tagstruct_getu32(t, &index) < 0 ||
1401         pa_tagstruct_gets(t, &s) < 0 ||
1402         !pa_tagstruct_eof(t)) {
1403         protocol_error(c);
1404         return;
1405     }
1406
1407     if (!c->authorized) {
1408         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1409         return;
1410     }
1411
1412     pa_namereg_set_default(c->protocol->core, s, command == PA_COMMAND_SET_DEFAULT_SOURCE ? PA_NAMEREG_SOURCE : PA_NAMEREG_SINK);
1413     pa_pstream_send_simple_ack(c->pstream, tag);
1414 }
1415
1416 /*** pstream callbacks ***/
1417
1418 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
1419     struct connection *c = userdata;
1420     assert(p && packet && packet->data && c);
1421
1422     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
1423         pa_log(__FILE__": invalid packet.\n");
1424         connection_free(c);
1425     }
1426 }
1427
1428 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
1429     struct connection *c = userdata;
1430     struct output_stream *stream;
1431     assert(p && chunk && userdata);
1432
1433     if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
1434         pa_log(__FILE__": client sent block for invalid stream.\n");
1435         connection_free(c);
1436         return;
1437     }
1438
1439     if (stream->type == PLAYBACK_STREAM) {
1440         struct playback_stream *p = (struct playback_stream*) stream;
1441         if (chunk->length >= p->requested_bytes)
1442             p->requested_bytes = 0;
1443         else
1444             p->requested_bytes -= chunk->length;
1445         
1446         pa_memblockq_push_align(p->memblockq, chunk, delta);
1447         assert(p->sink_input);
1448         /*pa_log(__FILE__": after_recv: %u\n", pa_memblockq_get_length(p->memblockq));*/
1449
1450         pa_sink_notify(p->sink_input->sink);
1451 /*         pa_log(__FILE__": Recieved %u bytes.\n", chunk->length); */
1452
1453     } else {
1454         struct upload_stream *u = (struct upload_stream*) stream;
1455         size_t l;
1456         assert(u->type == UPLOAD_STREAM);
1457
1458         if (!u->memchunk.memblock) {
1459             if (u->length == chunk->length) {
1460                 u->memchunk = *chunk;
1461                 pa_memblock_ref(u->memchunk.memblock);
1462                 u->length = 0;
1463             } else {
1464                 u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
1465                 u->memchunk.index = u->memchunk.length = 0;
1466             }
1467         }
1468         
1469         assert(u->memchunk.memblock);
1470         
1471         l = u->length; 
1472         if (l > chunk->length)
1473             l = chunk->length;
1474
1475         if (l > 0) {
1476             memcpy((uint8_t*) u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length,
1477                    (uint8_t*) chunk->memblock->data+chunk->index, l);
1478             u->memchunk.length += l;
1479             u->length -= l;
1480         }
1481     }
1482 }
1483
1484 static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
1485     struct connection *c = userdata;
1486     assert(p && c);
1487     connection_free(c);
1488
1489 /*    pa_log(__FILE__": connection died.\n");*/
1490 }
1491
1492
1493 static void pstream_drain_callback(struct pa_pstream *p, void *userdata) {
1494     struct connection *c = userdata;
1495     assert(p && c);
1496
1497     send_memblock(c);
1498 }
1499
1500 /*** client callbacks ***/
1501
1502 static void client_kill_cb(struct pa_client *c) {
1503     assert(c && c->userdata);
1504     connection_free(c->userdata);
1505 }
1506
1507 /*** socket server callbacks ***/
1508
1509 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
1510     struct pa_protocol_native *p = userdata;
1511     struct connection *c;
1512     assert(io && p);
1513
1514     c = pa_xmalloc(sizeof(struct connection));
1515     c->authorized = p->public;
1516     c->protocol = p;
1517     assert(p->core);
1518     c->client = pa_client_new(p->core, "NATIVE", "Client");
1519     assert(c->client);
1520     c->client->kill = client_kill_cb;
1521     c->client->userdata = c;
1522     c->client->owner = p->module;
1523     
1524     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
1525     assert(c->pstream);
1526
1527     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
1528     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
1529     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
1530     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
1531
1532     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
1533     assert(c->pdispatch);
1534
1535     c->record_streams = pa_idxset_new(NULL, NULL);
1536     c->output_streams = pa_idxset_new(NULL, NULL);
1537     assert(c->record_streams && c->output_streams);
1538
1539     c->rrobin_index = PA_IDXSET_INVALID;
1540     c->subscription = NULL;
1541
1542     pa_idxset_put(p->connections, c, NULL);
1543 }
1544
1545 /*** module entry points ***/
1546
1547 static struct pa_protocol_native* protocol_new_internal(struct pa_core *c, struct pa_module *m, struct pa_modargs *ma) {
1548     struct pa_protocol_native *p;
1549     int public;
1550     assert(c && ma);
1551
1552     if (pa_modargs_get_value_boolean(ma, "public", &public) < 0) {
1553         pa_log(__FILE__": public= expects a boolean argument.\n");
1554         return NULL;
1555     }
1556     
1557     p = pa_xmalloc(sizeof(struct pa_protocol_native));
1558
1559     if (pa_authkey_load_from_home(pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), p->auth_cookie, sizeof(p->auth_cookie)) < 0) {
1560         pa_xfree(p);
1561         return NULL;
1562     }
1563
1564     p->module = m;
1565     p->public = public;
1566     p->server = NULL;
1567     p->core = c;
1568     p->connections = pa_idxset_new(NULL, NULL);
1569     assert(p->connections);
1570
1571     return p;
1572 }
1573
1574 struct pa_protocol_native* pa_protocol_native_new(struct pa_core *core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) {
1575     struct pa_protocol_native *p;
1576
1577     if (!(p = protocol_new_internal(core, m, ma)))
1578         return NULL;
1579     
1580     p->server = server;
1581     pa_socket_server_set_callback(p->server, on_connection, p);
1582     
1583     return p;
1584 }
1585
1586 void pa_protocol_native_free(struct pa_protocol_native *p) {
1587     struct connection *c;
1588     assert(p);
1589
1590     while ((c = pa_idxset_first(p->connections, NULL)))
1591         connection_free(c);
1592     pa_idxset_free(p->connections, NULL, NULL);
1593
1594     if (p->server)
1595         pa_socket_server_unref(p->server);
1596     
1597     pa_xfree(p);
1598 }
1599
1600 struct pa_protocol_native* pa_protocol_native_new_iochannel(struct pa_core*core, struct pa_iochannel *io, struct pa_module *m, struct pa_modargs *ma) {
1601     struct pa_protocol_native *p;
1602
1603     if (!(p = protocol_new_internal(core, m, ma)))
1604         return NULL;
1605
1606     on_connection(NULL, io, p);
1607     
1608     return p;
1609 }