proper ref counting for more objects
[profile/ivi/pulseaudio-panda.git] / polyp / protocol-native.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <string.h>
27 #include <stdio.h>
28 #include <assert.h>
29 #include <stdlib.h>
30
31 #include "protocol-native.h"
32 #include "native-common.h"
33 #include "packet.h"
34 #include "client.h"
35 #include "source-output.h"
36 #include "sink-input.h"
37 #include "pstream.h"
38 #include "tagstruct.h"
39 #include "pdispatch.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "namereg.h"
43 #include "scache.h"
44 #include "xmalloc.h"
45 #include "util.h"
46 #include "subscribe.h"
47
48 struct connection;
49 struct pa_protocol_native;
50
/* Per-connection state of one recording stream. */
struct record_stream {
    struct connection *connection;          /* connection this stream belongs to */
    uint32_t index;                         /* index handed out by pa_idxset_put() on connection->record_streams */
    struct pa_source_output *source_output; /* source output feeding this stream */
    struct pa_memblockq *memblockq;         /* data pushed by the source output, waiting to be sent */
    size_t fragment_size;                   /* upper bound for the chunk length sent in one memblock packet */
};
58
/* Per-connection state of one playback stream. Stored in
 * connection->output_streams together with upload streams; "type" is the
 * first member so an entry can be inspected through struct output_stream. */
struct playback_stream {
    int type;                           /* always PLAYBACK_STREAM */
    struct connection *connection;      /* connection this stream belongs to */
    uint32_t index;                     /* index handed out by pa_idxset_put() on connection->output_streams */
    struct pa_sink_input *sink_input;   /* sink input consuming this stream */
    struct pa_memblockq *memblockq;     /* queued audio data read by the sink input callbacks */
    size_t requested_bytes;             /* bytes already asked for via PA_COMMAND_REQUEST */
    int drain_request;                  /* non-zero while a drain command is waiting for its reply */
    uint32_t drain_tag;                 /* tag of the pending drain command */
};
69
/* Per-connection state of one sample upload. Stored in
 * connection->output_streams together with playback streams; "type" is the
 * first member so an entry can be inspected through struct output_stream. */
struct upload_stream {
    int type;                           /* always UPLOAD_STREAM */
    struct connection *connection;      /* connection this stream belongs to */
    uint32_t index;                     /* index handed out by pa_idxset_put() on connection->output_streams */
    struct pa_memchunk memchunk;        /* sample data; handed to pa_scache_add_item() on finish */
    size_t length;                      /* length announced by the client when the upload was created */
    char *name;                         /* private copy of the sample name, freed with the stream */
    struct pa_sample_spec sample_spec;  /* sample format of the upload */
};
79
/* Common prefix of playback_stream and upload_stream: entries of
 * connection->output_streams are read through this type to find out which
 * of the two they are before being cast. */
struct output_stream {
    int type;   /* UPLOAD_STREAM or PLAYBACK_STREAM */
};
83
/* Values of the "type" member shared by the output stream structures */
enum {
    UPLOAD_STREAM,
    PLAYBACK_STREAM
};
88
/* State of a single client connection of the native protocol. */
struct connection {
    int authorized;                         /* set to 1 by command_auth() once the cookie matched */
    struct pa_protocol_native *protocol;    /* protocol instance this connection is registered with */
    struct pa_client *client;               /* client object; released in connection_free() */
    struct pa_pstream *pstream;             /* packet stream used for all replies and memblocks */
    struct pa_pdispatch *pdispatch;         /* command dispatcher; unreferenced in connection_free() */
    struct pa_idxset *record_streams, *output_streams; /* record_stream entries / playback_stream and upload_stream entries */
    uint32_t rrobin_index;                  /* cursor passed to pa_idxset_rrobin() in send_memblock() */
    struct pa_subscription *subscription;   /* optional; freed in connection_free() when non-NULL */
};
99
/* State of one native protocol instance. */
struct pa_protocol_native {
    struct pa_module *module;               /* recorded as owner of the sink inputs and source outputs created here */
    int public;                             /* NOTE(review): not used in the visible part of the file -- presumably disables the cookie check; confirm */
    struct pa_core *core;                   /* core used for sink/source lookup, the sample cache and the main loop */
    struct pa_socket_server *server;
    struct pa_idxset *connections;          /* all struct connection objects of this instance */
    uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH]; /* cookie a client has to present in PA_COMMAND_AUTH */
};
108
/* Callbacks installed on the sink input of a playback stream */
static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
static void sink_input_kill_cb(struct pa_sink_input *i);
static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);

/* Asks the client for more playback data when the queue has room */
static void request_bytes(struct playback_stream*s);

/* Callbacks installed on the source output of a record stream */
static void source_output_kill_cb(struct pa_source_output *o);
static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk);

/* Command handlers referenced from command_table; userdata is the struct connection */
static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
137
/* Maps each command code to its handler. Several codes share one handler,
 * which then tells them apart by its "command" argument. Entries set to
 * { NULL } have no handler in this file. */
