sample cache work
[profile/ivi/pulseaudio-panda.git] / polyp / protocol-native.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <string.h>
27 #include <stdio.h>
28 #include <assert.h>
29 #include <stdlib.h>
30
31 #include "protocol-native.h"
32 #include "native-common.h"
33 #include "packet.h"
34 #include "client.h"
35 #include "source-output.h"
36 #include "sink-input.h"
37 #include "pstream.h"
38 #include "tagstruct.h"
39 #include "pdispatch.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "namereg.h"
43 #include "scache.h"
44
45 struct connection;
46 struct pa_protocol_native;
47
/* Per-connection state of one recording stream. */
struct record_stream {
    /* connection that owns this stream */
    struct connection *connection;
    /* index handed out by connection->record_streams when the stream is registered */
    uint32_t index;
    /* source output that pushes captured data into this stream */
    struct pa_source_output *source_output;
    /* captured data queued until it can be sent over the connection */
    struct pa_memblockq *memblockq;
    /* upper bound for a single chunk sent to the client; a multiple of the frame size */
    size_t fragment_size;
};
55
/* Per-connection state of one playback stream. */
struct playback_stream {
    /* always PLAYBACK_STREAM; must stay the first member because the stream
     * is also inspected through struct output_stream */
    int type;
    /* connection that owns this stream */
    struct connection *connection;
    /* index handed out by connection->output_streams when the stream is registered */
    uint32_t index;
    /* sink input that pulls data out of memblockq */
    struct pa_sink_input *sink_input;
    /* audio data waiting to be played */
    struct pa_memblockq *memblockq;
    /* number of bytes already requested from the client via PA_COMMAND_REQUEST */
    size_t requested_bytes;
    /* non-zero while a drain command is waiting for the queue to run empty */
    int drain_request;
    /* tag of the pending drain command; only meaningful while drain_request is set */
    uint32_t drain_tag;
};
66
67 struct upload_stream {
68     int type;
69     struct connection *connection;
70     uint32_t index;
71     struct pa_memchunk memchunk;
72     size_t length;
73     char *name;
74     struct pa_sample_spec sample_spec;
75 };
76
/* Common prefix of struct playback_stream and struct upload_stream: entries
 * of connection->output_streams are read through this type to find out
 * which of the two they really are. */
struct output_stream {
    /* UPLOAD_STREAM or PLAYBACK_STREAM */
    int type;
};
80
/* Values stored in the "type" member of the entries of connection->output_streams. */
enum {
    UPLOAD_STREAM,      /* entry is a struct upload_stream */
    PLAYBACK_STREAM     /* entry is a struct playback_stream */
};
85
/* State kept for one client connection. */
struct connection {
    /* set to 1 once command_auth() has accepted the client's cookie */
    int authorized;
    /* protocol instance this connection belongs to */
    struct pa_protocol_native *protocol;
    /* client object; also attached to every stream created on this connection */
    struct pa_client *client;
    /* packet stream used for all replies, errors and memblocks */
    struct pa_pstream *pstream;
    /* command dispatcher, released together with the connection */
    struct pa_pdispatch *pdispatch;
    /* set of struct record_stream */
    struct pa_idxset *record_streams;
    /* set of struct playback_stream and struct upload_stream */
    struct pa_idxset *output_streams;
    /* cursor used by send_memblock() to cycle through record_streams */
    uint32_t rrobin_index;
};
95
96 struct pa_protocol_native {
97     struct pa_module *module;
98     int public;
99     struct pa_core *core;
100     struct pa_socket_server *server;
101     struct pa_idxset *connections;
102     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
103 };
104
/* callbacks installed on the sink input of every playback stream */
static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
static void sink_input_kill_cb(struct pa_sink_input *i);
static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);

/* asks the client for more playback data */
static void request_bytes(struct playback_stream*s);

/* callbacks installed on the source output of every record stream */
static void source_output_kill_cb(struct pa_source_output *o);
static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk);

