add resample_method option module-combine
[profile/ivi/pulseaudio-panda.git] / polyp / protocol-native.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <string.h>
27 #include <stdio.h>
28 #include <assert.h>
29 #include <stdlib.h>
30
31 #include "protocol-native.h"
32 #include "native-common.h"
33 #include "packet.h"
34 #include "client.h"
35 #include "source-output.h"
36 #include "sink-input.h"
37 #include "pstream.h"
38 #include "tagstruct.h"
39 #include "pdispatch.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "namereg.h"
43 #include "scache.h"
44 #include "xmalloc.h"
45 #include "util.h"
46 #include "subscribe.h"
47 #include "log.h"
48 #include "autoload.h"
49
50 struct connection;
51 struct pa_protocol_native;
52
/* Per-client recording stream: data pushed by a source output is queued
 * here and forwarded to the client in fragments. */
struct record_stream {
    /* Client connection that owns this stream */
    struct connection *connection;
    /* Index assigned when the stream is put into connection->record_streams */
    uint32_t index;
    /* Source output whose push/kill/get_latency callbacks point back here */
    struct pa_source_output *source_output;
    /* Queue of captured data not yet sent to the client */
    struct pa_memblockq *memblockq;
    /* Maximum number of bytes sent to the client in one memblock */
    size_t fragment_size;
};
60
/* Per-client playback stream: data received from the client is queued
 * here and consumed through a sink input. */
struct playback_stream {
    /* Always PLAYBACK_STREAM. Must remain the first member: streams are
     * inspected through struct output_stream to find out their kind. */
    int type;
    /* Client connection that owns this stream */
    struct connection *connection;
    /* Index assigned when the stream is put into connection->output_streams */
    uint32_t index;
    /* Sink input whose callbacks point back to this stream */
    struct pa_sink_input *sink_input;
    /* Queue of audio data waiting to be played */
    struct pa_memblockq *memblockq;
    /* Number of bytes already requested from the client (see request_bytes()) */
    size_t requested_bytes;
    /* Non-zero while a drain command is waiting to be acknowledged */
    int drain_request;
    /* Tag of the pending drain command; only meaningful if drain_request is set */
    uint32_t drain_tag;
};
71
72 struct upload_stream {
73     int type;
74     struct connection *connection;
75     uint32_t index;
76     struct pa_memchunk memchunk;
77     size_t length;
78     char *name;
79     struct pa_sample_spec sample_spec;
80 };
81
/* Common prefix of struct playback_stream and struct upload_stream.
 * Entries of connection->output_streams are first viewed through this
 * type to read the discriminator, then cast to the concrete type. */
struct output_stream {
    int type; /* UPLOAD_STREAM or PLAYBACK_STREAM */
};
85
/* Discriminator values stored in the "type" member of output streams */
enum {
    UPLOAD_STREAM,   /* struct upload_stream */
    PLAYBACK_STREAM  /* struct playback_stream */
};
90
/* State kept for one connected native-protocol client */
struct connection {
    /* Non-zero once the client is allowed to issue privileged commands */
    int authorized;
    /* Protocol instance this connection belongs to */
    struct pa_protocol_native *protocol;
    /* Client object representing this connection */
    struct pa_client *client;
    /* Packet stream used to talk to the client */
    struct pa_pstream *pstream;
    /* Dispatcher that routes incoming commands to the command_*() handlers */
    struct pa_pdispatch *pdispatch;
    /* All struct record_stream objects of this connection */
    struct pa_idxset *record_streams;
    /* All playback and upload streams of this connection */
    struct pa_idxset *output_streams;
    /* Round-robin cursor into record_streams, used by send_memblock() */
    uint32_t rrobin_index;
    /* Event subscription of the client; NULL if there is none */
    struct pa_subscription *subscription;
};
101
102 struct pa_protocol_native {
103     struct pa_module *module;
104     int public;
105     struct pa_core *core;
106     struct pa_socket_server *server;
107     struct pa_idxset *connections;
108     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
109 };
110
111 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
112 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
113 static void sink_input_kill_cb(struct pa_sink_input *i);
114 static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i);
115
116 static void request_bytes(struct playback_stream*s);
117
118 static void source_output_kill_cb(struct pa_source_output *o);
119 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk);
120 static pa_usec_t source_output_get_latency_cb(struct pa_source_output *o);
121
/* Command handlers referenced by command_table. They all share the
 * pa_pdispatch callback signature; userdata is the struct connection. */