static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
    [PA_COMMAND_ERROR] = { NULL },
    [PA_COMMAND_TIMEOUT] = { NULL },
    [PA_COMMAND_REPLY] = { NULL },
    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_stream },
    [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = { command_drain_playback_stream },
    [PA_COMMAND_CREATE_RECORD_STREAM] = { command_create_record_stream },
    [PA_COMMAND_DELETE_RECORD_STREAM] = { command_delete_stream },
    [PA_COMMAND_AUTH] = { command_auth },
    [PA_COMMAND_REQUEST] = { NULL },
    [PA_COMMAND_EXIT] = { command_exit },
    [PA_COMMAND_SET_NAME] = { command_set_name },
    [PA_COMMAND_LOOKUP_SINK] = { command_lookup },
    [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
    [PA_COMMAND_STAT] = { command_stat },
    [PA_COMMAND_GET_PLAYBACK_LATENCY] = { command_get_playback_latency },
    [PA_COMMAND_CREATE_UPLOAD_STREAM] = { command_create_upload_stream },
    [PA_COMMAND_DELETE_UPLOAD_STREAM] = { command_delete_stream },
    [PA_COMMAND_FINISH_UPLOAD_STREAM] = { command_finish_upload_stream },
    [PA_COMMAND_PLAY_SAMPLE] = { command_play_sample },
    [PA_COMMAND_REMOVE_SAMPLE] = { command_remove_sample },
    [PA_COMMAND_GET_SINK_INFO] = { command_get_info },
    [PA_COMMAND_GET_SOURCE_INFO] = { command_get_info },
    [PA_COMMAND_GET_CLIENT_INFO] = { command_get_info },
    [PA_COMMAND_GET_MODULE_INFO] = { command_get_info },
    [PA_COMMAND_GET_SINK_INFO_LIST] = { command_get_info_list },
    [PA_COMMAND_GET_SOURCE_INFO_LIST] = { command_get_info_list },
    [PA_COMMAND_GET_MODULE_INFO_LIST] = { command_get_info_list },
    [PA_COMMAND_GET_CLIENT_INFO_LIST] = { command_get_info_list },
    [PA_COMMAND_GET_SERVER_INFO] = { command_get_server_info },
    [PA_COMMAND_SUBSCRIBE] = { command_subscribe },
};
171
172 /* structure management */
173
174 static struct upload_stream* upload_stream_new(struct connection *c, const struct pa_sample_spec *ss, const char *name, size_t length) {
175     struct upload_stream *s;
176     assert(c && ss && name && length);
177     
178     s = pa_xmalloc(sizeof(struct upload_stream));
179     s->type = UPLOAD_STREAM;
180     s->connection = c;
181     s->sample_spec = *ss;
182     s->name = pa_xstrdup(name);
183
184     s->memchunk.memblock = NULL;
185     s->memchunk.index = 0;
186     s->memchunk.length = 0;
187
188     s->length = length;
189     
190     pa_idxset_put(c->output_streams, s, &s->index);
191     return s;
192 }
193
194 static void upload_stream_free(struct upload_stream *o) {
195     assert(o && o->connection);
196
197     pa_idxset_remove_by_data(o->connection->output_streams, o, NULL);
198
199     pa_xfree(o->name);
200     
201     if (o->memchunk.memblock)
202         pa_memblock_unref(o->memchunk.memblock);
203     
204     pa_xfree(o);
205 }
206
207 static struct record_stream* record_stream_new(struct connection *c, struct pa_source *source, const struct pa_sample_spec *ss, const char *name, size_t maxlength, size_t fragment_size) {
208     struct record_stream *s;
209     struct pa_source_output *source_output;
210     size_t base;
211     assert(c && source && ss && name && maxlength);
212
213     if (!(source_output = pa_source_output_new(source, name, ss)))
214         return NULL;
215
216     s = pa_xmalloc(sizeof(struct record_stream));
217     s->connection = c;
218     s->source_output = source_output;
219     s->source_output->push = source_output_push_cb;
220     s->source_output->kill = source_output_kill_cb;
221     s->source_output->userdata = s;
222     s->source_output->owner = c->protocol->module;
223     s->source_output->client = c->client;
224
225     s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0);
226     assert(s->memblockq);
227
228     s->fragment_size = (fragment_size/base)*base;
229     if (!s->fragment_size)
230         s->fragment_size = base;
231
232     pa_idxset_put(c->record_streams, s, &s->index);
233     return s;
234 }
235
236 static void record_stream_free(struct record_stream* r) {
237     assert(r && r->connection);
238
239     pa_idxset_remove_by_data(r->connection->record_streams, r, NULL);
240     pa_source_output_free(r->source_output);
241     pa_memblockq_free(r->memblockq);
242     pa_xfree(r);
243 }
244
245 static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, const struct pa_sample_spec *ss, const char *name,
246                                                    size_t maxlength,
247                                                    size_t tlength,
248                                                    size_t prebuf,
249                                                    size_t minreq) {
250     struct playback_stream *s;
251     struct pa_sink_input *sink_input;
252     assert(c && sink && ss && name && maxlength);
253
254     if (!(sink_input = pa_sink_input_new(sink, name, ss)))
255         return NULL;
256     
257     s = pa_xmalloc(sizeof(struct playback_stream));
258     s->type = PLAYBACK_STREAM;
259     s->connection = c;
260     s->sink_input = sink_input;
261     
262     s->sink_input->peek = sink_input_peek_cb;
263     s->sink_input->drop = sink_input_drop_cb;
264     s->sink_input->kill = sink_input_kill_cb;
265     s->sink_input->get_latency = sink_input_get_latency_cb;
266     s->sink_input->userdata = s;
267     s->sink_input->owner = c->protocol->module;
268     s->sink_input->client = c->client;
269     
270     s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq);
271     assert(s->memblockq);
272
273     s->requested_bytes = 0;
274     s->drain_request = 0;
275     
276     pa_idxset_put(c->output_streams, s, &s->index);
277     return s;
278 }
279
280 static void playback_stream_free(struct playback_stream* p) {
281     assert(p && p->connection);
282
283     if (p->drain_request)
284         pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
285
286     pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
287     pa_sink_input_free(p->sink_input);
288     pa_memblockq_free(p->memblockq);
289     pa_xfree(p);
290 }
291
292 static void connection_free(struct connection *c) {
293     struct record_stream *r;
294     struct output_stream *o;
295     assert(c && c->protocol);
296
297     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
298     while ((r = pa_idxset_first(c->record_streams, NULL)))
299         record_stream_free(r);
300     pa_idxset_free(c->record_streams, NULL, NULL);
301
302     while ((o = pa_idxset_first(c->output_streams, NULL)))
303         if (o->type == PLAYBACK_STREAM)
304             playback_stream_free((struct playback_stream*) o);
305         else
306             upload_stream_free((struct upload_stream*) o);
307     pa_idxset_free(c->output_streams, NULL, NULL);
308
309     pa_pdispatch_unref(c->pdispatch);
310     pa_pstream_close(c->pstream);
311     pa_pstream_unref(c->pstream);
312     pa_client_free(c->client);
313
314     if (c->subscription)
315         pa_subscription_free(c->subscription);
316     
317     pa_xfree(c);
318 }
319
320 static void request_bytes(struct playback_stream *s) {
321     struct pa_tagstruct *t;
322     size_t l;
323     assert(s);
324
325     if (!(l = pa_memblockq_missing(s->memblockq)))
326         return;
327
328     if (l <= s->requested_bytes)
329         return;
330
331     l -= s->requested_bytes;
332
333     if (l < pa_memblockq_get_minreq(s->memblockq))
334         return;
335     
336     s->requested_bytes += l;
337
338     t = pa_tagstruct_new(NULL, 0);
339     assert(t);
340     pa_tagstruct_putu32(t, PA_COMMAND_REQUEST);
341     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
342     pa_tagstruct_putu32(t, s->index);
343     pa_tagstruct_putu32(t, l);
344     pa_pstream_send_tagstruct(s->connection->pstream, t);
345
346     /*fprintf(stderr, "Requesting %u bytes\n", l);*/
347 }
348
349 static void send_memblock(struct connection *c) {
350     uint32_t start;
351     struct record_stream *r;
352
353     start = PA_IDXSET_INVALID;
354     for (;;) {
355         struct pa_memchunk chunk;
356         
357         if (!(r = pa_idxset_rrobin(c->record_streams, &c->rrobin_index)))
358             return;
359
360         if (start == PA_IDXSET_INVALID)
361             start = c->rrobin_index;
362         else if (start == c->rrobin_index)
363             return;
364
365         if (pa_memblockq_peek(r->memblockq,  &chunk) >= 0) {
366             if (chunk.length > r->fragment_size)
367                 chunk.length = r->fragment_size;
368
369             pa_pstream_send_memblock(c->pstream, r->index, 0, &chunk);
370             pa_memblockq_drop(r->memblockq, chunk.length);
371             pa_memblock_unref(chunk.memblock);
372             
373             return;
374         }
375     }
376 }
377
378 static void send_playback_stream_killed(struct playback_stream *p) {
379     struct pa_tagstruct *t;
380     assert(p);
381
382     t = pa_tagstruct_new(NULL, 0);
383     assert(t);
384     pa_tagstruct_putu32(t, PA_COMMAND_PLAYBACK_STREAM_KILLED);
385     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
386     pa_tagstruct_putu32(t, p->index);
387     pa_pstream_send_tagstruct(p->connection->pstream, t);
388 }
389
390 static void send_record_stream_killed(struct record_stream *r) {
391     struct pa_tagstruct *t;
392     assert(r);
393
394     t = pa_tagstruct_new(NULL, 0);
395     assert(t);
396     pa_tagstruct_putu32(t, PA_COMMAND_RECORD_STREAM_KILLED);
397     pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
398     pa_tagstruct_putu32(t, r->index);
399     pa_pstream_send_tagstruct(r->connection->pstream, t);
400 }
401
402
403 /*** sinkinput callbacks ***/
404
405 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
406     struct playback_stream *s;
407     assert(i && i->userdata && chunk);
408     s = i->userdata;
409
410     if (pa_memblockq_peek(s->memblockq, chunk) < 0)
411         return -1;
412
413     return 0;
414 }
415
416 static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
417     struct playback_stream *s;
418     assert(i && i->userdata && length);
419     s = i->userdata;
420
421     pa_memblockq_drop(s->memblockq, length);
422     request_bytes(s);
423
424     if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
425         pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag);
426         s->drain_request = 0;
427     }
428 }
429
430 static void sink_input_kill_cb(struct pa_sink_input *i) {
431     assert(i && i->userdata);
432     send_playback_stream_killed((struct playback_stream *) i->userdata);
433     playback_stream_free((struct playback_stream *) i->userdata);
434 }
435
436 static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i) {
437     struct playback_stream *s;
438     assert(i && i->userdata);
439     s = i->userdata;
440
441     return pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
442 }
443
444 /*** source_output callbacks ***/
445
446 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
447     struct record_stream *s;
448     assert(o && o->userdata && chunk);
449     s = o->userdata;
450     
451     pa_memblockq_push(s->memblockq, chunk, 0);
452     if (!pa_pstream_is_pending(s->connection->pstream))
453         send_memblock(s->connection);
454 }
455
456 static void source_output_kill_cb(struct pa_source_output *o) {
457     assert(o && o->userdata);
458     send_record_stream_killed((struct record_stream *) o->userdata);
459     record_stream_free((struct record_stream *) o->userdata);
460 }
461
462 /*** pdispatch callbacks ***/
463
/* Called when a client sent a malformed command: log the incident and drop
 * the connection. c is freed by this call and must not be used afterwards. */