/* command handlers referenced from command_table */
static void command_exit(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_create_playback_stream(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_drain_playback_stream(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_create_record_stream(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_delete_stream(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_auth(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_set_name(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_lookup(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_stat(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_playback_latency(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_create_upload_stream(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_finish_upload_stream(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_play_sample(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_remove_sample(
    struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
129
130 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
131     [PA_COMMAND_ERROR] = { NULL },
132     [PA_COMMAND_TIMEOUT] = { NULL },
133     [PA_COMMAND_REPLY] = { NULL },
134     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
135     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_stream },
136     [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = { command_drain_playback_stream },
137     [PA_COMMAND_CREATE_RECORD_STREAM] = { command_create_record_stream },
138     [PA_COMMAND_DELETE_RECORD_STREAM] = { command_delete_stream },
139     [PA_COMMAND_AUTH] = { command_auth },
140     [PA_COMMAND_REQUEST] = { NULL },
141     [PA_COMMAND_EXIT] = { command_exit },
142     [PA_COMMAND_SET_NAME] = { command_set_name },
143     [PA_COMMAND_LOOKUP_SINK] = { command_lookup },
144     [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
145     [PA_COMMAND_STAT] = { command_stat },
146     [PA_COMMAND_GET_PLAYBACK_LATENCY] = { command_get_playback_latency },
147     [PA_COMMAND_CREATE_UPLOAD_STREAM] = { command_create_upload_stream },
148     [PA_COMMAND_DELETE_UPLOAD_STREAM] = { command_delete_stream },
149     [PA_COMMAND_FINISH_UPLOAD_STREAM] = { command_finish_upload_stream },
150     [PA_COMMAND_PLAY_SAMPLE] = { command_play_sample },
151     [PA_COMMAND_REMOVE_SAMPLE] = { command_remove_sample },
152 };
153
154 /* structure management */
155
156 static struct upload_stream* upload_stream_new(struct connection *c, const struct pa_sample_spec *ss, const char *name, size_t length) {
157     struct upload_stream *s;
158     assert(c && ss && name && length);
159     
160     s = malloc(sizeof(struct upload_stream));
161     assert (s);
162     s->type = UPLOAD_STREAM;
163     s->connection = c;
164     s->sample_spec = *ss;
165     s->name = strdup(name);
166     assert(s->name);
167
168     s->memchunk.memblock = NULL;
169     s->memchunk.index = 0;
170     s->memchunk.length = 0;
171
172     s->length = length;
173     
174     pa_idxset_put(c->output_streams, s, &s->index);
175     return s;
176 }
177
178 static void upload_stream_free(struct upload_stream *o) {
179     assert(o && o->connection);
180
181     pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
182
183     free(o->name);
184     
185     if (o->memchunk.memblock)
186         pa_memblock_unref(o->memchunk.memblock);
187     
188     free(o);
189 }
190
191 static struct record_stream* record_stream_new(struct connection *c, struct pa_source *source, const struct pa_sample_spec *ss, const char *name, size_t maxlength, size_t fragment_size) {
192     struct record_stream *s;
193     struct pa_source_output *source_output;
194     size_t base;
195     assert(c && source && ss && name && maxlength);
196
197     if (!(source_output = pa_source_output_new(source, name, ss)))
198         return NULL;
199
200     s = malloc(sizeof(struct record_stream));
201     assert(s);
202     s->connection = c;
203     s->source_output = source_output;
204     s->source_output->push = source_output_push_cb;
205     s->source_output->kill = source_output_kill_cb;
206     s->source_output->userdata = s;
207     s->source_output->owner = c->protocol->module;
208     s->source_output->client = c->client;
209
210     s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0);
211     assert(s->memblockq);
212
213     s->fragment_size = (fragment_size/base)*base;
214     if (!s->fragment_size)
215         s->fragment_size = base;
216
217     pa_idxset_put(c->record_streams, s, &s->index);
218     return s;
219 }
220
221 static void record_stream_free(struct record_stream* r) {
222     assert(r && r->connection);
223
224     pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
225     pa_source_output_free(r->source_output);
226     pa_memblockq_free(r->memblockq);
227     free(r);
228 }
229
230 static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, const struct pa_sample_spec *ss, const char *name,
231                                                    size_t maxlength,
232                                                    size_t tlength,
233                                                    size_t prebuf,
234                                                    size_t minreq) {
235     struct playback_stream *s;
236     struct pa_sink_input *sink_input;
237     assert(c && sink && ss && name && maxlength);
238
239     if (!(sink_input = pa_sink_input_new(sink, name, ss)))
240         return NULL;
241     
242     s = malloc(sizeof(struct playback_stream));
243     assert (s);
244     s->type = PLAYBACK_STREAM;
245     s->connection = c;
246     s->sink_input = sink_input;
247     
248     s->sink_input->peek = sink_input_peek_cb;
249     s->sink_input->drop = sink_input_drop_cb;
250     s->sink_input->kill = sink_input_kill_cb;
251     s->sink_input->get_latency = sink_input_get_latency_cb;
252     s->sink_input->userdata = s;
253     s->sink_input->owner = c->protocol->module;
254     s->sink_input->client = c->client;
255     
256     s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq);
257     assert(s->memblockq);
258
259     s->requested_bytes = 0;
260     s->drain_request = 0;
261     
262     pa_idxset_put(c->output_streams, s, &s->index);
263     return s;
264 }
265
266 static void playback_stream_free(struct playback_stream* p) {
267     assert(p && p->connection);
268
269     if (p->drain_request)
270         pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
271
272     pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
273     pa_sink_input_free(p->sink_input);
274     pa_memblockq_free(p->memblockq);
275     free(p);
276 }
277
278 static void connection_free(struct connection *c) {
279     struct record_stream *r;
280     struct output_stream *o;
281     assert(c && c->protocol);
282
283     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
284     while ((r = pa_idxset_first(c->record_streams, NULL)))
285         record_stream_free(r);
286     pa_idxset_free(c->record_streams, NULL, NULL);
287
288     while ((o = pa_idxset_first(c->output_streams, NULL)))
289         if (o->type == PLAYBACK_STREAM)
290             playback_stream_free((struct playback_stream*) o);
291         else
292             upload_stream_free((struct upload_stream*) o);
293     pa_idxset_free(c->output_streams, NULL, NULL);
294
295     pa_pdispatch_free(c->pdispatch);
296     pa_pstream_free(c->pstream);
297     pa_client_free(c->client);
298     free(c);
299 }
300
301 static void request_bytes(struct playback_stream *s) {
302     struct pa_tagstruct *t;
303     size_t l;
304     assert(s);
305
306     if (!(l = pa_memblockq_missing(s->memblockq)))
307         return;
308
309     if (l <= s->requested_bytes)
310         return;
311
312     l -= s->requested_bytes;
313
314     if (l < pa_memblockq_get_minreq(s->memblockq))
315         return;
316     
317     s->requested_bytes += l;
318
319     t = pa_tagstruct_new(NULL, 0);
320     assert(t);
321     pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
322     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
323     pa_tagstruct_putu32(t, s->index);
324     pa_tagstruct_putu32(t, l);
325     pa_pstream_send_tagstruct(s->connection->pstream, t);
326
327     /*fprintf(stderr, "Requesting %u bytes\n", l);*/
328 }
329
330 static void send_memblock(struct connection *c) {
331     uint32_t start;
332     struct record_stream *r;
333
334     start = PA_IDXSET_INVALID;
335     for (;;) {
336         struct pa_memchunk chunk;
337         
338         if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
339             return;
340
341         if (start == PA_IDXSET_INVALID)
342             start = c->rrobin_index;
343         else if (start == c->rrobin_index)
344             return;
345
346         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
347             if (chunk.length > r->fragment_size)
348                 chunk.length = r->fragment_size;
349
350             pa_pstream_send_memblock(c->pstream, r->index, 0, &chunk);
351             pa_memblockq_drop(r->memblockq, chunk.length);
352             pa_memblock_unref(chunk.memblock);
353             
354             return;
355         }
356     }
357 }
358
359 static void send_playback_stream_killed(struct playback_stream *p) {
360     struct pa_tagstruct *t;
361     assert(p);
362
363     t = pa_tagstruct_new(NULL, 0);
364     assert(t);
365     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
366     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
367     pa_tagstruct_putu32(t, p->index);
368     pa_pstream_send_tagstruct(p->connection->pstream, t);
369 }
370
371 static void send_record_stream_killed(struct record_stream *r) {
372     struct pa_tagstruct *t;
373     assert(r);
374
375     t = pa_tagstruct_new(NULL, 0);
376     assert(t);
377     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
378     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
379     pa_tagstruct_putu32(t, r->index);
380     pa_pstream_send_tagstruct(r->connection->pstream, t);
381 }
382
383
384 /*** sinkinput callbacks ***/
385
386 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
387     struct playback_stream *s;
388     assert(i && i->userdata && chunk);
389     s = i->userdata;
390
391     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
392         return -1;
393
394     return 0;
395 }
396
397 static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
398     struct playback_stream *s;
399     assert(i && i->userdata && length);
400     s = i->userdata;
401
402     pa_memblockq_drop(s->memblockq, length);
403     request_bytes(s);
404
405     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
406         pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
407         s->drain_request = 0;
408     }
409 }
410
411 static void sink_input_kill_cb(struct pa_sink_input *i) {
412     assert(i && i->userdata);
413     send_playback_stream_killed((struct playback_stream *) i->userdata);
414     playback_stream_free((struct playback_stream *) i->userdata);
415 }
416
417 static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i) {
418     struct playback_stream *s;
419     assert(i && i->userdata);
420     s = i->userdata;
421
422     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
423 }
424
425 /*** source_output callbacks ***/
426
427 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
428     struct record_stream *s;
429     assert(o && o->userdata && chunk);
430     s = o->userdata;
431     
432     pa_memblockq_push(s->memblockq, chunk, 0);
433     if (!pa_pstream_is_pending(s->connection->pstream))
434         send_memblock(s->connection);
435 }
436
437 static void source_output_kill_cb(struct pa_source_output *o) {
438     assert(o && o->userdata);
439     send_record_stream_killed((struct record_stream *) o->userdata);
440     record_stream_free((struct record_stream *) o->userdata);
441 }
442
443 /*** pdispatch callbacks ***/
444
/* Common reaction to a malformed command: log a message to stderr and drop
 * the whole connection.  c is invalid once this function returns. */
static void protocol_error(struct connection *c) {
    fputs(__FILE__": protocol error, kicking client\n", stderr);

    connection_free(c);
}
449
450 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
451     struct connection *c = userdata;
452     struct playback_stream *s;
453     size_t maxlength, tlength, prebuf, minreq;
454     uint32_t sink_index;
455     const char *name, *sink_name;
456     struct pa_sample_spec ss;
457     struct pa_tagstruct *reply;
458     struct pa_sink *sink;
459     assert(c && t && c->protocol && c->protocol->core);
460     
461     if (pa_tagstruct_gets(t, &name) < 0 ||
462         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
463         pa_tagstruct_getu32(t, &sink_index) < 0 ||
464         pa_tagstruct_gets(t, &sink_name) < 0 ||
465         pa_tagstruct_getu32(t, &maxlength) < 0 ||
466         pa_tagstruct_getu32(t, &tlength) < 0 ||
467         pa_tagstruct_getu32(t, &prebuf) < 0 ||
468         pa_tagstruct_getu32(t, &minreq) < 0 ||
469         !pa_tagstruct_eof(t)) {
470         protocol_error(c);
471         return;
472     }
473
474     if (!c->authorized) {
475         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
476         return;
477     }
478
479     if (!*sink_name || sink_index == (uint32_t) -1)
480         sink = pa_sink_get_default(c->protocol->core);
481     else if (sink_index != (uint32_t) -1)
482         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
483     else
484         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
485
486     if (!sink) {
487         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
488         return;
489     }
490     
491     if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq))) {
492         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
493         return;
494     }
495     
496     reply = pa_tagstruct_new(NULL, 0);
497     assert(reply);
498     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
499     pa_tagstruct_putu32(reply, tag);
500     pa_tagstruct_putu32(reply, s->index);
501     assert(s->sink_input);
502     pa_tagstruct_putu32(reply, s->sink_input->index);
503     pa_pstream_send_tagstruct(c->pstream, reply);
504     request_bytes(s);
505 }
506
507 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
508     struct connection *c = userdata;
509     uint32_t channel;
510     assert(c && t);
511     
512     if (pa_tagstruct_getu32(t, &channel) < 0 ||
513         !pa_tagstruct_eof(t)) {
514         protocol_error(c);
515         return;
516     }
517
518     if (!c->authorized) {
519         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
520         return;
521     }
522
523     if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
524         struct playback_stream *s;
525         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
526             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
527             return;
528         }
529
530         playback_stream_free(s);
531     } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
532         struct record_stream *s;
533         if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
534             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
535             return;
536         }
537
538         record_stream_free(s);
539     } else {
540         struct upload_stream *s;
541         assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
542         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
543             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
544             return;
545         }
546
547         upload_stream_free(s);
548     }
549             
550     pa_pstream_send_simple_ack(c->pstream, tag);
551 }
552
553 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
554     struct connection *c = userdata;
555     struct record_stream *s;
556     size_t maxlength, fragment_size;
557     uint32_t source_index;
558     const char *name, *source_name;
559     struct pa_sample_spec ss;
560     struct pa_tagstruct *reply;
561     struct pa_source *source;
562     assert(c && t && c->protocol && c->protocol->core);
563     
564     if (pa_tagstruct_gets(t, &name) < 0 ||
565         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
566         pa_tagstruct_getu32(t, &source_index) < 0 ||
567         pa_tagstruct_gets(t, &source_name) < 0 ||
568         pa_tagstruct_getu32(t, &maxlength) < 0 ||
569         pa_tagstruct_getu32(t, &fragment_size) < 0 ||
570         !pa_tagstruct_eof(t)) {
571         protocol_error(c);
572         return;
573     }
574
575     if (!c->authorized) {
576         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
577         return;
578     }
579
580     if (!*source_name || source_index == (uint32_t) -1)
581         source = pa_source_get_default(c->protocol->core);
582     else if (source_index != (uint32_t) -1)
583         source = pa_idxset_get_by_index(c->protocol->core->sources, source_index);
584     else
585         source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE);
586
587     if (!source) {
588         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
589         return;
590     }
591     
592     if (!(s = record_stream_new(c, source, &ss, name, maxlength, fragment_size))) {
593         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
594         return;
595     }
596     
597     reply = pa_tagstruct_new(NULL, 0);
598     assert(reply);
599     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
600     pa_tagstruct_putu32(reply, tag);
601     pa_tagstruct_putu32(reply, s->index);
602     assert(s->source_output);
603     pa_tagstruct_putu32(reply, s->source_output->index);
604     pa_pstream_send_tagstruct(c->pstream, reply);
605 }
606
607 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
608     struct connection *c = userdata;
609     assert(c && t);
610     
611     if (!pa_tagstruct_eof(t)) {
612         protocol_error(c);
613         return;
614     }
615
616     if (!c->authorized) {
617         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
618         return;
619     }
620     
621     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
622     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
623     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
624     return;
625 }
626
627 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
628     struct connection *c = userdata;
629     const void*cookie;
630     assert(c && t);
631
632     if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
633         !pa_tagstruct_eof(t)) {
634         protocol_error(c);
635         return;
636     }
637         
638     if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
639         fprintf(stderr, "protocol-native.c: Denied access to client with invalid authorization key.\n");
640         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
641         return;
642     }
643
644     c->authorized = 1;
645     pa_pstream_send_simple_ack(c->pstream, tag);
646     return;
647 }
648
649 static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
650     struct connection *c = userdata;
651     const char *name;
652     assert(c && t);
653
654     if (pa_tagstruct_gets(t, &name) < 0 ||
655         !pa_tagstruct_eof(t)) {
656         protocol_error(c);
657         return;
658     }
659
660     pa_client_rename(c->client, name);
661     pa_pstream_send_simple_ack(c->pstream, tag);
662     return;
663 }
664
665 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
666     struct connection *c = userdata;
667     const char *name;
668     uint32_t index = PA_IDXSET_INVALID;
669     assert(c && t);
670
671     if (pa_tagstruct_gets(t, &name) < 0 ||
672         !pa_tagstruct_eof(t)) {
673         protocol_error(c);
674         return;
675     }
676
677     if (!c->authorized) {
678         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
679         return;
680     }
681
682     if (command == PA_COMMAND_LOOKUP_SINK) {
683         struct pa_sink *sink;
684         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
685             index = sink->index;
686     } else {
687         struct pa_source *source;
688         assert(command == PA_COMMAND_LOOKUP_SOURCE);
689         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
690             index = source->index;
691     }
692
693     if (index == PA_IDXSET_INVALID)
694         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
695     else {
696         struct pa_tagstruct *reply;
697         reply = pa_tagstruct_new(NULL, 0);
698         assert(reply);
699         pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
700         pa_tagstruct_putu32(reply, tag);
701         pa_tagstruct_putu32(reply, index);
702         pa_pstream_send_tagstruct(c->pstream, reply);
703     }
704 }
705
706 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
707     struct connection *c = userdata;
708     uint32_t index;
709     struct playback_stream *s;
710     assert(c && t);
711
712     if (pa_tagstruct_getu32(t, &index) < 0 ||
713         !pa_tagstruct_eof(t)) {
714         protocol_error(c);
715         return;
716     }
717
718     if (!c->authorized) {
719         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
720         return;
721     }
722
723     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
724         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
725         return;
726     }
727
728     s->drain_request = 0;
729     
730     if (!pa_memblockq_is_readable(s->memblockq))
731         pa_pstream_send_simple_ack(c->pstream, tag);
732     else {
733         s->drain_request = 1;
734         s->drain_tag = tag;
735     }
736
737
738 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
739     struct connection *c = userdata;
740     assert(c && t);
741     struct pa_tagstruct *reply;
742
743     if (!pa_tagstruct_eof(t)) {
744         protocol_error(c);
745         return;
746     }
747
748     if (!c->authorized) {
749         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
750         return;
751     }
752
753     reply = pa_tagstruct_new(NULL, 0);
754     assert(reply);
755     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
756     pa_tagstruct_putu32(reply, tag);
757     pa_tagstruct_putu32(reply, pa_memblock_get_count());
758     pa_tagstruct_putu32(reply, pa_memblock_get_total());
759     pa_pstream_send_tagstruct(c->pstream, reply);
760 }
761
762 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
763     struct connection *c = userdata;
764     assert(c && t);
765     struct pa_tagstruct *reply;
766     struct playback_stream *s;
767     uint32_t index, latency;
768
769     if (pa_tagstruct_getu32(t, &index) < 0 ||
770         !pa_tagstruct_eof(t)) {
771         protocol_error(c);
772         return;
773     }
774
775     if (!c->authorized) {
776         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
777         return;
778     }
779
780     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
781         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
782         return;
783     }
784
785     latency = pa_sink_input_get_latency(s->sink_input);
786     reply = pa_tagstruct_new(NULL, 0);
787     assert(reply);
788     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
789     pa_tagstruct_putu32(reply, tag);
790     pa_tagstruct_putu32(reply, latency);
791     pa_pstream_send_tagstruct(c->pstream, reply);
792 }
793
794 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
795     struct connection *c = userdata;
796     struct upload_stream *s;
797     size_t length;
798     const char *name;
799     struct pa_sample_spec ss;
800     struct pa_tagstruct *reply;
801     assert(c && t && c->protocol && c->protocol->core);
802     
803     if (pa_tagstruct_gets(t, &name) < 0 ||
804         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
805         pa_tagstruct_getu32(t, &length) < 0 ||
806         !pa_tagstruct_eof(t)) {
807         protocol_error(c);
808         return;
809     }
810
811     if (!c->authorized) {
812         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
813         return;
814     }
815
816     if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
817         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
818         return;
819     }
820     
821     if (!(s = upload_stream_new(c, &ss, name, length))) {
822         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
823         return;
824     }
825     
826     reply = pa_tagstruct_new(NULL, 0);
827     assert(reply);
828     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
829     pa_tagstruct_putu32(reply, tag);
830     pa_tagstruct_putu32(reply, s->index);
831     pa_pstream_send_tagstruct(c->pstream, reply);
832     
833     reply = pa_tagstruct_new(NULL, 0);
834     assert(reply);
835     pa_tagstruct_putu32(reply, PA_COMMAND_REQUEST);
836     pa_tagstruct_putu32(reply, (uint32_t) -1); /* tag */
837     pa_tagstruct_putu32(reply, s->index);
838     pa_tagstruct_putu32(reply, length);
839     pa_pstream_send_tagstruct(c->pstream, reply);
840 }
841
842 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
843     struct connection *c = userdata;
844     uint32_t channel;
845     struct upload_stream *s;
846     uint32_t index;
847     assert(c && t);
848     
849     if (pa_tagstruct_getu32(t, &channel) < 0 ||
850         !pa_tagstruct_eof(t)) {
851         protocol_error(c);
852         return;
853     }
854
855     if (!c->authorized) {
856         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
857         return;
858     }
859
860     if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
861         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
862         return;
863     }
864
865     pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->memchunk, &index);
866     pa_pstream_send_simple_ack(c->pstream, tag);
867     upload_stream_free(s);
868 }
869
870 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
871     struct connection *c = userdata;
872     uint32_t sink_index, volume;
873     struct pa_sink *sink;
874     const char *name, *sink_name;
875     assert(c && t);
876
877     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
878         pa_tagstruct_gets(t, &sink_name) < 0 ||
879         pa_tagstruct_getu32(t, &volume) < 0 ||
880         pa_tagstruct_gets(t, &name) < 0 ||
881         !pa_tagstruct_eof(t)) {
882         protocol_error(c);
883         return;
884     }
885     
886     if (!c->authorized) {
887         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
888         return;
889     }
890
891     if (!*sink_name && sink_index == (uint32_t) -1)
892         sink = pa_sink_get_default(c->protocol->core);
893     else if (sink_index != (uint32_t) -1)
894         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
895     else
896         sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK);
897
898     if (!sink) {
899         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
900         return;
901     }
902
903     if (pa_scache_play_item(c->protocol->core, name, sink, volume) < 0) {
904         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
905         return;
906     }
907
908     pa_pstream_send_simple_ack(c->pstream, tag);
909 }
910
911 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
912     struct connection *c = userdata;
913     const char *name;
914     assert(c && t);
915
916     if (pa_tagstruct_gets(t, &name) < 0 ||
917         !pa_tagstruct_eof(t)) {
918         protocol_error(c);
919         return;
920     }
921
922     if (!c->authorized) {
923         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
924         return;
925     }
926
927     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
928         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
929         return;
930     }
931
932     pa_pstream_send_simple_ack(c->pstream, tag);
933 }
934
935 /*** pstream callbacks ***/
936
937 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
938     struct connection *c = userdata;
939     assert(p && packet && packet->data && c);
940
941     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
942         fprintf(stderr, "protocol-native: invalid packet.\n");
943         connection_free(c);
944     }
945 }
946
947 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
948     struct connection *c = userdata;
949     struct output_stream *stream;
950     assert(p && chunk && userdata);
951
952     if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
953         fprintf(stderr, "protocol-native: client sent block for invalid stream.\n");
954         connection_free(c);
955         return;
956     }
957
958     if (stream->type == PLAYBACK_STREAM) {
959         struct playback_stream *p = (struct playback_stream*) stream;
960         if (chunk->length >= p->requested_bytes)
961             p->requested_bytes = 0;
962         else
963             p->requested_bytes -= chunk->length;
964         
965         pa_memblockq_push_align(p->memblockq, chunk, delta);
966         assert(p->sink_input);
967         pa_sink_notify(p->sink_input->sink);
968         /*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/
969     } else {
970         struct upload_stream *u = (struct upload_stream*) stream;
971         size_t l;
972         assert(u->type == UPLOAD_STREAM);
973
974         if (!u->memchunk.memblock) {
975             if (u->length == chunk->length) {
976                 u->memchunk = *chunk;
977                 pa_memblock_ref(u->memchunk.memblock);
978                 u->length = 0;
979                 fprintf(stderr, "COPY\n");
980             } else {
981                 u->memchunk.memblock = pa_memblock_new(u->length);
982                 u->memchunk.index = u->memchunk.length = 0;
983             }
984         }
985         
986         assert(u->memchunk.memblock);
987         
988         l = u->length; 
989         if (l > chunk->length)
990             l = chunk->length;
991
992         if (l > 0) {
993             memcpy(u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length, chunk->memblock->data+chunk->index, l);
994             u->memchunk.length += l;
995             u->length -= l;
996         }
997     }
998 }
999
/*
 * pstream callback invoked when the stream dies: frees the connection that
 * owns it, then prints a diagnostic to stderr.
 */
static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
    struct connection *conn = userdata;
    assert(p && conn);

    connection_free(conn);

    /* conn must not be touched from here on; the message does not need it. */
    fputs("protocol-native: connection died.\n", stderr);
}
1007
1008
/*
 * pstream drain callback: calls send_memblock() for the connection the
 * pstream belongs to.
 */