/* Connection and server level */
static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_set_client_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Playback streams */
static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_flush_or_trigger_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Record streams */
static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_record_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Shared between stream kinds */
static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_set_stream_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Sample upload and playback */
static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Introspection */
static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Control */
static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_set_default_sink_or_source(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_kill(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Modules */
static void command_load_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_unload_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);

/* Autoload entries */
static void command_add_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_remove_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_autoload_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_autoload_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
153
154 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
155     [PA_COMMAND_ERROR] = { NULL },
156     [PA_COMMAND_TIMEOUT] = { NULL },
157     [PA_COMMAND_REPLY] = { NULL },
158     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
159     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_stream },
160     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = { command_drain_playback_stream },
161     [PA_COMMAND_CREATE_RECORD_STREAM] = { command_create_record_stream },
162     [PA_COMMAND_DELETE_RECORD_STREAM] = { command_delete_stream },
163     [PA_COMMAND_AUTH] = { command_auth },
164     [PA_COMMAND_REQUEST] = { NULL },
165     [PA_COMMAND_EXIT] = { command_exit },
166     [PA_COMMAND_SET_CLIENT_NAME] = { command_set_client_name },
167     [PA_COMMAND_LOOKUP_SINK] = { command_lookup },
168     [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
169     [PA_COMMAND_STAT] = { command_stat },
170     [PA_COMMAND_GET_PLAYBACK_LATENCY] = { command_get_playback_latency },
171     [PA_COMMAND_GET_RECORD_LATENCY] = { command_get_record_latency },
172     [PA_COMMAND_CREATE_UPLOAD_STREAM] = { command_create_upload_stream },
173     [PA_COMMAND_DELETE_UPLOAD_STREAM] = { command_delete_stream },
174     [PA_COMMAND_FINISH_UPLOAD_STREAM] = { command_finish_upload_stream },
175     [PA_COMMAND_PLAY_SAMPLE] = { command_play_sample },
176     [PA_COMMAND_REMOVE_SAMPLE] = { command_remove_sample },
177     [PA_COMMAND_GET_SINK_INFO] = { command_get_info },
178     [PA_COMMAND_GET_SOURCE_INFO] = { command_get_info },
179     [PA_COMMAND_GET_CLIENT_INFO] = { command_get_info },
180     [PA_COMMAND_GET_MODULE_INFO] = { command_get_info },
181     [PA_COMMAND_GET_SINK_INPUT_INFO] = { command_get_info },
182     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = { command_get_info },
183     [PA_COMMAND_GET_SAMPLE_INFO] = { command_get_info },
184     [PA_COMMAND_GET_SINK_INFO_LIST] = { command_get_info_list },
185     [PA_COMMAND_GET_SOURCE_INFO_LIST] = { command_get_info_list },
186     [PA_COMMAND_GET_MODULE_INFO_LIST] = { command_get_info_list },
187     [PA_COMMAND_GET_CLIENT_INFO_LIST] = { command_get_info_list },
188     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = { command_get_info_list },
189     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = { command_get_info_list },
190     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = { command_get_info_list },
191     [PA_COMMAND_GET_SERVER_INFO] = { command_get_server_info },
192     [PA_COMMAND_SUBSCRIBE] = { command_subscribe },
193     [PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume },
194     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume },
195     [PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream },
196     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_or_trigger_playback_stream },
197     [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = { command_flush_or_trigger_playback_stream },
198     [PA_COMMAND_SET_DEFAULT_SINK] = { command_set_default_sink_or_source },
199     [PA_COMMAND_SET_DEFAULT_SOURCE] = { command_set_default_sink_or_source },
200     [PA_COMMAND_SET_PLAYBACK_STREAM_NAME] = { command_set_stream_name }, 
201     [PA_COMMAND_SET_RECORD_STREAM_NAME] = { command_set_stream_name },
202     [PA_COMMAND_KILL_CLIENT] = { command_kill },
203     [PA_COMMAND_KILL_SINK_INPUT] = { command_kill },
204     [PA_COMMAND_KILL_SOURCE_OUTPUT] = { command_kill },
205     [PA_COMMAND_LOAD_MODULE] = { command_load_module },
206     [PA_COMMAND_UNLOAD_MODULE] = { command_unload_module },
207     [PA_COMMAND_GET_AUTOLOAD_INFO] = { command_get_autoload_info },
208     [PA_COMMAND_GET_AUTOLOAD_INFO_LIST] = { command_get_autoload_info_list },
209     [PA_COMMAND_ADD_AUTOLOAD] = { command_add_autoload },
210     [PA_COMMAND_REMOVE_AUTOLOAD] = { command_remove_autoload },
211 };
212
213 /* structure management */
214
215 static struct upload_stream* upload_stream_new(struct connection *c, const struct pa_sample_spec *ss, const char *name, size_t length) {
216     struct upload_stream *s;
217     assert(c && ss && name && length);
218     
219     s = pa_xmalloc(sizeof(struct upload_stream));
220     s->type = UPLOAD_STREAM;
221     s->connection = c;
222     s->sample_spec = *ss;
223     s->name = pa_xstrdup(name);
224
225     s->memchunk.memblock = NULL;
226     s->memchunk.index = 0;
227     s->memchunk.length = 0;
228
229     s->length = length;
230     
231     pa_idxset_put(c->output_streams, s, &s->index);
232     return s;
233 }
234
235 static void upload_stream_free(struct upload_stream *o) {
236     assert(o && o->connection);
237
238     pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
239
240     pa_xfree(o->name);
241     
242     if (o->memchunk.memblock)
243         pa_memblock_unref(o->memchunk.memblock);
244     
245     pa_xfree(o);
246 }
247
248 static struct record_stream* record_stream_new(struct connection *c, struct pa_source *source, const struct pa_sample_spec *ss, const char *name, size_t maxlength, size_t fragment_size) {
249     struct record_stream *s;
250     struct pa_source_output *source_output;
251     size_t base;
252     assert(c && source && ss && name && maxlength);
253
254     if (!(source_output = pa_source_output_new(source, name, ss, -1)))
255         return NULL;
256
257     s = pa_xmalloc(sizeof(struct record_stream));
258     s->connection = c;
259     s->source_output = source_output;
260     s->source_output->push = source_output_push_cb;
261     s->source_output->kill = source_output_kill_cb;
262     s->source_output->get_latency = source_output_get_latency_cb;
263     s->source_output->userdata = s;
264     s->source_output->owner = c->protocol->module;
265     s->source_output->client = c->client;
266
267     s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0, c->protocol->core->memblock_stat);
268     assert(s->memblockq);
269
270     s->fragment_size = (fragment_size/base)*base;
271     if (!s->fragment_size)
272         s->fragment_size = base;
273
274     pa_idxset_put(c->record_streams, s, &s->index);
275     return s;
276 }
277
278 static void record_stream_free(struct record_stream* r) {
279     assert(r && r->connection);
280
281     pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
282     pa_source_output_disconnect(r->source_output);
283     pa_source_output_unref(r->source_output);
284     pa_memblockq_free(r->memblockq);
285     pa_xfree(r);
286 }
287
288 static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, const struct pa_sample_spec *ss, const char *name,
289                                                    size_t maxlength,
290                                                    size_t tlength,
291                                                    size_t prebuf,
292                                                    size_t minreq,
293                                                    pa_volume_t volume) {
294     struct playback_stream *s;
295     struct pa_sink_input *sink_input;
296     assert(c && sink && ss && name && maxlength);
297
298     if (!(sink_input = pa_sink_input_new(sink, name, ss, 0, -1)))
299         return NULL;
300     
301     s = pa_xmalloc(sizeof(struct playback_stream));
302     s->type = PLAYBACK_STREAM;
303     s->connection = c;
304     s->sink_input = sink_input;
305     
306     s->sink_input->peek = sink_input_peek_cb;
307     s->sink_input->drop = sink_input_drop_cb;
308     s->sink_input->kill = sink_input_kill_cb;
309     s->sink_input->get_latency = sink_input_get_latency_cb;
310     s->sink_input->userdata = s;
311     s->sink_input->owner = c->protocol->module;
312     s->sink_input->client = c->client;
313     
314     s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq, c->protocol->core->memblock_stat);
315     assert(s->memblockq);
316
317     s->requested_bytes = 0;
318     s->drain_request = 0;
319
320     s->sink_input->volume = volume;
321     
322     pa_idxset_put(c->output_streams, s, &s->index);
323     return s;
324 }
325
326 static void playback_stream_free(struct playback_stream* p) {
327     assert(p && p->connection);
328
329     if (p->drain_request)
330         pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
331
332     pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
333     pa_sink_input_disconnect(p->sink_input);
334     pa_sink_input_unref(p->sink_input);
335     pa_memblockq_free(p->memblockq);
336     pa_xfree(p);
337 }
338
339 static void connection_free(struct connection *c) {
340     struct record_stream *r;
341     struct output_stream *o;
342     assert(c && c->protocol);
343
344     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
345     while ((r = pa_idxset_first(c->record_streams, NULL)))
346         record_stream_free(r);
347     pa_idxset_free(c->record_streams, NULL, NULL);
348
349     while ((o = pa_idxset_first(c->output_streams, NULL)))
350         if (o->type == PLAYBACK_STREAM)
351             playback_stream_free((struct playback_stream*) o);
352         else
353             upload_stream_free((struct upload_stream*) o);
354     pa_idxset_free(c->output_streams, NULL, NULL);
355
356     pa_pdispatch_unref(c->pdispatch);
357     pa_pstream_close(c->pstream);
358     pa_pstream_unref(c->pstream);
359     pa_client_free(c->client);
360
361     if (c->subscription)
362         pa_subscription_free(c->subscription);
363     
364     pa_xfree(c);
365 }
366
367 static void request_bytes(struct playback_stream *s) {
368     struct pa_tagstruct *t;
369     size_t l;
370     assert(s);
371
372     if (!(l = pa_memblockq_missing(s->memblockq)))
373         return;
374     
375     if (l <= s->requested_bytes)
376         return;
377
378     l -= s->requested_bytes;
379
380     if (l < pa_memblockq_get_minreq(s->memblockq))
381         return;
382     
383     s->requested_bytes += l;
384
385     t = pa_tagstruct_new(NULL, 0);
386     assert(t);
387     pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
388     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
389     pa_tagstruct_putu32(t, s->index);
390     pa_tagstruct_putu32(t, l);
391     pa_pstream_send_tagstruct(s->connection->pstream, t);
392
393 /*     pa_log(__FILE__": Requesting %u bytes\n", l); */
394 }
395
396 static void send_memblock(struct connection *c) {
397     uint32_t start;
398     struct record_stream *r;
399
400     start = PA_IDXSET_INVALID;
401     for (;;) {
402         struct pa_memchunk chunk;
403         
404         if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
405             return;
406
407         if (start == PA_IDXSET_INVALID)
408             start = c->rrobin_index;
409         else if (start == c->rrobin_index)
410             return;
411
412         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
413             struct pa_memchunk schunk = chunk;
414             
415             if (schunk.length > r->fragment_size)
416                 schunk.length = r->fragment_size;
417
418             pa_pstream_send_memblock(c->pstream, r->index, 0, &schunk);
419             pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
420             pa_memblock_unref(schunk.memblock);
421             
422             return;
423         }
424     }
425 }
426
427 static void send_playback_stream_killed(struct playback_stream *p) {
428     struct pa_tagstruct *t;
429     assert(p);
430
431     t = pa_tagstruct_new(NULL, 0);
432     assert(t);
433     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
434     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
435     pa_tagstruct_putu32(t, p->index);
436     pa_pstream_send_tagstruct(p->connection->pstream, t);
437 }
438
439 static void send_record_stream_killed(struct record_stream *r) {
440     struct pa_tagstruct *t;
441     assert(r);
442
443     t = pa_tagstruct_new(NULL, 0);
444     assert(t);
445     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
446     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
447     pa_tagstruct_putu32(t, r->index);
448     pa_pstream_send_tagstruct(r->connection->pstream, t);
449 }
450
451 /*** sinkinput callbacks ***/
452
453 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
454     struct playback_stream *s;
455     assert(i && i->userdata && chunk);
456     s = i->userdata;
457
458     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
459         return -1;
460
461     return 0;
462 }
463
464 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
465     struct playback_stream *s;
466     assert(i && i->userdata && length);
467     s = i->userdata;
468
469     pa_memblockq_drop(s->memblockq, chunk, length);
470     request_bytes(s);
471
472     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
473         pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
474         s->drain_request = 0;
475     }
476
477     /*pa_log(__FILE__": after_drop: %u\n", pa_memblockq_get_length(s->memblockq));*/
478 }
479
480 static void sink_input_kill_cb(struct pa_sink_input *i) {
481     assert(i && i->userdata);
482     send_playback_stream_killed((struct playback_stream *) i->userdata);
483     playback_stream_free((struct playback_stream *) i->userdata);
484 }
485
486 static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i) {
487     struct playback_stream *s;
488     assert(i && i->userdata);
489     s = i->userdata;
490
491     /*pa_log(__FILE__": get_latency: %u\n", pa_memblockq_get_length(s->memblockq));*/
492     
493     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
494 }
495
496 /*** source_output callbacks ***/
497
498 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
499     struct record_stream *s;
500     assert(o && o->userdata && chunk);
501     s = o->userdata;
502     
503     pa_memblockq_push_align(s->memblockq, chunk, 0);
504     if (!pa_pstream_is_pending(s->connection->pstream))
505         send_memblock(s->connection);
506 }
507
508 static void source_output_kill_cb(struct pa_source_output *o) {
509     assert(o && o->userdata);
510     send_record_stream_killed((struct record_stream *) o->userdata);
511     record_stream_free((struct record_stream *) o->userdata);
512 }
513
514 static pa_usec_t source_output_get_latency_cb(struct pa_source_output *o) {
515     struct record_stream *s;
516     assert(o && o->userdata);
517     s = o->userdata;
518
519     /*pa_log(__FILE__": get_latency: %u\n", pa_memblockq_get_length(s->memblockq));*/
520     
521     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &o->sample_spec);
522 }
523
524 /*** pdispatch callbacks ***/
525
/* React to a malformed request: log the incident and drop the client.
 * The connection is freed here, so callers must not touch c afterwards. */
