new features:
[profile/ivi/pulseaudio-panda.git] / polyp / protocol-native.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <string.h>
27 #include <stdio.h>
28 #include <assert.h>
29 #include <stdlib.h>
30
31 #include "protocol-native.h"
32 #include "native-common.h"
33 #include "packet.h"
34 #include "client.h"
35 #include "source-output.h"
36 #include "sink-input.h"
37 #include "pstream.h"
38 #include "tagstruct.h"
39 #include "pdispatch.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "namereg.h"
43 #include "scache.h"
44 #include "xmalloc.h"
45 #include "util.h"
46 #include "subscribe.h"
47
48 struct connection;
49 struct pa_protocol_native;
50
51 struct record_stream {
52     struct connection *connection;
53     uint32_t index;
54     struct pa_source_output *source_output;
55     struct pa_memblockq *memblockq;
56     size_t fragment_size;
57 };
58
59 struct playback_stream {
60     int type;
61     struct connection *connection;
62     uint32_t index;
63     struct pa_sink_input *sink_input;
64     struct pa_memblockq *memblockq;
65     size_t requested_bytes;
66     int drain_request;
67     uint32_t drain_tag;
68 };
69
70 struct upload_stream {
71     int type;
72     struct connection *connection;
73     uint32_t index;
74     struct pa_memchunk memchunk;
75     size_t length;
76     char *name;
77     struct pa_sample_spec sample_spec;
78 };
79
80 struct output_stream {
81     int type;
82 };
83
84 enum {
85     UPLOAD_STREAM,
86     PLAYBACK_STREAM
87 };
88
89 struct connection {
90     int authorized;
91     struct pa_protocol_native *protocol;
92     struct pa_client *client;
93     struct pa_pstream *pstream;
94     struct pa_pdispatch *pdispatch;
95     struct pa_idxset *record_streams, *output_streams;
96     uint32_t rrobin_index;
97     struct pa_subscription *subscription;
98 };
99
100 struct pa_protocol_native {
101     struct pa_module *module;
102     int public;
103     struct pa_core *core;
104     struct pa_socket_server *server;
105     struct pa_idxset *connections;
106     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
107 };
108
109 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
110 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
111 static void sink_input_kill_cb(struct pa_sink_input *i);
112 static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
113
114 static void request_bytes(struct playback_stream*s);
115
116 static void source_output_kill_cb(struct pa_source_output *o);
117 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk);
118
119 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
120 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
121 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
122 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
123 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
124 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
125 static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
126 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
127 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
128 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
129 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
130 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
131 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
132 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
133 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
134 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
135 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
136 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
137 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
138 static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
139 static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
140
141 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
142     [PA_COMMAND_ERROR] = { NULL },
143     [PA_COMMAND_TIMEOUT] = { NULL },
144     [PA_COMMAND_REPLY] = { NULL },
145     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
146     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_stream },
147     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = { command_drain_playback_stream },
148     [PA_COMMAND_CREATE_RECORD_STREAM] = { command_create_record_stream },
149     [PA_COMMAND_DELETE_RECORD_STREAM] = { command_delete_stream },
150     [PA_COMMAND_AUTH] = { command_auth },
151     [PA_COMMAND_REQUEST] = { NULL },
152     [PA_COMMAND_EXIT] = { command_exit },
153     [PA_COMMAND_SET_NAME] = { command_set_name },
154     [PA_COMMAND_LOOKUP_SINK] = { command_lookup },
155     [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
156     [PA_COMMAND_STAT] = { command_stat },
157     [PA_COMMAND_GET_PLAYBACK_LATENCY] = { command_get_playback_latency },
158     [PA_COMMAND_CREATE_UPLOAD_STREAM] = { command_create_upload_stream },
159     [PA_COMMAND_DELETE_UPLOAD_STREAM] = { command_delete_stream },
160     [PA_COMMAND_FINISH_UPLOAD_STREAM] = { command_finish_upload_stream },
161     [PA_COMMAND_PLAY_SAMPLE] = { command_play_sample },
162     [PA_COMMAND_REMOVE_SAMPLE] = { command_remove_sample },
163     [PA_COMMAND_GET_SINK_INFO] = { command_get_info },
164     [PA_COMMAND_GET_SOURCE_INFO] = { command_get_info },
165     [PA_COMMAND_GET_CLIENT_INFO] = { command_get_info },
166     [PA_COMMAND_GET_MODULE_INFO] = { command_get_info },
167     [PA_COMMAND_GET_SINK_INPUT_INFO] = { command_get_info },
168     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO] = { command_get_info },
169     [PA_COMMAND_GET_SAMPLE_INFO] = { command_get_info },
170     [PA_COMMAND_GET_SINK_INFO_LIST] = { command_get_info_list },
171     [PA_COMMAND_GET_SOURCE_INFO_LIST] = { command_get_info_list },
172     [PA_COMMAND_GET_MODULE_INFO_LIST] = { command_get_info_list },
173     [PA_COMMAND_GET_CLIENT_INFO_LIST] = { command_get_info_list },
174     [PA_COMMAND_GET_SINK_INPUT_INFO_LIST] = { command_get_info_list },
175     [PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST] = { command_get_info_list },
176     [PA_COMMAND_GET_SAMPLE_INFO_LIST] = { command_get_info_list },
177     [PA_COMMAND_GET_SERVER_INFO] = { command_get_server_info },
178     [PA_COMMAND_SUBSCRIBE] = { command_subscribe },
179     [PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume },
180     [PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume },
181     [PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream },
182     [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_playback_stream },
183 };
184
185 /* structure management */
186
187 static struct upload_stream* upload_stream_new(struct connection *c, const struct pa_sample_spec *ss, const char *name, size_t length) {
188     struct upload_stream *s;
189     assert(c && ss && name && length);
190     
191     s = pa_xmalloc(sizeof(struct upload_stream));
192     s->type = UPLOAD_STREAM;
193     s->connection = c;
194     s->sample_spec = *ss;
195     s->name = pa_xstrdup(name);
196
197     s->memchunk.memblock = NULL;
198     s->memchunk.index = 0;
199     s->memchunk.length = 0;
200
201     s->length = length;
202     
203     pa_idxset_put(c->output_streams, s, &s->index);
204     return s;
205 }
206
207 static void upload_stream_free(struct upload_stream *o) {
208     assert(o && o->connection);
209
210     pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
211
212     pa_xfree(o->name);
213     
214     if (o->memchunk.memblock)
215         pa_memblock_unref(o->memchunk.memblock);
216     
217     pa_xfree(o);
218 }
219
220 static struct record_stream* record_stream_new(struct connection *c, struct pa_source *source, const struct pa_sample_spec *ss, const char *name, size_t maxlength, size_t fragment_size) {
221     struct record_stream *s;
222     struct pa_source_output *source_output;
223     size_t base;
224     assert(c && source && ss && name && maxlength);
225
226     if (!(source_output = pa_source_output_new(source, name, ss)))
227         return NULL;
228
229     s = pa_xmalloc(sizeof(struct record_stream));
230     s->connection = c;
231     s->source_output = source_output;
232     s->source_output->push = source_output_push_cb;
233     s->source_output->kill = source_output_kill_cb;
234     s->source_output->userdata = s;
235     s->source_output->owner = c->protocol->module;
236     s->source_output->client = c->client;
237
238     s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0, c->protocol->core->memblock_stat);
239     assert(s->memblockq);
240
241     s->fragment_size = (fragment_size/base)*base;
242     if (!s->fragment_size)
243         s->fragment_size = base;
244
245     pa_idxset_put(c->record_streams, s, &s->index);
246     return s;
247 }
248
249 static void record_stream_free(struct record_stream* r) {
250     assert(r && r->connection);
251
252     pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
253     pa_source_output_free(r->source_output);
254     pa_memblockq_free(r->memblockq);
255     pa_xfree(r);
256 }
257
258 static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, const struct pa_sample_spec *ss, const char *name,
259                                                    size_t maxlength,
260                                                    size_t tlength,
261                                                    size_t prebuf,
262                                                    size_t minreq) {
263     struct playback_stream *s;
264     struct pa_sink_input *sink_input;
265     assert(c && sink && ss && name && maxlength);
266
267     if (!(sink_input = pa_sink_input_new(sink, name, ss)))
268         return NULL;
269     
270     s = pa_xmalloc(sizeof(struct playback_stream));
271     s->type = PLAYBACK_STREAM;
272     s->connection = c;
273     s->sink_input = sink_input;
274     
275     s->sink_input->peek = sink_input_peek_cb;
276     s->sink_input->drop = sink_input_drop_cb;
277     s->sink_input->kill = sink_input_kill_cb;
278     s->sink_input->get_latency = sink_input_get_latency_cb;
279     s->sink_input->userdata = s;
280     s->sink_input->owner = c->protocol->module;
281     s->sink_input->client = c->client;
282     
283     s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq, c->protocol->core->memblock_stat);
284     assert(s->memblockq);
285
286     s->requested_bytes = 0;
287     s->drain_request = 0;
288     
289     pa_idxset_put(c->output_streams, s, &s->index);
290     return s;
291 }
292
293 static void playback_stream_free(struct playback_stream* p) {
294     assert(p && p->connection);
295
296     if (p->drain_request)
297         pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
298
299     pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
300     pa_sink_input_free(p->sink_input);
301     pa_memblockq_free(p->memblockq);
302     pa_xfree(p);
303 }
304
305 static void connection_free(struct connection *c) {
306     struct record_stream *r;
307     struct output_stream *o;
308     assert(c && c->protocol);
309
310     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
311     while ((r = pa_idxset_first(c->record_streams, NULL)))
312         record_stream_free(r);
313     pa_idxset_free(c->record_streams, NULL, NULL);
314
315     while ((o = pa_idxset_first(c->output_streams, NULL)))
316         if (o->type == PLAYBACK_STREAM)
317             playback_stream_free((struct playback_stream*) o);
318         else
319             upload_stream_free((struct upload_stream*) o);
320     pa_idxset_free(c->output_streams, NULL, NULL);
321
322     pa_pdispatch_unref(c->pdispatch);
323     pa_pstream_close(c->pstream);
324     pa_pstream_unref(c->pstream);
325     pa_client_free(c->client);
326
327     if (c->subscription)
328         pa_subscription_free(c->subscription);
329     
330     pa_xfree(c);
331 }
332
333 static void request_bytes(struct playback_stream *s) {
334     struct pa_tagstruct *t;
335     size_t l;
336     assert(s);
337
338     if (!(l = pa_memblockq_missing(s->memblockq)))
339         return;
340
341     if (l <= s->requested_bytes)
342         return;
343
344     l -= s->requested_bytes;
345
346     if (l < pa_memblockq_get_minreq(s->memblockq))
347         return;
348     
349     s->requested_bytes += l;
350
351     t = pa_tagstruct_new(NULL, 0);
352     assert(t);
353     pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
354     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
355     pa_tagstruct_putu32(t, s->index);
356     pa_tagstruct_putu32(t, l);
357     pa_pstream_send_tagstruct(s->connection->pstream, t);
358
359     /*fprintf(stderr, "Requesting %u bytes\n", l);*/
360 }
361
362 static void send_memblock(struct connection *c) {
363     uint32_t start;
364     struct record_stream *r;
365
366     start = PA_IDXSET_INVALID;
367     for (;;) {
368         struct pa_memchunk chunk;
369         
370         if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
371             return;
372
373         if (start == PA_IDXSET_INVALID)
374             start = c->rrobin_index;
375         else if (start == c->rrobin_index)
376             return;
377
378         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
379             if (chunk.length > r->fragment_size)
380                 chunk.length = r->fragment_size;
381
382             pa_pstream_send_memblock(c->pstream, r->index, 0, &chunk);
383             pa_memblockq_drop(r->memblockq, &chunk, chunk.length);
384             pa_memblock_unref(chunk.memblock);
385             
386             return;
387         }
388     }
389 }
390
391 static void send_playback_stream_killed(struct playback_stream *p) {
392     struct pa_tagstruct *t;
393     assert(p);
394
395     t = pa_tagstruct_new(NULL, 0);
396     assert(t);
397     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
398     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
399     pa_tagstruct_putu32(t, p->index);
400     pa_pstream_send_tagstruct(p->connection->pstream, t);
401 }
402
403 static void send_record_stream_killed(struct record_stream *r) {
404     struct pa_tagstruct *t;
405     assert(r);
406
407     t = pa_tagstruct_new(NULL, 0);
408     assert(t);
409     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
410     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
411     pa_tagstruct_putu32(t, r->index);
412     pa_pstream_send_tagstruct(r->connection->pstream, t);
413 }
414
415
416 /*** sinkinput callbacks ***/
417
418 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
419     struct playback_stream *s;
420     assert(i && i->userdata && chunk);
421     s = i->userdata;
422
423     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
424         return -1;
425
426     return 0;
427 }
428
429 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
430     struct playback_stream *s;
431     assert(i && i->userdata && length);
432     s = i->userdata;
433
434     pa_memblockq_drop(s->memblockq, chunk, length);
435     request_bytes(s);
436
437     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
438         pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
439         s->drain_request = 0;
440     }
441 }
442
443 static void sink_input_kill_cb(struct pa_sink_input *i) {
444     assert(i && i->userdata);
445     send_playback_stream_killed((struct playback_stream *) i->userdata);
446     playback_stream_free((struct playback_stream *) i->userdata);
447 }
448
449 static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i) {
450     struct playback_stream *s;
451     assert(i && i->userdata);
452     s = i->userdata;
453
454     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
455 }
456
457 /*** source_output callbacks ***/
458
459 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
460     struct record_stream *s;
461     assert(o && o->userdata && chunk);
462     s = o->userdata;
463     
464     pa_memblockq_push(s->memblockq, chunk, 0);
465     if (!pa_pstream_is_pending(s->connection->pstream))
466         send_memblock(s->connection);
467 }
468
469 static void source_output_kill_cb(struct pa_source_output *o) {
470     assert(o && o->userdata);
471     send_record_stream_killed((struct record_stream *) o->userdata);
472     record_stream_free((struct record_stream *) o->userdata);
473 }
474
475 /*** pdispatch callbacks ***/
476
477 static void protocol_error(struct connection *c) {
478     fprintf(stderr, __FILE__": protocol error, kicking client\n");
479     connection_free(c);
480 }
481
482 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
483     struct connection *c = userdata;
484     struct playback_stream *s;
485     size_t maxlength, tlength, prebuf, minreq;
486     uint32_t sink_index;
487     const char *name, *sink_name;
488     struct pa_sample_spec ss;
489     struct pa_tagstruct *reply;
490     struct pa_sink *sink;
491     assert(c && t && c->protocol && c->protocol->core);
492     
493     if (pa_tagstruct_gets(t, &name) < 0 ||
494         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
495         pa_tagstruct_getu32(t, &sink_index) < 0 ||
496         pa_tagstruct_gets(t, &sink_name) < 0 ||
497         pa_tagstruct_getu32(t, &maxlength) < 0 ||
498         pa_tagstruct_getu32(t, &tlength) < 0 ||
499         pa_tagstruct_getu32(t, &prebuf) < 0 ||
500         pa_tagstruct_getu32(t, &minreq) < 0 ||
501         !pa_tagstruct_eof(t)) {
502         protocol_error(c);
503         return;
504     }
505
506     if (!c->authorized) {
507         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
508         return;
509     }
510
511     if (sink_index != (uint32_t) -1)
512         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
513     else
514         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
515
516     if (!sink) {
517         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
518         return;
519     }
520     
521     if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq))) {
522         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
523         return;
524     }
525     
526     reply = pa_tagstruct_new(NULL, 0);
527     assert(reply);
528     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
529     pa_tagstruct_putu32(reply, tag);
530     pa_tagstruct_putu32(reply, s->index);
531     assert(s->sink_input);
532     pa_tagstruct_putu32(reply, s->sink_input->index);
533     pa_pstream_send_tagstruct(c->pstream, reply);
534     request_bytes(s);
535 }
536
537 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
538     struct connection *c = userdata;
539     uint32_t channel;
540     assert(c && t);
541     
542     if (pa_tagstruct_getu32(t, &channel) < 0 ||
543         !pa_tagstruct_eof(t)) {
544         protocol_error(c);
545         return;
546     }
547
548     if (!c->authorized) {
549         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
550         return;
551     }
552
553     if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
554         struct playback_stream *s;
555         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
556             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
557             return;
558         }
559
560         playback_stream_free(s);
561     } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
562         struct record_stream *s;
563         if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
564             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
565             return;
566         }
567
568         record_stream_free(s);
569     } else {
570         struct upload_stream *s;
571         assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
572         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
573             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
574             return;
575         }
576
577         upload_stream_free(s);
578     }
579             
580     pa_pstream_send_simple_ack(c->pstream, tag);
581 }
582
583 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
584     struct connection *c = userdata;
585     struct record_stream *s;
586     size_t maxlength, fragment_size;
587     uint32_t source_index;
588     const char *name, *source_name;
589     struct pa_sample_spec ss;
590     struct pa_tagstruct *reply;
591     struct pa_source *source;
592     assert(c && t && c->protocol && c->protocol->core);
593     
594     if (pa_tagstruct_gets(t, &name) < 0 ||
595         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
596         pa_tagstruct_getu32(t, &source_index) < 0 ||
597         pa_tagstruct_gets(t, &source_name) < 0 ||
598         pa_tagstruct_getu32(t, &maxlength) < 0 ||
599         pa_tagstruct_getu32(t, &fragment_size) < 0 ||
600         !pa_tagstruct_eof(t)) {
601         protocol_error(c);
602         return;
603     }
604
605     if (!c->authorized) {
606         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
607         return;
608     }
609
610     if (source_index != (uint32_t) -1)
611         source = pa_idxset_get_by_index(c->protocol->core->sources, source_index);
612     else
613         source = pa_namereg_get(c->protocol->core, *source_name ? source_name : NULL, PA_NAMEREG_SOURCE, 1);
614
615     if (!source) {
616         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
617         return;
618     }
619     
620     if (!(s = record_stream_new(c, source, &ss, name, maxlength, fragment_size))) {
621         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
622         return;
623     }
624     
625     reply = pa_tagstruct_new(NULL, 0);
626     assert(reply);
627     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
628     pa_tagstruct_putu32(reply, tag);
629     pa_tagstruct_putu32(reply, s->index);
630     assert(s->source_output);
631     pa_tagstruct_putu32(reply, s->source_output->index);
632     pa_pstream_send_tagstruct(c->pstream, reply);
633 }
634
635 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
636     struct connection *c = userdata;
637     assert(c && t);
638     
639     if (!pa_tagstruct_eof(t)) {
640         protocol_error(c);
641         return;
642     }
643
644     if (!c->authorized) {
645         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
646         return;
647     }
648     
649     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
650     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
651     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
652     return;
653 }
654
655 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
656     struct connection *c = userdata;
657     const void*cookie;
658     assert(c && t);
659
660     if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
661         !pa_tagstruct_eof(t)) {
662         protocol_error(c);
663         return;
664     }
665         
666     if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
667         fprintf(stderr, "protocol-native.c: Denied access to client with invalid authorization key.\n");
668         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
669         return;
670     }
671
672     c->authorized = 1;
673     pa_pstream_send_simple_ack(c->pstream, tag);
674     return;
675 }
676
677 static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
678     struct connection *c = userdata;
679     const char *name;
680     assert(c && t);
681
682     if (pa_tagstruct_gets(t, &name) < 0 ||
683         !pa_tagstruct_eof(t)) {
684         protocol_error(c);
685         return;
686     }
687
688     pa_client_rename(c->client, name);
689     pa_pstream_send_simple_ack(c->pstream, tag);
690     return;
691 }
692
693 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
694     struct connection *c = userdata;
695     const char *name;
696     uint32_t index = PA_IDXSET_INVALID;
697     assert(c && t);
698
699     if (pa_tagstruct_gets(t, &name) < 0 ||
700         !pa_tagstruct_eof(t)) {
701         protocol_error(c);
702         return;
703     }
704
705     if (!c->authorized) {
706         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
707         return;
708     }
709
710     if (command == PA_COMMAND_LOOKUP_SINK) {
711         struct pa_sink *sink;
712         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1)))
713             index = sink->index;
714     } else {
715         struct pa_source *source;
716         assert(command == PA_COMMAND_LOOKUP_SOURCE);
717         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1)))
718             index = source->index;
719     }
720
721     if (index == PA_IDXSET_INVALID)
722         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
723     else {
724         struct pa_tagstruct *reply;
725         reply = pa_tagstruct_new(NULL, 0);
726         assert(reply);
727         pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
728         pa_tagstruct_putu32(reply, tag);
729         pa_tagstruct_putu32(reply, index);
730         pa_pstream_send_tagstruct(c->pstream, reply);
731     }
732 }
733
734 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
735     struct connection *c = userdata;
736     uint32_t index;
737     struct playback_stream *s;
738     assert(c && t);
739
740     if (pa_tagstruct_getu32(t, &index) < 0 ||
741         !pa_tagstruct_eof(t)) {
742         protocol_error(c);
743         return;
744     }
745
746     if (!c->authorized) {
747         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
748         return;
749     }
750
751     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
752         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
753         return;
754     }
755
756     s->drain_request = 0;
757
758     pa_memblockq_prebuf_disable(s->memblockq);
759     
760     if (!pa_memblockq_is_readable(s->memblockq))
761         pa_pstream_send_simple_ack(c->pstream, tag);
762     else {
763         s->drain_request = 1;
764         s->drain_tag = tag;
765
766         pa_sink_notify(s->sink_input->sink);
767     }
768
769
770 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
771     struct connection *c = userdata;
772     struct pa_tagstruct *reply;
773     assert(c && t);
774
775     if (!pa_tagstruct_eof(t)) {
776         protocol_error(c);
777         return;
778     }
779
780     if (!c->authorized) {
781         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
782         return;
783     }
784
785     reply = pa_tagstruct_new(NULL, 0);
786     assert(reply);
787     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
788     pa_tagstruct_putu32(reply, tag);
789     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
790     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
791     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
792     pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
793     pa_pstream_send_tagstruct(c->pstream, reply);
794 }
795
796 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
797     struct connection *c = userdata;
798     struct pa_tagstruct *reply;
799     struct playback_stream *s;
800     uint32_t index, latency;
801     assert(c && t);
802
803     if (pa_tagstruct_getu32(t, &index) < 0 ||
804         !pa_tagstruct_eof(t)) {
805         protocol_error(c);
806         return;
807     }
808
809     if (!c->authorized) {
810         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
811         return;
812     }
813
814     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
815         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
816         return;
817     }
818
819     latency = pa_sink_input_get_latency(s->sink_input);
820     reply = pa_tagstruct_new(NULL, 0);
821     assert(reply);
822     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
823     pa_tagstruct_putu32(reply, tag);
824     pa_tagstruct_putu32(reply, latency);
825     pa_pstream_send_tagstruct(c->pstream, reply);
826 }
827
828 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
829     struct connection *c = userdata;
830     struct upload_stream *s;
831     size_t length;
832     const char *name;
833     struct pa_sample_spec ss;
834     struct pa_tagstruct *reply;
835     assert(c && t && c->protocol && c->protocol->core);
836     
837     if (pa_tagstruct_gets(t, &name) < 0 ||
838         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
839         pa_tagstruct_getu32(t, &length) < 0 ||
840         !pa_tagstruct_eof(t)) {
841         protocol_error(c);
842         return;
843     }
844
845     if (!c->authorized) {
846         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
847         return;
848     }
849
850     if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
851         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
852         return;
853     }
854     
855     if (!(s = upload_stream_new(c, &ss, name, length))) {
856         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
857         return;
858     }
859     
860     reply = pa_tagstruct_new(NULL, 0);
861     assert(reply);
862     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
863     pa_tagstruct_putu32(reply, tag);
864     pa_tagstruct_putu32(reply, s->index);
865     pa_pstream_send_tagstruct(c->pstream, reply);
866     
867     reply = pa_tagstruct_new(NULL, 0);
868     assert(reply);
869     pa_tagstruct_putu32(reply, PA_COMMAND_REQUEST);
870     pa_tagstruct_putu32(reply, (uint32_t) -1); /* tag */
871     pa_tagstruct_putu32(reply, s->index);
872     pa_tagstruct_putu32(reply, length);
873     pa_pstream_send_tagstruct(c->pstream, reply);
874 }
875
876 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
877     struct connection *c = userdata;
878     uint32_t channel;
879     struct upload_stream *s;
880     uint32_t index;
881     assert(c && t);
882     
883     if (pa_tagstruct_getu32(t, &channel) < 0 ||
884         !pa_tagstruct_eof(t)) {
885         protocol_error(c);
886         return;
887     }
888
889     if (!c->authorized) {
890         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
891         return;
892     }
893
894     if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
895         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
896         return;
897     }
898
899     pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->memchunk, &index);
900     pa_pstream_send_simple_ack(c->pstream, tag);
901     upload_stream_free(s);
902 }
903
904 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
905     struct connection *c = userdata;
906     uint32_t sink_index, volume;
907     struct pa_sink *sink;
908     const char *name, *sink_name;
909     assert(c && t);
910
911     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
912         pa_tagstruct_gets(t, &sink_name) < 0 ||
913         pa_tagstruct_getu32(t, &volume) < 0 ||
914         pa_tagstruct_gets(t, &name) < 0 ||
915         !pa_tagstruct_eof(t)) {
916         protocol_error(c);
917         return;
918     }
919     
920     if (!c->authorized) {
921         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
922         return;
923     }
924
925     if (sink_index != (uint32_t) -1)
926         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
927     else
928         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
929
930     if (!sink) {
931         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
932         return;
933     }
934
935     if (pa_scache_play_item(c->protocol->core, name, sink, volume) < 0) {
936         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
937         return;
938     }
939
940     pa_pstream_send_simple_ack(c->pstream, tag);
941 }
942
943 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
944     struct connection *c = userdata;
945     const char *name;
946     assert(c && t);
947
948     if (pa_tagstruct_gets(t, &name) < 0 ||
949         !pa_tagstruct_eof(t)) {
950         protocol_error(c);
951         return;
952     }
953
954     if (!c->authorized) {
955         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
956         return;
957     }
958
959     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
960         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
961         return;
962     }
963
964     pa_pstream_send_simple_ack(c->pstream, tag);
965 }
966
967 static void sink_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink *sink) {
968     assert(t && sink);
969     pa_tagstruct_putu32(t, sink->index);
970     pa_tagstruct_puts(t, sink->name);
971     pa_tagstruct_puts(t, sink->description ? sink->description : "");
972     pa_tagstruct_put_sample_spec(t, &sink->sample_spec);
973     pa_tagstruct_putu32(t, sink->owner ? sink->owner->index : (uint32_t) -1);
974     pa_tagstruct_putu32(t, sink->volume);
975     pa_tagstruct_putu32(t, sink->monitor_source->index);
976     pa_tagstruct_puts(t, sink->monitor_source->name);
977     pa_tagstruct_putu32(t, pa_sink_get_latency(sink));
978 }
979
980 static void source_fill_tagstruct(struct pa_tagstruct *t, struct pa_source *source) {
981     assert(t && source);
982     pa_tagstruct_putu32(t, source->index);
983     pa_tagstruct_puts(t, source->name);
984     pa_tagstruct_puts(t, source->description ? source->description : "");
985     pa_tagstruct_put_sample_spec(t, &source->sample_spec);
986     pa_tagstruct_putu32(t, source->owner ? source->owner->index : (uint32_t) -1);
987     pa_tagstruct_putu32(t, source->monitor_of ? source->monitor_of->index : (uint32_t) -1);
988     pa_tagstruct_puts(t, source->monitor_of ? source->monitor_of->name : "");
989 }
990
991 static void client_fill_tagstruct(struct pa_tagstruct *t, struct pa_client *client) {
992     assert(t && client);
993     pa_tagstruct_putu32(t, client->index);
994     pa_tagstruct_puts(t, client->name);
995     pa_tagstruct_puts(t, client->protocol_name);
996     pa_tagstruct_putu32(t, client->owner ? client->owner->index : (uint32_t) -1);
997 }
998
999 static void module_fill_tagstruct(struct pa_tagstruct *t, struct pa_module *module) {
1000     assert(t && module);
1001     pa_tagstruct_putu32(t, module->index);
1002     pa_tagstruct_puts(t, module->name);
1003     pa_tagstruct_puts(t, module->argument ? module->argument : "");
1004     pa_tagstruct_putu32(t, module->n_used);
1005     pa_tagstruct_putu32(t, module->auto_unload);
1006 }
1007
1008 static void sink_input_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink_input *s) {
1009     assert(t && s);
1010     pa_tagstruct_putu32(t, s->index);
1011     pa_tagstruct_puts(t, s->name ? s->name : "");
1012     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1013     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1014     pa_tagstruct_putu32(t, s->sink->index);
1015     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1016     pa_tagstruct_putu32(t, s->volume);
1017     pa_tagstruct_putu32(t, pa_sink_input_get_latency(s));
1018 }
1019
1020 static void source_output_fill_tagstruct(struct pa_tagstruct *t, struct pa_source_output *s) {
1021     assert(t && s);
1022     pa_tagstruct_putu32(t, s->index);
1023     pa_tagstruct_puts(t, s->name ? s->name : "");
1024     pa_tagstruct_putu32(t, s->owner ? s->owner->index : (uint32_t) -1);
1025     pa_tagstruct_putu32(t, s->client ? s->client->index : (uint32_t) -1);
1026     pa_tagstruct_putu32(t, s->source->index);
1027     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
1028 }
1029
1030 static void scache_fill_tagstruct(struct pa_tagstruct *t, struct pa_scache_entry *e) {
1031     assert(t && e);
1032     pa_tagstruct_putu32(t, e->index);
1033     pa_tagstruct_puts(t, e->name);
1034     pa_tagstruct_putu32(t, e->volume);
1035     pa_tagstruct_putu32(t, pa_bytes_to_usec(e->memchunk.length, &e->sample_spec));
1036     pa_tagstruct_put_sample_spec(t, &e->sample_spec);
1037 }
1038
1039 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1040     struct connection *c = userdata;
1041     uint32_t index;
1042     struct pa_sink *sink = NULL;
1043     struct pa_source *source = NULL;
1044     struct pa_client *client = NULL;
1045     struct pa_module *module = NULL;
1046     struct pa_sink_input *si = NULL;
1047     struct pa_source_output *so = NULL;
1048     struct pa_scache_entry *sce = NULL;
1049     const char *name;
1050     struct pa_tagstruct *reply;
1051     assert(c && t);
1052
1053     
1054     if (pa_tagstruct_getu32(t, &index) < 0 ||
1055         (command != PA_COMMAND_GET_CLIENT_INFO &&
1056          command != PA_COMMAND_GET_MODULE_INFO &&
1057          command != PA_COMMAND_GET_SINK_INPUT_INFO &&
1058          command != PA_COMMAND_GET_SOURCE_OUTPUT_INFO &&
1059          pa_tagstruct_gets(t, &name) < 0) ||
1060         !pa_tagstruct_eof(t)) {
1061         protocol_error(c);
1062         return;
1063     }
1064     
1065     if (!c->authorized) {
1066         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1067         return;
1068     }
1069
1070     if (command == PA_COMMAND_GET_SINK_INFO) {
1071         if (index != (uint32_t) -1)
1072             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1073         else
1074             sink = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SINK, 1);
1075     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
1076         if (index != (uint32_t) -1)
1077             source = pa_idxset_get_by_index(c->protocol->core->sources, index);
1078         else
1079             source = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SOURCE, 1);
1080     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
1081         client = pa_idxset_get_by_index(c->protocol->core->clients, index);
1082     else if (command == PA_COMMAND_GET_MODULE_INFO) 
1083         module = pa_idxset_get_by_index(c->protocol->core->modules, index);
1084     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO)
1085         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1086     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO)
1087         so = pa_idxset_get_by_index(c->protocol->core->source_outputs, index);
1088     else {
1089         assert(command == PA_COMMAND_GET_SAMPLE_INFO && name);
1090         if (index != (uint32_t) -1)
1091             sce = pa_idxset_get_by_index(c->protocol->core->scache, index);
1092         else
1093             sce = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SAMPLE, 0);
1094     }
1095             
1096     if (!sink && !source && !client && !module && !si && !so && !sce) {
1097         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1098         return;
1099     }
1100
1101     reply = pa_tagstruct_new(NULL, 0);
1102     assert(reply);
1103     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1104     pa_tagstruct_putu32(reply, tag); 
1105     if (sink)
1106         sink_fill_tagstruct(reply, sink);
1107     else if (source)
1108         source_fill_tagstruct(reply, source);
1109     else if (client)
1110         client_fill_tagstruct(reply, client);
1111     else if (module)
1112         module_fill_tagstruct(reply, module);
1113     else if (si)
1114         sink_input_fill_tagstruct(reply, si);
1115     else if (so)
1116         source_output_fill_tagstruct(reply, so);
1117     else
1118         scache_fill_tagstruct(reply, sce);
1119     pa_pstream_send_tagstruct(c->pstream, reply);
1120 }
1121
1122 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1123     struct connection *c = userdata;
1124     struct pa_idxset *i;
1125     uint32_t index;
1126     void *p;
1127     struct pa_tagstruct *reply;
1128     assert(c && t);
1129
1130     if (!pa_tagstruct_eof(t)) {
1131         protocol_error(c);
1132         return;
1133     }
1134     
1135     if (!c->authorized) {
1136         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1137         return;
1138     }
1139
1140     reply = pa_tagstruct_new(NULL, 0);
1141     assert(reply);
1142     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1143     pa_tagstruct_putu32(reply, tag);
1144
1145     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1146         i = c->protocol->core->sinks;
1147     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1148         i = c->protocol->core->sources;
1149     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1150         i = c->protocol->core->clients;
1151     else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1152         i = c->protocol->core->modules;
1153     else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1154         i = c->protocol->core->sink_inputs;
1155     else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST)
1156         i = c->protocol->core->source_outputs;
1157     else {
1158         assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1159         i = c->protocol->core->scache;
1160     }
1161
1162     if (i) {
1163         for (p = pa_idxset_first(i, &index); p; p = pa_idxset_next(i, &index)) {
1164             if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1165                 sink_fill_tagstruct(reply, p);
1166             else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1167                 source_fill_tagstruct(reply, p);
1168             else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1169                 client_fill_tagstruct(reply, p);
1170             else if (command == PA_COMMAND_GET_MODULE_INFO_LIST)
1171                 module_fill_tagstruct(reply, p);
1172             else if (command == PA_COMMAND_GET_SINK_INPUT_INFO_LIST)
1173                 sink_input_fill_tagstruct(reply, p);
1174             else if (command == PA_COMMAND_GET_SOURCE_OUTPUT_INFO_LIST) 
1175                 source_output_fill_tagstruct(reply, p);
1176             else {
1177                 assert(command == PA_COMMAND_GET_SAMPLE_INFO_LIST);
1178                 scache_fill_tagstruct(reply, p);
1179             }
1180         }
1181     }
1182     
1183     pa_pstream_send_tagstruct(c->pstream, reply);
1184 }
1185
1186 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1187     struct connection *c = userdata;
1188     struct pa_tagstruct *reply;
1189     char txt[256];
1190     assert(c && t);
1191
1192     if (!pa_tagstruct_eof(t)) {
1193         protocol_error(c);
1194         return;
1195     }
1196     
1197     if (!c->authorized) {
1198         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1199         return;
1200     }
1201
1202     reply = pa_tagstruct_new(NULL, 0);
1203     assert(reply);
1204     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1205     pa_tagstruct_putu32(reply, tag);
1206     pa_tagstruct_puts(reply, PACKAGE_NAME);
1207     pa_tagstruct_puts(reply, PACKAGE_VERSION);
1208     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
1209     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
1210     pa_tagstruct_put_sample_spec(reply, &c->protocol->core->default_sample_spec);
1211     pa_pstream_send_tagstruct(c->pstream, reply);
1212 }
1213
1214 static void subscription_cb(struct pa_core *core, enum pa_subscription_event_type e, uint32_t index, void *userdata) {
1215     struct pa_tagstruct *t;
1216     struct connection *c = userdata;
1217     assert(c && core);
1218
1219     t = pa_tagstruct_new(NULL, 0);
1220     assert(t);
1221     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
1222     pa_tagstruct_putu32(t, (uint32_t) -1);
1223     pa_tagstruct_putu32(t, e);
1224     pa_tagstruct_putu32(t, index);
1225     pa_pstream_send_tagstruct(c->pstream, t);
1226 }
1227
1228 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1229     struct connection *c = userdata;
1230     enum pa_subscription_mask m;
1231     assert(c && t);
1232
1233     if (pa_tagstruct_getu32(t, &m) < 0 ||
1234         !pa_tagstruct_eof(t)) {
1235         protocol_error(c);
1236         return;
1237     }
1238     
1239     if (!c->authorized) {
1240         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1241         return;
1242     }
1243
1244     if (c->subscription)
1245         pa_subscription_free(c->subscription);
1246
1247     if (m != 0) {
1248         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
1249         assert(c->subscription);
1250     } else
1251         c->subscription = NULL;
1252
1253     pa_pstream_send_simple_ack(c->pstream, tag);
1254 }
1255
1256 static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1257     struct connection *c = userdata;
1258     uint32_t index, volume;
1259     struct pa_sink *sink = NULL;
1260     struct pa_sink_input *si = NULL;
1261     const char *name = NULL;
1262     assert(c && t);
1263
1264     if (pa_tagstruct_getu32(t, &index) < 0 ||
1265         (command == PA_COMMAND_SET_SINK_VOLUME && pa_tagstruct_gets(t, &name) < 0) ||
1266         pa_tagstruct_getu32(t, &volume) ||
1267         !pa_tagstruct_eof(t)) {
1268         protocol_error(c);
1269         return;
1270     }
1271     
1272     if (!c->authorized) {
1273         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1274         return;
1275     }
1276
1277     if (command == PA_COMMAND_SET_SINK_VOLUME) {
1278         if (index != (uint32_t) -1)
1279             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1280         else
1281             sink = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SINK, 1);
1282     }  else {
1283         assert(command == PA_COMMAND_SET_SINK_INPUT_VOLUME);
1284         si = pa_idxset_get_by_index(c->protocol->core->sink_inputs, index);
1285     }
1286
1287     if (!si && !sink) {
1288         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1289         return;
1290     }
1291
1292     if (sink)
1293         pa_sink_set_volume(sink, volume);
1294     else if (si)
1295         pa_sink_input_set_volume(si, volume);
1296
1297     pa_pstream_send_simple_ack(c->pstream, tag);
1298 }
1299
1300 static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1301     struct connection *c = userdata;
1302     uint32_t index;
1303     uint32_t b;
1304     struct playback_stream *s;
1305     assert(c && t);
1306
1307     if (pa_tagstruct_getu32(t, &index) < 0 ||
1308         pa_tagstruct_getu32(t, &b) < 0 ||
1309         !pa_tagstruct_eof(t)) {
1310         protocol_error(c);
1311         return;
1312     }
1313
1314     if (!c->authorized) {
1315         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1316         return;
1317     }
1318
1319     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1320         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1321         return;
1322     }
1323
1324     pa_sink_input_cork(s->sink_input, b);
1325     pa_pstream_send_simple_ack(c->pstream, tag);
1326 }
1327
1328 static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1329     struct connection *c = userdata;
1330     uint32_t index;
1331     struct playback_stream *s;
1332     assert(c && t);
1333
1334     if (pa_tagstruct_getu32(t, &index) < 0 ||
1335         !pa_tagstruct_eof(t)) {
1336         protocol_error(c);
1337         return;
1338     }
1339
1340     if (!c->authorized) {
1341         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1342         return;
1343     }
1344
1345     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
1346         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1347         return;
1348     }
1349
1350     pa_memblockq_flush(s->memblockq);
1351     pa_pstream_send_simple_ack(c->pstream, tag);
1352 }
1353
1354 /*** pstream callbacks ***/
1355
1356 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
1357     struct connection *c = userdata;
1358     assert(p && packet && packet->data && c);
1359
1360     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
1361         fprintf(stderr, "protocol-native: invalid packet.\n");
1362         connection_free(c);
1363     }
1364 }
1365
1366 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
1367     struct connection *c = userdata;
1368     struct output_stream *stream;
1369     assert(p && chunk && userdata);
1370
1371     if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
1372         fprintf(stderr, "protocol-native: client sent block for invalid stream.\n");
1373         connection_free(c);
1374         return;
1375     }
1376
1377     if (stream->type == PLAYBACK_STREAM) {
1378         struct playback_stream *p = (struct playback_stream*) stream;
1379         if (chunk->length >= p->requested_bytes)
1380             p->requested_bytes = 0;
1381         else
1382             p->requested_bytes -= chunk->length;
1383         
1384         pa_memblockq_push_align(p->memblockq, chunk, delta);
1385         assert(p->sink_input);
1386         pa_sink_notify(p->sink_input->sink);
1387         /*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/
1388     } else {
1389         struct upload_stream *u = (struct upload_stream*) stream;
1390         size_t l;
1391         assert(u->type == UPLOAD_STREAM);
1392
1393         if (!u->memchunk.memblock) {
1394             if (u->length == chunk->length) {
1395                 u->memchunk = *chunk;
1396                 pa_memblock_ref(u->memchunk.memblock);
1397                 u->length = 0;
1398             } else {
1399                 u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
1400                 u->memchunk.index = u->memchunk.length = 0;
1401             }
1402         }
1403         
1404         assert(u->memchunk.memblock);
1405         
1406         l = u->length; 
1407         if (l > chunk->length)
1408             l = chunk->length;
1409
1410         if (l > 0) {
1411             memcpy(u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length, chunk->memblock->data+chunk->index, l);
1412             u->memchunk.length += l;
1413             u->length -= l;
1414         }
1415     }
1416 }
1417
1418 static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
1419     struct connection *c = userdata;
1420     assert(p && c);
1421     connection_free(c);
1422
1423     fprintf(stderr, "protocol-native: connection died.\n");
1424 }
1425
1426
1427 static void pstream_drain_callback(struct pa_pstream *p, void *userdata) {
1428     struct connection *c = userdata;
1429     assert(p && c);
1430
1431     send_memblock(c);
1432 }
1433
1434 /*** client callbacks ***/
1435
1436 static void client_kill_cb(struct pa_client *c) {
1437     assert(c && c->userdata);
1438     connection_free(c->userdata);
1439 }
1440
1441 /*** socket server callbacks ***/
1442
1443 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
1444     struct pa_protocol_native *p = userdata;
1445     struct connection *c;
1446     assert(s && io && p);
1447
1448     c = pa_xmalloc(sizeof(struct connection));
1449     c->authorized = p->public;
1450     c->protocol = p;
1451     assert(p->core);
1452     c->client = pa_client_new(p->core, "NATIVE", "Client");
1453     assert(c->client);
1454     c->client->kill = client_kill_cb;
1455     c->client->userdata = c;
1456     c->client->owner = p->module;
1457     
1458     c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
1459     assert(c->pstream);
1460
1461     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
1462     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
1463     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
1464     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
1465
1466     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
1467     assert(c->pdispatch);
1468
1469     c->record_streams = pa_idxset_new(NULL, NULL);
1470     c->output_streams = pa_idxset_new(NULL, NULL);
1471     assert(c->record_streams && c->output_streams);
1472
1473     c->rrobin_index = PA_IDXSET_INVALID;
1474     c->subscription = NULL;
1475
1476     pa_idxset_put(p->connections, c, NULL);
1477 }
1478
1479 /*** module entry points ***/
1480
1481 struct pa_protocol_native* pa_protocol_native_new(struct pa_core *core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) {
1482     struct pa_protocol_native *p;
1483     uint32_t public;
1484     assert(core && server && ma);
1485
1486     if (pa_modargs_get_value_u32(ma, "public", &public) < 0) {
1487         fprintf(stderr, __FILE__": public= expects numeric argument.\n");
1488         return NULL;
1489     }
1490     
1491     p = pa_xmalloc(sizeof(struct pa_protocol_native));
1492
1493     if (pa_authkey_load_from_home(pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), p->auth_cookie, sizeof(p->auth_cookie)) < 0) {
1494         pa_xfree(p);
1495         return NULL;
1496     }
1497
1498     p->module = m;
1499     p->public = public;
1500     p->server = server;
1501     p->core = core;
1502     p->connections = pa_idxset_new(NULL, NULL);
1503     assert(p->connections);
1504
1505     pa_socket_server_set_callback(p->server, on_connection, p);
1506     
1507     return p;
1508 }
1509
1510 void pa_protocol_native_free(struct pa_protocol_native *p) {
1511     struct connection *c;
1512     assert(p);
1513
1514     while ((c = pa_idxset_first(p->connections, NULL)))
1515         connection_free(c);
1516     pa_idxset_free(p->connections, NULL, NULL);
1517     pa_socket_server_unref(p->server);
1518     pa_xfree(p);
1519 }