static void pstream_drain_callback(struct pa_pstream *p, void *userdata) {
    struct connection *conn = userdata;

    assert(p && conn);
    send_memblock(conn);
}
1015
1016 /*** client callbacks ***/
1017
1018 static void client_kill_cb(struct pa_client *c) {
1019     assert(c && c->userdata);
1020     connection_free(c->userdata);
1021 }
1022
1023 /*** socket server callbacks ***/
1024
1025 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
1026     struct pa_protocol_native *p = userdata;
1027     struct connection *c;
1028     assert(s && io && p);
1029
1030     c = malloc(sizeof(struct connection));
1031     assert(c);
1032     c->authorized = p->public;
1033     c->protocol = p;
1034     assert(p->core);
1035     c->client = pa_client_new(p->core, "NATIVE", "Client");
1036     assert(c->client);
1037     c->client->kill = client_kill_cb;
1038     c->client->userdata = c;
1039     c->client->owner = p->module;
1040     
1041     c->pstream = pa_pstream_new(p->core->mainloop, io);
1042     assert(c->pstream);
1043
1044     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
1045     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
1046     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
1047     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
1048
1049     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
1050     assert(c->pdispatch);
1051
1052     c->record_streams = pa_idxset_new(NULL, NULL);
1053     c->output_streams = pa_idxset_new(NULL, NULL);
1054     assert(c->record_streams && c->output_streams);
1055
1056     c->rrobin_index = PA_IDXSET_INVALID;
1057
1058     pa_idxset_put(p->connections, c, NULL);
1059 }
1060
1061 /*** module entry points ***/
1062
1063 struct pa_protocol_native* pa_protocol_native_new(struct pa_core *core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) {
1064     struct pa_protocol_native *p;
1065     uint32_t public;
1066     assert(core && server && ma);
1067
1068     if (pa_modargs_get_value_u32(ma, "public", &public) < 0) {
1069         fprintf(stderr, __FILE__": public= expects numeric argument.\n");
1070         return NULL;
1071     }
1072     
1073     p = malloc(sizeof(struct pa_protocol_native));
1074     assert(p);
1075
1076     if (pa_authkey_load_from_home(pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), p->auth_cookie, sizeof(p->auth_cookie)) < 0) {
1077         free(p);
1078         return NULL;
1079     }
1080
1081     p->module = m;
1082     p->public = public;
1083     p->server = server;
1084     p->core = core;
1085     p->connections = pa_idxset_new(NULL, NULL);
1086     assert(p->connections);
1087
1088     pa_socket_server_set_callback(p->server, on_connection, p);
1089     
1090     return p;
1091 }
1092
1093 void pa_protocol_native_free(struct pa_protocol_native *p) {
1094     struct connection *c;
1095     assert(p);
1096
1097     while ((c = pa_idxset_first(p->connections, NULL)))
1098         connection_free(c);
1099     pa_idxset_free(p->connections, NULL, NULL);
1100     pa_socket_server_free(p->server);
1101     free(p);
1102 }