static void protocol_error(struct connection *c) {
    pa_log(__FILE__": protocol error, kicking client\n");

    connection_free(c);
}
530
531 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
532     struct connection *c = userdata;
533     struct playback_stream *s;
534     size_t maxlength, tlength, prebuf, minreq;
535     uint32_t sink_index;
536     const char *name, *sink_name;
537     struct pa_sample_spec ss;
538     struct pa_tagstruct *reply;
539     struct pa_sink *sink;
540     pa_volume_t volume;
541     assert(c && t && c->protocol && c->protocol->core);
542     
543     if (pa_tagstruct_gets(t, &name) < 0 ||
544         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
545         pa_tagstruct_getu32(t, &sink_index) < 0 ||
546         pa_tagstruct_gets(t, &sink_name) < 0 ||
547         pa_tagstruct_getu32(t, &maxlength) < 0 ||
548         pa_tagstruct_getu32(t, &tlength) < 0 ||
549         pa_tagstruct_getu32(t, &prebuf) < 0 ||
550         pa_tagstruct_getu32(t, &minreq) < 0 ||
551         pa_tagstruct_getu32(t, &volume) < 0 ||
552         !pa_tagstruct_eof(t)) {
553         protocol_error(c);
554         return;
555     }
556
557     if (!c->authorized) {
558         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
559         return;
560     }
561
562     if (sink_index != (uint32_t) -1)
563         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
564     else
565         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
566
567     if (!sink) {
568         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
569         return;
570     }
571     
572     if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq, volume))) {
573         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
574         return;
575     }
576     
577     reply = pa_tagstruct_new(NULL, 0);
578     assert(reply);
579     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
580     pa_tagstruct_putu32(reply, tag);
581     pa_tagstruct_putu32(reply, s->index);
582     assert(s->sink_input);
583     pa_tagstruct_putu32(reply, s->sink_input->index);
584     pa_tagstruct_putu32(reply, s->requested_bytes = pa_memblockq_missing(s->memblockq));
585     pa_pstream_send_tagstruct(c->pstream, reply);
586     request_bytes(s);
587 }
588
589 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
590     struct connection *c = userdata;
591     uint32_t channel;
592     assert(c && t);
593     
594     if (pa_tagstruct_getu32(t, &channel) < 0 ||
595         !pa_tagstruct_eof(t)) {
596         protocol_error(c);
597         return;
598     }
599
600     if (!c->authorized) {
601         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
602         return;
603     }
604
605     if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
606         struct playback_stream *s;
607         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
608             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
609             return;
610         }
611
612         playback_stream_free(s);
613     } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
614         struct record_stream *s;
615         if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
616             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
617             return;
618         }
619
620         record_stream_free(s);
621     } else {
622         struct upload_stream *s;
623         assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
624         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
625             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
626             return;
627         }
628
629         upload_stream_free(s);
630     }
631             
632     pa_pstream_send_simple_ack(c->pstream, tag);
633 }
634
635 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
636     struct connection *c = userdata;
637     struct record_stream *s;
638     size_t maxlength, fragment_size;
639     uint32_t source_index;
640     const char *name, *source_name;
641     struct pa_sample_spec ss;
642     struct pa_tagstruct *reply;
643     struct pa_source *source;
644     assert(c && t && c->protocol && c->protocol->core);
645     
646     if (pa_tagstruct_gets(t, &name) < 0 ||
647         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
648         pa_tagstruct_getu32(t, &source_index) < 0 ||
649         pa_tagstruct_gets(t, &source_name) < 0 ||
650         pa_tagstruct_getu32(t, &maxlength) < 0 ||
651         pa_tagstruct_getu32(t, &fragment_size) < 0 ||
652         !pa_tagstruct_eof(t)) {
653         protocol_error(c);
654         return;
655     }
656
657     if (!c->authorized) {
658         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
659         return;
660     }
661
662     if (source_index != (uint32_t) -1)
663         source = pa_idxset_get_by_index(c->protocol->core->sources, source_index);
664     else
665         source = pa_namereg_get(c->protocol->core, *source_name ? source_name : NULL, PA_NAMEREG_SOURCE, 1);
666
667     if (!source) {
668         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
669         return;
670     }
671     
672     if (!(s = record_stream_new(c, source, &ss, name, maxlength, fragment_size))) {
673         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
674         return;
675     }
676     
677     reply = pa_tagstruct_new(NULL, 0);
678     assert(reply);
679     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
680     pa_tagstruct_putu32(reply, tag);
681     pa_tagstruct_putu32(reply, s->index);
682     assert(s->source_output);
683     pa_tagstruct_putu32(reply, s->source_output->index);
684     pa_pstream_send_tagstruct(c->pstream, reply);
685 }
686
687 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
688     struct connection *c = userdata;
689     assert(c && t);
690     
691     if (!pa_tagstruct_eof(t)) {
692         protocol_error(c);
693         return;
694     }
695
696     if (!c->authorized) {
697         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
698         return;
699     }
700     
701     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
702     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
703     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
704     return;
705 }
706
707 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
708     struct connection *c = userdata;
709     const void*cookie;
710     assert(c && t);
711
712     if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
713         !pa_tagstruct_eof(t)) {
714         protocol_error(c);
715         return;
716     }
717
718     if (!c->authorized) {
719         if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
720             pa_log(__FILE__": Denied access to client with invalid authorization key.\n");
721             pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
722             return;
723         }
724         
725         c->authorized = 1;
726     }
727     
728     pa_pstream_send_simple_ack(c->pstream, tag);
729     return;
730 }
731
732 static void command_set_client_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
733     struct connection *c = userdata;
734     const char *name;
735     assert(c && t);
736
737     if (pa_tagstruct_gets(t, &name) < 0 ||
738         !pa_tagstruct_eof(t)) {
739         protocol_error(c);
740         return;
741     }
742
743     pa_client_set_name(c->client, name);
744     pa_pstream_send_simple_ack(c->pstream, tag);
745     return;
746 }
747
748 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
749     struct connection *c = userdata;
750     const char *name;
751     uint32_t index = PA_IDXSET_INVALID;
752     assert(c && t);
753
754     if (pa_tagstruct_gets(t, &name) < 0 ||
755         !pa_tagstruct_eof(t)) {
756         protocol_error(c);
757         return;
758     }
759
760     if (!c->authorized) {
761         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
762         return;
763     }
764
765     if (command == PA_COMMAND_LOOKUP_SINK) {
766         struct pa_sink *sink;
767         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1)))
768             index = sink->index;
769     } else {
770         struct pa_source *source;
771         assert(command == PA_COMMAND_LOOKUP_SOURCE);
772         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1)))
773             index = source->index;
774     }
775
776     if (index == PA_IDXSET_INVALID)
777         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
778     else {
779         struct pa_tagstruct *reply;
780         reply = pa_tagstruct_new(NULL, 0);
781         assert(reply);
782         pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
783         pa_tagstruct_putu32(reply, tag);
784         pa_tagstruct_putu32(reply, index);
785         pa_pstream_send_tagstruct(c->pstream, reply);
786     }
787 }
788
789 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
790     struct connection *c = userdata;
791     uint32_t index;
792     struct playback_stream *s;
793     assert(c && t);
794
795     if (pa_tagstruct_getu32(t, &index) < 0 ||
796         !pa_tagstruct_eof(t)) {
797         protocol_error(c);
798         return;
799     }
800
801     if (!c->authorized) {
802         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
803         return;
804     }
805
806     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
807         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
808         return;
809     }
810
811     s->drain_request = 0;
812
813     pa_memblockq_prebuf_disable(s->memblockq);
814     
815     if (!pa_memblockq_is_readable(s->memblockq)) {
816         pa_pstream_send_simple_ack(c->pstream, tag);
817     } else {
818         s->drain_request = 1;
819         s->drain_tag = tag;
820
821         pa_sink_notify(s->sink_input->sink);
822     }
823
824
825 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
826     struct connection *c = userdata;
827     struct pa_tagstruct *reply;
828     assert(c && t);
829
830     if (!pa_tagstruct_eof(t)) {
831         protocol_error(c);
832         return;
833     }
834
835     if (!c->authorized) {
836         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
837         return;
838     }
839
840     reply = pa_tagstruct_new(NULL, 0);
841     assert(reply);
842     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
843     pa_tagstruct_putu32(reply, tag);
844     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
845     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
846     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
847     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
848     pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core));
849     pa_pstream_send_tagstruct(c->pstream, reply);
850 }
851
852 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
853     struct connection *c = userdata;
854     struct pa_tagstruct *reply;
855     struct playback_stream *s;
856     struct timeval tv, now;
857     uint32_t index;
858     assert(c && t);
859     
860     if (pa_tagstruct_getu32(t, &index) < 0 ||
861         pa_tagstruct_get_timeval(t, &tv) < 0 ||
862         !pa_tagstruct_eof(t)) {
863         protocol_error(c);
864         return;
865     }
866
867     if (!c->authorized) {
868         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
869         return;
870     }
871
872     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
873         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
874         return;
875     }
876
877     reply = pa_tagstruct_new(NULL, 0);
878     assert(reply);
879     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
880     pa_tagstruct_putu32(reply, tag);
881     pa_tagstruct_put_usec(reply, pa_sink_input_get_latency(s->sink_input));
882     pa_tagstruct_put_usec(reply, pa_sink_get_latency(s->sink_input->sink));
883     pa_tagstruct_put_usec(reply, 0);
884     pa_tagstruct_put_boolean(reply, pa_memblockq_is_readable(s->memblockq));
885     pa_tagstruct_putu32(reply, pa_memblockq_get_length(s->memblockq));
886     pa_tagstruct_put_timeval(reply, &tv);
887     gettimeofday(&now, NULL);
888     pa_tagstruct_put_timeval(reply, &now);
889     pa_pstream_send_tagstruct(c->pstream, reply);
890 }
891
892 static void command_get_record_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
893     struct connection *c = userdata;
894     struct pa_tagstruct *reply;
895     struct record_stream *s;
896     struct timeval tv, now;
897     uint32_t index;
898     assert(c && t);
899
900     if (pa_tagstruct_getu32(t, &index) < 0 ||
901         pa_tagstruct_get_timeval(t, &tv) < 0 ||
902         !pa_tagstruct_eof(t)) {
903         protocol_error(c);
904         return;
905     }
906
907     if (!c->authorized) {
908         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
909         return;
910     }
911
912     if (!(s = pa_idxset_get_by_index(c->record_streams, index))) {
913         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
914         return;
915     }
916
917     reply = pa_tagstruct_new(NULL, 0);
918     assert(reply);
919     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
920     pa_tagstruct_putu32(reply, tag);
921     pa_tagstruct_put_usec(reply, pa_source_output_get_latency(s->source_output));
922     pa_tagstruct_put_usec(reply, s->source_output->source->monitor_of ? pa_sink_get_latency(s->source_output->source->monitor_of) : 0);
923     pa_tagstruct_put_usec(reply, pa_source_get_latency(s->source_output->source));
924     pa_tagstruct_put_boolean(reply, 0);
925     pa_tagstruct_putu32(reply, pa_memblockq_get_length(s->memblockq));
926     pa_tagstruct_put_timeval(reply, &tv);
927     gettimeofday(&now, NULL);
928     pa_tagstruct_put_timeval(reply, &now);
929     pa_pstream_send_tagstruct(c->pstream, reply);
930 }
931
932
933 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
934     struct connection *c = userdata;
935     struct upload_stream *s;
936     size_t length;
937     const char *name;
938     struct pa_sample_spec ss;
939     struct pa_tagstruct *reply;
940     assert(c && t && c->protocol && c->protocol->core);
941     
942     if (pa_tagstruct_gets(t, &name) < 0 ||
943         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
944         pa_tagstruct_getu32(t, &length) < 0 ||
945         !pa_tagstruct_eof(t)) {
946         protocol_error(c);
947         return;
948     }
949
950     if (!c->authorized) {
951         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
952         return;
953     }
954
955     if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
956         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
957         return;
958     }
959     
960     if (!(s = upload_stream_new(c, &ss, name, length))) {
961         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
962         return;
963     }
964     
965     reply = pa_tagstruct_new(NULL, 0);
966     assert(reply);
967     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
968     pa_tagstruct_putu32(reply, tag);
969     pa_tagstruct_putu32(reply, s->index);
970     pa_pstream_send_tagstruct(c->pstream, reply);
971     
972     reply = pa_tagstruct_new(NULL, 0);
973     assert(reply);
974     pa_tagstruct_putu32(reply, PA_COMMAND_REQUEST);
975     pa_tagstruct_putu32(reply, (uint32_t) -1); /* tag */
976     pa_tagstruct_putu32(reply, s->index);
977     pa_tagstruct_putu32(reply, length);
978     pa_pstream_send_tagstruct(c->pstream, reply);
979 }
980
981 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
982     struct connection *c = userdata;
983     uint32_t channel;
984     struct upload_stream *s;
985     uint32_t index;
986     assert(c && t);
987     
988     if (pa_tagstruct_getu32(t, &channel) < 0 ||
989         !pa_tagstruct_eof(t)) {
990         protocol_error(c);
991         return;
992     }
993
994     if (!c->authorized) {
995         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
996         return;
997     }
998
999     if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
1000         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
1001         return;
1002     }
1003
1004     pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->memchunk, &index);
1005     pa_pstream_send_simple_ack(c->pstream, tag);
1006     upload_stream_free(s);
1007 }
1008
1009 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1010     struct connection *c = userdata;
1011     uint32_t sink_index, volume;
1012     struct pa_sink *sink;
1013     const char *name, *sink_name;
1014     assert(c && t);
1015
1016     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
1017         pa_tagstruct_gets(t, &sink_name) < 0 ||
1018         pa_tagstruct_getu32(t, &volume) < 0 ||
1019         pa_tagstruct_gets(t, &name) < 0 ||
1020         !pa_tagstruct_eof(t)) {
1021         protocol_error(c);
1022         return;
1023     }
1024     
1025     if (!c->authorized) {
1026         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1027         return;
1028     }
1029
1030     if (sink_index != (uint32_t) -1)
1031         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
1032     else
1033         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
1034
1035     if (!sink) {
1036         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1037         return;
1038     }
1039
1040     if (pa_scache_play_item(c->protocol->core, name, sink, volume) < 0) {
1041         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1042         return;
1043     }
1044
1045     pa_pstream_send_simple_ack(c->pstream, tag);
1046 }
1047
1048 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1049     struct connection *c = userdata;
1050     const char *name;
1051     assert(c && t);
1052
1053     if (pa_tagstruct_gets(t, &name) < 0 ||
1054         !pa_tagstruct_eof(t)) {
1055         protocol_error(c);
1056         return;
1057     }
1058
1059     if (!c->authorized) {
1060         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1061         return;
1062     }
1063
1064     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
1065         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1066         return;
1067     }
1068
1069     pa_pstream_send_simple_ack(c->pstream, tag);
1070 }
1071
1072 static void sink_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink *sink) {
1073     assert(t && sink);
1074     pa_tagstruct_putu32(t, sink->index);
1075     pa_tagstruct_puts(t, sink->name);
1076     pa_tagstruct_puts(t, sink->description ? sink->description : "");
1077     pa_tagstruct_put_sample_spec(t, &sink->sample_spec);
1078     pa_tagstruct_putu32(t, sink->owner ? sink->owner->index : (uint32_t) -1);
1079     pa_tagstruct_putu32(t, sink->volume);
1080     pa_tagstruct_putu32(t, sink->monitor_source->index);
1081     pa_tagstruct_puts(t, sink->monitor_source->name);
1082     pa_tagstruct_put_usec(t, pa_sink_get_latency(sink));
1083 }
1084
1085 static void source_fill_tagstruct(struct pa_tagstruct *t, struct pa_source *source) {
1086     assert(t && source);
1087     pa_tagstruct_putu32(t, source->index);
1088     pa_tagstruct_puts(t, source->name);
1089     pa_tagstruct_puts(t, source->description ? source->description : "");
1090     pa_tagstruct_put_sample_spec(t, &source->sample_spec);
1091     pa_tagstruct_putu32(t, source->owner ? source->owner->index : (uint32_t) -1);
1092     pa_tagstruct_putu32(t, source->monitor_of ? source->monitor_of->index : (uint32_t) -1);
1093     pa_tagstruct_puts(t, source->monitor_of ? source->monitor_of->name : "");
1094     pa_tagstruct_put_usec(t, pa_source_get_latency(source));
1095 }
1096
1097 static void client_fill_tagstruct(struct pa_tagstruct *t, struct pa_client *client) {
1098     assert(t && client);
1099     pa_tagstruct_putu32(t, client->index);
1100     pa_tagstruct_puts(t, client->name);
1101     pa_tagstruct_puts(t, client->protocol_name);
1102     pa_tagstruct_putu32(t, client->owner ? client->owner->index : (uint32_t) -1);
1103 }
1104
1105 static void module_fill_tagstruct(struct pa_tagstruct *t, struct pa_module *module) {
1106     assert(t && module);
1107     pa_tagstruct_putu32(t, module->index);
1108     pa_tagstruct_puts(t, module->name);
1109     pa_tagstruct_puts(t, module->argument ? module->argument : "");
1110     pa_tagstruct_putu32(t, module->n_used);
1111     pa_tagstruct_put_boolean(t, module->auto_unload);
1112 }
1113
1114 static void sink_input_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink_input *s) {
1115     assert(t && s);
1116     pa_tagstruct_putu32(t, s->index);
1117     pa_tagstruct_puts(t, s->name ? s->name : "");
1118     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1119     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1120     pa_tagstruct_putu32(t, s->sink->index);
1121     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1122     pa_tagstruct_putu32(t, s->volume);
1123     pa_tagstruct_put_usec(t, pa_sink_input_get_latency(s));
1124     pa_tagstruct_put_usec(t, pa_sink_get_latency(s->sink));
1125 }
1126
1127 static void source_output_fill_tagstruct(struct pa_tagstruct *t, struct pa_source_output *s) {
1128     assert(t && s);
1129     pa_tagstruct_putu32(t, s->index);
1130     pa_tagstruct_puts(t, s->name ? s->name : "");
1131     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1132     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1133     pa_tagstruct_putu32(t, s->source->index);
1134     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1135     pa_tagstruct_put_usec(t, pa_source_output_get_latency(s));
1136     pa_tagstruct_put_usec(t, pa_source_get_latency(s->source));
1137 }
1138
1139 static void scache_fill_tagstruct(struct pa_tagstruct *t, struct pa_scache_entry *e) {
1140     assert(t && e);
1141     pa_tagstruct_putu32(t, e->index);
1142     pa_tagstruct_puts(t, e->name);
1143     pa_tagstruct_putu32(t, e->volume);
1144     pa_tagstruct_put_usec(t, pa_bytes_to_usec(e->memchunk.length, &e->sample_spec));
1145     pa_tagstruct_put_sample_spec(t, &e->sample_spec);
1146     pa_tagstruct_putu32(t, e->memchunk.length);
1147     pa_tagstruct_put_boolean(t, e->lazy);
1148     pa_tagstruct_puts(t, e->filename);
1149 }
1150
1151 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1152     struct connection *c = userdata;
1153     uint32_t index;
1154     struct pa_sink *sink = NULL;
1155     struct pa_source *source = NULL;
1156     struct pa_client *client = NULL;
1157     struct pa_module *module = NULL;
1158     struct pa_sink_input *si = NULL;
1159     struct pa_source_output *so = NULL;
1160     struct pa_scache_entry *sce = NULL;
1161     const char *name;
1162     struct pa_tagstruct *reply;
1163     assert(c && t);
1164
1165     
1166     if (pa_tagstruct_getu32(t, &index) < 0 ||
1167         (command != PA_COMMAND_GET_CLIENT_INFO &&
1168          command != PA_COMMAND_GET_MODULE_INFO &&
1169          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
1170          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
1171          pa_tagstruct_gets(t, &name) < 0) ||
1172         !pa_tagstruct_eof(t)) {
1173         protocol_error(c);
1174         return;
1175     }
1176     
1177     if (!c->authorized) {
1178         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1179         return;
1180     }
1181
1182     if (command == PA_COMMAND_GET_SINK_INFO) {
1183         if (index != (uint32_t) -1)
1184             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1185         else
1186             sink = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SINK, 1);
1187     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
1188         if (index != (uint32_t) -1)
1189             source = pa_idxset_get_by_index(c->protocol->core->sources, index);
1190         else
1191             source = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SOURCE, 1);
1192     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
1193         client = pa_idxset_get_by_index(c->protocol->core->clients, index);
1194     else if (command == PA_COMMAND_GET_MODULE_INFO) 
1195         module = pa_idxset_get_by_index(c->protocol->core->modules, index);
1196     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
1197         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1198     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
1199         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, index);
1200     else {
1201         assert(command == PA_COMMAND_GET_SAMPLE_INFO && name);
1202         if (index != (uint32_t) -1)
1203             sce = pa_idxset_get_by_index(c->protocol->core->scache, index);
1204         else
1205             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE, 0);
1206     }
1207             
1208     if (!sink && !source && !client && !module && !si && !so && !sce) {
1209         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1210         return;
1211     }
1212
1213     reply = pa_tagstruct_new(NULL, 0);
1214     assert(reply);
1215     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1216     pa_tagstruct_putu32(reply, tag); 
1217     if (sink)
1218         sink_fill_tagstruct(reply, sink);
1219     else if (source)
1220         source_fill_tagstruct(reply, source);
1221     else if (client)
1222         client_fill_tagstruct(reply, client);
1223     else if (module)
1224         module_fill_tagstruct(reply, module);
1225     else if (si)
1226         sink_input_fill_tagstruct(reply, si);
1227     else if (so)
1228         source_output_fill_tagstruct(reply, so);
1229     else
1230         scache_fill_tagstruct(reply, sce);
1231     pa_pstream_send_tagstruct(c->pstream, reply);
1232 }
1233
1234 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1235     struct connection *c = userdata;
1236     struct pa_idxset *i;
1237     uint32_t index;
1238     void *p;
1239     struct pa_tagstruct *reply;
1240     assert(c && t);
1241
1242     if (!pa_tagstruct_eof(t)) {
1243         protocol_error(c);
1244         return;
1245     }
1246     
1247     if (!c->authorized) {
1248         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1249         return;
1250     }
1251
1252     reply = pa_tagstruct_new(NULL, 0);
1253     assert(reply);
1254     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1255     pa_tagstruct_putu32(reply, tag);
1256
1257     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1258         i = c->protocol->core->sinks;
1259     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1260         i = c->protocol->core->sources;
1261     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1262         i = c->protocol->core->clients;
1263     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1264         i = c->protocol->core->modules;
1265     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1266         i = c->protocol->core->sink_inputs;
1267     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
1268         i = c->protocol->core->source_outputs;
1269     else {
1270         assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1271         i = c->protocol->core->scache;
1272     }
1273
1274     if (i) {
1275         for (p = pa_idxset_first(i, &index); p; p = pa_idxset_next(i, &index)) {
1276             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1277                 sink_fill_tagstruct(reply, p);
1278             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1279                 source_fill_tagstruct(reply, p);
1280             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1281                 client_fill_tagstruct(reply, p);
1282             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1283                 module_fill_tagstruct(reply, p);
1284             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1285                 sink_input_fill_tagstruct(reply, p);
1286             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST) 
1287                 source_output_fill_tagstruct(reply, p);
1288             else {
1289                 assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1290                 scache_fill_tagstruct(reply, p);
1291             }
1292         }
1293     }
1294     
1295     pa_pstream_send_tagstruct(c->pstream, reply);
1296 }
1297
1298 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1299     struct connection *c = userdata;
1300     struct pa_tagstruct *reply;
1301     char txt[256];
1302     const char *n;
1303     assert(c && t);
1304
1305     if (!pa_tagstruct_eof(t)) {
1306         protocol_error(c);
1307         return;
1308     }
1309     
1310     if (!c->authorized) {
1311         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1312         return;
1313     }
1314
1315     reply = pa_tagstruct_new(NULL, 0);
1316     assert(reply);
1317     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1318     pa_tagstruct_putu32(reply, tag);
1319     pa_tagstruct_puts(reply, PACKAGE_NAME);
1320     pa_tagstruct_puts(reply, PACKAGE_VERSION);
1321     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
1322     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
1323     pa_tagstruct_put_sample_spec(reply, &c->protocol->core->default_sample_spec);
1324
1325     n = pa_namereg_get_default_sink_name(c->protocol->core);
1326     pa_tagstruct_puts(reply, n ? n : "");
1327     n = pa_namereg_get_default_source_name(c->protocol->core);
1328     pa_tagstruct_puts(reply, n ? n : "");
1329     pa_pstream_send_tagstruct(c->pstream, reply);
1330 }
1331
1332 static void subscription_cb(struct pa_core *core, enum pa_subscription_event_type e, uint32_t index, void *userdata) {
1333     struct pa_tagstruct *t;
1334     struct connection *c = userdata;
1335     assert(c && core);
1336
1337     t = pa_tagstruct_new(NULL, 0);
1338     assert(t);
1339     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
1340     pa_tagstruct_putu32(t, (uint32_t) -1);
1341     pa_tagstruct_putu32(t, e);
1342     pa_tagstruct_putu32(t, index);
1343     pa_pstream_send_tagstruct(c->pstream, t);
1344 }
1345
1346 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1347     struct connection *c = userdata;
1348     enum pa_subscription_mask m;
1349     assert(c && t);
1350
1351     if (pa_tagstruct_getu32(t, &m) < 0 ||
1352         !pa_tagstruct_eof(t)) {
1353         protocol_error(c);
1354         return;
1355     }
1356     
1357     if (!c->authorized) {
1358         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1359         return;
1360     }
1361
1362     if (c->subscription)
1363         pa_subscription_free(c->subscription);
1364
1365     if (m != 0) {
1366         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
1367         assert(c->subscription);
1368     } else
1369         c->subscription = NULL;
1370
1371     pa_pstream_send_simple_ack(c->pstream, tag);
1372 }
1373
1374 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1375     struct connection *c = userdata;
1376     uint32_t index, volume;
1377     struct pa_sink *sink = NULL;
1378     struct pa_sink_input *si = NULL;
1379     const char *name = NULL;
1380     assert(c && t);
1381
1382     if (pa_tagstruct_getu32(t, &index) < 0 ||
1383         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
1384         pa_tagstruct_getu32(t, &volume) ||
1385         !pa_tagstruct_eof(t)) {
1386         protocol_error(c);
1387         return;
1388     }
1389     
1390     if (!c->authorized) {
1391         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1392         return;
1393     }
1394
1395     if (command == PA_COMMAND_SET_SINK_VOLUME) {
1396         if (index != (uint32_t) -1)
1397             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1398         else
1399             sink = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SINK, 1);
1400     }  else {
1401         assert(command == PA_COMMAND_SET_SINK_INPUT_VOLUME);
1402         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1403     }
1404
1405     if (!si && !sink) {
1406         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1407         return;
1408     }
1409
1410     if (sink)
1411         pa_sink_set_volume(sink, volume);
1412     else if (si)
1413         pa_sink_input_set_volume(si, volume);
1414
1415     pa_pstream_send_simple_ack(c->pstream, tag);
1416 }
1417
1418 static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1419     struct connection *c = userdata;
1420     uint32_t index;
1421     uint32_t b;
1422     struct playback_stream *s;
1423     assert(c && t);
1424
1425     if (pa_tagstruct_getu32(t, &index) < 0 ||
1426         pa_tagstruct_getu32(t, &b) < 0 ||
1427         !pa_tagstruct_eof(t)) {
1428         protocol_error(c);
1429         return;
1430     }
1431
1432     if (!c->authorized) {
1433         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1434         return;
1435     }
1436
1437     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1438         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1439         return;
1440     }
1441
1442     pa_sink_input_cork(s->sink_input, b);
1443     pa_pstream_send_simple_ack(c->pstream, tag);
1444 }
1445
1446 static void command_flush_or_trigger_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1447     struct connection *c = userdata;
1448     uint32_t index;
1449     struct playback_stream *s;
1450     assert(c && t);
1451
1452     if (pa_tagstruct_getu32(t, &index) < 0 ||
1453         !pa_tagstruct_eof(t)) {
1454         protocol_error(c);
1455         return;
1456     }
1457
1458     if (!c->authorized) {
1459         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1460         return;
1461     }
1462
1463     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1464         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1465         return;
1466     }
1467
1468     if (command == PA_COMMAND_TRIGGER_PLAYBACK_STREAM)
1469         pa_memblockq_prebuf_disable(s->memblockq);
1470     else {
1471         assert(command == PA_COMMAND_FLUSH_PLAYBACK_STREAM);
1472         pa_memblockq_flush(s->memblockq);
1473         /*pa_log(__FILE__": flush: %u\n", pa_memblockq_get_length(s->memblockq));*/
1474     }
1475
1476     pa_sink_notify(s->sink_input->sink);
1477     pa_pstream_send_simple_ack(c->pstream, tag);
1478     request_bytes(s);
1479 }
1480
1481 static void command_set_default_sink_or_source(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1482     struct connection *c = userdata;
1483     uint32_t index;
1484     const char *s;
1485     assert(c && t);
1486
1487     if (pa_tagstruct_getu32(t, &index) < 0 ||
1488         pa_tagstruct_gets(t, &s) < 0 ||
1489         !pa_tagstruct_eof(t)) {
1490         protocol_error(c);
1491         return;
1492     }
1493
1494     if (!c->authorized) {
1495         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1496         return;
1497     }
1498
1499     pa_namereg_set_default(c->protocol->core, s, command == PA_COMMAND_SET_DEFAULT_SOURCE ? PA_NAMEREG_SOURCE : PA_NAMEREG_SINK);
1500     pa_pstream_send_simple_ack(c->pstream, tag);
1501 }
1502
1503 static void command_set_stream_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1504     struct connection *c = userdata;
1505     uint32_t index;
1506     const char *name;
1507     assert(c && t);
1508
1509     if (pa_tagstruct_getu32(t, &index) < 0 ||
1510         pa_tagstruct_gets(t, &name) < 0 ||
1511         !pa_tagstruct_eof(t)) {
1512         protocol_error(c);
1513         return;
1514     }
1515     
1516     if (!c->authorized) {
1517         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1518         return;
1519     }
1520
1521     if (command == PA_COMMAND_SET_PLAYBACK_STREAM_NAME) {
1522         struct playback_stream *s;
1523         
1524         if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1525             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1526             return;
1527         }
1528
1529         pa_sink_input_set_name(s->sink_input, name);
1530         
1531     } else {
1532         struct record_stream *s;
1533         
1534         if (!(s = pa_idxset_get_by_index(c->record_streams, index))) {
1535             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1536             return;
1537         }
1538
1539         pa_source_output_set_name(s->source_output, name);
1540     }
1541
1542     pa_pstream_send_simple_ack(c->pstream, tag);
1543 }
1544
1545 static void command_kill(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1546     struct connection *c = userdata;
1547     uint32_t index;
1548     assert(c && t);
1549
1550     if (pa_tagstruct_getu32(t, &index) < 0 ||
1551         !pa_tagstruct_eof(t)) {
1552         protocol_error(c);
1553         return;
1554     }
1555     
1556     if (!c->authorized) {
1557         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1558         return;
1559     }
1560
1561     if (command == PA_COMMAND_KILL_CLIENT) {
1562         struct pa_client *client;
1563         
1564         if (!(client = pa_idxset_get_by_index(c->protocol->core->clients, index))) {
1565             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1566             return;
1567         }
1568
1569         pa_client_kill(client);
1570     } else if (command == PA_COMMAND_KILL_SINK_INPUT) {
1571         struct pa_sink_input *s;
1572         
1573         if (!(s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index))) {
1574             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1575             return;
1576         }
1577
1578         pa_sink_input_kill(s);
1579     } else {
1580         struct pa_source_output *s;
1581
1582         assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
1583         
1584         if (!(s = pa_idxset_get_by_index(c->protocol->core->source_outputs, index))) {
1585             pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1586             return;
1587         }
1588
1589         pa_source_output_kill(s);
1590     }
1591
1592     pa_pstream_send_simple_ack(c->pstream, tag);
1593 }
1594
1595 static void command_load_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1596     struct connection *c = userdata;
1597     struct pa_module *m;
1598     const char *name, *argument;
1599     struct pa_tagstruct *reply;
1600     assert(c && t);
1601
1602     if (pa_tagstruct_gets(t, &name) < 0 ||
1603         pa_tagstruct_gets(t, &argument) < 0 ||
1604         !pa_tagstruct_eof(t)) {
1605         protocol_error(c);
1606         return;
1607     }
1608     
1609     if (!c->authorized) {
1610         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1611         return;
1612     }
1613
1614     if (!(m = pa_module_load(c->protocol->core, name, argument))) {
1615         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INITFAILED);
1616         return;
1617     }
1618
1619     reply = pa_tagstruct_new(NULL, 0);
1620     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1621     pa_tagstruct_putu32(reply, tag);
1622     pa_tagstruct_putu32(reply, m->index);
1623     pa_pstream_send_tagstruct(c->pstream, reply);
1624 }
1625
1626 static void command_unload_module(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1627     struct connection *c = userdata;
1628     uint32_t index;
1629     struct pa_module *m;
1630     assert(c && t);
1631
1632     if (pa_tagstruct_getu32(t, &index) < 0 ||
1633         !pa_tagstruct_eof(t)) {
1634         protocol_error(c);
1635         return;
1636     }
1637     
1638     if (!c->authorized) {
1639         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1640         return;
1641     }
1642
1643     if (!(m = pa_idxset_get_by_index(c->protocol->core->modules, index))) {
1644         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1645         return;
1646     }
1647
1648     pa_module_unload_request(m);
1649     pa_pstream_send_simple_ack(c->pstream, tag);
1650 }
1651
1652 static void command_add_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1653     struct connection *c = userdata;
1654     const char *name, *module, *argument;
1655     uint32_t type;
1656     assert(c && t);
1657
1658     if (pa_tagstruct_gets(t, &name) < 0 ||
1659         pa_tagstruct_getu32(t, &type) < 0 || type > 1 ||
1660         pa_tagstruct_gets(t, &module) < 0 ||
1661         pa_tagstruct_gets(t, &argument) < 0 ||
1662         !pa_tagstruct_eof(t)) {
1663         protocol_error(c);
1664         return;
1665     }
1666     
1667     if (!c->authorized) {
1668         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1669         return;
1670     }
1671
1672     if (pa_autoload_add(c->protocol->core, name, type == 0 ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE, module, argument) < 0) {
1673         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
1674         return;
1675     }
1676
1677     pa_pstream_send_simple_ack(c->pstream, tag);
1678 }
1679
1680 static void command_remove_autoload(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1681     struct connection *c = userdata;
1682     const char *name;
1683     uint32_t type;
1684     assert(c && t);
1685
1686     if (pa_tagstruct_gets(t, &name) < 0 ||
1687         pa_tagstruct_getu32(t, &type) < 0 || type > 1 ||
1688         !pa_tagstruct_eof(t)) {
1689         protocol_error(c);
1690         return;
1691     }
1692     
1693     if (!c->authorized) {
1694         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1695         return;
1696     }
1697
1698     if (pa_autoload_remove(c->protocol->core, name, type == 0 ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE) < 0) {
1699         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1700         return;
1701     }
1702
1703     pa_pstream_send_simple_ack(c->pstream, tag);
1704 }
1705
1706 static void autoload_fill_tagstruct(struct pa_tagstruct *t, struct pa_autoload_entry *e) {
1707     assert(t && e);
1708     pa_tagstruct_puts(t, e->name);
1709     pa_tagstruct_putu32(t, e->type == PA_NAMEREG_SINK ? 0 : 1);
1710     pa_tagstruct_puts(t, e->module);
1711     pa_tagstruct_puts(t, e->argument);
1712 }
1713
1714 static void command_get_autoload_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1715     struct connection *c = userdata;
1716     struct pa_autoload_entry *a = NULL;
1717     uint32_t type;
1718     const char *name;
1719     struct pa_tagstruct *reply;
1720     assert(c && t);
1721     
1722     if (pa_tagstruct_gets(t, &name) < 0 ||
1723         pa_tagstruct_getu32(t, &type) < 0 || type > 1 ||
1724         !pa_tagstruct_eof(t)) {
1725         protocol_error(c);
1726         return;
1727     }
1728
1729     if (!c->authorized) {
1730         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1731         return;
1732     }
1733
1734     if (!c->protocol->core->autoload_hashmap || !(a = pa_hashmap_get(c->protocol->core->autoload_hashmap, name)) || (a->type == PA_NAMEREG_SINK && type != 0) || (a->type == PA_NAMEREG_SOURCE && type != 1)) {
1735         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1736         return;
1737     }
1738
1739     reply = pa_tagstruct_new(NULL, 0);
1740     assert(reply);
1741     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1742     pa_tagstruct_putu32(reply, tag);
1743     autoload_fill_tagstruct(reply, a);
1744     pa_pstream_send_tagstruct(c->pstream, reply);
1745 }
1746
1747 static void command_get_autoload_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1748     struct connection *c = userdata;
1749     struct pa_tagstruct *reply;
1750     assert(c && t);
1751
1752     if (!pa_tagstruct_eof(t)) {
1753         protocol_error(c);
1754         return;
1755     }
1756     
1757     if (!c->authorized) {
1758         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1759         return;
1760     }
1761
1762     reply = pa_tagstruct_new(NULL, 0);
1763     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1764     pa_tagstruct_putu32(reply, tag);
1765
1766     if (c->protocol->core->autoload_hashmap) {
1767         struct pa_autoload_entry *a;
1768         void *state = NULL;
1769
1770         while ((a = pa_hashmap_iterate(c->protocol->core->autoload_hashmap, &state)))
1771             autoload_fill_tagstruct(reply, a);
1772     }
1773     
1774     pa_pstream_send_tagstruct(c->pstream, reply);
1775 }
1776
1777 /*** pstream callbacks ***/
1778
1779 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
1780     struct connection *c = userdata;
1781     assert(p && packet && packet->data && c);
1782
1783     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
1784         pa_log(__FILE__": invalid packet.\n");
1785         connection_free(c);
1786     }
1787 }
1788
1789 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
1790     struct connection *c = userdata;
1791     struct output_stream *stream;
1792     assert(p && chunk && userdata);
1793
1794     if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
1795         pa_log(__FILE__": client sent block for invalid stream.\n");
1796         connection_free(c);
1797         return;
1798     }
1799
1800     if (stream->type == PLAYBACK_STREAM) {
1801         struct playback_stream *p = (struct playback_stream*) stream;
1802         if (chunk->length >= p->requested_bytes)
1803             p->requested_bytes = 0;
1804         else
1805             p->requested_bytes -= chunk->length;
1806         
1807         pa_memblockq_push_align(p->memblockq, chunk, delta);
1808         assert(p->sink_input);
1809         /*pa_log(__FILE__": after_recv: %u\n", pa_memblockq_get_length(p->memblockq));*/
1810
1811         pa_sink_notify(p->sink_input->sink);
1812 /*         pa_log(__FILE__": Recieved %u bytes.\n", chunk->length); */
1813
1814     } else {
1815         struct upload_stream *u = (struct upload_stream*) stream;
1816         size_t l;
1817         assert(u->type == UPLOAD_STREAM);
1818
1819         if (!u->memchunk.memblock) {
1820             if (u->length == chunk->length) {
1821                 u->memchunk = *chunk;
1822                 pa_memblock_ref(u->memchunk.memblock);
1823                 u->length = 0;
1824             } else {
1825                 u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
1826                 u->memchunk.index = u->memchunk.length = 0;
1827             }
1828         }
1829         
1830         assert(u->memchunk.memblock);
1831         
1832         l = u->length; 
1833         if (l > chunk->length)
1834             l = chunk->length;
1835
1836         if (l > 0) {
1837             memcpy((uint8_t*) u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length,
1838                    (uint8_t*) chunk->memblock->data+chunk->index, l);
1839             u->memchunk.length += l;
1840             u->length -= l;
1841         }
1842     }
1843 }
1844
/* Called by the pstream when it dies: frees the connection that owns
 * it. */