static void protocol_error(struct connection *c) {
    /* __FILE__ is passed as an argument rather than spliced into the format
     * string, so a build path containing '%' cannot be read as a conversion */
    fprintf(stderr, "%s: protocol error, kicking client\n", __FILE__);
    connection_free(c);
}
468
469 static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
470     struct connection *c = userdata;
471     struct playback_stream *s;
472     size_t maxlength, tlength, prebuf, minreq;
473     uint32_t sink_index;
474     const char *name, *sink_name;
475     struct pa_sample_spec ss;
476     struct pa_tagstruct *reply;
477     struct pa_sink *sink;
478     assert(c && t && c->protocol && c->protocol->core);
479     
480     if (pa_tagstruct_gets(t, &name) < 0 ||
481         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
482         pa_tagstruct_getu32(t, &sink_index) < 0 ||
483         pa_tagstruct_gets(t, &sink_name) < 0 ||
484         pa_tagstruct_getu32(t, &maxlength) < 0 ||
485         pa_tagstruct_getu32(t, &tlength) < 0 ||
486         pa_tagstruct_getu32(t, &prebuf) < 0 ||
487         pa_tagstruct_getu32(t, &minreq) < 0 ||
488         !pa_tagstruct_eof(t)) {
489         protocol_error(c);
490         return;
491     }
492
493     if (!c->authorized) {
494         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
495         return;
496     }
497
498     if (sink_index != (uint32_t) -1)
499         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
500     else
501         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
502
503     if (!sink) {
504         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
505         return;
506     }
507     
508     if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq))) {
509         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
510         return;
511     }
512     
513     reply = pa_tagstruct_new(NULL, 0);
514     assert(reply);
515     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
516     pa_tagstruct_putu32(reply, tag);
517     pa_tagstruct_putu32(reply, s->index);
518     assert(s->sink_input);
519     pa_tagstruct_putu32(reply, s->sink_input->index);
520     pa_pstream_send_tagstruct(c->pstream, reply);
521     request_bytes(s);
522 }
523
524 static void command_delete_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
525     struct connection *c = userdata;
526     uint32_t channel;
527     assert(c && t);
528     
529     if (pa_tagstruct_getu32(t, &channel) < 0 ||
530         !pa_tagstruct_eof(t)) {
531         protocol_error(c);
532         return;
533     }
534
535     if (!c->authorized) {
536         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
537         return;
538     }
539
540     if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
541         struct playback_stream *s;
542         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
543             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
544             return;
545         }
546
547         playback_stream_free(s);
548     } else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
549         struct record_stream *s;
550         if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
551             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
552             return;
553         }
554
555         record_stream_free(s);
556     } else {
557         struct upload_stream *s;
558         assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
559         if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
560             pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
561             return;
562         }
563
564         upload_stream_free(s);
565     }
566             
567     pa_pstream_send_simple_ack(c->pstream, tag);
568 }
569
570 static void command_create_record_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
571     struct connection *c = userdata;
572     struct record_stream *s;
573     size_t maxlength, fragment_size;
574     uint32_t source_index;
575     const char *name, *source_name;
576     struct pa_sample_spec ss;
577     struct pa_tagstruct *reply;
578     struct pa_source *source;
579     assert(c && t && c->protocol && c->protocol->core);
580     
581     if (pa_tagstruct_gets(t, &name) < 0 ||
582         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
583         pa_tagstruct_getu32(t, &source_index) < 0 ||
584         pa_tagstruct_gets(t, &source_name) < 0 ||
585         pa_tagstruct_getu32(t, &maxlength) < 0 ||
586         pa_tagstruct_getu32(t, &fragment_size) < 0 ||
587         !pa_tagstruct_eof(t)) {
588         protocol_error(c);
589         return;
590     }
591
592     if (!c->authorized) {
593         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
594         return;
595     }
596
597     if (source_index != (uint32_t) -1)
598         source = pa_idxset_get_by_index(c->protocol->core->sources, source_index);
599     else
600         source = pa_namereg_get(c->protocol->core, *source_name ? source_name : NULL, PA_NAMEREG_SOURCE, 1);
601
602     if (!source) {
603         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
604         return;
605     }
606     
607     if (!(s = record_stream_new(c, source, &ss, name, maxlength, fragment_size))) {
608         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
609         return;
610     }
611     
612     reply = pa_tagstruct_new(NULL, 0);
613     assert(reply);
614     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
615     pa_tagstruct_putu32(reply, tag);
616     pa_tagstruct_putu32(reply, s->index);
617     assert(s->source_output);
618     pa_tagstruct_putu32(reply, s->source_output->index);
619     pa_pstream_send_tagstruct(c->pstream, reply);
620 }
621
622 static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
623     struct connection *c = userdata;
624     assert(c && t);
625     
626     if (!pa_tagstruct_eof(t)) {
627         protocol_error(c);
628         return;
629     }
630
631     if (!c->authorized) {
632         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
633         return;
634     }
635     
636     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
637     c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
638     pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
639     return;
640 }
641
642 static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
643     struct connection *c = userdata;
644     const void*cookie;
645     assert(c && t);
646
647     if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
648         !pa_tagstruct_eof(t)) {
649         protocol_error(c);
650         return;
651     }
652         
653     if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
654         fprintf(stderr, "protocol-native.c: Denied access to client with invalid authorization key.\n");
655         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
656         return;
657     }
658
659     c->authorized = 1;
660     pa_pstream_send_simple_ack(c->pstream, tag);
661     return;
662 }
663
664 static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
665     struct connection *c = userdata;
666     const char *name;
667     assert(c && t);
668
669     if (pa_tagstruct_gets(t, &name) < 0 ||
670         !pa_tagstruct_eof(t)) {
671         protocol_error(c);
672         return;
673     }
674
675     pa_client_rename(c->client, name);
676     pa_pstream_send_simple_ack(c->pstream, tag);
677     return;
678 }
679
680 static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
681     struct connection *c = userdata;
682     const char *name;
683     uint32_t index = PA_IDXSET_INVALID;
684     assert(c && t);
685
686     if (pa_tagstruct_gets(t, &name) < 0 ||
687         !pa_tagstruct_eof(t)) {
688         protocol_error(c);
689         return;
690     }
691
692     if (!c->authorized) {
693         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
694         return;
695     }
696
697     if (command == PA_COMMAND_LOOKUP_SINK) {
698         struct pa_sink *sink;
699         if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK, 1)))
700             index = sink->index;
701     } else {
702         struct pa_source *source;
703         assert(command == PA_COMMAND_LOOKUP_SOURCE);
704         if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE, 1)))
705             index = source->index;
706     }
707
708     if (index == PA_IDXSET_INVALID)
709         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
710     else {
711         struct pa_tagstruct *reply;
712         reply = pa_tagstruct_new(NULL, 0);
713         assert(reply);
714         pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
715         pa_tagstruct_putu32(reply, tag);
716         pa_tagstruct_putu32(reply, index);
717         pa_pstream_send_tagstruct(c->pstream, reply);
718     }
719 }
720
721 static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
722     struct connection *c = userdata;
723     uint32_t index;
724     struct playback_stream *s;
725     assert(c && t);
726
727     if (pa_tagstruct_getu32(t, &index) < 0 ||
728         !pa_tagstruct_eof(t)) {
729         protocol_error(c);
730         return;
731     }
732
733     if (!c->authorized) {
734         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
735         return;
736     }
737
738     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
739         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
740         return;
741     }
742
743     s->drain_request = 0;
744     
745     if (!pa_memblockq_is_readable(s->memblockq))
746         pa_pstream_send_simple_ack(c->pstream, tag);
747     else {
748         s->drain_request = 1;
749         s->drain_tag = tag;
750     }
751
752
753 static void command_stat(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
754     struct connection *c = userdata;
755     assert(c && t);
756     struct pa_tagstruct *reply;
757
758     if (!pa_tagstruct_eof(t)) {
759         protocol_error(c);
760         return;
761     }
762
763     if (!c->authorized) {
764         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
765         return;
766     }
767
768     reply = pa_tagstruct_new(NULL, 0);
769     assert(reply);
770     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
771     pa_tagstruct_putu32(reply, tag);
772     pa_tagstruct_putu32(reply, pa_memblock_get_count());
773     pa_tagstruct_putu32(reply, pa_memblock_get_total());
774     pa_pstream_send_tagstruct(c->pstream, reply);
775 }
776
777 static void command_get_playback_latency(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
778     struct connection *c = userdata;
779     assert(c && t);
780     struct pa_tagstruct *reply;
781     struct playback_stream *s;
782     uint32_t index, latency;
783
784     if (pa_tagstruct_getu32(t, &index) < 0 ||
785         !pa_tagstruct_eof(t)) {
786         protocol_error(c);
787         return;
788     }
789
790     if (!c->authorized) {
791         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
792         return;
793     }
794
795     if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
796         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
797         return;
798     }
799
800     latency = pa_sink_input_get_latency(s->sink_input);
801     reply = pa_tagstruct_new(NULL, 0);
802     assert(reply);
803     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
804     pa_tagstruct_putu32(reply, tag);
805     pa_tagstruct_putu32(reply, latency);
806     pa_pstream_send_tagstruct(c->pstream, reply);
807 }
808
809 static void command_create_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
810     struct connection *c = userdata;
811     struct upload_stream *s;
812     size_t length;
813     const char *name;
814     struct pa_sample_spec ss;
815     struct pa_tagstruct *reply;
816     assert(c && t && c->protocol && c->protocol->core);
817     
818     if (pa_tagstruct_gets(t, &name) < 0 ||
819         pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
820         pa_tagstruct_getu32(t, &length) < 0 ||
821         !pa_tagstruct_eof(t)) {
822         protocol_error(c);
823         return;
824     }
825
826     if (!c->authorized) {
827         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
828         return;
829     }
830
831     if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
832         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
833         return;
834     }
835     
836     if (!(s = upload_stream_new(c, &ss, name, length))) {
837         pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
838         return;
839     }
840     
841     reply = pa_tagstruct_new(NULL, 0);
842     assert(reply);
843     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
844     pa_tagstruct_putu32(reply, tag);
845     pa_tagstruct_putu32(reply, s->index);
846     pa_pstream_send_tagstruct(c->pstream, reply);
847     
848     reply = pa_tagstruct_new(NULL, 0);
849     assert(reply);
850     pa_tagstruct_putu32(reply, PA_COMMAND_REQUEST);
851     pa_tagstruct_putu32(reply, (uint32_t) -1); /* tag */
852     pa_tagstruct_putu32(reply, s->index);
853     pa_tagstruct_putu32(reply, length);
854     pa_pstream_send_tagstruct(c->pstream, reply);
855 }
856
857 static void command_finish_upload_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
858     struct connection *c = userdata;
859     uint32_t channel;
860     struct upload_stream *s;
861     uint32_t index;
862     assert(c && t);
863     
864     if (pa_tagstruct_getu32(t, &channel) < 0 ||
865         !pa_tagstruct_eof(t)) {
866         protocol_error(c);
867         return;
868     }
869
870     if (!c->authorized) {
871         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
872         return;
873     }
874
875     if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
876         pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
877         return;
878     }
879
880     pa_scache_add_item(c->protocol->core, s->name, &s->sample_spec, &s->memchunk, &index);
881     pa_pstream_send_simple_ack(c->pstream, tag);
882     upload_stream_free(s);
883 }
884
885 static void command_play_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
886     struct connection *c = userdata;
887     uint32_t sink_index, volume;
888     struct pa_sink *sink;
889     const char *name, *sink_name;
890     assert(c && t);
891
892     if (pa_tagstruct_getu32(t, &sink_index) < 0 ||
893         pa_tagstruct_gets(t, &sink_name) < 0 ||
894         pa_tagstruct_getu32(t, &volume) < 0 ||
895         pa_tagstruct_gets(t, &name) < 0 ||
896         !pa_tagstruct_eof(t)) {
897         protocol_error(c);
898         return;
899     }
900     
901     if (!c->authorized) {
902         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
903         return;
904     }
905
906     if (sink_index != (uint32_t) -1)
907         sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
908     else
909         sink = pa_namereg_get(c->protocol->core, *sink_name ? sink_name : NULL, PA_NAMEREG_SINK, 1);
910
911     if (!sink) {
912         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
913         return;
914     }
915
916     if (pa_scache_play_item(c->protocol->core, name, sink, volume) < 0) {
917         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
918         return;
919     }
920
921     pa_pstream_send_simple_ack(c->pstream, tag);
922 }
923
924 static void command_remove_sample(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
925     struct connection *c = userdata;
926     const char *name;
927     assert(c && t);
928
929     if (pa_tagstruct_gets(t, &name) < 0 ||
930         !pa_tagstruct_eof(t)) {
931         protocol_error(c);
932         return;
933     }
934
935     if (!c->authorized) {
936         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
937         return;
938     }
939
940     if (pa_scache_remove_item(c->protocol->core, name) < 0) {
941         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
942         return;
943     }
944
945     pa_pstream_send_simple_ack(c->pstream, tag);
946 }
947
948 static void sink_fill_tagstruct(struct pa_tagstruct *t, struct pa_sink *sink) {
949     assert(t && sink);
950     pa_tagstruct_putu32(t, sink->index);
951     pa_tagstruct_puts(t, sink->name);
952     pa_tagstruct_puts(t, sink->description ? sink->description : "");
953     pa_tagstruct_put_sample_spec(t, &sink->sample_spec);
954     pa_tagstruct_putu32(t, sink->owner ? sink->owner->index : (uint32_t) -1);
955     pa_tagstruct_putu32(t, sink->volume);
956     pa_tagstruct_putu32(t, sink->monitor_source->index);
957     pa_tagstruct_puts(t, sink->monitor_source->name);
958     pa_tagstruct_putu32(t, pa_sink_get_latency(sink));
959 }
960
961 static void source_fill_tagstruct(struct pa_tagstruct *t, struct pa_source *source) {
962     assert(t && source);
963     pa_tagstruct_putu32(t, source->index);
964     pa_tagstruct_puts(t, source->name);
965     pa_tagstruct_puts(t, source->description ? source->description : "");
966     pa_tagstruct_put_sample_spec(t, &source->sample_spec);
967     pa_tagstruct_putu32(t, source->owner ? source->owner->index : (uint32_t) -1);
968     pa_tagstruct_putu32(t, source->monitor_of ? source->monitor_of->index : (uint32_t) -1);
969     pa_tagstruct_puts(t, source->monitor_of ? source->monitor_of->name : "");
970 }
971
972 static void client_fill_tagstruct(struct pa_tagstruct *t, struct pa_client *client) {
973     assert(t && client);
974     pa_tagstruct_putu32(t, client->index);
975     pa_tagstruct_puts(t, client->name);
976     pa_tagstruct_puts(t, client->protocol_name);
977     pa_tagstruct_putu32(t, client->owner ? client->owner->index : (uint32_t) -1);
978 }
979
980 static void module_fill_tagstruct(struct pa_tagstruct *t, struct pa_module *module) {
981     assert(t && module);
982     pa_tagstruct_putu32(t, module->index);
983     pa_tagstruct_puts(t, module->name);
984     pa_tagstruct_puts(t, module->argument ? module->argument : "");
985     pa_tagstruct_putu32(t, module->n_used);
986     pa_tagstruct_putu32(t, module->auto_unload);
987 }
988
989 static void command_get_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
990     struct connection *c = userdata;
991     uint32_t index;
992     struct pa_sink *sink = NULL;
993     struct pa_source *source = NULL;
994     struct pa_client *client = NULL;
995     struct pa_module *module = NULL;
996     const char *name;
997     struct pa_tagstruct *reply;
998     assert(c && t);
999
1000     if (pa_tagstruct_getu32(t, &index) < 0 ||
1001         pa_tagstruct_gets(t, &name) < 0 ||
1002         !pa_tagstruct_eof(t)) {
1003         protocol_error(c);
1004         return;
1005     }
1006     
1007     if (!c->authorized) {
1008         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1009         return;
1010     }
1011
1012     if (command == PA_COMMAND_GET_SINK_INFO) {
1013         if (index != (uint32_t) -1)
1014             sink = pa_idxset_get_by_index(c->protocol->core->sinks, index);
1015         else
1016             sink = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SINK, 1);
1017     } else if (command == PA_COMMAND_GET_SOURCE_INFO) {
1018         if (index != (uint32_t) -1)
1019             source = pa_idxset_get_by_index(c->protocol->core->sources, index);
1020         else
1021             source = pa_namereg_get(c->protocol->core, *name ? name : NULL, PA_NAMEREG_SOURCE, 1);
1022     } else if (command == PA_COMMAND_GET_CLIENT_INFO)
1023         client = pa_idxset_get_by_index(c->protocol->core->clients, index);
1024     else {
1025         assert(command == PA_COMMAND_GET_MODULE_INFO);
1026         module = pa_idxset_get_by_index(c->protocol->core->modules, index);
1027     }
1028     
1029     if (!sink && !source && !client && !module) {
1030         pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
1031         return;
1032     }
1033
1034     reply = pa_tagstruct_new(NULL, 0);
1035     assert(reply);
1036     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1037     pa_tagstruct_putu32(reply, tag); 
1038     if (sink)
1039         sink_fill_tagstruct(reply, sink);
1040     else if (source)
1041         source_fill_tagstruct(reply, source);
1042     else if (client)
1043         client_fill_tagstruct(reply, client);
1044     else
1045         module_fill_tagstruct(reply, module);
1046     pa_pstream_send_tagstruct(c->pstream, reply);
1047 }
1048
1049 static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1050     struct connection *c = userdata;
1051     struct pa_idxset *i;
1052     uint32_t index;
1053     void *p;
1054     struct pa_tagstruct *reply;
1055     assert(c && t);
1056
1057     if (!pa_tagstruct_eof(t)) {
1058         protocol_error(c);
1059         return;
1060     }
1061     
1062     if (!c->authorized) {
1063         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1064         return;
1065     }
1066
1067     reply = pa_tagstruct_new(NULL, 0);
1068     assert(reply);
1069     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1070     pa_tagstruct_putu32(reply, tag);
1071
1072     if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1073         i = c->protocol->core->sinks;
1074     else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1075         i = c->protocol->core->sources;
1076     else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1077         i = c->protocol->core->clients;
1078     else {
1079         assert(command == PA_COMMAND_GET_MODULE_INFO_LIST);
1080         i = c->protocol->core->modules;
1081     }
1082
1083     for (p = pa_idxset_first(i, &index); p; p = pa_idxset_next(i, &index)) {
1084         if (command == PA_COMMAND_GET_SINK_INFO_LIST)
1085             sink_fill_tagstruct(reply, p);
1086         else if (command == PA_COMMAND_GET_SOURCE_INFO_LIST)
1087             source_fill_tagstruct(reply, p);
1088         else if (command == PA_COMMAND_GET_CLIENT_INFO_LIST)
1089             client_fill_tagstruct(reply, p);
1090         else {
1091             assert(command == PA_COMMAND_GET_MODULE_INFO_LIST);
1092             module_fill_tagstruct(reply, p);
1093         }
1094     } 
1095     
1096     pa_pstream_send_tagstruct(c->pstream, reply);
1097 }
1098
1099 static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1100     struct connection *c = userdata;
1101     struct pa_tagstruct *reply;
1102     char txt[256];
1103     assert(c && t);
1104
1105     if (!pa_tagstruct_eof(t)) {
1106         protocol_error(c);
1107         return;
1108     }
1109     
1110     if (!c->authorized) {
1111         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1112         return;
1113     }
1114
1115     reply = pa_tagstruct_new(NULL, 0);
1116     assert(reply);
1117     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1118     pa_tagstruct_putu32(reply, tag);
1119     pa_tagstruct_puts(reply, PACKAGE_NAME);
1120     pa_tagstruct_puts(reply, PACKAGE_VERSION);
1121     pa_tagstruct_puts(reply, pa_get_user_name(txt, sizeof(txt)));
1122     pa_tagstruct_puts(reply, pa_get_host_name(txt, sizeof(txt)));
1123     pa_tagstruct_put_sample_spec(reply, &c->protocol->core->default_sample_spec);
1124     pa_pstream_send_tagstruct(c->pstream, reply);
1125 }
1126
1127 static void subscription_cb(struct pa_core *core, enum pa_subscription_event_type e, uint32_t index, void *userdata) {
1128     struct pa_tagstruct *t;
1129     struct connection *c = userdata;
1130     assert(c && core);
1131
1132     t = pa_tagstruct_new(NULL, 0);
1133     assert(t);
1134     pa_tagstruct_putu32(t, PA_COMMAND_SUBSCRIBE_EVENT);
1135     pa_tagstruct_putu32(t, (uint32_t) -1);
1136     pa_tagstruct_putu32(t, e);
1137     pa_tagstruct_putu32(t, index);
1138     pa_pstream_send_tagstruct(c->pstream, t);
1139 }
1140
1141 static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1142     struct connection *c = userdata;
1143     enum pa_subscription_mask m;
1144     assert(c && t);
1145
1146     if (pa_tagstruct_getu32(t, &m) < 0 ||
1147         !pa_tagstruct_eof(t)) {
1148         protocol_error(c);
1149         return;
1150     }
1151     
1152     if (!c->authorized) {
1153         pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
1154         return;
1155     }
1156
1157     if (c->subscription)
1158         pa_subscription_free(c->subscription);
1159
1160     if (m != 0) {
1161         c->subscription = pa_subscription_new(c->protocol->core, m, subscription_cb, c);
1162         assert(c->subscription);
1163     } else
1164         c->subscription = NULL;
1165
1166     pa_pstream_send_simple_ack(c->pstream, tag);
1167     
1168 }
1169
1170 /*** pstream callbacks ***/
1171
1172 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
1173     struct connection *c = userdata;
1174     assert(p && packet && packet->data && c);
1175
1176     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
1177         fprintf(stderr, "protocol-native: invalid packet.\n");
1178         connection_free(c);
1179     }
1180 }
1181
1182 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
1183     struct connection *c = userdata;
1184     struct output_stream *stream;
1185     assert(p && chunk && userdata);
1186
1187     if (!(stream = pa_idxset_get_by_index(c->output_streams, channel))) {
1188         fprintf(stderr, "protocol-native: client sent block for invalid stream.\n");
1189         connection_free(c);
1190         return;
1191     }
1192
1193     if (stream->type == PLAYBACK_STREAM) {
1194         struct playback_stream *p = (struct playback_stream*) stream;
1195         if (chunk->length >= p->requested_bytes)
1196             p->requested_bytes = 0;
1197         else
1198             p->requested_bytes -= chunk->length;
1199         
1200         pa_memblockq_push_align(p->memblockq, chunk, delta);
1201         assert(p->sink_input);
1202         pa_sink_notify(p->sink_input->sink);
1203         /*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/
1204     } else {
1205         struct upload_stream *u = (struct upload_stream*) stream;
1206         size_t l;
1207         assert(u->type == UPLOAD_STREAM);
1208
1209         if (!u->memchunk.memblock) {
1210             if (u->length == chunk->length) {
1211                 u->memchunk = *chunk;
1212                 pa_memblock_ref(u->memchunk.memblock);
1213                 u->length = 0;
1214                 fprintf(stderr, "COPY\n");
1215             } else {
1216                 u->memchunk.memblock = pa_memblock_new(u->length);
1217                 u->memchunk.index = u->memchunk.length = 0;
1218             }
1219         }
1220         
1221         assert(u->memchunk.memblock);
1222         
1223         l = u->length; 
1224         if (l > chunk->length)
1225             l = chunk->length;
1226
1227         if (l > 0) {
1228             memcpy(u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length, chunk->memblock->data+chunk->index, l);
1229             u->memchunk.length += l;
1230             u->length -= l;
1231         }
1232     }
1233 }
1234
/* Called by the pstream when it dies: free the connection and log it. */
static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
    struct connection *conn = userdata;
    assert(p && conn);

    connection_free(conn);
    fprintf(stderr, "protocol-native: connection died.\n");
}
1242
1243
/* Called by the pstream on drain: hand off to send_memblock() for this
 * connection. */
