make a symbol in module-ptorocol-stub static
[profile/ivi/pulseaudio.git] / src / polyp.c
1 #include <stdio.h>
2 #include <assert.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <netdb.h>
8
9 #include "polyp.h"
10 #include "protocol-native-spec.h"
11 #include "pdispatch.h"
12 #include "pstream.h"
13 #include "dynarray.h"
14 #include "socket-client.h"
15 #include "pstream-util.h"
16 #include "authkey.h"
17 #include "util.h"
18
19 #define DEFAULT_MAXLENGTH 204800
20 #define DEFAULT_TLENGTH 10240
21 #define DEFAULT_PREBUF 4096
22 #define DEFAULT_MINREQ 1024
23 #define DEFAULT_FRAGSIZE 1024
24
25 #define DEFAULT_TIMEOUT (5*60)
26 #define DEFAULT_SERVER "/tmp/polypaudio/native"
27 #define DEFAULT_PORT "4713"
28
29 struct pa_context {
30     char *name;
31     struct pa_mainloop_api* mainloop;
32     struct pa_socket_client *client;
33     struct pa_pstream *pstream;
34     struct pa_pdispatch *pdispatch;
35     struct pa_dynarray *record_streams, *playback_streams;
36     struct pa_stream *first_stream;
37     uint32_t ctag;
38     uint32_t error;
39     enum {
40         CONTEXT_UNCONNECTED,
41         CONTEXT_CONNECTING,
42         CONTEXT_AUTHORIZING,
43         CONTEXT_SETTING_NAME,
44         CONTEXT_READY,
45         CONTEXT_DEAD
46     } state;
47
48     void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
49     void *connect_complete_userdata;
50
51     void (*drain_complete_callback)(struct pa_context*c, void *userdata);
52     void *drain_complete_userdata;
53     
54     void (*die_callback)(struct pa_context*c, void *userdata);
55     void *die_userdata;
56
57     void (*stat_callback)(struct pa_context*c, uint32_t count, uint32_t total, void *userdata);
58     void *stat_userdata;
59     
60     uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
61 };
62
63 struct pa_stream {
64     struct pa_context *context;
65     struct pa_stream *next, *previous;
66
67     char *name;
68     struct pa_buffer_attr buffer_attr;
69     struct pa_sample_spec sample_spec;
70     uint32_t device_index;
71     uint32_t channel;
72     int channel_valid;
73     enum pa_stream_direction direction;
74     
75     enum { STREAM_LOOKING_UP, STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
76     uint32_t requested_bytes;
77
78     void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
79     void *read_userdata;
80
81     void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
82     void *write_userdata;
83     
84     void (*create_complete_callback)(struct pa_stream *s, int success, void *userdata);
85     void *create_complete_userdata;
86
87     void (*drain_complete_callback)(struct pa_stream *s, void *userdata);
88     void *drain_complete_userdata;
89     
90     void (*die_callback)(struct pa_stream*c, void *userdata);
91     void *die_userdata;
92
93     void (*get_latency_callback)(struct pa_stream*c, uint32_t latency, void *userdata);
94     void *get_latency_userdata;
95 };
96
97 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
98 static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
99
100 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
101     [PA_COMMAND_ERROR] = { NULL },
102     [PA_COMMAND_REPLY] = { NULL },
103     [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
104     [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
105     [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
106     [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
107     [PA_COMMAND_EXIT] = { NULL },
108     [PA_COMMAND_REQUEST] = { command_request },
109     [PA_COMMAND_PLAYBACK_STREAM_KILLED] = { command_playback_stream_killed },
110     [PA_COMMAND_RECORD_STREAM_KILLED] = { command_playback_stream_killed },
111 };
112
113 struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
114     struct pa_context *c;
115     assert(mainloop && name);
116     
117     c = malloc(sizeof(struct pa_context));
118     assert(c);
119     c->name = strdup(name);
120     c->mainloop = mainloop;
121     c->client = NULL;
122     c->pstream = NULL;
123     c->pdispatch = NULL;
124     c->playback_streams = pa_dynarray_new();
125     assert(c->playback_streams);
126     c->record_streams = pa_dynarray_new();
127     assert(c->record_streams);
128     c->first_stream = NULL;
129     c->error = PA_ERROR_OK;
130     c->state = CONTEXT_UNCONNECTED;
131     c->ctag = 0;
132
133     c->connect_complete_callback = NULL;
134     c->connect_complete_userdata = NULL;
135
136     c->drain_complete_callback = NULL;
137     c->drain_complete_userdata = NULL;
138
139     c->die_callback = NULL;
140     c->die_userdata = NULL;
141
142     c->stat_callback = NULL;
143     c->stat_userdata = NULL;
144
145     pa_check_for_sigpipe();
146     return c;
147 }
148
149 void pa_context_free(struct pa_context *c) {
150     assert(c);
151
152     while (c->first_stream)
153         pa_stream_free(c->first_stream);
154     
155     if (c->client)
156         pa_socket_client_free(c->client);
157     if (c->pdispatch)
158         pa_pdispatch_free(c->pdispatch);
159     if (c->pstream)
160         pa_pstream_free(c->pstream);
161     if (c->record_streams)
162         pa_dynarray_free(c->record_streams, NULL, NULL);
163     if (c->playback_streams)
164         pa_dynarray_free(c->playback_streams, NULL, NULL);
165         
166     free(c->name);
167     free(c);
168 }
169
170 static void stream_dead(struct pa_stream *s) {
171     assert(s);
172     
173     if (s->state == STREAM_DEAD)
174         return;
175     
176     if (s->state == STREAM_READY) {
177         s->state = STREAM_DEAD;
178         if (s->die_callback)
179             s->die_callback(s, s->die_userdata);
180     } else
181         s->state = STREAM_DEAD;
182 }
183
184 static void context_dead(struct pa_context *c) {
185     struct pa_stream *s;
186     assert(c);
187     
188     if (c->state == CONTEXT_DEAD)
189         return;
190
191     if (c->pdispatch)
192         pa_pdispatch_free(c->pdispatch);
193     c->pdispatch = NULL;
194     
195     if (c->pstream)
196         pa_pstream_free(c->pstream);
197     c->pstream = NULL;
198     
199     if (c->client)
200         pa_socket_client_free(c->client);
201     c->client = NULL;
202     
203     for (s = c->first_stream; s; s = s->next)
204         stream_dead(s);
205
206     if (c->state == CONTEXT_READY) {
207         c->state = CONTEXT_DEAD;
208         if (c->die_callback)
209             c->die_callback(c, c->die_userdata);
210     } else
211         c->state = CONTEXT_DEAD;
212 }
213
214 static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
215     struct pa_context *c = userdata;
216     assert(p && c);
217     c->error = PA_ERROR_CONNECTIONTERMINATED;
218     context_dead(c);
219 }
220
221 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
222     struct pa_context *c = userdata;
223     assert(p && packet && c);
224
225     if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
226         fprintf(stderr, "polyp.c: invalid packet.\n");
227         c->error = PA_ERROR_PROTOCOL;
228         context_dead(c);
229     }
230 }
231
232 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
233     struct pa_context *c = userdata;
234     struct pa_stream *s;
235     assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
236
237     if (!(s = pa_dynarray_get(c->record_streams, channel)))
238         return;
239
240     if (s->read_callback)
241         s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
242 }
243
244 static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) {
245     assert(c && t);
246     
247     if (command == PA_COMMAND_ERROR) {
248         if (pa_tagstruct_getu32(t, &c->error) < 0) {
249             c->error = PA_ERROR_PROTOCOL;
250             return -1;
251         }
252
253         return 0;
254     }
255
256     c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL;
257     return -1;
258 }
259
260 static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
261     struct pa_context *c = userdata;
262     assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME));
263
264     if (command != PA_COMMAND_REPLY) {
265         handle_error(c, command, t);
266         context_dead(c);
267
268         if (c->connect_complete_callback)
269             c->connect_complete_callback(c, 0, c->connect_complete_userdata);
270         
271         return;
272     }
273
274     if (c->state == CONTEXT_AUTHORIZING) {
275         struct pa_tagstruct *t;
276         c->state = CONTEXT_SETTING_NAME;
277         t = pa_tagstruct_new(NULL, 0);
278         assert(t);
279         pa_tagstruct_putu32(t, PA_COMMAND_SET_NAME);
280         pa_tagstruct_putu32(t, tag = c->ctag++);
281         pa_tagstruct_puts(t, c->name);
282         pa_pstream_send_tagstruct(c->pstream, t);
283         pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
284     } else {
285         assert(c->state == CONTEXT_SETTING_NAME);
286         
287         c->state = CONTEXT_READY;
288
289         if (c->connect_complete_callback) 
290             c->connect_complete_callback(c, 1, c->connect_complete_userdata);
291     }
292
293     return;
294 }
295
296 static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) {
297     struct pa_context *c = userdata;
298     struct pa_tagstruct *t;
299     uint32_t tag;
300     assert(client && c && c->state == CONTEXT_CONNECTING);
301
302     pa_socket_client_free(client);
303     c->client = NULL;
304
305     if (!io) {
306         c->error = PA_ERROR_CONNECTIONREFUSED;
307         context_dead(c);
308
309         if (c->connect_complete_callback)
310             c->connect_complete_callback(c, 0, c->connect_complete_userdata);
311
312         return;
313     }
314     
315     c->pstream = pa_pstream_new(c->mainloop, io);
316     assert(c->pstream);
317     pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
318     pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
319     pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
320     
321     c->pdispatch = pa_pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
322     assert(c->pdispatch);
323
324     t = pa_tagstruct_new(NULL, 0);
325     assert(t);
326     pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
327     pa_tagstruct_putu32(t, tag = c->ctag++);
328     pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie));
329     pa_pstream_send_tagstruct(c->pstream, t);
330     pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
331     c->state = CONTEXT_AUTHORIZING;
332 }
333
334 static struct sockaddr *resolve_server(const char *server, size_t *len) {
335     struct sockaddr *sa;
336     struct addrinfo hints, *result = NULL;
337     char *port;
338     assert(server && len);
339
340     if ((port = strrchr(server, ':')))
341         port++;
342     if (!port)
343         port = DEFAULT_PORT;
344
345     memset(&hints, 0, sizeof(hints));
346     hints.ai_family = PF_UNSPEC;
347     hints.ai_socktype = SOCK_STREAM;
348     hints.ai_protocol = 0;
349
350     if (getaddrinfo(server, port, &hints, &result) != 0)
351         return NULL;
352     assert(result);
353     
354     sa = malloc(*len = result->ai_addrlen);
355     assert(sa);
356     memcpy(sa, result->ai_addr, *len);
357
358     freeaddrinfo(result);
359     
360     return sa;
361     
362 }
363
364 int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
365     assert(c && c->state == CONTEXT_UNCONNECTED);
366
367     if (pa_authkey_load_from_home(PA_NATIVE_COOKIE_FILE, c->auth_cookie, sizeof(c->auth_cookie)) < 0) {
368         c->error = PA_ERROR_AUTHKEY;
369         return -1;
370     }
371
372     if (!server)
373         if (!(server = getenv("POLYP_SERVER")))
374             server = DEFAULT_SERVER;
375
376     assert(!c->client);
377     
378     if (*server == '/') {
379         if (!(c->client = pa_socket_client_new_unix(c->mainloop, server))) {
380             c->error = PA_ERROR_CONNECTIONREFUSED;
381             return -1;
382         }
383     } else {
384         struct sockaddr* sa;
385         size_t sa_len;
386
387         if (!(sa = resolve_server(server, &sa_len))) {
388             c->error = PA_ERROR_INVALIDSERVER;
389             return -1;
390         }
391
392         c->client = pa_socket_client_new_sockaddr(c->mainloop, sa, sa_len);
393         free(sa);
394
395         if (!c->client) {
396             c->error = PA_ERROR_CONNECTIONREFUSED;
397             return -1;
398         }
399     }
400
401     c->connect_complete_callback = complete;
402     c->connect_complete_userdata = userdata;
403     
404     pa_socket_client_set_callback(c->client, on_connection, c);
405     c->state = CONTEXT_CONNECTING;
406
407     return 0;
408 }
409
410 int pa_context_is_dead(struct pa_context *c) {
411     assert(c);
412     return c->state == CONTEXT_DEAD;
413 }
414
415 int pa_context_is_ready(struct pa_context *c) {
416     assert(c);
417     return c->state == CONTEXT_READY;
418 }
419
420 int pa_context_errno(struct pa_context *c) {
421     assert(c);
422     return c->error;
423 }
424
425 void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
426     assert(c);
427     c->die_callback = cb;
428     c->die_userdata = userdata;
429 }
430
431 static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
432     struct pa_context *c = userdata;
433     struct pa_stream *s;
434     uint32_t channel;
435     assert(pd && (command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED) && t && c);
436
437     if (pa_tagstruct_getu32(t, &channel) < 0 ||
438         !pa_tagstruct_eof(t)) {
439         c->error = PA_ERROR_PROTOCOL;
440         context_dead(c);
441         return;
442     }
443     
444     if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
445         return;
446
447     c->error = PA_ERROR_KILLED;
448     stream_dead(s);
449 }
450
451 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
452     struct pa_stream *s;
453     struct pa_context *c = userdata;
454     uint32_t bytes, channel;
455     assert(pd && command == PA_COMMAND_REQUEST && t && c);
456
457     if (pa_tagstruct_getu32(t, &channel) < 0 ||
458         pa_tagstruct_getu32(t, &bytes) < 0 ||
459         !pa_tagstruct_eof(t)) {
460         c->error = PA_ERROR_PROTOCOL;
461         context_dead(c);
462         return;
463     }
464     
465     if (!(s = pa_dynarray_get(c->playback_streams, channel)))
466         return;
467
468     if (s->state != STREAM_READY)
469         return;
470     
471     s->requested_bytes += bytes;
472     
473     if (s->requested_bytes && s->write_callback)
474         s->write_callback(s, s->requested_bytes, s->write_userdata);
475 }
476
477 static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
478     struct pa_stream *s = userdata;
479     assert(pd && s && s->state == STREAM_CREATING);
480
481     if (command != PA_COMMAND_REPLY) {
482         if (handle_error(s->context, command, t) < 0) {
483             context_dead(s->context);
484             return;
485         }
486
487         stream_dead(s);
488         if (s->create_complete_callback)
489             s->create_complete_callback(s, 0, s->create_complete_userdata);
490
491         return;
492     }
493
494     if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
495         pa_tagstruct_getu32(t, &s->device_index) < 0 ||
496         !pa_tagstruct_eof(t)) {
497         s->context->error = PA_ERROR_PROTOCOL;
498         context_dead(s->context);
499         return;
500     }
501
502     s->channel_valid = 1;
503     pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams :  s->context->record_streams, s->channel, s);
504     
505     s->state = STREAM_READY;
506     if (s->create_complete_callback)
507         s->create_complete_callback(s, 1, s->create_complete_userdata);
508 }
509
510 static void create_stream(struct pa_stream *s, uint32_t tdev_index) {
511     struct pa_tagstruct *t;
512     uint32_t tag;
513     assert(s);
514
515     s->state = STREAM_CREATING;
516     
517     t = pa_tagstruct_new(NULL, 0);
518     assert(t);
519     
520     pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
521     pa_tagstruct_putu32(t, tag = s->context->ctag++);
522     pa_tagstruct_puts(t, s->name);
523     pa_tagstruct_put_sample_spec(t, &s->sample_spec);
524     pa_tagstruct_putu32(t, tdev_index);
525     pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
526     if (s->direction == PA_STREAM_PLAYBACK) {
527         pa_tagstruct_putu32(t, s->buffer_attr.tlength);
528         pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
529         pa_tagstruct_putu32(t, s->buffer_attr.minreq);
530     } else
531         pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
532
533     pa_pstream_send_tagstruct(s->context->pstream, t);
534     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, s);
535 }
536
537 static void lookup_device_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
538     struct pa_stream *s = userdata;
539     uint32_t tdev;
540     assert(pd && s && s->state == STREAM_LOOKING_UP);
541
542     if (command != PA_COMMAND_REPLY) {
543         if (handle_error(s->context, command, t) < 0) {
544             context_dead(s->context);
545             return;
546         }
547
548         stream_dead(s);
549         if (s->create_complete_callback)
550             s->create_complete_callback(s, 0, s->create_complete_userdata);
551         return;
552     }
553
554     if (pa_tagstruct_getu32(t, &tdev) < 0 ||
555         !pa_tagstruct_eof(t)) {
556         s->context->error = PA_ERROR_PROTOCOL;
557         context_dead(s->context);
558         return;
559     }
560     
561     create_stream(s, tdev);
562 }
563
564 static void lookup_device(struct pa_stream *s, const char *tdev) {
565     struct pa_tagstruct *t;
566     uint32_t tag;
567     assert(s);
568     
569     s->state = STREAM_LOOKING_UP;
570
571     t = pa_tagstruct_new(NULL, 0);
572     assert(t);
573
574     pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_LOOKUP_SINK : PA_COMMAND_LOOKUP_SOURCE);
575     pa_tagstruct_putu32(t, tag = s->context->ctag++);
576     pa_tagstruct_puts(t, tdev);
577
578     pa_pstream_send_tagstruct(s->context->pstream, t);
579     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, lookup_device_callback, s);
580 }
581
582 struct pa_stream* pa_stream_new(
583     struct pa_context *c,
584     enum pa_stream_direction dir,
585     const char *dev,
586     const char *name,
587     const struct pa_sample_spec *ss,
588     const struct pa_buffer_attr *attr,
589     void (*complete) (struct pa_stream*s, int success, void *userdata),
590     void *userdata) {
591     
592     struct pa_stream *s;
593
594     assert(c && name && ss && c->state == CONTEXT_READY);
595     
596     s = malloc(sizeof(struct pa_stream));
597     assert(s);
598     s->context = c;
599
600     s->read_callback = NULL;
601     s->read_userdata = NULL;
602     s->write_callback = NULL;
603     s->write_userdata = NULL;
604     s->die_callback = NULL;
605     s->die_userdata = NULL;
606     s->create_complete_callback = complete;
607     s->create_complete_userdata = NULL;
608     s->get_latency_callback = NULL;
609     s->get_latency_userdata = NULL;
610
611     s->name = strdup(name);
612     s->state = STREAM_CREATING;
613     s->requested_bytes = 0;
614     s->channel = 0;
615     s->channel_valid = 0;
616     s->device_index = (uint32_t) -1;
617     s->direction = dir;
618     s->sample_spec = *ss;
619     if (attr)
620         s->buffer_attr = *attr;
621     else {
622         s->buffer_attr.maxlength = DEFAULT_MAXLENGTH;
623         s->buffer_attr.tlength = DEFAULT_TLENGTH;
624         s->buffer_attr.prebuf = DEFAULT_PREBUF;
625         s->buffer_attr.minreq = DEFAULT_MINREQ;
626         s->buffer_attr.fragsize = DEFAULT_FRAGSIZE;
627     }
628
629     s->next = c->first_stream;
630     if (s->next)
631         s->next->previous = s;
632     s->previous = NULL;
633     c->first_stream = s;
634
635     if (dev)
636         lookup_device(s, dev);
637     else
638         create_stream(s, (uint32_t) -1);
639     
640     return s;
641 }
642
643 void pa_stream_free(struct pa_stream *s) {
644     assert(s && s->context);
645
646     if (s->context->pdispatch) 
647         pa_pdispatch_unregister_reply(s->context->pdispatch, s);
648     
649     free(s->name);
650
651     if (s->channel_valid && s->context->state == CONTEXT_READY) {
652         struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0);
653         assert(t);
654     
655         pa_tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM);
656         pa_tagstruct_putu32(t, s->context->ctag++);
657         pa_tagstruct_putu32(t, s->channel);
658         pa_pstream_send_tagstruct(s->context->pstream, t);
659     }
660     
661     if (s->channel_valid)
662         pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
663
664     if (s->next)
665         s->next->previous = s->previous;
666     if (s->previous)
667         s->previous->next = s->next;
668     else
669         s->context->first_stream = s->next;
670     
671     free(s);
672 }
673
674 void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
675     assert(s && cb);
676     s->write_callback = cb;
677     s->write_userdata = userdata;
678 }
679
680 void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
681     struct pa_memchunk chunk;
682     assert(s && s->context && data && length && s->state == STREAM_READY);
683
684     chunk.memblock = pa_memblock_new(length);
685     assert(chunk.memblock && chunk.memblock->data);
686     memcpy(chunk.memblock->data, data, length);
687     chunk.index = 0;
688     chunk.length = length;
689
690     pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
691     pa_memblock_unref(chunk.memblock);
692
693     /*fprintf(stderr, "Sent %u bytes\n", length);*/
694     
695     if (length < s->requested_bytes)
696         s->requested_bytes -= length;
697     else
698         s->requested_bytes = 0;
699 }
700
701 size_t pa_stream_writable_size(struct pa_stream *s) {
702     assert(s && s->state == STREAM_READY);
703     return s->requested_bytes;
704 }
705
706 void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
707     assert(s && cb);
708     s->read_callback = cb;
709     s->read_userdata = userdata;
710 }
711
712 int pa_stream_is_dead(struct pa_stream *s) {
713     return s->state == STREAM_DEAD;
714 }
715
716 int pa_stream_is_ready(struct pa_stream*s) {
717     return s->state == STREAM_READY;
718 }
719
720 void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
721     assert(s);
722     s->die_callback = cb;
723     s->die_userdata = userdata;
724 }
725
726 int pa_context_is_pending(struct pa_context *c) {
727     assert(c);
728
729     if (c->state != CONTEXT_READY)
730         return 0;
731
732     return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch);
733 }
734
735 struct pa_context* pa_stream_get_context(struct pa_stream *p) {
736     assert(p);
737     return p->context;
738 }
739
740 static void set_dispatch_callbacks(struct pa_context *c);
741
742 static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) {
743     set_dispatch_callbacks(userdata);
744 }
745
746 static void pstream_drain_callback(struct pa_pstream *s, void *userdata) {
747     set_dispatch_callbacks(userdata);
748 }
749
750 static void set_dispatch_callbacks(struct pa_context *c) {
751     assert(c && c->state == CONTEXT_READY);
752
753     pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
754     pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
755     
756     if (pa_pdispatch_is_pending(c->pdispatch)) {
757         pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c);
758         return;
759     }
760
761     if (pa_pstream_is_pending(c->pstream)) {
762         pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
763         return;
764     }
765
766     assert(c->drain_complete_callback);
767     c->drain_complete_callback(c, c->drain_complete_userdata);
768 }
769
770 int pa_context_drain(
771     struct pa_context *c, 
772     void (*complete) (struct pa_context*c, void *userdata),
773     void *userdata) {
774
775     assert(c && c->state == CONTEXT_READY);
776
777     if (complete == NULL) {
778         c->drain_complete_callback = NULL;
779         pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
780         pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
781         return 0;
782     }
783     
784     if (!pa_context_is_pending(c))
785         return -1;
786     
787     c->drain_complete_callback = complete;
788     c->drain_complete_userdata = userdata;
789
790     set_dispatch_callbacks(c);
791
792     return 0;
793 }
794
795 static void stream_drain_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
796     struct pa_stream *s = userdata;
797     assert(pd && s);
798     
799     if (command != PA_COMMAND_REPLY) {
800         if (handle_error(s->context, command, t) < 0) {
801             context_dead(s->context);
802             return;
803         }
804
805         stream_dead(s);
806         return;
807     }
808
809     if (s->state != STREAM_READY)
810         return;
811
812     if (!pa_tagstruct_eof(t)) {
813         s->context->error = PA_ERROR_PROTOCOL;
814         context_dead(s->context);
815         return;
816     }
817
818     if (s->drain_complete_callback) {
819         void (*temp) (struct pa_stream*s, void *userdata) = s->drain_complete_callback;
820         s->drain_complete_callback = NULL;
821         temp(s, s->drain_complete_userdata);
822     }
823 }
824
825
826 void pa_stream_drain(struct pa_stream *s, void (*complete) (struct pa_stream*s, void *userdata), void *userdata) {
827     struct pa_tagstruct *t;
828     uint32_t tag;
829     assert(s && s->state == STREAM_READY);
830
831     if (!complete) {
832         s->drain_complete_callback = NULL;
833         return;
834     }
835
836     s->drain_complete_callback = complete;
837     s->drain_complete_userdata = userdata;
838
839     t = pa_tagstruct_new(NULL, 0);
840     assert(t);
841     
842     pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
843     pa_tagstruct_putu32(t, tag = s->context->ctag++);
844     pa_tagstruct_putu32(t, s->channel);
845     pa_pstream_send_tagstruct(s->context->pstream, t);
846     pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_drain_callback, s);
847 }
848
849 void pa_context_exit(struct pa_context *c) {
850     struct pa_tagstruct *t;
851     t = pa_tagstruct_new(NULL, 0);
852     assert(t);
853     pa_tagstruct_putu32(t, PA_COMMAND_EXIT);
854     pa_tagstruct_putu32(t, c->ctag++);
855     pa_pstream_send_tagstruct(c->pstream, t);
856 }
857
858 static void context_stat_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
859     struct pa_context *c = userdata;
860     uint32_t total, count;
861     assert(pd && c);
862
863     if (command != PA_COMMAND_REPLY) {
864         if (handle_error(c, command, t) < 0) {
865             context_dead(c);
866             return;
867         }
868
869         if (c->stat_callback)
870             c->stat_callback(c, (uint32_t) -1, (uint32_t) -1, c->stat_userdata);
871         return;
872     }
873
874     if (pa_tagstruct_getu32(t, &count) < 0 ||
875         pa_tagstruct_getu32(t, &total) < 0 ||
876         !pa_tagstruct_eof(t)) {
877         c->error = PA_ERROR_PROTOCOL;
878         context_dead(c);
879         return;
880     }
881
882     if (c->stat_callback)
883         c->stat_callback(c, count, total, c->stat_userdata);
884 }
885
886 void pa_context_stat(struct pa_context *c, void (*cb)(struct pa_context *c, uint32_t count, uint32_t total, void *userdata), void *userdata) {
887     uint32_t tag;
888     struct pa_tagstruct *t;
889
890     c->stat_callback = cb;
891     c->stat_userdata = userdata;
892
893     if (cb == NULL)
894         return;
895     
896     t = pa_tagstruct_new(NULL, 0);
897     assert(t);
898     pa_tagstruct_putu32(t, PA_COMMAND_STAT);
899     pa_tagstruct_putu32(t, tag = c->ctag++);
900     pa_pstream_send_tagstruct(c->pstream, t);
901     pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, context_stat_callback, c);
902 }
903
904 static void stream_get_latency_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
905     struct pa_stream *s = userdata;
906     uint32_t latency;
907     assert(pd && s);
908
909     if (command != PA_COMMAND_REPLY) {
910         if (handle_error(s->context, command, t) < 0) {
911             context_dead(s->context);
912             return;
913         }
914
915         if (s->get_latency_callback)
916             s->get_latency_callback(s, (uint32_t) -1, s->get_latency_userdata);
917         return;
918     }
919
920     if (pa_tagstruct_getu32(t, &latency) < 0 ||
921         !pa_tagstruct_eof(t)) {
922         s->context->error = PA_ERROR_PROTOCOL;
923         context_dead(s->context);
924         return;
925     }
926
927     if (s->get_latency_callback)
928         s->get_latency_callback(s, latency, s->get_latency_userdata);
929 }
930
931 void pa_stream_get_latency(struct pa_stream *p, void (*cb)(struct pa_stream *p, uint32_t latency, void *userdata), void *userdata) {
932     uint32_t tag;
933     struct pa_tagstruct *t;
934
935     p->get_latency_callback = cb;
936     p->get_latency_userdata = userdata;
937
938     if (cb == NULL)
939         return;
940     
941     t = pa_tagstruct_new(NULL, 0);
942     assert(t);
943     pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
944     pa_tagstruct_putu32(t, tag = p->context->ctag++);
945     pa_tagstruct_putu32(t, p->channel);
946     pa_pstream_send_tagstruct(p->context->pstream, t);
947     pa_pdispatch_register_reply(p->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, p);
948 }