static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
    struct connection *c = userdata;
    assert(p && c);

    connection_free(c);
}
1852
1853
/* Called by the pstream when its drain callback fires: calls
 * send_memblock() for this connection. */
static void pstream_drain_callback(struct pa_pstream *p, void *userdata) {
    struct connection *c = userdata;
    assert(p && c);

    send_memblock(c);
}
1860
1861 /*** client callbacks ***/
1862
1863 static void client_kill_cb(struct pa_client *c) {
1864     assert(c && c->userdata);
1865     connection_free(c->userdata);
1866 }
1867
1868 /*** socket server callbacks ***/
1869
1870 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
1871     struct pa_protocol_native *p = userdata;
1872     struct connection *c;
1873     assert(io && p);
1874
1875     c = pa_xmalloc(sizeof(struct connection));
1876
1877     c->authorized =!! p->public;
1878     c->protocol = p;
1879     assert(p->core);
1880     c->client = pa_client_new(p->core, "NATIVE", "Client");
1881     assert(c->client);
1882     c->client->kill = client_kill_cb;
1883     c->client->userdata = c;
1884     c->client->owner = p->module;
1885     
1886     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
1887     assert(c->pstream);
1888
1889     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
1890     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
1891     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
1892     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
1893
1894     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
1895     assert(c->pdispatch);
1896
1897     c->record_streams = pa_idxset_new(NULL, NULL);
1898     c->output_streams = pa_idxset_new(NULL, NULL);
1899     assert(c->record_streams && c->output_streams);
1900
1901     c->rrobin_index = PA_IDXSET_INVALID;
1902     c->subscription = NULL;
1903
1904     pa_idxset_put(p->connections, c, NULL);
1905 }
1906
1907 /*** module entry points ***/
1908
1909 static struct pa_protocol_native* protocol_new_internal(struct pa_core *c, struct pa_module *m, struct pa_modargs *ma) {
1910     struct pa_protocol_native *p;
1911     int public = 0;
1912     assert(c && ma);
1913
1914     if (pa_modargs_get_value_boolean(ma, "public", &public) < 0) {
1915         pa_log(__FILE__": public= expects a boolean argument.\n");
1916         return NULL;
1917     }
1918     
1919     p = pa_xmalloc(sizeof(struct pa_protocol_native));
1920
1921     if (pa_authkey_load_from_home(pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), p->auth_cookie, sizeof(p->auth_cookie)) < 0) {
1922         pa_xfree(p);
1923         return NULL;
1924     }
1925
1926     p->module = m;
1927     p->public = public;
1928     p->server = NULL;
1929     p->core = c;
1930     p->connections = pa_idxset_new(NULL, NULL);
1931     assert(p->connections);
1932
1933     return p;
1934 }
1935
1936 struct pa_protocol_native* pa_protocol_native_new(struct pa_core *core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) {
1937     struct pa_protocol_native *p;
1938
1939     if (!(p = protocol_new_internal(core, m, ma)))
1940         return NULL;
1941     
1942     p->server = server;
1943     pa_socket_server_set_callback(p->server, on_connection, p);
1944     
1945     return p;
1946 }
1947
1948 void pa_protocol_native_free(struct pa_protocol_native *p) {
1949     struct connection *c;
1950     assert(p);
1951
1952     while ((c = pa_idxset_first(p->connections, NULL)))
1953         connection_free(c);
1954     pa_idxset_free(p->connections, NULL, NULL);
1955
1956     if (p->server)
1957         pa_socket_server_unref(p->server);
1958     
1959     pa_xfree(p);
1960 }
1961
/* Create a native protocol instance without a socket server and
 * immediately set up one connection on the already established I/O
 * channel 'io'. Returns NULL if the common initialization fails. */
struct pa_protocol_native* pa_protocol_native_new_iochannel(struct pa_core*core, struct pa_iochannel *io, struct pa_module *m, struct pa_modargs *ma) {
    struct pa_protocol_native *p = protocol_new_internal(core, m, ma);

    if (p)
        on_connection(NULL, io, p);

    return p;
}