static void pstream_drain_callback(struct pa_pstream *p, void *userdata) {
    struct connection *conn = userdata;
    assert(p && conn);
    send_memblock(conn);
}
1250
1251 /*** client callbacks ***/
1252
1253 static void client_kill_cb(struct pa_client *c) {
1254     assert(c && c->userdata);
1255     connection_free(c->userdata);
1256 }
1257
1258 /*** socket server callbacks ***/
1259
1260 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
1261     struct pa_protocol_native *p = userdata;
1262     struct connection *c;
1263     assert(s && io && p);
1264
1265     c = pa_xmalloc(sizeof(struct connection));
1266     c->authorized = p->public;
1267     c->protocol = p;
1268     assert(p->core);
1269     c->client = pa_client_new(p->core, "NATIVE", "Client");
1270     assert(c->client);
1271     c->client->kill = client_kill_cb;
1272     c->client->userdata = c;
1273     c->client->owner = p->module;
1274     
1275     c->pstream = pa_pstream_new(p->core->mainloop, io);
1276     assert(c->pstream);
1277
1278     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
1279     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
1280     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
1281     pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
1282
1283     c->pdispatch = pa_pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
1284     assert(c->pdispatch);
1285
1286     c->record_streams = pa_idxset_new(NULL, NULL);
1287     c->output_streams = pa_idxset_new(NULL, NULL);
1288     assert(c->record_streams && c->output_streams);
1289
1290     c->rrobin_index = PA_IDXSET_INVALID;
1291     c->subscription = NULL;
1292
1293     pa_idxset_put(p->connections, c, NULL);
1294 }
1295
1296 /*** module entry points ***/
1297
1298 struct pa_protocol_native* pa_protocol_native_new(struct pa_core *core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) {
1299     struct pa_protocol_native *p;
1300     uint32_t public;
1301     assert(core && server && ma);
1302
1303     if (pa_modargs_get_value_u32(ma, "public", &public) < 0) {
1304         fprintf(stderr, __FILE__": public= expects numeric argument.\n");
1305         return NULL;
1306     }
1307     
1308     p = pa_xmalloc(sizeof(struct pa_protocol_native));
1309
1310     if (pa_authkey_load_from_home(pa_modargs_get_value(ma, "cookie", PA_NATIVE_COOKIE_FILE), p->auth_cookie, sizeof(p->auth_cookie)) < 0) {
1311         pa_xfree(p);
1312         return NULL;
1313     }
1314
1315     p->module = m;
1316     p->public = public;
1317     p->server = server;
1318     p->core = core;
1319     p->connections = pa_idxset_new(NULL, NULL);
1320     assert(p->connections);
1321
1322     pa_socket_server_set_callback(p->server, on_connection, p);
1323     
1324     return p;
1325 }
1326
1327 void pa_protocol_native_free(struct pa_protocol_native *p) {
1328     struct connection *c;
1329     assert(p);
1330
1331     while ((c = pa_idxset_first(p->connections, NULL)))
1332         connection_free(c);
1333     pa_idxset_free(p->connections, NULL, NULL);
1334     pa_socket_server_unref(p->server);
1335     pa_xfree(p);
1336 }